test(processors.dedup): Add unit-test for tracking metrics (#14745)

This commit is contained in:
Sven Rebhan 2024-02-12 15:43:37 +01:00 committed by GitHub
parent 298a1d4396
commit aa5d304e6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 433 additions and 185 deletions

View File

@ -1,6 +1,7 @@
package dedup
import (
"sync"
"testing"
"time"
@ -9,203 +10,450 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
const metricName = "m1"
func TestMetrics(t *testing.T) {
now := time.Now()
func createMetric(value int64, when time.Time) telegraf.Metric {
m := metric.New(metricName,
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": value},
when,
)
return m
}
tests := []struct {
name string
input []telegraf.Metric
expected []telegraf.Metric
cacheContent []telegraf.Metric
}{
{
name: "retain metric",
input: []telegraf.Metric{
metric.New("m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
expected: []telegraf.Metric{
metric.New("m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
cacheContent: []telegraf.Metric{
metric.New("m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
},
{
name: "suppress repeated metric",
input: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Second),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
expected: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Second),
),
},
cacheContent: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Second),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Second),
),
},
},
{
name: "pass updated metric",
input: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Second),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 2},
now,
),
},
expected: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Second),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 2},
now,
),
},
cacheContent: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Second),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 2},
now,
),
},
},
{
name: "pass after cache expired",
input: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
expected: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
cacheContent: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
},
{
name: "cache retains metrics",
input: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-3*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-2*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
expected: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-3*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-2*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
cacheContent: []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-3*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-2*time.Hour),
),
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now,
),
},
},
{
name: "same timestamp",
input: []telegraf.Metric{
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1}, // field
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 1}, // different field
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2}, // same field different value
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2}, // same field same value
now,
),
},
expected: []telegraf.Metric{
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1},
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 1},
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2},
now,
),
},
cacheContent: []telegraf.Metric{
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1},
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1, "bar": 1},
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2},
now,
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2},
now,
),
},
},
}
func createDedup(initTime time.Time) Dedup {
return Dedup{
DedupInterval: config.Duration(10 * time.Minute),
FlushTime: initTime,
Cache: make(map[uint64]telegraf.Metric),
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create plugin instance
plugin := &Dedup{
DedupInterval: config.Duration(10 * time.Minute),
FlushTime: now.Add(-1 * time.Second),
Cache: make(map[uint64]telegraf.Metric),
}
// Feed the input metrics and record the outputs
var actual []telegraf.Metric
for i, m := range tt.input {
actual = append(actual, plugin.Apply(m)...)
// Check the cache content
if cm := tt.cacheContent[i]; cm == nil {
require.Empty(t, plugin.Cache)
} else {
id := m.HashID()
require.NotEmpty(t, plugin.Cache)
require.Contains(t, plugin.Cache, id)
testutil.RequireMetricEqual(t, cm, plugin.Cache[id])
}
}
// Check if we got the expected metrics
testutil.RequireMetricsEqual(t, tt.expected, actual)
})
}
}
func assertCacheRefresh(t *testing.T, proc *Dedup, item telegraf.Metric) {
id := item.HashID()
name := item.Name()
// cache is not empty
require.NotEmpty(t, proc.Cache)
// cache has metric with proper id
cache, present := proc.Cache[id]
require.True(t, present)
// cache has metric with proper name
require.Equal(t, name, cache.Name())
// cached metric has proper field
cValue, present := cache.GetField("value")
require.True(t, present)
iValue, _ := item.GetField("value")
require.Equal(t, cValue, iValue)
// cached metric has proper timestamp
require.Equal(t, cache.Time(), item.Time())
}
func assertCacheHit(t *testing.T, proc *Dedup, item telegraf.Metric) {
id := item.HashID()
name := item.Name()
// cache is not empty
require.NotEmpty(t, proc.Cache)
// cache has metric with proper id
cache, present := proc.Cache[id]
require.True(t, present)
// cache has metric with proper name
require.Equal(t, name, cache.Name())
// cached metric has proper field
cValue, present := cache.GetField("value")
require.True(t, present)
iValue, _ := item.GetField("value")
require.Equal(t, cValue, iValue)
// cached metric did NOT change timestamp
require.NotEqual(t, cache.Time(), item.Time())
}
func assertMetricPassed(t *testing.T, target []telegraf.Metric, source telegraf.Metric) {
// target is not empty
require.NotEmpty(t, target)
// target has metric with proper name
require.Equal(t, metricName, target[0].Name())
// target metric has proper field
tValue, present := target[0].GetField("value")
require.True(t, present)
sValue, present := source.GetField("value")
require.True(t, present)
require.Equal(t, tValue, sValue)
// target metric has proper timestamp
require.Equal(t, target[0].Time(), source.Time())
}
func assertMetricSuppressed(t *testing.T, target []telegraf.Metric) {
// target is empty
require.Empty(t, target)
}
func TestProcRetainsMetric(t *testing.T) {
deduplicate := createDedup(time.Now())
source := createMetric(1, time.Now())
target := deduplicate.Apply(source)
assertCacheRefresh(t, &deduplicate, source)
assertMetricPassed(t, target, source)
}
func TestSuppressRepeatedValue(t *testing.T) {
deduplicate := createDedup(time.Now())
// Create metric in the past
source := createMetric(1, time.Now().Add(-1*time.Second))
_ = deduplicate.Apply(source)
source = createMetric(1, time.Now())
target := deduplicate.Apply(source)
assertCacheHit(t, &deduplicate, source)
assertMetricSuppressed(t, target)
}
func TestPassUpdatedValue(t *testing.T) {
deduplicate := createDedup(time.Now())
// Create metric in the past
source := createMetric(1, time.Now().Add(-1*time.Second))
target := deduplicate.Apply(source)
assertMetricPassed(t, target, source)
source = createMetric(2, time.Now())
target = deduplicate.Apply(source)
assertCacheRefresh(t, &deduplicate, source)
assertMetricPassed(t, target, source)
}
func TestPassAfterCacheExpire(t *testing.T) {
deduplicate := createDedup(time.Now())
// Create metric in the past
source := createMetric(1, time.Now().Add(-1*time.Hour))
target := deduplicate.Apply(source)
assertMetricPassed(t, target, source)
source = createMetric(1, time.Now())
target = deduplicate.Apply(source)
assertCacheRefresh(t, &deduplicate, source)
assertMetricPassed(t, target, source)
}
func TestCacheRetainsMetrics(t *testing.T) {
deduplicate := createDedup(time.Now())
// Create metric in the past 3sec
source := createMetric(1, time.Now().Add(-3*time.Hour))
deduplicate.Apply(source)
// Create metric in the past 2sec
source = createMetric(1, time.Now().Add(-2*time.Hour))
deduplicate.Apply(source)
source = createMetric(1, time.Now())
deduplicate.Apply(source)
assertCacheRefresh(t, &deduplicate, source)
}
func TestCacheShrink(t *testing.T) {
// Time offset is more than 2 * DedupInterval
deduplicate := createDedup(time.Now().Add(-2 * time.Hour))
// Time offset is more than 1 * DedupInterval
source := createMetric(1, time.Now().Add(-1*time.Hour))
deduplicate.Apply(source)
require.Empty(t, deduplicate.Cache)
}
func TestSameTimestamp(t *testing.T) {
now := time.Now()
dedup := createDedup(now)
var in telegraf.Metric
var out []telegraf.Metric
in = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1}, // field
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{in}, out) // pass
// Time offset is more than 2 * DedupInterval
plugin := &Dedup{
DedupInterval: config.Duration(10 * time.Minute),
FlushTime: now.Add(-2 * time.Hour),
Cache: make(map[uint64]telegraf.Metric),
}
in = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 1}, // different field
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{in}, out) // pass
in = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2}, // same field different value
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{in}, out) // pass
in = metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"bar": 2}, // same field same value
now,
)
out = dedup.Apply(in)
require.Equal(t, []telegraf.Metric{}, out) // drop
// Time offset is more than 1 * DedupInterval
input := []telegraf.Metric{
metric.New(
"m1",
map[string]string{"tag": "tag_value"},
map[string]interface{}{"value": 1},
now.Add(-1*time.Hour),
),
}
actual := plugin.Apply(input...)
expected := input
testutil.RequireMetricsEqual(t, expected, actual)
require.Empty(t, plugin.Cache)
}
func TestSuppressMultipleRepeatedValue(t *testing.T) {
deduplicate := createDedup(time.Now())
// Create metric in the past
source := createMetric(1, time.Now().Add(-1*time.Second))
_ = deduplicate.Apply(source)
source = createMetric(1, time.Now())
target := deduplicate.Apply(source, source, source, source)
func TestTracking(t *testing.T) {
now := time.Now()
assertCacheHit(t, &deduplicate, source)
assertMetricSuppressed(t, target)
inputRaw := []telegraf.Metric{
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1},
now.Add(-2*time.Second),
),
metric.New("metric",
map[string]string{"tag": "pass"},
map[string]interface{}{"foo": 1},
now.Add(-2*time.Second),
),
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1},
now.Add(-1*time.Second),
),
metric.New("metric",
map[string]string{"tag": "pass"},
map[string]interface{}{"foo": 1},
now.Add(-1*time.Second),
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 3},
now,
),
}
var mu sync.Mutex
delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
notify := func(di telegraf.DeliveryInfo) {
mu.Lock()
defer mu.Unlock()
delivered = append(delivered, di)
}
input := make([]telegraf.Metric, 0, len(inputRaw))
for _, m := range inputRaw {
tm, _ := metric.WithTracking(m, notify)
input = append(input, tm)
}
expected := []telegraf.Metric{
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1},
now.Add(-2*time.Second),
),
metric.New("metric",
map[string]string{"tag": "pass"},
map[string]interface{}{"foo": 1},
now.Add(-2*time.Second),
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 3},
now,
),
}
// Create plugin instance
plugin := &Dedup{
DedupInterval: config.Duration(10 * time.Minute),
FlushTime: now.Add(-1 * time.Second),
Cache: make(map[uint64]telegraf.Metric),
}
// Process expected metrics and compare with resulting metrics
actual := plugin.Apply(input...)
testutil.RequireMetricsEqual(t, expected, actual)
// Simulate output acknowledging delivery
for _, m := range actual {
m.Accept()
}
// Check delivery
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(input) == len(delivered)
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
}