fix bug in shim logger affecting AddError (#8052)

This commit is contained in:
Steven Soroka 2020-08-28 18:45:38 -04:00 committed by GitHub
parent 5d999f85f0
commit 3278054c5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 87 additions and 5 deletions

View File

@ -37,6 +37,8 @@ type Shim struct {
Processor telegraf.StreamingProcessor
Output telegraf.Output
log *Logger
// streams
stdin io.Reader
stdout io.Writer
@ -56,6 +58,7 @@ func New() *Shim {
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
log: NewLogger(),
}
}
@ -127,5 +130,5 @@ func (s *Shim) MakeMetric(m telegraf.Metric) telegraf.Metric {
// Log satisfies the MetricMaker interface
func (s *Shim) Log() telegraf.Logger {
return nil
return s.log
}

View File

@ -0,0 +1,79 @@
package shim
import (
"bufio"
"errors"
"io"
"log"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/stretchr/testify/require"
)
func TestShimSetsUpLogger(t *testing.T) {
stderrReader, stderrWriter := io.Pipe()
stdinReader, stdinWriter := io.Pipe()
runErroringInputPlugin(t, 40*time.Second, stdinReader, nil, stderrWriter)
stdinWriter.Write([]byte("\n"))
// <-metricProcessed
r := bufio.NewReader(stderrReader)
out, err := r.ReadString('\n')
require.NoError(t, err)
require.Contains(t, out, "Error in plugin: intentional")
stdinWriter.Close()
}
func runErroringInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) {
metricProcessed = make(chan bool, 1)
exited = make(chan bool, 1)
inp := &erroringInput{}
shim := New()
if stdin != nil {
shim.stdin = stdin
}
if stdout != nil {
shim.stdout = stdout
}
if stderr != nil {
shim.stderr = stderr
log.SetOutput(stderr)
}
shim.AddInput(inp)
go func() {
err := shim.Run(interval)
require.NoError(t, err)
exited <- true
}()
return metricProcessed, exited
}
type erroringInput struct {
}
func (i *erroringInput) SampleConfig() string {
return ""
}
func (i *erroringInput) Description() string {
return ""
}
func (i *erroringInput) Gather(acc telegraf.Accumulator) error {
acc.AddError(errors.New("intentional"))
return nil
}
func (i *erroringInput) Start(acc telegraf.Accumulator) error {
return nil
}
func (i *erroringInput) Stop() {
}

View File

@ -13,7 +13,7 @@ import (
// AddInput adds the input to the shim. Later calls to Run() will run this input.
func (s *Shim) AddInput(input telegraf.Input) error {
setLoggerOnPlugin(input, NewLogger())
setLoggerOnPlugin(input, s.Log())
if p, ok := input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {

View File

@ -10,7 +10,7 @@ import (
// AddOutput adds the input to the shim. Later calls to Run() will run this.
func (s *Shim) AddOutput(output telegraf.Output) error {
setLoggerOnPlugin(output, NewLogger())
setLoggerOnPlugin(output, s.Log())
if p, ok := output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {

View File

@ -14,14 +14,14 @@ import (
// AddProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddProcessor(processor telegraf.Processor) error {
setLoggerOnPlugin(processor, NewLogger())
setLoggerOnPlugin(processor, s.Log())
p := processors.NewStreamingProcessorFromProcessor(processor)
return s.AddStreamingProcessor(p)
}
// AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error {
setLoggerOnPlugin(processor, NewLogger())
setLoggerOnPlugin(processor, s.Log())
if p, ok := processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {