feat(parser.prometheusremotewrite): Add dense metric version to better support histograms (#16493)

This commit is contained in:
Yu Long 2025-03-06 16:11:27 -08:00 committed by GitHub
parent 441fa9c9c6
commit edb7f30521
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 477 additions and 77 deletions

2
go.mod
View File

@ -93,6 +93,7 @@ require (
github.com/go-stomp/stomp v2.1.4+incompatible
github.com/gobwas/glob v0.2.3
github.com/gofrs/uuid/v5 v5.3.0
github.com/gogo/protobuf v1.3.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/snappy v0.0.4
@ -351,7 +352,6 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect

View File

@ -16,6 +16,9 @@ additional configuration options for Prometheus Remote Write Samples.
## Data format to consume.
data_format = "prometheusremotewrite"
## Metric version to use, either 1 or 2
# metric_version = 2
```
## Example Input
@ -39,7 +42,13 @@ prompb.WriteRequest{
```
## Example Output
## Example Output (v1)
```text
go_gc_duration_seconds,instance=localhost:9090,job=prometheus,quantile=0.99 value=4.63 1614889298859000000
```
## Example Output (v2)
```text
prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1614889298859000000
@ -47,4 +56,5 @@ prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_
## For alignment with the [InfluxDB v1.x Prometheus Remote Write Spec](https://docs.influxdata.com/influxdb/v1.8/supported_protocols/prometheus/#how-prometheus-metrics-are-parsed-in-influxdb)
- Use the [Starlark processor rename prometheus remote write script](https://github.com/influxdata/telegraf/blob/master/plugins/processors/starlark/testdata/rename_prometheus_remote_write.star) to rename the measurement name to the fieldname and rename the fieldname to value.
- V1: already aligned, it parses metrics according to the spec.
- V2: Use the [Starlark processor rename prometheus remote write script](https://github.com/influxdata/telegraf/blob/master/plugins/processors/starlark/testdata/rename_prometheus_remote_write.star) to rename the measurement name to the fieldname and rename the fieldname to value.

View File

@ -0,0 +1,106 @@
package prometheusremotewrite
import (
"fmt"
"math"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
// extractMetricsV1 converts a single remote-write time-series into telegraf
// metrics using the version-1 ("dense") layout: the prometheus metric name
// becomes the measurement name and each sample becomes a single "value"
// field. Samples carrying NaN are skipped.
//
// Simple types (gauge, counter) yield one telegraf metric per sample.
// Classic histograms and summaries arrive already decomposed into separate
// series, so they still produce multiple telegraf metrics; one bucket of a
// histogram may even be split across several remote-write requests, so no
// aggregation is attempted here. Native histograms, by contrast, are emitted
// as a single telegraf metric with many fields.
func (p *Parser) extractMetricsV1(ts *prompb.TimeSeries) ([]telegraf.Metric, error) {
	t := time.Now()

	metrics := make([]telegraf.Metric, 0, len(ts.Samples)+len(ts.Histograms))

	// Default tags go in first so series labels can override them.
	tags := make(map[string]string, len(p.DefaultTags)+len(ts.Labels))
	for key, value := range p.DefaultTags {
		tags[key] = value
	}
	for _, l := range ts.Labels {
		tags[l.Name] = l.Value
	}

	metricName := tags[model.MetricNameLabel]
	if metricName == "" {
		return nil, fmt.Errorf("metric name %q not found in tag-set or empty", model.MetricNameLabel)
	}
	delete(tags, model.MetricNameLabel)

	for _, s := range ts.Samples {
		// NaN cannot be represented as a field value; skip the sample.
		if math.IsNaN(s.Value) {
			continue
		}
		// Remote write carries no type information, so the sample might be a
		// counter, a gauge, or a sub-series of a histogram/summary.
		fields := map[string]interface{}{"value": s.Value}
		if s.Timestamp > 0 {
			// Remote-write timestamps are milliseconds since the epoch.
			t = time.UnixMilli(s.Timestamp)
		}
		m := metric.New(metricName, tags, fields, t)
		metrics = append(metrics, m)
	}

	for _, hp := range ts.Histograms {
		// Convert integer native histograms to the float representation so
		// both variants share one code path.
		h := hp.ToFloatHistogram()
		if hp.Timestamp > 0 {
			t = time.UnixMilli(hp.Timestamp)
		}
		fields := map[string]any{
			"counter_reset_hint": uint64(h.CounterResetHint),
			"schema":             int64(h.Schema),
			"zero_threshold":     h.ZeroThreshold,
			"zero_count":         h.ZeroCount,
			"count":              h.Count,
			"sum":                h.Sum,
		}
		// Emit cumulative bucket counts keyed by each bucket's upper bound.
		count := 0.0
		iter := h.AllBucketIterator()
		for iter.Next() {
			bucket := iter.At()
			count += bucket.Count
			fields[fmt.Sprintf("%g", bucket.Upper)] = count
		}
		// Expand the span and bucket arrays into individually-named fields so
		// the native histogram can be reconstructed downstream.
		for i, span := range h.PositiveSpans {
			fields[fmt.Sprintf("positive_span_%d_offset", i)] = int64(span.Offset)
			fields[fmt.Sprintf("positive_span_%d_length", i)] = uint64(span.Length)
		}
		for i, span := range h.NegativeSpans {
			fields[fmt.Sprintf("negative_span_%d_offset", i)] = int64(span.Offset)
			fields[fmt.Sprintf("negative_span_%d_length", i)] = uint64(span.Length)
		}
		for i, bucket := range h.PositiveBuckets {
			fields[fmt.Sprintf("positive_bucket_%d", i)] = bucket
		}
		for i, bucket := range h.NegativeBuckets {
			fields[fmt.Sprintf("negative_bucket_%d", i)] = bucket
		}
		m := metric.New(metricName, tags, fields, t, telegraf.Histogram)
		metrics = append(metrics, m)
	}
	return metrics, nil
}

View File

@ -0,0 +1,91 @@
package prometheusremotewrite
import (
"fmt"
"math"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
// extractMetricsV2 converts a single remote-write time-series into telegraf
// metrics using the version-2 layout: every metric uses the fixed measurement
// name "prometheus_remote_write" and the prometheus metric name becomes the
// field key. Samples carrying NaN values are skipped.
func (p *Parser) extractMetricsV2(ts *prompb.TimeSeries) ([]telegraf.Metric, error) {
	t := time.Now()

	// Each sample yields one metric; each native histogram yields at least
	// two (_sum and _count) plus one per bucket.
	metrics := make([]telegraf.Metric, 0, len(ts.Samples)+2*len(ts.Histograms))

	// Default tags go in first so series labels can override them.
	tags := make(map[string]string, len(p.DefaultTags)+len(ts.Labels))
	for key, value := range p.DefaultTags {
		tags[key] = value
	}
	for _, l := range ts.Labels {
		tags[l.Name] = l.Value
	}

	metricName := tags[model.MetricNameLabel]
	if metricName == "" {
		return nil, fmt.Errorf("metric name %q not found in tag-set or empty", model.MetricNameLabel)
	}
	delete(tags, model.MetricNameLabel)

	for _, s := range ts.Samples {
		// NaN cannot be represented as a field value; skip the sample.
		if math.IsNaN(s.Value) {
			continue
		}
		fields := map[string]interface{}{metricName: s.Value}
		if s.Timestamp > 0 {
			// Remote-write timestamps are milliseconds since the epoch.
			t = time.UnixMilli(s.Timestamp)
		}
		m := metric.New("prometheus_remote_write", tags, fields, t)
		metrics = append(metrics, m)
	}

	for _, hp := range ts.Histograms {
		// Convert integer native histograms to the float representation so
		// both variants share one code path.
		h := hp.ToFloatHistogram()
		if hp.Timestamp > 0 {
			t = time.UnixMilli(hp.Timestamp)
		}
		metrics = append(metrics,
			metric.New("prometheus_remote_write", tags, map[string]any{metricName + "_sum": h.Sum}, t),
			metric.New("prometheus_remote_write", tags, map[string]any{metricName + "_count": h.Count}, t),
		)

		// Emit one metric per bucket carrying the cumulative count, tagged
		// with the bucket's upper bound.
		count := 0.0
		iter := h.AllBucketIterator()
		for iter.Next() {
			bucket := iter.At()
			count += bucket.Count
			localTags := make(map[string]string, len(tags)+1)
			for k, v := range tags {
				localTags[k] = v
			}
			// Set the bucket-boundary tag after copying the series tags so an
			// incoming label with the same name cannot clobber it.
			localTags[metricName+"_le"] = fmt.Sprintf("%g", bucket.Upper)
			m := metric.New("prometheus_remote_write", localTags, map[string]any{metricName: count}, t)
			metrics = append(metrics, m)
		}
	}
	return metrics, nil
}

View File

@ -3,19 +3,16 @@ package prometheusremotewrite
import (
"errors"
"fmt"
"math"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)
type Parser struct {
DefaultTags map[string]string
MetricVersion int `toml:"prometheus_metric_version"`
DefaultTags map[string]string
}
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
@ -27,79 +24,22 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
return nil, fmt.Errorf("unable to unmarshal request body: %w", err)
}
now := time.Now()
for _, ts := range req.Timeseries {
tags := make(map[string]string, len(p.DefaultTags)+len(ts.Labels))
for key, value := range p.DefaultTags {
tags[key] = value
var metricsFromTs []telegraf.Metric
switch p.MetricVersion {
case 0, 2:
metricsFromTs, err = p.extractMetricsV2(&ts)
case 1:
metricsFromTs, err = p.extractMetricsV1(&ts)
default:
return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion)
}
for _, l := range ts.Labels {
tags[l.Name] = l.Value
}
metricName := tags[model.MetricNameLabel]
if metricName == "" {
return nil, fmt.Errorf("metric name %q not found in tag-set or empty", model.MetricNameLabel)
}
delete(tags, model.MetricNameLabel)
t := now
for _, s := range ts.Samples {
fields := make(map[string]interface{})
if !math.IsNaN(s.Value) {
fields[metricName] = s.Value
}
// converting to telegraf metric
if len(fields) > 0 {
if s.Timestamp > 0 {
t = time.Unix(0, s.Timestamp*1000000)
}
m := metric.New("prometheus_remote_write", tags, fields, t)
metrics = append(metrics, m)
}
}
for _, hp := range ts.Histograms {
h := hp.ToFloatHistogram()
if hp.Timestamp > 0 {
t = time.Unix(0, hp.Timestamp*1000000)
}
fields := map[string]any{
metricName + "_sum": h.Sum,
}
m := metric.New("prometheus_remote_write", tags, fields, t)
metrics = append(metrics, m)
fields = map[string]any{
metricName + "_count": h.Count,
}
m = metric.New("prometheus_remote_write", tags, fields, t)
metrics = append(metrics, m)
count := 0.0
iter := h.AllBucketIterator()
for iter.Next() {
bucket := iter.At()
count = count + bucket.Count
fields = map[string]any{
metricName: count,
}
localTags := make(map[string]string, len(tags)+1)
localTags[metricName+"_le"] = fmt.Sprintf("%g", bucket.Upper)
for k, v := range tags {
localTags[k] = v
}
m := metric.New("prometheus_remote_write", localTags, fields, t)
metrics = append(metrics, m)
}
if err != nil {
return nil, err
}
metrics = append(metrics, metricsFromTs...)
}
return metrics, err
}

View File

@ -1,18 +1,121 @@
package prometheusremotewrite
import (
"bytes"
"fmt"
"os"
"path/filepath"
"testing"
"time"
"github.com/gogo/protobuf/jsonpb"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
test "github.com/influxdata/telegraf/testutil/plugin_input"
)
const (
testCasesDir = "testcases"
benchmarkFolder = "benchmark"
inputFilename = "input.json"
expectedFilename = "expected_v%d.out"
configFilename = "telegraf.conf"
)
// TestCases runs every folder under testcases/ through the parser in both
// metric versions and compares the output against the recorded expectations
// (expected_v1.out / expected_v2.out).
func TestCases(t *testing.T) {
	// Get all directories in testcases
	folders, err := os.ReadDir(testCasesDir)
	require.NoError(t, err)
	// Make sure testdata contains data
	require.NotEmpty(t, folders)
	for _, f := range folders {
		if !f.IsDir() {
			continue
		}
		fname := f.Name()
		testdataPath := filepath.Join(testCasesDir, fname)

		// The input is stored as protojson for readability; re-marshal it to
		// the protobuf wire format the parser actually consumes.
		inputFilePath := filepath.Join(testdataPath, inputFilename)
		buf, err := os.ReadFile(inputFilePath)
		require.NoError(t, err)
		var writeRequest prompb.WriteRequest
		err = jsonpb.Unmarshal(bytes.NewReader(buf), &writeRequest)
		require.NoError(t, err)
		inputBytes, err := writeRequest.Marshal()
		require.NoError(t, err)

		versions := []int{1, 2}
		for _, version := range versions {
			t.Run(fmt.Sprintf("%s_v%d", fname, version), func(t *testing.T) {
				// Load the parser from the testcase configuration and force
				// the metric version under test.
				configFilePath := filepath.Join(testdataPath, configFilename)
				cfg := config.NewConfig()
				require.NoError(t, cfg.LoadConfig(configFilePath))
				require.Len(t, cfg.Inputs, 1)
				plugin := cfg.Inputs[0].Input.(*test.Plugin)
				parser := plugin.Parser.(*models.RunningParser).Parser.(*Parser)
				parser.MetricVersion = version

				// Load the expected output for this version.
				expectedFilePath := filepath.Join(testdataPath, fmt.Sprintf(expectedFilename, version))
				influxParser := &influx.Parser{}
				require.NoError(t, influxParser.Init())
				expected, err := testutil.ParseMetricsFromFile(expectedFilePath, influxParser)
				require.NoError(t, err)

				// Act and assert
				parsed, err := parser.Parse(inputBytes)
				require.NoError(t, err)
				require.Len(t, parsed, len(expected))
				// Ignore type when comparing, because expected metrics are
				// parsed from influx lines and thus always untyped.
				testutil.RequireMetricsEqual(t, expected, parsed, testutil.SortMetrics(), testutil.IgnoreType())
			})
		}
	}
}
// benchmarkParsing parses the shared benchmark input with the given metric
// version; it is the common body of the per-version benchmarks below.
func benchmarkParsing(b *testing.B, version int) {
	parser := &Parser{
		MetricVersion: version,
	}
	benchmarkData, err := os.ReadFile(filepath.Join(testCasesDir, benchmarkFolder, inputFilename))
	require.NoError(b, err)
	require.NotEmpty(b, benchmarkData)
	for n := 0; n < b.N; n++ {
		//nolint:errcheck // Benchmarking so skip the error check to avoid the unnecessary operations
		parser.Parse(benchmarkData)
	}
}

// BenchmarkParsingMetricVersion1 measures parsing in the v1 (dense) layout.
func BenchmarkParsingMetricVersion1(b *testing.B) {
	benchmarkParsing(b, 1)
}

// BenchmarkParsingMetricVersion2 measures parsing in the v2 layout.
func BenchmarkParsingMetricVersion2(b *testing.B) {
	benchmarkParsing(b, 2)
}
func TestParse(t *testing.T) {
prompbInput := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{

View File

@ -0,0 +1,2 @@
benchmark_a,source=myhost,tags_platform=python,tags_sdkver=3.11.5 value=5 1740375855000000
benchmark_b,source=myhost,tags_platform=python,tags_sdkver=3.11.4 value=4 1740375855000000

View File

@ -0,0 +1,2 @@
prometheus_remote_write,source=myhost,tags_platform=python,tags_sdkver=3.11.5 benchmark_a=5 1740375855000000
prometheus_remote_write,source=myhost,tags_platform=python,tags_sdkver=3.11.4 benchmark_b=4 1740375855000000

View File

@ -0,0 +1,22 @@
{
"timeseries": [
{
"labels": [
{ "name": "__name__", "value": "benchmark_a" },
{ "name": "source", "value": "myhost" },
{ "name": "tags_platform", "value": "python" },
{ "name": "tags_sdkver", "value": "3.11.5" }
],
"samples": [{ "value": 5.0, "timestamp": 1740375855 }]
},
{
"labels": [
{ "name": "__name__", "value": "benchmark_b" },
{ "name": "source", "value": "myhost" },
{ "name": "tags_platform", "value": "python" },
{ "name": "tags_sdkver", "value": "3.11.4" }
],
"samples": [{ "value": 4.0, "timestamp": 1740375855 }]
}
]
}

View File

@ -0,0 +1,3 @@
[[inputs.test]]
files = ["input.json"]
data_format = "prometheusremotewrite"

View File

@ -0,0 +1 @@
foo,__eg__=bar,defaultTag=defaultTagValue value=1 1740375855000000

View File

@ -0,0 +1 @@
prometheus_remote_write,__eg__=bar,defaultTag=defaultTagValue foo=1 1740375855000000

View File

@ -0,0 +1,11 @@
{
"timeseries": [
{
"labels": [
{ "name": "__name__", "value": "foo" },
{ "name": "__eg__", "value": "bar" }
],
"samples": [{ "value": 1, "timestamp": 1740375855 }]
}
]
}

View File

@ -0,0 +1,4 @@
[[inputs.test]]
files = ["input.json"]
data_format = "prometheusremotewrite"
default_tags = {"defaultTag" = "defaultTagValue"}

View File

@ -0,0 +1 @@
test_metric_seconds -0.5=5,-1=3,1=7,2=8,4=11,64=16,8=13,count=16,counter_reset_hint=0u,negative_bucket_0=2,negative_bucket_1=3,negative_span_0_length=2u,negative_span_0_offset=0i,positive_bucket_0=2,positive_bucket_1=1,positive_bucket_2=3,positive_bucket_3=2,positive_bucket_4=3,positive_span_0_length=4u,positive_span_0_offset=0i,positive_span_1_length=1u,positive_span_1_offset=2i,schema=0i,sum=158.63,zero_count=0,zero_threshold=0.001 1740375855000000

View File

@ -0,0 +1,9 @@
prometheus_remote_write test_metric_seconds_sum=158.63 1740375855000000
prometheus_remote_write test_metric_seconds_count=16 1740375855000000
prometheus_remote_write,test_metric_seconds_le=-1 test_metric_seconds=3 1740375855000000
prometheus_remote_write,test_metric_seconds_le=-0.5 test_metric_seconds=5 1740375855000000
prometheus_remote_write,test_metric_seconds_le=1 test_metric_seconds=7 1740375855000000
prometheus_remote_write,test_metric_seconds_le=2 test_metric_seconds=8 1740375855000000
prometheus_remote_write,test_metric_seconds_le=4 test_metric_seconds=11 1740375855000000
prometheus_remote_write,test_metric_seconds_le=8 test_metric_seconds=13 1740375855000000
prometheus_remote_write,test_metric_seconds_le=64 test_metric_seconds=16 1740375855000000

View File

@ -0,0 +1,23 @@
{
"timeseries": [
{
"labels": [{ "name": "__name__", "value": "test_metric_seconds" }],
"histograms": [
{
"count_float": 16.0,
"zero_threshold": 0.001,
"sum": 158.63,
"schema": 0,
"positive_spans": [
{ "offset": 0, "length": 4 },
{ "offset": 2, "length": 1 }
],
"positive_counts": [2, 1, 3, 2, 3],
"negative_spans": [{ "offset": 0, "length": 2 }],
"negative_counts": [2, 3],
"timestamp": 1740375855
}
]
}
]
}

View File

@ -0,0 +1,3 @@
[[inputs.test]]
files = ["input.json"]
data_format = "prometheusremotewrite"

View File

@ -0,0 +1 @@
test_metric_seconds -0.5=5,-1=3,1=7,2=8,4=11,64=16,8=13,count=16,counter_reset_hint=0u,negative_bucket_0=2,negative_bucket_1=3,negative_span_0_length=2u,negative_span_0_offset=0i,positive_bucket_0=2,positive_bucket_1=1,positive_bucket_2=3,positive_bucket_3=2,positive_bucket_4=3,positive_span_0_length=4u,positive_span_0_offset=0i,positive_span_1_length=1u,positive_span_1_offset=2i,schema=0i,sum=158.63,zero_count=0,zero_threshold=0.001 1740375855000000

View File

@ -0,0 +1,9 @@
prometheus_remote_write test_metric_seconds_sum=158.63 1740375855000000
prometheus_remote_write test_metric_seconds_count=16 1740375855000000
prometheus_remote_write,test_metric_seconds_le=-1 test_metric_seconds=3 1740375855000000
prometheus_remote_write,test_metric_seconds_le=-0.5 test_metric_seconds=5 1740375855000000
prometheus_remote_write,test_metric_seconds_le=1 test_metric_seconds=7 1740375855000000
prometheus_remote_write,test_metric_seconds_le=2 test_metric_seconds=8 1740375855000000
prometheus_remote_write,test_metric_seconds_le=4 test_metric_seconds=11 1740375855000000
prometheus_remote_write,test_metric_seconds_le=8 test_metric_seconds=13 1740375855000000
prometheus_remote_write,test_metric_seconds_le=64 test_metric_seconds=16 1740375855000000

View File

@ -0,0 +1,23 @@
{
"timeseries": [
{
"labels": [{ "name": "__name__", "value": "test_metric_seconds" }],
"histograms": [
{
"count_int": 16,
"zero_threshold": 0.001,
"sum": 158.63,
"schema": 0,
"positive_spans": [
{ "offset": 0, "length": 4 },
{ "offset": 2, "length": 1 }
],
"positive_deltas": [2, -1, 2, -1, 1],
"negative_spans": [{ "offset": 0, "length": 2 }],
"negative_deltas": [2, 1],
"timestamp": 1740375855
}
]
}
]
}

View File

@ -0,0 +1,3 @@
[[inputs.test]]
files = ["input.json"]
data_format = "prometheusremotewrite"

View File

@ -0,0 +1,2 @@
go_gc_duration_seconds,quantile=0.99 value=4.63 1740375855000000
prometheus_target_interval_length_seconds,job=prometheus value=14.99 1740375855000000

View File

@ -0,0 +1,2 @@
prometheus_remote_write,quantile=0.99 go_gc_duration_seconds=4.63 1740375855000000
prometheus_remote_write,job=prometheus prometheus_target_interval_length_seconds=14.99 1740375855000000

View File

@ -0,0 +1,21 @@
{
"timeseries": [
{
"labels": [
{ "name": "__name__", "value": "go_gc_duration_seconds" },
{ "name": "quantile", "value": "0.99" }
],
"samples": [{ "value": 4.63, "timestamp": 1740375855 }]
},
{
"labels": [
{
"name": "__name__",
"value": "prometheus_target_interval_length_seconds"
},
{ "name": "job", "value": "prometheus" }
],
"samples": [{ "value": 14.99, "timestamp": 1740375855 }]
}
]
}

View File

@ -0,0 +1,3 @@
[[inputs.test]]
files = ["input.json"]
data_format = "prometheusremotewrite"

View File

@ -158,6 +158,10 @@ func IgnoreTime() cmp.Option {
return cmpopts.IgnoreFields(metricDiff{}, "Time")
}
// IgnoreType disables comparison of the metric value-type when diffing
// metrics, e.g. for expectations parsed from influx line protocol, which is
// always untyped.
func IgnoreType() cmp.Option {
	return cmpopts.IgnoreFields(metricDiff{}, "Type")
}
// IgnoreFields disables comparison of the fields with the given names.
// The field-names are case-sensitive!
func IgnoreFields(names ...string) cmp.Option {