fix(processors.execd): Detect line-protocol parser correctly (#16535)

This commit is contained in:
Sven Rebhan 2025-02-24 18:57:47 +01:00 committed by GitHub
parent 26fc051441
commit af999bd129
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 263 additions and 164 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/processors"
)
@ -32,12 +33,6 @@ type Execd struct {
process *process.Process
}
func New() *Execd {
return &Execd{
RestartDelay: config.Duration(10 * time.Second),
}
}
func (e *Execd) SetParser(p telegraf.Parser) {
e.parser = p
}
@ -101,7 +96,14 @@ func (e *Execd) Stop() {
func (e *Execd) cmdReadOut(out io.Reader) {
// Prefer using the StreamParser when parsing influx format.
if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser {
var parser telegraf.Parser
if rp, ok := e.parser.(*models.RunningParser); ok {
parser = rp.Parser
} else {
parser = e.parser
}
if _, isInfluxParser := parser.(*influx.Parser); isInfluxParser {
e.cmdReadOutStream(out)
return
}
@ -175,6 +177,8 @@ func (e *Execd) Init() error {
func init() {
processors.AddStreaming("execd", func() telegraf.StreamingProcessor {
return New()
return &Execd{
RestartDelay: config.Duration(10 * time.Second),
}
})
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@ -24,101 +25,94 @@ import (
)
func TestExternalProcessorWorks(t *testing.T) {
e := New()
e.Log = testutil.Logger{}
// Determine name of the test executable for mocking an external program
exe, err := os.Executable()
require.NoError(t, err)
// Setup the plugin
plugin := &Execd{
Command: []string{
exe,
"-case", "multiply",
"-field", "count",
},
Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"},
RestartDelay: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}
// Setup the parser and serializer in the processor
parser := &influx.Parser{}
require.NoError(t, parser.Init())
e.SetParser(parser)
plugin.SetParser(parser)
serializer := &serializers_influx.Serializer{}
require.NoError(t, serializer.Init())
e.SetSerializer(serializer)
exe, err := os.Executable()
require.NoError(t, err)
t.Log(exe)
e.Command = []string{exe, "-countmultiplier"}
e.Environment = []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"}
e.RestartDelay = config.Duration(5 * time.Second)
acc := &testutil.Accumulator{}
require.NoError(t, e.Start(acc))
plugin.SetSerializer(serializer)
// Setup the input and expected output metrucs
now := time.Now()
orig := now
var input []telegraf.Metric
var expected []telegraf.Metric
for i := 0; i < 10; i++ {
m := metric.New("test",
map[string]string{
"city": "Toronto",
},
map[string]interface{}{
"population": 6000000,
"count": 1,
},
now)
now = now.Add(1)
m := metric.New(
"test",
map[string]string{"city": "Toronto"},
map[string]interface{}{"population": 6000000, "count": 1},
now.Add(time.Duration(i)),
)
input = append(input, m)
require.NoError(t, e.Add(m, acc))
e := m.Copy()
e.AddField("count", 2)
expected = append(expected, e)
}
acc.Wait(1)
e.Stop()
acc.Wait(9)
metrics := acc.GetTelegrafMetrics()
m := metrics[0]
expected := testutil.MustMetric("test",
map[string]string{
"city": "Toronto",
},
map[string]interface{}{
"population": 6000000,
"count": 2,
},
orig,
)
testutil.RequireMetricEqual(t, expected, m)
metricTime := m.Time().UnixNano()
// make sure the other 9 are ordered properly
for i := 0; i < 9; i++ {
m = metrics[i+1]
require.EqualValues(t, metricTime+1, m.Time().UnixNano())
metricTime = m.Time().UnixNano()
// Perform the test and check the result
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
for _, m := range input {
require.NoError(t, plugin.Add(m, &acc))
}
require.Eventually(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, 3*time.Second, 100*time.Millisecond)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func TestParseLinesWithNewLines(t *testing.T) {
e := New()
e.Log = testutil.Logger{}
// Determine name of the test executable for mocking an external program
exe, err := os.Executable()
require.NoError(t, err)
// Setup the plugin
plugin := &Execd{
Command: []string{
exe,
"-case", "multiply",
"-field", "count",
},
Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"},
RestartDelay: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}
// Setup the parser and serializer in the processor
parser := &influx.Parser{}
require.NoError(t, parser.Init())
e.SetParser(parser)
plugin.SetParser(parser)
serializer := &serializers_influx.Serializer{}
require.NoError(t, serializer.Init())
e.SetSerializer(serializer)
exe, err := os.Executable()
require.NoError(t, err)
t.Log(exe)
e.Command = []string{exe, "-countmultiplier"}
e.Environment = []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"}
e.RestartDelay = config.Duration(5 * time.Second)
acc := &testutil.Accumulator{}
require.NoError(t, e.Start(acc))
plugin.SetSerializer(serializer)
// Setup the input and expected output metrucs
now := time.Now()
orig := now
m := metric.New("test",
input := metric.New(
"test",
map[string]string{
"author": "Mr. Gopher",
},
@ -126,93 +120,90 @@ func TestParseLinesWithNewLines(t *testing.T) {
"phrase": "Gophers are amazing creatures.\nAbsolutely amazing.",
"count": 3,
},
now)
require.NoError(t, e.Add(m, acc))
acc.Wait(1)
e.Stop()
processedMetric := acc.GetTelegrafMetrics()[0]
expectedMetric := testutil.MustMetric("test",
map[string]string{
"author": "Mr. Gopher",
},
map[string]interface{}{
"phrase": "Gophers are amazing creatures.\nAbsolutely amazing.",
"count": 6,
},
orig,
now,
)
testutil.RequireMetricEqual(t, expectedMetric, processedMetric)
}
var countmultiplier = flag.Bool("countmultiplier", false,
"if true, act like line input program instead of test")
func TestMain(m *testing.M) {
flag.Parse()
runMode := os.Getenv("PLUGINS_PROCESSORS_EXECD_MODE")
if *countmultiplier && runMode == "application" {
runCountMultiplierProgram()
os.Exit(0)
expected := []telegraf.Metric{
metric.New(
"test",
map[string]string{"author": "Mr. Gopher"},
map[string]interface{}{
"phrase": "Gophers are amazing creatures.\nAbsolutely amazing.",
"count": 6,
},
now,
),
}
code := m.Run()
os.Exit(code)
// Perform the test and check the result
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
require.NoError(t, plugin.Add(input, &acc))
require.Eventually(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, 3*time.Second, 100*time.Millisecond)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func runCountMultiplierProgram() {
fieldName := os.Getenv("FIELD_NAME")
parser := influx.NewStreamParser(os.Stdin)
func TestLongLinesForLineProtocol(t *testing.T) {
// Determine name of the test executable for mocking an external program
exe, err := os.Executable()
require.NoError(t, err)
// Setup the plugin
plugin := &Execd{
Command: []string{
exe,
"-case", "long",
"-field", "long",
},
Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"},
RestartDelay: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}
// Setup the parser and serializer in the processor
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
serializer := &serializers_influx.Serializer{}
//nolint:errcheck // this should always succeed
serializer.Init()
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
for {
m, err := parser.Next()
if err != nil {
if errors.Is(err, influx.EOF) {
return // stream ended
}
var parseErr *influx.ParseError
if errors.As(err, &parseErr) {
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
//nolint:revive // os.Exit called intentionally
os.Exit(1)
}
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
//nolint:revive // os.Exit called intentionally
os.Exit(1)
}
c, found := m.GetField(fieldName)
if !found {
fmt.Fprintf(os.Stderr, "metric has no %s field\n", fieldName)
//nolint:revive // os.Exit called intentionally
os.Exit(1)
}
switch t := c.(type) {
case float64:
t *= 2
m.AddField(fieldName, t)
case int64:
t *= 2
m.AddField(fieldName, t)
default:
fmt.Fprintf(os.Stderr, "%s is not an unknown type, it's a %T\n", fieldName, c)
//nolint:revive // os.Exit called intentionally
os.Exit(1)
}
b, err := serializer.Serialize(m)
if err != nil {
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
//nolint:revive // os.Exit called intentionally
os.Exit(1)
}
fmt.Fprint(os.Stdout, string(b))
// Setup the input and expected output metrucs
now := time.Now()
input := metric.New(
"test",
map[string]string{"author": "Mr. Gopher"},
map[string]interface{}{"count": 3},
now,
)
expected := []telegraf.Metric{
metric.New(
"test",
map[string]string{"author": "Mr. Gopher"},
map[string]interface{}{
"long": strings.Repeat("foobar", 280_000/6),
"count": 3,
},
now,
),
}
// Perform the test and check the result
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
require.NoError(t, plugin.Add(input, &acc))
require.Eventually(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, 3*time.Second, 100*time.Millisecond)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func TestCases(t *testing.T) {
@ -225,7 +216,7 @@ func TestCases(t *testing.T) {
// Set up for file inputs
processors.AddStreaming("execd", func() telegraf.StreamingProcessor {
return New()
return &Execd{RestartDelay: config.Duration(10 * time.Second)}
})
for _, f := range folders {
@ -266,8 +257,6 @@ func TestCases(t *testing.T) {
plugin.Stop()
require.Eventually(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond)
@ -353,8 +342,12 @@ func TestTracking(t *testing.T) {
require.NoError(t, err)
plugin := &Execd{
Command: []string{exe, "-countmultiplier"},
Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application", "FIELD_NAME=count"},
Command: []string{
exe,
"-case", "multiply",
"-field", "count",
},
Environment: []string{"PLUGINS_PROCESSORS_EXECD_MODE=application"},
RestartDelay: config.Duration(5 * time.Second),
Log: testutil.Logger{},
}
@ -395,3 +388,105 @@ func TestTracking(t *testing.T) {
return len(input) == len(delivered)
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
}
func TestMain(m *testing.M) {
var testcase, field string
flag.StringVar(&testcase, "case", "", "test-case to mock [multiply, long]")
flag.StringVar(&field, "field", "count", "name of the field to multiply")
flag.Parse()
if os.Getenv("PLUGINS_PROCESSORS_EXECD_MODE") != "application" || testcase == "" {
os.Exit(m.Run())
}
switch testcase {
case "multiply":
os.Exit(runTestCaseMultiply(field))
case "long":
os.Exit(runTestCaseLong(field))
}
os.Exit(5)
}
func runTestCaseMultiply(field string) int {
parser := influx.NewStreamParser(os.Stdin)
serializer := &serializers_influx.Serializer{}
if err := serializer.Init(); err != nil {
fmt.Fprintf(os.Stderr, "initialization ERR %v\n", err)
return 1
}
for {
m, err := parser.Next()
if err != nil {
if errors.Is(err, influx.EOF) {
return 0
}
var parseErr *influx.ParseError
if errors.As(err, &parseErr) {
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
return 1
}
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
return 1
}
c, found := m.GetField(field)
if !found {
fmt.Fprintf(os.Stderr, "metric has no field %q\n", field)
return 1
}
switch t := c.(type) {
case float64:
m.AddField(field, t*2)
case int64:
m.AddField(field, t*2)
default:
fmt.Fprintf(os.Stderr, "%s has an unknown type, it's a %T\n", field, c)
return 1
}
b, err := serializer.Serialize(m)
if err != nil {
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
return 1
}
fmt.Fprint(os.Stdout, string(b))
}
}
func runTestCaseLong(field string) int {
parser := influx.NewStreamParser(os.Stdin)
serializer := &serializers_influx.Serializer{}
if err := serializer.Init(); err != nil {
fmt.Fprintf(os.Stderr, "initialization ERR %v\n", err)
return 1
}
// Setup a field with a lot of characters to exceed the scanner limit
long := strings.Repeat("foobar", 280_000/6)
for {
m, err := parser.Next()
if err != nil {
if errors.Is(err, influx.EOF) {
return 0
}
var parseErr *influx.ParseError
if errors.As(err, &parseErr) {
fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr)
return 1
}
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
return 1
}
m.AddField(field, long)
b, err := serializer.Serialize(m)
if err != nil {
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
return 1
}
fmt.Fprint(os.Stdout, string(b))
}
}