fix(inputs.kafka_consumer): Fix deadlock (#16074)
This commit is contained in:
parent
f06111499e
commit
662607cb6c
|
|
@ -64,7 +64,6 @@ type KafkaConsumer struct {
|
||||||
topicClient sarama.Client
|
topicClient sarama.Client
|
||||||
regexps []regexp.Regexp
|
regexps []regexp.Regexp
|
||||||
allWantedTopics []string
|
allWantedTopics []string
|
||||||
ticker *time.Ticker
|
|
||||||
fingerprint string
|
fingerprint string
|
||||||
|
|
||||||
parser telegraf.Parser
|
parser telegraf.Parser
|
||||||
|
|
@ -376,15 +375,13 @@ func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaConsumer) Stop() {
|
func (k *KafkaConsumer) Stop() {
|
||||||
if k.ticker != nil {
|
|
||||||
k.ticker.Stop()
|
|
||||||
}
|
|
||||||
// Lock so that a topic refresh cannot start while we are stopping.
|
// Lock so that a topic refresh cannot start while we are stopping.
|
||||||
k.topicLock.Lock()
|
k.topicLock.Lock()
|
||||||
defer k.topicLock.Unlock()
|
|
||||||
if k.topicClient != nil {
|
if k.topicClient != nil {
|
||||||
k.topicClient.Close()
|
k.topicClient.Close()
|
||||||
}
|
}
|
||||||
|
k.topicLock.Unlock()
|
||||||
|
|
||||||
k.cancel()
|
k.cancel()
|
||||||
k.wg.Wait()
|
k.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue