Add rate and interval to the basicstats aggregator plugin (#8428)

This commit is contained in:
Olli-Pekka Lehto 2020-11-20 14:53:51 -06:00 committed by GitHub
parent 521caf3995
commit 245bef2f3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 134 additions and 23 deletions

View File

@ -16,7 +16,7 @@ emitting the aggregate every `period` seconds.
drop_original = false drop_original = false
## Configures which basic stats to push as fields ## Configures which basic stats to push as fields
# stats = ["count","diff","min","max","mean","non_negative_diff","stdev","s2","sum"] # stats = ["count","diff","rate","min","max","mean","non_negative_diff","non_negative_rate","stdev","s2","sum","interval"]
``` ```
- stats - stats
@ -28,13 +28,16 @@ emitting the aggregate every `period` seconds.
- measurement1 - measurement1
- field1_count - field1_count
- field1_diff (difference) - field1_diff (difference)
- field1_rate (rate per second)
- field1_max - field1_max
- field1_min - field1_min
- field1_mean - field1_mean
- field1_non_negative_diff (non-negative difference) - field1_non_negative_diff (non-negative difference)
- field1_non_negative_rate (non-negative rate per second)
- field1_sum - field1_sum
- field1_s2 (variance) - field1_s2 (variance)
- field1_stdev (standard deviation) - field1_stdev (standard deviation)
- field1_interval (interval in nanoseconds)
### Tags: ### Tags:
@ -46,8 +49,8 @@ No tags are applied by this aggregator.
$ telegraf --config telegraf.conf --quiet $ telegraf --config telegraf.conf --quiet
system,host=tars load1=1 1475583980000000000 system,host=tars load1=1 1475583980000000000
system,host=tars load1=1 1475583990000000000 system,host=tars load1=1 1475583990000000000
system,host=tars load1_count=2,load1_diff=0,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000 system,host=tars load1_count=2,load1_diff=0,load1_rate=0,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0,load1_interval=10000000000i 1475584010000000000
system,host=tars load1=1 1475584020000000000 system,host=tars load1=1 1475584020000000000
system,host=tars load1=3 1475584030000000000 system,host=tars load1=3 1475584030000000000
system,host=tars load1_count=2,load1_diff=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000 system,host=tars load1_count=2,load1_diff=2,load1_rate=0.2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162,load1_interval=10000000000i 1475584010000000000
``` ```

View File

@ -2,6 +2,7 @@ package basicstats
import ( import (
"math" "math"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators" "github.com/influxdata/telegraf/plugins/aggregators"
@ -25,6 +26,9 @@ type configuredStats struct {
sum bool sum bool
diff bool diff bool
non_negative_diff bool non_negative_diff bool
rate bool
non_negative_rate bool
interval bool
} }
func NewBasicStats() *BasicStats { func NewBasicStats() *BasicStats {
@ -40,14 +44,17 @@ type aggregate struct {
} }
type basicstats struct { type basicstats struct {
count float64 count float64
min float64 min float64
max float64 max float64
sum float64 sum float64
mean float64 mean float64
diff float64 diff float64
M2 float64 //intermediate value for variance/stdev rate float64
LAST float64 //intermediate value for diff interval time.Duration
M2 float64 //intermediate value for variance/stdev
LAST float64 //intermediate value for diff
TIME time.Time //intermediate value for rate
} }
var sampleConfig = ` var sampleConfig = `
@ -88,8 +95,10 @@ func (b *BasicStats) Add(in telegraf.Metric) {
mean: fv, mean: fv,
sum: fv, sum: fv,
diff: 0.0, diff: 0.0,
rate: 0.0,
M2: 0.0, M2: 0.0,
LAST: fv, LAST: fv,
TIME: in.Time(),
} }
} }
} }
@ -100,14 +109,17 @@ func (b *BasicStats) Add(in telegraf.Metric) {
if _, ok := b.cache[id].fields[field.Key]; !ok { if _, ok := b.cache[id].fields[field.Key]; !ok {
// hit an uncached field of a cached metric // hit an uncached field of a cached metric
b.cache[id].fields[field.Key] = basicstats{ b.cache[id].fields[field.Key] = basicstats{
count: 1, count: 1,
min: fv, min: fv,
max: fv, max: fv,
mean: fv, mean: fv,
sum: fv, sum: fv,
diff: 0.0, diff: 0.0,
M2: 0.0, rate: 0.0,
LAST: fv, interval: 0,
M2: 0.0,
LAST: fv,
TIME: in.Time(),
} }
continue continue
} }
@ -138,6 +150,12 @@ func (b *BasicStats) Add(in telegraf.Metric) {
tmp.sum += fv tmp.sum += fv
//diff compute //diff compute
tmp.diff = fv - tmp.LAST tmp.diff = fv - tmp.LAST
//interval compute
tmp.interval = in.Time().Sub(tmp.TIME)
//rate compute
if !in.Time().Equal(tmp.TIME) {
tmp.rate = tmp.diff / tmp.interval.Seconds()
}
//store final data //store final data
b.cache[id].fields[field.Key] = tmp b.cache[id].fields[field.Key] = tmp
} }
@ -182,7 +200,15 @@ func (b *BasicStats) Push(acc telegraf.Accumulator) {
if b.statsConfig.non_negative_diff && v.diff >= 0 { if b.statsConfig.non_negative_diff && v.diff >= 0 {
fields[k+"_non_negative_diff"] = v.diff fields[k+"_non_negative_diff"] = v.diff
} }
if b.statsConfig.rate {
fields[k+"_rate"] = v.rate
}
if b.statsConfig.non_negative_rate && v.diff >= 0 {
fields[k+"_non_negative_rate"] = v.rate
}
if b.statsConfig.interval {
fields[k+"_interval"] = v.interval.Nanoseconds()
}
} }
//if count == 1 StdDev = infinite => so I won't send data //if count == 1 StdDev = infinite => so I won't send data
} }
@ -217,7 +243,12 @@ func (b *BasicStats) parseStats() *configuredStats {
parsed.diff = true parsed.diff = true
case "non_negative_diff": case "non_negative_diff":
parsed.non_negative_diff = true parsed.non_negative_diff = true
case "rate":
parsed.rate = true
case "non_negative_rate":
parsed.non_negative_rate = true
case "interval":
parsed.interval = true
default: default:
b.Log.Warnf("Unrecognized basic stat %q, ignoring", name) b.Log.Warnf("Unrecognized basic stat %q, ignoring", name)
} }
@ -237,6 +268,8 @@ func (b *BasicStats) getConfiguredStats() {
stdev: true, stdev: true,
sum: false, sum: false,
non_negative_diff: false, non_negative_diff: false,
rate: false,
non_negative_rate: false,
} }
} else { } else {
b.statsConfig = b.parseStats() b.statsConfig = b.parseStats()

View File

@ -19,7 +19,7 @@ var m1, _ = metric.New("m1",
"d": float64(2), "d": float64(2),
"g": int64(3), "g": int64(3),
}, },
time.Now(), time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
) )
var m2, _ = metric.New("m1", var m2, _ = metric.New("m1",
map[string]string{"foo": "bar"}, map[string]string{"foo": "bar"},
@ -34,7 +34,7 @@ var m2, _ = metric.New("m1",
"andme": true, "andme": true,
"g": int64(1), "g": int64(1),
}, },
time.Now(), time.Date(2000, 1, 1, 0, 0, 0, 1e6, time.UTC),
) )
func BenchmarkApply(b *testing.B) { func BenchmarkApply(b *testing.B) {
@ -498,6 +498,81 @@ func TestBasicStatsWithDiff(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
} }
func TestBasicStatsWithRate(t *testing.T) {
aggregator := NewBasicStats()
aggregator.Stats = []string{"rate"}
aggregator.Log = testutil.Logger{}
aggregator.getConfiguredStats()
aggregator.Add(m1)
aggregator.Add(m2)
acc := testutil.Accumulator{}
aggregator.Push(&acc)
expectedFields := map[string]interface{}{
"a_rate": float64(0),
"b_rate": float64(2000),
"c_rate": float64(2000),
"d_rate": float64(4000),
"g_rate": float64(-2000),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
func TestBasicStatsWithNonNegativeRate(t *testing.T) {
aggregator := NewBasicStats()
aggregator.Stats = []string{"non_negative_rate"}
aggregator.Log = testutil.Logger{}
aggregator.getConfiguredStats()
aggregator.Add(m1)
aggregator.Add(m2)
acc := testutil.Accumulator{}
aggregator.Push(&acc)
expectedFields := map[string]interface{}{
"a_non_negative_rate": float64(0),
"b_non_negative_rate": float64(2000),
"c_non_negative_rate": float64(2000),
"d_non_negative_rate": float64(4000),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
func TestBasicStatsWithInterval(t *testing.T) {
aggregator := NewBasicStats()
aggregator.Stats = []string{"interval"}
aggregator.Log = testutil.Logger{}
aggregator.getConfiguredStats()
aggregator.Add(m1)
aggregator.Add(m2)
acc := testutil.Accumulator{}
aggregator.Push(&acc)
expectedFields := map[string]interface{}{
"a_interval": int64(time.Millisecond),
"b_interval": int64(time.Millisecond),
"c_interval": int64(time.Millisecond),
"d_interval": int64(time.Millisecond),
"g_interval": int64(time.Millisecond),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// Test only aggregating non_negative_diff // Test only aggregating non_negative_diff
func TestBasicStatsWithNonNegativeDiff(t *testing.T) { func TestBasicStatsWithNonNegativeDiff(t *testing.T) {