diff --git a/models/log.go b/models/log.go index 2e42a516c..c0b52a812 100644 --- a/models/log.go +++ b/models/log.go @@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string { return pluginType + "." + name + "::" + alias } -func setLoggerOnPlugin(i interface{}, log telegraf.Logger) { +func SetLoggerOnPlugin(i interface{}, log telegraf.Logger) { valI := reflect.ValueOf(i) if valI.Type().Kind() != reflect.Ptr { @@ -96,6 +96,9 @@ func setLoggerOnPlugin(i interface{}, log telegraf.Logger) { if field.CanSet() { field.Set(reflect.ValueOf(log)) } + default: + log.Debugf("Plugin %q defines a 'Log' field on its struct of an unexpected type %q. Expected telegraf.Logger", + valI.Type().Name(), field.Type().String()) } return diff --git a/models/running_aggregator.go b/models/running_aggregator.go index ad054be76..cbfb9889b 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf aggErrorsRegister.Incr(1) }) - setLoggerOnPlugin(aggregator, logger) + SetLoggerOnPlugin(aggregator, logger) return &RunningAggregator{ Aggregator: aggregator, diff --git a/models/running_input.go b/models/running_input.go index 52f95cb52..70a4c2ee3 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput { inputErrorsRegister.Incr(1) GlobalGatherErrors.Incr(1) }) - setLoggerOnPlugin(input, logger) + SetLoggerOnPlugin(input, logger) return &RunningInput{ Input: input, diff --git a/models/running_output.go b/models/running_output.go index 0d2954c4a..894ae011c 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -72,7 +72,7 @@ func NewRunningOutput( logger.OnErr(func() { writeErrorsRegister.Incr(1) }) - setLoggerOnPlugin(output, logger) + SetLoggerOnPlugin(output, logger) if config.MetricBufferLimit > 0 { bufferLimit = config.MetricBufferLimit diff --git a/models/running_processor.go b/models/running_processor.go index c487f4821..1bd2d0f6e 100644 --- a/models/running_processor.go +++ b/models/running_processor.go @@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo logger.OnErr(func() { processErrorsRegister.Incr(1) }) - setLoggerOnPlugin(processor, logger) + SetLoggerOnPlugin(processor, logger) return &RunningProcessor{ Processor: processor, diff --git a/models/running_processor_test.go b/models/running_processor_test.go index 1c431bde1..14df03253 100644 --- a/models/running_processor_test.go +++ b/models/running_processor_test.go @@ -1,4 +1,4 @@ -package models +package models_test import ( "sort" @@ -6,6 +6,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -52,7 +53,7 @@ func (p *MockProcessorToInit) Init() error { func TestRunningProcessor_Init(t *testing.T) { mock := MockProcessorToInit{} - rp := &RunningProcessor{ + rp := &models.RunningProcessor{ Processor: processors.NewStreamingProcessorFromProcessor(&mock), } rp.Init() @@ -75,7 +76,7 @@ func TagProcessor(key, value string) *MockProcessor { func TestRunningProcessor_Apply(t *testing.T) { type args struct { Processor telegraf.StreamingProcessor - Config *ProcessorConfig + Config *models.ProcessorConfig } tests := []struct { @@ -88,8 +89,8 @@ func TestRunningProcessor_Apply(t *testing.T) { name: "inactive filter applies metrics", args: args{ Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), - Config: &ProcessorConfig{ - Filter: Filter{}, + Config: &models.ProcessorConfig{ + Filter: models.Filter{}, }, }, input: []telegraf.Metric{ @@ -119,8 +120,8 @@ func TestRunningProcessor_Apply(t *testing.T) { name: "filter applies", args: args{ Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), - Config: &ProcessorConfig{ - Filter: Filter{ + Config: &models.ProcessorConfig{ + Filter: models.Filter{ NamePass: []string{"cpu"}, }, }, @@ -152,8 +153,8 @@ func TestRunningProcessor_Apply(t *testing.T) { name: "filter doesn't apply", args: args{ Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), - Config: &ProcessorConfig{ - Filter: Filter{ + Config: &models.ProcessorConfig{ + Filter: models.Filter{ NameDrop: []string{"cpu"}, }, }, @@ -183,7 +184,7 @@ func TestRunningProcessor_Apply(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - rp := &RunningProcessor{ + rp := &models.RunningProcessor{ Processor: tt.args.Processor, Config: tt.args.Config, } @@ -204,25 +205,25 @@ func TestRunningProcessor_Apply(t *testing.T) { } func TestRunningProcessor_Order(t *testing.T) { - rp1 := &RunningProcessor{ - Config: &ProcessorConfig{ + rp1 := &models.RunningProcessor{ + Config: &models.ProcessorConfig{ Order: 1, }, } - rp2 := &RunningProcessor{ - Config: &ProcessorConfig{ + rp2 := &models.RunningProcessor{ + Config: &models.ProcessorConfig{ Order: 2, }, } - rp3 := &RunningProcessor{ - Config: &ProcessorConfig{ + rp3 := &models.RunningProcessor{ + Config: &models.ProcessorConfig{ Order: 3, }, } - procs := RunningProcessors{rp2, rp3, rp1} + procs := models.RunningProcessors{rp2, rp3, rp1} sort.Sort(procs) require.Equal(t, - RunningProcessors{rp1, rp2, rp3}, + models.RunningProcessors{rp1, rp2, rp3}, procs) } diff --git a/plugins/processors/streamingprocessor.go b/plugins/processors/streamingprocessor.go index 95b2e0748..95ebae214 100644 --- a/plugins/processors/streamingprocessor.go +++ b/plugins/processors/streamingprocessor.go @@ -2,6 +2,7 @@ package processors import ( "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/models" ) // NewStreamingProcessorFromProcessor is a converter that turns a standard @@ -16,6 +17,7 @@ func NewStreamingProcessorFromProcessor(p telegraf.Processor) telegraf.Streaming type streamingProcessor struct { processor telegraf.Processor acc telegraf.Accumulator + Log telegraf.Logger } func (sp *streamingProcessor) SampleConfig() string { @@ -46,6 +48,7 @@ func (sp *streamingProcessor) Stop() error { // to call the Init method of the wrapped processor if // needed func (sp *streamingProcessor) Init() error { + models.SetLoggerOnPlugin(sp.processor, sp.Log) if p, ok := sp.processor.(telegraf.Initializer); ok { err := p.Init() if err != nil {