diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 9d262b734..088b4c0b4 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -64,7 +64,6 @@ type KafkaConsumer struct { topicClient sarama.Client regexps []regexp.Regexp allWantedTopics []string - ticker *time.Ticker fingerprint string parser telegraf.Parser @@ -376,15 +375,13 @@ func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error { } func (k *KafkaConsumer) Stop() { - if k.ticker != nil { - k.ticker.Stop() - } // Lock so that a topic refresh cannot start while we are stopping. k.topicLock.Lock() - defer k.topicLock.Unlock() if k.topicClient != nil { k.topicClient.Close() } + k.topicLock.Unlock() + k.cancel() k.wg.Wait() }