diff --git a/agent/agent.go b/agent/agent.go
index 1e1b26b5e..25495b6a7 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -106,6 +106,11 @@ func (a *Agent) Run(ctx context.Context) error {
 		time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet,
 		a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))
 
+	log.Printf("D! [agent] Initializing plugins")
+	if err := a.InitPlugins(); err != nil {
+		return err
+	}
+
 	if a.Config.Persister != nil {
 		log.Printf("D! [agent] Initializing plugin states")
 		if err := a.initPersister(); err != nil {
@@ -119,11 +124,6 @@ func (a *Agent) Run(ctx context.Context) error {
 		}
 	}
 
-	log.Printf("D! [agent] Initializing plugins")
-	if err := a.InitPlugins(); err != nil {
-		return err
-	}
-
 	startTime := time.Now()
 
 	log.Printf("D! [agent] Connecting outputs")
diff --git a/docs/specs/tsd-003-state-persistence.md b/docs/specs/tsd-003-state-persistence.md
index 15470ab4f..5a482f737 100644
--- a/docs/specs/tsd-003-state-persistence.md
+++ b/docs/specs/tsd-003-state-persistence.md
@@ -31,15 +31,17 @@ It is intended to
 
 The persistence will use the following steps:
 
+- Compute a unique ID for each of the plugin _instances_
+- Start up the Telegraf plugins by calling `Init()`, etc.
 - Initialize persistence framework with the user specified `statefile`
   location and load the state if present
 - Determine all stateful plugin instances by fulfilling the `StatefulPlugin`
   interface
-- Compute an unique ID for each of the plugin _instances_
 - Restore plugin states (if any) for each plugin ID present in the state-file
-- Startup Telegraf plugins calling `Init()`, etc.
 - Run data-collection etc...
-- On shutdown, query the state of all registered stateful plugins state
+- On shutdown, stop all Telegraf plugins by calling `Stop()` or `Close()`
+  depending on the plugin type
+- Query the state of all registered stateful plugins
 - Create an overall state-map with the plugin instance ID as a key and the
   serialized plugin state as value.
 - Marshal the overall state-map and store to disk
@@ -85,7 +87,7 @@ for the overall state.
 On-disk, the overall state of Telegraf is stored as JSON. To restore the
 state of a plugin, the overall Telegraf state is first deserialized from the
 on-disk JSON data and a lookup for the plugin ID is performed in the
 resulting map. The value, if found, is then deserialized to the
-plugin's state data-structure and provided to the plugin before calling `Init()`.
+plugin's state data-structure and provided to the plugin after calling `Init()`.
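The spec change above (restore state after `Init()`) mirrors the reordering in `agent.Run()` at the top of this diff. As a rough, self-contained sketch of that lifecycle: the interface shape below follows `telegraf.StatefulPlugin` (`GetState() interface{}` / `SetState(state interface{}) error`), while the toy plugin, the instance-ID string, and the JSON handling are illustrative assumptions, not the agent's actual persister code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StatefulPlugin mirrors the interface shape used by Telegraf plugins that
// opt into state persistence.
type StatefulPlugin interface {
	GetState() interface{}
	SetState(state interface{}) error
}

// offsetPlugin is a toy stateful plugin tracking per-file read offsets,
// similar in spirit to the tail input.
type offsetPlugin struct {
	offsets map[string]int64
}

func (p *offsetPlugin) Init() error {
	// Allocate the state container first; the persister fills it afterwards.
	p.offsets = make(map[string]int64)
	return nil
}

func (p *offsetPlugin) GetState() interface{} {
	return p.offsets
}

func (p *offsetPlugin) SetState(state interface{}) error {
	offsets, ok := state.(map[string]int64)
	if !ok {
		return fmt.Errorf("unexpected state type %T", state)
	}
	p.offsets = offsets
	return nil
}

func main() {
	// Overall state map: plugin-instance ID -> serialized plugin state.
	// The ID format here is purely illustrative.
	overall := map[string]json.RawMessage{
		"inputs.tail::example": json.RawMessage(`{"/var/log/syslog": 1024}`),
	}

	plugin := &offsetPlugin{}

	// 1. Init() runs first, as in the reordered agent.Run() above ...
	if err := plugin.Init(); err != nil {
		panic(err)
	}

	// 2. ... then the persisted state for this plugin ID is restored.
	if raw, found := overall["inputs.tail::example"]; found {
		restored := make(map[string]int64)
		if err := json.Unmarshal(raw, &restored); err != nil {
			panic(err)
		}
		if err := plugin.SetState(restored); err != nil {
			panic(err)
		}
	}

	// ... data collection would advance the offsets here ...
	plugin.offsets["/var/log/syslog"] = 2048

	// 3. On shutdown the state is queried again and written back to disk.
	raw, err := json.Marshal(plugin.GetState())
	if err != nil {
		panic(err)
	}
	overall["inputs.tail::example"] = raw
	out, _ := json.Marshal(overall)
	fmt.Println(string(out))
}
```

Allocating the state container in `Init()` and filling it from `SetState()` is exactly the ordering the reordered `agent.Run()` now guarantees.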
## Is / Is-not diff --git a/plugins/common/starlark/starlark.go b/plugins/common/starlark/starlark.go index d44e99f0a..0f5be3ad7 100644 --- a/plugins/common/starlark/starlark.go +++ b/plugins/common/starlark/starlark.go @@ -25,6 +25,7 @@ type Common struct { StarlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error) thread *starlark.Thread + builtins starlark.StringDict globals starlark.StringDict functions map[string]*starlark.Function parameters map[string]starlark.Tuple @@ -97,8 +98,9 @@ func (s *Common) SetState(state interface{}) error { return fmt.Errorf("state item %q cannot be set: %w", k, err) } } + s.builtins["state"] = s.state - return nil + return s.InitProgram() } func (s *Common) Init() error { @@ -109,44 +111,48 @@ func (s *Common) Init() error { return errors.New("both source or script cannot be set") } + s.builtins = starlark.StringDict{} + s.builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) + s.builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) + s.builtins["catch"] = starlark.NewBuiltin("catch", catch) + + if err := s.addConstants(&s.builtins); err != nil { + return err + } + + // Initialize the program + if err := s.InitProgram(); err != nil { + // Try again with a declared state. This might be necessary for + // state persistence. + s.state = starlark.NewDict(0) + s.builtins["state"] = s.state + if serr := s.InitProgram(); serr != nil { + return err + } + } + + s.functions = make(map[string]*starlark.Function) + s.parameters = make(map[string]starlark.Tuple) + + return nil +} + +func (s *Common) InitProgram() error { + // Load the program. In case of an error we can try to insert the state + // which can be used implicitly e.g. when persisting states + program, err := s.sourceProgram(s.builtins) + if err != nil { + return err + } + + // Execute source s.thread = &starlark.Thread{ Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) }, Load: func(_ *starlark.Thread, module string) (starlark.StringDict, error) { return s.StarlarkLoadFunc(module, s.Log) }, } - - builtins := starlark.StringDict{} - builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) - builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) - builtins["catch"] = starlark.NewBuiltin("catch", catch) - - if err := s.addConstants(&builtins); err != nil { - return err - } - - // Insert the persisted state if any - if s.state != nil { - builtins["state"] = s.state - } - - // Load the program. In case of an error we can try to insert the state - // which can be used implicitly e.g. when persisting states - program, err := s.sourceProgram(builtins) - if err != nil { - // Try again with a declared state. This might be necessary for - // state persistence. - s.state = starlark.NewDict(0) - builtins["state"] = s.state - p, serr := s.sourceProgram(builtins) - if serr != nil { - return err - } - program = p - } - - // Execute source - globals, err := program.Init(s.thread, builtins) + globals, err := program.Init(s.thread, s.builtins) if err != nil { return err } @@ -162,10 +168,8 @@ func (s *Common) Init() error { // metrics. Tasks that require global state will not be possible due to // this, so maybe we should relax this in the future. 
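Storing the builtins on the struct and re-running the program from `SetState()` follows from how go.starlark.net resolves names: identifiers are resolved against the predeclared environment when the program is loaded, not when a function runs. A stand-alone sketch of that behaviour (separate from the plugin code; the script and names are made up for illustration):

```go
package main

import (
	"fmt"

	"go.starlark.net/starlark"
)

// A processor-style script that references the "state" symbol.
const script = `
def apply(metric):
    state["last"] = metric
    return metric
`

func run(predeclared starlark.StringDict) error {
	thread := &starlark.Thread{Name: "example"}
	_, err := starlark.ExecFile(thread, "example.star", script, predeclared)
	return err
}

func main() {
	// Starlark resolves names statically at load time, so a script that uses
	// "state" fails to compile without the builtin ...
	fmt.Println("without state:", run(starlark.StringDict{}))

	// ... which is why Init() above retries with an empty state dict and why
	// SetState() has to re-run the program once the real state is swapped in.
	fmt.Println("with state:", run(starlark.StringDict{"state": starlark.NewDict(0)}))
}
```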
globals.Freeze() - s.globals = globals - s.functions = make(map[string]*starlark.Function) - s.parameters = make(map[string]starlark.Tuple) + return nil } diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 95d6ea851..c2d8ac658 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -30,10 +30,6 @@ var sampleConfig string var once sync.Once -const ( - defaultWatchMethod = "inotify" -) - var ( offsets = make(map[string]int64) offsetsMutex = new(sync.Mutex) diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 384075504..f954b7d8e 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -1,11 +1,10 @@ package tail import ( - "bytes" - "log" "os" "path/filepath" "runtime" + "strings" "testing" "time" @@ -22,14 +21,9 @@ import ( "github.com/influxdata/telegraf/testutil" ) -var ( - testdataDir = getTestdataDir() -) - -func NewInfluxParser() (telegraf.Parser, error) { +func newInfluxParser() (telegraf.Parser, error) { parser := &influx.Parser{} - err := parser.Init() - if err != nil { + if err := parser.Init(); err != nil { return nil, err } return parser, nil @@ -42,8 +36,8 @@ func NewTestTail() *Tail { offsetsCopy[k] = v } offsetsMutex.Unlock() - watchMethod := defaultWatchMethod + watchMethod := "inotify" if runtime.GOOS == "windows" { watchMethod = "poll" } @@ -58,61 +52,49 @@ func NewTestTail() *Tail { } func TestTailBadLine(t *testing.T) { - tmpfile, err := os.CreateTemp("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) + content := ` +cpu mytag= foo usage_idle= 100 +cpu usage_idle=100 +` - _, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n") - require.NoError(t, err) + tmpfile := filepath.Join(t.TempDir(), "input.csv") + require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) - // Write good metric so we can detect when processing is complete - _, err = tmpfile.WriteString("cpu usage_idle=100\n") - require.NoError(t, err) - - require.NoError(t, tmpfile.Close()) - - buf := &bytes.Buffer{} - log.SetOutput(buf) + logger := &testutil.CaptureLogger{} tt := NewTestTail() - tt.Log = testutil.Logger{} + tt.Log = logger tt.FromBeginning = true - tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(NewInfluxParser) + tt.Files = []string{tmpfile} + tt.SetParserFunc(newInfluxParser) + require.NoError(t, tt.Init()) - err = tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} + var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) - require.NoError(t, acc.GatherError(tt.Gather)) acc.Wait(1) tt.Stop() - require.Contains(t, buf.String(), "Malformed log line") + require.Len(t, logger.Errors(), 1) + require.Contains(t, logger.Errors()[0], "Malformed log line") } func TestColoredLine(t *testing.T) { - tmpfile, err := os.CreateTemp("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) - _, err = tmpfile.WriteString("cpu usage_idle=\033[4A\033[4A100\ncpu2 usage_idle=200\n") - require.NoError(t, err) - require.NoError(t, tmpfile.Close()) + content := "cpu usage_idle=\033[4A\033[4A100\ncpu2 usage_idle=200\n" + + tmpfile := filepath.Join(t.TempDir(), "input.csv") + require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) tt := NewTestTail() tt.Log = testutil.Logger{} tt.FromBeginning = true tt.Filters = []string{"ansi_color"} - tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(NewInfluxParser) + tt.Files = []string{tmpfile} + tt.SetParserFunc(newInfluxParser) + require.NoError(t, tt.Init()) - err = tt.Init() - 
require.NoError(t, err) - - acc := testutil.Accumulator{} + var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) @@ -129,23 +111,19 @@ func TestColoredLine(t *testing.T) { } func TestTailDosLineEndings(t *testing.T) { - tmpfile, err := os.CreateTemp("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) - _, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n") - require.NoError(t, err) - require.NoError(t, tmpfile.Close()) + content := "cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n" + + tmpfile := filepath.Join(t.TempDir(), "input.csv") + require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) tt := NewTestTail() tt.Log = testutil.Logger{} tt.FromBeginning = true - tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(NewInfluxParser) + tt.Files = []string{tmpfile} + tt.SetParserFunc(newInfluxParser) + require.NoError(t, tt.Init()) - err = tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} + var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) @@ -169,7 +147,7 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) { tt := NewTail() tt.Log = testutil.Logger{} tt.FromBeginning = true - tt.Files = []string{filepath.Join(testdataDir, "test_multiline.log")} + tt.Files = []string{filepath.Join("testdata", "test_multiline.log")} tt.MultilineConfig = MultilineConfig{ Pattern: `^[^\[]`, MatchWhichLine: Previous, @@ -177,17 +155,15 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) { Timeout: &duration, } tt.SetParserFunc(createGrokParser) + require.NoError(t, tt.Init()) - err = tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} + var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() acc.Wait(3) - expectedPath := filepath.Join(testdataDir, "test_multiline.log") + expectedPath := filepath.Join("testdata", "test_multiline.log") acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: This is debug", @@ -220,6 +196,7 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) { func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) { tmpfile, err := os.CreateTemp("", "") require.NoError(t, err) + defer tmpfile.Close() defer os.Remove(tmpfile.Name()) // This seems necessary in order to get the test to read the following lines. 
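Most of the rewritten tests replace `os.CreateTemp` plus manual `Remove`/`Close` handling with `t.TempDir()` and `os.WriteFile`, and swap fixed `acc.Wait(n)` calls for `require.Eventuallyf` polling; the multiline-timeout, EOF, and CSV-behavior tests keep `os.CreateTemp` because they append to the file while the tailer is running. A minimal, illustrative example of the temp-file pattern (not part of the suite):

```go
package tail

import (
	"os"
	"path/filepath"
	"testing"

	"github.com/stretchr/testify/require"
)

// TestTempFilePattern is an example only: t.TempDir() is removed
// automatically when the test finishes, and os.WriteFile leaves a complete,
// closed file behind, so no Remove/Close/Sync bookkeeping is needed.
func TestTempFilePattern(t *testing.T) {
	content := "cpu usage_idle=100\n"
	tmpfile := filepath.Join(t.TempDir(), "input.influx")
	require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600))

	data, err := os.ReadFile(tmpfile)
	require.NoError(t, err)
	require.Equal(t, content, string(data))
}
```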
@@ -242,11 +219,9 @@ func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) { Timeout: &duration, } tt.SetParserFunc(createGrokParser) + require.NoError(t, tt.Init()) - err = tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} + var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) time.Sleep(11 * time.Millisecond) // will force timeout _, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is info\r\n") @@ -287,7 +262,7 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test tt := NewTestTail() tt.Log = testutil.Logger{} tt.FromBeginning = true - tt.Files = []string{filepath.Join(testdataDir, "test_multiline.log")} + tt.Files = []string{filepath.Join("testdata", "test_multiline.log")} tt.MultilineConfig = MultilineConfig{ Pattern: `^[^\[]`, MatchWhichLine: Previous, @@ -295,11 +270,9 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test Timeout: &duration, } tt.SetParserFunc(createGrokParser) + require.NoError(t, tt.Init()) - err := tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} + var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) acc.Wait(3) require.Equal(t, uint64(3), acc.NMetrics()) @@ -307,7 +280,7 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test tt.Stop() acc.Wait(4) - expectedPath := filepath.Join(testdataDir, "test_multiline.log") + expectedPath := filepath.Join("testdata", "test_multiline.log") acc.AssertContainsTaggedFields(t, "tail_grok", map[string]interface{}{ "message": "HelloExample: This is warn", @@ -322,7 +295,7 @@ func createGrokParser() (telegraf.Parser, error) { parser := &grok.Parser{ Measurement: "tail_grok", Patterns: []string{"%{TEST_LOG_MULTILINE}"}, - CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, + CustomPatternFiles: []string{filepath.Join("testdata", "test-patterns")}, Log: testutil.Logger{}, } err := parser.Init() @@ -331,22 +304,18 @@ func createGrokParser() (telegraf.Parser, error) { // The csv parser should only parse the header line once per file. 
func TestCSVHeadersParsedOnce(t *testing.T) { - tmpfile, err := os.CreateTemp("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) - - _, err = tmpfile.WriteString(` + content := ` measurement,time_idle cpu,42 cpu,42 -`) - require.NoError(t, err) - require.NoError(t, tmpfile.Close()) +` + tmpfile := filepath.Join(t.TempDir(), "input.csv") + require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) plugin := NewTestTail() plugin.Log = testutil.Logger{} plugin.FromBeginning = true - plugin.Files = []string{tmpfile.Name()} + plugin.Files = []string{tmpfile} plugin.SetParserFunc(func() (telegraf.Parser, error) { parser := csv.Parser{ MeasurementColumn: "measurement", @@ -356,13 +325,12 @@ cpu,42 err := parser.Init() return &parser, err }) - require.NoError(t, plugin.Init()) expected := []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{ - "path": tmpfile.Name(), + "path": tmpfile, }, map[string]interface{}{ "time_idle": 42, @@ -370,7 +338,7 @@ cpu,42 time.Unix(0, 0)), testutil.MustMetric("cpu", map[string]string{ - "path": tmpfile.Name(), + "path": tmpfile, }, map[string]interface{}{ "time_idle": 42, @@ -383,30 +351,45 @@ cpu,42 defer plugin.Stop() require.NoError(t, plugin.Gather(&acc)) - require.Eventually(t, func() bool { - return acc.NFields() >= len(expected) - }, 3*time.Second, 100*time.Millisecond) + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } func TestCSVMultiHeaderWithSkipRowANDColumn(t *testing.T) { - tmpfile, err := os.CreateTemp("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) - - _, err = tmpfile.WriteString(`garbage nonsense + content := `garbage nonsense skip,measurement,value row,1,2 skip1,cpu,42 skip2,mem,100 -`) - require.NoError(t, err) - require.NoError(t, tmpfile.Close()) +` + tmpfile := filepath.Join(t.TempDir(), "input.csv") + require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) + + expected := []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "path": tmpfile, + }, + map[string]interface{}{ + "value2": 42, + }, + time.Unix(0, 0)), + testutil.MustMetric("mem", + map[string]string{ + "path": tmpfile, + }, + map[string]interface{}{ + "value2": 100, + }, + time.Unix(0, 0)), + } plugin := NewTestTail() plugin.Log = testutil.Logger{} plugin.FromBeginning = true - plugin.Files = []string{tmpfile.Name()} + plugin.Files = []string{tmpfile} plugin.SetParserFunc(func() (telegraf.Parser, error) { parser := csv.Parser{ MeasurementColumn: "measurement1", @@ -418,95 +401,72 @@ skip2,mem,100 err := parser.Init() return &parser, err }) + require.NoError(t, plugin.Init()) - err = plugin.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} - err = plugin.Start(&acc) - require.NoError(t, err) + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() - err = plugin.Gather(&acc) - require.NoError(t, err) - acc.Wait(2) + + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) plugin.Stop() - expected := []telegraf.Metric{ - testutil.MustMetric("cpu", - map[string]string{ - "path": tmpfile.Name(), - }, - map[string]interface{}{ - "value2": 42, - }, - time.Unix(0, 0)), - 
testutil.MustMetric("mem", - map[string]string{ - "path": tmpfile.Name(), - }, - map[string]interface{}{ - "value2": 100, - }, - time.Unix(0, 0)), - } testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) } // Ensure that the first line can produce multiple metrics (#6138) func TestMultipleMetricsOnFirstLine(t *testing.T) { - tmpfile, err := os.CreateTemp("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) - - _, err = tmpfile.WriteString(` + content := ` [{"time_idle": 42}, {"time_idle": 42}] -`) - require.NoError(t, err) - require.NoError(t, tmpfile.Close()) +` + + tmpfile := filepath.Join(t.TempDir(), "input.csv") + require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600)) + + expected := []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "customPathTagMyFile": tmpfile, + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0)), + testutil.MustMetric("cpu", + map[string]string{ + "customPathTagMyFile": tmpfile, + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0)), + } plugin := NewTestTail() plugin.Log = testutil.Logger{} plugin.FromBeginning = true - plugin.Files = []string{tmpfile.Name()} + plugin.Files = []string{tmpfile} plugin.PathTag = "customPathTagMyFile" plugin.SetParserFunc(func() (telegraf.Parser, error) { p := &json.Parser{MetricName: "cpu"} err := p.Init() return p, err }) + require.NoError(t, plugin.Init()) - err = plugin.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} - err = plugin.Start(&acc) - require.NoError(t, err) + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) defer plugin.Stop() - err = plugin.Gather(&acc) - require.NoError(t, err) - acc.Wait(2) + + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) plugin.Stop() - expected := []telegraf.Metric{ - testutil.MustMetric("cpu", - map[string]string{ - "customPathTagMyFile": tmpfile.Name(), - }, - map[string]interface{}{ - "time_idle": 42.0, - }, - time.Unix(0, 0)), - testutil.MustMetric("cpu", - map[string]string{ - "customPathTagMyFile": tmpfile.Name(), - }, - map[string]interface{}{ - "time_idle": 42.0, - }, - time.Unix(0, 0)), - } - testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), - testutil.IgnoreTime()) + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } func TestCharacterEncoding(t *testing.T) { @@ -558,7 +518,7 @@ func TestCharacterEncoding(t *testing.T) { ), } - watchMethod := defaultWatchMethod + watchMethod := "inotify" if runtime.GOOS == "windows" { watchMethod = "poll" } @@ -610,7 +570,7 @@ func TestCharacterEncoding(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { plugin := &Tail{ - Files: []string{filepath.Join(testdataDir, tt.testfiles)}, + Files: []string{filepath.Join("testdata", tt.testfiles)}, FromBeginning: tt.fromBeginning, MaxUndeliveredLines: 1000, Log: testutil.Logger{}, @@ -618,7 +578,7 @@ func TestCharacterEncoding(t *testing.T) { WatchMethod: watchMethod, } - plugin.SetParserFunc(NewInfluxParser) + plugin.SetParserFunc(newInfluxParser) require.NoError(t, plugin.Init()) if tt.offset != 0 { @@ -629,7 +589,9 @@ func TestCharacterEncoding(t *testing.T) { var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) - acc.Wait(len(tt.expected)) + require.Eventuallyf(t, func() bool { + 
return acc.NMetrics() >= uint64(len(tt.expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(tt.expected), acc.NMetrics()) plugin.Stop() actual := acc.GetTelegrafMetrics() @@ -645,22 +607,20 @@ func TestCharacterEncoding(t *testing.T) { func TestTailEOF(t *testing.T) { tmpfile, err := os.CreateTemp("", "") require.NoError(t, err) + defer tmpfile.Close() defer os.Remove(tmpfile.Name()) _, err = tmpfile.WriteString("cpu usage_idle=100\r\n") require.NoError(t, err) - err = tmpfile.Sync() - require.NoError(t, err) + require.NoError(t, tmpfile.Sync()) tt := NewTestTail() tt.Log = testutil.Logger{} tt.FromBeginning = true tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(NewInfluxParser) + tt.SetParserFunc(newInfluxParser) + require.NoError(t, tt.Init()) - err = tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} + var acc testutil.Accumulator require.NoError(t, tt.Start(&acc)) defer tt.Stop() require.NoError(t, acc.GatherError(tt.Gather)) @@ -668,8 +628,7 @@ func TestTailEOF(t *testing.T) { _, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n") require.NoError(t, err) - err = tmpfile.Sync() - require.NoError(t, err) + require.NoError(t, tmpfile.Sync()) acc.Wait(2) require.NoError(t, acc.GatherError(tt.Gather)) @@ -681,15 +640,14 @@ func TestTailEOF(t *testing.T) { map[string]interface{}{ "usage_idle": float64(200), }) - - err = tmpfile.Close() - require.NoError(t, err) + require.NoError(t, tmpfile.Close()) } func TestCSVBehavior(t *testing.T) { // Prepare the input file input, err := os.CreateTemp("", "") require.NoError(t, err) + defer input.Close() defer os.Remove(input.Name()) // Write header _, err = input.WriteString("a,b\n") @@ -759,8 +717,6 @@ func TestCSVBehavior(t *testing.T) { require.NoError(t, input.Sync()) require.NoError(t, plugin.Gather(&acc)) require.Eventuallyf(t, func() bool { - acc.Lock() - defer acc.Unlock() return acc.NMetrics() >= uint64(len(expected)) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) @@ -776,12 +732,71 @@ func TestCSVBehavior(t *testing.T) { require.NoError(t, input.Close()) } -func getTestdataDir() string { - dir, err := os.Getwd() - if err != nil { - // if we cannot even establish the test directory, further progress is meaningless - panic(err) +func TestStatePersistence(t *testing.T) { + // Prepare the input file + lines := []string{ + "metric,tag=value foo=1i 1730478201000000000\n", + "metric,tag=value foo=2i 1730478211000000000\n", + "metric,tag=value foo=3i 1730478221000000000\n", + } + content := []byte(strings.Join(lines, "")) + + inputFilename := filepath.Join(t.TempDir(), "input.influx") + require.NoError(t, os.WriteFile(inputFilename, content, 0600)) + + // Define the metrics and state to skip the first metric + state := map[string]int64{inputFilename: int64(len(lines[0]))} + expectedState := map[string]int64{inputFilename: int64(len(content))} + expected := []telegraf.Metric{ + metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"foo": 2}, + time.Unix(1730478211, 0), + ), + metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"foo": 3}, + time.Unix(1730478221, 0), + ), } - return filepath.Join(dir, "testdata") + // Configure the plugin + plugin := &Tail{ + Files: []string{inputFilename}, + MaxUndeliveredLines: 1000, + offsets: make(map[string]int64, 0), + Log: testutil.Logger{}, + } + plugin.SetParserFunc(newInfluxParser) + require.NoError(t, plugin.Init()) + require.Empty(t, 
plugin.offsets) + + // Setup the "persisted" state + var pi telegraf.StatefulPlugin = plugin + require.NoError(t, pi.SetState(state)) + require.Len(t, plugin.offsets, 1) + + // Run the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + require.NoError(t, plugin.Gather(&acc)) + require.Eventuallyf(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) + plugin.Stop() + + // Check the result + options := []cmp.Option{ + testutil.SortMetrics(), + testutil.IgnoreTime(), + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) + + // Check getting the persisted state + actualState, ok := pi.GetState().(map[string]int64) + require.True(t, ok, "state is not a map[string]int64") + require.Equal(t, expectedState, actualState) } diff --git a/plugins/processors/dedup/dedup_test.go b/plugins/processors/dedup/dedup_test.go index 7eded25d5..41c276018 100644 --- a/plugins/processors/dedup/dedup_test.go +++ b/plugins/processors/dedup/dedup_test.go @@ -1,6 +1,7 @@ package dedup import ( + "fmt" "sync" "testing" "time" @@ -457,3 +458,75 @@ func TestTracking(t *testing.T) { return len(input) == len(delivered) }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) } + +func TestStatePersistence(t *testing.T) { + now := time.Now() + + // Define the metrics and states + state := fmt.Sprintf("metric,tag=value foo=1i %d\n", now.Add(-1*time.Minute).UnixNano()) + input := []telegraf.Metric{ + metric.New("metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"foo": 1}, + now.Add(-2*time.Second), + ), + metric.New("metric", + map[string]string{"tag": "pass"}, + map[string]interface{}{"foo": 1}, + now.Add(-1*time.Second), + ), + metric.New( + "metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"foo": 3}, + now, + ), + } + + expected := []telegraf.Metric{ + metric.New("metric", + map[string]string{"tag": "pass"}, + map[string]interface{}{"foo": 1}, + now.Add(-1*time.Second), + ), + metric.New( + "metric", + map[string]string{"tag": "value"}, + map[string]interface{}{"foo": 3}, + now, + ), + } + expectedState := []string{ + fmt.Sprintf("metric,tag=pass foo=1i %d\n", now.Add(-1*time.Second).UnixNano()), + fmt.Sprintf("metric,tag=value foo=3i %d\n", now.UnixNano()), + } + + // Configure the plugin + plugin := &Dedup{ + DedupInterval: config.Duration(10 * time.Hour), // use a long interval to avoid flaky tests + FlushTime: now.Add(-1 * time.Second), + Cache: make(map[uint64]telegraf.Metric), + } + require.Empty(t, plugin.Cache) + + // Setup the "persisted" state + var pi telegraf.StatefulPlugin = plugin + require.NoError(t, pi.SetState([]byte(state))) + require.Len(t, plugin.Cache, 1) + + // Process expected metrics and compare with resulting metrics + actual := plugin.Apply(input...) + testutil.RequireMetricsEqual(t, expected, actual) + + // Check getting the persisted state + // Because the cache is a map, the order of metrics in the state is not + // guaranteed, so check the string contents regardless of the order. 
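The dedup state is plain influx line protocol, one line per cached series. A rough sketch (not the plugin's actual `SetState` implementation) of how such a blob can be parsed back into a cache keyed by the metric's series hash, which is what the `SetState(state)` call in the test relies on; the helper name is made up:

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/parsers/influx"
)

// restoreCache turns a line-protocol state blob back into a metric cache
// keyed by the metric's series hash (name plus tag set).
func restoreCache(state []byte) (map[uint64]telegraf.Metric, error) {
	parser := &influx.Parser{}
	if err := parser.Init(); err != nil {
		return nil, err
	}
	metrics, err := parser.Parse(state)
	if err != nil {
		return nil, err
	}
	cache := make(map[uint64]telegraf.Metric, len(metrics))
	for _, m := range metrics {
		cache[m.HashID()] = m
	}
	return cache, nil
}

func main() {
	state := []byte("metric,tag=value foo=1i 1730478201000000000\n")
	cache, err := restoreCache(state)
	if err != nil {
		panic(err)
	}
	fmt.Println("cached series:", len(cache))
}
```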
+ actualState, ok := pi.GetState().([]byte) + require.True(t, ok, "state is not a bytes array") + var expectedLen int + for _, m := range expectedState { + require.Contains(t, string(actualState), m) + expectedLen += len(m) + } + require.Len(t, actualState, expectedLen) +} diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index 61785e98b..ba3184e14 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -120,8 +120,7 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err return nil } -func (s *Starlark) Stop() { -} +func (s *Starlark) Stop() {} func containsMetric(metrics []telegraf.Metric, target telegraf.Metric) bool { for _, m := range metrics { diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index 13127cfa4..e9ef5c770 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -3680,13 +3680,13 @@ def apply(metric): Log: testutil.Logger{}, }, } + require.NoError(t, plugin.Init()) // Setup the "persisted" state var pi telegraf.StatefulPlugin = plugin var buf bytes.Buffer require.NoError(t, gob.NewEncoder(&buf).Encode(map[string]interface{}{"instance": "myhost"})) require.NoError(t, pi.SetState(buf.Bytes())) - require.NoError(t, plugin.Init()) var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc))
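Finally, the Starlark state handed to `SetState()` above is just a gob-encoded byte blob. A minimal stand-alone round trip, using a concrete map type instead of `map[string]interface{}` to keep the sketch free of `gob.Register` concerns:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

func main() {
	// Encode a state map into the byte blob shape that SetState() consumes.
	original := map[string]string{"instance": "myhost"}

	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(original); err != nil {
		panic(err)
	}

	// Decoding is what the plugin side has to do before it can expose the
	// values to the script through the predeclared "state" dict.
	var restored map[string]string
	if err := gob.NewDecoder(&buf).Decode(&restored); err != nil {
		panic(err)
	}
	fmt.Println(restored["instance"]) // myhost
}
```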