diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md
index 0198d2b97..7458fb4fe 100644
--- a/plugins/inputs/kafka_consumer/README.md
+++ b/plugins/inputs/kafka_consumer/README.md
@@ -50,6 +50,12 @@ to use them.
   ## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
   # topic_regexps = [ ]
 
+  ## Topic regexp refresh interval. If set, and if topic regular expressions
+  ## are in use, the available topics will be rescanned at this interval to
+  ## detect newly created topics that match.
+  ## Example: topic_refresh_interval = "5m"
+  # topic_refresh_interval = ""
+
   ## When set this tag will be added to all metrics with the topic as the value.
   # topic_tag = ""
 
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index 2d1d59013..9b56e8bf6 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -43,6 +43,7 @@ type KafkaConsumer struct {
 	BalanceStrategy        string          `toml:"balance_strategy"`
 	Topics                 []string        `toml:"topics"`
 	TopicRegexps           []string        `toml:"topic_regexps"`
+	TopicRefreshInterval   config.Duration `toml:"topic_refresh_interval"`
 	TopicTag               string          `toml:"topic_tag"`
 	ConsumerFetchDefault   config.Size     `toml:"consumer_fetch_default"`
 	ConnectionStrategy     string          `toml:"connection_strategy"`
@@ -65,6 +66,7 @@ type KafkaConsumer struct {
 	parser telegraf.Parser
 
 	topicLock sync.Mutex
+	groupLock sync.Mutex
 	wg        sync.WaitGroup
 	cancel    context.CancelFunc
 }
@@ -167,14 +169,14 @@ func (k *KafkaConsumer) Init() error {
 		}
 		k.topicClient = client
 	}
-
 	return nil
 }
 
 func (k *KafkaConsumer) compileTopicRegexps() error {
 	// While we can add new topics matching extant regexps, we can't
 	// update that list on the fly. We compile them once at startup.
-	// Changing them is a configuration change and requires a restart.
+	// Changing them is a configuration change and requires us to cancel
+	// and relaunch our ConsumerGroup.
 
 	k.regexps = make([]regexp.Regexp, 0, len(k.TopicRegexps))
 	for _, r := range k.TopicRegexps {
@@ -187,22 +189,32 @@
 	return nil
 }
 
-func (k *KafkaConsumer) refreshTopics() error {
-	// We have instantiated a new generic Kafka client, so we can ask
+func (k *KafkaConsumer) changedTopics() (bool, error) {
+	// We have instantiated a generic Kafka client, so we can ask
 	// it for all the topics it knows about. Then we build
 	// regexps from our strings, loop over those, loop over the
 	// topics, and if we find a match, add that topic to
 	// our topic set, which we then turn back into a list at the end.
+	//
+	// If our topics changed, we return true to indicate that the
+	// consumer will need a restart in order to pick up the new
+	// topics.
 	if len(k.regexps) == 0 {
-		return nil
+		return false, nil
 	}
+	// Refresh metadata for all topics.
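+	// (The sarama client caches cluster metadata; without an explicit
+	// refresh, Topics() can keep returning the cached list, and topics
+	// created since startup would never be discovered.)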
+	err := k.topicClient.RefreshMetadata()
+	if err != nil {
+		return false, err
+	}
 	allDiscoveredTopics, err := k.topicClient.Topics()
 	if err != nil {
-		return err
+		return false, err
 	}
-	k.Log.Debugf("discovered topics: %v", allDiscoveredTopics)
+	sort.Strings(allDiscoveredTopics)
+	k.Log.Debugf("discovered %d topics in total", len(allDiscoveredTopics))
 
 	extantTopicSet := make(map[string]bool, len(allDiscoveredTopics))
 	for _, t := range allDiscoveredTopics {
@@ -220,7 +232,6 @@
 	wantedTopicSet := make(map[string]bool, len(allDiscoveredTopics))
 	for _, t := range k.Topics {
 		// Get our pre-specified topics
-		k.Log.Debugf("adding literally-specified topic %s", t)
 		wantedTopicSet[t] = true
 	}
 	for _, t := range allDiscoveredTopics {
@@ -228,7 +239,6 @@
 		for _, r := range k.regexps {
 			if r.MatchString(t) {
 				wantedTopicSet[t] = true
-				k.Log.Debugf("adding regexp-matched topic %q", t)
 				break
 			}
 		}
@@ -239,14 +249,17 @@
 	}
 	sort.Strings(topicList)
 	fingerprint := strings.Join(topicList, ";")
+	k.Log.Debugf("Regular expression list %q matched %d topics", k.TopicRegexps, len(topicList))
 	if fingerprint != k.fingerprint {
 		k.Log.Infof("updating topics: replacing %q with %q", k.allWantedTopics, topicList)
+		k.topicLock.Lock()
+		k.fingerprint = fingerprint
+		k.allWantedTopics = topicList
+		k.topicLock.Unlock()
+		return true, nil
 	}
-	k.topicLock.Lock()
-	k.fingerprint = fingerprint
-	k.allWantedTopics = topicList
-	k.topicLock.Unlock()
-	return nil
+	k.Log.Debugf("topic list unchanged on refresh")
+	return false, nil
 }
 
 func (k *KafkaConsumer) create() error {
@@ -270,14 +283,139 @@ func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
 	}()
 }
 
+func (k *KafkaConsumer) getNewHandler(acc telegraf.Accumulator) *ConsumerGroupHandler {
+	handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
+	handler.MaxMessageLen = k.MaxMessageLen
+	handler.TopicTag = k.TopicTag
+	return handler
+}
+
+func (k *KafkaConsumer) handleTicker(acc telegraf.Accumulator) {
+	dstr := time.Duration(k.TopicRefreshInterval).String()
+	k.Log.Infof("starting refresh ticker: scanning topics every %s", dstr)
+	for {
+		<-k.ticker.C
+		k.Log.Debugf("received topic refresh request (every %s)", dstr)
+		changed, err := k.changedTopics()
+		if err != nil {
+			acc.AddError(err)
+			return
+		}
+		if changed {
+			err = k.restartConsumer(acc)
+			if err != nil {
+				acc.AddError(err)
+				return
+			}
+		}
+	}
+}
+
+func (k *KafkaConsumer) consumeTopics(ctx context.Context, acc telegraf.Accumulator) {
+	k.wg.Add(1)
+	go func() {
+		// Done is deferred inside the goroutine so that it fires when
+		// the consume loop exits, not when consumeTopics returns.
+		defer k.wg.Done()
+		for ctx.Err() == nil {
+			handler := k.getNewHandler(acc)
+			// We need to copy allWantedTopics; the Consume() is
+			// long-running and we can easily deadlock if our
+			// topic-update-checker fires.
+			k.topicLock.Lock()
+			topics := make([]string, len(k.allWantedTopics))
+			copy(topics, k.allWantedTopics)
+			k.topicLock.Unlock()
+			err := k.consumer.Consume(ctx, topics, handler)
+			if err != nil {
+				acc.AddError(fmt.Errorf("consume: %w", err))
+				internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
+			}
+		}
+		err := k.consumer.Close()
+		if err != nil {
+			acc.AddError(fmt.Errorf("close: %w", err))
+		}
+	}()
+}
+
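+// restartConsumer swaps in a freshly created ConsumerGroup so that an
+// updated topic list takes effect, closing the old group in the
+// background.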
+func (k *KafkaConsumer) restartConsumer(acc telegraf.Accumulator) error {
+	// This is tricky. As soon as we call k.cancel() the old
+	// consumer group is no longer usable, so we need to get
+	// a new group and context ready and then pull a switcheroo
+	// quickly.
+	//
+	// At rates up to at least 100 Hz we do not lose messages on consumer
+	// group restart. Since the group name is the same, it seems very
+	// likely that we just pick up at the offset we left off at: that is,
+	// the broker does not see this as a new consumer, but as an old one
+	// that has just missed a couple of beats.
+	if k.consumer == nil {
+		// Fast exit if the consumer isn't running
+		return nil
+	}
+	k.Log.Info("restarting consumer group")
+	k.Log.Debug("creating new consumer group")
+	newConsumer, err := k.ConsumerCreator.Create(
+		k.Brokers,
+		k.ConsumerGroup,
+		k.config,
+	)
+	if err != nil {
+		acc.AddError(err)
+		return err
+	}
+	k.Log.Debug("acquiring new context before swapping consumer groups")
+	ctx, cancel := context.WithCancel(context.Background())
+	// I am not sure we really need this lock, but if it hurts we're
+	// already refreshing way too frequently.
+	k.groupLock.Lock()
+	// Do the switcheroo.
+	k.Log.Debug("replacing consumer group")
+	oldConsumer := k.consumer
+	k.consumer = newConsumer
+	k.cancel = cancel
+	// Close the old group in the background.
+	go func() {
+		k.Log.Debug("closing old consumer group")
+		if err := oldConsumer.Close(); err != nil {
+			acc.AddError(err)
+		}
+	}()
+	k.groupLock.Unlock()
+	k.Log.Debug("starting new consumer group")
+	k.consumeTopics(ctx, acc)
+	k.Log.Info("restarted consumer group")
+	return nil
+}
+
 func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 	var err error
 
+	k.Log.Debugf("TopicRegexps: %v", k.TopicRegexps)
+	k.Log.Debugf("TopicRefreshInterval: %v", k.TopicRefreshInterval)
+
 	// If TopicRegexps is set, add matches to Topics
 	if len(k.TopicRegexps) > 0 {
-		if err := k.refreshTopics(); err != nil {
+		if _, err = k.changedTopics(); err != nil {
+			// We're starting, so we expect the list to change;
+			// all we care about is whether we got an error
+			// acquiring our topics.
 			return err
 		}
+		// If a refresh interval is specified, start a goroutine
+		// to refresh topics periodically. This only makes sense if
+		// TopicRegexps is set.
+		if k.TopicRefreshInterval > 0 {
+			k.ticker = time.NewTicker(time.Duration(k.TopicRefreshInterval))
+			// Note that there's no waitgroup here. That's because
+			// handleTicker needs to know whether topics have changed,
+			// and if they have, it will invoke restartConsumer(),
+			// which tells the current consumer to stop. That stop
+			// cancels goroutines in the waitgroup, and therefore
+			// the restart goroutine must not be in the waitgroup.
+			go k.handleTicker(acc)
+		}
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -291,44 +429,16 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 		k.startErrorAdder(acc)
 	}
 
-	// Start consumer goroutine
-	k.wg.Add(1)
-	go func() {
-		var err error
-		defer k.wg.Done()
-
-		if k.consumer == nil {
-			err = k.create()
-			if err != nil {
-				acc.AddError(fmt.Errorf("create consumer async: %w", err))
-				return
-			}
-		}
-
-		k.startErrorAdder(acc)
-
-		for ctx.Err() == nil {
-			handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
-			handler.MaxMessageLen = k.MaxMessageLen
-			handler.TopicTag = k.TopicTag
-			// We need to copy allWantedTopics; the Consume() is
-			// long-running and we can easily deadlock if our
-			// topic-update-checker fires.
-			topics := make([]string, len(k.allWantedTopics))
-			k.topicLock.Lock()
-			copy(topics, k.allWantedTopics)
-			k.topicLock.Unlock()
-			err := k.consumer.Consume(ctx, topics, handler)
-			if err != nil {
-				acc.AddError(fmt.Errorf("consume: %w", err))
-				internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
-			}
-		}
-		err = k.consumer.Close()
+	if k.consumer == nil {
+		err = k.create()
 		if err != nil {
-			acc.AddError(fmt.Errorf("close: %w", err))
+			acc.AddError(fmt.Errorf("create consumer async: %w", err))
+			return err
 		}
-	}()
+	}
+
+	k.startErrorAdder(acc)
+	k.consumeTopics(ctx, acc) // Starts goroutine internally
 
 	return nil
 }
@@ -386,7 +496,7 @@ type ConsumerGroupHandler struct {
 	log telegraf.Logger
 }
 
-// Setup is called once when a new session is opened. It setups up the handler
+// Setup is called once when a new session is opened. It sets up the handler
 // and begins processing delivered messages.
 func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
 	h.undelivered = make(map[telegraf.TrackingID]Message)
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index f7b82c76d..d0cb235a7 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -476,15 +476,14 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 	}
 
 	var tests = []struct {
-		name                 string
-		connectionStrategy   string
-		topics               []string
-		topicRegexps         []string
-		topicRefreshInterval config.Duration
+		name               string
+		connectionStrategy string
+		topics             []string
+		topicRegexps       []string
 	}{
-		{"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)},
-		{"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)},
-		{"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)},
+		{"connection strategy startup", "startup", []string{"Test"}, nil},
+		{"connection strategy defer", "defer", []string{"Test"}, nil},
+		{"topic regexp", "startup", nil, []string{"Test*"}},
 	}
 
 	for _, tt := range tests {
@@ -590,6 +589,161 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 	}
 }
 
+func TestDynamicTopicRefresh(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	var tests = []struct {
+		name                 string
+		connectionStrategy   string
+		topics               []string
+		topicRegexps         []string
+		topicRefreshInterval config.Duration
+	}{
+		// 3-second refresh interval
+		{"topic regexp refresh", "startup", nil, []string{"Test*"}, config.Duration(3 * time.Second)},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			t.Logf("rt: starting network")
+			ctx := context.Background()
+			networkName := "telegraf-test-kafka-consumer-network"
+			network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
+				NetworkRequest: testcontainers.NetworkRequest{
+					Name:           networkName,
+					Attachable:     true,
+					CheckDuplicate: true,
+				},
+			})
+			require.NoError(t, err)
+			defer func() {
+				require.NoError(t, network.Remove(ctx), "terminating network failed")
+			}()
+
+			t.Logf("rt: starting zookeeper")
+			zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
+			zookeeper := testutil.Container{
+				Image:        "wurstmeister/zookeeper",
+				ExposedPorts: []string{"2181:2181"},
+				Networks:     []string{networkName},
+				WaitingFor:   wait.ForLog("binding to port"),
+				Name:         zookeeperName,
+			}
+			require.NoError(t, zookeeper.Start(), "failed to start container")
container") + defer zookeeper.Terminate() + + t.Logf("rt: starting broker") + container := testutil.Container{ + Name: "telegraf-test-kafka-consumer", + Image: "wurstmeister/kafka", + ExposedPorts: []string{"9092:9092"}, + Env: map[string]string{ + "KAFKA_ADVERTISED_HOST_NAME": "localhost", + "KAFKA_ADVERTISED_PORT": "9092", + "KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]), + "KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", "Test"), + }, + Networks: []string{networkName}, + WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + brokers := []string{ + fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]), + } + + // Make kafka output + t.Logf("rt: starting output plugin") + creator := outputs.Outputs["kafka"] + output, ok := creator().(*kafkaOutput.Kafka) + require.True(t, ok) + + s := &influxSerializer.Serializer{} + require.NoError(t, s.Init()) + output.SetSerializer(s) + output.Brokers = brokers + output.Topic = "TestDynamic" + output.Log = testutil.Logger{} + + require.NoError(t, output.Init()) + require.NoError(t, output.Connect()) + + // Make kafka input + t.Logf("rt: starting input plugin") + // If MaxUndeliveredMessages is 1 here (as it is + // for the other end-to-end tests) the test fails + // more often than not (but not always). We suspect + // this is something to do with internal messages + // about new topics and rebalancing getting in the + // way. At any rate, at 1000 (the default value), + // the tests pass reliably. + input := KafkaConsumer{ + Brokers: brokers, + Log: testutil.Logger{}, + Topics: tt.topics, + TopicRegexps: tt.topicRegexps, + TopicRefreshInterval: tt.topicRefreshInterval, + MaxUndeliveredMessages: 1000, // Default + ConnectionStrategy: tt.connectionStrategy, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + input.SetParser(parser) + require.NoError(t, input.Init()) + + acc := testutil.Accumulator{} + require.NoError(t, input.Start(&acc)) + t.Logf("rt: input plugin started") + + // First we need an AdminClient, so that we can add + // a topic list. + + newCfg := sarama.NewConfig() // Defaults are OK. + t.Logf("rt: creating new Kafkfa ClusterAdmin") + admin, err := sarama.NewClusterAdmin(brokers, newCfg) + if err != nil { + require.Error(t, err) + return + } + newTopic := "TestDynamic" + t.Logf("rt: creating new Kafkfa topic %s", newTopic) + detail := sarama.TopicDetail{ + NumPartitions: 1, + ReplicationFactor: 1, + } + // Add the topic + err = admin.CreateTopic(newTopic, &detail, false) + if err != nil { + require.Error(t, err) + return + } + // Shove some metrics through + expected := testutil.MockMetrics() + t.Logf("rt: writing %v to %s", expected, output.Topic) + require.NoError(t, output.Write(expected)) + + // Check that they were received + t.Logf("rt: expecting") + // This usually hangs and we never read. + // Sometimes, though, we do read the expected data. + // Why? 
+			acc.Wait(len(expected))
+			t.Logf("rt: received %d", len(expected))
+			q := acc.GetTelegrafMetrics()
+			t.Logf("rt: received metrics %v", q)
+			testutil.RequireMetricsEqual(t, expected, q)
+
+			t.Logf("rt: shutdown")
+			require.NoError(t, output.Close())
+			input.Stop()
+
+			t.Logf("rt: done")
+		})
+	}
+}
+
 func TestExponentialBackoff(t *testing.T) {
 	var err error
 
diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf
index c618d1610..7e8cc511c 100644
--- a/plugins/inputs/kafka_consumer/sample.conf
+++ b/plugins/inputs/kafka_consumer/sample.conf
@@ -10,6 +10,12 @@
   ## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
   # topic_regexps = [ ]
 
+  ## Topic regexp refresh interval. If set, and if topic regular expressions
+  ## are in use, the available topics will be rescanned at this interval to
+  ## detect newly created topics that match.
+  ## Example: topic_refresh_interval = "5m"
+  # topic_refresh_interval = ""
+
   ## When set this tag will be added to all metrics with the topic as the value.
   # topic_tag = ""
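For reference, a minimal consumer configuration exercising the new option might look like the following sketch (the broker address and regexp are illustrative, not part of the patch):

```toml
[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  ## Consume every topic matching "metrics-.*", and rescan the broker's
  ## topic list every five minutes to pick up newly created matches.
  topic_regexps = [ "metrics-.*" ]
  topic_refresh_interval = "5m"
  data_format = "influx"
```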