Add quantile aggregator plugin (#8594)

This commit is contained in:
Sven Rebhan 2021-02-17 23:22:33 +01:00 committed by GitHub
parent a5385a2557
commit b6b5d34060
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1050 additions and 11 deletions

8
go.mod
View File

@ -36,7 +36,7 @@ require (
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmatcuk/doublestar/v3 v3.0.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869
github.com/caio/go-tdigest v2.3.0+incompatible // indirect
github.com/caio/go-tdigest v3.1.0+incompatible
github.com/cenkalti/backoff v2.0.0+incompatible // indirect
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20190531143454-82441e232cf6
github.com/cockroachdb/apd v1.1.0 // indirect
@ -92,7 +92,6 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/kubernetes/apimachinery v0.0.0-20190119020841-d41becfba9ee
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 // indirect
github.com/lib/pq v1.3.0 // indirect
github.com/mailru/easyjson v0.0.0-20180717111219-efc7eb8984d6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1
@ -135,7 +134,7 @@ require (
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect
github.com/vjeantet/grok v1.0.1
github.com/vmware/govmomi v0.19.0
github.com/wavefronthq/wavefront-sdk-go v0.9.2
github.com/wavefronthq/wavefront-sdk-go v0.9.7
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
@ -150,7 +149,6 @@ require (
golang.org/x/text v0.3.3
golang.org/x/tools v0.0.0-20200317043434-63da46f3035e // indirect
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4
gonum.org/v1/gonum v0.6.2 // indirect
google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884
google.golang.org/grpc v1.33.1
@ -159,7 +157,7 @@ require (
gopkg.in/ldap.v3 v3.1.0
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce
gopkg.in/olivere/elastic.v5 v5.0.70
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v2 v2.3.0
gotest.tools v2.2.0+incompatible
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
k8s.io/apimachinery v0.17.1 // indirect

15
go.sum
View File

@ -146,8 +146,8 @@ github.com/bmatcuk/doublestar/v3 v3.0.0 h1:TQtVPlDnAYwcrVNB2JiGuMc++H5qzWZd9PhkN
github.com/bmatcuk/doublestar/v3 v3.0.0/go.mod h1:6PcTVMw80pCY1RVuoqu3V++99uQB3vsSYKPTd8AWA0k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/caio/go-tdigest v2.3.0+incompatible h1:zP6nR0nTSUzlSqqr7F/LhslPlSZX/fZeGmgmwj2cxxY=
github.com/caio/go-tdigest v2.3.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds=
github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/cenkalti/backoff v2.0.0+incompatible h1:5IIPUHhlnUZbcHQsQou5k1Tn58nJkeJL9U+ig5CHJbY=
github.com/cenkalti/backoff v2.0.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -607,6 +607,7 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 h1:Oj2e7Sae4XrOsk3ij21QjjEgAcVSeo9nkp0dI//cD2o=
@ -628,8 +629,8 @@ github.com/vjeantet/grok v1.0.1 h1:2rhIR7J4gThTgcZ1m2JY4TrJZNgjn985U28kT2wQrJ4=
github.com/vjeantet/grok v1.0.1/go.mod h1:ax1aAchzC6/QMXMcyzHQGZWaW1l195+uMYIkCWPCNIo=
github.com/vmware/govmomi v0.19.0 h1:CR6tEByWCPOnRoRyhLzuHaU+6o2ybF3qufNRWS/MGrY=
github.com/vmware/govmomi v0.19.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=
github.com/wavefronthq/wavefront-sdk-go v0.9.2 h1:/LvWgZYNjHFUg+ZUX+qv+7e+M8sEMi0lM15zPp681Gk=
github.com/wavefronthq/wavefront-sdk-go v0.9.2/go.mod h1:hQI6y8M9OtTCtc0xdwh+dCER4osxXdEAeCpacjpDZEU=
github.com/wavefronthq/wavefront-sdk-go v0.9.7 h1:SrtABcXXeKCW5SerQYsnCzHo15GeggjZmL+DjtTy6CI=
github.com/wavefronthq/wavefront-sdk-go v0.9.7/go.mod h1:JTGsu+KKgxx+GitC65VVdftN2iep1nVpQi/8EGR6v4Y=
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf h1:TOV5PC6fIWwFOFra9xJfRXZcL2pLhMI8oNuDugNxg9Q=
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf/go.mod h1:nxx7XRXbR9ykhnC8lXqQyJS0rfvJGxKyKw/sT1YOttg=
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a h1:ILoU84rj4AQ3q6cjQvtb9jBjx4xzR/Riq/zYhmDQiOk=
@ -851,8 +852,8 @@ golang.zx2c4.com/wireguard v0.0.20200121/go.mod h1:P2HsVp8SKwZEufsnezXZA4GRX/T49
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4 h1:KTi97NIQGgSMaN0v/oxniJV0MEzfzmrDUOAWxombQVc=
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4/go.mod h1:UdS9frhv65KTfwxME1xE8+rHYoFpbm36gOud1GhBe9c=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.6.2 h1:4r+yNT0+8SWcOkXP+63H2zQbN+USnC73cjGUxnDF94Q=
gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
gonum.org/v1/gonum v0.7.0 h1:Hdks0L0hgznZLG9nzXb8vZ0rRvqNvAcgAp84y7Mwkgw=
gonum.org/v1/gonum v0.7.0/go.mod h1:L02bwd0sqlsvRv41G7wGWFCsVNZFv/k1xzGIxeANHGM=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0 h1:OE9mWmgKkjJyEmDAAtGMPjXu+YNeGvK9VTSHY6+Qihc=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
@ -958,6 +959,8 @@ gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -7,5 +7,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
_ "github.com/influxdata/telegraf/plugins/aggregators/merge"
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
_ "github.com/influxdata/telegraf/plugins/aggregators/quantile"
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
)

View File

@ -0,0 +1,127 @@
# Quantile Aggregator Plugin
The quantile aggregator plugin aggregates specified quantiles for each numeric field
per metric it sees and emits the quantiles every `period`.
### Configuration
```toml
[[aggregators.quantile]]
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## Quantiles to output in the range [0,1]
# quantiles = [0.25, 0.5, 0.75]
## Type of aggregation algorithm
## Supported are:
## "t-digest" -- approximation using centroids, can cope with large number of samples
## "exact R7" -- exact computation also used by Excel or NumPy (Hyndman & Fan 1996 R7)
## "exact R8" -- exact computation (Hyndman & Fan 1996 R8)
## NOTE: Do not use "exact" algorithms with large number of samples
## to not impair performance or memory consumption!
# algorithm = "t-digest"
## Compression for approximation (t-digest). The value needs to be
## greater or equal to 1.0. Smaller values will result in more
## performance but less accuracy.
# compression = 100.0
```
#### Algorithm types
##### t-digest
Proposed by [Dunning & Ertl (2019)][tdigest_paper] this type uses a
special data-structure to cluster data. These clusters are later used
to approximate the requested quantiles. The bounds of the approximation
can be controlled by the `compression` setting where smaller values
result in higher performance but less accuracy.
Due to its incremental nature, this algorithm can handle large
numbers of samples efficiently. It is recommended for applications
where exact quantile calculation isn't required.
For implementation details see the underlying [golang library][tdigest_lib].
##### exact R7 and R8
These algorithms compute quantiles as described in [Hyndman & Fan (1996)][hyndman_fan].
The R7 variant is used in Excel and NumPy. The R8 variant is recommended
by Hyndman & Fan due to its independence of the underlying sample distribution.
These algorithms save all data for the aggregation `period`. They require
a lot of memory when used with a large number of series or a
large number of samples. They are slower than the `t-digest`
algorithm and are recommended only to be used with a small number of samples and series.
#### Benchmark (linux/amd64)
The benchmark was performed by adding 100 metrics with six numeric
(and two non-numeric) fields to the aggregator and then deriving the
aggregation result.
| algorithm | # quantiles | avg. runtime |
| :------------ | -------------:| -------------:|
| t-digest | 3 | 376372 ns/op |
| exact R7 | 3 | 9782946 ns/op |
| exact R8 | 3 | 9158205 ns/op |
| t-digest | 100 | 899204 ns/op |
| exact R7 | 100 | 7868816 ns/op |
| exact R8 | 100 | 8099612 ns/op |
### Measurements
Measurement names are passed through this aggregator.
### Fields
For all numeric fields (int32/64, uint32/64 and float32/64) new *quantile*
fields are aggregated in the form `<fieldname>_<quantile*100>`. Other field
types (e.g. boolean, string) are ignored and dropped from the output.
For example passing in the following metric as *input*:
- somemetric
- average_response_ms (float64)
- minimum_response_ms (float64)
- maximum_response_ms (float64)
- status (string)
- ok (boolean)
and the default setting for `quantiles`, you get the following *output*:
- somemetric
- average_response_ms_025 (float64)
- average_response_ms_050 (float64)
- average_response_ms_075 (float64)
- minimum_response_ms_025 (float64)
- minimum_response_ms_050 (float64)
- minimum_response_ms_075 (float64)
- maximum_response_ms_025 (float64)
- maximum_response_ms_050 (float64)
- maximum_response_ms_075 (float64)
The `status` and `ok` fields are dropped because they are not numeric. Note that the
number of resulting fields scales with the number of `quantiles` specified.
### Tags
Tags are passed through to the output by this aggregator.
### Example Output
```
cpu,cpu=cpu-total,host=Hugin usage_user=10.814851731872487,usage_system=2.1679541490155687,usage_irq=1.046598554697342,usage_steal=0,usage_guest_nice=0,usage_idle=85.79616247197244,usage_nice=0,usage_iowait=0,usage_softirq=0.1744330924495688,usage_guest=0 1608288360000000000
cpu,cpu=cpu-total,host=Hugin usage_guest=0,usage_system=2.1601016518428664,usage_iowait=0.02541296060990694,usage_irq=1.0165184243964942,usage_softirq=0.1778907242693666,usage_steal=0,usage_guest_nice=0,usage_user=9.275730622616953,usage_idle=87.34434561626493,usage_nice=0 1608288370000000000
cpu,cpu=cpu-total,host=Hugin usage_idle=85.78199052131747,usage_nice=0,usage_irq=1.0476428036915637,usage_guest=0,usage_guest_nice=0,usage_system=1.995510102269591,usage_iowait=0,usage_softirq=0.1995510102269662,usage_steal=0,usage_user=10.975305562484735 1608288380000000000
cpu,cpu=cpu-total,host=Hugin usage_guest_nice_075=0,usage_user_050=10.814851731872487,usage_guest_075=0,usage_steal_025=0,usage_irq_025=1.031558489546918,usage_irq_075=1.0471206791944527,usage_iowait_025=0,usage_guest_050=0,usage_guest_nice_050=0,usage_nice_075=0,usage_iowait_050=0,usage_system_050=2.1601016518428664,usage_irq_050=1.046598554697342,usage_guest_nice_025=0,usage_idle_050=85.79616247197244,usage_softirq_075=0.1887208672481664,usage_steal_075=0,usage_system_025=2.0778058770562287,usage_system_075=2.1640279004292173,usage_softirq_050=0.1778907242693666,usage_nice_050=0,usage_iowait_075=0.01270648030495347,usage_user_075=10.895078647178611,usage_nice_025=0,usage_steal_050=0,usage_user_025=10.04529117724472,usage_idle_025=85.78907649664495,usage_idle_075=86.57025404411868,usage_softirq_025=0.1761619083594677,usage_guest_025=0 1608288390000000000
```
# References
- Dunning & Ertl: "Computing Extremely Accurate Quantiles Using t-Digests", arXiv:1902.04023 (2019) [pdf][tdigest_paper]
- Hyndman & Fan: "Sample Quantiles in Statistical Packages", The American Statistician, vol. 50, pp. 361-365 (1996) [pdf][hyndman_fan]
[tdigest_paper]: https://arxiv.org/abs/1902.04023
[tdigest_lib]: https://github.com/caio/go-tdigest
[hyndman_fan]: http://www.maths.usyd.edu.au/u/UG/SM/STAT3022/r/current/Misc/Sample%20Quantiles%20in%20Statistical%20Packages.pdf

View File

@ -0,0 +1,110 @@
package quantile
import (
"math"
"sort"
"github.com/caio/go-tdigest"
)
// algorithm is the contract every quantile estimator in this package fulfils:
// samples are fed in one at a time and quantiles can be queried afterwards.
type algorithm interface {
// Add feeds a single sample into the estimator.
Add(value float64) error
// Quantile returns the estimator's value for the quantile q.
Quantile(q float64) float64
}
// newTDigest builds a t-digest based estimator configured with the given
// compression. Any error reported by the t-digest library while applying the
// compression option is handed back to the caller unchanged.
func newTDigest(compression float64) (algorithm, error) {
	option := tdigest.Compression(compression)
	digest, err := tdigest.New(option)
	return digest, err
}
// exactAlgorithmR7 computes exact sample quantiles following definition R7 of
// Hyndman & Fan (1996), the same definition used by Excel and NumPy. It keeps
// every sample in memory and sorts them lazily on the first query after an
// insertion.
type exactAlgorithmR7 struct {
	xs     []float64 // all samples added so far
	sorted bool      // true while xs is known to be in ascending order
}

// newExactR7 returns an empty R7 estimator. The compression argument is not
// used by the exact algorithms; it is only present so that all constructors
// share one signature.
func newExactR7(compression float64) (algorithm, error) {
	return &exactAlgorithmR7{xs: make([]float64, 0, 100), sorted: false}, nil
}

// Add stores value as a new sample and marks the sample set as unsorted.
// It always returns nil.
func (e *exactAlgorithmR7) Add(value float64) error {
	e.xs = append(e.xs, value)
	e.sorted = false
	return nil
}

// Quantile returns the R7 sample quantile for q.
//
// It returns NaN when no samples were added or when q is NaN. Positions at or
// below the first sample yield the minimum and positions at or beyond the last
// sample yield the maximum, so a single-sample set and q == 1 are handled
// without reading past the end of the sample slice.
func (e *exactAlgorithmR7) Quantile(q float64) float64 {
	size := len(e.xs)

	// No information
	if size == 0 {
		return math.NaN()
	}

	// Sort the array if necessary
	if !e.sorted {
		sort.Float64s(e.xs)
		e.sorted = true
	}

	// Get the zero-based, fractional position of the quantile.
	// Hyndman & Fan; Sample Quantiles in Statistical Packages; The American Statistician vol 50; pp 361-365; 1996 -- R7
	// Same as Excel and Numpy.
	last := size - 1
	n := q * float64(last)

	// Clamp in the float domain before converting to an index; this keeps
	// both xs[j] and xs[j+1] in range for the interpolation below.
	switch {
	case math.IsNaN(n):
		return math.NaN()
	case n <= 0:
		return e.xs[0]
	case n >= float64(last):
		return e.xs[last]
	}

	// Linear interpolation between the two neighbouring samples
	i, gamma := math.Modf(n)
	j := int(i)
	return e.xs[j] + gamma*(e.xs[j+1]-e.xs[j])
}
// exactAlgorithmR8 computes exact sample quantiles following definition R8 of
// Hyndman & Fan (1996). It keeps every sample in memory and sorts them lazily
// on the first query after an insertion.
type exactAlgorithmR8 struct {
	xs     []float64 // all samples added so far
	sorted bool      // true while xs is known to be in ascending order
}

// newExactR8 returns an empty R8 estimator. The compression argument is not
// used by the exact algorithms; it is only present so that all constructors
// share one signature.
func newExactR8(compression float64) (algorithm, error) {
	return &exactAlgorithmR8{xs: make([]float64, 0, 100), sorted: false}, nil
}

// Add stores value as a new sample and marks the sample set as unsorted.
// It always returns nil.
func (e *exactAlgorithmR8) Add(value float64) error {
	e.xs = append(e.xs, value)
	e.sorted = false
	return nil
}

// Quantile returns the R8 sample quantile for q.
//
// It returns NaN when no samples were added or when q is NaN. The R8 position
// can fall before the first or after the last sample; in those cases the
// minimum respectively maximum sample is returned instead of extrapolating or
// reading outside of the sample slice.
func (e *exactAlgorithmR8) Quantile(q float64) float64 {
	size := len(e.xs)

	// No information
	if size == 0 {
		return math.NaN()
	}

	// Sort the array if necessary
	if !e.sorted {
		sort.Float64s(e.xs)
		e.sorted = true
	}

	// Get the zero-based, fractional position of the quantile.
	// Hyndman & Fan; Sample Quantiles in Statistical Packages; The American Statistician vol 50; pp 361-365; 1996 -- R8
	N := float64(size)
	n := q*(N+1.0/3.0) - (2.0 / 3.0) // Indices are zero-base here but one-based in the paper

	// Clamp in the float domain before converting to an index; this keeps
	// both xs[j] and xs[j+1] in range and gamma non-negative below.
	last := size - 1
	switch {
	case math.IsNaN(n):
		return math.NaN()
	case n <= 0:
		return e.xs[0]
	case n >= float64(last):
		return e.xs[last]
	}

	// Linear interpolation between the two neighbouring samples
	i, gamma := math.Modf(n)
	j := int(i)
	return e.xs[j] + gamma*(e.xs[j+1]-e.xs[j])
}

View File

@ -0,0 +1,165 @@
package quantile
import (
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)
// Quantile is the aggregator plugin. The exported fields are filled from the
// TOML configuration, the unexported ones are set up by Init and Reset.
type Quantile struct {
// Quantiles lists the quantiles to emit; Init requires each to be within [0,1]
// and replaces an empty list with 0.25, 0.5 and 0.75.
Quantiles []float64 `toml:"quantiles"`
// Compression is handed to the algorithm constructor selected in Init.
Compression float64 `toml:"compression"`
// AlgorithmType selects the estimator: "t-digest" (also used when empty),
// "exact R7" or "exact R8".
AlgorithmType string `toml:"algorithm"`
// newAlgorithm is the constructor chosen in Init from AlgorithmType.
newAlgorithm newAlgorithmFunc
// cache holds one aggregate per series, keyed by the metric's HashID.
cache map[uint64]aggregate
// suffixes holds the field-name suffix for the quantile at the same index
// in Quantiles.
suffixes []string
}
// aggregate is the per-series state kept in the cache: the metric name and
// tags captured from the first metric of the series, plus one estimator per
// numeric field.
type aggregate struct {
name string
// fields maps a field name to the estimator collecting its samples.
fields map[string]algorithm
tags map[string]string
}
// newAlgorithmFunc is the constructor signature shared by all estimators.
type newAlgorithmFunc func(compression float64) (algorithm, error)
// sampleConfig is the example configuration returned by SampleConfig.
var sampleConfig = `
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
## Quantiles to output in the range [0,1]
# quantiles = [0.25, 0.5, 0.75]
## Type of aggregation algorithm
## Supported are:
## "t-digest" -- approximation using centroids, can cope with large number of samples
## "exact R7" -- exact computation also used by Excel or NumPy (Hyndman & Fan 1996 R7)
## "exact R8" -- exact computation (Hyndman & Fan 1996 R8)
## NOTE: Do not use "exact" algorithms with large number of samples
## to not impair performance or memory consumption!
# algorithm = "t-digest"
## Compression for approximation (t-digest). The value needs to be
## greater or equal to 1.0. Smaller values will result in more
## performance but less accuracy.
# compression = 100.0
`
// SampleConfig returns the example configuration of the plugin.
func (q *Quantile) SampleConfig() string {
return sampleConfig
}
// Description returns a one-line summary of what the plugin does.
func (q *Quantile) Description() string {
return "Keep the aggregate quantiles of each metric passing through."
}
// Add feeds the numeric fields of a metric into the estimators of its series.
//
// The series is identified by the metric's HashID. The first metric of a
// series creates one estimator per convertible field; later metrics only
// update the estimators that already exist, other fields are ignored.
func (q *Quantile) Add(in telegraf.Metric) {
	id := in.HashID()

	series, known := q.cache[id]
	if !known {
		// New metric, setup cache and init algorithm
		series = aggregate{
			name:   in.Name(),
			tags:   in.Tags(),
			fields: make(map[string]algorithm),
		}
		for key, raw := range in.Fields() {
			value, numeric := convert(raw)
			if !numeric {
				continue
			}
			// This should never error out as we tested it in Init()
			estimator, _ := q.newAlgorithm(q.Compression)
			estimator.Add(value)
			series.fields[key] = estimator
		}
		q.cache[id] = series
		return
	}

	// Known series: update only the estimators created for it earlier.
	incoming := in.Fields()
	for key, estimator := range series.fields {
		raw, present := incoming[key]
		if !present {
			continue
		}
		if value, numeric := convert(raw); numeric {
			estimator.Add(value)
		}
	}
}
// Push emits one metric per cached series. For every tracked field and every
// configured quantile it adds a field named <field><suffix> holding the
// estimator's result for that quantile.
func (q *Quantile) Push(acc telegraf.Accumulator) {
	for _, series := range q.cache {
		out := make(map[string]interface{})
		for name, estimator := range series.fields {
			for idx := range q.Quantiles {
				key := name + q.suffixes[idx]
				out[key] = estimator.Quantile(q.Quantiles[idx])
			}
		}
		acc.AddFields(series.name, out, series.tags)
	}
}
// Reset drops all cached series by replacing the cache with an empty map.
func (q *Quantile) Reset() {
q.cache = make(map[uint64]aggregate)
}
// convert turns a field value into a float64 sample.
//
// It accepts all built-in integer and floating-point types and reports true
// for them. Every other type (strings, booleans, nil, ...) yields 0 and false,
// which callers use to skip the field.
func convert(in interface{}) (float64, bool) {
	switch v := in.(type) {
	case float64:
		return v, true
	case float32:
		return float64(v), true
	case int:
		return float64(v), true
	case int8:
		return float64(v), true
	case int16:
		return float64(v), true
	case int32:
		return float64(v), true
	case int64:
		return float64(v), true
	case uint:
		return float64(v), true
	case uint8:
		return float64(v), true
	case uint16:
		return float64(v), true
	case uint32:
		return float64(v), true
	case uint64:
		return float64(v), true
	default:
		return 0, false
	}
}
// Init validates the configuration and prepares the plugin for use.
//
// It selects the algorithm constructor from AlgorithmType ("t-digest" is used
// when empty), verifies that an algorithm can be created with the configured
// Compression, applies the default quantiles when none are given, computes
// the field-name suffix of every quantile and finally clears the cache.
//
// An error is returned for an unknown algorithm type, when the algorithm
// cannot be created, for quantiles outside of [0,1] (including NaN) and for
// duplicate quantiles.
func (q *Quantile) Init() error {
	switch q.AlgorithmType {
	case "t-digest", "":
		q.newAlgorithm = newTDigest
	case "exact R7":
		q.newAlgorithm = newExactR7
	case "exact R8":
		q.newAlgorithm = newExactR8
	default:
		return fmt.Errorf("unknown algorithm type %q", q.AlgorithmType)
	}

	// Create a throw-away instance to catch invalid settings early.
	if _, err := q.newAlgorithm(q.Compression); err != nil {
		return fmt.Errorf("cannot create %q algorithm: %v", q.AlgorithmType, err)
	}

	if len(q.Quantiles) == 0 {
		q.Quantiles = []float64{0.25, 0.5, 0.75}
	}

	duplicates := make(map[float64]bool)
	q.suffixes = make([]string, len(q.Quantiles))
	for i, qtl := range q.Quantiles {
		// Written as a negated "in range" test so that NaN is rejected too.
		if !(qtl >= 0.0 && qtl <= 1.0) {
			return fmt.Errorf("quantile %v out of range", qtl)
		}
		if duplicates[qtl] {
			return fmt.Errorf("duplicate quantile %v", qtl)
		}
		duplicates[qtl] = true

		// The suffix is the quantile in whole percent, truncated. Plain
		// int(qtl*100) is off by one for values such as 0.29, because
		// 0.29*100 evaluates to 28.999999999999996. Rounding to 1/100 of a
		// percent first absorbs that representation error; the integer
		// division then truncates to whole percent as before.
		// Quantiles within the same percent therefore share a suffix.
		percent := int(qtl*10000.0+0.5) / 100
		q.suffixes[i] = fmt.Sprintf("_%03d", percent)
	}

	q.Reset()

	return nil
}
// init registers the plugin under the name "quantile". New instances start
// with a compression of 100.
func init() {
aggregators.Add("quantile", func() telegraf.Aggregator {
return &Quantile{Compression: 100}
})
}

View File

@ -0,0 +1,635 @@
package quantile
import (
"math/rand"
"testing"
"time"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// TestConfigInvalidAlgorithm checks that Init rejects an algorithm name it
// does not know.
func TestConfigInvalidAlgorithm(t *testing.T) {
	plugin := &Quantile{AlgorithmType: "a strange one"}

	initErr := plugin.Init()

	require.Error(t, initErr)
	require.Contains(t, initErr.Error(), "unknown algorithm type")
}
// TestConfigInvalidCompression checks that Init fails when the t-digest
// algorithm cannot be created with a compression of zero.
func TestConfigInvalidCompression(t *testing.T) {
	plugin := &Quantile{Compression: 0, AlgorithmType: "t-digest"}

	initErr := plugin.Init()

	require.Error(t, initErr)
	require.Contains(t, initErr.Error(), "cannot create \"t-digest\" algorithm")
}
// TestConfigInvalidQuantiles checks that Init rejects quantiles below zero,
// above one and quantiles listed more than once.
func TestConfigInvalidQuantiles(t *testing.T) {
	cases := []struct {
		quantiles []float64
		message   string
	}{
		{quantiles: []float64{-0.5}, message: "quantile -0.5 out of range"},
		{quantiles: []float64{1.5}, message: "quantile 1.5 out of range"},
		{quantiles: []float64{0.1, 0.2, 0.3, 0.1}, message: "duplicate quantile"},
	}

	for _, tc := range cases {
		plugin := Quantile{Compression: 100, Quantiles: tc.quantiles}
		initErr := plugin.Init()
		require.Error(t, initErr)
		require.Contains(t, initErr.Error(), tc.message)
	}
}
func TestSingleMetricTDigest(t *testing.T) {
acc := testutil.Accumulator{}
q := Quantile{Compression: 100}
err := q.Init()
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a_025": 24.75,
"a_050": 49.50,
"a_075": 74.25,
"b_025": 24.75,
"b_050": 49.50,
"b_075": 74.25,
"c_025": 24.75,
"c_050": 49.50,
"c_075": 74.25,
"d_025": 24.75,
"d_050": 49.50,
"d_075": 74.25,
"e_025": 24.75,
"e_050": 49.50,
"e_075": 74.25,
"f_025": 24.75,
"f_050": 49.50,
"f_075": 74.25,
"g_025": 0.2475,
"g_050": 0.4950,
"g_075": 0.7425,
},
time.Now(),
),
}
metrics := make([]telegraf.Metric, 100)
for i := range metrics {
metrics[i] = testutil.MustMetric(
"test",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int32(i),
"b": int64(i),
"c": uint32(i),
"d": uint64(i),
"e": float32(i),
"f": float64(i),
"g": float64(i) / 100.0,
"x1": "string",
"x2": true,
},
time.Now(),
)
}
for _, m := range metrics {
q.Add(m)
}
q.Push(&acc)
epsilon := cmpopts.EquateApprox(0, 1e-3)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), epsilon)
}
func TestMultipleMetricsTDigest(t *testing.T) {
acc := testutil.Accumulator{}
q := Quantile{Compression: 100}
err := q.Init()
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{"series": "foo"},
map[string]interface{}{
"a_025": 24.75, "a_050": 49.50, "a_075": 74.25,
"b_025": 24.75, "b_050": 49.50, "b_075": 74.25,
},
time.Now(),
),
testutil.MustMetric(
"test",
map[string]string{"series": "bar"},
map[string]interface{}{
"a_025": 49.50, "a_050": 99.00, "a_075": 148.50,
"b_025": 49.50, "b_050": 99.00, "b_075": 148.50,
},
time.Now(),
),
}
metricsA := make([]telegraf.Metric, 100)
metricsB := make([]telegraf.Metric, 100)
for i := range metricsA {
metricsA[i] = testutil.MustMetric(
"test",
map[string]string{"series": "foo"},
map[string]interface{}{"a": int64(i), "b": float64(i), "x1": "string", "x2": true},
time.Now(),
)
}
for i := range metricsB {
metricsB[i] = testutil.MustMetric(
"test",
map[string]string{"series": "bar"},
map[string]interface{}{"a": int64(2 * i), "b": float64(2 * i), "x1": "string", "x2": true},
time.Now(),
)
}
for _, m := range metricsA {
q.Add(m)
}
for _, m := range metricsB {
q.Add(m)
}
q.Push(&acc)
epsilon := cmpopts.EquateApprox(0, 1e-3)
sort := testutil.SortMetrics()
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), epsilon, sort)
}
func TestSingleMetricExactR7(t *testing.T) {
acc := testutil.Accumulator{}
q := Quantile{AlgorithmType: "exact R7"}
err := q.Init()
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a_025": 24.75,
"a_050": 49.50,
"a_075": 74.25,
"b_025": 24.75,
"b_050": 49.50,
"b_075": 74.25,
"c_025": 24.75,
"c_050": 49.50,
"c_075": 74.25,
"d_025": 24.75,
"d_050": 49.50,
"d_075": 74.25,
"e_025": 24.75,
"e_050": 49.50,
"e_075": 74.25,
"f_025": 24.75,
"f_050": 49.50,
"f_075": 74.25,
"g_025": 0.2475,
"g_050": 0.4950,
"g_075": 0.7425,
},
time.Now(),
),
}
metrics := make([]telegraf.Metric, 100)
for i := range metrics {
metrics[i] = testutil.MustMetric(
"test",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int32(i),
"b": int64(i),
"c": uint32(i),
"d": uint64(i),
"e": float32(i),
"f": float64(i),
"g": float64(i) / 100.0,
"x1": "string",
"x2": true,
},
time.Now(),
)
}
for _, m := range metrics {
q.Add(m)
}
q.Push(&acc)
epsilon := cmpopts.EquateApprox(0, 1e-3)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), epsilon)
}
func TestMultipleMetricsExactR7(t *testing.T) {
acc := testutil.Accumulator{}
q := Quantile{AlgorithmType: "exact R7"}
err := q.Init()
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{"series": "foo"},
map[string]interface{}{
"a_025": 24.75, "a_050": 49.50, "a_075": 74.25,
"b_025": 24.75, "b_050": 49.50, "b_075": 74.25,
},
time.Now(),
),
testutil.MustMetric(
"test",
map[string]string{"series": "bar"},
map[string]interface{}{
"a_025": 49.50, "a_050": 99.00, "a_075": 148.50,
"b_025": 49.50, "b_050": 99.00, "b_075": 148.50,
},
time.Now(),
),
}
metricsA := make([]telegraf.Metric, 100)
metricsB := make([]telegraf.Metric, 100)
for i := range metricsA {
metricsA[i] = testutil.MustMetric(
"test",
map[string]string{"series": "foo"},
map[string]interface{}{"a": int64(i), "b": float64(i), "x1": "string", "x2": true},
time.Now(),
)
}
for i := range metricsB {
metricsB[i] = testutil.MustMetric(
"test",
map[string]string{"series": "bar"},
map[string]interface{}{"a": int64(2 * i), "b": float64(2 * i), "x1": "string", "x2": true},
time.Now(),
)
}
for _, m := range metricsA {
q.Add(m)
}
for _, m := range metricsB {
q.Add(m)
}
q.Push(&acc)
epsilon := cmpopts.EquateApprox(0, 1e-3)
sort := testutil.SortMetrics()
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), epsilon, sort)
}
func TestSingleMetricExactR8(t *testing.T) {
acc := testutil.Accumulator{}
q := Quantile{AlgorithmType: "exact R8"}
err := q.Init()
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a_025": 24.417,
"a_050": 49.500,
"a_075": 74.583,
"b_025": 24.417,
"b_050": 49.500,
"b_075": 74.583,
"c_025": 24.417,
"c_050": 49.500,
"c_075": 74.583,
"d_025": 24.417,
"d_050": 49.500,
"d_075": 74.583,
"e_025": 24.417,
"e_050": 49.500,
"e_075": 74.583,
"f_025": 24.417,
"f_050": 49.500,
"f_075": 74.583,
"g_025": 0.24417,
"g_050": 0.49500,
"g_075": 0.74583,
},
time.Now(),
),
}
metrics := make([]telegraf.Metric, 100)
for i := range metrics {
metrics[i] = testutil.MustMetric(
"test",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int32(i),
"b": int64(i),
"c": uint32(i),
"d": uint64(i),
"e": float32(i),
"f": float64(i),
"g": float64(i) / 100.0,
"x1": "string",
"x2": true,
},
time.Now(),
)
}
for _, m := range metrics {
q.Add(m)
}
q.Push(&acc)
epsilon := cmpopts.EquateApprox(0, 1e-3)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), epsilon)
}
func TestMultipleMetricsExactR8(t *testing.T) {
acc := testutil.Accumulator{}
q := Quantile{AlgorithmType: "exact R8"}
err := q.Init()
require.NoError(t, err)
expected := []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{"series": "foo"},
map[string]interface{}{
"a_025": 24.417, "a_050": 49.500, "a_075": 74.583,
"b_025": 24.417, "b_050": 49.500, "b_075": 74.583,
},
time.Now(),
),
testutil.MustMetric(
"test",
map[string]string{"series": "bar"},
map[string]interface{}{
"a_025": 48.833, "a_050": 99.000, "a_075": 149.167,
"b_025": 48.833, "b_050": 99.000, "b_075": 149.167,
},
time.Now(),
),
}
metricsA := make([]telegraf.Metric, 100)
metricsB := make([]telegraf.Metric, 100)
for i := range metricsA {
metricsA[i] = testutil.MustMetric(
"test",
map[string]string{"series": "foo"},
map[string]interface{}{"a": int64(i), "b": float64(i), "x1": "string", "x2": true},
time.Now(),
)
}
for i := range metricsB {
metricsB[i] = testutil.MustMetric(
"test",
map[string]string{"series": "bar"},
map[string]interface{}{"a": int64(2 * i), "b": float64(2 * i), "x1": "string", "x2": true},
time.Now(),
)
}
for _, m := range metricsA {
q.Add(m)
}
for _, m := range metricsB {
q.Add(m)
}
q.Push(&acc)
epsilon := cmpopts.EquateApprox(0, 1e-3)
sort := testutil.SortMetrics()
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime(), epsilon, sort)
}
// BenchmarkDefaultTDigest measures one aggregation cycle of the default
// (t-digest) aggregator with the default quantiles: adding 100 metrics with
// six numeric and two non-numeric fields and deriving the result.
func BenchmarkDefaultTDigest(b *testing.B) {
	metrics := make([]telegraf.Metric, 100)
	for i := range metrics {
		metrics[i] = testutil.MustMetric(
			"test",
			map[string]string{"foo": "bar"},
			map[string]interface{}{
				"a":  rand.Int31(),
				"b":  rand.Int63(),
				"c":  rand.Uint32(),
				"d":  rand.Uint64(),
				"e":  rand.Float32(),
				"f":  rand.Float64(),
				"x1": "string",
				"x2": true,
			},
			time.Now(),
		)
	}

	q := Quantile{Compression: 100}
	err := q.Init()
	require.NoError(b, err)

	// Keep fixture generation and plugin setup out of the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// A fresh accumulator per cycle keeps pushed metrics from piling up.
		acc := testutil.Accumulator{}
		for _, m := range metrics {
			q.Add(m)
		}
		q.Push(&acc)
		// Clear the cache like the flush & clear at the end of a period;
		// otherwise every iteration works on more samples than the previous
		// one and the per-op time depends on b.N.
		q.Reset()
	}
}
// BenchmarkDefaultTDigest100Q measures one aggregation cycle of the default
// (t-digest) aggregator with 100 quantiles (0.00, 0.01, ..., 0.99): adding 100
// metrics with six numeric and two non-numeric fields and deriving the result.
func BenchmarkDefaultTDigest100Q(b *testing.B) {
	metrics := make([]telegraf.Metric, 100)
	for i := range metrics {
		metrics[i] = testutil.MustMetric(
			"test",
			map[string]string{"foo": "bar"},
			map[string]interface{}{
				"a":  rand.Int31(),
				"b":  rand.Int63(),
				"c":  rand.Uint32(),
				"d":  rand.Uint64(),
				"e":  rand.Float32(),
				"f":  rand.Float64(),
				"x1": "string",
				"x2": true,
			},
			time.Now(),
		)
	}
	quantiles := make([]float64, 100)
	for i := range quantiles {
		quantiles[i] = 0.01 * float64(i)
	}

	q := Quantile{Compression: 100, Quantiles: quantiles}
	err := q.Init()
	require.NoError(b, err)

	// Keep fixture generation and plugin setup out of the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// A fresh accumulator per cycle keeps pushed metrics from piling up.
		acc := testutil.Accumulator{}
		for _, m := range metrics {
			q.Add(m)
		}
		q.Push(&acc)
		// Clear the cache like the flush & clear at the end of a period;
		// otherwise every iteration works on more samples than the previous
		// one and the per-op time depends on b.N.
		q.Reset()
	}
}
// BenchmarkDefaultExactR7 measures one aggregation cycle of the "exact R7"
// aggregator with the default quantiles: adding 100 metrics with six numeric
// and two non-numeric fields and deriving the result.
func BenchmarkDefaultExactR7(b *testing.B) {
	metrics := make([]telegraf.Metric, 100)
	for i := range metrics {
		metrics[i] = testutil.MustMetric(
			"test",
			map[string]string{"foo": "bar"},
			map[string]interface{}{
				"a":  rand.Int31(),
				"b":  rand.Int63(),
				"c":  rand.Uint32(),
				"d":  rand.Uint64(),
				"e":  rand.Float32(),
				"f":  rand.Float64(),
				"x1": "string",
				"x2": true,
			},
			time.Now(),
		)
	}

	q := Quantile{AlgorithmType: "exact R7"}
	err := q.Init()
	require.NoError(b, err)

	// Keep fixture generation and plugin setup out of the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// A fresh accumulator per cycle keeps pushed metrics from piling up.
		acc := testutil.Accumulator{}
		for _, m := range metrics {
			q.Add(m)
		}
		q.Push(&acc)
		// Clear the cache like the flush & clear at the end of a period;
		// otherwise the stored samples grow with every iteration and the
		// per-op time depends on b.N.
		q.Reset()
	}
}
// BenchmarkDefaultExactR7100Q measures one aggregation cycle of the
// "exact R7" aggregator with 100 quantiles (0.00, 0.01, ..., 0.99): adding 100
// metrics with six numeric and two non-numeric fields and deriving the result.
func BenchmarkDefaultExactR7100Q(b *testing.B) {
	metrics := make([]telegraf.Metric, 100)
	for i := range metrics {
		metrics[i] = testutil.MustMetric(
			"test",
			map[string]string{"foo": "bar"},
			map[string]interface{}{
				"a":  rand.Int31(),
				"b":  rand.Int63(),
				"c":  rand.Uint32(),
				"d":  rand.Uint64(),
				"e":  rand.Float32(),
				"f":  rand.Float64(),
				"x1": "string",
				"x2": true,
			},
			time.Now(),
		)
	}
	quantiles := make([]float64, 100)
	for i := range quantiles {
		quantiles[i] = 0.01 * float64(i)
	}

	q := Quantile{AlgorithmType: "exact R7", Quantiles: quantiles}
	err := q.Init()
	require.NoError(b, err)

	// Keep fixture generation and plugin setup out of the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// A fresh accumulator per cycle keeps pushed metrics from piling up.
		acc := testutil.Accumulator{}
		for _, m := range metrics {
			q.Add(m)
		}
		q.Push(&acc)
		// Clear the cache like the flush & clear at the end of a period;
		// otherwise the stored samples grow with every iteration and the
		// per-op time depends on b.N.
		q.Reset()
	}
}
// BenchmarkDefaultExactR8 measures one aggregation cycle of the "exact R8"
// aggregator with the default quantiles: adding 100 metrics with six numeric
// and two non-numeric fields and deriving the result.
func BenchmarkDefaultExactR8(b *testing.B) {
	metrics := make([]telegraf.Metric, 100)
	for i := range metrics {
		metrics[i] = testutil.MustMetric(
			"test",
			map[string]string{"foo": "bar"},
			map[string]interface{}{
				"a":  rand.Int31(),
				"b":  rand.Int63(),
				"c":  rand.Uint32(),
				"d":  rand.Uint64(),
				"e":  rand.Float32(),
				"f":  rand.Float64(),
				"x1": "string",
				"x2": true,
			},
			time.Now(),
		)
	}

	q := Quantile{AlgorithmType: "exact R8"}
	err := q.Init()
	require.NoError(b, err)

	// Keep fixture generation and plugin setup out of the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// A fresh accumulator per cycle keeps pushed metrics from piling up.
		acc := testutil.Accumulator{}
		for _, m := range metrics {
			q.Add(m)
		}
		q.Push(&acc)
		// Clear the cache like the flush & clear at the end of a period;
		// otherwise the stored samples grow with every iteration and the
		// per-op time depends on b.N.
		q.Reset()
	}
}
// BenchmarkDefaultExactR8100Q measures one aggregation cycle of the
// "exact R8" aggregator with 100 quantiles (0.00, 0.01, ..., 0.99): adding 100
// metrics with six numeric and two non-numeric fields and deriving the result.
func BenchmarkDefaultExactR8100Q(b *testing.B) {
	metrics := make([]telegraf.Metric, 100)
	for i := range metrics {
		metrics[i] = testutil.MustMetric(
			"test",
			map[string]string{"foo": "bar"},
			map[string]interface{}{
				"a":  rand.Int31(),
				"b":  rand.Int63(),
				"c":  rand.Uint32(),
				"d":  rand.Uint64(),
				"e":  rand.Float32(),
				"f":  rand.Float64(),
				"x1": "string",
				"x2": true,
			},
			time.Now(),
		)
	}
	quantiles := make([]float64, 100)
	for i := range quantiles {
		quantiles[i] = 0.01 * float64(i)
	}

	q := Quantile{AlgorithmType: "exact R8", Quantiles: quantiles}
	err := q.Init()
	require.NoError(b, err)

	// Keep fixture generation and plugin setup out of the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// A fresh accumulator per cycle keeps pushed metrics from piling up.
		acc := testutil.Accumulator{}
		for _, m := range metrics {
			q.Add(m)
		}
		q.Push(&acc)
		// Clear the cache like the flush & clear at the end of a period;
		// otherwise the stored samples grow with every iteration and the
		// per-op time depends on b.N.
		q.Reset()
	}
}