Add configurable Max TTL duration for statsd input plugin entries (#8509)

* Adding max TTL duration for all metric caches in the statsd input plugin

* Update README.md

The option's type was missing in the README.
This commit is contained in:
David Bennett 2020-12-04 14:39:00 -05:00 committed by GitHub
parent 7c5754ef8d
commit 2187baceea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 156 additions and 38 deletions

View File

@ -68,6 +68,9 @@
## Maximum socket buffer size in bytes, once the buffer fills up, metrics ## Maximum socket buffer size in bytes, once the buffer fills up, metrics
## will start dropping. Defaults to the OS default. ## will start dropping. Defaults to the OS default.
# read_buffer_size = 65535 # read_buffer_size = 65535
## Max duration (TTL) for each metric to stay cached/reported without being updated.
# max_ttl = "10h"
``` ```
### Description ### Description
@ -192,6 +195,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time.
measurements and tags. measurements and tags.
- **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) - **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
- **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/) - **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
- **max_ttl** config.Duration: Max duration (TTL) for each metric to stay cached/reported without being updated.
### Statsd bucket -> InfluxDB line-protocol Templates ### Statsd bucket -> InfluxDB line-protocol Templates

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/graphite"
@ -117,6 +118,9 @@ type Statsd struct {
TCPKeepAlive bool `toml:"tcp_keep_alive"` TCPKeepAlive bool `toml:"tcp_keep_alive"`
TCPKeepAlivePeriod *internal.Duration `toml:"tcp_keep_alive_period"` TCPKeepAlivePeriod *internal.Duration `toml:"tcp_keep_alive_period"`
// Max duration for each metric to stay cached without being updated.
MaxTTL config.Duration `toml:"max_ttl"`
graphiteParser *graphite.GraphiteParser graphiteParser *graphite.GraphiteParser
acc telegraf.Accumulator acc telegraf.Accumulator
@ -131,7 +135,7 @@ type Statsd struct {
UDPBytesRecv selfstat.Stat UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat ParseTimeNS selfstat.Stat
Log telegraf.Logger Log telegraf.Logger `toml:"-"`
// A pool of byte slices to handle parsing // A pool of byte slices to handle parsing
bufPool sync.Pool bufPool sync.Pool
@ -159,27 +163,31 @@ type metric struct {
} }
type cachedset struct { type cachedset struct {
name string name string
fields map[string]map[string]bool fields map[string]map[string]bool
tags map[string]string tags map[string]string
expiresAt time.Time
} }
type cachedgauge struct { type cachedgauge struct {
name string name string
fields map[string]interface{} fields map[string]interface{}
tags map[string]string tags map[string]string
expiresAt time.Time
} }
type cachedcounter struct { type cachedcounter struct {
name string name string
fields map[string]interface{} fields map[string]interface{}
tags map[string]string tags map[string]string
expiresAt time.Time
} }
type cachedtimings struct { type cachedtimings struct {
name string name string
fields map[string]RunningStats fields map[string]RunningStats
tags map[string]string tags map[string]string
expiresAt time.Time
} }
func (_ *Statsd) Description() string { func (_ *Statsd) Description() string {
@ -243,6 +251,9 @@ const sampleConfig = `
## calculation of percentiles. Raising this limit increases the accuracy ## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time. ## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000 percentile_limit = 1000
## Max duration (TTL) for each metric to stay cached/reported without being updated.
#max_ttl = "1000h"
` `
func (_ *Statsd) SampleConfig() string { func (_ *Statsd) SampleConfig() string {
@ -306,6 +317,9 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
if s.DeleteSets { if s.DeleteSets {
s.sets = make(map[string]cachedset) s.sets = make(map[string]cachedset)
} }
s.expireCachedMetrics()
return nil return nil
} }
@ -527,9 +541,6 @@ func (s *Statsd) parser() error {
// parseStatsdLine will parse the given statsd line, validating it as it goes. // parseStatsdLine will parse the given statsd line, validating it as it goes.
// If the line is valid, it will be cached for the next call to Gather() // If the line is valid, it will be cached for the next call to Gather()
func (s *Statsd) parseStatsdLine(line string) error { func (s *Statsd) parseStatsdLine(line string) error {
s.Lock()
defer s.Unlock()
lineTags := make(map[string]string) lineTags := make(map[string]string)
if s.DataDogExtensions { if s.DataDogExtensions {
recombinedSegments := make([]string, 0) recombinedSegments := make([]string, 0)
@ -734,6 +745,9 @@ func parseKeyValue(keyvalue string) (string, string) {
// aggregates and caches the current value(s). It does not deal with the // aggregates and caches the current value(s). It does not deal with the
// Delete* options, because those are dealt with in the Gather function. // Delete* options, because those are dealt with in the Gather function.
func (s *Statsd) aggregate(m metric) { func (s *Statsd) aggregate(m metric) {
s.Lock()
defer s.Unlock()
switch m.mtype { switch m.mtype {
case "ms", "h": case "ms", "h":
// Check if the measurement exists // Check if the measurement exists
@ -761,61 +775,67 @@ func (s *Statsd) aggregate(m metric) {
field.AddValue(m.floatvalue) field.AddValue(m.floatvalue)
} }
cached.fields[m.field] = field cached.fields[m.field] = field
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.timings[m.hash] = cached s.timings[m.hash] = cached
case "c": case "c":
// check if the measurement exists // check if the measurement exists
_, ok := s.counters[m.hash] cached, ok := s.counters[m.hash]
if !ok { if !ok {
s.counters[m.hash] = cachedcounter{ cached = cachedcounter{
name: m.name, name: m.name,
fields: make(map[string]interface{}), fields: make(map[string]interface{}),
tags: m.tags, tags: m.tags,
} }
} }
// check if the field exists // check if the field exists
_, ok = s.counters[m.hash].fields[m.field] _, ok = cached.fields[m.field]
if !ok { if !ok {
s.counters[m.hash].fields[m.field] = int64(0) cached.fields[m.field] = int64(0)
} }
s.counters[m.hash].fields[m.field] = cached.fields[m.field] = cached.fields[m.field].(int64) + m.intvalue
s.counters[m.hash].fields[m.field].(int64) + m.intvalue cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.counters[m.hash] = cached
case "g": case "g":
// check if the measurement exists // check if the measurement exists
_, ok := s.gauges[m.hash] cached, ok := s.gauges[m.hash]
if !ok { if !ok {
s.gauges[m.hash] = cachedgauge{ cached = cachedgauge{
name: m.name, name: m.name,
fields: make(map[string]interface{}), fields: make(map[string]interface{}),
tags: m.tags, tags: m.tags,
} }
} }
// check if the field exists // check if the field exists
_, ok = s.gauges[m.hash].fields[m.field] _, ok = cached.fields[m.field]
if !ok { if !ok {
s.gauges[m.hash].fields[m.field] = float64(0) cached.fields[m.field] = float64(0)
} }
if m.additive { if m.additive {
s.gauges[m.hash].fields[m.field] = cached.fields[m.field] = cached.fields[m.field].(float64) + m.floatvalue
s.gauges[m.hash].fields[m.field].(float64) + m.floatvalue
} else { } else {
s.gauges[m.hash].fields[m.field] = m.floatvalue cached.fields[m.field] = m.floatvalue
} }
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.gauges[m.hash] = cached
case "s": case "s":
// check if the measurement exists // check if the measurement exists
_, ok := s.sets[m.hash] cached, ok := s.sets[m.hash]
if !ok { if !ok {
s.sets[m.hash] = cachedset{ cached = cachedset{
name: m.name, name: m.name,
fields: make(map[string]map[string]bool), fields: make(map[string]map[string]bool),
tags: m.tags, tags: m.tags,
} }
} }
// check if the field exists // check if the field exists
_, ok = s.sets[m.hash].fields[m.field] _, ok = cached.fields[m.field]
if !ok { if !ok {
s.sets[m.hash].fields[m.field] = make(map[string]bool) cached.fields[m.field] = make(map[string]bool)
} }
s.sets[m.hash].fields[m.field][m.strvalue] = true cached.fields[m.field][m.strvalue] = true
cached.expiresAt = time.Now().Add(time.Duration(s.MaxTTL))
s.sets[m.hash] = cached
} }
} }
@ -932,6 +952,39 @@ func (s *Statsd) isUDP() bool {
return strings.HasPrefix(s.Protocol, "udp") return strings.HasPrefix(s.Protocol, "udp")
} }
// expireCachedMetrics removes every cached counter, gauge, timing, and set
// entry whose TTL has elapsed. Entries refresh their expiresAt on each
// update in aggregate(); a zero MaxTTL disables expiration entirely.
func (s *Statsd) expireCachedMetrics() {
	// MaxTTL of zero means no TTL was configured; keep everything.
	if s.MaxTTL == 0 {
		return
	}

	now := time.Now()

	for hash, entry := range s.counters {
		if now.After(entry.expiresAt) {
			delete(s.counters, hash)
		}
	}
	for hash, entry := range s.gauges {
		if now.After(entry.expiresAt) {
			delete(s.gauges, hash)
		}
	}
	for hash, entry := range s.timings {
		if now.After(entry.expiresAt) {
			delete(s.timings, hash)
		}
	}
	for hash, entry := range s.sets {
		if now.After(entry.expiresAt) {
			delete(s.sets, hash)
		}
	}
}
func init() { func init() {
inputs.Add("statsd", func() telegraf.Input { inputs.Add("statsd", func() telegraf.Input {
return &Statsd{ return &Statsd{

View File

@ -2,15 +2,17 @@ package statsd
import ( import (
"fmt" "fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net" "net"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -1077,6 +1079,65 @@ func TestParse_MeasurementsWithSameName(t *testing.T) {
} }
} }
// Test that the metric caches expire (clear) an entry after the entry hasn't
// been updated for the configurable MaxTTL duration.
func TestCachesExpireAfterMaxTTL(t *testing.T) {
	s := NewTestStatsd()
	s.MaxTTL = config.Duration(100 * time.Microsecond)

	acc := &testutil.Accumulator{}
	// Two increments accumulate into one cached counter entry.
	require.NoError(t, s.parseStatsdLine("valid:45|c"))
	require.NoError(t, s.parseStatsdLine("valid:45|c"))
	require.NoError(t, s.Gather(acc))

	// Max TTL goes by, our 'valid' entry is cleared.
	time.Sleep(100 * time.Microsecond)
	require.NoError(t, s.Gather(acc))

	// Now when we gather, we should have a counter that is reset to zero.
	require.NoError(t, s.parseStatsdLine("valid:45|c"))
	require.NoError(t, s.Gather(acc))

	// The first two gathers report the accumulated 90; the third gather,
	// after expiration, reports only the fresh 45.
	testutil.RequireMetricsEqual(t,
		[]telegraf.Metric{
			testutil.MustMetric(
				"valid",
				map[string]string{
					"metric_type": "counter",
				},
				map[string]interface{}{
					"value": 90,
				},
				time.Now(),
				telegraf.Counter,
			),
			testutil.MustMetric(
				"valid",
				map[string]string{
					"metric_type": "counter",
				},
				map[string]interface{}{
					"value": 90,
				},
				time.Now(),
				telegraf.Counter,
			),
			testutil.MustMetric(
				"valid",
				map[string]string{
					"metric_type": "counter",
				},
				map[string]interface{}{
					"value": 45,
				},
				time.Now(),
				telegraf.Counter,
			),
		},
		acc.GetTelegrafMetrics(),
		testutil.IgnoreTime(),
	)
}
// Test that measurements with multiple bits, are treated as different outputs // Test that measurements with multiple bits, are treated as different outputs
// but are equal to their single-measurement representation // but are equal to their single-measurement representation
func TestParse_MeasurementsWithMultipleValues(t *testing.T) { func TestParse_MeasurementsWithMultipleValues(t *testing.T) {