diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 7458fb4fe..0198d2b97 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -50,12 +50,6 @@ to use them. ## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ] # topic_regexps = [ ] - ## Topic regexp refresh interval. If enabled, and if regular expressions - ## are enabled, available topics will be rescanned at this interval to - ## determine whether new ones are present. - ## Exmaple: topic_refresh_interval = "5m" - # topic_refresh_interval = "" - ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = "" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 9b56e8bf6..2d1d59013 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -43,7 +43,6 @@ type KafkaConsumer struct { BalanceStrategy string `toml:"balance_strategy"` Topics []string `toml:"topics"` TopicRegexps []string `toml:"topic_regexps"` - TopicRefreshInterval config.Duration `toml:"topic_refresh_interval"` TopicTag string `toml:"topic_tag"` ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` ConnectionStrategy string `toml:"connection_strategy"` @@ -66,7 +65,6 @@ type KafkaConsumer struct { parser telegraf.Parser topicLock sync.Mutex - groupLock sync.Mutex wg sync.WaitGroup cancel context.CancelFunc } @@ -169,14 +167,14 @@ func (k *KafkaConsumer) Init() error { } k.topicClient = client } + return nil } func (k *KafkaConsumer) compileTopicRegexps() error { // While we can add new topics matching extant regexps, we can't // update that list on the fly. We compile them once at startup. - // Changing them is a configuration change and requires us to cancel - // and relaunch our ConsumerGroup. + // Changing them is a configuration change and requires a restart. k.regexps = make([]regexp.Regexp, 0, len(k.TopicRegexps)) for _, r := range k.TopicRegexps { @@ -189,32 +187,22 @@ func (k *KafkaConsumer) compileTopicRegexps() error { return nil } -func (k *KafkaConsumer) changedTopics() (bool, error) { - // We have instantiated a generic Kafka client, so we can ask +func (k *KafkaConsumer) refreshTopics() error { + // We have instantiated a new generic Kafka client, so we can ask // it for all the topics it knows about. Then we build // regexps from our strings, loop over those, loop over the // topics, and if we find a match, add that topic to // out topic set, which then we turn back into a list at the end. - // - // If our topics changed, we return true, to indicate that the - // consumer will need a restart in order to pick up the new - // topics. if len(k.regexps) == 0 { - return false, nil + return nil } - // Refresh metadata for all topics. - err := k.topicClient.RefreshMetadata() - if err != nil { - return false, err - } allDiscoveredTopics, err := k.topicClient.Topics() if err != nil { - return false, err + return err } - sort.Strings(allDiscoveredTopics) - k.Log.Debugf("discovered %d topics in total", len(allDiscoveredTopics)) + k.Log.Debugf("discovered topics: %v", allDiscoveredTopics) extantTopicSet := make(map[string]bool, len(allDiscoveredTopics)) for _, t := range allDiscoveredTopics { @@ -232,6 +220,7 @@ func (k *KafkaConsumer) changedTopics() (bool, error) { wantedTopicSet := make(map[string]bool, len(allDiscoveredTopics)) for _, t := range k.Topics { // Get our pre-specified topics + k.Log.Debugf("adding literally-specified topic %s", t) wantedTopicSet[t] = true } for _, t := range allDiscoveredTopics { @@ -239,6 +228,7 @@ func (k *KafkaConsumer) changedTopics() (bool, error) { for _, r := range k.regexps { if r.MatchString(t) { wantedTopicSet[t] = true + k.Log.Debugf("adding regexp-matched topic %q", t) break } } @@ -249,17 +239,14 @@ func (k *KafkaConsumer) changedTopics() (bool, error) { } sort.Strings(topicList) fingerprint := strings.Join(topicList, ";") - k.Log.Debugf("Regular expression list %q matched %d topics", k.TopicRegexps, len(topicList)) if fingerprint != k.fingerprint { k.Log.Infof("updating topics: replacing %q with %q", k.allWantedTopics, topicList) - k.topicLock.Lock() - k.fingerprint = fingerprint - k.allWantedTopics = topicList - k.topicLock.Unlock() - return true, nil } - k.Log.Debugf("topic list unchanged on refresh") - return false, nil + k.topicLock.Lock() + k.fingerprint = fingerprint + k.allWantedTopics = topicList + k.topicLock.Unlock() + return nil } func (k *KafkaConsumer) create() error { @@ -283,139 +270,14 @@ func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) { }() } -func (k *KafkaConsumer) getNewHandler(acc telegraf.Accumulator) *ConsumerGroupHandler { - handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log) - handler.MaxMessageLen = k.MaxMessageLen - handler.TopicTag = k.TopicTag - return handler -} - -func (k *KafkaConsumer) handleTicker(acc telegraf.Accumulator) { - dstr := time.Duration(k.TopicRefreshInterval).String() - k.Log.Infof("starting refresh ticker: scanning topics every %s", dstr) - for { - <-k.ticker.C - k.Log.Debugf("received topic refresh request (every %s)", dstr) - changed, err := k.changedTopics() - if err != nil { - acc.AddError(err) - return - } - if changed { - err = k.restartConsumer(acc) - if err != nil { - acc.AddError(err) - return - } - } - } -} - -func (k *KafkaConsumer) consumeTopics(ctx context.Context, acc telegraf.Accumulator) { - k.wg.Add(1) - defer k.wg.Done() - go func() { - for ctx.Err() == nil { - handler := k.getNewHandler(acc) - // We need to copy allWantedTopics; the Consume() is - // long-running and we can easily deadlock if our - // topic-update-checker fires. - k.topicLock.Lock() - topics := make([]string, len(k.allWantedTopics)) - copy(topics, k.allWantedTopics) - k.topicLock.Unlock() - err := k.consumer.Consume(ctx, topics, handler) - if err != nil { - acc.AddError(fmt.Errorf("consume: %w", err)) - internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway - } - } - err := k.consumer.Close() - if err != nil { - acc.AddError(fmt.Errorf("close: %w", err)) - } - }() -} - -func (k *KafkaConsumer) restartConsumer(acc telegraf.Accumulator) error { - // This is tricky. As soon as we call k.cancel() the old - // consumer group is no longer usable, so we need to get - // a new group and context ready and then pull a switcheroo - // quickly. - // - // Up to 100Hz, at least, we do not lose messages on consumer group - // restart. Since the group name is the same, it seems very likely - // that we just pick up at the offset we left off at: that is the - // producer does not see this as a new consumer, but as an old one - // that's just missed a couple beats. - if k.consumer == nil { - // Fast exit if the consumer isn't running - return nil - } - k.Log.Info("restarting consumer group") - k.Log.Debug("creating new consumer group") - newConsumer, err := k.ConsumerCreator.Create( - k.Brokers, - k.ConsumerGroup, - k.config, - ) - if err != nil { - acc.AddError(err) - return err - } - k.Log.Debug("acquiring new context before swapping consumer groups") - ctx, cancel := context.WithCancel(context.Background()) - // I am not sure we really need this lock, but if it hurts we're - // already refreshing way too frequently. - k.groupLock.Lock() - // Do the switcheroo. - k.Log.Debug("replacing consumer group") - oldConsumer := k.consumer - k.consumer = newConsumer - k.cancel = cancel - // Do this in the background - go func() { - k.Log.Debug("closing old consumer group") - err = oldConsumer.Close() - if err != nil { - acc.AddError(err) - return - } - }() - k.groupLock.Unlock() - k.Log.Debug("starting new consumer group") - k.consumeTopics(ctx, acc) - k.Log.Info("restarted consumer group") - return nil -} - func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { var err error - k.Log.Debugf("TopicRegexps: %v", k.TopicRegexps) - k.Log.Debugf("TopicRefreshInterval: %v", k.TopicRefreshInterval) - // If TopicRegexps is set, add matches to Topics if len(k.TopicRegexps) > 0 { - if _, err = k.changedTopics(); err != nil { - // We're starting, so we expect the list to change; - // all we care about is whether we got an error - // acquiring our topics. + if err := k.refreshTopics(); err != nil { return err } - // If refresh interval is specified, start a goroutine - // to refresh topics periodically. This only makes sense if - // TopicRegexps is set. - if k.TopicRefreshInterval > 0 { - k.ticker = time.NewTicker(time.Duration(k.TopicRefreshInterval)) - // Note that there's no waitgroup here. That's because - // handleTicker does care whether topics have changed, - // and if they have, it will invoke restartConsumer(), - // which tells the current consumer to stop. That stop - // cancels goroutines in the waitgroup, and therefore - // the restart goroutine must not be in the waitgroup. - go k.handleTicker(acc) - } } ctx, cancel := context.WithCancel(context.Background()) @@ -429,16 +291,44 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { k.startErrorAdder(acc) } - if k.consumer == nil { - err = k.create() - if err != nil { - acc.AddError(fmt.Errorf("create consumer async: %w", err)) - return err - } - } + // Start consumer goroutine + k.wg.Add(1) + go func() { + var err error + defer k.wg.Done() - k.startErrorAdder(acc) - k.consumeTopics(ctx, acc) // Starts goroutine internally + if k.consumer == nil { + err = k.create() + if err != nil { + acc.AddError(fmt.Errorf("create consumer async: %w", err)) + return + } + } + + k.startErrorAdder(acc) + + for ctx.Err() == nil { + handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log) + handler.MaxMessageLen = k.MaxMessageLen + handler.TopicTag = k.TopicTag + // We need to copy allWantedTopics; the Consume() is + // long-running and we can easily deadlock if our + // topic-update-checker fires. + topics := make([]string, len(k.allWantedTopics)) + k.topicLock.Lock() + copy(topics, k.allWantedTopics) + k.topicLock.Unlock() + err := k.consumer.Consume(ctx, topics, handler) + if err != nil { + acc.AddError(fmt.Errorf("consume: %w", err)) + internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway + } + } + err = k.consumer.Close() + if err != nil { + acc.AddError(fmt.Errorf("close: %w", err)) + } + }() return nil } @@ -496,7 +386,7 @@ type ConsumerGroupHandler struct { log telegraf.Logger } -// Setup is called once when a new session is opened. It sets up the handler +// Setup is called once when a new session is opened. It setups up the handler // and begins processing delivered messages. func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { h.undelivered = make(map[telegraf.TrackingID]Message) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index d0cb235a7..f7b82c76d 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -476,14 +476,15 @@ func TestKafkaRoundTripIntegration(t *testing.T) { } var tests = []struct { - name string - connectionStrategy string - topics []string - topicRegexps []string + name string + connectionStrategy string + topics []string + topicRegexps []string + topicRefreshInterval config.Duration }{ - {"connection strategy startup", "startup", []string{"Test"}, nil}, - {"connection strategy defer", "defer", []string{"Test"}, nil}, - {"topic regexp", "startup", nil, []string{"Test*"}}, + {"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)}, + {"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)}, + {"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)}, } for _, tt := range tests { @@ -589,161 +590,6 @@ func TestKafkaRoundTripIntegration(t *testing.T) { } } -func TestDynamicTopicRefresh(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - var tests = []struct { - name string - connectionStrategy string - topics []string - topicRegexps []string - topicRefreshInterval config.Duration - }{ - // 3-second refresh interval - {"topic regexp refresh", "startup", nil, []string{"Test*"}, config.Duration(3 * time.Second)}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - t.Logf("rt: starting network") - ctx := context.Background() - networkName := "telegraf-test-kafka-consumer-network" - network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{ - NetworkRequest: testcontainers.NetworkRequest{ - Name: networkName, - Attachable: true, - CheckDuplicate: true, - }, - }) - require.NoError(t, err) - defer func() { - require.NoError(t, network.Remove(ctx), "terminating network failed") - }() - - t.Logf("rt: starting zookeeper") - zookeeperName := "telegraf-test-kafka-consumer-zookeeper" - zookeeper := testutil.Container{ - Image: "wurstmeister/zookeeper", - ExposedPorts: []string{"2181:2181"}, - Networks: []string{networkName}, - WaitingFor: wait.ForLog("binding to port"), - Name: zookeeperName, - } - require.NoError(t, zookeeper.Start(), "failed to start container") - defer zookeeper.Terminate() - - t.Logf("rt: starting broker") - container := testutil.Container{ - Name: "telegraf-test-kafka-consumer", - Image: "wurstmeister/kafka", - ExposedPorts: []string{"9092:9092"}, - Env: map[string]string{ - "KAFKA_ADVERTISED_HOST_NAME": "localhost", - "KAFKA_ADVERTISED_PORT": "9092", - "KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]), - "KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", "Test"), - }, - Networks: []string{networkName}, - WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"), - } - require.NoError(t, container.Start(), "failed to start container") - defer container.Terminate() - - brokers := []string{ - fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]), - } - - // Make kafka output - t.Logf("rt: starting output plugin") - creator := outputs.Outputs["kafka"] - output, ok := creator().(*kafkaOutput.Kafka) - require.True(t, ok) - - s := &influxSerializer.Serializer{} - require.NoError(t, s.Init()) - output.SetSerializer(s) - output.Brokers = brokers - output.Topic = "TestDynamic" - output.Log = testutil.Logger{} - - require.NoError(t, output.Init()) - require.NoError(t, output.Connect()) - - // Make kafka input - t.Logf("rt: starting input plugin") - // If MaxUndeliveredMessages is 1 here (as it is - // for the other end-to-end tests) the test fails - // more often than not (but not always). We suspect - // this is something to do with internal messages - // about new topics and rebalancing getting in the - // way. At any rate, at 1000 (the default value), - // the tests pass reliably. - input := KafkaConsumer{ - Brokers: brokers, - Log: testutil.Logger{}, - Topics: tt.topics, - TopicRegexps: tt.topicRegexps, - TopicRefreshInterval: tt.topicRefreshInterval, - MaxUndeliveredMessages: 1000, // Default - ConnectionStrategy: tt.connectionStrategy, - } - parser := &influx.Parser{} - require.NoError(t, parser.Init()) - input.SetParser(parser) - require.NoError(t, input.Init()) - - acc := testutil.Accumulator{} - require.NoError(t, input.Start(&acc)) - t.Logf("rt: input plugin started") - - // First we need an AdminClient, so that we can add - // a topic list. - - newCfg := sarama.NewConfig() // Defaults are OK. - t.Logf("rt: creating new Kafkfa ClusterAdmin") - admin, err := sarama.NewClusterAdmin(brokers, newCfg) - if err != nil { - require.Error(t, err) - return - } - newTopic := "TestDynamic" - t.Logf("rt: creating new Kafkfa topic %s", newTopic) - detail := sarama.TopicDetail{ - NumPartitions: 1, - ReplicationFactor: 1, - } - // Add the topic - err = admin.CreateTopic(newTopic, &detail, false) - if err != nil { - require.Error(t, err) - return - } - // Shove some metrics through - expected := testutil.MockMetrics() - t.Logf("rt: writing %v to %s", expected, output.Topic) - require.NoError(t, output.Write(expected)) - - // Check that they were received - t.Logf("rt: expecting") - // This usually hangs and we never read. - // Sometimes, though, we do read the expected data. - // Why? - acc.Wait(len(expected)) - t.Logf("rt: received %d", len(expected)) - q := acc.GetTelegrafMetrics() - t.Logf("rt: received metrics %v", q) - testutil.RequireMetricsEqual(t, expected, q) - - t.Logf("rt: shutdown") - require.NoError(t, output.Close()) - input.Stop() - - t.Logf("rt: done") - }) - } -} - func TestExponentialBackoff(t *testing.T) { var err error diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf index 7e8cc511c..c618d1610 100644 --- a/plugins/inputs/kafka_consumer/sample.conf +++ b/plugins/inputs/kafka_consumer/sample.conf @@ -10,12 +10,6 @@ ## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ] # topic_regexps = [ ] - ## Topic regexp refresh interval. If enabled, and if regular expressions - ## are enabled, available topics will be rescanned at this interval to - ## determine whether new ones are present. - ## Exmaple: topic_refresh_interval = "5m" - # topic_refresh_interval = "" - ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = ""