Add support for datadog distributions metric (#8179)

* Add support for datadog distributions in statsd

* Parse metric distribution correctly

* Add tests to check distributions are parsed correctly

* Update Statsd plugin Readme with details about Distributions metric

* Refactor metric distribution initialization code

* Update distribution metric interface to replace fields with value

* Refactor statsd distribution metric test code

* Fix go formatting errors

* Add tests to parse only when DataDog Distributions config is enabled

* Add config to enable parsing DataDog Statsd Distributions

* Document use of datadog_distributions config in Readme
Sreejith Pp 2021-02-16 23:20:01 +05:30 committed by GitHub
parent f09e551cbd
commit f888136333
4 changed files with 117 additions and 7 deletions

etc/telegraf.conf

@@ -7328,6 +7328,9 @@
# ## Parses datadog extensions to the statsd format
# datadog_extensions = false
#
# ## Parses distribution metrics from datadog's extension to the statsd format
# datadog_distributions = false
#
# ## Statsd data translation templates, more info can be read here:
# ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
# # templates = [

plugins/inputs/statsd/README.md

@@ -50,6 +50,10 @@
  ## http://docs.datadoghq.com/guides/dogstatsd/
  datadog_extensions = false

  ## Parses distribution metrics as specified in the datadog statsd format
  ## https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition
  datadog_distributions = false

  ## Statsd data translation templates, more info can be read here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
  # templates = [
@@ -98,6 +102,10 @@ implementation. In short, the telegraf statsd listener will accept:
  - `load.time:320|ms`
  - `load.time.nanoseconds:1|h`
  - `load.time:200|ms|@0.1` <- sampled 1/10 of the time
- Distributions (see the sketch after this list)
  - `load.time:320|d`
  - `load.time.nanoseconds:1|d`
  - `load.time:200|d|@0.1` <- sampled 1/10 of the time
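For reference, here is a minimal Go sketch (not part of the commit) that emits distribution samples like the ones above over UDP. It assumes the plugin's default `service_address` of `:8125`, and that both `datadog_extensions` and `datadog_distributions` are enabled on the telegraf side:

package main

import (
	"log"
	"net"
)

func main() {
	// The target address is an assumption: telegraf's statsd input
	// listens on :8125 by default.
	conn, err := net.Dial("udp", "127.0.0.1:8125")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	lines := []string{
		"load.time:320|d",      // one raw distribution sample
		"load.time:200|d|@0.1", // sampled 1/10 of the time
	}
	for _, line := range lines {
		if _, err := conn.Write([]byte(line)); err != nil {
			log.Fatal(err)
		}
	}
}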
It is possible to omit repetitive names and merge individual stats into a
single line by separating them with additional colons:
@@ -172,6 +180,9 @@ metric type:
that `P%` of all the values statsd saw for that stat during that time
period are below x. The most common value that people use for `P` is the
`90`, this is a great number to try to optimize.
- Distributions
  - The Distribution metric represents the global statistical distribution of a set of values calculated across your entire distributed infrastructure in one time interval. A Distribution can be used to instrument logical objects, like services, independently from the underlying hosts.
  - Unlike the Histogram metric type, which aggregates on the Agent during a given time interval, a Distribution metric sends all the raw data during a time interval (see the sketch below).
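To make the contrast concrete, here is a sketch (assuming the plugin's default `_` metric separator and field names) of what three samples received in one interval become. As timings they are aggregated into a single point with summary fields; as distributions, each raw sample is emitted as its own point:

load.time:10|ms  load.time:20|ms  load.time:30|ms
  -> load_time,metric_type=timing count=3,mean=20,... (one aggregated point)

load.time:10|d  load.time:20|d  load.time:30|d
  -> load_time,metric_type=distribution value=10
  -> load_time,metric_type=distribution value=20
  -> load_time,metric_type=distribution value=30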
### Plugin arguments
@@ -195,6 +206,7 @@ the accuracy of percentiles but also increases the memory usage and cpu time.
measurements and tags.
- **parse_data_dog_tags** boolean: Enable parsing of tags in DataDog's dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
- **datadog_extensions** boolean: Enable parsing of DataDog's extensions to dogstatsd format (http://docs.datadoghq.com/guides/dogstatsd/)
- **datadog_distributions** boolean: Enable parsing of the Distribution metric in DataDog's dogstatsd format (https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition). Only takes effect when `datadog_extensions` is also enabled (see the example after this list).
- **max_ttl** config.Duration: Max duration (TTL) for each metric to stay cached/reported without being updated.
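Because distribution parsing only happens when both DataDog options are on, a minimal configuration sketch looks like this (the `service_address` value is the plugin default, shown only for context):

[[inputs.statsd]]
  service_address = ":8125"
  ## datadog_distributions has no effect unless datadog_extensions is enabled
  datadog_extensions = true
  datadog_distributions = true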
### Statsd bucket -> InfluxDB line-protocol Templates

plugins/inputs/statsd/statsd.go

@@ -70,6 +70,11 @@ type Statsd struct {
	// http://docs.datadoghq.com/guides/dogstatsd/
	DataDogExtensions bool `toml:"datadog_extensions"`

	// Parses distribution metrics in the datadog statsd format.
	// Requires the DataDogExtensions flag to be enabled.
	// https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition
	DataDogDistributions bool `toml:"datadog_distributions"`

	// UDPPacketSize is deprecated, it's only here for legacy support
	// we now always create 1 max size buffer and then copy only what we need
	// into the in channel
@@ -98,10 +103,12 @@ type Statsd struct {
	// Cache gauges, counters & sets so they can be aggregated as they arrive
	// gauges and counters map measurement/tags hash -> field name -> metrics
	// sets and timings map measurement/tags hash -> metrics
	gauges map[string]cachedgauge
	counters map[string]cachedcounter
	sets map[string]cachedset
	timings map[string]cachedtimings
	// distributions aggregate measurement/tags and are published directly
	gauges        map[string]cachedgauge
	counters      map[string]cachedcounter
	sets          map[string]cachedset
	timings       map[string]cachedtimings
	distributions []cacheddistributions

	// bucket -> influx templates
	Templates []string
@@ -190,6 +197,12 @@ type cachedtimings struct {
	expiresAt time.Time
}

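// cacheddistributions holds one raw distribution sample; a slice is used
// (rather than a map like the other caches) because samples are emitted
// individually and flushed on every Gather, not aggregated in place.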
type cacheddistributions struct {
	name  string
	value float64
	tags  map[string]string
}

func (_ *Statsd) Description() string {
	return "Statsd UDP/TCP Server"
}
@@ -237,6 +250,10 @@ const sampleConfig = `
  ## Parses datadog extensions to the statsd format
  datadog_extensions = false

  ## Parses distribution metrics as specified in the datadog statsd format
  ## https://docs.datadoghq.com/developers/metrics/types/?tab=distribution#definition
  datadog_distributions = false

  ## Statsd data translation templates, more info can be read here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/TEMPLATE_PATTERN.md
  # templates = [
@@ -265,6 +282,14 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
	defer s.Unlock()
	now := time.Now()

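	// Distributions are not aggregated: every raw sample received during
	// the interval becomes its own point, and the buffer is then cleared.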
	for _, m := range s.distributions {
		fields := map[string]interface{}{
			defaultFieldName: m.value,
		}
		acc.AddFields(m.name, fields, m.tags, now)
	}
	s.distributions = make([]cacheddistributions, 0)

	for _, m := range s.timings {
		// Defining a template to parse field names for timers allows us to split
		// out multiple fields per timer. In this case we prefix each stat with the
@@ -336,6 +361,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
	s.counters = make(map[string]cachedcounter)
	s.sets = make(map[string]cachedset)
	s.timings = make(map[string]cachedtimings)
	s.distributions = make([]cacheddistributions, 0)

	s.Lock()
	defer s.Unlock()
@@ -601,7 +627,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
		// Validate metric type
		switch pipesplit[1] {
		case "g", "c", "s", "ms", "h":
		case "g", "c", "s", "ms", "h", "d":
			m.mtype = pipesplit[1]
		default:
			s.Log.Errorf("Metric type %q unsupported", pipesplit[1])
@ -618,7 +644,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
}
switch m.mtype {
case "g", "ms", "h":
case "g", "ms", "h", "d":
v, err := strconv.ParseFloat(pipesplit[0], 64)
if err != nil {
s.Log.Errorf("Parsing value to float64, unable to parse metric: %s", line)
@@ -658,6 +684,8 @@ func (s *Statsd) parseStatsdLine(line string) error {
		m.tags["metric_type"] = "timing"
	case "h":
		m.tags["metric_type"] = "histogram"
	case "d":
		m.tags["metric_type"] = "distribution"
	}

	if len(lineTags) > 0 {
		for k, v := range lineTags {
@@ -749,6 +777,15 @@ func (s *Statsd) aggregate(m metric) {
	defer s.Unlock()

	switch m.mtype {
	case "d":
		if s.DataDogExtensions && s.DataDogDistributions {
			cached := cacheddistributions{
				name:  m.name,
				value: m.floatvalue,
				tags:  m.tags,
			}
			s.distributions = append(s.distributions, cached)
		}
	case "ms", "h":
		// Check if the measurement exists
		cached, ok := s.timings[m.hash]
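Taken together, the parsing and aggregation changes mean a dogstatsd line such as the one below (the `env:prod` tag is only illustrative) is emitted as a single raw point, assuming the default `_` metric separator and both DataDog options enabled:

request.latency:12.3|d|#env:prod
  -> request_latency,env=prod,metric_type=distribution value=12.3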

plugins/inputs/statsd/statsd_test.go

@@ -31,6 +31,7 @@ func NewTestStatsd() *Statsd {
	s.counters = make(map[string]cachedcounter)
	s.sets = make(map[string]cachedset)
	s.timings = make(map[string]cachedtimings)
	s.distributions = make([]cacheddistributions, 0)

	s.MetricSeparator = "_"
@@ -430,7 +431,7 @@ func TestParse_Timings(t *testing.T) {
	s.Percentiles = []internal.Number{{Value: 90.0}}
	acc := &testutil.Accumulator{}

	// Test that counters work
	// Test that timings work
	validLines := []string{
		"test.timing:1|ms",
		"test.timing:11|ms",
@@ -461,6 +462,63 @@
	acc.AssertContainsFields(t, "test_timing", valid)
}

// Tests low-level functionality of distributions
func TestParse_Distributions(t *testing.T) {
	s := NewTestStatsd()
	acc := &testutil.Accumulator{}

	parseMetrics := func() {
		// Test that distributions work
		validLines := []string{
			"test.distribution:1|d",
			"test.distribution2:2|d",
			"test.distribution3:3|d",
			"test.distribution4:1|d",
			"test.distribution5:1|d",
		}

		for _, line := range validLines {
			err := s.parseStatsdLine(line)
			if err != nil {
				t.Errorf("Parsing line %s should not have resulted in an error\n", line)
			}
		}
		s.Gather(acc)
	}

	validMeasurementMap := map[string]float64{
		"test_distribution":  1,
		"test_distribution2": 2,
		"test_distribution3": 3,
		"test_distribution4": 1,
		"test_distribution5": 1,
	}

	// Test parsing when DataDogExtensions and DataDogDistributions aren't enabled
	parseMetrics()
	for key := range validMeasurementMap {
		acc.AssertDoesNotContainMeasurement(t, key)
	}

	// Test parsing when DataDogDistributions is enabled but not DataDogExtensions
	s.DataDogDistributions = true
	parseMetrics()
	for key := range validMeasurementMap {
		acc.AssertDoesNotContainMeasurement(t, key)
	}

	// Test parsing when DataDogExtensions and DataDogDistributions are enabled
	s.DataDogExtensions = true
	parseMetrics()
	for key, value := range validMeasurementMap {
		field := map[string]interface{}{
			"value": float64(value),
		}
		acc.AssertContainsFields(t, key, field)
	}
}
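The new test can be run on its own with the standard Go tooling, e.g.:

go test ./plugins/inputs/statsd/ -run TestParse_Distributions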
func TestParseScientificNotation(t *testing.T) {
	s := NewTestStatsd()
	sciNotationLines := []string{