fix(dedup): Modifying slice while iterating is dangerous (#10684)
This commit is contained in:
parent
77390b6495
commit
722a265da6
|
|
@ -27,12 +27,6 @@ func (d *Dedup) Description() string {
|
||||||
return "Filter metrics with repeating field values"
|
return "Filter metrics with repeating field values"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove single item from slice
|
|
||||||
func remove(slice []telegraf.Metric, i int) []telegraf.Metric {
|
|
||||||
slice[len(slice)-1], slice[i] = slice[i], slice[len(slice)-1]
|
|
||||||
return slice[:len(slice)-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove expired items from cache
|
// Remove expired items from cache
|
||||||
func (d *Dedup) cleanup() {
|
func (d *Dedup) cleanup() {
|
||||||
// No need to cleanup cache too often. Lets save some CPU
|
// No need to cleanup cache too often. Lets save some CPU
|
||||||
|
|
@ -57,19 +51,24 @@ func (d *Dedup) save(metric telegraf.Metric, id uint64) {
|
||||||
|
|
||||||
// main processing method
|
// main processing method
|
||||||
func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
||||||
for idx, metric := range metrics {
|
idx := 0
|
||||||
|
for _, metric := range metrics {
|
||||||
id := metric.HashID()
|
id := metric.HashID()
|
||||||
m, ok := d.Cache[id]
|
m, ok := d.Cache[id]
|
||||||
|
|
||||||
// If not in cache then just save it
|
// If not in cache then just save it
|
||||||
if !ok {
|
if !ok {
|
||||||
d.save(metric, id)
|
d.save(metric, id)
|
||||||
|
metrics[idx] = metric
|
||||||
|
idx++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// If cache item has expired then refresh it
|
// If cache item has expired then refresh it
|
||||||
if time.Since(m.Time()) >= time.Duration(d.DedupInterval) {
|
if time.Since(m.Time()) >= time.Duration(d.DedupInterval) {
|
||||||
d.save(metric, id)
|
d.save(metric, id)
|
||||||
|
metrics[idx] = metric
|
||||||
|
idx++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -103,16 +102,21 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
||||||
// If any field value has changed then refresh the cache
|
// If any field value has changed then refresh the cache
|
||||||
if changed {
|
if changed {
|
||||||
d.save(metric, id)
|
d.save(metric, id)
|
||||||
|
metrics[idx] = metric
|
||||||
|
idx++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if sametime && added {
|
if sametime && added {
|
||||||
|
metrics[idx] = metric
|
||||||
|
idx++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// In any other case remove metric from the output
|
// In any other case remove metric from the output
|
||||||
metrics = remove(metrics, idx)
|
metric.Drop()
|
||||||
}
|
}
|
||||||
|
metrics = metrics[:idx]
|
||||||
d.cleanup()
|
d.cleanup()
|
||||||
return metrics
|
return metrics
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -197,3 +197,15 @@ func TestSameTimestamp(t *testing.T) {
|
||||||
out = dedup.Apply(in)
|
out = dedup.Apply(in)
|
||||||
require.Equal(t, []telegraf.Metric{}, out) // drop
|
require.Equal(t, []telegraf.Metric{}, out) // drop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSuppressMultipleRepeatedValue(t *testing.T) {
|
||||||
|
deduplicate := createDedup(time.Now())
|
||||||
|
// Create metric in the past
|
||||||
|
source := createMetric(1, time.Now().Add(-1*time.Second))
|
||||||
|
_ = deduplicate.Apply(source)
|
||||||
|
source = createMetric(1, time.Now())
|
||||||
|
target := deduplicate.Apply(source, source, source, source)
|
||||||
|
|
||||||
|
assertCacheHit(t, &deduplicate, source)
|
||||||
|
assertMetricSuppressed(t, target)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue