Add Derivative Aggregator Plugin (#3762)
Calculate derivatives based on time or fields.
This commit is contained in:
parent
ee09a39de5
commit
927d34f66c
|
|
@ -3,6 +3,7 @@ package all
|
|||
import (
|
||||
//Blank imports for plugins to register themselves
|
||||
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
|
||||
_ "github.com/influxdata/telegraf/plugins/aggregators/derivative"
|
||||
_ "github.com/influxdata/telegraf/plugins/aggregators/final"
|
||||
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
|
||||
_ "github.com/influxdata/telegraf/plugins/aggregators/merge"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,166 @@
|
|||
# Derivative Aggregator Plugin
|
||||
The Derivative Aggregator Plugin estimates the derivative for all fields of the
|
||||
aggregated metrics.
|
||||
|
||||
### Time Derivatives
|
||||
|
||||
In its default configuration it determines the first and last measurement of
|
||||
the period. From these measurements the time difference in seconds is
|
||||
calculated. This time difference is than used to divide the difference of each
|
||||
field using the following formula:
|
||||
```
|
||||
field_last - field_first
|
||||
derivative = --------------------------
|
||||
time_difference
|
||||
```
|
||||
For each field the derivative is emitted with a naming pattern
|
||||
`<fieldname>_rate`.
|
||||
|
||||
### Custom Derivation Variable
|
||||
|
||||
The plugin supports to use a field of the aggregated measurements as derivation
|
||||
variable in the denominator. This variable is assumed to be a monotonically
|
||||
increasing value. In this feature the following formula is used:
|
||||
```
|
||||
field_last - field_first
|
||||
derivative = --------------------------------
|
||||
variable_last - variable_first
|
||||
```
|
||||
**Make sure the specified variable is not filtered and exists in the metrics passed to this aggregator!**
|
||||
|
||||
When using a custom derivation variable, you should change the `suffix` of the derivative name.
|
||||
See the next section on [customizing the derivative name](#customize-the-derivative-name) for details.
|
||||
|
||||
### Customize the Derivative Name
|
||||
|
||||
The derivatives generated by the aggregator are named `<fieldname>_rate`, i.e. they are composed of the field name and a suffix `_rate`.
|
||||
You can configure the suffix to be used by changing the `suffix` parameter.
|
||||
|
||||
### Roll-Over to next Period
|
||||
|
||||
Calculating the derivative for a period requires at least two distinct measurements during that period.
|
||||
Whether those are available depends on the configuration of the aggregator `period` and the agent `interval`.
|
||||
By default the last measurement is used as first measurement in the next
|
||||
aggregation period. This enables a continuous calculation of the derivative. If
|
||||
within the next period an earlier timestamp is encountered this measurement will
|
||||
replace the roll-over metric. A main benefit of this roll-over is the ability to
|
||||
cope with multiple "quiet" periods, where no new measurement is pushed to the
|
||||
aggregator. The roll-over will take place at most `max_roll_over` times.
|
||||
|
||||
#### Example of Roll-Over
|
||||
|
||||
Let us assume we have an input plugin, that generates a measurement with a single metric "test" every 2 seconds.
|
||||
Let this metric increase the first 10 seconds from 0.0 to 10.0 and then decrease the next 10 seconds form 10.0 to 0.0:
|
||||
|
||||
| timestamp | value |
|
||||
|-----------|-------|
|
||||
| 0 | 0.0 |
|
||||
| 2 | 2.0 |
|
||||
| 4 | 4.0 |
|
||||
| 6 | 6.0 |
|
||||
| 8 | 8.0 |
|
||||
| 10 | 10.0 |
|
||||
| 12 | 8.0 |
|
||||
| 14 | 6.0 |
|
||||
| 16 | 4.0 |
|
||||
| 18 | 2.0 |
|
||||
| 20 | 0.0 |
|
||||
|
||||
To avoid thinking about border values, we consider periods to be inclusive at the start but exclusive in the end.
|
||||
Using `period = "10s"` and `max_roll_over = 0` we would get the following aggregates:
|
||||
|
||||
| timestamp | value | aggregate | explanantion |
|
||||
|-----------|-------|-----------|--------------|
|
||||
| 0 | 0.0 |
|
||||
| 2 | 2.0 |
|
||||
| 4 | 4.0 |
|
||||
| 6 | 6.0 |
|
||||
| 8 | 8.0 |
|
||||
||| 1.0 | (8.0 - 0.0) / (8 - 0) |
|
||||
| 10 | 10.0 |
|
||||
| 12 | 8.0 |
|
||||
| 14 | 6.0 |
|
||||
| 16 | 4.0 |
|
||||
| 18 | 2.0 |
|
||||
||| -1.0 | (2.0 - 10.0) / (18 - 10)
|
||||
| 20 | 0.0 |
|
||||
|
||||
If we now decrease the period with `period = 2s`, no derivative could be calculated since there would only one measurement for each period.
|
||||
The aggregator will emit the log messages `Same first and last event for "test", skipping.`.
|
||||
This changes, if we use `max_roll_over = 1`, since now end measurements of a period are taking as start for the next period.
|
||||
|
||||
| timestamp | value | aggregate | explanantion |
|
||||
|-----------|-------|-----------|--------------|
|
||||
| 0 | 0.0 |
|
||||
| 2 | 2.0 | 1.0 | (2.0 - 0.0) / (2 - 0) |
|
||||
| 4 | 4.0 | 1.0 | (4.0 - 2.0) / (4 - 2) |
|
||||
| 6 | 6.0 | 1.0 | (6.0 - 4.0) / (6 - 4) |
|
||||
| 8 | 8.0 | 1.0 | (8.0 - 6.0) / (8 - 6) |
|
||||
| 10 | 10.0 | 1.0 | (10.0 - 8.0) / (10 - 8) |
|
||||
| 12 | 8.0 | -1.0 | (8.0 - 10.0) / (12 - 10) |
|
||||
| 14 | 6.0 | -1.0 | (6.0 - 8.0) / (14 - 12) |
|
||||
| 16 | 4.0 | -1.0 | (4.0 - 6.0) / (16 - 14) |
|
||||
| 18 | 2.0 | -1.0 | (2.0 - 4.0) / (18 - 16) |
|
||||
| 20 | 0.0 | -1.0 | (0.0 - 2.0) / (20 - 18) |
|
||||
|
||||
The default `max_roll_over = 10` allows for multiple periods without measurements either due to configuration or missing input.
|
||||
|
||||
There may be a slight difference in the calculation when using `max_roll_over` compared to running without.
|
||||
To illustrate this, let us compare the derivatives for `period = "7s"`.
|
||||
|
||||
| timestamp | value | `max_roll_over = 0` | `max_roll_over = 1` |
|
||||
|-----------|-------|-----------|--------------|
|
||||
| 0 | 0.0 |
|
||||
| 2 | 2.0 |
|
||||
| 4 | 4.0 |
|
||||
| 6 | 6.0 |
|
||||
||| 1.0 | 1.0 |
|
||||
| 8 | 8.0 |
|
||||
| 10 | 10.0 |
|
||||
| 12 | 8.0 |
|
||||
||| 0.0 | 0.33... |
|
||||
| 14 | 6.0 |
|
||||
| 16 | 4.0 |
|
||||
| 18 | 2.0 |
|
||||
| 20 | 0.0 |
|
||||
||| -1.0 | -1.0 |
|
||||
|
||||
The difference stems from the change of the value between periods, e.g. from 6.0 to 8.0 between first and second period.
|
||||
Thoses changes are omitted with `max_roll_over = 0` but are respected with `max_roll_over = 1`.
|
||||
That there are no more differences in the calculated derivatives is due to the example data, which has constant derivatives in during the first and last period, even when including the gap between the periods.
|
||||
Using `max_roll_over` with a value greater 0 may be important, if you need to detect changes between periods, e.g. when you have very few measurements in a period or quasi-constant metrics with only occasional changes.
|
||||
|
||||
### Configuration
|
||||
|
||||
```toml
|
||||
[[aggregators.derivative]]
|
||||
## Specific Derivative Aggregator Arguments:
|
||||
|
||||
## Configure a custom derivation variable. Timestamp is used if none is given.
|
||||
# variable = ""
|
||||
|
||||
## Suffix to add to the field name for the derivative name.
|
||||
# suffix = "_rate"
|
||||
|
||||
## Roll-Over last measurement to first measurement of next period
|
||||
# max_roll_over = 10
|
||||
|
||||
## General Aggregator Arguments:
|
||||
|
||||
## calculate derivative every 30 seconds
|
||||
period = "30s"
|
||||
```
|
||||
|
||||
### Tags:
|
||||
No tags are applied by this aggregator.
|
||||
Existing tags are passed throug the aggregator untouched.
|
||||
|
||||
### Example Output
|
||||
|
||||
```
|
||||
net bytes_recv=15409i,packets_recv=164i,bytes_sent=16649i,packets_sent=120i 1508843640000000000
|
||||
net bytes_recv=73987i,packets_recv=364i,bytes_sent=87328i,packets_sent=452i 1508843660000000000
|
||||
net bytes_recv_by_packets_recv=292.89 1508843660000000000
|
||||
net packets_sent_rate=16.6,bytes_sent_rate=3533.95 1508843660000000000
|
||||
net bytes_sent_by_packet=292.89 1508843660000000000
|
||||
```
|
||||
|
|
@ -0,0 +1,224 @@
|
|||
package derivative
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/plugins/aggregators"
|
||||
)
|
||||
|
||||
type Derivative struct {
|
||||
Variable string `toml:"variable"`
|
||||
Suffix string `toml:"suffix"`
|
||||
MaxRollOver uint `toml:"max_roll_over"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
cache map[uint64]*aggregate
|
||||
}
|
||||
|
||||
type aggregate struct {
|
||||
first *event
|
||||
last *event
|
||||
name string
|
||||
tags map[string]string
|
||||
rollOver uint
|
||||
}
|
||||
|
||||
type event struct {
|
||||
fields map[string]float64
|
||||
time time.Time
|
||||
}
|
||||
|
||||
const defaultSuffix = "_rate"
|
||||
|
||||
func NewDerivative() *Derivative {
|
||||
derivative := &Derivative{Suffix: defaultSuffix, MaxRollOver: 10}
|
||||
derivative.cache = make(map[uint64]*aggregate)
|
||||
derivative.Reset()
|
||||
return derivative
|
||||
}
|
||||
|
||||
var sampleConfig = `
|
||||
## The period in which to flush the aggregator.
|
||||
period = "30s"
|
||||
##
|
||||
## If true, the original metric will be dropped by the
|
||||
## aggregator and will not get sent to the output plugins.
|
||||
drop_original = false
|
||||
##
|
||||
## This aggregator will estimate a derivative for each field, which is
|
||||
## contained in both the first and last metric of the aggregation interval.
|
||||
## Without further configuration the derivative will be calculated with
|
||||
## respect to the time difference between these two measurements in seconds.
|
||||
## The formula applied is for every field:
|
||||
##
|
||||
## value_last - value_first
|
||||
## derivative = --------------------------
|
||||
## time_difference_in_seconds
|
||||
##
|
||||
## The resulting derivative will be named *fieldname_rate*. The suffix
|
||||
## "_rate" can be configured by the *suffix* parameter. When using a
|
||||
## derivation variable you can include its name for more clarity.
|
||||
# suffix = "_rate"
|
||||
##
|
||||
## As an abstraction the derivative can be calculated not only by the time
|
||||
## difference but by the difference of a field, which is contained in the
|
||||
## measurement. This field is assumed to be monotonously increasing. This
|
||||
## feature is used by specifying a *variable*.
|
||||
## Make sure the specified variable is not filtered and exists in the metrics
|
||||
## passed to this aggregator!
|
||||
# variable = ""
|
||||
##
|
||||
## When using a field as the derivation parameter the name of that field will
|
||||
## be used for the resulting derivative, e.g. *fieldname_by_parameter*.
|
||||
##
|
||||
## Note, that the calculation is based on the actual timestamp of the
|
||||
## measurements. When there is only one measurement during that period, the
|
||||
## measurement will be rolled over to the next period. The maximum number of
|
||||
## such roll-overs can be configured with a default of 10.
|
||||
# max_roll_over = 10
|
||||
##
|
||||
`
|
||||
|
||||
func (d *Derivative) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (d *Derivative) Description() string {
|
||||
return "Calculates a derivative for every field."
|
||||
}
|
||||
|
||||
func (d *Derivative) Add(in telegraf.Metric) {
|
||||
id := in.HashID()
|
||||
current, ok := d.cache[id]
|
||||
if !ok {
|
||||
// hit an uncached metric, create caches for first time:
|
||||
d.cache[id] = newAggregate(in)
|
||||
return
|
||||
}
|
||||
if current.first.time.After(in.Time()) {
|
||||
current.first = newEvent(in)
|
||||
current.rollOver = 0
|
||||
} else if current.first.time.Equal(in.Time()) {
|
||||
upsertConvertedFields(in.Fields(), current.first.fields)
|
||||
current.rollOver = 0
|
||||
}
|
||||
if current.last.time.Before(in.Time()) {
|
||||
current.last = newEvent(in)
|
||||
current.rollOver = 0
|
||||
} else if current.last.time.Equal(in.Time()) {
|
||||
upsertConvertedFields(in.Fields(), current.last.fields)
|
||||
current.rollOver = 0
|
||||
}
|
||||
}
|
||||
|
||||
func newAggregate(in telegraf.Metric) *aggregate {
|
||||
event := newEvent(in)
|
||||
return &aggregate{
|
||||
name: in.Name(),
|
||||
tags: in.Tags(),
|
||||
first: event,
|
||||
last: event,
|
||||
rollOver: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func newEvent(in telegraf.Metric) *event {
|
||||
return &event{
|
||||
fields: extractConvertedFields(in),
|
||||
time: in.Time(),
|
||||
}
|
||||
}
|
||||
|
||||
func extractConvertedFields(in telegraf.Metric) map[string]float64 {
|
||||
fields := make(map[string]float64, len(in.Fields()))
|
||||
upsertConvertedFields(in.Fields(), fields)
|
||||
return fields
|
||||
}
|
||||
|
||||
func upsertConvertedFields(source map[string]interface{}, target map[string]float64) {
|
||||
for k, v := range source {
|
||||
if value, ok := convert(v); ok {
|
||||
target[k] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func convert(in interface{}) (float64, bool) {
|
||||
switch v := in.(type) {
|
||||
case float64:
|
||||
return v, true
|
||||
case int64:
|
||||
return float64(v), true
|
||||
case uint64:
|
||||
return float64(v), true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func (d *Derivative) Push(acc telegraf.Accumulator) {
|
||||
for _, aggregate := range d.cache {
|
||||
if aggregate.first == aggregate.last {
|
||||
d.Log.Debugf("Same first and last event for %q, skipping.", aggregate.name)
|
||||
continue
|
||||
}
|
||||
var denominator float64
|
||||
denominator = aggregate.last.time.Sub(aggregate.first.time).Seconds()
|
||||
if len(d.Variable) > 0 {
|
||||
var first float64
|
||||
var last float64
|
||||
var found bool
|
||||
if first, found = aggregate.first.fields[d.Variable]; !found {
|
||||
d.Log.Debugf("Did not find %q in first event for %q.", d.Variable, aggregate.name)
|
||||
continue
|
||||
}
|
||||
if last, found = aggregate.last.fields[d.Variable]; !found {
|
||||
d.Log.Debugf("Did not find %q in last event for %q.", d.Variable, aggregate.name)
|
||||
continue
|
||||
}
|
||||
denominator = last - first
|
||||
}
|
||||
if denominator == 0 {
|
||||
d.Log.Debugf("Got difference 0 in denominator for %q, skipping.", aggregate.name)
|
||||
continue
|
||||
}
|
||||
derivatives := make(map[string]interface{})
|
||||
for key, start := range aggregate.first.fields {
|
||||
if key == d.Variable {
|
||||
// Skip derivation variable
|
||||
continue
|
||||
}
|
||||
if end, ok := aggregate.last.fields[key]; ok {
|
||||
d.Log.Debugf("Adding derivative %q to %q.", key+d.Suffix, aggregate.name)
|
||||
derivatives[key+d.Suffix] = (end - start) / denominator
|
||||
}
|
||||
}
|
||||
acc.AddFields(aggregate.name, derivatives, aggregate.tags)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Derivative) Reset() {
|
||||
for id, aggregate := range d.cache {
|
||||
if aggregate.rollOver < d.MaxRollOver {
|
||||
aggregate.first = aggregate.last
|
||||
aggregate.rollOver = aggregate.rollOver + 1
|
||||
d.cache[id] = aggregate
|
||||
d.Log.Debugf("Roll-Over %q for the %d time.", aggregate.name, aggregate.rollOver)
|
||||
} else {
|
||||
delete(d.cache, id)
|
||||
d.Log.Debugf("Removed %q from cache.", aggregate.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Derivative) Init() error {
|
||||
d.Suffix = strings.TrimSpace(d.Suffix)
|
||||
d.Variable = strings.TrimSpace(d.Variable)
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
aggregators.Add("derivative", func() telegraf.Aggregator {
|
||||
return NewDerivative()
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,404 @@
|
|||
package derivative
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
var start, _ = metric.New("TestMetric",
|
||||
map[string]string{"state": "full"},
|
||||
map[string]interface{}{
|
||||
"increasing": int64(0),
|
||||
"decreasing": int64(100),
|
||||
"unchanged": int64(42),
|
||||
"ignored": "strings are not supported",
|
||||
"parameter": float64(0.0),
|
||||
},
|
||||
time.Now(),
|
||||
)
|
||||
|
||||
var finish, _ = metric.New("TestMetric",
|
||||
map[string]string{"state": "full"},
|
||||
map[string]interface{}{
|
||||
"increasing": int64(1000),
|
||||
"decreasing": int64(0),
|
||||
"unchanged": int64(42),
|
||||
"ignored": "strings are not supported",
|
||||
"parameter": float64(10.0),
|
||||
},
|
||||
time.Now().Add(time.Second),
|
||||
)
|
||||
|
||||
func TestTwoFullEventsWithParameter(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
Variable: "parameter",
|
||||
Suffix: "_by_parameter",
|
||||
cache: make(map[uint64]*aggregate),
|
||||
}
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
derivative.Add(start)
|
||||
derivative.Add(finish)
|
||||
derivative.Push(&acc)
|
||||
|
||||
expectedFields := map[string]interface{}{
|
||||
"increasing_by_parameter": 100.0,
|
||||
"decreasing_by_parameter": -10.0,
|
||||
"unchanged_by_parameter": 0.0,
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"state": "full",
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "TestMetric", expectedFields, expectedTags)
|
||||
}
|
||||
|
||||
func TestTwoFullEventsWithParameterReverseSequence(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
Variable: "parameter",
|
||||
Suffix: "_by_parameter",
|
||||
cache: make(map[uint64]*aggregate),
|
||||
}
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
derivative.Add(finish)
|
||||
derivative.Add(start)
|
||||
derivative.Push(&acc)
|
||||
|
||||
expectedFields := map[string]interface{}{
|
||||
"increasing_by_parameter": 100.0,
|
||||
"decreasing_by_parameter": -10.0,
|
||||
"unchanged_by_parameter": 0.0,
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"state": "full",
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "TestMetric", expectedFields, expectedTags)
|
||||
}
|
||||
|
||||
func TestTwoFullEventsWithoutParameter(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := NewDerivative()
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
startTime := time.Now()
|
||||
duration, _ := time.ParseDuration("2s")
|
||||
endTime := startTime.Add(duration)
|
||||
|
||||
first, _ := metric.New("One Field",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": int64(10),
|
||||
},
|
||||
startTime,
|
||||
)
|
||||
last, _ := metric.New("One Field",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": int64(20),
|
||||
},
|
||||
endTime,
|
||||
)
|
||||
|
||||
derivative.Add(first)
|
||||
derivative.Add(last)
|
||||
derivative.Push(&acc)
|
||||
|
||||
acc.AssertContainsFields(t,
|
||||
"One Field",
|
||||
map[string]interface{}{
|
||||
"value_rate": float64(5),
|
||||
},
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
func TestTwoFullEventsInSeperatePushes(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
Variable: " parameter",
|
||||
Suffix: "_wrt_parameter",
|
||||
MaxRollOver: 10,
|
||||
cache: make(map[uint64]*aggregate),
|
||||
}
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
derivative.Add(start)
|
||||
derivative.Push(&acc)
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "TestMetric")
|
||||
|
||||
acc.ClearMetrics()
|
||||
|
||||
derivative.Add(finish)
|
||||
derivative.Push(&acc)
|
||||
|
||||
expectedFields := map[string]interface{}{
|
||||
"increasing_wrt_parameter": 100.0,
|
||||
"decreasing_wrt_parameter": -10.0,
|
||||
"unchanged_wrt_parameter": 0.0,
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"state": "full",
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "TestMetric", expectedFields, expectedTags)
|
||||
}
|
||||
|
||||
func TestTwoFullEventsInSeperatePushesWithSeveralRollOvers(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
Variable: "parameter",
|
||||
Suffix: "_wrt_parameter",
|
||||
MaxRollOver: 10,
|
||||
cache: make(map[uint64]*aggregate),
|
||||
}
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
derivative.Add(start)
|
||||
derivative.Push(&acc)
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "TestMetric")
|
||||
|
||||
derivative.Push(&acc)
|
||||
derivative.Push(&acc)
|
||||
derivative.Push(&acc)
|
||||
|
||||
derivative.Add(finish)
|
||||
derivative.Push(&acc)
|
||||
|
||||
expectedFields := map[string]interface{}{
|
||||
"increasing_wrt_parameter": 100.0,
|
||||
"decreasing_wrt_parameter": -10.0,
|
||||
"unchanged_wrt_parameter": 0.0,
|
||||
}
|
||||
|
||||
acc.AssertContainsFields(t, "TestMetric", expectedFields)
|
||||
}
|
||||
|
||||
func TestTwoFullEventsInSeperatePushesWithOutRollOver(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
Variable: "parameter",
|
||||
Suffix: "_by_parameter",
|
||||
MaxRollOver: 0,
|
||||
cache: make(map[uint64]*aggregate),
|
||||
}
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
derivative.Add(start)
|
||||
// This test relies on RunningAggregator always callining Reset after Push
|
||||
// to remove the first metric after max-rollover of 0 has been reached.
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "TestMetric")
|
||||
|
||||
acc.ClearMetrics()
|
||||
derivative.Add(finish)
|
||||
derivative.Push(&acc)
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "TestMetric")
|
||||
}
|
||||
|
||||
func TestIgnoresMissingVariable(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
Variable: "parameter",
|
||||
Suffix: "_by_parameter",
|
||||
cache: make(map[uint64]*aggregate),
|
||||
}
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
noParameter, _ := metric.New("TestMetric",
|
||||
map[string]string{"state": "no_parameter"},
|
||||
map[string]interface{}{
|
||||
"increasing": int64(100),
|
||||
"decreasing": int64(0),
|
||||
"unchanged": int64(42),
|
||||
},
|
||||
time.Now(),
|
||||
)
|
||||
|
||||
derivative.Add(noParameter)
|
||||
derivative.Push(&acc)
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "TestMetric")
|
||||
|
||||
acc.ClearMetrics()
|
||||
derivative.Add(noParameter)
|
||||
derivative.Add(start)
|
||||
derivative.Add(noParameter)
|
||||
derivative.Add(finish)
|
||||
derivative.Add(noParameter)
|
||||
derivative.Push(&acc)
|
||||
expectedFields := map[string]interface{}{
|
||||
"increasing_by_parameter": 100.0,
|
||||
"decreasing_by_parameter": -10.0,
|
||||
"unchanged_by_parameter": 0.0,
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"state": "full",
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "TestMetric", expectedFields, expectedTags)
|
||||
}
|
||||
|
||||
func TestMergesDifferenMetricsWithSameHash(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := NewDerivative()
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
startTime := time.Now()
|
||||
duration, _ := time.ParseDuration("2s")
|
||||
endTime := startTime.Add(duration)
|
||||
part1, _ := metric.New("TestMetric",
|
||||
map[string]string{"state": "full"},
|
||||
map[string]interface{}{"field1": int64(10)},
|
||||
startTime,
|
||||
)
|
||||
part2, _ := metric.New("TestMetric",
|
||||
map[string]string{"state": "full"},
|
||||
map[string]interface{}{"field2": int64(20)},
|
||||
startTime,
|
||||
)
|
||||
final, _ := metric.New("TestMetric",
|
||||
map[string]string{"state": "full"},
|
||||
map[string]interface{}{
|
||||
"field1": int64(30),
|
||||
"field2": int64(30),
|
||||
},
|
||||
endTime,
|
||||
)
|
||||
|
||||
derivative.Add(part1)
|
||||
derivative.Push(&acc)
|
||||
derivative.Add(part2)
|
||||
derivative.Push(&acc)
|
||||
derivative.Add(final)
|
||||
derivative.Push(&acc)
|
||||
|
||||
expectedFields := map[string]interface{}{
|
||||
"field1_rate": 10.0,
|
||||
"field2_rate": 5.0,
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"state": "full",
|
||||
}
|
||||
|
||||
acc.AssertContainsTaggedFields(t, "TestMetric", expectedFields, expectedTags)
|
||||
}
|
||||
|
||||
func TestDropsAggregatesOnMaxRollOver(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
MaxRollOver: 1,
|
||||
cache: make(map[uint64]*aggregate),
|
||||
}
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
derivative.Add(start)
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
derivative.Add(finish)
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
|
||||
acc.AssertDoesNotContainMeasurement(t, "TestMetric")
|
||||
}
|
||||
|
||||
func TestAddMetricsResetsRollOver(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
derivative := &Derivative{
|
||||
Variable: "parameter",
|
||||
Suffix: "_by_parameter",
|
||||
MaxRollOver: 1,
|
||||
cache: make(map[uint64]*aggregate),
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
derivative.Init()
|
||||
|
||||
derivative.Add(start)
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
derivative.Add(start)
|
||||
derivative.Reset()
|
||||
derivative.Add(finish)
|
||||
derivative.Push(&acc)
|
||||
|
||||
expectedFields := map[string]interface{}{
|
||||
"increasing_by_parameter": 100.0,
|
||||
"decreasing_by_parameter": -10.0,
|
||||
"unchanged_by_parameter": 0.0,
|
||||
}
|
||||
|
||||
acc.AssertContainsFields(t, "TestMetric", expectedFields)
|
||||
}
|
||||
|
||||
func TestCalculatesCorrectDerivativeOnTwoConsecutivePeriods(t *testing.T) {
|
||||
acc := testutil.Accumulator{}
|
||||
period, _ := time.ParseDuration("10s")
|
||||
derivative := NewDerivative()
|
||||
derivative.Log = testutil.Logger{}
|
||||
derivative.Init()
|
||||
|
||||
startTime := time.Now()
|
||||
first, _ := metric.New("One Field",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": int64(10),
|
||||
},
|
||||
startTime,
|
||||
)
|
||||
derivative.Add(first)
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
|
||||
second, _ := metric.New("One Field",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": int64(20),
|
||||
},
|
||||
startTime.Add(period),
|
||||
)
|
||||
derivative.Add(second)
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
|
||||
acc.AssertContainsFields(t, "One Field", map[string]interface{}{
|
||||
"value_rate": 1.0,
|
||||
})
|
||||
|
||||
acc.ClearMetrics()
|
||||
third, _ := metric.New("One Field",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": int64(40),
|
||||
},
|
||||
startTime.Add(period).Add(period),
|
||||
)
|
||||
derivative.Add(third)
|
||||
derivative.Push(&acc)
|
||||
derivative.Reset()
|
||||
|
||||
acc.AssertContainsFields(t, "One Field", map[string]interface{}{
|
||||
"value_rate": 2.0,
|
||||
})
|
||||
}
|
||||
Loading…
Reference in New Issue