From f87916aaa96368ca6d8a2de3e38a513785a1ee99 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 1 Mar 2023 23:34:48 +0100 Subject: [PATCH] feat: Plugin state-persistence (#12166) --- agent/agent.go | 98 +++++++- cmd/telegraf/agent.conf | 6 + config/config.go | 58 +++-- config/config_test.go | 236 +++++++++++++++++- config/plugin_id.go | 85 +++++++ ...state_persistence_input_all_different.toml | 42 ++++ .../state_persistence_input_all_same.toml | 60 +++++ .../state_persistence_input_store_load.toml | 17 ++ .../state_persistence_processors.toml | 8 + docs/CONFIGURATION.md | 6 + docs/developers/STATE_PERSISTENCE.md | 145 +++++++++++ models/running_aggregator.go | 8 + models/running_input.go | 8 + models/running_output.go | 8 + models/running_processor.go | 8 + persister/persister.go | 103 ++++++++ plugin.go | 29 +++ plugins/inputs/tail/tail.go | 20 +- plugins/inputs/tail/tail_test.go | 7 +- 19 files changed, 926 insertions(+), 26 deletions(-) create mode 100644 config/plugin_id.go create mode 100644 config/testdata/state_persistence_input_all_different.toml create mode 100644 config/testdata/state_persistence_input_all_same.toml create mode 100644 config/testdata/state_persistence_input_store_load.toml create mode 100644 config/testdata/state_persistence_processors.toml create mode 100644 docs/developers/STATE_PERSISTENCE.md create mode 100644 persister/persister.go diff --git a/agent/agent.go b/agent/agent.go index 3add3bb3d..878a155d2 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2,6 +2,7 @@ package agent import ( "context" + "errors" "fmt" "log" "os" @@ -106,11 +107,23 @@ func (a *Agent) Run(ctx context.Context) error { a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval)) log.Printf("D! [agent] Initializing plugins") - err := a.initPlugins() - if err != nil { + if err := a.initPlugins(); err != nil { return err } + if a.Config.Persister != nil { + log.Printf("D! [agent] Initializing plugin states") + if err := a.initPersister(); err != nil { + return err + } + if err := a.Config.Persister.Load(); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + log.Print("I! [agent] State file does not exist... Skip restoring states...") + } + } + startTime := time.Now() log.Printf("D! [agent] Connecting outputs") @@ -183,6 +196,13 @@ func (a *Agent) Run(ctx context.Context) error { wg.Wait() + if a.Config.Persister != nil { + log.Printf("D! [agent] Persisting plugin states") + if err := a.Config.Persister.Store(); err != nil { + return err + } + } + log.Printf("D! [agent] Stopped Successfully") return err } @@ -226,6 +246,80 @@ func (a *Agent) initPlugins() error { return nil } +// initPersister initializes the persister and registers the plugins. +func (a *Agent) initPersister() error { + if err := a.Config.Persister.Init(); err != nil { + return err + } + + for _, input := range a.Config.Inputs { + plugin, ok := input.Input.(telegraf.StatefulPlugin) + if !ok { + continue + } + + name := input.LogName() + id := input.ID() + if err := a.Config.Persister.Register(id, plugin); err != nil { + return fmt.Errorf("could not register input %s: %w", name, err) + } + } + + for _, processor := range a.Config.Processors { + plugin, ok := processor.Processor.(telegraf.StatefulPlugin) + if !ok { + continue + } + + name := processor.LogName() + id := processor.ID() + if err := a.Config.Persister.Register(id, plugin); err != nil { + return fmt.Errorf("could not register processor %s: %w", name, err) + } + } + + for _, aggregator := range a.Config.Aggregators { + plugin, ok := aggregator.Aggregator.(telegraf.StatefulPlugin) + if !ok { + continue + } + + name := aggregator.LogName() + id := aggregator.ID() + if err := a.Config.Persister.Register(id, plugin); err != nil { + return fmt.Errorf("could not register aggregator %s: %w", name, err) + } + } + + for _, processor := range a.Config.AggProcessors { + plugin, ok := processor.Processor.(telegraf.StatefulPlugin) + if !ok { + continue + } + + name := processor.LogName() + id := processor.ID() + if err := a.Config.Persister.Register(id, plugin); err != nil { + return fmt.Errorf("could not register aggregating processor %s: %w", name, err) + } + } + + for _, output := range a.Config.Outputs { + plugin, ok := output.Output.(telegraf.StatefulPlugin) + if !ok { + continue + } + + name := output.LogName() + id := output.ID() + if err := a.Config.Persister.Register(id, plugin); err != nil { + return fmt.Errorf("could not register output %s: %w", name, err) + } + } + + return nil +} + func (a *Agent) startInputs( dst chan<- telegraf.Metric, inputs []*models.RunningInput, diff --git a/cmd/telegraf/agent.conf b/cmd/telegraf/agent.conf index 9e9886b4a..919b63d09 100644 --- a/cmd/telegraf/agent.conf +++ b/cmd/telegraf/agent.conf @@ -88,3 +88,9 @@ ## translates by calling external programs snmptranslate and snmptable, ## or "gosmi" which translates using the built-in gosmi library. # snmp_translator = "netsnmp" + + ## Name of the file to load the state of plugins from and store the state to. + ## If uncommented and not empty, this file will be used to save the state of + ## stateful plugins on termination of Telegraf. If the file exists on start, + ## the state in the file will be restored for the plugins. + # statefile = "" diff --git a/config/config.go b/config/config.go index 8a3bb0881..4ba5ca31c 100644 --- a/config/config.go +++ b/config/config.go @@ -27,6 +27,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/persister" "github.com/influxdata/telegraf/plugins/aggregators" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" @@ -83,6 +84,8 @@ type Config struct { Deprecations map[string][]int64 version *semver.Version + + Persister *persister.Persister } // Ordered plugins used to keep the order in which they appear in a file @@ -243,6 +246,12 @@ type AgentConfig struct { // Method for translating SNMP objects. 'netsnmp' to call external programs, // 'gosmi' to use the built-in library. SnmpTranslator string `toml:"snmp_translator"` + + // Name of the file to load the state of plugins from and store the state to. + // If uncommented and not empty, this file will be used to save the state of + // stateful plugins on termination of Telegraf. If the file exists on start, + // the state in the file will be restored for the plugins. + Statefile string `toml:"statefile"` } // InputNames returns a list of strings of the configured inputs. @@ -522,6 +531,13 @@ func (c *Config) LoadConfigData(data []byte) error { }) } + // Setup the persister if requested + if c.Agent.Statefile != "" { + c.Persister = &persister.Persister{ + Filename: c.Agent.Statefile, + } + } + if len(c.UnusedFields) > 0 { return fmt.Errorf("line %d: configuration specified the fields %q, but they weren't used", tbl.Line, keys(c.UnusedFields)) } @@ -962,25 +978,28 @@ func (c *Config) addProcessor(name string, table *ast.Table) error { c.setLocalMissingTomlFieldTracker(missCount) defer c.resetMissingTomlFieldTracker() - processorConfig, err := c.buildProcessor(name, table) - if err != nil { - return err - } - // Setup the processor running before the aggregators - processorBefore, hasParser, err := c.setupProcessor(processorConfig.Name, creator, table) + processorBeforeConfig, err := c.buildProcessor("processors", name, table) if err != nil { return err } - rf := models.NewRunningProcessor(processorBefore, processorConfig) + processorBefore, hasParser, err := c.setupProcessor(processorBeforeConfig.Name, creator, table) + if err != nil { + return err + } + rf := models.NewRunningProcessor(processorBefore, processorBeforeConfig) c.fileProcessors = append(c.fileProcessors, &OrderedPlugin{table.Line, rf}) // Setup another (new) processor instance running after the aggregator - processorAfter, _, err := c.setupProcessor(processorConfig.Name, creator, table) + processorAfterConfig, err := c.buildProcessor("aggprocessors", name, table) if err != nil { return err } - rf = models.NewRunningProcessor(processorAfter, processorConfig) + processorAfter, _, err := c.setupProcessor(processorAfterConfig.Name, creator, table) + if err != nil { + return err + } + rf = models.NewRunningProcessor(processorAfter, processorAfterConfig) c.fileAggProcessors = append(c.fileAggProcessors, &OrderedPlugin{table.Line, rf}) // Check the number of misses against the threshold @@ -1235,7 +1254,10 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato if err != nil { return conf, err } - return conf, nil + + // Generate an ID for the plugin + conf.ID, err = generatePluginID("aggregators."+name, tbl) + return conf, err } // buildParser parses Parser specific items from the ast.Table, @@ -1256,7 +1278,7 @@ func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig { // buildProcessor parses Processor specific items from the ast.Table, // builds the filter and returns a // models.ProcessorConfig to be inserted into models.RunningProcessor -func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) { +func (c *Config) buildProcessor(category, name string, tbl *ast.Table) (*models.ProcessorConfig, error) { conf := &models.ProcessorConfig{Name: name} c.getFieldInt64(tbl, "order", &conf.Order) @@ -1271,7 +1293,10 @@ func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorC if err != nil { return conf, err } - return conf, nil + + // Generate an ID for the plugin + conf.ID, err = generatePluginID(category+"."+name, tbl) + return conf, err } // buildFilter builds a Filter @@ -1339,7 +1364,10 @@ func (c *Config) buildInput(name string, tbl *ast.Table) (*models.InputConfig, e if err != nil { return cp, err } - return cp, nil + + // Generate an ID for the plugin + cp.ID, err = generatePluginID("inputs."+name, tbl) + return cp, err } // buildSerializer grabs the necessary entries from the ast.Table for creating @@ -1427,7 +1455,9 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, return nil, c.firstErr() } - return oc, nil + // Generate an ID for the plugin + oc.ID, err = generatePluginID("outputs."+name, tbl) + return oc, err } func (c *Config) missingTomlField(_ reflect.Type, key string) error { diff --git a/config/config_test.go b/config/config_test.go index a71a39a0b..9cb58a37b 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -21,6 +21,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/persister" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/outputs" @@ -91,9 +92,10 @@ func TestConfig_LoadSingleInputWithEnvVars(t *testing.T) { } inputConfig.Tags = make(map[string]string) - // Ignore Log and Parser + // Ignore Log, Parser and ID c.Inputs[0].Input.(*MockupInputPlugin).Log = nil c.Inputs[0].Input.(*MockupInputPlugin).parser = nil + c.Inputs[0].Config.ID = "" require.Equal(t, input, c.Inputs[0].Input, "Testdata did not produce a correct mockup struct.") require.Equal(t, inputConfig, c.Inputs[0].Config, "Testdata did not produce correct input metadata.") } @@ -131,9 +133,10 @@ func TestConfig_LoadSingleInput(t *testing.T) { } inputConfig.Tags = make(map[string]string) - // Ignore Log and Parser + // Ignore Log, Parser and ID c.Inputs[0].Input.(*MockupInputPlugin).Log = nil c.Inputs[0].Input.(*MockupInputPlugin).parser = nil + c.Inputs[0].Config.ID = "" require.Equal(t, input, c.Inputs[0].Input, "Testdata did not produce a correct memcached struct.") require.Equal(t, inputConfig, c.Inputs[0].Config, "Testdata did not produce correct memcached metadata.") } @@ -258,6 +261,9 @@ func TestConfig_LoadDirectory(t *testing.T) { input.parser = nil expectedPlugins[i].parser = nil + // Ignore the ID + plugin.Config.ID = "" + require.Equalf(t, expectedPlugins[i], plugin.Input, "Plugin %d: incorrect struct produced", i) require.Equalf(t, expectedConfigs[i], plugin.Config, "Plugin %d: incorrect config produced", i) } @@ -964,6 +970,151 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) { } } +func TestConfigPluginIDsDifferent(t *testing.T) { + c := NewConfig() + c.Agent.Statefile = "/dev/null" + require.NoError(t, c.LoadConfig("./testdata/state_persistence_input_all_different.toml")) + require.NotEmpty(t, c.Inputs) + + // Compare generated IDs + for i, pi := range c.Inputs { + refid := pi.Config.ID + require.NotEmpty(t, refid) + + // Cross-comparison + for j, pj := range c.Inputs { + testid := pj.Config.ID + if i == j { + require.Equal(t, refid, testid) + continue + } + require.NotEqualf(t, refid, testid, "equal for %d, %d", i, j) + } + } +} + +func TestConfigPluginIDsSame(t *testing.T) { + c := NewConfig() + c.Agent.Statefile = "/dev/null" + require.NoError(t, c.LoadConfig("./testdata/state_persistence_input_all_same.toml")) + require.NotEmpty(t, c.Inputs) + + // Compare generated IDs + for i, pi := range c.Inputs { + refid := pi.Config.ID + require.NotEmpty(t, refid) + + // Cross-comparison + for j, pj := range c.Inputs { + testid := pj.Config.ID + require.Equal(t, refid, testid, "not equal for %d, %d", i, j) + } + } +} + +func TestPersisterInputStoreLoad(t *testing.T) { + // Reserve a temporary state file + file, err := os.CreateTemp("", "telegraf_state-*.json") + require.NoError(t, err) + filename := file.Name() + require.NoError(t, file.Close()) + defer os.Remove(filename) + + // Load the plugins + cstore := NewConfig() + require.NoError(t, cstore.LoadConfig("testdata/state_persistence_input_store_load.toml")) + + // Initialize the persister for storing the state + persisterStore := persister.Persister{ + Filename: filename, + } + require.NoError(t, persisterStore.Init()) + + expected := make(map[string]interface{}) + for i, plugin := range cstore.Inputs { + require.NoError(t, plugin.Init()) + + // Register + p := plugin.Input.(*MockupStatePlugin) + require.NoError(t, persisterStore.Register(plugin.ID(), p)) + + // Change the state + p.state.Name += "_" + strings.Repeat("a", i+1) + p.state.Version++ + p.state.Offset += uint64(i + 1) + p.state.Bits = append(p.state.Bits, len(p.state.Bits)) + p.state.Modified, _ = time.Parse(time.RFC3339, "2022-11-03T16:49:00+02:00") + + // Store the state for later comparison + expected[plugin.ID()] = p.GetState() + } + + // Write state + require.NoError(t, persisterStore.Store()) + + // Load the plugins + cload := NewConfig() + require.NoError(t, cload.LoadConfig("testdata/state_persistence_input_store_load.toml")) + require.Len(t, cload.Inputs, len(expected)) + + // Initialize the persister for loading the state + persisterLoad := persister.Persister{ + Filename: filename, + } + require.NoError(t, persisterLoad.Init()) + + for _, plugin := range cload.Inputs { + require.NoError(t, plugin.Init()) + + // Register + p := plugin.Input.(*MockupStatePlugin) + require.NoError(t, persisterLoad.Register(plugin.ID(), p)) + + // Check that the states are not yet restored + require.NotNil(t, expected[plugin.ID()]) + require.NotEqual(t, expected[plugin.ID()], p.GetState()) + } + + // Restore states + require.NoError(t, persisterLoad.Load()) + + // Check we got what we saved. + for _, plugin := range cload.Inputs { + p := plugin.Input.(*MockupStatePlugin) + require.Equal(t, expected[plugin.ID()], p.GetState()) + } +} + +func TestPersisterProcessorRegistration(t *testing.T) { + // Load the plugins + c := NewConfig() + require.NoError(t, c.LoadConfig("testdata/state_persistence_processors.toml")) + require.NotEmpty(t, c.Processors) + require.NotEmpty(t, c.AggProcessors) + + // Initialize the persister for test + dut := persister.Persister{ + Filename: "/tmp/doesn_t_matter.json", + } + require.NoError(t, dut.Init()) + + // Register the processors + for _, plugin := range c.Processors { + unwrapped := plugin.Processor.(unwrappable).Unwrap() + + p := unwrapped.(*MockupProcessorPlugin) + require.NoError(t, dut.Register(plugin.ID(), p)) + } + + // Register the after-aggregator processors + for _, plugin := range c.AggProcessors { + unwrapped := plugin.Processor.(unwrappable).Unwrap() + + p := unwrapped.(*MockupProcessorPlugin) + require.NoError(t, dut.Register(plugin.ID(), p)) + } +} + /*** Mockup INPUT plugin for (old) parser testing to avoid cyclic dependencies ***/ type MockupInputPluginParserOld struct { Parser parsers.Parser @@ -1089,7 +1240,10 @@ func (m *MockupProcessorPluginParser) SetParserFunc(f telegraf.ParserFunc) { } /*** Mockup PROCESSOR plugin without parser ***/ -type MockupProcessorPlugin struct{} +type MockupProcessorPlugin struct { + Option string `toml:"option"` + state []uint64 +} func (m *MockupProcessorPlugin) Start(_ telegraf.Accumulator) error { return nil @@ -1105,6 +1259,18 @@ func (m *MockupProcessorPlugin) Apply(_ ...telegraf.Metric) []telegraf.Metric { func (m *MockupProcessorPlugin) Add(_ telegraf.Metric, _ telegraf.Accumulator) error { return nil } +func (m *MockupProcessorPlugin) GetState() interface{} { + return m.state +} +func (m *MockupProcessorPlugin) SetState(state interface{}) error { + s, ok := state.([]uint64) + if !ok { + return fmt.Errorf("invalid state type %T", state) + } + m.state = s + + return nil +} /*** Mockup PROCESSOR plugin with parser ***/ type MockupProcessorPluginParserOnly struct { @@ -1175,6 +1341,64 @@ func (m *MockupOuputPlugin) Write(_ []telegraf.Metric) error { return nil } +/*** Mockup INPUT plugin with state for testing to avoid cyclic dependencies ***/ +type MockupState struct { + Name string + Version uint64 + Offset uint64 + Bits []int + Modified time.Time +} + +type MockupStatePluginSettings struct { + Name string `toml:"name"` + Factor float64 `toml:"factor"` + Enabled bool `toml:"enabled"` + BitField []int `toml:"bits"` +} + +type MockupStatePlugin struct { + Servers []string `toml:"servers"` + Method string `toml:"method"` + Settings map[string]string `toml:"params"` + Port int `toml:"port"` + Setups []MockupStatePluginSettings `toml:"setup"` + state MockupState +} + +func (m *MockupStatePlugin) Init() error { + t0, _ := time.Parse(time.RFC3339, "2021-04-24T23:42:00+02:00") + m.state = MockupState{ + Name: "mockup", + Bits: []int{}, + Modified: t0, + } + + return nil +} + +func (m *MockupStatePlugin) GetState() interface{} { + return m.state +} + +func (m *MockupStatePlugin) SetState(state interface{}) error { + s, ok := state.(MockupState) + if !ok { + return fmt.Errorf("invalid state type %T", state) + } + m.state = s + + return nil +} + +func (m *MockupStatePlugin) SampleConfig() string { + return "Mockup test plugin" +} + +func (m *MockupStatePlugin) Gather(_ telegraf.Accumulator) error { + return nil +} + // Register the mockup plugin on loading func init() { // Register the mockup input plugin for the required names @@ -1205,6 +1429,9 @@ func init() { inputs.Add("procstat", func() telegraf.Input { return &MockupInputPlugin{} }) + inputs.Add("statetest", func() telegraf.Input { + return &MockupStatePlugin{} + }) // Register the mockup processor plugin for the required names processors.Add("parser_test", func() telegraf.Processor { @@ -1219,6 +1446,9 @@ func init() { processors.Add("processor_parserfunc", func() telegraf.Processor { return &MockupProcessorPluginParserFunc{} }) + processors.Add("statetest", func() telegraf.Processor { + return &MockupProcessorPlugin{} + }) // Register the mockup output plugin for the required names outputs.Add("azure_monitor", func() telegraf.Output { diff --git a/config/plugin_id.go b/config/plugin_id.go new file mode 100644 index 000000000..2f61cce5e --- /dev/null +++ b/config/plugin_id.go @@ -0,0 +1,85 @@ +package config + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "sort" + + "github.com/influxdata/toml/ast" +) + +type keyValuePair struct { + Key string + Value string +} + +func processTable(parent string, table *ast.Table) ([]keyValuePair, error) { + var prefix string + var options []keyValuePair + + if parent != "" { + prefix = parent + "." + } + + for k, value := range table.Fields { + switch v := value.(type) { + case *ast.KeyValue: + key := prefix + k + options = append(options, keyValuePair{ + Key: key, + Value: v.Value.Source(), + }) + case *ast.Table: + key := prefix + k + childs, err := processTable(key, v) + if err != nil { + return nil, fmt.Errorf("parsing table for %q failed: %v", key, err) + } + options = append(options, childs...) + case []*ast.Table: + for i, t := range v { + key := fmt.Sprintf("%s#%d.%s", prefix, i, k) + childs, err := processTable(key, t) + if err != nil { + return nil, fmt.Errorf("parsing table for %q #%d failed: %v", key, i, err) + } + options = append(options, childs...) + } + default: + return nil, fmt.Errorf("unknown node type %T in key %q", value, prefix+k) + } + } + return options, nil +} + +func generatePluginID(prefix string, table *ast.Table) (string, error) { + // We need to ensure that identically configured plugins _always_ + // result in the same ID no matter which order the options are specified. + // This is even more relevant as Golang does _not_ give any guarantee + // on the ordering of maps. + // So we flatten out the configuration options (also for nested objects) + // and then sort the resulting array by the canonical key-name. + cfg, err := processTable("", table) + if err != nil { + return "", fmt.Errorf("processing AST failed: %w", err) + } + sort.SliceStable(cfg, func(i, j int) bool { return cfg[i].Key < cfg[j].Key }) + + // Hash the config options to get the ID. We also prefix the ID with + // the plugin name to prevent overlap with other plugin types. + hash := sha256.New() + if _, err := hash.Write(append([]byte(prefix), 0)); err != nil { + return "", fmt.Errorf("hashing name failed: %w", err) + } + for _, kv := range cfg { + if _, err := hash.Write([]byte(kv.Key + ":" + kv.Value)); err != nil { + return "", fmt.Errorf("hashing entry %q failed: %w", kv.Key, err) + } + if _, err := hash.Write([]byte{0}); err != nil { + return "", fmt.Errorf("adding option end marker failed: %w", err) + } + } + + return hex.EncodeToString(hash.Sum(nil)), nil +} diff --git a/config/testdata/state_persistence_input_all_different.toml b/config/testdata/state_persistence_input_all_different.toml new file mode 100644 index 000000000..b8b38dea7 --- /dev/null +++ b/config/testdata/state_persistence_input_all_different.toml @@ -0,0 +1,42 @@ +[[inputs.statetest]] + +[[inputs.statetest]] + servers = ["myserver.com", "myserver.org"] + +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + port = 0 + +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + port = 80 + method = "strange" + [inputs.statetest.params] + a = "foo" + b = "bar" + +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + port = 80 + method = "strange" + setup = [ + {name="alpha", factor=3.1415, enabled=true, bits=[1,2,3]} + ] + [inputs.statetest.params] + a = "foo" + b = "bar" + +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + port = 80 + method = "strange" + setup = [ + {name="alpha", factor=3.1415, enabled=true, bits=[1,2,3]}, + {name="beta", factor=2.71828, enabled=true, bits=[1,2,3]} + ] + [inputs.statetest.params] + a = "foo" + b = "bar" diff --git a/config/testdata/state_persistence_input_all_same.toml b/config/testdata/state_persistence_input_all_same.toml new file mode 100644 index 000000000..7ad5db8ed --- /dev/null +++ b/config/testdata/state_persistence_input_all_same.toml @@ -0,0 +1,60 @@ +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + port = 80 + method = "strange" + setup = [ + {name="alpha", factor=3.1415, enabled=true, bits=[1,2,3]}, + {name="beta", factor=2.71828, enabled=true, bits=[1,2,3]} + ] + [inputs.statetest.params] + a = "foo" + b = "bar" + +[[inputs.statetest]] + ## What a wounderful world... + servers = ["myserver.org", "myserver.com"] + port = 80 + method = "strange" + setup = [ + {name="alpha", factor=3.1415, enabled=true, bits=[1,2,3]}, + {name="beta", factor=2.71828, enabled=true, bits=[1,2,3]} + ] + [inputs.statetest.params] + a = "foo" + b = "bar" + +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + method = "strange" + setup = [ + {name="alpha", factor=3.1415, enabled=true, bits=[1,2,3]}, + {name="beta", factor=2.71828, enabled=true, bits=[1,2,3]} + ] + port = 80 + [inputs.statetest.params] + a = "foo" + b = "bar" + +[[inputs.statetest]] + servers = ["myserver.org", "myserver.com"] + port = 80 + method = "strange" + setup = [ + {name="alpha", factor=3.1415, enabled=true, bits=[1,2,3]}, + {name="beta", factor=2.71828, enabled=true, bits=[1,2,3]} + ] + [inputs.statetest.params] + b = "bar" + a = "foo" + +[[inputs.statetest]] + method = "strange" + servers = ["myserver.org", "myserver.com"] + port = 80 + setup = [ + {name="alpha", factor=3.1415, enabled=true, bits=[1,2,3]}, + {name="beta", factor=2.71828, enabled=true, bits=[1,2,3]} + ] + [inputs.statetest.params] + a = "foo" + b = "bar" diff --git a/config/testdata/state_persistence_input_store_load.toml b/config/testdata/state_persistence_input_store_load.toml new file mode 100644 index 000000000..62983afc1 --- /dev/null +++ b/config/testdata/state_persistence_input_store_load.toml @@ -0,0 +1,17 @@ +[[inputs.statetest]] + servers = ["myserverA.org"] + port = 42 + method = "strange" + +[[inputs.statetest]] + servers = ["myserverB.org"] + port = 23 + method = "strange" + +[[inputs.statetest]] + servers = ["myserverC.org"] + port = 80 + method = "strange" + [inputs.statetest.params] + a = "foo" + b = "bar" diff --git a/config/testdata/state_persistence_processors.toml b/config/testdata/state_persistence_processors.toml new file mode 100644 index 000000000..4e9a4f36f --- /dev/null +++ b/config/testdata/state_persistence_processors.toml @@ -0,0 +1,8 @@ +[[processors.statetest]] + option = "foo" + +[[processors.statetest]] + option = "bar" + +[[processors.statetest]] + option = "captain obvious" diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index cae623fb1..d70108eb4 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -308,6 +308,12 @@ The agent table configures Telegraf and the defaults used across all plugins. translates by calling external programs `snmptranslate` and `snmptable`, or "gosmi" which translates using the built-in gosmi library. +- **statefile**: + Name of the file to load the states of plugins from and store the states to. + If uncommented and not empty, this file will be used to save the state of + stateful plugins on termination of Telegraf. If the file exists on start, + the state in the file will be restored for the plugins. + ## Plugins Telegraf plugins are divided into 4 types: [inputs][], [outputs][], diff --git a/docs/developers/STATE_PERSISTENCE.md b/docs/developers/STATE_PERSISTENCE.md new file mode 100644 index 000000000..7cdaaad0d --- /dev/null +++ b/docs/developers/STATE_PERSISTENCE.md @@ -0,0 +1,145 @@ +# State-persistence for plugins + +## Purpose + +Plugin state-persistence allows a plugin to save its state across restarts of +Telegraf. This might be necessary if data-input (or output) is stateful and +depends on the result of a previous operation. + +If you for example query data from a service providing a `next` token, your +plugin would need to know the last token received in order to make the next +query. However, this token is lost after a restart of Telegraf if not persisted +and thus your only chance is to restart the query chain potentially resulting +in handling redundant data producing unnecessary traffic. + +This is where state-persistence comes into play. The state-persistence framework +allows your plugin to store a _state_ on shutdown and load that _state_ again +on startup of Telegraf. + +## State format + +The _state_ of a plugin can be any structure or datatype that is serializable +using Golang's JSON serializer. It can be a key-value map or a more complex +structure. E.g. + +```go +type MyState struct { + CurrentToken string + LastToken string + NextToken string + FilterIDs []int64 +} +``` + +would represent a valid state. + +## Implementation + +To enable state-persistence in your plugin you need to implement the +`StatefulPlugin` interface defined in `plugin.go`. The interface looks as +follows: + +```go +type StatefulPlugin interface { + GetState() interface{} + SetState(state interface{}) error +} +``` + +The `GetState()` function should return the current state of the plugin +(see [state format](#state-format)). Please note that this function should +_always_ succeed and should always be callable directly after `Init()`. So make +sure your relevant data-structures are initialized in `Init` to prevent panics. + +Telegraf will call the `GetState()` function on shutdown and will then compile +an overall Telegraf state from the information of all stateful plugins. This +state is then persisted to disk if (and only if) the `statefile` option in the +`agent` section is set. You do _not_ need take care of any serialization or +writing, Telegraf will handle this for you. + +When starting Telegraf, the overall persisted Telegraf state will be restored, +if `statefile` is set. To do so, the `SetState()` function is called with the +deserialized state of the plugin. Please note that this function is called +directly _after_ the `Init()` function of your plugin. You need to make sure +that the given state is what you expect using a type-assertion! Make sure this +won't panic but rather return a meaningful error. + +To assign the state to the correct plugin, Telegraf relies on a plugin ID. +See the ["State assignment" section](#state-assignment) for more details on +the procedure and ["Plugin Identifier" section](#plugin-identifier) for more +details on ID generation. + +## State assignment + +When restoring the state on loading, Telegraf needs to ensure that each plugin +_instance_ gets the correct state. To do so, a plugin ID is used. By default +this ID is generated automatically for each plugin instance but can be +overwritten if necessary (see [Plugin Identifier](#plugin-identifier)). + +State assignment needs to be able to handle multiple instances of the same +plugin type correctly, e.g. if the user has configured multiple instances of +your plugin with different `server` settings. Here, the state saved for +`foo.example.com` needs to be restored to the plugin instance handling +`foo.example.com` on next startup of Telegraf and should _not_ end up at server +`bar.example.com`. So the plugin identifier used for the assignment should be +consistent over restarts of Telegraf. + +In case plugin instances are added to the configuration between restarts, no +state is restored _for those instances_. Furthermore, all states referencing +plugin identifier that are no-longer valid are dropped and will be ignored. This +can happen in case plugin instances are removed or changed in ID. + +## Plugin Identifier + +As outlined above, the plugin identifier (plugin ID) is crucial when assigning +states to plugin instances. By default, Telegraf will automatically generate an +identifier for each plugin configured when starting up. The ID is consistent +over restarts of Telegraf and is based on the _entire configuration_ of the +plugin. This means for each plugin instance, all settings in the configuration +will be concatenated and hashed to derive the ID. The resulting ID will then be +used in both save and restore operations making sure the state ends up in a +plugin with _exactly_ the same configuration that created the state. + +However, this also means that the plugin identifier _is changing_ whenever _any_ +of the configuration setting is changed! For example if your plugin is defined +as + +```go +type MyPlugin struct { + Server string `toml:"server"` + Token string `toml:"token"` + Timeout config.Duration `toml:"timeout"` + + offset int +} +``` + +with `offset` being your state, the plugin ID will change if a user changes the +`timeout` setting in the configuration file. As a consequence the state cannot +be restored. This might be undesirable for your plugin, therefore you can +overwrite the ID generation by implementing the `PluginWithID` interface (see +`plugin.go`). This interface defines a `ID() string` function returning the +identifier o the current plugin _instance_. When implementing this function you +should take the following criteria into account: + +1. The identifier has to be _unique_ for your plugin _instance_ (not only for + the plugin type) to make sure the state is assigned to the correct instance. +1. The identifier has to be _consistent_ across startups/restarts of Telegraf + as otherwise the state cannot be restored. Make sure the order of + configuration settings doesn't matter. +1. Make sure to _include all settings relevant for state assignment_. In + the example above, the plugin's `token` setting might or might not be + relevant to identify the plugin instance. +1. Make sure to _leave out all settings irrelevant for state assignment_. In + the example above, the plugin's `timeout` setting likely is not relevant + for the state and can be left out. + +Which settings are relevant for the state are plugin specific. For example, if +the `offset` is a property of the _server_ the `token` setting is irrelevant. +However, if the `offset` is specific for a certain user suddenly the `token` +setting is relevant. + +Alternatively to generating an identifier automatically, the plugin can allow +the user to specify that ID directly in a configuration setting. However, please +not that this might lead to colliding IDs in larger setups and should thus be +avoided. diff --git a/models/running_aggregator.go b/models/running_aggregator.go index 9abb29f21..930906125 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -68,6 +68,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf type AggregatorConfig struct { Name string Alias string + ID string DropOriginal bool Period time.Duration Delay time.Duration @@ -94,6 +95,13 @@ func (r *RunningAggregator) Init() error { return nil } +func (r *RunningAggregator) ID() string { + if p, ok := r.Aggregator.(telegraf.PluginWithID); ok { + return p.ID() + } + return r.Config.ID +} + func (r *RunningAggregator) Period() time.Duration { return r.Config.Period } diff --git a/models/running_input.go b/models/running_input.go index 16f4bd10b..5684e3801 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -58,6 +58,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { type InputConfig struct { Name string Alias string + ID string Interval time.Duration CollectionJitter time.Duration CollectionOffset time.Duration @@ -88,6 +89,13 @@ func (r *RunningInput) Init() error { return nil } +func (r *RunningInput) ID() string { + if p, ok := r.Input.(telegraf.PluginWithID); ok { + return p.ID() + } + return r.Config.ID +} + func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric { if ok := r.Config.Filter.Select(metric); !ok { r.metricFiltered(metric) diff --git a/models/running_output.go b/models/running_output.go index 3b85649fc..f9d9019d9 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -21,6 +21,7 @@ const ( type OutputConfig struct { Name string Alias string + ID string Filter Filter FlushInterval time.Duration @@ -128,6 +129,13 @@ func (r *RunningOutput) Init() error { return nil } +func (r *RunningOutput) ID() string { + if p, ok := r.Output.(telegraf.PluginWithID); ok { + return p.ID() + } + return r.Config.ID +} + // AddMetric adds a metric to the output. // // Takes ownership of metric diff --git a/models/running_processor.go b/models/running_processor.go index 0e4385741..5053da587 100644 --- a/models/running_processor.go +++ b/models/running_processor.go @@ -24,6 +24,7 @@ func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp type ProcessorConfig struct { Name string Alias string + ID string Order int64 Filter Filter } @@ -62,6 +63,13 @@ func (rp *RunningProcessor) Init() error { return nil } +func (rp *RunningProcessor) ID() string { + if p, ok := rp.Processor.(telegraf.PluginWithID); ok { + return p.ID() + } + return rp.Config.ID +} + func (rp *RunningProcessor) Log() telegraf.Logger { return rp.log } diff --git a/persister/persister.go b/persister/persister.go new file mode 100644 index 000000000..38ef5e804 --- /dev/null +++ b/persister/persister.go @@ -0,0 +1,103 @@ +package persister + +import ( + "encoding/json" + "fmt" + "os" + "reflect" + + "github.com/influxdata/telegraf" +) + +type Persister struct { + Filename string + + register map[string]telegraf.StatefulPlugin +} + +func (p *Persister) Init() error { + p.register = make(map[string]telegraf.StatefulPlugin) + + return nil +} + +func (p *Persister) Register(id string, plugin telegraf.StatefulPlugin) error { + if _, found := p.register[id]; found { + return fmt.Errorf("plugin with ID %q already registered", id) + } + p.register[id] = plugin + + return nil +} + +func (p *Persister) Load() error { + // Read the states from disk + in, err := os.ReadFile(p.Filename) + if err != nil { + return fmt.Errorf("reading states file failed: %w", err) + } + + // Unmarshal the id to serialized states map + var states map[string][]byte + if err := json.Unmarshal(in, &states); err != nil { + return fmt.Errorf("unmarshalling states failed: %w", err) + } + + // Get the initialized state as blueprint for unmarshalling + for id, serialized := range states { + // Check if we have a plugin with that ID + plugin, found := p.register[id] + if !found { + continue + } + + // Create a new empty state of the "state"-type. As we need a pointer + // of the state, we cannot dereference it here due to the unknown + // nature of the state-type. + nstate := reflect.New(reflect.TypeOf(plugin.GetState())).Interface() + if err := json.Unmarshal(serialized, &nstate); err != nil { + return fmt.Errorf("unmarshalling state for %q failed: %w", id, err) + } + state := reflect.ValueOf(nstate).Elem().Interface() + + // Set the state in the plugin + if err := plugin.SetState(state); err != nil { + return fmt.Errorf("setting state of %q failed: %w", id, err) + } + } + + return nil +} + +func (p *Persister) Store() error { + states := make(map[string][]byte) + + // Collect the states and serialize the individual data chunks + // to later serialize all items in the id / serialized-states map + for id, plugin := range p.register { + state, err := json.Marshal(plugin.GetState()) + if err != nil { + return fmt.Errorf("marshalling state for id %q failed: %w", id, err) + } + states[id] = state + } + + // Serialize the states + serialized, err := json.Marshal(states) + if err != nil { + return fmt.Errorf("marshalling states failed: %w", err) + } + + // Write the states to disk + f, err := os.Create(p.Filename) + if err != nil { + return fmt.Errorf("creating states file %q failed: %w", p.Filename, err) + } + defer f.Close() + + if _, err := f.Write(serialized); err != nil { + return fmt.Errorf("writing states failed: %w", err) + } + + return nil +} diff --git a/plugin.go b/plugin.go index d20d057b5..9dff48a42 100644 --- a/plugin.go +++ b/plugin.go @@ -52,6 +52,35 @@ type PluginDescriber interface { SampleConfig() string } +// PluginWithID allows a plugin to overwrite its identifier of the plugin +// instance by a user specified value. By default the ID is generated +// using the plugin's configuration. +type PluginWithID interface { + // ID returns the ID of the plugin instance. This function has to be + // callable directly after the plugin's Init() function if there is any! + ID() string +} + +// StatefulPlugin contains the functions that plugins must implement to +// persist an internal state across Telegraf runs. +// Note that plugins may define a persister that is not part of the +// interface, but can be used to trigger state updates by the plugin if +// it exists in the plugin struct, +// eg: Persister telegraf.StatePersister `toml:"-"` +type StatefulPlugin interface { + // GetState returns the current state of the plugin to persist + // The returned state can be of any time as long as it can be + // serialized to JSON. The best choice is a structure defined in + // your plugin. + // Note: This function has to be callable directly after the + // plugin's Init() function if there is any! + GetState() interface{} + + // SetState is called by the Persister once after loading and + // initialization (after Init() function). + SetState(state interface{}) error +} + // Logger defines an plugin-related interface for logging. type Logger interface { // Errorf logs an error message, patterned after log.Printf. diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index d44d3bb7c..52f1f98b2 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -99,12 +99,29 @@ func (t *Tail) Init() error { t.filterColors = true } } + // init offsets + t.offsets = make(map[string]int64) var err error t.decoder, err = encoding.NewDecoder(t.CharacterEncoding) return err } +func (t *Tail) GetState() interface{} { + return t.offsets +} + +func (t *Tail) SetState(state interface{}) error { + offsetsState, ok := state.(map[string]int64) + if !ok { + return errors.New("state has to be of type 'map[string]int64'") + } + for k, v := range offsetsState { + t.offsets[k] = v + } + return nil +} + func (t *Tail) Gather(_ telegraf.Accumulator) error { return t.tailNewFiles(true) } @@ -138,8 +155,6 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { err = t.tailNewFiles(t.FromBeginning) - // clear offsets - t.offsets = make(map[string]int64) // assumption that once Start is called, all parallel plugins have already been initialized offsetsMutex.Lock() offsets = make(map[string]int64) @@ -356,6 +371,7 @@ func (t *Tail) Stop() { offset, err := tailer.Tell() if err == nil { t.Log.Debugf("Recording offset %d for %q", offset, tailer.Filename) + t.offsets[tailer.Filename] = offset } else { t.Log.Errorf("Recording offset for %q: %s", tailer.Filename, err.Error()) } diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 303a069f3..508b9871d 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -620,6 +620,7 @@ func TestCharacterEncoding(t *testing.T) { } plugin.SetParserFunc(NewInfluxParser) + require.NoError(t, plugin.Init()) if tt.offset != 0 { plugin.offsets = map[string]int64{ @@ -627,12 +628,8 @@ func TestCharacterEncoding(t *testing.T) { } } - err := plugin.Init() - require.NoError(t, err) - var acc testutil.Accumulator - err = plugin.Start(&acc) - require.NoError(t, err) + require.NoError(t, plugin.Start(&acc)) acc.Wait(len(tt.expected)) plugin.Stop()