From 309c195e03974fc03dd9c90ba4fa5539c83da2e8 Mon Sep 17 00:00:00 2001 From: Chase Sterling Date: Thu, 12 Oct 2023 17:07:13 -0400 Subject: [PATCH] feat(processors.dedup): Add state persistence between runs (#14065) --- agent/agent.go | 23 ++++++++++++++++------- plugins/processors/dedup/README.md | 2 ++ plugins/processors/dedup/dedup.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 4d5a2bfa4..9beb5ba38 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/snmp" "github.com/influxdata/telegraf/models" + "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers/influx" ) @@ -265,9 +266,17 @@ func (a *Agent) initPersister() error { } for _, processor := range a.Config.Processors { - plugin, ok := processor.Processor.(telegraf.StatefulPlugin) - if !ok { - continue + var plugin telegraf.StatefulPlugin + if p, ok := processor.Processor.(processors.HasUnwrap); ok { + plugin, ok = p.Unwrap().(telegraf.StatefulPlugin) + if !ok { + continue + } + } else { + plugin, ok = processor.Processor.(telegraf.StatefulPlugin) + if !ok { + continue + } } name := processor.LogName() @@ -600,17 +609,17 @@ func (a *Agent) gatherOnce( // processors. If an error occurs any started processors are Stopped. func (a *Agent) startProcessors( dst chan<- telegraf.Metric, - processors models.RunningProcessors, + runningProcessors models.RunningProcessors, ) (chan<- telegraf.Metric, []*processorUnit, error) { var src chan telegraf.Metric - units := make([]*processorUnit, 0, len(processors)) + units := make([]*processorUnit, 0, len(runningProcessors)) // The processor chain is constructed from the output side starting from // the output(s) and walking the way back to the input(s). However, the // processor-list is sorted by order and/or by appearance in the config, // i.e. in input-to-output direction. Therefore, reverse the processor list // to reflect the order/definition order in the processing chain. - for i := len(processors) - 1; i >= 0; i-- { - processor := processors[i] + for i := len(runningProcessors) - 1; i >= 0; i-- { + processor := runningProcessors[i] src = make(chan telegraf.Metric, 100) acc := NewAccumulator(processor, dst) diff --git a/plugins/processors/dedup/README.md b/plugins/processors/dedup/README.md index e0a4000b0..61320d022 100644 --- a/plugins/processors/dedup/README.md +++ b/plugins/processors/dedup/README.md @@ -1,6 +1,8 @@ # Dedup Processor Plugin Filter metrics whose field values are exact repetitions of the previous values. +This plugin will store its state between runs if the `statefile` option in the +agent config section is set. ## Global configuration options diff --git a/plugins/processors/dedup/dedup.go b/plugins/processors/dedup/dedup.go index 866363651..dc1c10a93 100644 --- a/plugins/processors/dedup/dedup.go +++ b/plugins/processors/dedup/dedup.go @@ -3,11 +3,14 @@ package dedup import ( _ "embed" + "fmt" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/processors" + influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx" ) //go:embed sample.conf @@ -117,6 +120,32 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric { return metrics } +func (d *Dedup) GetState() interface{} { + s := &influxSerializer.Serializer{} + v := make([]telegraf.Metric, 0, len(d.Cache)) + for _, value := range d.Cache { + v = append(v, value) + } + state, _ := s.SerializeBatch(v) + return state +} + +func (d *Dedup) SetState(state interface{}) error { + p := &influx.Parser{} + if err := p.Init(); err != nil { + return err + } + data, ok := state.([]byte) + if !ok { + return fmt.Errorf("state has wrong type %T", state) + } + metrics, err := p.Parse(data) + if err == nil { + d.Apply(metrics...) + } + return nil +} + func init() { processors.Add("dedup", func() telegraf.Processor { return &Dedup{