fix(processors.lookup): Fix tracking metrics (#13092)
This commit is contained in:
parent
2fed77e02a
commit
3c3c0d8352
|
|
@ -150,6 +150,11 @@ func (m *trackingMetric) decr() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unwrap allows to access the underlying metric directly e.g. for go-templates
|
||||||
|
func (m *trackingMetric) Unwrap() telegraf.Metric {
|
||||||
|
return m.Metric
|
||||||
|
}
|
||||||
|
|
||||||
type deliveryInfo struct {
|
type deliveryInfo struct {
|
||||||
id telegraf.TrackingID
|
id telegraf.TrackingID
|
||||||
accepted int
|
accepted int
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,8 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/processors"
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type unwrappableMetric interface{ Unwrap() telegraf.Metric }
|
||||||
|
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
|
|
@ -64,7 +66,14 @@ func (p *Processor) Init() error {
|
||||||
|
|
||||||
func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
func (p *Processor) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
out := make([]telegraf.Metric, 0, len(in))
|
out := make([]telegraf.Metric, 0, len(in))
|
||||||
for _, m := range in {
|
for _, raw := range in {
|
||||||
|
var m telegraf.Metric
|
||||||
|
if wm, ok := raw.(unwrappableMetric); ok {
|
||||||
|
m = wm.Unwrap()
|
||||||
|
} else {
|
||||||
|
m = raw
|
||||||
|
}
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
if err := p.tmpl.Execute(&buf, m); err != nil {
|
if err := p.tmpl.Execute(&buf, m); err != nil {
|
||||||
p.Log.Errorf("generating key failed: %v", err)
|
p.Log.Errorf("generating key failed: %v", err)
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/plugins/processors"
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
@ -92,3 +93,66 @@ func TestCases(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCasesTracking(t *testing.T) {
|
||||||
|
// Get all directories in testcases
|
||||||
|
folders, err := os.ReadDir("testcases")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Make sure tests contains data
|
||||||
|
require.NotEmpty(t, folders)
|
||||||
|
|
||||||
|
// Set up for file inputs
|
||||||
|
processors.Add("lookup", func() telegraf.Processor {
|
||||||
|
return &Processor{Log: testutil.Logger{}}
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, f := range folders {
|
||||||
|
// Only handle folders
|
||||||
|
if !f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fname := f.Name()
|
||||||
|
testdataPath := filepath.Join("testcases", fname)
|
||||||
|
configFilename := filepath.Join(testdataPath, "telegraf.conf")
|
||||||
|
inputFilename := filepath.Join(testdataPath, "input.influx")
|
||||||
|
expectedFilename := filepath.Join(testdataPath, "expected.out")
|
||||||
|
|
||||||
|
t.Run(fname, func(t *testing.T) {
|
||||||
|
// Get parser to parse input and expected output
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
|
inputRaw, err := testutil.ParseMetricsFromFile(inputFilename, parser)
|
||||||
|
require.NoError(t, err)
|
||||||
|
input := make([]telegraf.Metric, 0, len(inputRaw))
|
||||||
|
notify := func(_ telegraf.DeliveryInfo) {}
|
||||||
|
for _, m := range inputRaw {
|
||||||
|
tm, _ := metric.WithTracking(m, notify)
|
||||||
|
input = append(input, tm)
|
||||||
|
}
|
||||||
|
|
||||||
|
var expected []telegraf.Metric
|
||||||
|
if _, err := os.Stat(expectedFilename); err == nil {
|
||||||
|
var err error
|
||||||
|
expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure the plugin
|
||||||
|
cfg := config.NewConfig()
|
||||||
|
require.NoError(t, cfg.LoadConfig(configFilename))
|
||||||
|
require.Len(t, cfg.Processors, 1, "wrong number of processors")
|
||||||
|
|
||||||
|
type unwrappableProcessor interface{ Unwrap() telegraf.Processor }
|
||||||
|
proc := cfg.Processors[0].Processor.(unwrappableProcessor)
|
||||||
|
plugin := proc.Unwrap().(*Processor)
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
|
// Process expected metrics and compare with resulting metrics
|
||||||
|
actual := plugin.Apply(input...)
|
||||||
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
test,lutkey=peters_friend,location=home value=42i 1678124473000000123
|
||||||
|
other,status=alert value=-1i 1678124473000000575
|
||||||
|
test,lutkey=marys_son,location=party value=666i 1678124473000000944
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
test,lutkey=peters_friend value=42i 1678124473000000123
|
||||||
|
other,status=alert value=-1i 1678124473000000575
|
||||||
|
test,lutkey=marys_son value=666i 1678124473000000944
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
{
|
||||||
|
"peters_friend": {
|
||||||
|
"location": "home"
|
||||||
|
},
|
||||||
|
"marys_son": {
|
||||||
|
"location": "party"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
[[processors.lookup]]
|
||||||
|
files = ["testcases/non_existing_tag/lut.json"]
|
||||||
|
key = '{{.Tag "lutkey"}}'
|
||||||
Loading…
Reference in New Issue