telegraf/plugins/aggregators/basicstats/basicstats.go

326 lines
6.8 KiB
Go
Raw Normal View History

//go:generate ../../../tools/readme_config_includer/generator
2017-10-11 03:02:01 +08:00
package basicstats
import (
_ "embed"
2017-10-11 03:02:01 +08:00
"math"
"time"
2017-10-11 03:02:01 +08:00
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)
//go:embed sample.conf
var sampleConfig string
2017-10-11 03:02:01 +08:00
type BasicStats struct {
Stats []string `toml:"stats"`
Log telegraf.Logger
cache map[uint64]aggregate
statsConfig *configuredStats
}
type configuredStats struct {
count bool
min bool
max bool
mean bool
variance bool
stdev bool
sum bool
diff bool
nonNegativeDiff bool
rate bool
nonNegativeRate bool
percentChange bool
interval bool
last bool
first bool
2017-10-11 03:02:01 +08:00
}
func NewBasicStats() *BasicStats {
return &BasicStats{
cache: make(map[uint64]aggregate),
}
2017-10-11 03:02:01 +08:00
}
type aggregate struct {
fields map[string]basicstats
name string
tags map[string]string
}
type basicstats struct {
count float64
min float64
max float64
sum float64
mean float64
diff float64
rate float64
interval time.Duration
last float64
first float64
M2 float64 // intermediate value for variance/stdev
PREVIOUS float64 // intermediate value for diff
TIME time.Time // intermediate value for rate
2017-10-11 03:02:01 +08:00
}
func (*BasicStats) SampleConfig() string {
return sampleConfig
}
func (b *BasicStats) Add(in telegraf.Metric) {
2017-10-11 03:02:01 +08:00
id := in.HashID()
if _, ok := b.cache[id]; !ok {
2017-10-11 03:02:01 +08:00
// hit an uncached metric, create caches for first time:
a := aggregate{
name: in.Name(),
tags: in.Tags(),
fields: make(map[string]basicstats),
}
for _, field := range in.FieldList() {
if fv, ok := convert(field.Value); ok {
a.fields[field.Key] = basicstats{
count: 1,
min: fv,
max: fv,
mean: fv,
sum: fv,
diff: 0.0,
rate: 0.0,
last: fv,
first: fv,
M2: 0.0,
PREVIOUS: fv,
TIME: in.Time(),
2017-10-11 03:02:01 +08:00
}
}
}
b.cache[id] = a
2017-10-11 03:02:01 +08:00
} else {
for _, field := range in.FieldList() {
if fv, ok := convert(field.Value); ok {
if _, ok := b.cache[id].fields[field.Key]; !ok {
2017-10-11 03:02:01 +08:00
// hit an uncached field of a cached metric
b.cache[id].fields[field.Key] = basicstats{
count: 1,
min: fv,
max: fv,
mean: fv,
sum: fv,
diff: 0.0,
rate: 0.0,
interval: 0,
last: fv,
first: fv,
M2: 0.0,
PREVIOUS: fv,
TIME: in.Time(),
2017-10-11 03:02:01 +08:00
}
continue
}
tmp := b.cache[id].fields[field.Key]
// https://en.m.wikipedia.org/wiki/Algorithms_for_calculating_variance
// variable initialization
2017-10-11 03:02:01 +08:00
x := fv
mean := tmp.mean
m2 := tmp.M2
// counter compute
2017-10-11 03:02:01 +08:00
n := tmp.count + 1
tmp.count = n
// mean compute
2017-10-11 03:02:01 +08:00
delta := x - mean
mean = mean + delta/n
tmp.mean = mean
// variance/stdev compute
m2 = m2 + delta*(x-mean)
tmp.M2 = m2
// max/min compute
2017-10-11 03:02:01 +08:00
if fv < tmp.min {
tmp.min = fv
} else if fv > tmp.max {
tmp.max = fv
}
// sum compute
tmp.sum += fv
// diff compute
tmp.diff = fv - tmp.PREVIOUS
// interval compute
tmp.interval = in.Time().Sub(tmp.TIME)
// rate compute
if !in.Time().Equal(tmp.TIME) {
tmp.rate = tmp.diff / tmp.interval.Seconds()
}
// last compute
tmp.last = fv
// store final data
b.cache[id].fields[field.Key] = tmp
2017-10-11 03:02:01 +08:00
}
}
}
}
func (b *BasicStats) Push(acc telegraf.Accumulator) {
for _, aggregate := range b.cache {
fields := make(map[string]interface{})
2017-10-11 03:02:01 +08:00
for k, v := range aggregate.fields {
if b.statsConfig.count {
fields[k+"_count"] = v.count
}
if b.statsConfig.min {
fields[k+"_min"] = v.min
}
if b.statsConfig.max {
fields[k+"_max"] = v.max
}
if b.statsConfig.mean {
fields[k+"_mean"] = v.mean
}
if b.statsConfig.sum {
fields[k+"_sum"] = v.sum
}
if b.statsConfig.last {
fields[k+"_last"] = v.last
}
if b.statsConfig.first {
fields[k+"_first"] = v.first
}
// v.count always >=1
2017-10-11 03:02:01 +08:00
if v.count > 1 {
variance := v.M2 / (v.count - 1)
if b.statsConfig.variance {
fields[k+"_s2"] = variance
}
if b.statsConfig.stdev {
fields[k+"_stdev"] = math.Sqrt(variance)
}
if b.statsConfig.diff {
fields[k+"_diff"] = v.diff
}
if b.statsConfig.nonNegativeDiff && v.diff >= 0 {
fields[k+"_non_negative_diff"] = v.diff
}
if b.statsConfig.rate {
fields[k+"_rate"] = v.rate
}
if b.statsConfig.percentChange {
fields[k+"_percent_change"] = v.diff / v.PREVIOUS * 100
}
if b.statsConfig.nonNegativeRate && v.diff >= 0 {
fields[k+"_non_negative_rate"] = v.rate
}
if b.statsConfig.interval {
fields[k+"_interval"] = v.interval.Nanoseconds()
}
2017-10-11 03:02:01 +08:00
}
// if count == 1 StdDev = infinite => so I won't send data
2017-10-11 03:02:01 +08:00
}
if len(fields) > 0 {
acc.AddFields(aggregate.name, fields, aggregate.tags)
}
}
}
// member function for logging.
func (b *BasicStats) parseStats() *configuredStats {
parsed := &configuredStats{}
for _, name := range b.Stats {
switch name {
case "count":
parsed.count = true
case "min":
parsed.min = true
case "max":
parsed.max = true
case "mean":
parsed.mean = true
case "s2":
parsed.variance = true
case "stdev":
parsed.stdev = true
case "sum":
parsed.sum = true
case "diff":
parsed.diff = true
case "non_negative_diff":
parsed.nonNegativeDiff = true
case "rate":
parsed.rate = true
case "non_negative_rate":
parsed.nonNegativeRate = true
case "percent_change":
parsed.percentChange = true
case "interval":
parsed.interval = true
case "last":
parsed.last = true
case "first":
parsed.first = true
default:
b.Log.Warnf("Unrecognized basic stat %q, ignoring", name)
}
2017-10-11 03:02:01 +08:00
}
return parsed
}
func (b *BasicStats) initConfiguredStats() {
if b.Stats == nil {
b.statsConfig = &configuredStats{
count: true,
min: true,
max: true,
mean: true,
variance: true,
stdev: true,
sum: false,
diff: false,
nonNegativeDiff: false,
rate: false,
nonNegativeRate: false,
percentChange: false,
interval: false,
last: false,
first: false,
}
} else {
b.statsConfig = b.parseStats()
}
2017-10-11 03:02:01 +08:00
}
func (b *BasicStats) Reset() {
b.cache = make(map[uint64]aggregate)
2017-10-11 03:02:01 +08:00
}
func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
case int64:
return float64(v), true
case uint64:
return float64(v), true
2017-10-11 03:02:01 +08:00
default:
return 0, false
}
}
func (b *BasicStats) Init() error {
b.initConfiguredStats()
return nil
}
2017-10-11 03:02:01 +08:00
func init() {
aggregators.Add("basicstats", func() telegraf.Aggregator {
return NewBasicStats()
})
}