fix(processors.lookup): Do not strip tracking info (#13301)
This commit is contained in:
parent
ef8484aab3
commit
5d5147d186
|
|
@ -67,27 +67,21 @@ func (p *Processor) Init() error {
|
||||||
func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
out := make([]telegraf.Metric, 0, len(in))
|
out := make([]telegraf.Metric, 0, len(in))
|
||||||
for _, raw := range in {
|
for _, raw := range in {
|
||||||
var m telegraf.Metric
|
m := raw
|
||||||
if wm, ok := raw.(unwrappableMetric); ok {
|
if wm, ok := raw.(unwrappableMetric); ok {
|
||||||
m = wm.Unwrap()
|
m = wm.Unwrap()
|
||||||
} else {
|
|
||||||
m = raw
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
if err := p.tmpl.Execute(&buf, m); err != nil {
|
if err := p.tmpl.Execute(&buf, m); err != nil {
|
||||||
p.Log.Errorf("generating key failed: %v", err)
|
p.Log.Errorf("generating key failed: %v", err)
|
||||||
p.Log.Debugf("metric was %v", m)
|
p.Log.Debugf("metric was %v", m)
|
||||||
out = append(out, m)
|
} else if tags, found := p.mappings[buf.String()]; found {
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if tags, found := p.mappings[buf.String()]; found {
|
|
||||||
for _, tag := range tags {
|
for _, tag := range tags {
|
||||||
m.AddTag(tag.Key, tag.Value)
|
m.AddTag(tag.Key, tag.Value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
out = append(out, m)
|
out = append(out, raw)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,9 @@ package lookup
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
|
@ -126,8 +128,15 @@ func TestCasesTracking(t *testing.T) {
|
||||||
|
|
||||||
inputRaw, err := testutil.ParseMetricsFromFile(inputFilename, parser)
|
inputRaw, err := testutil.ParseMetricsFromFile(inputFilename, parser)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var mu sync.Mutex
|
||||||
|
delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
|
||||||
|
notify := func(di telegraf.DeliveryInfo) {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
delivered = append(delivered, di)
|
||||||
|
}
|
||||||
input := make([]telegraf.Metric, 0, len(inputRaw))
|
input := make([]telegraf.Metric, 0, len(inputRaw))
|
||||||
notify := func(_ telegraf.DeliveryInfo) {}
|
|
||||||
for _, m := range inputRaw {
|
for _, m := range inputRaw {
|
||||||
tm, _ := metric.WithTracking(m, notify)
|
tm, _ := metric.WithTracking(m, notify)
|
||||||
input = append(input, tm)
|
input = append(input, tm)
|
||||||
|
|
@ -153,6 +162,18 @@ func TestCasesTracking(t *testing.T) {
|
||||||
// Process expected metrics and compare with resulting metrics
|
// Process expected metrics and compare with resulting metrics
|
||||||
actual := plugin.Apply(input...)
|
actual := plugin.Apply(input...)
|
||||||
testutil.RequireMetricsEqual(t, expected, actual)
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
|
||||||
|
// Simulate output acknowledging delivery
|
||||||
|
for _, m := range input {
|
||||||
|
m.Accept()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check delivery
|
||||||
|
require.Eventuallyf(t, func() bool {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
return len(expected) == len(delivered)
|
||||||
|
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue