diff --git a/plugins/common/kafka/logger.go b/plugins/common/kafka/logger.go new file mode 100644 index 000000000..ad264c31e --- /dev/null +++ b/plugins/common/kafka/logger.go @@ -0,0 +1,34 @@ +package kafka + +import ( + "github.com/Shopify/sarama" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/models" +) + +type Logger struct { +} + +// DebugLogger logs messages from sarama at the debug level. +type DebugLogger struct { + Log telegraf.Logger +} + +func (l *DebugLogger) Print(v ...interface{}) { + l.Log.Debug(v...) +} + +func (l *DebugLogger) Printf(format string, v ...interface{}) { + l.Log.Debugf(format, v...) +} + +func (l *DebugLogger) Println(v ...interface{}) { + l.Print(v...) +} + +// SetLogger configures a debug logger for kafka (sarama) +func (k *Logger) SetLogger() { + log := &models.Logger{Name: "sarama"} + sarama.Logger = &DebugLogger{Log: log} +} diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index b80fdeb7f..d9c24f034 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -47,6 +47,8 @@ type KafkaConsumer struct { kafka.ReadConfig + kafka.Logger + Log telegraf.Logger `toml:"-"` ConsumerCreator ConsumerGroupCreator `toml:"-"` @@ -83,6 +85,8 @@ func (k *KafkaConsumer) SetParser(parser parsers.Parser) { } func (k *KafkaConsumer) Init() error { + k.SetLogger() + if k.MaxUndeliveredMessages == 0 { k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages } diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 70854dacc..10f86a124 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -49,6 +49,8 @@ type Kafka struct { kafka.WriteConfig + kafka.Logger + Log telegraf.Logger `toml:"-"` saramaConfig *sarama.Config @@ -64,25 +66,6 @@ type TopicSuffix struct { Separator string `toml:"separator"` } -// DebugLogger logs messages from sarama at the debug level. -type DebugLogger struct { - Log telegraf.Logger -} - -func (l *DebugLogger) Print(v ...interface{}) { - args := make([]interface{}, 0, len(v)+1) - args = append(append(args, "[sarama] "), v...) - l.Log.Debug(args...) -} - -func (l *DebugLogger) Printf(format string, v ...interface{}) { - l.Log.Debugf("[sarama] "+format, v...) -} - -func (l *DebugLogger) Println(v ...interface{}) { - l.Print(v...) -} - func ValidateTopicSuffixMethod(method string) error { for _, validMethod := range ValidTopicSuffixMethods { if method == validMethod { @@ -137,7 +120,7 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) { } func (k *Kafka) Init() error { - sarama.Logger = &DebugLogger{Log: k.Log} + k.SetLogger() err := ValidateTopicSuffixMethod(k.TopicSuffix.Method) if err != nil {