feat(inputs.kafka_consumer): Add sarama debug logs (#12304)
This commit is contained in:
parent
1a6c363cf1
commit
70b33c2649
|
|
@ -0,0 +1,34 @@
|
|||
package kafka
|
||||
|
||||
import (
|
||||
"github.com/Shopify/sarama"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
)
|
||||
|
||||
type Logger struct {
|
||||
}
|
||||
|
||||
// DebugLogger logs messages from sarama at the debug level.
|
||||
type DebugLogger struct {
|
||||
Log telegraf.Logger
|
||||
}
|
||||
|
||||
func (l *DebugLogger) Print(v ...interface{}) {
|
||||
l.Log.Debug(v...)
|
||||
}
|
||||
|
||||
func (l *DebugLogger) Printf(format string, v ...interface{}) {
|
||||
l.Log.Debugf(format, v...)
|
||||
}
|
||||
|
||||
func (l *DebugLogger) Println(v ...interface{}) {
|
||||
l.Print(v...)
|
||||
}
|
||||
|
||||
// SetLogger configures a debug logger for kafka (sarama)
|
||||
func (k *Logger) SetLogger() {
|
||||
log := &models.Logger{Name: "sarama"}
|
||||
sarama.Logger = &DebugLogger{Log: log}
|
||||
}
|
||||
|
|
@ -47,6 +47,8 @@ type KafkaConsumer struct {
|
|||
|
||||
kafka.ReadConfig
|
||||
|
||||
kafka.Logger
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
ConsumerCreator ConsumerGroupCreator `toml:"-"`
|
||||
|
|
@ -83,6 +85,8 @@ func (k *KafkaConsumer) SetParser(parser parsers.Parser) {
|
|||
}
|
||||
|
||||
func (k *KafkaConsumer) Init() error {
|
||||
k.SetLogger()
|
||||
|
||||
if k.MaxUndeliveredMessages == 0 {
|
||||
k.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,8 @@ type Kafka struct {
|
|||
|
||||
kafka.WriteConfig
|
||||
|
||||
kafka.Logger
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
saramaConfig *sarama.Config
|
||||
|
|
@ -64,25 +66,6 @@ type TopicSuffix struct {
|
|||
Separator string `toml:"separator"`
|
||||
}
|
||||
|
||||
// DebugLogger logs messages from sarama at the debug level.
|
||||
type DebugLogger struct {
|
||||
Log telegraf.Logger
|
||||
}
|
||||
|
||||
func (l *DebugLogger) Print(v ...interface{}) {
|
||||
args := make([]interface{}, 0, len(v)+1)
|
||||
args = append(append(args, "[sarama] "), v...)
|
||||
l.Log.Debug(args...)
|
||||
}
|
||||
|
||||
func (l *DebugLogger) Printf(format string, v ...interface{}) {
|
||||
l.Log.Debugf("[sarama] "+format, v...)
|
||||
}
|
||||
|
||||
func (l *DebugLogger) Println(v ...interface{}) {
|
||||
l.Print(v...)
|
||||
}
|
||||
|
||||
func ValidateTopicSuffixMethod(method string) error {
|
||||
for _, validMethod := range ValidTopicSuffixMethods {
|
||||
if method == validMethod {
|
||||
|
|
@ -137,7 +120,7 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
|
|||
}
|
||||
|
||||
func (k *Kafka) Init() error {
|
||||
sarama.Logger = &DebugLogger{Log: k.Log}
|
||||
k.SetLogger()
|
||||
|
||||
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in New Issue