fix(agent): Restore setup order of stateful plugins to Init() then SetState() (#16123)

This commit is contained in:
Sven Rebhan 2024-11-13 08:21:49 +01:00 committed by GitHub
parent 70e1cbc0be
commit 35fe105bb4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 327 additions and 238 deletions

View File

@ -106,6 +106,11 @@ func (a *Agent) Run(ctx context.Context) error {
time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet, time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet,
a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval)) a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))
log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
}
if a.Config.Persister != nil { if a.Config.Persister != nil {
log.Printf("D! [agent] Initializing plugin states") log.Printf("D! [agent] Initializing plugin states")
if err := a.initPersister(); err != nil { if err := a.initPersister(); err != nil {
@ -119,11 +124,6 @@ func (a *Agent) Run(ctx context.Context) error {
} }
} }
log.Printf("D! [agent] Initializing plugins")
if err := a.InitPlugins(); err != nil {
return err
}
startTime := time.Now() startTime := time.Now()
log.Printf("D! [agent] Connecting outputs") log.Printf("D! [agent] Connecting outputs")

View File

@ -31,15 +31,17 @@ It is intended to
The persistence will use the following steps: The persistence will use the following steps:
- Compute a unique ID for each of the plugin _instances_
- Startup Telegraf plugins calling `Init()`, etc.
- Initialize persistence framework with the user specified `statefile` location - Initialize persistence framework with the user specified `statefile` location
and load the state if present and load the state if present
- Determine all stateful plugin instances by fulfilling the `StatefulPlugin` - Determine all stateful plugin instances by fulfilling the `StatefulPlugin`
interface interface
- Compute a unique ID for each of the plugin _instances_
- Restore plugin states (if any) for each plugin ID present in the state-file - Restore plugin states (if any) for each plugin ID present in the state-file
- Startup Telegraf plugins calling `Init()`, etc.
- Run data-collection etc... - Run data-collection etc...
- On shutdown, query the state of all registered stateful plugins state - On shutdown, stop all Telegraf plugins calling `Stop()` or `Close()`
depending on the plugin type
- Query the state of all registered stateful plugins
- Create an overall state-map with the plugin instance ID as a key and the - Create an overall state-map with the plugin instance ID as a key and the
serialized plugin state as value. serialized plugin state as value.
- Marshal the overall state-map and store to disk - Marshal the overall state-map and store to disk
@ -85,7 +87,7 @@ for the overall state. On-disk, the overall state of Telegraf is stored as JSON.
To restore the state of a plugin, the overall Telegraf state is first To restore the state of a plugin, the overall Telegraf state is first
deserialized from the on-disk JSON data and a lookup for the plugin ID is deserialized from the on-disk JSON data and a lookup for the plugin ID is
performed in the resulting map. The value, if found, is then deserialized to the performed in the resulting map. The value, if found, is then deserialized to the
plugin's state data-structure and provided to the plugin before calling `Init()`. plugin's state data-structure and provided to the plugin after calling `Init()`.
## Is / Is-not ## Is / Is-not

View File

@ -25,6 +25,7 @@ type Common struct {
StarlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error) StarlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error)
thread *starlark.Thread thread *starlark.Thread
builtins starlark.StringDict
globals starlark.StringDict globals starlark.StringDict
functions map[string]*starlark.Function functions map[string]*starlark.Function
parameters map[string]starlark.Tuple parameters map[string]starlark.Tuple
@ -97,8 +98,9 @@ func (s *Common) SetState(state interface{}) error {
return fmt.Errorf("state item %q cannot be set: %w", k, err) return fmt.Errorf("state item %q cannot be set: %w", k, err)
} }
} }
s.builtins["state"] = s.state
return nil return s.InitProgram()
} }
func (s *Common) Init() error { func (s *Common) Init() error {
@ -109,44 +111,48 @@ func (s *Common) Init() error {
return errors.New("both source or script cannot be set") return errors.New("both source or script cannot be set")
} }
s.builtins = starlark.StringDict{}
s.builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
s.builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
s.builtins["catch"] = starlark.NewBuiltin("catch", catch)
if err := s.addConstants(&s.builtins); err != nil {
return err
}
// Initialize the program
if err := s.InitProgram(); err != nil {
// Try again with a declared state. This might be necessary for
// state persistence.
s.state = starlark.NewDict(0)
s.builtins["state"] = s.state
if serr := s.InitProgram(); serr != nil {
return err
}
}
s.functions = make(map[string]*starlark.Function)
s.parameters = make(map[string]starlark.Tuple)
return nil
}
func (s *Common) InitProgram() error {
// Load the program. In case of an error we can try to insert the state
// which can be used implicitly e.g. when persisting states
program, err := s.sourceProgram(s.builtins)
if err != nil {
return err
}
// Execute source
s.thread = &starlark.Thread{ s.thread = &starlark.Thread{
Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) }, Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) },
Load: func(_ *starlark.Thread, module string) (starlark.StringDict, error) { Load: func(_ *starlark.Thread, module string) (starlark.StringDict, error) {
return s.StarlarkLoadFunc(module, s.Log) return s.StarlarkLoadFunc(module, s.Log)
}, },
} }
globals, err := program.Init(s.thread, s.builtins)
builtins := starlark.StringDict{}
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
builtins["catch"] = starlark.NewBuiltin("catch", catch)
if err := s.addConstants(&builtins); err != nil {
return err
}
// Insert the persisted state if any
if s.state != nil {
builtins["state"] = s.state
}
// Load the program. In case of an error we can try to insert the state
// which can be used implicitly e.g. when persisting states
program, err := s.sourceProgram(builtins)
if err != nil {
// Try again with a declared state. This might be necessary for
// state persistence.
s.state = starlark.NewDict(0)
builtins["state"] = s.state
p, serr := s.sourceProgram(builtins)
if serr != nil {
return err
}
program = p
}
// Execute source
globals, err := program.Init(s.thread, builtins)
if err != nil { if err != nil {
return err return err
} }
@ -162,10 +168,8 @@ func (s *Common) Init() error {
// metrics. Tasks that require global state will not be possible due to // metrics. Tasks that require global state will not be possible due to
// this, so maybe we should relax this in the future. // this, so maybe we should relax this in the future.
globals.Freeze() globals.Freeze()
s.globals = globals s.globals = globals
s.functions = make(map[string]*starlark.Function)
s.parameters = make(map[string]starlark.Tuple)
return nil return nil
} }

View File

@ -30,10 +30,6 @@ var sampleConfig string
var once sync.Once var once sync.Once
const (
defaultWatchMethod = "inotify"
)
var ( var (
offsets = make(map[string]int64) offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex) offsetsMutex = new(sync.Mutex)

View File

@ -1,11 +1,10 @@
package tail package tail
import ( import (
"bytes"
"log"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings"
"testing" "testing"
"time" "time"
@ -22,14 +21,9 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
var ( func newInfluxParser() (telegraf.Parser, error) {
testdataDir = getTestdataDir()
)
func NewInfluxParser() (telegraf.Parser, error) {
parser := &influx.Parser{} parser := &influx.Parser{}
err := parser.Init() if err := parser.Init(); err != nil {
if err != nil {
return nil, err return nil, err
} }
return parser, nil return parser, nil
@ -42,8 +36,8 @@ func NewTestTail() *Tail {
offsetsCopy[k] = v offsetsCopy[k] = v
} }
offsetsMutex.Unlock() offsetsMutex.Unlock()
watchMethod := defaultWatchMethod
watchMethod := "inotify"
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
watchMethod = "poll" watchMethod = "poll"
} }
@ -58,61 +52,49 @@ func NewTestTail() *Tail {
} }
func TestTailBadLine(t *testing.T) { func TestTailBadLine(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") content := `
require.NoError(t, err) cpu mytag= foo usage_idle= 100
defer os.Remove(tmpfile.Name()) cpu usage_idle=100
`
_, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n") tmpfile := filepath.Join(t.TempDir(), "input.csv")
require.NoError(t, err) require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600))
// Write good metric so we can detect when processing is complete logger := &testutil.CaptureLogger{}
_, err = tmpfile.WriteString("cpu usage_idle=100\n")
require.NoError(t, err)
require.NoError(t, tmpfile.Close())
buf := &bytes.Buffer{}
log.SetOutput(buf)
tt := NewTestTail() tt := NewTestTail()
tt.Log = testutil.Logger{} tt.Log = logger
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile}
tt.SetParserFunc(NewInfluxParser) tt.SetParserFunc(newInfluxParser)
require.NoError(t, tt.Init())
err = tt.Init() var acc testutil.Accumulator
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
require.NoError(t, acc.GatherError(tt.Gather)) require.NoError(t, acc.GatherError(tt.Gather))
acc.Wait(1) acc.Wait(1)
tt.Stop() tt.Stop()
require.Contains(t, buf.String(), "Malformed log line") require.Len(t, logger.Errors(), 1)
require.Contains(t, logger.Errors()[0], "Malformed log line")
} }
func TestColoredLine(t *testing.T) { func TestColoredLine(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") content := "cpu usage_idle=\033[4A\033[4A100\ncpu2 usage_idle=200\n"
require.NoError(t, err)
defer os.Remove(tmpfile.Name()) tmpfile := filepath.Join(t.TempDir(), "input.csv")
_, err = tmpfile.WriteString("cpu usage_idle=\033[4A\033[4A100\ncpu2 usage_idle=200\n") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600))
require.NoError(t, err)
require.NoError(t, tmpfile.Close())
tt := NewTestTail() tt := NewTestTail()
tt.Log = testutil.Logger{} tt.Log = testutil.Logger{}
tt.FromBeginning = true tt.FromBeginning = true
tt.Filters = []string{"ansi_color"} tt.Filters = []string{"ansi_color"}
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile}
tt.SetParserFunc(NewInfluxParser) tt.SetParserFunc(newInfluxParser)
require.NoError(t, tt.Init())
err = tt.Init() var acc testutil.Accumulator
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
defer tt.Stop() defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather)) require.NoError(t, acc.GatherError(tt.Gather))
@ -129,23 +111,19 @@ func TestColoredLine(t *testing.T) {
} }
func TestTailDosLineEndings(t *testing.T) { func TestTailDosLineEndings(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") content := "cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n"
require.NoError(t, err)
defer os.Remove(tmpfile.Name()) tmpfile := filepath.Join(t.TempDir(), "input.csv")
_, err = tmpfile.WriteString("cpu usage_idle=100\r\ncpu2 usage_idle=200\r\n") require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600))
require.NoError(t, err)
require.NoError(t, tmpfile.Close())
tt := NewTestTail() tt := NewTestTail()
tt.Log = testutil.Logger{} tt.Log = testutil.Logger{}
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile}
tt.SetParserFunc(NewInfluxParser) tt.SetParserFunc(newInfluxParser)
require.NoError(t, tt.Init())
err = tt.Init() var acc testutil.Accumulator
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
defer tt.Stop() defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather)) require.NoError(t, acc.GatherError(tt.Gather))
@ -169,7 +147,7 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) {
tt := NewTail() tt := NewTail()
tt.Log = testutil.Logger{} tt.Log = testutil.Logger{}
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{filepath.Join(testdataDir, "test_multiline.log")} tt.Files = []string{filepath.Join("testdata", "test_multiline.log")}
tt.MultilineConfig = MultilineConfig{ tt.MultilineConfig = MultilineConfig{
Pattern: `^[^\[]`, Pattern: `^[^\[]`,
MatchWhichLine: Previous, MatchWhichLine: Previous,
@ -177,17 +155,15 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) {
Timeout: &duration, Timeout: &duration,
} }
tt.SetParserFunc(createGrokParser) tt.SetParserFunc(createGrokParser)
require.NoError(t, tt.Init())
err = tt.Init() var acc testutil.Accumulator
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
defer tt.Stop() defer tt.Stop()
acc.Wait(3) acc.Wait(3)
expectedPath := filepath.Join(testdataDir, "test_multiline.log") expectedPath := filepath.Join("testdata", "test_multiline.log")
acc.AssertContainsTaggedFields(t, "tail_grok", acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{ map[string]interface{}{
"message": "HelloExample: This is debug", "message": "HelloExample: This is debug",
@ -220,6 +196,7 @@ func TestGrokParseLogFilesWithMultiline(t *testing.T) {
func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) { func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") tmpfile, err := os.CreateTemp("", "")
require.NoError(t, err) require.NoError(t, err)
defer tmpfile.Close()
defer os.Remove(tmpfile.Name()) defer os.Remove(tmpfile.Name())
// This seems necessary in order to get the test to read the following lines. // This seems necessary in order to get the test to read the following lines.
@ -242,11 +219,9 @@ func TestGrokParseLogFilesWithMultilineTimeout(t *testing.T) {
Timeout: &duration, Timeout: &duration,
} }
tt.SetParserFunc(createGrokParser) tt.SetParserFunc(createGrokParser)
require.NoError(t, tt.Init())
err = tt.Init() var acc testutil.Accumulator
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
time.Sleep(11 * time.Millisecond) // will force timeout time.Sleep(11 * time.Millisecond) // will force timeout
_, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is info\r\n") _, err = tmpfile.WriteString("[04/Jun/2016:12:41:48 +0100] INFO HelloExample: This is info\r\n")
@ -287,7 +262,7 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test
tt := NewTestTail() tt := NewTestTail()
tt.Log = testutil.Logger{} tt.Log = testutil.Logger{}
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{filepath.Join(testdataDir, "test_multiline.log")} tt.Files = []string{filepath.Join("testdata", "test_multiline.log")}
tt.MultilineConfig = MultilineConfig{ tt.MultilineConfig = MultilineConfig{
Pattern: `^[^\[]`, Pattern: `^[^\[]`,
MatchWhichLine: Previous, MatchWhichLine: Previous,
@ -295,11 +270,9 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test
Timeout: &duration, Timeout: &duration,
} }
tt.SetParserFunc(createGrokParser) tt.SetParserFunc(createGrokParser)
require.NoError(t, tt.Init())
err := tt.Init() var acc testutil.Accumulator
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
acc.Wait(3) acc.Wait(3)
require.Equal(t, uint64(3), acc.NMetrics()) require.Equal(t, uint64(3), acc.NMetrics())
@ -307,7 +280,7 @@ func TestGrokParseLogFilesWithMultilineTailerCloseFlushesMultilineBuffer(t *test
tt.Stop() tt.Stop()
acc.Wait(4) acc.Wait(4)
expectedPath := filepath.Join(testdataDir, "test_multiline.log") expectedPath := filepath.Join("testdata", "test_multiline.log")
acc.AssertContainsTaggedFields(t, "tail_grok", acc.AssertContainsTaggedFields(t, "tail_grok",
map[string]interface{}{ map[string]interface{}{
"message": "HelloExample: This is warn", "message": "HelloExample: This is warn",
@ -322,7 +295,7 @@ func createGrokParser() (telegraf.Parser, error) {
parser := &grok.Parser{ parser := &grok.Parser{
Measurement: "tail_grok", Measurement: "tail_grok",
Patterns: []string{"%{TEST_LOG_MULTILINE}"}, Patterns: []string{"%{TEST_LOG_MULTILINE}"},
CustomPatternFiles: []string{filepath.Join(testdataDir, "test-patterns")}, CustomPatternFiles: []string{filepath.Join("testdata", "test-patterns")},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
err := parser.Init() err := parser.Init()
@ -331,22 +304,18 @@ func createGrokParser() (telegraf.Parser, error) {
// The csv parser should only parse the header line once per file. // The csv parser should only parse the header line once per file.
func TestCSVHeadersParsedOnce(t *testing.T) { func TestCSVHeadersParsedOnce(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") content := `
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString(`
measurement,time_idle measurement,time_idle
cpu,42 cpu,42
cpu,42 cpu,42
`) `
require.NoError(t, err) tmpfile := filepath.Join(t.TempDir(), "input.csv")
require.NoError(t, tmpfile.Close()) require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600))
plugin := NewTestTail() plugin := NewTestTail()
plugin.Log = testutil.Logger{} plugin.Log = testutil.Logger{}
plugin.FromBeginning = true plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()} plugin.Files = []string{tmpfile}
plugin.SetParserFunc(func() (telegraf.Parser, error) { plugin.SetParserFunc(func() (telegraf.Parser, error) {
parser := csv.Parser{ parser := csv.Parser{
MeasurementColumn: "measurement", MeasurementColumn: "measurement",
@ -356,13 +325,12 @@ cpu,42
err := parser.Init() err := parser.Init()
return &parser, err return &parser, err
}) })
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
expected := []telegraf.Metric{ expected := []telegraf.Metric{
testutil.MustMetric("cpu", testutil.MustMetric("cpu",
map[string]string{ map[string]string{
"path": tmpfile.Name(), "path": tmpfile,
}, },
map[string]interface{}{ map[string]interface{}{
"time_idle": 42, "time_idle": 42,
@ -370,7 +338,7 @@ cpu,42
time.Unix(0, 0)), time.Unix(0, 0)),
testutil.MustMetric("cpu", testutil.MustMetric("cpu",
map[string]string{ map[string]string{
"path": tmpfile.Name(), "path": tmpfile,
}, },
map[string]interface{}{ map[string]interface{}{
"time_idle": 42, "time_idle": 42,
@ -383,30 +351,45 @@ cpu,42
defer plugin.Stop() defer plugin.Stop()
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
require.Eventually(t, func() bool { require.Eventuallyf(t, func() bool {
return acc.NFields() >= len(expected) return acc.NMetrics() >= uint64(len(expected))
}, 3*time.Second, 100*time.Millisecond) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
} }
func TestCSVMultiHeaderWithSkipRowANDColumn(t *testing.T) { func TestCSVMultiHeaderWithSkipRowANDColumn(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") content := `garbage nonsense
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString(`garbage nonsense
skip,measurement,value skip,measurement,value
row,1,2 row,1,2
skip1,cpu,42 skip1,cpu,42
skip2,mem,100 skip2,mem,100
`) `
require.NoError(t, err) tmpfile := filepath.Join(t.TempDir(), "input.csv")
require.NoError(t, tmpfile.Close()) require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600))
expected := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile,
},
map[string]interface{}{
"value2": 42,
},
time.Unix(0, 0)),
testutil.MustMetric("mem",
map[string]string{
"path": tmpfile,
},
map[string]interface{}{
"value2": 100,
},
time.Unix(0, 0)),
}
plugin := NewTestTail() plugin := NewTestTail()
plugin.Log = testutil.Logger{} plugin.Log = testutil.Logger{}
plugin.FromBeginning = true plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()} plugin.Files = []string{tmpfile}
plugin.SetParserFunc(func() (telegraf.Parser, error) { plugin.SetParserFunc(func() (telegraf.Parser, error) {
parser := csv.Parser{ parser := csv.Parser{
MeasurementColumn: "measurement1", MeasurementColumn: "measurement1",
@ -418,95 +401,72 @@ skip2,mem,100
err := parser.Init() err := parser.Init()
return &parser, err return &parser, err
}) })
require.NoError(t, plugin.Init())
err = plugin.Init() var acc testutil.Accumulator
require.NoError(t, err) require.NoError(t, plugin.Start(&acc))
acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop() defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err) require.NoError(t, plugin.Gather(&acc))
acc.Wait(2) require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
plugin.Stop() plugin.Stop()
expected := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"value2": 42,
},
time.Unix(0, 0)),
testutil.MustMetric("mem",
map[string]string{
"path": tmpfile.Name(),
},
map[string]interface{}{
"value2": 100,
},
time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
} }
// Ensure that the first line can produce multiple metrics (#6138) // Ensure that the first line can produce multiple metrics (#6138)
func TestMultipleMetricsOnFirstLine(t *testing.T) { func TestMultipleMetricsOnFirstLine(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") content := `
require.NoError(t, err)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString(`
[{"time_idle": 42}, {"time_idle": 42}] [{"time_idle": 42}, {"time_idle": 42}]
`) `
require.NoError(t, err)
require.NoError(t, tmpfile.Close()) tmpfile := filepath.Join(t.TempDir(), "input.csv")
require.NoError(t, os.WriteFile(tmpfile, []byte(content), 0600))
expected := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"customPathTagMyFile": tmpfile,
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
testutil.MustMetric("cpu",
map[string]string{
"customPathTagMyFile": tmpfile,
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
}
plugin := NewTestTail() plugin := NewTestTail()
plugin.Log = testutil.Logger{} plugin.Log = testutil.Logger{}
plugin.FromBeginning = true plugin.FromBeginning = true
plugin.Files = []string{tmpfile.Name()} plugin.Files = []string{tmpfile}
plugin.PathTag = "customPathTagMyFile" plugin.PathTag = "customPathTagMyFile"
plugin.SetParserFunc(func() (telegraf.Parser, error) { plugin.SetParserFunc(func() (telegraf.Parser, error) {
p := &json.Parser{MetricName: "cpu"} p := &json.Parser{MetricName: "cpu"}
err := p.Init() err := p.Init()
return p, err return p, err
}) })
require.NoError(t, plugin.Init())
err = plugin.Init() var acc testutil.Accumulator
require.NoError(t, err) require.NoError(t, plugin.Start(&acc))
acc := testutil.Accumulator{}
err = plugin.Start(&acc)
require.NoError(t, err)
defer plugin.Stop() defer plugin.Stop()
err = plugin.Gather(&acc)
require.NoError(t, err) require.NoError(t, plugin.Gather(&acc))
acc.Wait(2) require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
plugin.Stop() plugin.Stop()
expected := []telegraf.Metric{ testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
testutil.MustMetric("cpu",
map[string]string{
"customPathTagMyFile": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
testutil.MustMetric("cpu",
map[string]string{
"customPathTagMyFile": tmpfile.Name(),
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0)),
}
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime())
} }
func TestCharacterEncoding(t *testing.T) { func TestCharacterEncoding(t *testing.T) {
@ -558,7 +518,7 @@ func TestCharacterEncoding(t *testing.T) {
), ),
} }
watchMethod := defaultWatchMethod watchMethod := "inotify"
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
watchMethod = "poll" watchMethod = "poll"
} }
@ -610,7 +570,7 @@ func TestCharacterEncoding(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
plugin := &Tail{ plugin := &Tail{
Files: []string{filepath.Join(testdataDir, tt.testfiles)}, Files: []string{filepath.Join("testdata", tt.testfiles)},
FromBeginning: tt.fromBeginning, FromBeginning: tt.fromBeginning,
MaxUndeliveredLines: 1000, MaxUndeliveredLines: 1000,
Log: testutil.Logger{}, Log: testutil.Logger{},
@ -618,7 +578,7 @@ func TestCharacterEncoding(t *testing.T) {
WatchMethod: watchMethod, WatchMethod: watchMethod,
} }
plugin.SetParserFunc(NewInfluxParser) plugin.SetParserFunc(newInfluxParser)
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
if tt.offset != 0 { if tt.offset != 0 {
@ -629,7 +589,9 @@ func TestCharacterEncoding(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc)) require.NoError(t, plugin.Start(&acc))
acc.Wait(len(tt.expected)) require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= uint64(len(tt.expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(tt.expected), acc.NMetrics())
plugin.Stop() plugin.Stop()
actual := acc.GetTelegrafMetrics() actual := acc.GetTelegrafMetrics()
@ -645,22 +607,20 @@ func TestCharacterEncoding(t *testing.T) {
func TestTailEOF(t *testing.T) { func TestTailEOF(t *testing.T) {
tmpfile, err := os.CreateTemp("", "") tmpfile, err := os.CreateTemp("", "")
require.NoError(t, err) require.NoError(t, err)
defer tmpfile.Close()
defer os.Remove(tmpfile.Name()) defer os.Remove(tmpfile.Name())
_, err = tmpfile.WriteString("cpu usage_idle=100\r\n") _, err = tmpfile.WriteString("cpu usage_idle=100\r\n")
require.NoError(t, err) require.NoError(t, err)
err = tmpfile.Sync() require.NoError(t, tmpfile.Sync())
require.NoError(t, err)
tt := NewTestTail() tt := NewTestTail()
tt.Log = testutil.Logger{} tt.Log = testutil.Logger{}
tt.FromBeginning = true tt.FromBeginning = true
tt.Files = []string{tmpfile.Name()} tt.Files = []string{tmpfile.Name()}
tt.SetParserFunc(NewInfluxParser) tt.SetParserFunc(newInfluxParser)
require.NoError(t, tt.Init())
err = tt.Init() var acc testutil.Accumulator
require.NoError(t, err)
acc := testutil.Accumulator{}
require.NoError(t, tt.Start(&acc)) require.NoError(t, tt.Start(&acc))
defer tt.Stop() defer tt.Stop()
require.NoError(t, acc.GatherError(tt.Gather)) require.NoError(t, acc.GatherError(tt.Gather))
@ -668,8 +628,7 @@ func TestTailEOF(t *testing.T) {
_, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n") _, err = tmpfile.WriteString("cpu2 usage_idle=200\r\n")
require.NoError(t, err) require.NoError(t, err)
err = tmpfile.Sync() require.NoError(t, tmpfile.Sync())
require.NoError(t, err)
acc.Wait(2) acc.Wait(2)
require.NoError(t, acc.GatherError(tt.Gather)) require.NoError(t, acc.GatherError(tt.Gather))
@ -681,15 +640,14 @@ func TestTailEOF(t *testing.T) {
map[string]interface{}{ map[string]interface{}{
"usage_idle": float64(200), "usage_idle": float64(200),
}) })
require.NoError(t, tmpfile.Close())
err = tmpfile.Close()
require.NoError(t, err)
} }
func TestCSVBehavior(t *testing.T) { func TestCSVBehavior(t *testing.T) {
// Prepare the input file // Prepare the input file
input, err := os.CreateTemp("", "") input, err := os.CreateTemp("", "")
require.NoError(t, err) require.NoError(t, err)
defer input.Close()
defer os.Remove(input.Name()) defer os.Remove(input.Name())
// Write header // Write header
_, err = input.WriteString("a,b\n") _, err = input.WriteString("a,b\n")
@ -759,8 +717,6 @@ func TestCSVBehavior(t *testing.T) {
require.NoError(t, input.Sync()) require.NoError(t, input.Sync())
require.NoError(t, plugin.Gather(&acc)) require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool { require.Eventuallyf(t, func() bool {
acc.Lock()
defer acc.Unlock()
return acc.NMetrics() >= uint64(len(expected)) return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics()) }, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
@ -776,12 +732,71 @@ func TestCSVBehavior(t *testing.T) {
require.NoError(t, input.Close()) require.NoError(t, input.Close())
} }
func getTestdataDir() string { func TestStatePersistence(t *testing.T) {
dir, err := os.Getwd() // Prepare the input file
if err != nil { lines := []string{
// if we cannot even establish the test directory, further progress is meaningless "metric,tag=value foo=1i 1730478201000000000\n",
panic(err) "metric,tag=value foo=2i 1730478211000000000\n",
"metric,tag=value foo=3i 1730478221000000000\n",
}
content := []byte(strings.Join(lines, ""))
inputFilename := filepath.Join(t.TempDir(), "input.influx")
require.NoError(t, os.WriteFile(inputFilename, content, 0600))
// Define the metrics and state to skip the first metric
state := map[string]int64{inputFilename: int64(len(lines[0]))}
expectedState := map[string]int64{inputFilename: int64(len(content))}
expected := []telegraf.Metric{
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 2},
time.Unix(1730478211, 0),
),
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 3},
time.Unix(1730478221, 0),
),
} }
return filepath.Join(dir, "testdata") // Configure the plugin
plugin := &Tail{
Files: []string{inputFilename},
MaxUndeliveredLines: 1000,
offsets: make(map[string]int64, 0),
Log: testutil.Logger{},
}
plugin.SetParserFunc(newInfluxParser)
require.NoError(t, plugin.Init())
require.Empty(t, plugin.offsets)
// Setup the "persisted" state
var pi telegraf.StatefulPlugin = plugin
require.NoError(t, pi.SetState(state))
require.Len(t, plugin.offsets, 1)
// Run the plugin
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()
require.NoError(t, plugin.Gather(&acc))
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond, "Expected %d metrics found %d", len(expected), acc.NMetrics())
plugin.Stop()
// Check the result
options := []cmp.Option{
testutil.SortMetrics(),
testutil.IgnoreTime(),
}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, options...)
// Check getting the persisted state
actualState, ok := pi.GetState().(map[string]int64)
require.True(t, ok, "state is not a map[string]int64")
require.Equal(t, expectedState, actualState)
} }

View File

@ -1,6 +1,7 @@
package dedup package dedup
import ( import (
"fmt"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -457,3 +458,75 @@ func TestTracking(t *testing.T) {
return len(input) == len(delivered) return len(input) == len(delivered)
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
} }
func TestStatePersistence(t *testing.T) {
now := time.Now()
// Define the metrics and states
state := fmt.Sprintf("metric,tag=value foo=1i %d\n", now.Add(-1*time.Minute).UnixNano())
input := []telegraf.Metric{
metric.New("metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 1},
now.Add(-2*time.Second),
),
metric.New("metric",
map[string]string{"tag": "pass"},
map[string]interface{}{"foo": 1},
now.Add(-1*time.Second),
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 3},
now,
),
}
expected := []telegraf.Metric{
metric.New("metric",
map[string]string{"tag": "pass"},
map[string]interface{}{"foo": 1},
now.Add(-1*time.Second),
),
metric.New(
"metric",
map[string]string{"tag": "value"},
map[string]interface{}{"foo": 3},
now,
),
}
expectedState := []string{
fmt.Sprintf("metric,tag=pass foo=1i %d\n", now.Add(-1*time.Second).UnixNano()),
fmt.Sprintf("metric,tag=value foo=3i %d\n", now.UnixNano()),
}
// Configure the plugin
plugin := &Dedup{
DedupInterval: config.Duration(10 * time.Hour), // use a long interval to avoid flaky tests
FlushTime: now.Add(-1 * time.Second),
Cache: make(map[uint64]telegraf.Metric),
}
require.Empty(t, plugin.Cache)
// Setup the "persisted" state
var pi telegraf.StatefulPlugin = plugin
require.NoError(t, pi.SetState([]byte(state)))
require.Len(t, plugin.Cache, 1)
// Process expected metrics and compare with resulting metrics
actual := plugin.Apply(input...)
testutil.RequireMetricsEqual(t, expected, actual)
// Check getting the persisted state
// Because the cache is a map, the order of metrics in the state is not
// guaranteed, so check the string contents regardless of the order.
actualState, ok := pi.GetState().([]byte)
require.True(t, ok, "state is not a bytes array")
var expectedLen int
for _, m := range expectedState {
require.Contains(t, string(actualState), m)
expectedLen += len(m)
}
require.Len(t, actualState, expectedLen)
}

View File

@ -120,8 +120,7 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
return nil return nil
} }
func (s *Starlark) Stop() { func (s *Starlark) Stop() {}
}
func containsMetric(metrics []telegraf.Metric, target telegraf.Metric) bool { func containsMetric(metrics []telegraf.Metric, target telegraf.Metric) bool {
for _, m := range metrics { for _, m := range metrics {

View File

@ -3680,13 +3680,13 @@ def apply(metric):
Log: testutil.Logger{}, Log: testutil.Logger{},
}, },
} }
require.NoError(t, plugin.Init())
// Setup the "persisted" state // Setup the "persisted" state
var pi telegraf.StatefulPlugin = plugin var pi telegraf.StatefulPlugin = plugin
var buf bytes.Buffer var buf bytes.Buffer
require.NoError(t, gob.NewEncoder(&buf).Encode(map[string]interface{}{"instance": "myhost"})) require.NoError(t, gob.NewEncoder(&buf).Encode(map[string]interface{}{"instance": "myhost"}))
require.NoError(t, pi.SetState(buf.Bytes())) require.NoError(t, pi.SetState(buf.Bytes()))
require.NoError(t, plugin.Init())
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc)) require.NoError(t, plugin.Start(&acc))