feat(inputs.kafka_consumer): Refresh regexp topics periodically (#13410)
This commit is contained in:
parent
c3d30c8227
commit
a13f3463eb
|
|
@ -50,6 +50,12 @@ to use them.
|
||||||
## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
|
## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
|
||||||
# topic_regexps = [ ]
|
# topic_regexps = [ ]
|
||||||
|
|
||||||
|
## Topic regexp refresh interval. If enabled, and if regular expressions
|
||||||
|
## are enabled, available topics will be rescanned at this interval to
|
||||||
|
## determine whether new ones are present.
|
||||||
|
## Exmaple: topic_refresh_interval = "5m"
|
||||||
|
# topic_refresh_interval = ""
|
||||||
|
|
||||||
## When set this tag will be added to all metrics with the topic as the value.
|
## When set this tag will be added to all metrics with the topic as the value.
|
||||||
# topic_tag = ""
|
# topic_tag = ""
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ type KafkaConsumer struct {
|
||||||
BalanceStrategy string `toml:"balance_strategy"`
|
BalanceStrategy string `toml:"balance_strategy"`
|
||||||
Topics []string `toml:"topics"`
|
Topics []string `toml:"topics"`
|
||||||
TopicRegexps []string `toml:"topic_regexps"`
|
TopicRegexps []string `toml:"topic_regexps"`
|
||||||
|
TopicRefreshInterval config.Duration `toml:"topic_refresh_interval"`
|
||||||
TopicTag string `toml:"topic_tag"`
|
TopicTag string `toml:"topic_tag"`
|
||||||
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
|
||||||
ConnectionStrategy string `toml:"connection_strategy"`
|
ConnectionStrategy string `toml:"connection_strategy"`
|
||||||
|
|
@ -65,6 +66,7 @@ type KafkaConsumer struct {
|
||||||
|
|
||||||
parser telegraf.Parser
|
parser telegraf.Parser
|
||||||
topicLock sync.Mutex
|
topicLock sync.Mutex
|
||||||
|
groupLock sync.Mutex
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
@ -167,14 +169,14 @@ func (k *KafkaConsumer) Init() error {
|
||||||
}
|
}
|
||||||
k.topicClient = client
|
k.topicClient = client
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaConsumer) compileTopicRegexps() error {
|
func (k *KafkaConsumer) compileTopicRegexps() error {
|
||||||
// While we can add new topics matching extant regexps, we can't
|
// While we can add new topics matching extant regexps, we can't
|
||||||
// update that list on the fly. We compile them once at startup.
|
// update that list on the fly. We compile them once at startup.
|
||||||
// Changing them is a configuration change and requires a restart.
|
// Changing them is a configuration change and requires us to cancel
|
||||||
|
// and relaunch our ConsumerGroup.
|
||||||
|
|
||||||
k.regexps = make([]regexp.Regexp, 0, len(k.TopicRegexps))
|
k.regexps = make([]regexp.Regexp, 0, len(k.TopicRegexps))
|
||||||
for _, r := range k.TopicRegexps {
|
for _, r := range k.TopicRegexps {
|
||||||
|
|
@ -187,22 +189,32 @@ func (k *KafkaConsumer) compileTopicRegexps() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaConsumer) refreshTopics() error {
|
func (k *KafkaConsumer) changedTopics() (bool, error) {
|
||||||
// We have instantiated a new generic Kafka client, so we can ask
|
// We have instantiated a generic Kafka client, so we can ask
|
||||||
// it for all the topics it knows about. Then we build
|
// it for all the topics it knows about. Then we build
|
||||||
// regexps from our strings, loop over those, loop over the
|
// regexps from our strings, loop over those, loop over the
|
||||||
// topics, and if we find a match, add that topic to
|
// topics, and if we find a match, add that topic to
|
||||||
// out topic set, which then we turn back into a list at the end.
|
// out topic set, which then we turn back into a list at the end.
|
||||||
|
//
|
||||||
|
// If our topics changed, we return true, to indicate that the
|
||||||
|
// consumer will need a restart in order to pick up the new
|
||||||
|
// topics.
|
||||||
|
|
||||||
if len(k.regexps) == 0 {
|
if len(k.regexps) == 0 {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Refresh metadata for all topics.
|
||||||
|
err := k.topicClient.RefreshMetadata()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
allDiscoveredTopics, err := k.topicClient.Topics()
|
allDiscoveredTopics, err := k.topicClient.Topics()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
k.Log.Debugf("discovered topics: %v", allDiscoveredTopics)
|
sort.Strings(allDiscoveredTopics)
|
||||||
|
k.Log.Debugf("discovered %d topics in total", len(allDiscoveredTopics))
|
||||||
|
|
||||||
extantTopicSet := make(map[string]bool, len(allDiscoveredTopics))
|
extantTopicSet := make(map[string]bool, len(allDiscoveredTopics))
|
||||||
for _, t := range allDiscoveredTopics {
|
for _, t := range allDiscoveredTopics {
|
||||||
|
|
@ -220,7 +232,6 @@ func (k *KafkaConsumer) refreshTopics() error {
|
||||||
wantedTopicSet := make(map[string]bool, len(allDiscoveredTopics))
|
wantedTopicSet := make(map[string]bool, len(allDiscoveredTopics))
|
||||||
for _, t := range k.Topics {
|
for _, t := range k.Topics {
|
||||||
// Get our pre-specified topics
|
// Get our pre-specified topics
|
||||||
k.Log.Debugf("adding literally-specified topic %s", t)
|
|
||||||
wantedTopicSet[t] = true
|
wantedTopicSet[t] = true
|
||||||
}
|
}
|
||||||
for _, t := range allDiscoveredTopics {
|
for _, t := range allDiscoveredTopics {
|
||||||
|
|
@ -228,7 +239,6 @@ func (k *KafkaConsumer) refreshTopics() error {
|
||||||
for _, r := range k.regexps {
|
for _, r := range k.regexps {
|
||||||
if r.MatchString(t) {
|
if r.MatchString(t) {
|
||||||
wantedTopicSet[t] = true
|
wantedTopicSet[t] = true
|
||||||
k.Log.Debugf("adding regexp-matched topic %q", t)
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -239,14 +249,17 @@ func (k *KafkaConsumer) refreshTopics() error {
|
||||||
}
|
}
|
||||||
sort.Strings(topicList)
|
sort.Strings(topicList)
|
||||||
fingerprint := strings.Join(topicList, ";")
|
fingerprint := strings.Join(topicList, ";")
|
||||||
|
k.Log.Debugf("Regular expression list %q matched %d topics", k.TopicRegexps, len(topicList))
|
||||||
if fingerprint != k.fingerprint {
|
if fingerprint != k.fingerprint {
|
||||||
k.Log.Infof("updating topics: replacing %q with %q", k.allWantedTopics, topicList)
|
k.Log.Infof("updating topics: replacing %q with %q", k.allWantedTopics, topicList)
|
||||||
|
k.topicLock.Lock()
|
||||||
|
k.fingerprint = fingerprint
|
||||||
|
k.allWantedTopics = topicList
|
||||||
|
k.topicLock.Unlock()
|
||||||
|
return true, nil
|
||||||
}
|
}
|
||||||
k.topicLock.Lock()
|
k.Log.Debugf("topic list unchanged on refresh")
|
||||||
k.fingerprint = fingerprint
|
return false, nil
|
||||||
k.allWantedTopics = topicList
|
|
||||||
k.topicLock.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *KafkaConsumer) create() error {
|
func (k *KafkaConsumer) create() error {
|
||||||
|
|
@ -270,14 +283,139 @@ func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *KafkaConsumer) getNewHandler(acc telegraf.Accumulator) *ConsumerGroupHandler {
|
||||||
|
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
|
||||||
|
handler.MaxMessageLen = k.MaxMessageLen
|
||||||
|
handler.TopicTag = k.TopicTag
|
||||||
|
return handler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KafkaConsumer) handleTicker(acc telegraf.Accumulator) {
|
||||||
|
dstr := time.Duration(k.TopicRefreshInterval).String()
|
||||||
|
k.Log.Infof("starting refresh ticker: scanning topics every %s", dstr)
|
||||||
|
for {
|
||||||
|
<-k.ticker.C
|
||||||
|
k.Log.Debugf("received topic refresh request (every %s)", dstr)
|
||||||
|
changed, err := k.changedTopics()
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if changed {
|
||||||
|
err = k.restartConsumer(acc)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KafkaConsumer) consumeTopics(ctx context.Context, acc telegraf.Accumulator) {
|
||||||
|
k.wg.Add(1)
|
||||||
|
defer k.wg.Done()
|
||||||
|
go func() {
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
handler := k.getNewHandler(acc)
|
||||||
|
// We need to copy allWantedTopics; the Consume() is
|
||||||
|
// long-running and we can easily deadlock if our
|
||||||
|
// topic-update-checker fires.
|
||||||
|
k.topicLock.Lock()
|
||||||
|
topics := make([]string, len(k.allWantedTopics))
|
||||||
|
copy(topics, k.allWantedTopics)
|
||||||
|
k.topicLock.Unlock()
|
||||||
|
err := k.consumer.Consume(ctx, topics, handler)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("consume: %w", err))
|
||||||
|
internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := k.consumer.Close()
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("close: %w", err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *KafkaConsumer) restartConsumer(acc telegraf.Accumulator) error {
|
||||||
|
// This is tricky. As soon as we call k.cancel() the old
|
||||||
|
// consumer group is no longer usable, so we need to get
|
||||||
|
// a new group and context ready and then pull a switcheroo
|
||||||
|
// quickly.
|
||||||
|
//
|
||||||
|
// Up to 100Hz, at least, we do not lose messages on consumer group
|
||||||
|
// restart. Since the group name is the same, it seems very likely
|
||||||
|
// that we just pick up at the offset we left off at: that is the
|
||||||
|
// producer does not see this as a new consumer, but as an old one
|
||||||
|
// that's just missed a couple beats.
|
||||||
|
if k.consumer == nil {
|
||||||
|
// Fast exit if the consumer isn't running
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
k.Log.Info("restarting consumer group")
|
||||||
|
k.Log.Debug("creating new consumer group")
|
||||||
|
newConsumer, err := k.ConsumerCreator.Create(
|
||||||
|
k.Brokers,
|
||||||
|
k.ConsumerGroup,
|
||||||
|
k.config,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
k.Log.Debug("acquiring new context before swapping consumer groups")
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
// I am not sure we really need this lock, but if it hurts we're
|
||||||
|
// already refreshing way too frequently.
|
||||||
|
k.groupLock.Lock()
|
||||||
|
// Do the switcheroo.
|
||||||
|
k.Log.Debug("replacing consumer group")
|
||||||
|
oldConsumer := k.consumer
|
||||||
|
k.consumer = newConsumer
|
||||||
|
k.cancel = cancel
|
||||||
|
// Do this in the background
|
||||||
|
go func() {
|
||||||
|
k.Log.Debug("closing old consumer group")
|
||||||
|
err = oldConsumer.Close()
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
k.groupLock.Unlock()
|
||||||
|
k.Log.Debug("starting new consumer group")
|
||||||
|
k.consumeTopics(ctx, acc)
|
||||||
|
k.Log.Info("restarted consumer group")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
|
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
k.Log.Debugf("TopicRegexps: %v", k.TopicRegexps)
|
||||||
|
k.Log.Debugf("TopicRefreshInterval: %v", k.TopicRefreshInterval)
|
||||||
|
|
||||||
// If TopicRegexps is set, add matches to Topics
|
// If TopicRegexps is set, add matches to Topics
|
||||||
if len(k.TopicRegexps) > 0 {
|
if len(k.TopicRegexps) > 0 {
|
||||||
if err := k.refreshTopics(); err != nil {
|
if _, err = k.changedTopics(); err != nil {
|
||||||
|
// We're starting, so we expect the list to change;
|
||||||
|
// all we care about is whether we got an error
|
||||||
|
// acquiring our topics.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// If refresh interval is specified, start a goroutine
|
||||||
|
// to refresh topics periodically. This only makes sense if
|
||||||
|
// TopicRegexps is set.
|
||||||
|
if k.TopicRefreshInterval > 0 {
|
||||||
|
k.ticker = time.NewTicker(time.Duration(k.TopicRefreshInterval))
|
||||||
|
// Note that there's no waitgroup here. That's because
|
||||||
|
// handleTicker does care whether topics have changed,
|
||||||
|
// and if they have, it will invoke restartConsumer(),
|
||||||
|
// which tells the current consumer to stop. That stop
|
||||||
|
// cancels goroutines in the waitgroup, and therefore
|
||||||
|
// the restart goroutine must not be in the waitgroup.
|
||||||
|
go k.handleTicker(acc)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
@ -291,44 +429,16 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
k.startErrorAdder(acc)
|
k.startErrorAdder(acc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start consumer goroutine
|
if k.consumer == nil {
|
||||||
k.wg.Add(1)
|
err = k.create()
|
||||||
go func() {
|
|
||||||
var err error
|
|
||||||
defer k.wg.Done()
|
|
||||||
|
|
||||||
if k.consumer == nil {
|
|
||||||
err = k.create()
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("create consumer async: %w", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
k.startErrorAdder(acc)
|
|
||||||
|
|
||||||
for ctx.Err() == nil {
|
|
||||||
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
|
|
||||||
handler.MaxMessageLen = k.MaxMessageLen
|
|
||||||
handler.TopicTag = k.TopicTag
|
|
||||||
// We need to copy allWantedTopics; the Consume() is
|
|
||||||
// long-running and we can easily deadlock if our
|
|
||||||
// topic-update-checker fires.
|
|
||||||
topics := make([]string, len(k.allWantedTopics))
|
|
||||||
k.topicLock.Lock()
|
|
||||||
copy(topics, k.allWantedTopics)
|
|
||||||
k.topicLock.Unlock()
|
|
||||||
err := k.consumer.Consume(ctx, topics, handler)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(fmt.Errorf("consume: %w", err))
|
|
||||||
internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err = k.consumer.Close()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
acc.AddError(fmt.Errorf("close: %w", err))
|
acc.AddError(fmt.Errorf("create consumer async: %w", err))
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
|
|
||||||
|
k.startErrorAdder(acc)
|
||||||
|
k.consumeTopics(ctx, acc) // Starts goroutine internally
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -386,7 +496,7 @@ type ConsumerGroupHandler struct {
|
||||||
log telegraf.Logger
|
log telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup is called once when a new session is opened. It setups up the handler
|
// Setup is called once when a new session is opened. It sets up the handler
|
||||||
// and begins processing delivered messages.
|
// and begins processing delivered messages.
|
||||||
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
|
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
|
||||||
h.undelivered = make(map[telegraf.TrackingID]Message)
|
h.undelivered = make(map[telegraf.TrackingID]Message)
|
||||||
|
|
|
||||||
|
|
@ -476,15 +476,14 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
name string
|
name string
|
||||||
connectionStrategy string
|
connectionStrategy string
|
||||||
topics []string
|
topics []string
|
||||||
topicRegexps []string
|
topicRegexps []string
|
||||||
topicRefreshInterval config.Duration
|
|
||||||
}{
|
}{
|
||||||
{"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)},
|
{"connection strategy startup", "startup", []string{"Test"}, nil},
|
||||||
{"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)},
|
{"connection strategy defer", "defer", []string{"Test"}, nil},
|
||||||
{"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)},
|
{"topic regexp", "startup", nil, []string{"Test*"}},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
|
@ -590,6 +589,161 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDynamicTopicRefresh(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
connectionStrategy string
|
||||||
|
topics []string
|
||||||
|
topicRegexps []string
|
||||||
|
topicRefreshInterval config.Duration
|
||||||
|
}{
|
||||||
|
// 3-second refresh interval
|
||||||
|
{"topic regexp refresh", "startup", nil, []string{"Test*"}, config.Duration(3 * time.Second)},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Logf("rt: starting network")
|
||||||
|
ctx := context.Background()
|
||||||
|
networkName := "telegraf-test-kafka-consumer-network"
|
||||||
|
network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
|
||||||
|
NetworkRequest: testcontainers.NetworkRequest{
|
||||||
|
Name: networkName,
|
||||||
|
Attachable: true,
|
||||||
|
CheckDuplicate: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, network.Remove(ctx), "terminating network failed")
|
||||||
|
}()
|
||||||
|
|
||||||
|
t.Logf("rt: starting zookeeper")
|
||||||
|
zookeeperName := "telegraf-test-kafka-consumer-zookeeper"
|
||||||
|
zookeeper := testutil.Container{
|
||||||
|
Image: "wurstmeister/zookeeper",
|
||||||
|
ExposedPorts: []string{"2181:2181"},
|
||||||
|
Networks: []string{networkName},
|
||||||
|
WaitingFor: wait.ForLog("binding to port"),
|
||||||
|
Name: zookeeperName,
|
||||||
|
}
|
||||||
|
require.NoError(t, zookeeper.Start(), "failed to start container")
|
||||||
|
defer zookeeper.Terminate()
|
||||||
|
|
||||||
|
t.Logf("rt: starting broker")
|
||||||
|
container := testutil.Container{
|
||||||
|
Name: "telegraf-test-kafka-consumer",
|
||||||
|
Image: "wurstmeister/kafka",
|
||||||
|
ExposedPorts: []string{"9092:9092"},
|
||||||
|
Env: map[string]string{
|
||||||
|
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
|
||||||
|
"KAFKA_ADVERTISED_PORT": "9092",
|
||||||
|
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
|
||||||
|
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", "Test"),
|
||||||
|
},
|
||||||
|
Networks: []string{networkName},
|
||||||
|
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
|
||||||
|
}
|
||||||
|
require.NoError(t, container.Start(), "failed to start container")
|
||||||
|
defer container.Terminate()
|
||||||
|
|
||||||
|
brokers := []string{
|
||||||
|
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make kafka output
|
||||||
|
t.Logf("rt: starting output plugin")
|
||||||
|
creator := outputs.Outputs["kafka"]
|
||||||
|
output, ok := creator().(*kafkaOutput.Kafka)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
s := &influxSerializer.Serializer{}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
output.SetSerializer(s)
|
||||||
|
output.Brokers = brokers
|
||||||
|
output.Topic = "TestDynamic"
|
||||||
|
output.Log = testutil.Logger{}
|
||||||
|
|
||||||
|
require.NoError(t, output.Init())
|
||||||
|
require.NoError(t, output.Connect())
|
||||||
|
|
||||||
|
// Make kafka input
|
||||||
|
t.Logf("rt: starting input plugin")
|
||||||
|
// If MaxUndeliveredMessages is 1 here (as it is
|
||||||
|
// for the other end-to-end tests) the test fails
|
||||||
|
// more often than not (but not always). We suspect
|
||||||
|
// this is something to do with internal messages
|
||||||
|
// about new topics and rebalancing getting in the
|
||||||
|
// way. At any rate, at 1000 (the default value),
|
||||||
|
// the tests pass reliably.
|
||||||
|
input := KafkaConsumer{
|
||||||
|
Brokers: brokers,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Topics: tt.topics,
|
||||||
|
TopicRegexps: tt.topicRegexps,
|
||||||
|
TopicRefreshInterval: tt.topicRefreshInterval,
|
||||||
|
MaxUndeliveredMessages: 1000, // Default
|
||||||
|
ConnectionStrategy: tt.connectionStrategy,
|
||||||
|
}
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
input.SetParser(parser)
|
||||||
|
require.NoError(t, input.Init())
|
||||||
|
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
require.NoError(t, input.Start(&acc))
|
||||||
|
t.Logf("rt: input plugin started")
|
||||||
|
|
||||||
|
// First we need an AdminClient, so that we can add
|
||||||
|
// a topic list.
|
||||||
|
|
||||||
|
newCfg := sarama.NewConfig() // Defaults are OK.
|
||||||
|
t.Logf("rt: creating new Kafkfa ClusterAdmin")
|
||||||
|
admin, err := sarama.NewClusterAdmin(brokers, newCfg)
|
||||||
|
if err != nil {
|
||||||
|
require.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
newTopic := "TestDynamic"
|
||||||
|
t.Logf("rt: creating new Kafkfa topic %s", newTopic)
|
||||||
|
detail := sarama.TopicDetail{
|
||||||
|
NumPartitions: 1,
|
||||||
|
ReplicationFactor: 1,
|
||||||
|
}
|
||||||
|
// Add the topic
|
||||||
|
err = admin.CreateTopic(newTopic, &detail, false)
|
||||||
|
if err != nil {
|
||||||
|
require.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Shove some metrics through
|
||||||
|
expected := testutil.MockMetrics()
|
||||||
|
t.Logf("rt: writing %v to %s", expected, output.Topic)
|
||||||
|
require.NoError(t, output.Write(expected))
|
||||||
|
|
||||||
|
// Check that they were received
|
||||||
|
t.Logf("rt: expecting")
|
||||||
|
// This usually hangs and we never read.
|
||||||
|
// Sometimes, though, we do read the expected data.
|
||||||
|
// Why?
|
||||||
|
acc.Wait(len(expected))
|
||||||
|
t.Logf("rt: received %d", len(expected))
|
||||||
|
q := acc.GetTelegrafMetrics()
|
||||||
|
t.Logf("rt: received metrics %v", q)
|
||||||
|
testutil.RequireMetricsEqual(t, expected, q)
|
||||||
|
|
||||||
|
t.Logf("rt: shutdown")
|
||||||
|
require.NoError(t, output.Close())
|
||||||
|
input.Stop()
|
||||||
|
|
||||||
|
t.Logf("rt: done")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestExponentialBackoff(t *testing.T) {
|
func TestExponentialBackoff(t *testing.T) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,12 @@
|
||||||
## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
|
## Example: topic_regexps = [ "*test", "metric[0-9A-z]*" ]
|
||||||
# topic_regexps = [ ]
|
# topic_regexps = [ ]
|
||||||
|
|
||||||
|
## Topic regexp refresh interval. If enabled, and if regular expressions
|
||||||
|
## are enabled, available topics will be rescanned at this interval to
|
||||||
|
## determine whether new ones are present.
|
||||||
|
## Exmaple: topic_refresh_interval = "5m"
|
||||||
|
# topic_refresh_interval = ""
|
||||||
|
|
||||||
## When set this tag will be added to all metrics with the topic as the value.
|
## When set this tag will be added to all metrics with the topic as the value.
|
||||||
# topic_tag = ""
|
# topic_tag = ""
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue