shim logger improvements (#7865)

This commit is contained in:
Steven Soroka 2020-07-22 14:29:50 -04:00 committed by GitHub
parent 6e5c72f743
commit 569584d7df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 118 additions and 11 deletions

View File

@ -79,7 +79,7 @@ func logName(pluginType, name, alias string) string {
return pluginType + "." + name + "::" + alias
}
func setLogIfExist(i interface{}, log telegraf.Logger) {
func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)
if valI.Type().Kind() != reflect.Ptr {

View File

@ -35,7 +35,7 @@ func NewRunningAggregator(aggregator telegraf.Aggregator, config *AggregatorConf
aggErrorsRegister.Incr(1)
})
setLogIfExist(aggregator, logger)
setLoggerOnPlugin(aggregator, logger)
return &RunningAggregator{
Aggregator: aggregator,

View File

@ -35,7 +35,7 @@ func NewRunningInput(input telegraf.Input, config *InputConfig) *RunningInput {
inputErrorsRegister.Incr(1)
GlobalGatherErrors.Incr(1)
})
setLogIfExist(input, logger)
setLoggerOnPlugin(input, logger)
return &RunningInput{
Input: input,

View File

@ -72,7 +72,7 @@ func NewRunningOutput(
logger.OnErr(func() {
writeErrorsRegister.Incr(1)
})
setLogIfExist(output, logger)
setLoggerOnPlugin(output, logger)
if config.MetricBufferLimit > 0 {
bufferLimit = config.MetricBufferLimit

View File

@ -39,7 +39,7 @@ func NewRunningProcessor(processor telegraf.StreamingProcessor, config *Processo
logger.OnErr(func() {
processErrorsRegister.Incr(1)
})
setLogIfExist(processor, logger)
setLoggerOnPlugin(processor, logger)
return &RunningProcessor{
Processor: processor,

View File

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"github.com/BurntSushi/toml"
@ -152,14 +153,17 @@ func DefaultImportedPlugins() (config, error) {
Outputs: map[string][]toml.Primitive{},
}
for name := range inputs.Inputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Inputs[name] = []toml.Primitive{}
return conf, nil
}
for name := range processors.Processors {
log.Println("No config found. Loading default config for plugin", name)
conf.Processors[name] = []toml.Primitive{}
return conf, nil
}
for name := range outputs.Outputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Outputs[name] = []toml.Primitive{}
return conf, nil
}

View File

@ -13,6 +13,7 @@ import (
// AddInput adds the input to the shim. Later calls to Run() will run this input.
func (s *Shim) AddInput(input telegraf.Input) error {
setLoggerOnPlugin(input, NewLogger())
if p, ok := input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
@ -57,13 +58,16 @@ func (s *Shim) RunInput(pollInterval time.Duration) error {
wg.Done()
}()
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
}
go func() {
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
}
cancel() // cancel gracefully stops gathering
}()
cancel() // cancel gracefully stops gathering
wg.Wait() // wait for writing to stdout to finish
return nil
}

View File

@ -0,0 +1,89 @@
package shim
import (
"fmt"
"log"
"os"
"reflect"
"github.com/influxdata/telegraf"
)
func init() {
log.SetOutput(os.Stderr)
}
// Logger defines a logging structure for plugins.
// external plugins can only ever write to stderr and writing to stdout
// would interfere with input/processor writing out of metrics.
type Logger struct{}
// NewLogger creates a new logger instance
func NewLogger() *Logger {
return &Logger{}
}
// Errorf logs an error message, patterned after log.Printf.
func (l *Logger) Errorf(format string, args ...interface{}) {
log.Printf("E! "+format, args...)
}
// Error logs an error message, patterned after log.Print.
func (l *Logger) Error(args ...interface{}) {
log.Print("E! ", fmt.Sprint(args...))
}
// Debugf logs a debug message, patterned after log.Printf.
func (l *Logger) Debugf(format string, args ...interface{}) {
log.Printf("D! "+format, args...)
}
// Debug logs a debug message, patterned after log.Print.
func (l *Logger) Debug(args ...interface{}) {
log.Print("D! ", fmt.Sprint(args...))
}
// Warnf logs a warning message, patterned after log.Printf.
func (l *Logger) Warnf(format string, args ...interface{}) {
log.Printf("W! "+format, args...)
}
// Warn logs a warning message, patterned after log.Print.
func (l *Logger) Warn(args ...interface{}) {
log.Print("W! ", fmt.Sprint(args...))
}
// Infof logs an information message, patterned after log.Printf.
func (l *Logger) Infof(format string, args ...interface{}) {
log.Printf("I! "+format, args...)
}
// Info logs an information message, patterned after log.Print.
func (l *Logger) Info(args ...interface{}) {
log.Print("I! ", fmt.Sprint(args...))
}
// setLoggerOnPlugin injects the logger into the plugin,
// if it defines Log telegraf.Logger. This is sort of like SetLogger but using
// reflection instead of forcing the plugin author to define the function for it
func setLoggerOnPlugin(i interface{}, log telegraf.Logger) {
valI := reflect.ValueOf(i)
if valI.Type().Kind() != reflect.Ptr {
valI = reflect.New(reflect.TypeOf(i))
}
field := valI.Elem().FieldByName("Log")
if !field.IsValid() {
return
}
switch field.Type().String() {
case "telegraf.Logger":
if field.CanSet() {
field.Set(reflect.ValueOf(log))
}
}
return
}

View File

@ -10,6 +10,7 @@ import (
// AddOutput adds the input to the shim. Later calls to Run() will run this.
func (s *Shim) AddOutput(output telegraf.Output) error {
setLoggerOnPlugin(output, NewLogger())
if p, ok := output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {

View File

@ -14,12 +14,14 @@ import (
// AddProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddProcessor(processor telegraf.Processor) error {
setLoggerOnPlugin(processor, NewLogger())
p := processors.NewStreamingProcessorFromProcessor(processor)
return s.AddStreamingProcessor(p)
}
// AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error {
setLoggerOnPlugin(processor, NewLogger())
if p, ok := processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {

View File

@ -49,8 +49,15 @@ type Shim struct {
stderr io.Writer
}
var (
oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim"
newpkg = "github.com/influxdata/telegraf/plugins/common/shim"
)
// New creates a new shim interface
func New() *Shim {
fmt.Fprintf(os.Stderr, "%s is deprecated; please change your import to %s\n",
oldpkg, newpkg)
return &Shim{
stdin: os.Stdin,
stdout: os.Stdout,