feat(outputs): Only copy metric if its not filtered out (#15883)
This commit is contained in:
parent
2b307c789a
commit
8561dedb07
|
|
@ -873,10 +873,10 @@ func (a *Agent) runOutputs(
|
||||||
|
|
||||||
for metric := range unit.src {
|
for metric := range unit.src {
|
||||||
for i, output := range unit.outputs {
|
for i, output := range unit.outputs {
|
||||||
if i == len(a.Config.Outputs)-1 {
|
if i == len(unit.outputs)-1 {
|
||||||
output.AddMetric(metric)
|
output.AddMetricNoCopy(metric)
|
||||||
} else {
|
} else {
|
||||||
output.AddMetric(metric.Copy())
|
output.AddMetric(metric)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -207,8 +207,22 @@ func (r *RunningOutput) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddMetric adds a metric to the output.
|
// AddMetric adds a metric to the output.
|
||||||
// Takes ownership of metric
|
// The given metric will be copied if the output selects the metric.
|
||||||
func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
|
func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
|
||||||
|
ok, err := r.Config.Filter.Select(metric)
|
||||||
|
if err != nil {
|
||||||
|
r.log.Errorf("filtering failed: %v", err)
|
||||||
|
} else if !ok {
|
||||||
|
r.MetricsFiltered.Incr(1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.add(metric.Copy())
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddMetricNoCopy adds a metric to the output.
|
||||||
|
// Takes ownership of metric regardless of whether the output selects it for outputting.
|
||||||
|
func (r *RunningOutput) AddMetricNoCopy(metric telegraf.Metric) {
|
||||||
ok, err := r.Config.Filter.Select(metric)
|
ok, err := r.Config.Filter.Select(metric)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.log.Errorf("filtering failed: %v", err)
|
r.log.Errorf("filtering failed: %v", err)
|
||||||
|
|
@ -217,6 +231,10 @@ func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.add(metric)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RunningOutput) add(metric telegraf.Metric) {
|
||||||
r.Config.Filter.Modify(metric)
|
r.Config.Filter.Modify(metric)
|
||||||
if len(metric.FieldList()) == 0 {
|
if len(metric.FieldList()) == 0 {
|
||||||
r.metricFiltered(metric)
|
r.metricFiltered(metric)
|
||||||
|
|
|
||||||
|
|
@ -196,6 +196,7 @@ func TestServeHTTP(t *testing.T) {
|
||||||
for m := range d {
|
for m := range d {
|
||||||
ro.AddMetric(m)
|
ro.AddMetric(m)
|
||||||
ro.Write() //nolint:errcheck // test will fail anyway if the write fails
|
ro.Write() //nolint:errcheck // test will fail anyway if the write fails
|
||||||
|
m.Accept()
|
||||||
}
|
}
|
||||||
}(dst)
|
}(dst)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue