fix panic on streaming processers using logging (#8176)

This commit is contained in:
Steven Soroka 2020-09-28 11:58:23 -04:00 committed by GitHub
parent e1cb269a35
commit b4fb1adc6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 30 additions and 23 deletions

View File

@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string {
return pluginType + "." + name + "::" + alias return pluginType + "." + name + "::" + alias
} }
func setLoggerOnPlugin(i interface{}, log telegraf.Logger) { func SetLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i) valI := reflect.ValueOf(i)
if valI.Type().Kind() != reflect.Ptr { if valI.Type().Kind() != reflect.Ptr {
@ -96,6 +96,9 @@ func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
if field.CanSet() { if field.CanSet() {
field.Set(reflect.ValueOf(log)) field.Set(reflect.ValueOf(log))
} }
default:
log.Debugf("Plugin %q defines a 'Log' field on its struct of an unexpected type %q. Expected telegraf.Logger",
valI.Type().Name(), field.Type().String())
} }
return return

View File

@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
aggErrorsRegister.Incr(1) aggErrorsRegister.Incr(1)
}) })
setLoggerOnPlugin(aggregator, logger) SetLoggerOnPlugin(aggregator, logger)
return &RunningAggregator{ return &RunningAggregator{
Aggregator: aggregator, Aggregator: aggregator,

View File

@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1) inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1) GlobalGatherErrors.Incr(1)
}) })
setLoggerOnPlugin(input, logger) SetLoggerOnPlugin(input, logger)
return &RunningInput{ return &RunningInput{
Input: input, Input: input,

View File

@ -72,7 +72,7 @@ func NewRunningOutput(
logger.OnErr(func() { logger.OnErr(func() {
writeErrorsRegister.Incr(1) writeErrorsRegister.Incr(1)
}) })
setLoggerOnPlugin(output, logger) SetLoggerOnPlugin(output, logger)
if config.MetricBufferLimit > 0 { if config.MetricBufferLimit > 0 {
bufferLimit = config.MetricBufferLimit bufferLimit = config.MetricBufferLimit

View File

@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.OnErr(func() { logger.OnErr(func() {
processErrorsRegister.Incr(1) processErrorsRegister.Incr(1)
}) })
setLoggerOnPlugin(processor, logger) SetLoggerOnPlugin(processor, logger)
return &RunningProcessor{ return &RunningProcessor{
Processor: processor, Processor: processor,

View File

@ -1,4 +1,4 @@
package models package models_test
import ( import (
"sort" "sort"
@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -52,7 +53,7 @@ func (p *MockProcessorToInit) Init() error {
func TestRunningProcessor_Init(t *testing.T) { func TestRunningProcessor_Init(t *testing.T) {
mock := MockProcessorToInit{} mock := MockProcessorToInit{}
rp := &RunningProcessor{ rp := &models.RunningProcessor{
Processor: processors.NewStreamingProcessorFromProcessor(&mock), Processor: processors.NewStreamingProcessorFromProcessor(&mock),
} }
rp.Init() rp.Init()
@ -75,7 +76,7 @@ func TagProcessor(key, value string) *MockProcessor {
func TestRunningProcessor_Apply(t *testing.T) { func TestRunningProcessor_Apply(t *testing.T) {
type args struct { type args struct {
Processor telegraf.StreamingProcessor Processor telegraf.StreamingProcessor
Config *ProcessorConfig Config *models.ProcessorConfig
} }
tests := []struct { tests := []struct {
@ -88,8 +89,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "inactive filter applies metrics", name: "inactive filter applies metrics",
args: args{ args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{ Config: &models.ProcessorConfig{
Filter: Filter{}, Filter: models.Filter{},
}, },
}, },
input: []telegraf.Metric{ input: []telegraf.Metric{
@ -119,8 +120,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "filter applies", name: "filter applies",
args: args{ args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{ Config: &models.ProcessorConfig{
Filter: Filter{ Filter: models.Filter{
NamePass: []string{"cpu"}, NamePass: []string{"cpu"},
}, },
}, },
@ -152,8 +153,8 @@ func TestRunningProcessor_Apply(t *testing.T) {
name: "filter doesn't apply", name: "filter doesn't apply",
args: args{ args: args{
Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")), Processor: processors.NewStreamingProcessorFromProcessor(TagProcessor("apply", "true")),
Config: &ProcessorConfig{ Config: &models.ProcessorConfig{
Filter: Filter{ Filter: models.Filter{
NameDrop: []string{"cpu"}, NameDrop: []string{"cpu"},
}, },
}, },
@ -183,7 +184,7 @@ func TestRunningProcessor_Apply(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
rp := &RunningProcessor{ rp := &models.RunningProcessor{
Processor: tt.args.Processor, Processor: tt.args.Processor,
Config: tt.args.Config, Config: tt.args.Config,
} }
@ -204,25 +205,25 @@ func TestRunningProcessor_Apply(t *testing.T) {
} }
func TestRunningProcessor_Order(t *testing.T) { func TestRunningProcessor_Order(t *testing.T) {
rp1 := &RunningProcessor{ rp1 := &models.RunningProcessor{
Config: &ProcessorConfig{ Config: &models.ProcessorConfig{
Order: 1, Order: 1,
}, },
} }
rp2 := &RunningProcessor{ rp2 := &models.RunningProcessor{
Config: &ProcessorConfig{ Config: &models.ProcessorConfig{
Order: 2, Order: 2,
}, },
} }
rp3 := &RunningProcessor{ rp3 := &models.RunningProcessor{
Config: &ProcessorConfig{ Config: &models.ProcessorConfig{
Order: 3, Order: 3,
}, },
} }
procs := RunningProcessors{rp2, rp3, rp1} procs := models.RunningProcessors{rp2, rp3, rp1}
sort.Sort(procs) sort.Sort(procs)
require.Equal(t, require.Equal(t,
RunningProcessors{rp1, rp2, rp3}, models.RunningProcessors{rp1, rp2, rp3},
procs) procs)
} }

View File

@ -2,6 +2,7 @@ package processors
import ( import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
) )
// NewStreamingProcessorFromProcessor is a converter that turns a standard // NewStreamingProcessorFromProcessor is a converter that turns a standard
@ -16,6 +17,7 @@ func NewStreamingProcessorFromProcessor(p telegraf.Processor) telegraf.Streaming
type streamingProcessor struct { type streamingProcessor struct {
processor telegraf.Processor processor telegraf.Processor
acc telegraf.Accumulator acc telegraf.Accumulator
Log telegraf.Logger
} }
func (sp *streamingProcessor) SampleConfig() string { func (sp *streamingProcessor) SampleConfig() string {
@ -46,6 +48,7 @@ func (sp *streamingProcessor) Stop() error {
// to call the Init method of the wrapped processor if // to call the Init method of the wrapped processor if
// needed // needed
func (sp *streamingProcessor) Init() error { func (sp *streamingProcessor) Init() error {
models.SetLoggerOnPlugin(sp.processor, sp.Log)
if p, ok := sp.processor.(telegraf.Initializer); ok { if p, ok := sp.processor.(telegraf.Initializer); ok {
err := p.Init() err := p.Init()
if err != nil { if err != nil {