diff --git a/etc/telegraf.conf b/etc/telegraf.conf
index 1fe44afa3..d6e0b165c 100644
--- a/etc/telegraf.conf
+++ b/etc/telegraf.conf
@@ -7328,6 +7328,9 @@
 #   ## Parses datadog extensions to the statsd format
 #   datadog_extensions = false
 #
+#   ## Parses distribution metrics from datadog's extension to the statsd format
+#   datadog_distributions = false
+#
 #   ## Statsd data translation templates, more info can be read here:
 #   ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
 #   # templates = [
diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md
index 26cbe2628..a302f4095 100644
--- a/plugins/inputs/statsd/README.md
+++ b/plugins/inputs/statsd/README.md
@@ -50,6 +50,10 @@
   ## http://docs.datadoghq.com/guides/dogstatsd/
   datadog_extensions = false

+  ## Parses distribution metrics as specified in the datadog statsd format
+  ## https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition
+  datadog_distributions = false
+
   ## Statsd data translation templates, more info can be read here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
   # templates = [
@@ -98,6 +102,10 @@ implementation. In short, the telegraf statsd listener will accept:
 - `load.time:320|ms`
 - `load.time.nanoseconds:1|h`
 - `load.time:200|ms|@0.1` <- sampled 1/10 of the time
+- Distributions
+  - `load.time:320|d`
+  - `load.time.nanoseconds:1|d`
+  - `load.time:200|d|@0.1` <- sampled 1/10 of the time

 It is possible to omit repetitive names and merge individual stats into a
 single line by separating them with additional colons:
@@ -172,6 +180,9 @@ metric type:
   that `P%` of all the values statsd saw for that stat during that time
   period are below x. The most common value that people use for `P` is the
   `90`, this is a great number to try to optimize.
+- Distributions
+  - The Distribution metric represents the global statistical distribution of a set of values calculated across your entire distributed infrastructure in one time interval. A Distribution can be used to instrument logical objects, like services, independently from the underlying hosts.
+  - Unlike the Histogram metric type, which aggregates on the Agent during a given time interval, a Distribution metric sends all the raw data during a time interval.

 ### Plugin arguments
@@ -195,6 +206,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time.
   measurements and tags.
 - **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
 - **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
+- **datadog_distributions** boolean: Enable parsing of the Distribution metric in DataDog's dogstatsd format (https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition)
 - **max_ttl** config.Duration: Max duration (TTL) for each metric to stay cached/reported without being updated.

 ### Statsd bucket -> InfluxDB line-protocol Templates
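The wire format for the new type follows the same shape as the other dogstatsd lines documented above: `<name>:<value>|d`, optionally followed by a `|@<rate>` sample rate. As a quick illustration, here is a minimal Go sketch, not part of this patch, that emits the README's distribution samples over UDP; the target address is an assumption (the listener's customary default of :8125) and should match whatever service_address you configure.

package main

import (
	"fmt"
	"net"
)

func main() {
	// Assumed listener address; adjust to your [[inputs.statsd]] config.
	conn, err := net.Dial("udp", "127.0.0.1:8125")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// A plain distribution sample: <name>:<value>|d
	fmt.Fprint(conn, "load.time:320|d\n")
	// A sampled distribution, recorded 1/10 of the time: <name>:<value>|d|@<rate>
	fmt.Fprint(conn, "load.time:200|d|@0.1\n")
}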
diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go
index f74eb0ef4..a88fe847c 100644
--- a/plugins/inputs/statsd/statsd.go
+++ b/plugins/inputs/statsd/statsd.go
@@ -70,6 +70,11 @@ type Statsd struct {
 	// http://docs.datadoghq.com/guides/dogstatsd/
 	DataDogExtensions bool `toml:"datadog_extensions"`

+	// Parses distribution metrics in the datadog statsd format.
+	// Requires the DataDogExtensions flag to be enabled.
+	// https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition
+	DataDogDistributions bool `toml:"datadog_distributions"`
+
 	// UDPPacketSize is deprecated, it's only here for legacy support
 	// we now always create 1 max size buffer and then copy only what we need
 	// into the in channel
@@ -98,10 +103,12 @@ type Statsd struct {
 	// Cache gauges, counters & sets so they can be aggregated as they arrive
 	// gauges and counters map measurement/tags hash -> field name -> metrics
 	// sets and timings map measurement/tags hash -> metrics
-	gauges   map[string]cachedgauge
-	counters map[string]cachedcounter
-	sets     map[string]cachedset
-	timings  map[string]cachedtimings
+	// distributions cache measurement/tags and are published directly, without aggregation
+	gauges        map[string]cachedgauge
+	counters      map[string]cachedcounter
+	sets          map[string]cachedset
+	timings       map[string]cachedtimings
+	distributions []cacheddistributions

 	// bucket -> influx templates
 	Templates []string
@@ -190,6 +197,12 @@ type cachedtimings struct {
 	expiresAt time.Time
 }

+type cacheddistributions struct {
+	name  string
+	value float64
+	tags  map[string]string
+}
+
 func (_ *Statsd) Description() string {
 	return "Statsd UDP/TCP Server"
 }
@@ -237,6 +250,10 @@ const sampleConfig = `
   ## Parses datadog extensions to the statsd format
   datadog_extensions = false

+  ## Parses distribution metrics as specified in the datadog statsd format
+  ## https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition
+  datadog_distributions = false
+
   ## Statsd data translation templates, more info can be read here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
   # templates = [
@@ -265,6 +282,14 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
 	defer s.Unlock()
 	now := time.Now()

+	for _, m := range s.distributions {
+		fields := map[string]interface{}{
+			defaultFieldName: m.value,
+		}
+		acc.AddFields(m.name, fields, m.tags, now)
+	}
+	s.distributions = make([]cacheddistributions, 0)
+
 	for _, m := range s.timings {
 		// Defining a template to parse field names for timers allows us to split
 		// out multiple fields per timer.  In this case we prefix each stat with the
@@ -336,6 +361,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
 	s.counters = make(map[string]cachedcounter)
 	s.sets = make(map[string]cachedset)
 	s.timings = make(map[string]cachedtimings)
+	s.distributions = make([]cacheddistributions, 0)

 	s.Lock()
 	defer s.Unlock()
@@ -601,7 +627,7 @@ func (s *Statsd) parseStatsdLine(line string) error {

 		// Validate metric type
 		switch pipesplit[1] {
-		case "g", "c", "s", "ms", "h":
+		case "g", "c", "s", "ms", "h", "d":
 			m.mtype = pipesplit[1]
 		default:
 			s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
@@ -618,7 +644,7 @@
 		}

 		switch m.mtype {
-		case "g", "ms", "h":
+		case "g", "ms", "h", "d":
 			v, err := strconv.ParseFloat(pipesplit[0], 64)
 			if err != nil {
 				s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
@@ -658,6 +684,8 @@
 			m.tags["metric_type"] = "timing"
 		case "h":
 			m.tags["metric_type"] = "histogram"
+		case "d":
+			m.tags["metric_type"] = "distribution"
 		}
 		if len(lineTags) > 0 {
 			for k, v := range lineTags {
@@ -749,6 +777,15 @@ func (s *Statsd) aggregate(m metric) {
 	defer s.Unlock()

 	switch m.mtype {
+	case "d":
+		if s.DataDogExtensions && s.DataDogDistributions {
+			cached := cacheddistributions{
+				name:  m.name,
+				value: m.floatvalue,
+				tags:  m.tags,
+			}
+			s.distributions = append(s.distributions, cached)
+		}
 	case "ms", "h":
 		// Check if the measurement exists
 		cached, ok := s.timings[m.hash]
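A behavioral note before the tests: parseStatsdLine now accepts the `d` type unconditionally, but aggregate() silently discards the sample unless both DataDogExtensions and DataDogDistributions are set, and only buffered samples are flushed by Gather. The sketch below is a companion test, not part of this patch, that additionally pins down the metric_type tag the parser attaches; it assumes the helpers and imports already present in statsd_test.go.

// Verifies a single "|d" sample is passed through verbatim, including
// the metric_type tag, when both DataDog flags are enabled.
func TestParse_DistributionTag(t *testing.T) {
	s := NewTestStatsd()
	s.DataDogExtensions = true
	s.DataDogDistributions = true
	acc := &testutil.Accumulator{}

	if err := s.parseStatsdLine("test.distribution:1|d"); err != nil {
		t.Errorf("Parsing line should not have resulted in an error: %v", err)
	}
	s.Gather(acc)

	// One point per raw sample: measurement "test_distribution" (the test
	// helper sets MetricSeparator to "_"), field "value", tag metric_type.
	acc.AssertContainsTaggedFields(t, "test_distribution",
		map[string]interface{}{"value": float64(1)},
		map[string]string{"metric_type": "distribution"})
}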
diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go
index 4a129266d..7e6a78223 100644
--- a/plugins/inputs/statsd/statsd_test.go
+++ b/plugins/inputs/statsd/statsd_test.go
@@ -31,6 +31,7 @@ func NewTestStatsd() *Statsd {
 	s.counters = make(map[string]cachedcounter)
 	s.sets = make(map[string]cachedset)
 	s.timings = make(map[string]cachedtimings)
+	s.distributions = make([]cacheddistributions, 0)

 	s.MetricSeparator = "_"

@@ -430,7 +431,7 @@ func TestParse_Timings(t *testing.T) {
 	s.Percentiles = []internal.Number{{Value: 90.0}}
 	acc := &testutil.Accumulator{}

-	// Test that counters work
+	// Test that timings work
 	validLines := []string{
 		"test.timing:1|ms",
 		"test.timing:11|ms",
@@ -461,6 +462,63 @@ func TestParse_Timings(t *testing.T) {
 	acc.AssertContainsFields(t, "test_timing", valid)
 }

+// Tests low-level functionality of distributions
+func TestParse_Distributions(t *testing.T) {
+	s := NewTestStatsd()
+	acc := &testutil.Accumulator{}
+
+	parseMetrics := func() {
+		// Test that distributions work
+		validLines := []string{
+			"test.distribution:1|d",
+			"test.distribution2:2|d",
+			"test.distribution3:3|d",
+			"test.distribution4:1|d",
+			"test.distribution5:1|d",
+		}
+
+		for _, line := range validLines {
+			err := s.parseStatsdLine(line)
+			if err != nil {
+				t.Errorf("Parsing line %s should not have resulted in an error\n", line)
+			}
+		}
+
+		s.Gather(acc)
+	}
+
+	validMeasurementMap := map[string]float64{
+		"test_distribution":  1,
+		"test_distribution2": 2,
+		"test_distribution3": 3,
+		"test_distribution4": 1,
+		"test_distribution5": 1,
+	}
+
+	// Test parsing when DataDogExtensions and DataDogDistributions aren't enabled
+	parseMetrics()
+	for key := range validMeasurementMap {
+		acc.AssertDoesNotContainMeasurement(t, key)
+	}
+
+	// Test parsing when DataDogDistributions is enabled but not DataDogExtensions
+	s.DataDogDistributions = true
+	parseMetrics()
+	for key := range validMeasurementMap {
+		acc.AssertDoesNotContainMeasurement(t, key)
+	}
+
+	// Test parsing when DataDogExtensions and DataDogDistributions are enabled
+	s.DataDogExtensions = true
+	parseMetrics()
+	for key, value := range validMeasurementMap {
+		field := map[string]interface{}{
+			"value": value,
+		}
+		acc.AssertContainsFields(t, key, field)
+	}
+}
+
 func TestParseScientificNotation(t *testing.T) {
 	s := NewTestStatsd()
 	sciNotationLines := []string{