diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index 2284be291..cdd159c18 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -56,6 +56,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Reset timings & histograms every interval (default=true) delete_timings = true + ## Enable aggregation temporality adds temporality=delta or temporality=commulative tag, and + ## start_time field, which adds the start time of the metric accumulation. + ## You should use this when using OpenTelemetry output. + # enable_aggregation_temporality = false + ## Percentiles to calculate for timing & histogram stats. percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0] diff --git a/plugins/inputs/statsd/sample.conf b/plugins/inputs/statsd/sample.conf index 8b3da152e..897324d0b 100644 --- a/plugins/inputs/statsd/sample.conf +++ b/plugins/inputs/statsd/sample.conf @@ -29,6 +29,11 @@ ## Reset timings & histograms every interval (default=true) delete_timings = true + ## Enable aggregation temporality adds temporality=delta or temporality=commulative tag, and + ## start_time field, which adds the start time of the metric accumulation. + ## You should use this when using OpenTelemetry output. + # enable_aggregation_temporality = false + ## Percentiles to calculate for timing & histogram stats. percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0] diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 0eab2bafa..7d2817c44 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -77,6 +77,8 @@ type Statsd struct { DeleteTimings bool `toml:"delete_timings"` ConvertNames bool `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"` + EnableAggregationTemporality bool `toml:"enable_aggregation_temporality"` + // MetricSeparator is the separator between parts of the metric name. MetricSeparator string `toml:"metric_separator"` // This flag enables parsing of tags in the dogstatsd extension to the @@ -156,6 +158,8 @@ type Statsd struct { UDPBytesRecv selfstat.Stat ParseTimeNS selfstat.Stat PendingMessages selfstat.Stat + + lastGatherTime time.Time } type input struct { @@ -226,6 +230,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { fields := map[string]interface{}{ defaultFieldName: m.value, } + if s.EnableAggregationTemporality { + fields["start_time"] = s.lastGatherTime.Format(time.RFC3339) + } acc.AddFields(m.name, fields, m.tags, now) } s.distributions = make([]cacheddistributions, 0) @@ -252,6 +259,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { fields[name] = stats.Percentile(float64(percentile)) } } + if s.EnableAggregationTemporality { + fields["start_time"] = s.lastGatherTime.Format(time.RFC3339) + } acc.AddFields(m.name, fields, m.tags, now) } @@ -260,6 +270,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { } for _, m := range s.gauges { + if s.EnableAggregationTemporality && m.fields != nil { + m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339) + } + acc.AddGauge(m.name, m.fields, m.tags, now) } if s.DeleteGauges { @@ -267,6 +281,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { } for _, m := range s.counters { + if s.EnableAggregationTemporality && m.fields != nil { + m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339) + } + acc.AddCounter(m.name, m.fields, m.tags, now) } if s.DeleteCounters { @@ -278,6 +296,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { for field, set := range m.fields { fields[field] = int64(len(set)) } + if s.EnableAggregationTemporality { + fields["start_time"] = s.lastGatherTime.Format(time.RFC3339) + } + acc.AddFields(m.name, fields, m.tags, now) } if s.DeleteSets { @@ -286,6 +308,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { s.expireCachedMetrics() + s.lastGatherTime = now return nil } @@ -297,6 +320,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { s.acc = ac // Make data structures + s.lastGatherTime = time.Now() s.gauges = make(map[string]cachedgauge) s.counters = make(map[string]cachedcounter) s.sets = make(map[string]cachedset) @@ -305,6 +329,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error { s.Lock() defer s.Unlock() + // tags := map[string]string{ "address": s.ServiceAddress, @@ -644,6 +669,14 @@ func (s *Statsd) parseStatsdLine(line string) error { switch m.mtype { case "c": m.tags["metric_type"] = "counter" + + if s.EnableAggregationTemporality { + if s.DeleteCounters { + m.tags["temporality"] = "delta" + } else { + m.tags["temporality"] = "cumulative" + } + } case "g": m.tags["metric_type"] = "gauge" case "s": diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index f1d2c0099..f6d2358d3 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -1995,3 +1995,67 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) { require.NoError(t, conn.Close()) } + +func TestParse_DeltaCounter(t *testing.T) { + statsd := Statsd{ + Log: testutil.Logger{}, + Protocol: "tcp", + ServiceAddress: "localhost:8125", + AllowedPendingMessages: 10000, + MaxTCPConnections: 250, + TCPKeepAlive: true, + NumberWorkerThreads: 5, + // Delete Counters causes Delta temporality to be added + DeleteCounters: true, + lastGatherTime: time.Now(), + EnableAggregationTemporality: true, + } + + acc := &testutil.Accumulator{} + require.NoError(t, statsd.Start(acc)) + defer statsd.Stop() + + addr := statsd.TCPlistener.Addr().String() + conn, err := net.Dial("tcp", addr) + require.NoError(t, err) + + _, err = conn.Write([]byte("cpu.time_idle:42|c\n")) + require.NoError(t, err) + + require.Eventuallyf(t, func() bool { + require.NoError(t, statsd.Gather(acc)) + acc.Lock() + defer acc.Unlock() + + fmt.Println(acc.NMetrics()) + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "metric_type": "counter", + "temporality": "delta", + }, + map[string]interface{}{ + "value": 42, + }, + time.Now(), + telegraf.Counter, + ), + } + got := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time")) + + startTime, ok := got[0].GetField("start_time") + require.True(t, ok, "expected start_time field") + + startTimeStr, ok := startTime.(string) + require.True(t, ok, "expected start_time field to be a string") + + _, err = time.Parse(time.RFC3339, startTimeStr) + require.NoError(t, err, "execpted start_time field to be in RFC3339 format") + + return acc.NMetrics() >= 1 + }, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics()) + + require.NoError(t, conn.Close()) +}