feat(outputs.datadog): Add support for submitting alongside dd-agent (#15702)

Joseph Heyburn 2024-08-07 15:58:25 +01:00 committed by GitHub
parent 61efaee971
commit 66a042f592
4 changed files with 660 additions and 18 deletions

View File: plugins/outputs/datadog/README.md

@@ -36,6 +36,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Override the default (none) compression used to send data.
## Supports: "zlib", "none"
# compression = "none"
## When non-zero, converts count metrics submitted by inputs.statsd
## into rate, while dividing the metric value by this number.
## Note that in order for metrics to be submitted simultaneously alongside
## a Datadog agent, rate_interval has to match the interval used by the
## agent - which defaults to 10s
# rate_interval = "0s"
```

## Metrics
@@ -46,11 +53,13 @@ field key with a `.` character.
Field values are converted to floating point numbers. Strings and floats that
cannot be sent over JSON, namely NaN and Inf, are ignored.
Setting `rate_interval` to non-zero will convert `count` metrics to `rate`
and divide their values by this interval before submitting to Datadog.
This allows Telegraf to submit metrics alongside Datadog agents when their rate
intervals are the same (Datadog defaults to `10s`).

Note that this only supports metrics ingested via `inputs.statsd`, given
the dependency on the `metric_type` tag it creates. There is only support for
`counter` metrics, and for the `count` values of `timing` and `histogram` metrics.
[metrics]: https://docs.datadoghq.com/api/v1/metrics/#submit-metrics
[apikey]: https://app.datadoghq.com/account/settings#api
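As an illustrative sketch of the new option (not part of the commit itself; the API key is a placeholder), a configuration that submits statsd counts as rates compatible with a dd-agent flushing every 10s could look like:

```toml
[[outputs.datadog]]
  ## Placeholder API key for illustration only
  apikey = "my-secret-key"

  ## Match the Datadog agent's flush interval (10s by default) so that
  ## counts from inputs.statsd are converted to comparable rates
  rate_interval = "10s"
```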

View File: plugins/outputs/datadog/datadog.go

@@ -25,11 +25,12 @@ import (
var sampleConfig string

type Datadog struct {
Apikey string `toml:"apikey"`
Timeout config.Duration `toml:"timeout"`
URL string `toml:"url"`
Compression string `toml:"compression"`
RateInterval config.Duration `toml:"rate_interval"`
Log telegraf.Logger `toml:"-"`

client *http.Client
proxy.HTTPProxy
@@ -75,15 +76,15 @@ func (d *Datadog) Connect() error {
return nil
}

func (d *Datadog) convertToDatadogMetric(metrics []telegraf.Metric) []*Metric {
tempSeries := []*Metric{}
for _, m := range metrics {
if dogMs, err := buildMetrics(m); err == nil {
metricTags := buildTags(m.TagList())
host, _ := m.GetTag("host")
// Retrieve the metric_type tag created by inputs.statsd
statsDMetricType, _ := m.GetTag("metric_type")

if len(dogMs) == 0 {
continue
@@ -99,9 +100,21 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
dname = m.Name() + "." + fieldName
}
var tname string
var interval int64
interval = 1
switch m.Type() {
case telegraf.Counter, telegraf.Untyped:
if d.RateInterval > 0 && isRateable(statsDMetricType, fieldName) {
// interval is expected to be in seconds
rateIntervalSeconds := time.Duration(d.RateInterval).Seconds()
interval = int64(rateIntervalSeconds)
dogM[1] = dogM[1] / rateIntervalSeconds
tname = "rate"
} else if m.Type() == telegraf.Counter {
tname = "count"
} else {
tname = ""
}
case telegraf.Gauge:
tname = "gauge"
default:
@@ -112,23 +125,28 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
Tags: metricTags,
Host: host,
Type: tname,
Interval: interval,
}
metric.Points[0] = dogM
tempSeries = append(tempSeries, metric)
}
} else {
d.Log.Infof("Unable to build Metric for %s due to error '%v', skipping", m.Name(), err)
}
}
return tempSeries
}
func (d *Datadog) Write(metrics []telegraf.Metric) error {
ts := TimeSeries{}
tempSeries := d.convertToDatadogMetric(metrics)
if len(tempSeries) == 0 {
return nil
}

redactedAPIKey := "****************"
ts.Series = make([]*Metric, len(tempSeries))
copy(ts.Series, tempSeries[0:])
tsBytes, err := json.Marshal(ts)
if err != nil {
@@ -220,6 +238,20 @@ func verifyValue(v interface{}) bool {
return true
}
func isRateable(statsDMetricType string, fieldName string) bool {
switch statsDMetricType {
case
"counter":
return true
case
"timing",
"histogram":
return fieldName == "count"
default:
return false
}
}
func (p *Point) setValue(v interface{}) error {
switch d := v.(type) {
case int64:
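To make the conversion arithmetic concrete, here is a minimal standalone sketch (not part of the plugin; the counter value of 100 and the 10s interval are assumed examples): with `rate_interval = 10s`, a rateable count of 100 is divided by the interval in seconds and submitted as a `rate` of 10 with `interval=10`.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed example inputs: a statsd counter value and a 10s rate_interval.
	countValue := 100.0
	rateInterval := 10 * time.Second

	// Mirror the conversion above: divide the count by the interval in seconds
	// and report that interval alongside the value.
	intervalSeconds := rateInterval.Seconds()
	rate := countValue / intervalSeconds

	fmt.Printf("type=rate interval=%d value=%v\n", int64(intervalSeconds), rate)
	// Prints: type=rate interval=10 value=10
}
```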

View File: plugins/outputs/datadog/datadog_test.go

@@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
)
@@ -305,3 +306,596 @@ func TestInfIsSkipped(t *testing.T) {
})
require.NoError(t, err)
}
func TestNonZeroRateIntervalConvertsRatesToCount(t *testing.T) {
d := &Datadog{
Apikey: "123456",
RateInterval: config.Duration(10 * time.Second),
}
var tests = []struct {
name string
metricsIn []telegraf.Metric
metricsOut []*Metric
}{
{
"convert counter metrics to rate",
[]telegraf.Metric{
testutil.MustMetric(
"count_metric",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 100,
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
telegraf.Counter,
),
},
[]*Metric{
{
Metric: "count_metric",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
10,
},
},
Type: "rate",
Tags: []string{
"metric_type:counter",
},
Interval: 10,
},
},
},
{
"convert count value in timing metrics to rate",
[]telegraf.Metric{
testutil.MustMetric(
"timing_metric",
map[string]string{
"metric_type": "timing",
},
map[string]interface{}{
"count": 1,
"lower": float64(10),
"mean": float64(10),
"median": float64(10),
"stddev": float64(0),
"sum": float64(10),
"upper": float64(10),
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
telegraf.Untyped,
),
},
[]*Metric{
{
Metric: "timing_metric.count",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
0.1,
},
},
Type: "rate",
Tags: []string{
"metric_type:timing",
},
Interval: 10,
},
{
Metric: "timing_metric.lower",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.mean",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.median",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.stddev",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(0),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.sum",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.upper",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
},
},
{
"convert count value in histogram metrics to rate",
[]telegraf.Metric{
testutil.MustMetric(
"histogram_metric",
map[string]string{
"metric_type": "histogram",
},
map[string]interface{}{
"count": 1,
"lower": float64(10),
"mean": float64(10),
"median": float64(10),
"stddev": float64(0),
"sum": float64(10),
"upper": float64(10),
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
telegraf.Untyped,
),
},
[]*Metric{
{
Metric: "histogram_metric.count",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
0.1,
},
},
Type: "rate",
Tags: []string{
"metric_type:histogram",
},
Interval: 10,
},
{
Metric: "histogram_metric.lower",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.mean",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.median",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.stddev",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(0),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.sum",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.upper",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetricsOut := d.convertToDatadogMetric(tt.metricsIn)
require.ElementsMatch(t, tt.metricsOut, actualMetricsOut)
})
}
}
func TestZeroRateIntervalConvertsRatesToCount(t *testing.T) {
d := &Datadog{
Apikey: "123456",
}
var tests = []struct {
name string
metricsIn []telegraf.Metric
metricsOut []*Metric
}{
{
"does not convert counter metrics to rate",
[]telegraf.Metric{
testutil.MustMetric(
"count_metric",
map[string]string{
"metric_type": "counter",
},
map[string]interface{}{
"value": 100,
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
telegraf.Counter,
),
},
[]*Metric{
{
Metric: "count_metric",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
100,
},
},
Type: "count",
Tags: []string{
"metric_type:counter",
},
Interval: 1,
},
},
},
{
"does not convert count value in timing metrics to rate",
[]telegraf.Metric{
testutil.MustMetric(
"timing_metric",
map[string]string{
"metric_type": "timing",
},
map[string]interface{}{
"count": 1,
"lower": float64(10),
"mean": float64(10),
"median": float64(10),
"stddev": float64(0),
"sum": float64(10),
"upper": float64(10),
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
telegraf.Untyped,
),
},
[]*Metric{
{
Metric: "timing_metric.count",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
1,
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.lower",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.mean",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.median",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.stddev",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(0),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.sum",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
{
Metric: "timing_metric.upper",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:timing",
},
Interval: 1,
},
},
},
{
"does not convert count value in histogram metrics to rate",
[]telegraf.Metric{
testutil.MustMetric(
"histogram_metric",
map[string]string{
"metric_type": "histogram",
},
map[string]interface{}{
"count": 1,
"lower": float64(10),
"mean": float64(10),
"median": float64(10),
"stddev": float64(0),
"sum": float64(10),
"upper": float64(10),
},
time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
telegraf.Untyped,
),
},
[]*Metric{
{
Metric: "histogram_metric.count",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
1,
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.lower",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.mean",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.median",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.stddev",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(0),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.sum",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
{
Metric: "histogram_metric.upper",
Points: [1]Point{
{
float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()),
float64(10),
},
},
Type: "",
Tags: []string{
"metric_type:histogram",
},
Interval: 1,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actualMetricsOut := d.convertToDatadogMetric(tt.metricsIn)
require.ElementsMatch(t, tt.metricsOut, actualMetricsOut)
})
}
}

View File: plugins/outputs/datadog/sample.conf

@@ -18,3 +18,10 @@
## Override the default (none) compression used to send data.
## Supports: "zlib", "none"
# compression = "none"
## When non-zero, converts count metrics submitted by inputs.statsd
## into rate, while dividing the metric value by this number.
## Note that in order for metrics to be submitted simultaneously alongside
## a Datadog agent, rate_interval has to match the interval used by the
## agent - which defaults to 10s
# rate_interval = "0s"
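Since the conversion depends on the `metric_type` tag that `inputs.statsd` attaches, a minimal end-to-end pairing might look like the following sketch (the listen address and API key are placeholders):

```toml
# Counts gathered here carry a metric_type tag ("counter", "timing", "histogram"),
# which outputs.datadog uses to decide which fields are rateable.
[[inputs.statsd]]
  protocol = "udp"
  service_address = ":8125"

[[outputs.datadog]]
  apikey = "my-secret-key"
  ## Must match the dd-agent's flush interval (10s by default) so rates agree.
  rate_interval = "10s"
```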