fix(agent): Respect processor order in file (#13614)

This commit is contained in:
Sven Rebhan 2023-07-14 16:12:24 +02:00 committed by GitHub
parent de8a9c514c
commit a72b859b13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 219 additions and 7 deletions

View File

@ -7,7 +7,6 @@ import (
"log"
"os"
"runtime"
"sort"
"sync"
"time"
@ -603,14 +602,16 @@ func (a *Agent) startProcessors(
dst chan<- telegraf.Metric,
processors models.RunningProcessors,
) (chan<- telegraf.Metric, []*processorUnit, error) {
// Sort from last to first
sort.SliceStable(processors, func(i, j int) bool {
return processors[i].Config.Order > processors[j].Config.Order
})
var src chan telegraf.Metric
units := make([]*processorUnit, 0, len(processors))
for _, processor := range processors {
// The processor chain is constructed from the output side starting from
// the output(s) and walking the way back to the input(s). However, the
// processor-list is sorted by order and/or by appearance in the config,
// i.e. in input-to-output direction. Therefore, reverse the processor list
// to reflect the order/definition order in the processing chain.
for i := len(processors) - 1; i >= 0; i-- {
processor := processors[i]
src = make(chan telegraf.Metric, 100)
acc := NewAccumulator(processor, dst)

View File

@ -1,14 +1,26 @@
package agent
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/models"
_ "github.com/influxdata/telegraf/plugins/aggregators/all"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
"github.com/influxdata/telegraf/plugins/parsers/influx"
_ "github.com/influxdata/telegraf/plugins/processors/all"
"github.com/influxdata/telegraf/testutil"
)
func TestAgent_OmitHostname(t *testing.T) {
@ -165,3 +177,86 @@ func TestWindow(t *testing.T) {
})
}
}
func TestCases(t *testing.T) {
// Get all directories in testcases
folders, err := os.ReadDir("testcases")
require.NoError(t, err)
// Make sure tests contains data
require.NotEmpty(t, folders)
for _, f := range folders {
// Only handle folders
if !f.IsDir() {
continue
}
fname := f.Name()
testdataPath := filepath.Join("testcases", fname)
configFilename := filepath.Join(testdataPath, "telegraf.conf")
expectedFilename := filepath.Join(testdataPath, "expected.out")
t.Run(fname, func(t *testing.T) {
// Get parser to parse input and expected output
parser := &influx.Parser{}
require.NoError(t, parser.Init())
expected, err := testutil.ParseMetricsFromFile(expectedFilename, parser)
require.NoError(t, err)
require.NotEmpty(t, expected)
// Load the config and inject the mock output to be able to verify
// the resulting metrics
cfg := config.NewConfig()
require.NoError(t, cfg.LoadAll(configFilename))
require.Empty(t, cfg.Outputs, "No output(s) allowed in the config!")
// Setup the agent and run the agent in "once" mode
agent := NewAgent(cfg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
actual, err := collect(ctx, agent, 0)
require.NoError(t, err)
// Process expected metrics and compare with resulting metrics
options := []cmp.Option{
testutil.IgnoreTags("host"),
}
if expected[0].Time().IsZero() {
options = append(options, testutil.IgnoreTime())
}
testutil.RequireMetricsEqual(t, expected, actual, options...)
})
}
}
// Implement a "test-mode" like call but collect the metrics
func collect(ctx context.Context, a *Agent, wait time.Duration) ([]telegraf.Metric, error) {
var received []telegraf.Metric
var mu sync.Mutex
src := make(chan telegraf.Metric, 100)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for m := range src {
mu.Lock()
received = append(received, m)
mu.Unlock()
m.Reject()
}
}()
if err := a.runTest(ctx, wait, src); err != nil {
return nil, err
}
wg.Wait()
if models.GlobalGatherErrors.Get() != 0 {
return received, fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
}
return received, nil
}

View File

@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000

View File

@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000

View File

@ -0,0 +1,26 @@
# Test for using the appearance order in the file for processor order
[[inputs.file]]
files = ["testcases/processor-order-appearance/input.influx"]
data_format = "influx"
[[processors.starlark]]
source = '''
def apply(metric):
metrics = []
m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)
return metrics
'''
[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"

View File

@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000

View File

@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000

View File

@ -0,0 +1,27 @@
# Test for specifying an explicit processor order
[[inputs.file]]
files = ["testcases/processor-order-explicit/input.influx"]
data_format = "influx"
[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"
order = 2
[[processors.starlark]]
source = '''
def apply(metric):
metrics = []
m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)
return metrics
'''
order = 1

View File

@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i,timestamp="2023-07-13T12:53:54.197709713Z" 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000

View File

@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000

View File

@ -0,0 +1,25 @@
# Test for using the appearance order in the file for processor order
[[inputs.file]]
files = ["testcases/processor-order-appearance/input.influx"]
data_format = "influx"
[[processors.starlark]]
source = '''
def apply(metric):
metrics = []
m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)
return metrics
'''
[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"
order = 1

View File

@ -0,0 +1,2 @@
new_metric_from_starlark,foo=bar baz=42i 1689252834197709713
old_metric_from_mock,mood=good value=23i,timestamp="2023-07-13T13:10:34Z" 1689253834000000000

View File

@ -0,0 +1 @@
old_metric_from_mock,mood=good value=23i 1689253834000000000

View File

@ -0,0 +1,26 @@
# Test for using the appearance order in the file for processor order.
# This will not add the "timestamp" field as the starlark processor runs _after_
# the date processor.
[[inputs.file]]
files = ["testcases/processor-order-no-starlark/input.influx"]
data_format = "influx"
[[processors.date]]
field_key = "timestamp"
date_format = "2006-01-02T15:04:05.999999999Z"
timezone = "UTC"
[[processors.starlark]]
source = '''
def apply(metric):
metrics = []
m = Metric("new_metric_from_starlark")
m.tags["foo"] = "bar"
m.fields["baz"] = 42
m.time = 1689252834197709713
metrics.append(m)
metrics.append(metric)
return metrics
'''