From 99ea0b1ca69692368fbe2e602a7c21ba05a1f63b Mon Sep 17 00:00:00 2001 From: Michael Hoffmann <88316042+mhoffm-aiven@users.noreply.github.com> Date: Mon, 3 Apr 2023 20:44:08 +0200 Subject: [PATCH] feat(serializer.prometheusremote): Improve performance (#12971) --- plugins/serializers/prometheus/convert.go | 33 ++----- .../prometheusremotewrite.go | 88 +++++++++---------- .../prometheusremotewrite_test.go | 52 +++++++++++ 3 files changed, 101 insertions(+), 72 deletions(-) diff --git a/plugins/serializers/prometheus/convert.go b/plugins/serializers/prometheus/convert.go index dca865e47..178c0b9cb 100644 --- a/plugins/serializers/prometheus/convert.go +++ b/plugins/serializers/prometheus/convert.go @@ -5,6 +5,7 @@ import ( "unicode" dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" "github.com/influxdata/telegraf" ) @@ -55,35 +56,10 @@ var LabelNameTable = Table{ }, } -func isValid(name string, table Table) bool { - if name == "" { - return false - } - - for i, r := range name { - switch { - case i == 0: - if !unicode.In(r, table.First) { - return false - } - default: - if !unicode.In(r, table.Rest) { - return false - } - } - } - - return true -} - // Sanitize checks if the name is valid according to the table. If not, it // attempts to replaces invalid runes with an underscore to create a valid // name. func sanitize(name string, table Table) (string, bool) { - if isValid(name, table) { - return name, true - } - var b strings.Builder for i, r := range name { @@ -105,7 +81,6 @@ func sanitize(name string, table Table) (string, bool) { if name == "" { return "", false } - return name, true } @@ -113,6 +88,9 @@ func sanitize(name string, table Table) (string, bool) { // not, it attempts to replaces invalid runes with an underscore to create a // valid name. func SanitizeMetricName(name string) (string, bool) { + if model.IsValidMetricName(model.LabelValue(name)) { + return name, true + } return sanitize(name, MetricNameTable) } @@ -120,6 +98,9 @@ func SanitizeMetricName(name string) (string, bool) { // not, it attempts to replaces invalid runes with an underscore to create a // valid name. func SanitizeLabelName(name string) (string, bool) { + if model.LabelName(name).IsValid() { + return name, true + } return sanitize(name, LabelNameTable) } diff --git a/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go b/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go index bfb9d7bea..a36247b1f 100644 --- a/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go +++ b/plugins/serializers/prometheusremotewrite/prometheusremotewrite.go @@ -53,9 +53,11 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { var buf bytes.Buffer + var entries = make(map[MetricKey]prompb.TimeSeries) + var labels = make([]prompb.Label, 0) for _, metric := range metrics { - commonLabels := s.createLabels(metric) + labels = s.appendCommonLabels(labels[:0], metric) var metrickey MetricKey var promts prompb.TimeSeries for _, field := range metric.FieldList() { @@ -64,6 +66,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { if !ok { continue } + switch metric.Type() { case telegraf.Counter: fallthrough @@ -74,26 +77,24 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { if !ok { continue } - metrickey, promts = getPromTS(metricName, commonLabels, value, metric.Time()) + metrickey, promts = getPromTS(metricName, labels, value, metric.Time()) case telegraf.Histogram: switch { case strings.HasSuffix(field.Key, "_bucket"): // if bucket only, init sum, count, inf - metrickeysum, promtssum := getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, float64(0), metric.Time()) + metrickeysum, promtssum := getPromTS(fmt.Sprintf("%s_sum", metricName), labels, float64(0), metric.Time()) if _, ok = entries[metrickeysum]; !ok { entries[metrickeysum] = promtssum } - metrickeycount, promtscount := getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(0), metric.Time()) + metrickeycount, promtscount := getPromTS(fmt.Sprintf("%s_count", metricName), labels, float64(0), metric.Time()) if _, ok = entries[metrickeycount]; !ok { entries[metrickeycount] = promtscount } - labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1) - copy(labels, commonLabels) - labels = append(labels, prompb.Label{ + extraLabel := prompb.Label{ Name: "le", Value: "+Inf", - }) - metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(0), metric.Time()) + } + metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(0), metric.Time(), extraLabel) if _, ok = entries[metrickeyinf]; !ok { entries[metrickeyinf] = promtsinf } @@ -111,20 +112,18 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { continue } - labels = make([]prompb.Label, len(commonLabels), len(commonLabels)+1) - copy(labels, commonLabels) - labels = append(labels, prompb.Label{ + extraLabel = prompb.Label{ Name: "le", Value: fmt.Sprint(bound), - }) - metrickey, promts = getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time()) + } + metrickey, promts = getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time(), extraLabel) case strings.HasSuffix(field.Key, "_sum"): sum, ok := prometheus.SampleSum(field.Value) if !ok { continue } - metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, sum, metric.Time()) + metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), labels, sum, metric.Time()) case strings.HasSuffix(field.Key, "_count"): count, ok := prometheus.SampleCount(field.Value) if !ok { @@ -132,18 +131,16 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { } // if no bucket generate +Inf entry - labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1) - copy(labels, commonLabels) - labels = append(labels, prompb.Label{ + extraLabel := prompb.Label{ Name: "le", Value: "+Inf", - }) - metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time()) + } + metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time(), extraLabel) if minf, ok := entries[metrickeyinf]; !ok || minf.Samples[0].Value == 0 { entries[metrickeyinf] = promtsinf } - metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(count), metric.Time()) + metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), labels, float64(count), metric.Time()) default: continue } @@ -155,14 +152,14 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { continue } - metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, sum, metric.Time()) + metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), labels, sum, metric.Time()) case strings.HasSuffix(field.Key, "_count"): count, ok := prometheus.SampleCount(field.Value) if !ok { continue } - metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(count), metric.Time()) + metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), labels, float64(count), metric.Time()) default: quantileTag, ok := metric.GetTag("quantile") if !ok { @@ -177,13 +174,11 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { continue } - labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1) - copy(labels, commonLabels) - labels = append(labels, prompb.Label{ + extraLabel := prompb.Label{ Name: "quantile", Value: fmt.Sprint(quantile), - }) - metrickey, promts = getPromTS(metricName, labels, value, metric.Time()) + } + metrickey, promts = getPromTS(metricName, labels, value, metric.Time(), extraLabel) } default: return nil, fmt.Errorf("unknown type %v", metric.Type()) @@ -252,8 +247,7 @@ func hasLabel(name string, labels []prompb.Label) bool { return false } -func (s *Serializer) createLabels(metric telegraf.Metric) []prompb.Label { - labels := make([]prompb.Label, 0, len(metric.TagList())) +func (s *Serializer) appendCommonLabels(labels []prompb.Label, metric telegraf.Metric) []prompb.Label { for _, tag := range metric.TagList() { // Ignore special tags for histogram and summary types. switch metric.Type() { @@ -284,7 +278,6 @@ func (s *Serializer) createLabels(metric telegraf.Metric) []prompb.Label { return labels } - addedFieldLabel := false for _, field := range metric.FieldList() { value, ok := field.Value.(string) if !ok { @@ -303,13 +296,6 @@ func (s *Serializer) createLabels(metric telegraf.Metric) []prompb.Label { } labels = append(labels, prompb.Label{Name: name, Value: value}) - addedFieldLabel = true - } - - if addedFieldLabel { - sort.Slice(labels, func(i, j int) bool { - return labels[i].Name < labels[j].Name - }) } return labels @@ -326,23 +312,33 @@ func MakeMetricKey(labels []prompb.Label) MetricKey { return MetricKey(h.Sum64()) } -func getPromTS(name string, labels []prompb.Label, value float64, ts time.Time) (MetricKey, prompb.TimeSeries) { +func getPromTS(name string, labels []prompb.Label, value float64, ts time.Time, extraLabels ...prompb.Label) (MetricKey, prompb.TimeSeries) { + labelscopy := make([]prompb.Label, len(labels), len(labels)+1) + copy(labelscopy, labels) + sample := []prompb.Sample{{ // Timestamp is int milliseconds for remote write. Timestamp: ts.UnixNano() / int64(time.Millisecond), Value: value, }} - labelscopy := make([]prompb.Label, len(labels), len(labels)+1) - copy(labelscopy, labels) - labels = append(labelscopy, prompb.Label{ + labelscopy = append(labelscopy, extraLabels...) + labelscopy = append(labelscopy, prompb.Label{ Name: "__name__", Value: name, }) // we sort the labels since Prometheus TSDB does not like out of order labels - sort.Slice(labels, func(i, j int) bool { - return labels[i].Name < labels[j].Name - }) + sort.Sort(sortableLabels(labelscopy)) - return MakeMetricKey(labels), prompb.TimeSeries{Labels: labels, Samples: sample} + return MakeMetricKey(labelscopy), prompb.TimeSeries{Labels: labelscopy, Samples: sample} +} + +type sortableLabels []prompb.Label + +func (sl sortableLabels) Len() int { return len(sl) } +func (sl sortableLabels) Less(i, j int) bool { + return sl[i].Name < sl[j].Name +} +func (sl sortableLabels) Swap(i, j int) { + sl[i], sl[j] = sl[j], sl[i] } diff --git a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go index 2bad84917..77a10204a 100644 --- a/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go +++ b/plugins/serializers/prometheusremotewrite/prometheusremotewrite_test.go @@ -16,6 +16,28 @@ import ( "github.com/influxdata/telegraf/testutil" ) +func BenchmarkRemoteWrite(b *testing.B) { + batch := make([]telegraf.Metric, 1000) + for i := range batch { + batch[i] = testutil.MustMetric( + "cpu", + map[string]string{ + "host": "example.org", + "C": "D", + "A": "B", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ) + } + s := NewSerializer(FormatConfig{}) + for n := 0; n < b.N; n++ { + _, _ = s.SerializeBatch(batch) + } +} + func TestRemoteWriteSerialize(t *testing.T) { tests := []struct { name string @@ -23,6 +45,36 @@ func TestRemoteWriteSerialize(t *testing.T) { metric telegraf.Metric expected []byte }{ + // the only way that we can produce an empty metric name is if the + // metric is called "prometheus" and has no fields. + { + name: "empty name is skipped", + metric: testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + expected: []byte(``), + }, + { + name: "empty labels are skipped", + metric: testutil.MustMetric( + "cpu", + map[string]string{ + "": "example.org", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + expected: []byte(` +cpu_time_idle 42 +`), + }, { name: "simple", metric: testutil.MustMetric(