From 662607cb6c37d45860c51c728462e18e57a3a6d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20=C5=BBak?= Date: Thu, 24 Oct 2024 20:34:56 +0200 Subject: [PATCH] fix(inputs.kafka_consumer): Fix deadlock (#16074) --- plugins/inputs/kafka_consumer/kafka_consumer.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 9d262b734..088b4c0b4 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -64,7 +64,6 @@ type KafkaConsumer struct { topicClient sarama.Client regexps []regexp.Regexp allWantedTopics []string - ticker *time.Ticker fingerprint string parser telegraf.Parser @@ -376,15 +375,13 @@ func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error { } func (k *KafkaConsumer) Stop() { - if k.ticker != nil { - k.ticker.Stop() - } // Lock so that a topic refresh cannot start while we are stopping. k.topicLock.Lock() - defer k.topicLock.Unlock() if k.topicClient != nil { k.topicClient.Close() } + k.topicLock.Unlock() + k.cancel() k.wg.Wait() }