feat(serializer.prometheusremote): Improve performance (#12971)

This commit is contained in:
Michael Hoffmann 2023-04-03 20:44:08 +02:00 committed by GitHub
parent 2320bbd1a8
commit 99ea0b1ca6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 72 deletions

View File

@ -5,6 +5,7 @@ import (
"unicode" "unicode"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
@ -55,35 +56,10 @@ var LabelNameTable = Table{
}, },
} }
func isValid(name string, table Table) bool {
if name == "" {
return false
}
for i, r := range name {
switch {
case i == 0:
if !unicode.In(r, table.First) {
return false
}
default:
if !unicode.In(r, table.Rest) {
return false
}
}
}
return true
}
// Sanitize checks if the name is valid according to the table. If not, it // Sanitize checks if the name is valid according to the table. If not, it
// attempts to replaces invalid runes with an underscore to create a valid // attempts to replaces invalid runes with an underscore to create a valid
// name. // name.
func sanitize(name string, table Table) (string, bool) { func sanitize(name string, table Table) (string, bool) {
if isValid(name, table) {
return name, true
}
var b strings.Builder var b strings.Builder
for i, r := range name { for i, r := range name {
@ -105,7 +81,6 @@ func sanitize(name string, table Table) (string, bool) {
if name == "" { if name == "" {
return "", false return "", false
} }
return name, true return name, true
} }
@ -113,6 +88,9 @@ func sanitize(name string, table Table) (string, bool) {
// not, it attempts to replaces invalid runes with an underscore to create a // not, it attempts to replaces invalid runes with an underscore to create a
// valid name. // valid name.
func SanitizeMetricName(name string) (string, bool) { func SanitizeMetricName(name string) (string, bool) {
if model.IsValidMetricName(model.LabelValue(name)) {
return name, true
}
return sanitize(name, MetricNameTable) return sanitize(name, MetricNameTable)
} }
@ -120,6 +98,9 @@ func SanitizeMetricName(name string) (string, bool) {
// not, it attempts to replaces invalid runes with an underscore to create a // not, it attempts to replaces invalid runes with an underscore to create a
// valid name. // valid name.
func SanitizeLabelName(name string) (string, bool) { func SanitizeLabelName(name string) (string, bool) {
if model.LabelName(name).IsValid() {
return name, true
}
return sanitize(name, LabelNameTable) return sanitize(name, LabelNameTable)
} }

View File

@ -53,9 +53,11 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var buf bytes.Buffer var buf bytes.Buffer
var entries = make(map[MetricKey]prompb.TimeSeries) var entries = make(map[MetricKey]prompb.TimeSeries)
var labels = make([]prompb.Label, 0)
for _, metric := range metrics { for _, metric := range metrics {
commonLabels := s.createLabels(metric) labels = s.appendCommonLabels(labels[:0], metric)
var metrickey MetricKey var metrickey MetricKey
var promts prompb.TimeSeries var promts prompb.TimeSeries
for _, field := range metric.FieldList() { for _, field := range metric.FieldList() {
@ -64,6 +66,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
if !ok { if !ok {
continue continue
} }
switch metric.Type() { switch metric.Type() {
case telegraf.Counter: case telegraf.Counter:
fallthrough fallthrough
@ -74,26 +77,24 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
if !ok { if !ok {
continue continue
} }
metrickey, promts = getPromTS(metricName, commonLabels, value, metric.Time()) metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
case telegraf.Histogram: case telegraf.Histogram:
switch { switch {
case strings.HasSuffix(field.Key, "_bucket"): case strings.HasSuffix(field.Key, "_bucket"):
// if bucket only, init sum, count, inf // if bucket only, init sum, count, inf
metrickeysum, promtssum := getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, float64(0), metric.Time()) metrickeysum, promtssum := getPromTS(fmt.Sprintf("%s_sum", metricName), labels, float64(0), metric.Time())
if _, ok = entries[metrickeysum]; !ok { if _, ok = entries[metrickeysum]; !ok {
entries[metrickeysum] = promtssum entries[metrickeysum] = promtssum
} }
metrickeycount, promtscount := getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(0), metric.Time()) metrickeycount, promtscount := getPromTS(fmt.Sprintf("%s_count", metricName), labels, float64(0), metric.Time())
if _, ok = entries[metrickeycount]; !ok { if _, ok = entries[metrickeycount]; !ok {
entries[metrickeycount] = promtscount entries[metrickeycount] = promtscount
} }
labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1) extraLabel := prompb.Label{
copy(labels, commonLabels)
labels = append(labels, prompb.Label{
Name: "le", Name: "le",
Value: "+Inf", Value: "+Inf",
}) }
metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(0), metric.Time()) metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(0), metric.Time(), extraLabel)
if _, ok = entries[metrickeyinf]; !ok { if _, ok = entries[metrickeyinf]; !ok {
entries[metrickeyinf] = promtsinf entries[metrickeyinf] = promtsinf
} }
@ -111,20 +112,18 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
continue continue
} }
labels = make([]prompb.Label, len(commonLabels), len(commonLabels)+1) extraLabel = prompb.Label{
copy(labels, commonLabels)
labels = append(labels, prompb.Label{
Name: "le", Name: "le",
Value: fmt.Sprint(bound), Value: fmt.Sprint(bound),
}) }
metrickey, promts = getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time()) metrickey, promts = getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time(), extraLabel)
case strings.HasSuffix(field.Key, "_sum"): case strings.HasSuffix(field.Key, "_sum"):
sum, ok := prometheus.SampleSum(field.Value) sum, ok := prometheus.SampleSum(field.Value)
if !ok { if !ok {
continue continue
} }
metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, sum, metric.Time()) metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"): case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value) count, ok := prometheus.SampleCount(field.Value)
if !ok { if !ok {
@ -132,18 +131,16 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
} }
// if no bucket generate +Inf entry // if no bucket generate +Inf entry
labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1) extraLabel := prompb.Label{
copy(labels, commonLabels)
labels = append(labels, prompb.Label{
Name: "le", Name: "le",
Value: "+Inf", Value: "+Inf",
}) }
metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time()) metrickeyinf, promtsinf := getPromTS(fmt.Sprintf("%s_bucket", metricName), labels, float64(count), metric.Time(), extraLabel)
if minf, ok := entries[metrickeyinf]; !ok || minf.Samples[0].Value == 0 { if minf, ok := entries[metrickeyinf]; !ok || minf.Samples[0].Value == 0 {
entries[metrickeyinf] = promtsinf entries[metrickeyinf] = promtsinf
} }
metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(count), metric.Time()) metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), labels, float64(count), metric.Time())
default: default:
continue continue
} }
@ -155,14 +152,14 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
continue continue
} }
metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), commonLabels, sum, metric.Time()) metrickey, promts = getPromTS(fmt.Sprintf("%s_sum", metricName), labels, sum, metric.Time())
case strings.HasSuffix(field.Key, "_count"): case strings.HasSuffix(field.Key, "_count"):
count, ok := prometheus.SampleCount(field.Value) count, ok := prometheus.SampleCount(field.Value)
if !ok { if !ok {
continue continue
} }
metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), commonLabels, float64(count), metric.Time()) metrickey, promts = getPromTS(fmt.Sprintf("%s_count", metricName), labels, float64(count), metric.Time())
default: default:
quantileTag, ok := metric.GetTag("quantile") quantileTag, ok := metric.GetTag("quantile")
if !ok { if !ok {
@ -177,13 +174,11 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
continue continue
} }
labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1) extraLabel := prompb.Label{
copy(labels, commonLabels)
labels = append(labels, prompb.Label{
Name: "quantile", Name: "quantile",
Value: fmt.Sprint(quantile), Value: fmt.Sprint(quantile),
}) }
metrickey, promts = getPromTS(metricName, labels, value, metric.Time()) metrickey, promts = getPromTS(metricName, labels, value, metric.Time(), extraLabel)
} }
default: default:
return nil, fmt.Errorf("unknown type %v", metric.Type()) return nil, fmt.Errorf("unknown type %v", metric.Type())
@ -252,8 +247,7 @@ func hasLabel(name string, labels []prompb.Label) bool {
return false return false
} }
func (s *Serializer) createLabels(metric telegraf.Metric) []prompb.Label { func (s *Serializer) appendCommonLabels(labels []prompb.Label, metric telegraf.Metric) []prompb.Label {
labels := make([]prompb.Label, 0, len(metric.TagList()))
for _, tag := range metric.TagList() { for _, tag := range metric.TagList() {
// Ignore special tags for histogram and summary types. // Ignore special tags for histogram and summary types.
switch metric.Type() { switch metric.Type() {
@ -284,7 +278,6 @@ func (s *Serializer) createLabels(metric telegraf.Metric) []prompb.Label {
return labels return labels
} }
addedFieldLabel := false
for _, field := range metric.FieldList() { for _, field := range metric.FieldList() {
value, ok := field.Value.(string) value, ok := field.Value.(string)
if !ok { if !ok {
@ -303,13 +296,6 @@ func (s *Serializer) createLabels(metric telegraf.Metric) []prompb.Label {
} }
labels = append(labels, prompb.Label{Name: name, Value: value}) labels = append(labels, prompb.Label{Name: name, Value: value})
addedFieldLabel = true
}
if addedFieldLabel {
sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})
} }
return labels return labels
@ -326,23 +312,33 @@ func MakeMetricKey(labels []prompb.Label) MetricKey {
return MetricKey(h.Sum64()) return MetricKey(h.Sum64())
} }
func getPromTS(name string, labels []prompb.Label, value float64, ts time.Time) (MetricKey, prompb.TimeSeries) { func getPromTS(name string, labels []prompb.Label, value float64, ts time.Time, extraLabels ...prompb.Label) (MetricKey, prompb.TimeSeries) {
labelscopy := make([]prompb.Label, len(labels), len(labels)+1)
copy(labelscopy, labels)
sample := []prompb.Sample{{ sample := []prompb.Sample{{
// Timestamp is int milliseconds for remote write. // Timestamp is int milliseconds for remote write.
Timestamp: ts.UnixNano() / int64(time.Millisecond), Timestamp: ts.UnixNano() / int64(time.Millisecond),
Value: value, Value: value,
}} }}
labelscopy := make([]prompb.Label, len(labels), len(labels)+1) labelscopy = append(labelscopy, extraLabels...)
copy(labelscopy, labels) labelscopy = append(labelscopy, prompb.Label{
labels = append(labelscopy, prompb.Label{
Name: "__name__", Name: "__name__",
Value: name, Value: name,
}) })
// we sort the labels since Prometheus TSDB does not like out of order labels // we sort the labels since Prometheus TSDB does not like out of order labels
sort.Slice(labels, func(i, j int) bool { sort.Sort(sortableLabels(labelscopy))
return labels[i].Name < labels[j].Name
})
return MakeMetricKey(labels), prompb.TimeSeries{Labels: labels, Samples: sample} return MakeMetricKey(labelscopy), prompb.TimeSeries{Labels: labelscopy, Samples: sample}
}
type sortableLabels []prompb.Label
func (sl sortableLabels) Len() int { return len(sl) }
func (sl sortableLabels) Less(i, j int) bool {
return sl[i].Name < sl[j].Name
}
func (sl sortableLabels) Swap(i, j int) {
sl[i], sl[j] = sl[j], sl[i]
} }

View File

@ -16,6 +16,28 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
func BenchmarkRemoteWrite(b *testing.B) {
batch := make([]telegraf.Metric, 1000)
for i := range batch {
batch[i] = testutil.MustMetric(
"cpu",
map[string]string{
"host": "example.org",
"C": "D",
"A": "B",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
)
}
s := NewSerializer(FormatConfig{})
for n := 0; n < b.N; n++ {
_, _ = s.SerializeBatch(batch)
}
}
func TestRemoteWriteSerialize(t *testing.T) { func TestRemoteWriteSerialize(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -23,6 +45,36 @@ func TestRemoteWriteSerialize(t *testing.T) {
metric telegraf.Metric metric telegraf.Metric
expected []byte expected []byte
}{ }{
// the only way that we can produce an empty metric name is if the
// metric is called "prometheus" and has no fields.
{
name: "empty name is skipped",
metric: testutil.MustMetric(
"prometheus",
map[string]string{
"host": "example.org",
},
map[string]interface{}{},
time.Unix(0, 0),
),
expected: []byte(``),
},
{
name: "empty labels are skipped",
metric: testutil.MustMetric(
"cpu",
map[string]string{
"": "example.org",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
expected: []byte(`
cpu_time_idle 42
`),
},
{ {
name: "simple", name: "simple",
metric: testutil.MustMetric( metric: testutil.MustMetric(