Revert "feat(inputs.kafka_consumer): Refresh regexp topics periodically (#13410)" (#13617)

This reverts commit a13f3463eb.
Joshua Powers committed 2023-07-14 08:06:45 -06:00 (committed by GitHub)
parent 714f3c187f
commit 9f83bee8ef
4 changed files with 61 additions and 337 deletions


@@ -50,12 +50,6 @@ to use them.
   ## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
   # topic_regexps = [ ]
-  ## Topic regexp refresh interval. If enabled, and if regular expressions
-  ## are enabled, available topics will be rescanned at this interval to
-  ## determine whether new ones are present.
-  ## Example: topic_refresh_interval = "5m"
-  # topic_refresh_interval = ""
   ## When set this tag will be added to all metrics with the topic as the value.
   # topic_tag = ""


@@ -43,7 +43,6 @@ type KafkaConsumer struct {
     BalanceStrategy      string          `toml:"balance_strategy"`
     Topics               []string        `toml:"topics"`
     TopicRegexps         []string        `toml:"topic_regexps"`
-    TopicRefreshInterval config.Duration `toml:"topic_refresh_interval"`
     TopicTag             string          `toml:"topic_tag"`
     ConsumerFetchDefault config.Size     `toml:"consumer_fetch_default"`
     ConnectionStrategy   string          `toml:"connection_strategy"`
@@ -66,7 +65,6 @@ type KafkaConsumer struct {
     parser    telegraf.Parser
     topicLock sync.Mutex
-    groupLock sync.Mutex
     wg        sync.WaitGroup
     cancel    context.CancelFunc
 }
@@ -169,14 +167,14 @@ func (k *KafkaConsumer) Init() error {
         }
         k.topicClient = client
     }
     return nil
 }

 func (k *KafkaConsumer) compileTopicRegexps() error {
     // While we can add new topics matching extant regexps, we can't
     // update that list on the fly. We compile them once at startup.
-    // Changing them is a configuration change and requires us to cancel
-    // and relaunch our ConsumerGroup.
+    // Changing them is a configuration change and requires a restart.
     k.regexps = make([]regexp.Regexp, 0, len(k.TopicRegexps))
     for _, r := range k.TopicRegexps {
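As a side note, the approach compileTopicRegexps takes — compile every configured pattern once up front and reject the configuration if any pattern is invalid — can be sketched in isolation roughly as below. This is a minimal standalone example, not the plugin's code; the helper name and patterns are made up, and the plugin stores regexp.Regexp values rather than pointers.

package main

import (
    "fmt"
    "regexp"
)

// compilePatterns compiles each pattern string once, returning an error
// for the first pattern that is not a valid regular expression.
func compilePatterns(patterns []string) ([]*regexp.Regexp, error) {
    compiled := make([]*regexp.Regexp, 0, len(patterns))
    for _, p := range patterns {
        re, err := regexp.Compile(p)
        if err != nil {
            return nil, fmt.Errorf("invalid topic regexp %q: %w", p, err)
        }
        compiled = append(compiled, re)
    }
    return compiled, nil
}

func main() {
    res, err := compilePatterns([]string{"^metric[0-9]+$", "test$"})
    if err != nil {
        panic(err)
    }
    fmt.Println(res[0].MatchString("metric42"), res[1].MatchString("my-test")) // true true
}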
@@ -189,32 +187,22 @@ func (k *KafkaConsumer) compileTopicRegexps() error {
     return nil
 }

-func (k *KafkaConsumer) changedTopics() (bool, error) {
-    // We have instantiated a generic Kafka client, so we can ask
+func (k *KafkaConsumer) refreshTopics() error {
+    // We have instantiated a new generic Kafka client, so we can ask
     // it for all the topics it knows about. Then we build
     // regexps from our strings, loop over those, loop over the
     // topics, and if we find a match, add that topic to
     // our topic set, which then we turn back into a list at the end.
-    //
-    // If our topics changed, we return true, to indicate that the
-    // consumer will need a restart in order to pick up the new
-    // topics.
     if len(k.regexps) == 0 {
-        return false, nil
+        return nil
     }
-    // Refresh metadata for all topics.
-    err := k.topicClient.RefreshMetadata()
-    if err != nil {
-        return false, err
-    }
     allDiscoveredTopics, err := k.topicClient.Topics()
     if err != nil {
-        return false, err
+        return err
     }
-    sort.Strings(allDiscoveredTopics)
-    k.Log.Debugf("discovered %d topics in total", len(allDiscoveredTopics))
+    k.Log.Debugf("discovered topics: %v", allDiscoveredTopics)

     extantTopicSet := make(map[string]bool, len(allDiscoveredTopics))
     for _, t := range allDiscoveredTopics {
@@ -232,6 +220,7 @@ func (k *KafkaConsumer) changedTopics() (bool, error) {
     wantedTopicSet := make(map[string]bool, len(allDiscoveredTopics))
     for _, t := range k.Topics {
         // Get our pre-specified topics
+        k.Log.Debugf("adding literally-specified topic %s", t)
         wantedTopicSet[t] = true
     }
     for _, t := range allDiscoveredTopics {
@@ -239,6 +228,7 @@ func (k *KafkaConsumer) changedTopics() (bool, error) {
         for _, r := range k.regexps {
             if r.MatchString(t) {
                 wantedTopicSet[t] = true
+                k.Log.Debugf("adding regexp-matched topic %q", t)
                 break
             }
         }
@@ -249,17 +239,14 @@ func (k *KafkaConsumer) changedTopics() (bool, error) {
     }
     sort.Strings(topicList)
     fingerprint := strings.Join(topicList, ";")
-    k.Log.Debugf("Regular expression list %q matched %d topics", k.TopicRegexps, len(topicList))
     if fingerprint != k.fingerprint {
         k.Log.Infof("updating topics: replacing %q with %q", k.allWantedTopics, topicList)
-        k.topicLock.Lock()
-        k.fingerprint = fingerprint
-        k.allWantedTopics = topicList
-        k.topicLock.Unlock()
-        return true, nil
     }
-    k.Log.Debugf("topic list unchanged on refresh")
-    return false, nil
+    k.topicLock.Lock()
+    k.fingerprint = fingerprint
+    k.allWantedTopics = topicList
+    k.topicLock.Unlock()
+    return nil
 }

 func (k *KafkaConsumer) create() error {
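The change-detection idea in refreshTopics/changedTopics reduces to: sort the topic names, join them into a single string, and compare that fingerprint with the previously stored value. A minimal standalone sketch of just that idea follows; the helper name fingerprintOf is made up for the example.

package main

import (
    "fmt"
    "sort"
    "strings"
)

// fingerprintOf joins a sorted copy of the topic names into one string,
// so two equal sets always produce the same value regardless of order.
func fingerprintOf(topics []string) string {
    sorted := make([]string, len(topics))
    copy(sorted, topics)
    sort.Strings(sorted)
    return strings.Join(sorted, ";")
}

func main() {
    old := fingerprintOf([]string{"metrics", "logs"})
    same := fingerprintOf([]string{"logs", "metrics"})          // same set, different order
    grown := fingerprintOf([]string{"logs", "metrics", "new"})  // a topic was added
    fmt.Println(old == same, old == grown) // true false
}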
@@ -283,139 +270,14 @@ func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
     }()
 }
func (k *KafkaConsumer) getNewHandler(acc telegraf.Accumulator) *ConsumerGroupHandler {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
return handler
}
func (k *KafkaConsumer) handleTicker(acc telegraf.Accumulator) {
dstr := time.Duration(k.TopicRefreshInterval).String()
k.Log.Infof("starting refresh ticker: scanning topics every %s", dstr)
for {
<-k.ticker.C
k.Log.Debugf("received topic refresh request (every %s)", dstr)
changed, err := k.changedTopics()
if err != nil {
acc.AddError(err)
return
}
if changed {
err = k.restartConsumer(acc)
if err != nil {
acc.AddError(err)
return
}
}
}
}
func (k *KafkaConsumer) consumeTopics(ctx context.Context, acc telegraf.Accumulator) {
k.wg.Add(1)
defer k.wg.Done()
go func() {
for ctx.Err() == nil {
handler := k.getNewHandler(acc)
// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
// topic-update-checker fires.
k.topicLock.Lock()
topics := make([]string, len(k.allWantedTopics))
copy(topics, k.allWantedTopics)
k.topicLock.Unlock()
err := k.consumer.Consume(ctx, topics, handler)
if err != nil {
acc.AddError(fmt.Errorf("consume: %w", err))
internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
}
}
err := k.consumer.Close()
if err != nil {
acc.AddError(fmt.Errorf("close: %w", err))
}
}()
}
func (k *KafkaConsumer) restartConsumer(acc telegraf.Accumulator) error {
// This is tricky. As soon as we call k.cancel() the old
// consumer group is no longer usable, so we need to get
// a new group and context ready and then pull a switcheroo
// quickly.
//
// Up to 100Hz, at least, we do not lose messages on consumer group
// restart. Since the group name is the same, it seems very likely
// that we just pick up at the offset we left off at: that is the
// producer does not see this as a new consumer, but as an old one
// that's just missed a couple beats.
if k.consumer == nil {
// Fast exit if the consumer isn't running
return nil
}
k.Log.Info("restarting consumer group")
k.Log.Debug("creating new consumer group")
newConsumer, err := k.ConsumerCreator.Create(
k.Brokers,
k.ConsumerGroup,
k.config,
)
if err != nil {
acc.AddError(err)
return err
}
k.Log.Debug("acquiring new context before swapping consumer groups")
ctx, cancel := context.WithCancel(context.Background())
// I am not sure we really need this lock, but if it hurts we're
// already refreshing way too frequently.
k.groupLock.Lock()
// Do the switcheroo.
k.Log.Debug("replacing consumer group")
oldConsumer := k.consumer
k.consumer = newConsumer
k.cancel = cancel
// Do this in the background
go func() {
k.Log.Debug("closing old consumer group")
err = oldConsumer.Close()
if err != nil {
acc.AddError(err)
return
}
}()
k.groupLock.Unlock()
k.Log.Debug("starting new consumer group")
k.consumeTopics(ctx, acc)
k.Log.Info("restarted consumer group")
return nil
}
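The removed handleTicker/restartConsumer pair above follows a generic pattern: a time.Ticker drives a periodic check, and only when the check reports a change is the more expensive restart carried out. Stripped of the Kafka specifics, the shape is roughly the following sketch; all names here are illustrative, not the plugin's API.

package main

import (
    "fmt"
    "time"
)

// watch periodically calls check; when check reports a change it calls
// restart. Closing stop ends the loop. All names are illustrative.
func watch(interval time.Duration, check func() bool, restart func(), stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if check() {
                restart()
            }
        case <-stop:
            return
        }
    }
}

func main() {
    stop := make(chan struct{})
    go watch(100*time.Millisecond, func() bool { return true }, func() { fmt.Println("restart") }, stop)
    time.Sleep(350 * time.Millisecond)
    close(stop)
}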
 func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
     var err error
-    k.Log.Debugf("TopicRegexps: %v", k.TopicRegexps)
-    k.Log.Debugf("TopicRefreshInterval: %v", k.TopicRefreshInterval)
     // If TopicRegexps is set, add matches to Topics
     if len(k.TopicRegexps) > 0 {
-        if _, err = k.changedTopics(); err != nil {
-            // We're starting, so we expect the list to change;
-            // all we care about is whether we got an error
-            // acquiring our topics.
+        if err := k.refreshTopics(); err != nil {
             return err
         }
-        // If refresh interval is specified, start a goroutine
-        // to refresh topics periodically. This only makes sense if
-        // TopicRegexps is set.
-        if k.TopicRefreshInterval > 0 {
-            k.ticker = time.NewTicker(time.Duration(k.TopicRefreshInterval))
-            // Note that there's no waitgroup here. That's because
-            // handleTicker does care whether topics have changed,
-            // and if they have, it will invoke restartConsumer(),
-            // which tells the current consumer to stop. That stop
-            // cancels goroutines in the waitgroup, and therefore
-            // the restart goroutine must not be in the waitgroup.
-            go k.handleTicker(acc)
-        }
     }

     ctx, cancel := context.WithCancel(context.Background())
@@ -429,16 +291,44 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
         k.startErrorAdder(acc)
     }

-    if k.consumer == nil {
-        err = k.create()
-        if err != nil {
-            acc.AddError(fmt.Errorf("create consumer async: %w", err))
-            return err
-        }
-    }
-    k.startErrorAdder(acc)
-    k.consumeTopics(ctx, acc) // Starts goroutine internally
+    // Start consumer goroutine
+    k.wg.Add(1)
+    go func() {
+        var err error
+        defer k.wg.Done()
+
+        if k.consumer == nil {
+            err = k.create()
+            if err != nil {
+                acc.AddError(fmt.Errorf("create consumer async: %w", err))
+                return
+            }
+        }
+        k.startErrorAdder(acc)
+
+        for ctx.Err() == nil {
+            handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
+            handler.MaxMessageLen = k.MaxMessageLen
+            handler.TopicTag = k.TopicTag
+            // We need to copy allWantedTopics; the Consume() is
+            // long-running and we can easily deadlock if our
+            // topic-update-checker fires.
+            topics := make([]string, len(k.allWantedTopics))
+            k.topicLock.Lock()
+            copy(topics, k.allWantedTopics)
+            k.topicLock.Unlock()
+            err := k.consumer.Consume(ctx, topics, handler)
+            if err != nil {
+                acc.AddError(fmt.Errorf("consume: %w", err))
+                internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
+            }
+        }
+
+        err = k.consumer.Close()
+        if err != nil {
+            acc.AddError(fmt.Errorf("close: %w", err))
+        }
+    }()

     return nil
 }
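The restored Start keeps Consume inside a for loop, which matches sarama's documented contract: ConsumerGroup.Consume returns when a server-side rebalance (or an error) occurs and is expected to simply be called again. A bare-bones sketch of that loop outside Telegraf follows; the broker address, group and topic names are placeholders, and the sarama import path depends on the module version in use.

package main

import (
    "context"
    "log"

    "github.com/IBM/sarama"
)

// noopHandler satisfies sarama.ConsumerGroupHandler and just marks messages.
type noopHandler struct{}

func (noopHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (noopHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (noopHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        sess.MarkMessage(msg, "")
    }
    return nil
}

func main() {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_0_0_0 // consumer groups need a broker version that supports them

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer group.Close()

    ctx := context.Background()
    handler := &noopHandler{}
    // This sketch loops forever; a real caller would use a cancellable context.
    for ctx.Err() == nil {
        // Consume returns on rebalance (or error); loop to rejoin the group.
        if err := group.Consume(ctx, []string{"example-topic"}, handler); err != nil {
            log.Printf("consume: %v", err)
        }
    }
}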
@@ -496,7 +386,7 @@ type ConsumerGroupHandler struct {
     log telegraf.Logger
 }

-// Setup is called once when a new session is opened. It sets up the handler
+// Setup is called once when a new session is opened. It setups up the handler
 // and begins processing delivered messages.
 func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
     h.undelivered = make(map[telegraf.TrackingID]Message)


@@ -476,15 +476,14 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
     }
     var tests = []struct {
         name                 string
         connectionStrategy   string
         topics               []string
         topicRegexps         []string
-        topicRefreshInterval config.Duration
     }{
-        {"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)},
-        {"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)},
-        {"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)},
+        {"connection strategy startup", "startup", []string{"Test"}, nil},
+        {"connection strategy defer", "defer", []string{"Test"}, nil},
+        {"topic regexp", "startup", nil, []string{"Test*"}},
     }

     for _, tt := range tests {
@@ -589,161 +590,6 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
     }
 }
func TestDynamicTopicRefresh(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
var tests = []struct {
name string
connectionStrategy string
topics []string
topicRegexps []string
topicRefreshInterval config.Duration
}{
// 3-second refresh interval
{"topic regexp refresh", "startup", nil, []string{"Test*"}, config.Duration(3 * time.Second)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Logf("rt: starting network")
ctx := context.Background()
networkName := "telegraf-test-kafka-consumer-network"
network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, network.Remove(ctx), "terminating network failed")
}()
t.Logf("rt: starting zookeeper")
zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: zookeeperName,
}
require.NoError(t, zookeeper.Start(), "failed to start container")
defer zookeeper.Terminate()
t.Logf("rt: starting broker")
container := testutil.Container{
Name: "telegraf-test-kafka-consumer",
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", "Test"),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}
// Make kafka output
t.Logf("rt: starting output plugin")
creator := outputs.Outputs["kafka"]
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)
s := &influxSerializer.Serializer{}
require.NoError(t, s.Init())
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = "TestDynamic"
output.Log = testutil.Logger{}
require.NoError(t, output.Init())
require.NoError(t, output.Connect())
// Make kafka input
t.Logf("rt: starting input plugin")
// If MaxUndeliveredMessages is 1 here (as it is
// for the other end-to-end tests) the test fails
// more often than not (but not always). We suspect
// this is something to do with internal messages
// about new topics and rebalancing getting in the
// way. At any rate, at 1000 (the default value),
// the tests pass reliably.
input := KafkaConsumer{
Brokers: brokers,
Log: testutil.Logger{},
Topics: tt.topics,
TopicRegexps: tt.topicRegexps,
TopicRefreshInterval: tt.topicRefreshInterval,
MaxUndeliveredMessages: 1000, // Default
ConnectionStrategy: tt.connectionStrategy,
}
parser := &influx.Parser{}
require.NoError(t, parser.Init())
input.SetParser(parser)
require.NoError(t, input.Init())
acc := testutil.Accumulator{}
require.NoError(t, input.Start(&acc))
t.Logf("rt: input plugin started")
// First we need an AdminClient, so that we can add
// a topic list.
newCfg := sarama.NewConfig() // Defaults are OK.
t.Logf("rt: creating new Kafkfa ClusterAdmin")
admin, err := sarama.NewClusterAdmin(brokers, newCfg)
if err != nil {
require.Error(t, err)
return
}
newTopic := "TestDynamic"
t.Logf("rt: creating new Kafkfa topic %s", newTopic)
detail := sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}
// Add the topic
err = admin.CreateTopic(newTopic, &detail, false)
if err != nil {
require.Error(t, err)
return
}
// Shove some metrics through
expected := testutil.MockMetrics()
t.Logf("rt: writing %v to %s", expected, output.Topic)
require.NoError(t, output.Write(expected))
// Check that they were received
t.Logf("rt: expecting")
// This usually hangs and we never read.
// Sometimes, though, we do read the expected data.
// Why?
acc.Wait(len(expected))
t.Logf("rt: received %d", len(expected))
q := acc.GetTelegrafMetrics()
t.Logf("rt: received metrics %v", q)
testutil.RequireMetricsEqual(t, expected, q)
t.Logf("rt: shutdown")
require.NoError(t, output.Close())
input.Stop()
t.Logf("rt: done")
})
}
}
 func TestExponentialBackoff(t *testing.T) {
     var err error


@@ -10,12 +10,6 @@
   ## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
   # topic_regexps = [ ]
-  ## Topic regexp refresh interval. If enabled, and if regular expressions
-  ## are enabled, available topics will be rescanned at this interval to
-  ## determine whether new ones are present.
-  ## Example: topic_refresh_interval = "5m"
-  # topic_refresh_interval = ""
   ## When set this tag will be added to all metrics with the topic as the value.
   # topic_tag = ""