diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md
index 8928aec1e..0198d2b97 100644
--- a/plugins/inputs/kafka_consumer/README.md
+++ b/plugins/inputs/kafka_consumer/README.md
@@ -46,6 +46,10 @@ to use them.
   ## Topics to consume.
   topics = ["telegraf"]
 
+  ## Topic regular expressions to consume. Matches will be added to topics.
+  ## Example: topic_regexps = [ ".*test", "metric[0-9A-Za-z]*" ]
+  # topic_regexps = [ ]
+
   ## When set this tag will be added to all metrics with the topic as the value.
   # topic_tag = ""
 
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index 02f06d18e..b9195d673 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -5,6 +5,8 @@ import (
 	"context"
 	_ "embed"
 	"fmt"
+	"regexp"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -41,6 +43,7 @@ type KafkaConsumer struct {
 	Offset                 string          `toml:"offset"`
 	BalanceStrategy        string          `toml:"balance_strategy"`
 	Topics                 []string        `toml:"topics"`
+	TopicRegexps           []string        `toml:"topic_regexps"`
 	TopicTag               string          `toml:"topic_tag"`
 	ConsumerFetchDefault   config.Size     `toml:"consumer_fetch_default"`
 	ConnectionStrategy     string          `toml:"connection_strategy"`
@@ -55,9 +58,16 @@ type KafkaConsumer struct {
 	consumer ConsumerGroup
 	config   *sarama.Config
 
-	parser parsers.Parser
-	wg     sync.WaitGroup
-	cancel context.CancelFunc
+	topicClient     sarama.Client
+	regexps         []regexp.Regexp
+	allWantedTopics []string
+	ticker          *time.Ticker
+	fingerprint     string
+
+	parser    parsers.Parser
+	topicLock sync.Mutex
+	wg        sync.WaitGroup
+	cancel    context.CancelFunc
 }
 
 type ConsumerGroup interface {
@@ -143,6 +153,100 @@ func (k *KafkaConsumer) Init() error {
 	}
 	k.config = cfg
+
+	if len(k.TopicRegexps) == 0 {
+		k.allWantedTopics = k.Topics
+	} else {
+		if err := k.compileTopicRegexps(); err != nil {
+			return err
+		}
+		// We have regexps, so we're going to need a client to ask
+		// the broker for topics
+		client, err := sarama.NewClient(k.Brokers, k.config)
+		if err != nil {
+			return err
+		}
+		k.topicClient = client
+	}
+
+	return nil
+}
+
+func (k *KafkaConsumer) compileTopicRegexps() error {
+	// New topics that match the existing regexps are picked up on the
+	// fly, but the regexps themselves cannot be updated at runtime:
+	// they are compiled once at startup, so changing them is a
+	// configuration change and requires a restart.
+
+	k.regexps = make([]regexp.Regexp, 0, len(k.TopicRegexps))
+	for _, r := range k.TopicRegexps {
+		re, err := regexp.Compile(r)
+		if err != nil {
+			return fmt.Errorf("regular expression %q did not compile: %w", r, err)
+		}
+		k.regexps = append(k.regexps, *re)
+	}
+	return nil
+}
+
+func (k *KafkaConsumer) refreshTopics() error {
+	// Ask the generic Kafka client created in Init() for all the topics
+	// the broker knows about, check each one against our pre-compiled
+	// regular expressions, and add every match to our topic set, which
+	// we then turn back into a sorted list at the end.
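+	// For example, with topic_regexps = [ "^metrics\\." ] and broker
+	// topics ["metrics.cpu", "metrics.mem", "logs"], the resulting
+	// consume list is k.Topics plus ["metrics.cpu", "metrics.mem"]
+	// (the topic names here are purely illustrative).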
+
+	if len(k.regexps) == 0 {
+		return nil
+	}
+
+	allDiscoveredTopics, err := k.topicClient.Topics()
+	if err != nil {
+		return err
+	}
+	k.Log.Debugf("discovered topics: %v", allDiscoveredTopics)
+
+	// Even if a topic specified by a literal string (that is, k.Topics)
+	// does not appear in the discovered topic list, we want to keep it
+	// around in case it pops back up: it is not guaranteed to be matched
+	// by any of our regular expressions. Literally-specified topics are
+	// therefore always added to the wanted set.
+	//
+	// Assuming that literally-specified topics usually do exist on the
+	// broker, this should rarely need resizing (although if the broker
+	// has many topics you don't care about, it will be oversized).
+	wantedTopicSet := make(map[string]bool, len(allDiscoveredTopics))
+	for _, t := range k.Topics {
+		// Add our pre-specified topics
+		k.Log.Debugf("adding literally-specified topic %q", t)
+		wantedTopicSet[t] = true
+	}
+	for _, t := range allDiscoveredTopics {
+		// Add discovered topics that match a regexp
+		for _, r := range k.regexps {
+			if r.MatchString(t) {
+				wantedTopicSet[t] = true
+				k.Log.Debugf("adding regexp-matched topic %q", t)
+				break
+			}
+		}
+	}
+	topicList := make([]string, 0, len(wantedTopicSet))
+	for t := range wantedTopicSet {
+		topicList = append(topicList, t)
+	}
+	sort.Strings(topicList)
+	fingerprint := strings.Join(topicList, ";")
+	// Compare and update under the lock: the consume loop reads
+	// allWantedTopics from another goroutine.
+	k.topicLock.Lock()
+	if fingerprint != k.fingerprint {
+		k.Log.Infof("updating topics: replacing %q with %q", k.allWantedTopics, topicList)
+	}
+	k.fingerprint = fingerprint
+	k.allWantedTopics = topicList
+	k.topicLock.Unlock()
 
 	return nil
 }
 
@@ -170,6 +274,13 @@ func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
 
 func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 	var err error
 
+	// If TopicRegexps is set, add matches to Topics before we start consuming
+	if len(k.TopicRegexps) > 0 {
+		if err := k.refreshTopics(); err != nil {
+			return err
+		}
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	k.cancel = cancel
 
@@ -201,7 +312,14 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 			handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
 			handler.MaxMessageLen = k.MaxMessageLen
 			handler.TopicTag = k.TopicTag
-			err := k.consumer.Consume(ctx, k.Topics, handler)
+			// Copy allWantedTopics under the lock: Consume() is
+			// long-running, and holding the lock for its duration
+			// would block the topic refresher.
+			k.topicLock.Lock()
+			topics := make([]string, len(k.allWantedTopics))
+			copy(topics, k.allWantedTopics)
+			k.topicLock.Unlock()
+			err := k.consumer.Consume(ctx, topics, handler)
 			if err != nil {
 				acc.AddError(fmt.Errorf("consume: %w", err))
 				internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
@@ -221,6 +339,15 @@ func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error {
 }
 
 func (k *KafkaConsumer) Stop() {
+	if k.ticker != nil {
+		k.ticker.Stop()
+	}
+	// Lock so that a topic refresh cannot run while we are stopping.
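+	// The lock is released before k.wg.Wait() below: the consume loop
+	// takes the same lock to copy the topic list, so holding it across
+	// the wait could deadlock shutdown.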
+	k.topicLock.Lock()
+	if k.topicClient != nil {
+		k.topicClient.Close()
+	}
+	k.topicLock.Unlock()
 	k.cancel()
 	k.wg.Wait()
 }
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index 31b9ceab1..f7b82c76d 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -476,11 +476,15 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 	}
 
 	var tests = []struct {
-		name               string
-		connectionStrategy string
+		name                 string
+		connectionStrategy   string
+		topics               []string
+		topicRegexps         []string
+		topicRefreshInterval config.Duration
 	}{
-		{"connection strategy startup", "startup"},
-		{"connection strategy defer", "defer"},
+		{"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)},
+		{"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)},
+		{"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)},
 	}
 
 	for _, tt := range tests {
@@ -513,7 +517,6 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 		defer zookeeper.Terminate()
 
 		t.Logf("rt: starting broker")
-		topic := "Test"
 		container := testutil.Container{
 			Name:  "telegraf-test-kafka-consumer",
 			Image: "wurstmeister/kafka",
@@ -522,7 +525,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 				"KAFKA_ADVERTISED_HOST_NAME": "localhost",
 				"KAFKA_ADVERTISED_PORT":      "9092",
 				"KAFKA_ZOOKEEPER_CONNECT":    fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
-				"KAFKA_CREATE_TOPICS":        fmt.Sprintf("%s:1:1", topic),
+				"KAFKA_CREATE_TOPICS":        "Test:1:1",
 			},
 			Networks:   []string{networkName},
 			WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
@@ -544,7 +547,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 		require.NoError(t, s.Init())
 		output.SetSerializer(s)
 		output.Brokers = brokers
-		output.Topic = topic
+		output.Topic = "Test"
 		output.Log = testutil.Logger{}
 		require.NoError(t, output.Init())
 
@@ -555,7 +558,8 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 		input := KafkaConsumer{
 			Brokers:                brokers,
 			Log:                    testutil.Logger{},
-			Topics:                 []string{topic},
+			Topics:                 tt.topics,
+			TopicRegexps:           tt.topicRegexps,
 			MaxUndeliveredMessages: 1,
 			ConnectionStrategy:     tt.connectionStrategy,
 		}
diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf
index 059de14c3..c618d1610 100644
--- a/plugins/inputs/kafka_consumer/sample.conf
+++ b/plugins/inputs/kafka_consumer/sample.conf
@@ -6,6 +6,10 @@
   ## Topics to consume.
   topics = ["telegraf"]
 
+  ## Topic regular expressions to consume. Matches will be added to topics.
+  ## Example: topic_regexps = [ ".*test", "metric[0-9A-Za-z]*" ]
+  # topic_regexps = [ ]
+
   ## When set this tag will be added to all metrics with the topic as the value.
   # topic_tag = ""
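For illustration, the matching-and-fingerprint logic that refreshTopics() introduces can be exercised standalone. The sketch below is not part of the patch; the topic names, the regexp, and the variable names are hypothetical stand-ins for k.Topics, k.regexps, and the broker's discovered topic list:

    package main

    import (
    	"fmt"
    	"regexp"
    	"sort"
    	"strings"
    )

    func main() {
    	// Hypothetical inputs standing in for k.Topics, k.regexps, and
    	// the topics discovered via the broker client.
    	literal := []string{"telegraf"}
    	regexps := []*regexp.Regexp{regexp.MustCompile(`^metrics\.`)}
    	discovered := []string{"metrics.cpu", "metrics.mem", "logs", "telegraf"}

    	// Literally-specified topics are always kept, matched or not.
    	wanted := make(map[string]bool, len(discovered))
    	for _, t := range literal {
    		wanted[t] = true
    	}
    	// Discovered topics are kept only if some regexp matches.
    	for _, t := range discovered {
    		for _, r := range regexps {
    			if r.MatchString(t) {
    				wanted[t] = true
    				break
    			}
    		}
    	}

    	topics := make([]string, 0, len(wanted))
    	for t := range wanted {
    		topics = append(topics, t)
    	}
    	sort.Strings(topics)
    	// The sorted, joined list is the cheap change detector that
    	// refreshTopics() compares against k.fingerprint.
    	fmt.Println(strings.Join(topics, ";")) // metrics.cpu;metrics.mem;telegraf
    }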