feat(processors.dedup): Add state persistence between runs (#14065)

This commit is contained in:
Chase Sterling 2023-10-12 17:07:13 -04:00 committed by GitHub
parent 710c92c50d
commit 309c195e03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 7 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/snmp" "github.com/influxdata/telegraf/internal/snmp"
"github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
@ -265,9 +266,17 @@ func (a *Agent) initPersister() error {
} }
for _, processor := range a.Config.Processors { for _, processor := range a.Config.Processors {
plugin, ok := processor.Processor.(telegraf.StatefulPlugin) var plugin telegraf.StatefulPlugin
if !ok { if p, ok := processor.Processor.(processors.HasUnwrap); ok {
continue plugin, ok = p.Unwrap().(telegraf.StatefulPlugin)
if !ok {
continue
}
} else {
plugin, ok = processor.Processor.(telegraf.StatefulPlugin)
if !ok {
continue
}
} }
name := processor.LogName() name := processor.LogName()
@ -600,17 +609,17 @@ func (a *Agent) gatherOnce(
// processors. If an error occurs any started processors are Stopped. // processors. If an error occurs any started processors are Stopped.
func (a *Agent) startProcessors( func (a *Agent) startProcessors(
dst chan<- telegraf.Metric, dst chan<- telegraf.Metric,
processors models.RunningProcessors, runningProcessors models.RunningProcessors,
) (chan<- telegraf.Metric, []*processorUnit, error) { ) (chan<- telegraf.Metric, []*processorUnit, error) {
var src chan telegraf.Metric var src chan telegraf.Metric
units := make([]*processorUnit, 0, len(processors)) units := make([]*processorUnit, 0, len(runningProcessors))
// The processor chain is constructed from the output side starting from // The processor chain is constructed from the output side starting from
// the output(s) and walking the way back to the input(s). However, the // the output(s) and walking the way back to the input(s). However, the
// processor-list is sorted by order and/or by appearance in the config, // processor-list is sorted by order and/or by appearance in the config,
// i.e. in input-to-output direction. Therefore, reverse the processor list // i.e. in input-to-output direction. Therefore, reverse the processor list
// to reflect the order/definition order in the processing chain. // to reflect the order/definition order in the processing chain.
for i := len(processors) - 1; i >= 0; i-- { for i := len(runningProcessors) - 1; i >= 0; i-- {
processor := processors[i] processor := runningProcessors[i]
src = make(chan telegraf.Metric, 100) src = make(chan telegraf.Metric, 100)
acc := NewAccumulator(processor, dst) acc := NewAccumulator(processor, dst)

View File

@ -1,6 +1,8 @@
# Dedup Processor Plugin # Dedup Processor Plugin
Filter metrics whose field values are exact repetitions of the previous values. Filter metrics whose field values are exact repetitions of the previous values.
This plugin will store its state between runs if the `statefile` option in the
agent config section is set.
## Global configuration options <!-- @/docs/includes/plugin_config.md --> ## Global configuration options <!-- @/docs/includes/plugin_config.md -->

View File

@ -3,11 +3,14 @@ package dedup
import ( import (
_ "embed" _ "embed"
"fmt"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/processors"
influxSerializer "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
//go:embed sample.conf //go:embed sample.conf
@ -117,6 +120,32 @@ func (d *Dedup) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
return metrics return metrics
} }
func (d *Dedup) GetState() interface{} {
s := &influxSerializer.Serializer{}
v := make([]telegraf.Metric, 0, len(d.Cache))
for _, value := range d.Cache {
v = append(v, value)
}
state, _ := s.SerializeBatch(v)
return state
}
func (d *Dedup) SetState(state interface{}) error {
p := &influx.Parser{}
if err := p.Init(); err != nil {
return err
}
data, ok := state.([]byte)
if !ok {
return fmt.Errorf("state has wrong type %T", state)
}
metrics, err := p.Parse(data)
if err == nil {
d.Apply(metrics...)
}
return nil
}
func init() { func init() {
processors.Add("dedup", func() telegraf.Processor { processors.Add("dedup", func() telegraf.Processor {
return &Dedup{ return &Dedup{