166 lines
4.0 KiB
Go
166 lines
4.0 KiB
Go
|
|
package quantile
|
||
|
|
|
||
|
|
import (
|
||
|
|
"fmt"
|
||
|
|
|
||
|
|
"github.com/influxdata/telegraf"
|
||
|
|
"github.com/influxdata/telegraf/plugins/aggregators"
|
||
|
|
)
|
||
|
|
|
||
|
|
type Quantile struct {
|
||
|
|
Quantiles []float64 `toml:"quantiles"`
|
||
|
|
Compression float64 `toml:"compression"`
|
||
|
|
AlgorithmType string `toml:"algorithm"`
|
||
|
|
|
||
|
|
newAlgorithm newAlgorithmFunc
|
||
|
|
|
||
|
|
cache map[uint64]aggregate
|
||
|
|
suffixes []string
|
||
|
|
}
|
||
|
|
|
||
|
|
type aggregate struct {
|
||
|
|
name string
|
||
|
|
fields map[string]algorithm
|
||
|
|
tags map[string]string
|
||
|
|
}
|
||
|
|
|
||
|
|
type newAlgorithmFunc func(compression float64) (algorithm, error)
|
||
|
|
|
||
|
|
var sampleConfig = `
|
||
|
|
## General Aggregator Arguments:
|
||
|
|
## The period on which to flush & clear the aggregator.
|
||
|
|
period = "30s"
|
||
|
|
|
||
|
|
## If true, the original metric will be dropped by the
|
||
|
|
## aggregator and will not get sent to the output plugins.
|
||
|
|
drop_original = false
|
||
|
|
|
||
|
|
## Quantiles to output in the range [0,1]
|
||
|
|
# quantiles = [0.25, 0.5, 0.75]
|
||
|
|
|
||
|
|
## Type of aggregation algorithm
|
||
|
|
## Supported are:
|
||
|
|
## "t-digest" -- approximation using centroids, can cope with large number of samples
|
||
|
|
## "exact R7" -- exact computation also used by Excel or NumPy (Hyndman & Fan 1996 R7)
|
||
|
|
## "exact R8" -- exact computation (Hyndman & Fan 1996 R8)
|
||
|
|
## NOTE: Do not use "exact" algorithms with large number of samples
|
||
|
|
## to not impair performance or memory consumption!
|
||
|
|
# algorithm = "t-digest"
|
||
|
|
|
||
|
|
## Compression for approximation (t-digest). The value needs to be
|
||
|
|
## greater or equal to 1.0. Smaller values will result in more
|
||
|
|
## performance but less accuracy.
|
||
|
|
# compression = 100.0
|
||
|
|
`
|
||
|
|
|
||
|
|
func (q *Quantile) SampleConfig() string {
|
||
|
|
return sampleConfig
|
||
|
|
}
|
||
|
|
|
||
|
|
func (q *Quantile) Description() string {
|
||
|
|
return "Keep the aggregate quantiles of each metric passing through."
|
||
|
|
}
|
||
|
|
|
||
|
|
func (q *Quantile) Add(in telegraf.Metric) {
|
||
|
|
id := in.HashID()
|
||
|
|
if cached, ok := q.cache[id]; ok {
|
||
|
|
fields := in.Fields()
|
||
|
|
for k, algo := range cached.fields {
|
||
|
|
if field, ok := fields[k]; ok {
|
||
|
|
if v, isconvertible := convert(field); isconvertible {
|
||
|
|
algo.Add(v)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// New metric, setup cache and init algorithm
|
||
|
|
a := aggregate{
|
||
|
|
name: in.Name(),
|
||
|
|
tags: in.Tags(),
|
||
|
|
fields: make(map[string]algorithm),
|
||
|
|
}
|
||
|
|
for k, field := range in.Fields() {
|
||
|
|
if v, isconvertible := convert(field); isconvertible {
|
||
|
|
// This should never error out as we tested it in Init()
|
||
|
|
algo, _ := q.newAlgorithm(q.Compression)
|
||
|
|
algo.Add(v)
|
||
|
|
a.fields[k] = algo
|
||
|
|
}
|
||
|
|
}
|
||
|
|
q.cache[id] = a
|
||
|
|
}
|
||
|
|
|
||
|
|
func (q *Quantile) Push(acc telegraf.Accumulator) {
|
||
|
|
for _, aggregate := range q.cache {
|
||
|
|
fields := map[string]interface{}{}
|
||
|
|
for k, algo := range aggregate.fields {
|
||
|
|
for i, qtl := range q.Quantiles {
|
||
|
|
fields[k+q.suffixes[i]] = algo.Quantile(qtl)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
acc.AddFields(aggregate.name, fields, aggregate.tags)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (q *Quantile) Reset() {
|
||
|
|
q.cache = make(map[uint64]aggregate)
|
||
|
|
}
|
||
|
|
|
||
|
|
func convert(in interface{}) (float64, bool) {
|
||
|
|
switch v := in.(type) {
|
||
|
|
case float64:
|
||
|
|
return v, true
|
||
|
|
case int64:
|
||
|
|
return float64(v), true
|
||
|
|
case uint64:
|
||
|
|
return float64(v), true
|
||
|
|
default:
|
||
|
|
return 0, false
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (q *Quantile) Init() error {
|
||
|
|
switch q.AlgorithmType {
|
||
|
|
case "t-digest", "":
|
||
|
|
q.newAlgorithm = newTDigest
|
||
|
|
case "exact R7":
|
||
|
|
q.newAlgorithm = newExactR7
|
||
|
|
case "exact R8":
|
||
|
|
q.newAlgorithm = newExactR8
|
||
|
|
default:
|
||
|
|
return fmt.Errorf("unknown algorithm type %q", q.AlgorithmType)
|
||
|
|
}
|
||
|
|
if _, err := q.newAlgorithm(q.Compression); err != nil {
|
||
|
|
return fmt.Errorf("cannot create %q algorithm: %v", q.AlgorithmType, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
if len(q.Quantiles) == 0 {
|
||
|
|
q.Quantiles = []float64{0.25, 0.5, 0.75}
|
||
|
|
}
|
||
|
|
|
||
|
|
duplicates := make(map[float64]bool)
|
||
|
|
q.suffixes = make([]string, len(q.Quantiles))
|
||
|
|
for i, qtl := range q.Quantiles {
|
||
|
|
if qtl < 0.0 || qtl > 1.0 {
|
||
|
|
return fmt.Errorf("quantile %v out of range", qtl)
|
||
|
|
}
|
||
|
|
if _, found := duplicates[qtl]; found {
|
||
|
|
return fmt.Errorf("duplicate quantile %v", qtl)
|
||
|
|
}
|
||
|
|
duplicates[qtl] = true
|
||
|
|
q.suffixes[i] = fmt.Sprintf("_%03d", int(qtl*100.0))
|
||
|
|
}
|
||
|
|
|
||
|
|
q.Reset()
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func init() {
|
||
|
|
aggregators.Add("quantile", func() telegraf.Aggregator {
|
||
|
|
return &Quantile{Compression: 100}
|
||
|
|
})
|
||
|
|
}
|