feat(serializers.prometheusremotewrite): Log metric conversion errors (#15893)
This commit is contained in:
parent
135fca55e4
commit
7df29b0a32
|
|
@ -42,3 +42,4 @@ it is not included in the final metric name.
|
||||||
Prometheus labels are produced for each tag.
|
Prometheus labels are produced for each tag.
|
||||||
|
|
||||||
**Note:** String fields are ignored and do not produce Prometheus metrics.
|
**Note:** String fields are ignored and do not produce Prometheus metrics.
|
||||||
|
Set **log_level** to `trace` to see all serialization issues.
|
||||||
|
|
|
||||||
|
|
@ -20,8 +20,9 @@ import (
|
||||||
type MetricKey uint64
|
type MetricKey uint64
|
||||||
|
|
||||||
type Serializer struct {
|
type Serializer struct {
|
||||||
SortMetrics bool `toml:"prometheus_sort_metrics"`
|
SortMetrics bool `toml:"prometheus_sort_metrics"`
|
||||||
StringAsLabel bool `toml:"prometheus_string_as_label"`
|
StringAsLabel bool `toml:"prometheus_string_as_label"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
|
|
@ -29,8 +30,15 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
var buf bytes.Buffer
|
var lastErr error
|
||||||
|
// traceAndKeepErr logs every error passed to it at Trace level.
|
||||||
|
// With each call it also updates lastErr, so that error can be logged later at a higher level.
|
||||||
|
traceAndKeepErr := func(format string, a ...any) {
|
||||||
|
lastErr = fmt.Errorf(format, a...)
|
||||||
|
s.Log.Trace(lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
var entries = make(map[MetricKey]prompb.TimeSeries)
|
var entries = make(map[MetricKey]prompb.TimeSeries)
|
||||||
var labels = make([]prompb.Label, 0)
|
var labels = make([]prompb.Label, 0)
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
|
|
@ -41,6 +49,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
|
metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
|
||||||
metricName, ok := prometheus.SanitizeMetricName(metricName)
|
metricName, ok := prometheus.SanitizeMetricName(metricName)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse metric name %q", metricName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -52,6 +61,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
case telegraf.Untyped:
|
case telegraf.Untyped:
|
||||||
value, ok := prometheus.SampleValue(field.Value)
|
value, ok := prometheus.SampleValue(field.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
|
metrickey, promts = getPromTS(metricName, labels, value, metric.Time())
|
||||||
|
|
@ -78,14 +88,17 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
|
|
||||||
le, ok := metric.GetTag("le")
|
le, ok := metric.GetTag("le")
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: can't find `le` label", metricName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
bound, err := strconv.ParseFloat(le, 64)
|
bound, err := strconv.ParseFloat(le, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, le, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
count, ok := prometheus.SampleCount(field.Value)
|
count, ok := prometheus.SampleCount(field.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -97,6 +110,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
case strings.HasSuffix(field.Key, "_sum"):
|
case strings.HasSuffix(field.Key, "_sum"):
|
||||||
sum, ok := prometheus.SampleSum(field.Value)
|
sum, ok := prometheus.SampleSum(field.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -104,6 +118,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
case strings.HasSuffix(field.Key, "_count"):
|
case strings.HasSuffix(field.Key, "_count"):
|
||||||
count, ok := prometheus.SampleCount(field.Value)
|
count, ok := prometheus.SampleCount(field.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -119,6 +134,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
|
|
||||||
metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
|
metrickey, promts = getPromTS(metricName+"_count", labels, float64(count), metric.Time())
|
||||||
default:
|
default:
|
||||||
|
traceAndKeepErr("failed to parse %q: series %q should have `_count`, `_sum` or `_bucket` suffix", metricName, field.Key)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case telegraf.Summary:
|
case telegraf.Summary:
|
||||||
|
|
@ -126,6 +142,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
case strings.HasSuffix(field.Key, "_sum"):
|
case strings.HasSuffix(field.Key, "_sum"):
|
||||||
sum, ok := prometheus.SampleSum(field.Value)
|
sum, ok := prometheus.SampleSum(field.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -133,6 +150,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
case strings.HasSuffix(field.Key, "_count"):
|
case strings.HasSuffix(field.Key, "_count"):
|
||||||
count, ok := prometheus.SampleCount(field.Value)
|
count, ok := prometheus.SampleCount(field.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -140,14 +158,17 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
default:
|
default:
|
||||||
quantileTag, ok := metric.GetTag("quantile")
|
quantileTag, ok := metric.GetTag("quantile")
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: can't find `quantile` label", metricName)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
quantile, err := strconv.ParseFloat(quantileTag, 64)
|
quantile, err := strconv.ParseFloat(quantileTag, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
traceAndKeepErr("failed to parse %q: can't parse %q value: %w", metricName, quantileTag, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
value, ok := prometheus.SampleValue(field.Value)
|
value, ok := prometheus.SampleValue(field.Value)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
traceAndKeepErr("failed to parse %q: bad sample value %#v", metricName, field.Value)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -162,11 +183,12 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// A batch of metrics can contain multiple values for a single
|
// A batch of metrics can contain multiple values for a single
|
||||||
// Prometheus sample. If this metric is older than the existing
|
// Prometheus sample. If this metric is older than the existing
|
||||||
// sample then we can skip over it.
|
// sample then we can skip over it.
|
||||||
m, ok := entries[metrickey]
|
m, ok := entries[metrickey]
|
||||||
if ok {
|
if ok {
|
||||||
if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
|
if metric.Time().Before(time.Unix(0, m.Samples[0].Timestamp*1_000_000)) {
|
||||||
|
traceAndKeepErr("metric %q has samples with timestamp %v older than already registered before", metric.Name(), metric.Time())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -174,6 +196,12 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if lastErr != nil {
|
||||||
|
// log only the last recorded error in the batch, as it could have many errors and logging each one
|
||||||
|
// could be too verbose. The following log line still provides enough info for the user to act on.
|
||||||
|
s.Log.Errorf("some series were dropped, %d series left to send; last recorded error: %v", len(entries), lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
var promTS = make([]prompb.TimeSeries, len(entries))
|
var promTS = make([]prompb.TimeSeries, len(entries))
|
||||||
var i int
|
var i int
|
||||||
for _, promts := range entries {
|
for _, promts := range entries {
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ func BenchmarkRemoteWrite(b *testing.B) {
|
||||||
time.Unix(0, 0),
|
time.Unix(0, 0),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
s := &Serializer{}
|
s := &Serializer{Log: &testutil.CaptureLogger{}}
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
//nolint:errcheck // Benchmarking so skip the error check to avoid the unnecessary operations
|
//nolint:errcheck // Benchmarking so skip the error check to avoid the unnecessary operations
|
||||||
s.SerializeBatch(batch)
|
s.SerializeBatch(batch)
|
||||||
|
|
@ -188,6 +188,7 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
s := &Serializer{
|
s := &Serializer{
|
||||||
|
Log: &testutil.CaptureLogger{},
|
||||||
SortMetrics: true,
|
SortMetrics: true,
|
||||||
}
|
}
|
||||||
data, err := s.Serialize(tt.metric)
|
data, err := s.Serialize(tt.metric)
|
||||||
|
|
@ -201,6 +202,83 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRemoteWriteSerializeNegative feeds the serializer metrics that are expected
// to fail conversion. For each one it checks that Serialize returns a nil error
// and that the capture logger holds a last error message containing the expected text.
func TestRemoteWriteSerializeNegative(t *testing.T) {
|
||||||
|
// A single capture logger is shared by all cases; it is cleared after each check.
clog := &testutil.CaptureLogger{}
|
||||||
|
s := &Serializer{Log: clog}
|
||||||
|
|
||||||
|
// assert fails the test if err is non-nil, if the logger recorded no last error
// message, or if that message does not contain msg.
// NOTE(review): presumably LastError returns the most recent error-level log
// message as a string; confirm against testutil.CaptureLogger.
assert := func(msg string, err error) {
|
||||||
|
t.Helper()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
lastMsg := clog.LastError()
|
||||||
|
if lastMsg == "" {
|
||||||
|
t.Fatal("expected non-empty last message")
|
||||||
|
}
|
||||||
|
if !strings.Contains(lastMsg, msg) {
|
||||||
|
t.Fatalf("expected to have log message %q; got %q instead", msg, lastMsg)
|
||||||
|
}
|
||||||
|
// Reset the logger so the next case starts without earlier messages.
|
||||||
|
clog.Clear()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Case 1: metric name "@@!!" with field key "!!"; a metric-name parse failure is expected in the log.
m := testutil.MustMetric("@@!!", nil, map[string]interface{}{"!!": "@@"}, time.Unix(0, 0))
|
||||||
|
_, err := s.Serialize(m)
|
||||||
|
assert("failed to parse metric name", err)
|
||||||
|
|
||||||
|
// Case 2: no value type is passed to MustMetric and the only field holds a string; "bad sample" is expected.
m = testutil.MustMetric("prometheus", nil,
|
||||||
|
map[string]interface{}{
|
||||||
|
"http_requests_total": "asd",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
)
|
||||||
|
_, err = s.Serialize(m)
|
||||||
|
assert("bad sample", err)
|
||||||
|
|
||||||
|
// Case 3: histogram bucket with a valid "le" tag but a string field value; "bad sample" is expected.
m = testutil.MustMetric(
|
||||||
|
"prometheus",
|
||||||
|
map[string]string{
|
||||||
|
"le": "0.5",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"http_request_duration_seconds_bucket": "asd",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
telegraf.Histogram,
|
||||||
|
)
|
||||||
|
_, err = s.Serialize(m)
|
||||||
|
assert("bad sample", err)
|
||||||
|
|
||||||
|
// Case 4: gauge with one float field and one string field ("3.0" as a string); "bad sample" is expected.
m = testutil.MustMetric(
|
||||||
|
"prometheus",
|
||||||
|
map[string]string{
|
||||||
|
"code": "400",
|
||||||
|
"method": "post",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"http_requests_total": 3.0,
|
||||||
|
"http_requests_errors_total": "3.0",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
telegraf.Gauge,
|
||||||
|
)
|
||||||
|
_, err = s.Serialize(m)
|
||||||
|
assert("bad sample", err)
|
||||||
|
|
||||||
|
// Case 5: summary whose "quantile" tag value "0.01a" is not a valid float; a "failed to parse" message is expected.
m = testutil.MustMetric(
|
||||||
|
"prometheus",
|
||||||
|
map[string]string{"quantile": "0.01a"},
|
||||||
|
map[string]interface{}{
|
||||||
|
"rpc_duration_seconds": 3102.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
telegraf.Summary,
|
||||||
|
)
|
||||||
|
_, err = s.Serialize(m)
|
||||||
|
assert("failed to parse", err)
|
||||||
|
}
|
||||||
|
|
||||||
func TestRemoteWriteSerializeBatch(t *testing.T) {
|
func TestRemoteWriteSerializeBatch(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
|
@ -679,6 +757,7 @@ rpc_duration_seconds_sum 17560473
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
s := &Serializer{
|
s := &Serializer{
|
||||||
|
Log: &testutil.CaptureLogger{},
|
||||||
SortMetrics: true,
|
SortMetrics: true,
|
||||||
StringAsLabel: tt.stringAsLabel,
|
StringAsLabel: tt.stringAsLabel,
|
||||||
}
|
}
|
||||||
|
|
@ -733,7 +812,7 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkSerialize(b *testing.B) {
|
func BenchmarkSerialize(b *testing.B) {
|
||||||
s := &Serializer{}
|
s := &Serializer{Log: &testutil.CaptureLogger{}}
|
||||||
metrics := serializers.BenchmarkMetrics(b)
|
metrics := serializers.BenchmarkMetrics(b)
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
|
@ -743,7 +822,7 @@ func BenchmarkSerialize(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkSerializeBatch(b *testing.B) {
|
func BenchmarkSerializeBatch(b *testing.B) {
|
||||||
s := &Serializer{}
|
s := &Serializer{Log: &testutil.CaptureLogger{}}
|
||||||
m := serializers.BenchmarkMetrics(b)
|
m := serializers.BenchmarkMetrics(b)
|
||||||
metrics := m[:]
|
metrics := m[:]
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue