chore(parsers.prometheus): Consolidation v1 and v2 parsers (#14580)
This commit is contained in:
parent
4cff76685c
commit
b638c886f7
1
go.mod
1
go.mod
|
|
@ -130,7 +130,6 @@ require (
|
||||||
github.com/linkedin/goavro/v2 v2.12.0
|
github.com/linkedin/goavro/v2 v2.12.0
|
||||||
github.com/logzio/azure-monitor-metrics-receiver v1.0.1
|
github.com/logzio/azure-monitor-metrics-receiver v1.0.1
|
||||||
github.com/lxc/lxd v0.0.0-20220920163450-e9b4b514106a
|
github.com/lxc/lxd v0.0.0-20220920163450-e9b4b514106a
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.4
|
|
||||||
github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a
|
github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a
|
||||||
github.com/mdlayher/vsock v1.2.1
|
github.com/mdlayher/vsock v1.2.1
|
||||||
github.com/microsoft/ApplicationInsights-Go v0.4.4
|
github.com/microsoft/ApplicationInsights-Go v0.4.4
|
||||||
|
|
|
||||||
2
go.sum
2
go.sum
|
|
@ -1761,8 +1761,6 @@ github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4
|
||||||
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
|
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
|
||||||
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
|
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
|
|
||||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
|
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
|
||||||
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
|
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
|
||||||
github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a h1:JOlLsLUQnokTyWWwEvOVoKH3XUl6oDMP8jisO54l6J8=
|
github.com/mdlayher/apcupsd v0.0.0-20220319200143-473c7b5f3c6a h1:JOlLsLUQnokTyWWwEvOVoKH3XUl6oDMP8jisO54l6J8=
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ import (
|
||||||
dto "github.com/prometheus/client_model/go"
|
dto "github.com/prometheus/client_model/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ValueType(mt dto.MetricType) telegraf.ValueType {
|
func mapValueType(mt dto.MetricType) telegraf.ValueType {
|
||||||
switch mt {
|
switch mt {
|
||||||
case dto.MetricType_COUNTER:
|
case dto.MetricType_COUNTER:
|
||||||
return telegraf.Counter
|
return telegraf.Counter
|
||||||
|
|
@ -20,7 +20,7 @@ func ValueType(mt dto.MetricType) telegraf.ValueType {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTagsFromLabels(m *dto.Metric, defaultTags map[string]string) map[string]string {
|
func getTagsFromLabels(m *dto.Metric, defaultTags map[string]string) map[string]string {
|
||||||
result := map[string]string{}
|
result := map[string]string{}
|
||||||
|
|
||||||
for key, value := range defaultTags {
|
for key, value := range defaultTags {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,85 @@
|
||||||
|
package prometheus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *Parser) extractMetricsV1(prommetrics *dto.MetricFamily) []telegraf.Metric {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Convert each prometheus metrics to the corresponding telegraf metrics.
|
||||||
|
// You will get one telegraf metric with one field per prometheus metric
|
||||||
|
// for "simple" types like Gauge and Counter but a telegraf metric with
|
||||||
|
// multiple fields for "complex" types like Summary or Histogram.
|
||||||
|
var metrics []telegraf.Metric
|
||||||
|
metricName := prommetrics.GetName()
|
||||||
|
metricType := prommetrics.GetType()
|
||||||
|
for _, pm := range prommetrics.Metric {
|
||||||
|
// Extract the timestamp of the metric if it exists and should
|
||||||
|
// not be ignored.
|
||||||
|
t := now
|
||||||
|
if ts := pm.GetTimestampMs(); !p.IgnoreTimestamp && ts > 0 {
|
||||||
|
t = time.UnixMilli(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert the labels to tags
|
||||||
|
tags := getTagsFromLabels(pm, p.DefaultTags)
|
||||||
|
|
||||||
|
// Construct the metrics
|
||||||
|
switch metricType {
|
||||||
|
case dto.MetricType_SUMMARY:
|
||||||
|
summary := pm.GetSummary()
|
||||||
|
|
||||||
|
// Collect the fields
|
||||||
|
fields := make(map[string]interface{}, len(summary.Quantile)+2)
|
||||||
|
fields["count"] = float64(summary.GetSampleCount())
|
||||||
|
fields["sum"] = summary.GetSampleSum()
|
||||||
|
for _, q := range summary.Quantile {
|
||||||
|
if v := q.GetValue(); !math.IsNaN(v) {
|
||||||
|
fname := strconv.FormatFloat(q.GetQuantile(), 'g', -1, 64)
|
||||||
|
fields[fname] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
metrics = append(metrics, metric.New(metricName, tags, fields, t, telegraf.Summary))
|
||||||
|
case dto.MetricType_HISTOGRAM:
|
||||||
|
histogram := pm.GetHistogram()
|
||||||
|
|
||||||
|
// Collect the fields
|
||||||
|
fields := make(map[string]interface{}, len(histogram.Bucket)+2)
|
||||||
|
fields["count"] = float64(pm.GetHistogram().GetSampleCount())
|
||||||
|
fields["sum"] = pm.GetHistogram().GetSampleSum()
|
||||||
|
for _, b := range histogram.Bucket {
|
||||||
|
fname := strconv.FormatFloat(b.GetUpperBound(), 'g', -1, 64)
|
||||||
|
fields[fname] = float64(b.GetCumulativeCount())
|
||||||
|
}
|
||||||
|
metrics = append(metrics, metric.New(metricName, tags, fields, t, telegraf.Histogram))
|
||||||
|
default:
|
||||||
|
var fname string
|
||||||
|
var v float64
|
||||||
|
if gauge := pm.GetGauge(); gauge != nil {
|
||||||
|
fname = "gauge"
|
||||||
|
v = gauge.GetValue()
|
||||||
|
} else if counter := pm.GetCounter(); counter != nil {
|
||||||
|
fname = "counter"
|
||||||
|
v = counter.GetValue()
|
||||||
|
} else if untyped := pm.GetUntyped(); untyped != nil {
|
||||||
|
fname = "value"
|
||||||
|
v = untyped.GetValue()
|
||||||
|
}
|
||||||
|
if fname != "" && !math.IsNaN(v) {
|
||||||
|
fields := map[string]interface{}{fname: v}
|
||||||
|
vtype := mapValueType(metricType)
|
||||||
|
metrics = append(metrics, metric.New(metricName, tags, fields, t, vtype))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,107 @@
|
||||||
|
package prometheus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *Parser) extractMetricsV2(prommetrics *dto.MetricFamily) []telegraf.Metric {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Convert each prometheus metric to a corresponding telegraf metric
|
||||||
|
// with one field each. The process will filter NaNs in values and skip
|
||||||
|
// the corresponding metrics.
|
||||||
|
var metrics []telegraf.Metric
|
||||||
|
metricName := prommetrics.GetName()
|
||||||
|
metricType := prommetrics.GetType()
|
||||||
|
for _, pm := range prommetrics.Metric {
|
||||||
|
// Extract the timestamp of the metric if it exists and should
|
||||||
|
// not be ignored.
|
||||||
|
t := now
|
||||||
|
if ts := pm.GetTimestampMs(); !p.IgnoreTimestamp && ts > 0 {
|
||||||
|
t = time.UnixMilli(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert the labels to tags
|
||||||
|
tags := getTagsFromLabels(pm, p.DefaultTags)
|
||||||
|
|
||||||
|
// Construct the metrics
|
||||||
|
switch metricType {
|
||||||
|
case dto.MetricType_SUMMARY:
|
||||||
|
summary := pm.GetSummary()
|
||||||
|
|
||||||
|
// Add an overall metric containing the number of samples and and its sum
|
||||||
|
summaryFields := make(map[string]interface{})
|
||||||
|
summaryFields[metricName+"_count"] = float64(summary.GetSampleCount())
|
||||||
|
summaryFields[metricName+"_sum"] = summary.GetSampleSum()
|
||||||
|
metrics = append(metrics, metric.New("prometheus", tags, summaryFields, t, telegraf.Summary))
|
||||||
|
|
||||||
|
// Add one metric per quantile
|
||||||
|
for _, q := range summary.Quantile {
|
||||||
|
quantileTags := tags
|
||||||
|
quantileTags["quantile"] = strconv.FormatFloat(q.GetQuantile(), 'g', -1, 64)
|
||||||
|
quantileFields := map[string]interface{}{
|
||||||
|
metricName: q.GetValue(),
|
||||||
|
}
|
||||||
|
m := metric.New("prometheus", quantileTags, quantileFields, t, telegraf.Summary)
|
||||||
|
metrics = append(metrics, m)
|
||||||
|
}
|
||||||
|
case dto.MetricType_HISTOGRAM:
|
||||||
|
histogram := pm.GetHistogram()
|
||||||
|
|
||||||
|
// Add an overall metric containing the number of samples and and its sum
|
||||||
|
histFields := make(map[string]interface{})
|
||||||
|
histFields[metricName+"_count"] = float64(histogram.GetSampleCount())
|
||||||
|
histFields[metricName+"_sum"] = histogram.GetSampleSum()
|
||||||
|
metrics = append(metrics, metric.New("prometheus", tags, histFields, t, telegraf.Histogram))
|
||||||
|
|
||||||
|
// Add one metric per histogram bucket
|
||||||
|
var infSeen bool
|
||||||
|
for _, b := range histogram.Bucket {
|
||||||
|
bucketTags := tags
|
||||||
|
bucketTags["le"] = strconv.FormatFloat(b.GetUpperBound(), 'g', -1, 64)
|
||||||
|
bucketFields := map[string]interface{}{
|
||||||
|
metricName + "_bucket": float64(b.GetCumulativeCount()),
|
||||||
|
}
|
||||||
|
m := metric.New("prometheus", bucketTags, bucketFields, t, telegraf.Histogram)
|
||||||
|
metrics = append(metrics, m)
|
||||||
|
|
||||||
|
// Record if any of the buckets marks an infinite upper bound
|
||||||
|
infSeen = infSeen || math.IsInf(b.GetUpperBound(), +1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Infinity bucket is required for proper function of histogram in prometheus
|
||||||
|
if !infSeen {
|
||||||
|
infTags := tags
|
||||||
|
infTags["le"] = "+Inf"
|
||||||
|
infFields := map[string]interface{}{
|
||||||
|
metricName + "_bucket": float64(histogram.GetSampleCount()),
|
||||||
|
}
|
||||||
|
m := metric.New("prometheus", infTags, infFields, t, telegraf.Histogram)
|
||||||
|
metrics = append(metrics, m)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
v := math.Inf(1)
|
||||||
|
if gauge := pm.GetGauge(); gauge != nil {
|
||||||
|
v = gauge.GetValue()
|
||||||
|
} else if counter := pm.GetCounter(); counter != nil {
|
||||||
|
v = counter.GetValue()
|
||||||
|
} else if untyped := pm.GetUntyped(); untyped != nil {
|
||||||
|
v = untyped.GetValue()
|
||||||
|
}
|
||||||
|
if !math.IsNaN(v) {
|
||||||
|
fields := map[string]interface{}{metricName: v}
|
||||||
|
vtype := mapValueType(metricType)
|
||||||
|
metrics = append(metrics, metric.New("prometheus", tags, fields, t, vtype))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
@ -1,11 +1,16 @@
|
||||||
package prometheus
|
package prometheus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
"github.com/prometheus/common/expfmt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Parser struct {
|
type Parser struct {
|
||||||
|
|
@ -13,20 +18,67 @@ type Parser struct {
|
||||||
MetricVersion int `toml:"prometheus_metric_version"`
|
MetricVersion int `toml:"prometheus_metric_version"`
|
||||||
Header http.Header `toml:"-"` // set by the prometheus input
|
Header http.Header `toml:"-"` // set by the prometheus input
|
||||||
DefaultTags map[string]string `toml:"-"`
|
DefaultTags map[string]string `toml:"-"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
||||||
p.DefaultTags = tags
|
p.DefaultTags = tags
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) {
|
||||||
switch p.MetricVersion {
|
// Make sure we have a finishing newline but no trailing one
|
||||||
case 0, 2:
|
data = bytes.TrimPrefix(data, []byte("\n"))
|
||||||
return p.parseV2(buf)
|
if !bytes.HasSuffix(data, []byte("\n")) {
|
||||||
case 1:
|
data = append(data, []byte("\n")...)
|
||||||
return p.parseV1(buf)
|
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion)
|
buf := bytes.NewBuffer(data)
|
||||||
|
|
||||||
|
// Determine the metric transport-type derived from the response header and
|
||||||
|
// create a matching decoder.
|
||||||
|
format := expfmt.ResponseFormat(p.Header)
|
||||||
|
if format == expfmt.FmtUnknown {
|
||||||
|
p.Log.Warnf("Unknown format %q... Trying to continue...", p.Header.Get("Content-Type"))
|
||||||
|
}
|
||||||
|
decoder := expfmt.NewDecoder(buf, format)
|
||||||
|
|
||||||
|
// Decode the input data into prometheus metrics
|
||||||
|
var metrics []telegraf.Metric
|
||||||
|
for {
|
||||||
|
var mf dto.MetricFamily
|
||||||
|
if err := decoder.Decode(&mf); err != nil {
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("decoding response failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch p.MetricVersion {
|
||||||
|
case 0, 2:
|
||||||
|
metrics = append(metrics, p.extractMetricsV2(&mf)...)
|
||||||
|
case 1:
|
||||||
|
metrics = append(metrics, p.extractMetricsV1(&mf)...)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return metrics, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||||
|
metrics, err := p.Parse([]byte(line))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(metrics) < 1 {
|
||||||
|
return nil, fmt.Errorf("no metrics in line")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(metrics) > 1 {
|
||||||
|
return nil, fmt.Errorf("more than one metric in line")
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
||||||
|
|
@ -1,147 +0,0 @@
|
||||||
package prometheus
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"bytes"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
"mime"
|
|
||||||
"net/http"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
|
||||||
dto "github.com/prometheus/client_model/go"
|
|
||||||
"github.com/prometheus/common/expfmt"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/metric"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (p *Parser) parseV1(buf []byte) ([]telegraf.Metric, error) {
|
|
||||||
var parser expfmt.TextParser
|
|
||||||
var metrics []telegraf.Metric
|
|
||||||
var err error
|
|
||||||
// parse even if the buffer begins with a newline
|
|
||||||
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
|
||||||
// Read raw data
|
|
||||||
buffer := bytes.NewBuffer(buf)
|
|
||||||
reader := bufio.NewReader(buffer)
|
|
||||||
|
|
||||||
// Prepare output
|
|
||||||
metricFamilies := make(map[string]*dto.MetricFamily)
|
|
||||||
|
|
||||||
if isProtobuf(p.Header) {
|
|
||||||
for {
|
|
||||||
mf := &dto.MetricFamily{}
|
|
||||||
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
|
|
||||||
if errors.Is(ierr, io.EOF) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("reading metric family protocol buffer failed: %w", ierr)
|
|
||||||
}
|
|
||||||
metricFamilies[mf.GetName()] = mf
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
metricFamilies, err = parser.TextToMetricFamilies(reader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("reading text format failed: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
// read metrics
|
|
||||||
for metricName, mf := range metricFamilies {
|
|
||||||
for _, m := range mf.Metric {
|
|
||||||
// reading tags
|
|
||||||
tags := GetTagsFromLabels(m, p.DefaultTags)
|
|
||||||
|
|
||||||
// reading fields
|
|
||||||
var fields map[string]interface{}
|
|
||||||
if mf.GetType() == dto.MetricType_SUMMARY {
|
|
||||||
// summary metric
|
|
||||||
fields = makeQuantilesV1(m)
|
|
||||||
fields["count"] = float64(m.GetSummary().GetSampleCount())
|
|
||||||
//nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40
|
|
||||||
fields["sum"] = float64(m.GetSummary().GetSampleSum())
|
|
||||||
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
|
|
||||||
// histogram metric
|
|
||||||
fields = makeBucketsV1(m)
|
|
||||||
fields["count"] = float64(m.GetHistogram().GetSampleCount())
|
|
||||||
//nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40
|
|
||||||
fields["sum"] = float64(m.GetHistogram().GetSampleSum())
|
|
||||||
} else {
|
|
||||||
// standard metric
|
|
||||||
fields = getNameAndValueV1(m)
|
|
||||||
}
|
|
||||||
// converting to telegraf metric
|
|
||||||
if len(fields) > 0 {
|
|
||||||
var t time.Time
|
|
||||||
if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 {
|
|
||||||
t = time.Unix(0, *m.TimestampMs*1000000)
|
|
||||||
} else {
|
|
||||||
t = now
|
|
||||||
}
|
|
||||||
m := metric.New(metricName, tags, fields, t, ValueType(mf.GetType()))
|
|
||||||
metrics = append(metrics, m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return metrics, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func isProtobuf(header http.Header) bool {
|
|
||||||
mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return mediatype == "application/vnd.google.protobuf" &&
|
|
||||||
params["encoding"] == "delimited" &&
|
|
||||||
params["proto"] == "io.prometheus.client.MetricFamily"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get Quantiles from summary metric
|
|
||||||
func makeQuantilesV1(m *dto.Metric) map[string]interface{} {
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
for _, q := range m.GetSummary().Quantile {
|
|
||||||
if !math.IsNaN(q.GetValue()) {
|
|
||||||
//nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40
|
|
||||||
fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get Buckets from histogram metric
|
|
||||||
func makeBucketsV1(m *dto.Metric) map[string]interface{} {
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
for _, b := range m.GetHistogram().Bucket {
|
|
||||||
fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount())
|
|
||||||
}
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get name and value from metric
|
|
||||||
func getNameAndValueV1(m *dto.Metric) map[string]interface{} {
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
if m.Gauge != nil {
|
|
||||||
if !math.IsNaN(m.GetGauge().GetValue()) {
|
|
||||||
//nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40
|
|
||||||
fields["gauge"] = float64(m.GetGauge().GetValue())
|
|
||||||
}
|
|
||||||
} else if m.Counter != nil {
|
|
||||||
if !math.IsNaN(m.GetCounter().GetValue()) {
|
|
||||||
//nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40
|
|
||||||
fields["counter"] = float64(m.GetCounter().GetValue())
|
|
||||||
}
|
|
||||||
} else if m.Untyped != nil {
|
|
||||||
if !math.IsNaN(m.GetUntyped().GetValue()) {
|
|
||||||
//nolint:unconvert // Conversion may be needed for float64 https://github.com/mdempsky/unconvert/issues/40
|
|
||||||
fields["value"] = float64(m.GetUntyped().GetValue())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
@ -1,197 +0,0 @@
|
||||||
package prometheus
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bufio"
|
|
||||||
"bytes"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"math"
|
|
||||||
"mime"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
|
||||||
dto "github.com/prometheus/client_model/go"
|
|
||||||
"github.com/prometheus/common/expfmt"
|
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
|
||||||
"github.com/influxdata/telegraf/metric"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (p *Parser) parseV2(buf []byte) ([]telegraf.Metric, error) {
|
|
||||||
var parser expfmt.TextParser
|
|
||||||
var metrics []telegraf.Metric
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// Make sure we have a finishing newline but no trailing one
|
|
||||||
buf = bytes.TrimPrefix(buf, []byte("\n"))
|
|
||||||
if !bytes.HasSuffix(buf, []byte("\n")) {
|
|
||||||
buf = append(buf, []byte("\n")...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read raw data
|
|
||||||
buffer := bytes.NewBuffer(buf)
|
|
||||||
reader := bufio.NewReader(buffer)
|
|
||||||
|
|
||||||
// Prepare output
|
|
||||||
metricFamilies := make(map[string]*dto.MetricFamily)
|
|
||||||
mediatype, params, err := mime.ParseMediaType(p.Header.Get("Content-Type"))
|
|
||||||
if err == nil && mediatype == "application/vnd.google.protobuf" &&
|
|
||||||
params["encoding"] == "delimited" &&
|
|
||||||
params["proto"] == "io.prometheus.client.MetricFamily" {
|
|
||||||
for {
|
|
||||||
mf := &dto.MetricFamily{}
|
|
||||||
if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil {
|
|
||||||
if errors.Is(ierr, io.EOF) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("reading metric family protocol buffer failed: %w", ierr)
|
|
||||||
}
|
|
||||||
metricFamilies[mf.GetName()] = mf
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
metricFamilies, err = parser.TextToMetricFamilies(reader)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("reading text format failed: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
// read metrics
|
|
||||||
for metricName, mf := range metricFamilies {
|
|
||||||
for _, m := range mf.Metric {
|
|
||||||
// reading tags
|
|
||||||
tags := GetTagsFromLabels(m, p.DefaultTags)
|
|
||||||
t := p.getTimestampV2(m, now)
|
|
||||||
|
|
||||||
if mf.GetType() == dto.MetricType_SUMMARY {
|
|
||||||
// summary metric
|
|
||||||
telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), t)
|
|
||||||
metrics = append(metrics, telegrafMetrics...)
|
|
||||||
} else if mf.GetType() == dto.MetricType_HISTOGRAM {
|
|
||||||
// histogram metric
|
|
||||||
telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), t)
|
|
||||||
metrics = append(metrics, telegrafMetrics...)
|
|
||||||
} else {
|
|
||||||
// standard metric
|
|
||||||
// reading fields
|
|
||||||
fields := getNameAndValueV2(m, metricName)
|
|
||||||
// converting to telegraf metric
|
|
||||||
if len(fields) > 0 {
|
|
||||||
m := metric.New("prometheus", tags, fields, t, ValueType(mf.GetType()))
|
|
||||||
metrics = append(metrics, m)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return metrics, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
|
||||||
metrics, err := p.Parse([]byte(line))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(metrics) < 1 {
|
|
||||||
return nil, fmt.Errorf("no metrics in line")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(metrics) > 1 {
|
|
||||||
return nil, fmt.Errorf("more than one metric in line")
|
|
||||||
}
|
|
||||||
|
|
||||||
return metrics[0], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get Quantiles for summary metric & Buckets for histogram
|
|
||||||
func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric {
|
|
||||||
metrics := make([]telegraf.Metric, 0, len(m.GetSummary().Quantile)+1)
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
|
|
||||||
fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount())
|
|
||||||
fields[metricName+"_sum"] = m.GetSummary().GetSampleSum()
|
|
||||||
met := metric.New("prometheus", tags, fields, t, ValueType(metricType))
|
|
||||||
metrics = append(metrics, met)
|
|
||||||
|
|
||||||
for _, q := range m.GetSummary().Quantile {
|
|
||||||
newTags := tags
|
|
||||||
fields = make(map[string]interface{})
|
|
||||||
|
|
||||||
newTags["quantile"] = fmt.Sprint(q.GetQuantile())
|
|
||||||
fields[metricName] = q.GetValue()
|
|
||||||
|
|
||||||
quantileMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType))
|
|
||||||
metrics = append(metrics, quantileMetric)
|
|
||||||
}
|
|
||||||
return metrics
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get Buckets from histogram metric
|
|
||||||
func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, t time.Time) []telegraf.Metric {
|
|
||||||
metrics := make([]telegraf.Metric, 0, len(m.GetHistogram().Bucket)+2)
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
|
|
||||||
fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount())
|
|
||||||
fields[metricName+"_sum"] = m.GetHistogram().GetSampleSum()
|
|
||||||
|
|
||||||
met := metric.New("prometheus", tags, fields, t, ValueType(metricType))
|
|
||||||
metrics = append(metrics, met)
|
|
||||||
|
|
||||||
infSeen := false
|
|
||||||
for _, b := range m.GetHistogram().Bucket {
|
|
||||||
newTags := tags
|
|
||||||
fields = make(map[string]interface{})
|
|
||||||
newTags["le"] = fmt.Sprint(b.GetUpperBound())
|
|
||||||
fields[metricName+"_bucket"] = float64(b.GetCumulativeCount())
|
|
||||||
|
|
||||||
histogramMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType))
|
|
||||||
metrics = append(metrics, histogramMetric)
|
|
||||||
if math.IsInf(b.GetUpperBound(), +1) {
|
|
||||||
infSeen = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Infinity bucket is required for proper function of histogram in prometheus
|
|
||||||
if !infSeen {
|
|
||||||
newTags := tags
|
|
||||||
newTags["le"] = "+Inf"
|
|
||||||
|
|
||||||
fields = make(map[string]interface{})
|
|
||||||
fields[metricName+"_bucket"] = float64(m.GetHistogram().GetSampleCount())
|
|
||||||
|
|
||||||
histogramInfMetric := metric.New("prometheus", newTags, fields, t, ValueType(metricType))
|
|
||||||
metrics = append(metrics, histogramInfMetric)
|
|
||||||
}
|
|
||||||
return metrics
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get name and value from metric
|
|
||||||
func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} {
|
|
||||||
fields := make(map[string]interface{})
|
|
||||||
if m.Gauge != nil {
|
|
||||||
if !math.IsNaN(m.GetGauge().GetValue()) {
|
|
||||||
fields[metricName] = m.GetGauge().GetValue()
|
|
||||||
}
|
|
||||||
} else if m.Counter != nil {
|
|
||||||
if !math.IsNaN(m.GetCounter().GetValue()) {
|
|
||||||
fields[metricName] = m.GetCounter().GetValue()
|
|
||||||
}
|
|
||||||
} else if m.Untyped != nil {
|
|
||||||
if !math.IsNaN(m.GetUntyped().GetValue()) {
|
|
||||||
fields[metricName] = m.GetUntyped().GetValue()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fields
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Parser) getTimestampV2(m *dto.Metric, now time.Time) time.Time {
|
|
||||||
var t time.Time
|
|
||||||
if !p.IgnoreTimestamp && m.TimestampMs != nil && *m.TimestampMs > 0 {
|
|
||||||
t = time.Unix(0, m.GetTimestampMs()*1000000)
|
|
||||||
} else {
|
|
||||||
t = now
|
|
||||||
}
|
|
||||||
return t
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue