diff --git a/plugins/common/shim/goshim.go b/plugins/common/shim/goshim.go index 2490967ee..7be139194 100644 --- a/plugins/common/shim/goshim.go +++ b/plugins/common/shim/goshim.go @@ -37,6 +37,8 @@ type Shim struct { Processor telegraf.StreamingProcessor Output telegraf.Output + log *Logger + // streams stdin io.Reader stdout io.Writer @@ -56,6 +58,7 @@ func New() *Shim { stdin: os.Stdin, stdout: os.Stdout, stderr: os.Stderr, + log: NewLogger(), } } @@ -127,5 +130,5 @@ func (s *Shim) MakeMetric(m telegraf.Metric) telegraf.Metric { // Log satisfies the MetricMaker interface func (s *Shim) Log() telegraf.Logger { - return nil + return s.log } diff --git a/plugins/common/shim/goshim_test.go b/plugins/common/shim/goshim_test.go new file mode 100644 index 000000000..080a513ad --- /dev/null +++ b/plugins/common/shim/goshim_test.go @@ -0,0 +1,79 @@ +package shim + +import ( + "bufio" + "errors" + "io" + "log" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/require" +) + +func TestShimSetsUpLogger(t *testing.T) { + stderrReader, stderrWriter := io.Pipe() + stdinReader, stdinWriter := io.Pipe() + + runErroringInputPlugin(t, 40*time.Second, stdinReader, nil, stderrWriter) + + stdinWriter.Write([]byte("\n")) + + // <-metricProcessed + + r := bufio.NewReader(stderrReader) + out, err := r.ReadString('\n') + require.NoError(t, err) + require.Contains(t, out, "Error in plugin: intentional") + + stdinWriter.Close() +} + +func runErroringInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) { + metricProcessed = make(chan bool, 1) + exited = make(chan bool, 1) + inp := &erroringInput{} + + shim := New() + if stdin != nil { + shim.stdin = stdin + } + if stdout != nil { + shim.stdout = stdout + } + if stderr != nil { + shim.stderr = stderr + log.SetOutput(stderr) + } + shim.AddInput(inp) + go func() { + err := shim.Run(interval) + require.NoError(t, err) + exited <- true + }() + return metricProcessed, exited +} + +type erroringInput struct { +} + +func (i *erroringInput) SampleConfig() string { + return "" +} + +func (i *erroringInput) Description() string { + return "" +} + +func (i *erroringInput) Gather(acc telegraf.Accumulator) error { + acc.AddError(errors.New("intentional")) + return nil +} + +func (i *erroringInput) Start(acc telegraf.Accumulator) error { + return nil +} + +func (i *erroringInput) Stop() { +} diff --git a/plugins/common/shim/input.go b/plugins/common/shim/input.go index acf199fed..a2956c3e1 100644 --- a/plugins/common/shim/input.go +++ b/plugins/common/shim/input.go @@ -13,7 +13,7 @@ import ( // AddInput adds the input to the shim. Later calls to Run() will run this input. func (s *Shim) AddInput(input telegraf.Input) error { - setLoggerOnPlugin(input, NewLogger()) + setLoggerOnPlugin(input, s.Log()) if p, ok := input.(telegraf.Initializer); ok { err := p.Init() if err != nil { diff --git a/plugins/common/shim/output.go b/plugins/common/shim/output.go index 6aa9546fa..c5ce46da7 100644 --- a/plugins/common/shim/output.go +++ b/plugins/common/shim/output.go @@ -10,7 +10,7 @@ import ( // AddOutput adds the input to the shim. Later calls to Run() will run this. func (s *Shim) AddOutput(output telegraf.Output) error { - setLoggerOnPlugin(output, NewLogger()) + setLoggerOnPlugin(output, s.Log()) if p, ok := output.(telegraf.Initializer); ok { err := p.Init() if err != nil { diff --git a/plugins/common/shim/processor.go b/plugins/common/shim/processor.go index 95b5dff86..33dceba87 100644 --- a/plugins/common/shim/processor.go +++ b/plugins/common/shim/processor.go @@ -14,14 +14,14 @@ import ( // AddProcessor adds the processor to the shim. Later calls to Run() will run this. func (s *Shim) AddProcessor(processor telegraf.Processor) error { - setLoggerOnPlugin(processor, NewLogger()) + setLoggerOnPlugin(processor, s.Log()) p := processors.NewStreamingProcessorFromProcessor(processor) return s.AddStreamingProcessor(p) } // AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this. func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error { - setLoggerOnPlugin(processor, NewLogger()) + setLoggerOnPlugin(processor, s.Log()) if p, ok := processor.(telegraf.Initializer); ok { err := p.Init() if err != nil {