From 92b6d96486f28577c355f23966f8319b70d2024d Mon Sep 17 00:00:00 2001
From: Povilas Versockas
Date: Tue, 2 May 2023 20:19:30 +0300
Subject: [PATCH] feat(inputs.statsd): Add optional temporality and start_time
tag for statsd metrics (#13087)
---
plugins/inputs/statsd/README.md | 5 +++
plugins/inputs/statsd/sample.conf | 5 +++
plugins/inputs/statsd/statsd.go | 33 ++++++++++++++
plugins/inputs/statsd/statsd_test.go | 64 ++++++++++++++++++++++++++++
4 files changed, 107 insertions(+)
diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md
index 2284be291..cdd159c18 100644
--- a/plugins/inputs/statsd/README.md
+++ b/plugins/inputs/statsd/README.md
@@ -56,6 +56,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Reset timings & histograms every interval (default=true)
delete_timings = true
+ ## Enable aggregation temporality adds temporality=delta or temporality=commulative tag, and
+ ## start_time field, which adds the start time of the metric accumulation.
+ ## You should use this when using OpenTelemetry output.
+ # enable_aggregation_temporality = false
+
## Percentiles to calculate for timing & histogram stats.
percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0]
diff --git a/plugins/inputs/statsd/sample.conf b/plugins/inputs/statsd/sample.conf
index 8b3da152e..897324d0b 100644
--- a/plugins/inputs/statsd/sample.conf
+++ b/plugins/inputs/statsd/sample.conf
@@ -29,6 +29,11 @@
## Reset timings & histograms every interval (default=true)
delete_timings = true
+ ## Enable aggregation temporality adds temporality=delta or temporality=commulative tag, and
+ ## start_time field, which adds the start time of the metric accumulation.
+ ## You should use this when using OpenTelemetry output.
+ # enable_aggregation_temporality = false
+
## Percentiles to calculate for timing & histogram stats.
percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0]
diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go
index 0eab2bafa..7d2817c44 100644
--- a/plugins/inputs/statsd/statsd.go
+++ b/plugins/inputs/statsd/statsd.go
@@ -77,6 +77,8 @@ type Statsd struct {
DeleteTimings bool `toml:"delete_timings"`
ConvertNames bool `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"`
+ EnableAggregationTemporality bool `toml:"enable_aggregation_temporality"`
+
// MetricSeparator is the separator between parts of the metric name.
MetricSeparator string `toml:"metric_separator"`
// This flag enables parsing of tags in the dogstatsd extension to the
@@ -156,6 +158,8 @@ type Statsd struct {
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat
PendingMessages selfstat.Stat
+
+ lastGatherTime time.Time
}
type input struct {
@@ -226,6 +230,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields := map[string]interface{}{
defaultFieldName: m.value,
}
+ if s.EnableAggregationTemporality {
+ fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
+ }
acc.AddFields(m.name, fields, m.tags, now)
}
s.distributions = make([]cacheddistributions, 0)
@@ -252,6 +259,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields[name] = stats.Percentile(float64(percentile))
}
}
+ if s.EnableAggregationTemporality {
+ fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
+ }
acc.AddFields(m.name, fields, m.tags, now)
}
@@ -260,6 +270,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}
for _, m := range s.gauges {
+ if s.EnableAggregationTemporality && m.fields != nil {
+ m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
+ }
+
acc.AddGauge(m.name, m.fields, m.tags, now)
}
if s.DeleteGauges {
@@ -267,6 +281,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}
for _, m := range s.counters {
+ if s.EnableAggregationTemporality && m.fields != nil {
+ m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
+ }
+
acc.AddCounter(m.name, m.fields, m.tags, now)
}
if s.DeleteCounters {
@@ -278,6 +296,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
for field, set := range m.fields {
fields[field] = int64(len(set))
}
+ if s.EnableAggregationTemporality {
+ fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
+ }
+
acc.AddFields(m.name, fields, m.tags, now)
}
if s.DeleteSets {
@@ -286,6 +308,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
s.expireCachedMetrics()
+ s.lastGatherTime = now
return nil
}
@@ -297,6 +320,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.acc = ac
// Make data structures
+ s.lastGatherTime = time.Now()
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
@@ -305,6 +329,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.Lock()
defer s.Unlock()
+
//
tags := map[string]string{
"address": s.ServiceAddress,
@@ -644,6 +669,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
+
+ if s.EnableAggregationTemporality {
+ if s.DeleteCounters {
+ m.tags["temporality"] = "delta"
+ } else {
+ m.tags["temporality"] = "cumulative"
+ }
+ }
case "g":
m.tags["metric_type"] = "gauge"
case "s":
diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go
index f1d2c0099..f6d2358d3 100644
--- a/plugins/inputs/statsd/statsd_test.go
+++ b/plugins/inputs/statsd/statsd_test.go
@@ -1995,3 +1995,67 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) {
require.NoError(t, conn.Close())
}
+
+func TestParse_DeltaCounter(t *testing.T) {
+ statsd := Statsd{
+ Log: testutil.Logger{},
+ Protocol: "tcp",
+ ServiceAddress: "localhost:8125",
+ AllowedPendingMessages: 10000,
+ MaxTCPConnections: 250,
+ TCPKeepAlive: true,
+ NumberWorkerThreads: 5,
+ // Delete Counters causes Delta temporality to be added
+ DeleteCounters: true,
+ lastGatherTime: time.Now(),
+ EnableAggregationTemporality: true,
+ }
+
+ acc := &testutil.Accumulator{}
+ require.NoError(t, statsd.Start(acc))
+ defer statsd.Stop()
+
+ addr := statsd.TCPlistener.Addr().String()
+ conn, err := net.Dial("tcp", addr)
+ require.NoError(t, err)
+
+ _, err = conn.Write([]byte("cpu.time_idle:42|c\n"))
+ require.NoError(t, err)
+
+ require.Eventuallyf(t, func() bool {
+ require.NoError(t, statsd.Gather(acc))
+ acc.Lock()
+ defer acc.Unlock()
+
+ fmt.Println(acc.NMetrics())
+ expected := []telegraf.Metric{
+ testutil.MustMetric(
+ "cpu_time_idle",
+ map[string]string{
+ "metric_type": "counter",
+ "temporality": "delta",
+ },
+ map[string]interface{}{
+ "value": 42,
+ },
+ time.Now(),
+ telegraf.Counter,
+ ),
+ }
+ got := acc.GetTelegrafMetrics()
+ testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time"))
+
+ startTime, ok := got[0].GetField("start_time")
+ require.True(t, ok, "expected start_time field")
+
+ startTimeStr, ok := startTime.(string)
+ require.True(t, ok, "expected start_time field to be a string")
+
+ _, err = time.Parse(time.RFC3339, startTimeStr)
+ require.NoError(t, err, "execpted start_time field to be in RFC3339 format")
+
+ return acc.NMetrics() >= 1
+ }, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics())
+
+ require.NoError(t, conn.Close())
+}