feat(inputs.statsd): Add optional temporality and start_time tag for statsd metrics (#13087)

This commit is contained in:
Povilas Versockas 2023-05-02 20:19:30 +03:00 committed by GitHub
parent 9284bdabf0
commit 92b6d96486
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 0 deletions

View File

@ -56,6 +56,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Reset timings & histograms every interval (default=true)
delete_timings = true
## Enable aggregation temporality adds temporality=delta or temporality=commulative tag, and
## start_time field, which adds the start time of the metric accumulation.
## You should use this when using OpenTelemetry output.
# enable_aggregation_temporality = false
## Percentiles to calculate for timing & histogram stats.
percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0]

View File

@ -29,6 +29,11 @@
## Reset timings & histograms every interval (default=true)
delete_timings = true
## Enable aggregation temporality adds temporality=delta or temporality=commulative tag, and
## start_time field, which adds the start time of the metric accumulation.
## You should use this when using OpenTelemetry output.
# enable_aggregation_temporality = false
## Percentiles to calculate for timing & histogram stats.
percentiles = [50.0, 90.0, 99.0, 99.9, 99.95, 100.0]

View File

@ -77,6 +77,8 @@ type Statsd struct {
DeleteTimings bool `toml:"delete_timings"`
ConvertNames bool `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"`
EnableAggregationTemporality bool `toml:"enable_aggregation_temporality"`
// MetricSeparator is the separator between parts of the metric name.
MetricSeparator string `toml:"metric_separator"`
// This flag enables parsing of tags in the dogstatsd extension to the
@ -156,6 +158,8 @@ type Statsd struct {
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat
PendingMessages selfstat.Stat
lastGatherTime time.Time
}
type input struct {
@ -226,6 +230,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields := map[string]interface{}{
defaultFieldName: m.value,
}
if s.EnableAggregationTemporality {
fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}
acc.AddFields(m.name, fields, m.tags, now)
}
s.distributions = make([]cacheddistributions, 0)
@ -252,6 +259,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields[name] = stats.Percentile(float64(percentile))
}
}
if s.EnableAggregationTemporality {
fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}
acc.AddFields(m.name, fields, m.tags, now)
}
@ -260,6 +270,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}
for _, m := range s.gauges {
if s.EnableAggregationTemporality && m.fields != nil {
m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}
acc.AddGauge(m.name, m.fields, m.tags, now)
}
if s.DeleteGauges {
@ -267,6 +281,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
}
for _, m := range s.counters {
if s.EnableAggregationTemporality && m.fields != nil {
m.fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}
acc.AddCounter(m.name, m.fields, m.tags, now)
}
if s.DeleteCounters {
@ -278,6 +296,10 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
for field, set := range m.fields {
fields[field] = int64(len(set))
}
if s.EnableAggregationTemporality {
fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
}
acc.AddFields(m.name, fields, m.tags, now)
}
if s.DeleteSets {
@ -286,6 +308,7 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
s.expireCachedMetrics()
s.lastGatherTime = now
return nil
}
@ -297,6 +320,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.acc = ac
// Make data structures
s.lastGatherTime = time.Now()
s.gauges = make(map[string]cachedgauge)
s.counters = make(map[string]cachedcounter)
s.sets = make(map[string]cachedset)
@ -305,6 +329,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.Lock()
defer s.Unlock()
//
tags := map[string]string{
"address": s.ServiceAddress,
@ -644,6 +669,14 @@ func (s *Statsd) parseStatsdLine(line string) error {
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
if s.EnableAggregationTemporality {
if s.DeleteCounters {
m.tags["temporality"] = "delta"
} else {
m.tags["temporality"] = "cumulative"
}
}
case "g":
m.tags["metric_type"] = "gauge"
case "s":

View File

@ -1995,3 +1995,67 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) {
require.NoError(t, conn.Close())
}
func TestParse_DeltaCounter(t *testing.T) {
statsd := Statsd{
Log: testutil.Logger{},
Protocol: "tcp",
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
TCPKeepAlive: true,
NumberWorkerThreads: 5,
// Delete Counters causes Delta temporality to be added
DeleteCounters: true,
lastGatherTime: time.Now(),
EnableAggregationTemporality: true,
}
acc := &testutil.Accumulator{}
require.NoError(t, statsd.Start(acc))
defer statsd.Stop()
addr := statsd.TCPlistener.Addr().String()
conn, err := net.Dial("tcp", addr)
require.NoError(t, err)
_, err = conn.Write([]byte("cpu.time_idle:42|c\n"))
require.NoError(t, err)
require.Eventuallyf(t, func() bool {
require.NoError(t, statsd.Gather(acc))
acc.Lock()
defer acc.Unlock()
fmt.Println(acc.NMetrics())
expected := []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"metric_type": "counter",
"temporality": "delta",
},
map[string]interface{}{
"value": 42,
},
time.Now(),
telegraf.Counter,
),
}
got := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, got, testutil.IgnoreTime(), testutil.IgnoreFields("start_time"))
startTime, ok := got[0].GetField("start_time")
require.True(t, ok, "expected start_time field")
startTimeStr, ok := startTime.(string)
require.True(t, ok, "expected start_time field to be a string")
_, err = time.Parse(time.RFC3339, startTimeStr)
require.NoError(t, err, "execpted start_time field to be in RFC3339 format")
return acc.NMetrics() >= 1
}, time.Second, 100*time.Millisecond, "Expected 1 metric found %d", acc.NMetrics())
require.NoError(t, conn.Close())
}