feat(inputs.statsd): Allow reporting sets and timings count as floats (#15853)
commit 5d996ac0a2
parent ad1af9855e
````diff
@@ -127,6 +127,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
   ## Enabling this would ensure that both counters and gauges are emitted
   ## as floats.
   # float_counters = false
+
+  ## Emit timings `metric_<name>_count` field as float, the same as all other
+  ## histogram fields
+  # float_timings = false
+
+  ## Emit sets as float
+  # float_sets = false
 ```

 ## Description
````
```diff
@@ -100,3 +100,10 @@
   ## Enabling this would ensure that both counters and gauges are emitted
   ## as floats.
   # float_counters = false
+
+  ## Emit timings `metric_<name>_count` field as float, the same as all other
+  ## histogram fields
+  # float_timings = false
+
+  ## Emit sets as float
+  # float_sets = false
```
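Together with the existing `float_counters` flag, the new options are plain opt-in booleans that default to `false`. A minimal configuration sketch with all three enabled (other `[[inputs.statsd]]` settings omitted; this snippet is illustrative and not part of the commit):

```toml
[[inputs.statsd]]
  ## Existing option: emit counters and gauges as floats
  float_counters = true

  ## New: emit the timing `metric_<name>_count` field as float,
  ## matching the other histogram fields
  float_timings = true

  ## New: emit set cardinalities as floats
  float_sets = true
```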
```diff
@@ -77,6 +77,8 @@ type Statsd struct {
     DeleteTimings bool `toml:"delete_timings"`
     ConvertNames  bool `toml:"convert_names"`
     FloatCounters bool `toml:"float_counters"`
+    FloatTimings  bool `toml:"float_timings"`
+    FloatSets     bool `toml:"float_sets"`

     EnableAggregationTemporality bool `toml:"enable_aggregation_temporality"`
```
```diff
@@ -260,7 +262,11 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
             fields[prefix+"sum"] = stats.Sum()
             fields[prefix+"upper"] = stats.Upper()
             fields[prefix+"lower"] = stats.Lower()
-            fields[prefix+"count"] = stats.Count()
+            if s.FloatTimings {
+                fields[prefix+"count"] = float64(stats.Count())
+            } else {
+                fields[prefix+"count"] = stats.Count()
+            }
             for _, percentile := range s.Percentiles {
                 name := fmt.Sprintf("%s%v_percentile", prefix, percentile)
                 fields[name] = stats.Percentile(float64(percentile))
```
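The branch above only changes the Go type stored for the `count` field; the value itself is unchanged. A self-contained sketch (a standalone program with a made-up count, not the plugin code) of what a downstream serializer would see:

```go
package main

import "fmt"

func main() {
	// Hypothetical timing count for one gather interval; in the plugin this
	// comes from stats.Count().
	count := int64(3)

	for _, floatTimings := range []bool{false, true} {
		fields := map[string]interface{}{}
		if floatTimings {
			fields["count"] = float64(count) // serialized as a float field
		} else {
			fields["count"] = count // serialized as an integer field
		}
		fmt.Printf("float_timings=%v -> %T\n", floatTimings, fields["count"])
	}
}
```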
```diff
@@ -306,7 +312,11 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
     for _, m := range s.sets {
         fields := make(map[string]interface{})
         for field, set := range m.fields {
-            fields[field] = int64(len(set))
+            if s.FloatSets {
+                fields[field] = float64(len(set))
+            } else {
+                fields[field] = int64(len(set))
+            }
         }
         if s.EnableAggregationTemporality {
             fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
```
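The set branch applies the same idea to a set's cardinality: `len(set)` is widened to `float64` when `float_sets` is enabled, and otherwise kept as `int64` as before. A small standalone sketch with hypothetical set contents:

```go
package main

import "fmt"

func main() {
	// A statsd set deduplicates observed values; the reported field is its size.
	set := map[string]bool{"100": true, "200": true} // e.g. two unique user ids
	floatSets := true

	var value interface{}
	if floatSets {
		value = float64(len(set)) // float field: value=2
	} else {
		value = int64(len(set)) // integer field: value=2i in line protocol
	}
	fmt.Printf("%T %v\n", value, value)
}
```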
```diff
@@ -473,6 +473,51 @@ func TestParse_Sets(t *testing.T) {
     }
 }

+func TestParse_Sets_SetsAsFloat(t *testing.T) {
+    s := NewTestStatsd()
+    s.FloatSets = true
+
+    // Test that sets work
+    validLines := []string{
+        "unique.user.ids:100|s",
+        "unique.user.ids:100|s",
+        "unique.user.ids:200|s",
+    }
+
+    for _, line := range validLines {
+        require.NoErrorf(t, s.parseStatsdLine(line), "Parsing line %s should not have resulted in an error", line)
+    }
+
+    validations := []struct {
+        name  string
+        value int64
+    }{
+        {
+            "unique_user_ids",
+            2,
+        },
+    }
+    for _, test := range validations {
+        require.NoError(t, testValidateSet(test.name, test.value, s.sets))
+    }
+
+    expected := []telegraf.Metric{
+        testutil.MustMetric(
+            "unique_user_ids",
+            map[string]string{"metric_type": "set"},
+            map[string]interface{}{"value": 2.0},
+            time.Now(),
+            telegraf.Untyped,
+        ),
+    }
+
+    acc := &testutil.Accumulator{}
+    require.NoError(t, s.Gather(acc))
+    metrics := acc.GetTelegrafMetrics()
+    testutil.PrintMetrics(metrics)
+    testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
+}
+
 // Tests low-level functionality of counters
 func TestParse_Counters(t *testing.T) {
     s := NewTestStatsd()
```
```diff
@@ -676,6 +721,37 @@ func TestParse_Timings(t *testing.T) {
     acc.AssertContainsFields(t, "test_timing", valid)
 }

+func TestParse_Timings_TimingsAsFloat(t *testing.T) {
+    s := NewTestStatsd()
+    s.FloatTimings = true
+    s.Percentiles = []Number{90.0}
+    acc := &testutil.Accumulator{}
+
+    // Test that timings work
+    validLines := []string{
+        "test.timing:100|ms",
+    }
+
+    for _, line := range validLines {
+        require.NoErrorf(t, s.parseStatsdLine(line), "Parsing line %s should not have resulted in an error", line)
+    }
+
+    require.NoError(t, s.Gather(acc))
+
+    valid := map[string]interface{}{
+        "90_percentile": float64(100),
+        "count":         float64(1),
+        "lower":         float64(100),
+        "mean":          float64(100),
+        "median":        float64(100),
+        "stddev":        float64(0),
+        "sum":           float64(100),
+        "upper":         float64(100),
+    }
+
+    acc.AssertContainsFields(t, "test_timing", valid)
+}
+
 // Tests low-level functionality of distributions
 func TestParse_Distributions(t *testing.T) {
     s := NewTestStatsd()
```