From 722a265da6cb380a38f6e1f3f9110a53940e36c5 Mon Sep 17 00:00:00 2001 From: Ted M Lin Date: Tue, 22 Feb 2022 10:13:52 -0500 Subject: [PATCH] fix(dedup): Modifying slice while iterating is dangerous (#10684) --- plugins/processors/dedup/dedup.go | 20 ++++++++++++-------- plugins/processors/dedup/dedup_test.go | 12 ++++++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/plugins/processors/dedup/dedup.go b/plugins/processors/dedup/dedup.go index 1ffe18325..46c12c608 100644 --- a/plugins/processors/dedup/dedup.go +++ b/plugins/processors/dedup/dedup.go @@ -27,12 +27,6 @@ func (d *Dedup) Description() string { return "Filter metrics with repeating field values" } -// Remove single item from slice -func remove(slice []telegraf.Metric, i int) []telegraf.Metric { - slice[len(slice)-1], slice[i] = slice[i], slice[len(slice)-1] - return slice[:len(slice)-1] -} - // Remove expired items from cache func (d *Dedup) cleanup() { // No need to cleanup cache too often. Lets save some CPU @@ -57,19 +51,24 @@ func (d *Dedup) save(metric telegraf.Metric, id uint64) { // main processing method func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { - for idx, metric := range metrics { + idx := 0 + for _, metric := range metrics { id := metric.HashID() m, ok := d.Cache[id] // If not in cache then just save it if !ok { d.save(metric, id) + metrics[idx] = metric + idx++ continue } // If cache item has expired then refresh it if time.Since(m.Time()) >= time.Duration(d.DedupInterval) { d.save(metric, id) + metrics[idx] = metric + idx++ continue } @@ -103,16 +102,21 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { // If any field value has changed then refresh the cache if changed { d.save(metric, id) + metrics[idx] = metric + idx++ continue } if sametime && added { + metrics[idx] = metric + idx++ continue } // In any other case remove metric from the output - metrics = remove(metrics, idx) + metric.Drop() } + metrics = metrics[:idx] d.cleanup() return metrics } diff --git a/plugins/processors/dedup/dedup_test.go b/plugins/processors/dedup/dedup_test.go index d5cc83192..7a2904689 100644 --- a/plugins/processors/dedup/dedup_test.go +++ b/plugins/processors/dedup/dedup_test.go @@ -197,3 +197,15 @@ func TestSameTimestamp(t *testing.T) { out = dedup.Apply(in) require.Equal(t, []telegraf.Metric{}, out) // drop } + +func TestSuppressMultipleRepeatedValue(t *testing.T) { + deduplicate := createDedup(time.Now()) + // Create metric in the past + source := createMetric(1, time.Now().Add(-1*time.Second)) + _ = deduplicate.Apply(source) + source = createMetric(1, time.Now()) + target := deduplicate.Apply(source, source, source, source) + + assertCacheHit(t, &deduplicate, source) + assertMetricSuppressed(t, target) +}