From 8561dedb079ce7f8858cff1f43d5a00ddf8b27c9 Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Wed, 2 Oct 2024 21:11:26 +0200 Subject: [PATCH] feat(outputs): Only copy metric if its not filtered out (#15883) --- agent/agent.go | 6 +++--- models/running_output.go | 20 ++++++++++++++++++- .../cloud_pubsub_push_test.go | 1 + 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 5d7cc033f..1e1b26b5e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -873,10 +873,10 @@ func (a *Agent) runOutputs( for metric := range unit.src { for i, output := range unit.outputs { - if i == len(a.Config.Outputs)-1 { - output.AddMetric(metric) + if i == len(unit.outputs)-1 { + output.AddMetricNoCopy(metric) } else { - output.AddMetric(metric.Copy()) + output.AddMetric(metric) } } } diff --git a/models/running_output.go b/models/running_output.go index 4cdd4d15a..c89f78c6d 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -207,8 +207,22 @@ func (r *RunningOutput) Close() { } // AddMetric adds a metric to the output. -// Takes ownership of metric +// The given metric will be copied if the output selects the metric. func (r *RunningOutput) AddMetric(metric telegraf.Metric) { + ok, err := r.Config.Filter.Select(metric) + if err != nil { + r.log.Errorf("filtering failed: %v", err) + } else if !ok { + r.MetricsFiltered.Incr(1) + return + } + + r.add(metric.Copy()) +} + +// AddMetricNoCopy adds a metric to the output. +// Takes ownership of metric regardless of whether the output selects it for outputting. +func (r *RunningOutput) AddMetricNoCopy(metric telegraf.Metric) { ok, err := r.Config.Filter.Select(metric) if err != nil { r.log.Errorf("filtering failed: %v", err) @@ -217,6 +231,10 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) { return } + r.add(metric) +} + +func (r *RunningOutput) add(metric telegraf.Metric) { r.Config.Filter.Modify(metric) if len(metric.FieldList()) == 0 { r.metricFiltered(metric) diff --git a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go index 252b843fc..9e8aa07d1 100644 --- a/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go +++ b/plugins/inputs/cloud_pubsub_push/cloud_pubsub_push_test.go @@ -196,6 +196,7 @@ func TestServeHTTP(t *testing.T) { for m := range d { ro.AddMetric(m) ro.Write() //nolint:errcheck // test will fail anyway if the write fails + m.Accept() } }(dst)