fix(inputs.kafka_consumer): Use per-message parser to avoid races (#13886)

This commit is contained in:
Tobias Jungel 2023-09-11 16:18:51 +02:00 committed by GitHub
parent 855b25d383
commit 3fae6439ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 39 deletions

View File

@ -63,10 +63,10 @@ type KafkaConsumer struct {
ticker *time.Ticker ticker *time.Ticker
fingerprint string fingerprint string
parser telegraf.Parser parserFunc telegraf.ParserFunc
topicLock sync.Mutex topicLock sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
cancel context.CancelFunc cancel context.CancelFunc
} }
type ConsumerGroup interface { type ConsumerGroup interface {
@ -89,8 +89,8 @@ func (*KafkaConsumer) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (k *KafkaConsumer) SetParser(parser telegraf.Parser) { func (k *KafkaConsumer) SetParserFunc(fn telegraf.ParserFunc) {
k.parser = parser k.parserFunc = fn
} }
func (k *KafkaConsumer) Init() error { func (k *KafkaConsumer) Init() error {
@ -308,7 +308,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
k.startErrorAdder(acc) k.startErrorAdder(acc)
for ctx.Err() == nil { for ctx.Err() == nil {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log) handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
handler.MaxMessageLen = k.MaxMessageLen handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag handler.TopicTag = k.TopicTag
// We need to copy allWantedTopics; the Consume() is // We need to copy allWantedTopics; the Consume() is
@ -358,12 +358,12 @@ type Message struct {
session sarama.ConsumerGroupSession session sarama.ConsumerGroupSession
} }
func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *ConsumerGroupHandler { func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn telegraf.ParserFunc, log telegraf.Logger) *ConsumerGroupHandler {
handler := &ConsumerGroupHandler{ handler := &ConsumerGroupHandler{
acc: acc.WithTracking(maxUndelivered), acc: acc.WithTracking(maxUndelivered),
sem: make(chan empty, maxUndelivered), sem: make(chan empty, maxUndelivered),
undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered), undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered),
parser: parser, parserFunc: fn,
log: log, log: log,
} }
return handler return handler
@ -374,11 +374,11 @@ type ConsumerGroupHandler struct {
MaxMessageLen int MaxMessageLen int
TopicTag string TopicTag string
acc telegraf.TrackingAccumulator acc telegraf.TrackingAccumulator
sem semaphore sem semaphore
parser telegraf.Parser parserFunc telegraf.ParserFunc
wg sync.WaitGroup wg sync.WaitGroup
cancel context.CancelFunc cancel context.CancelFunc
mu sync.Mutex mu sync.Mutex
undelivered map[telegraf.TrackingID]Message undelivered map[telegraf.TrackingID]Message
@ -456,7 +456,12 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
len(msg.Value), h.MaxMessageLen) len(msg.Value), h.MaxMessageLen)
} }
metrics, err := h.parser.Parse(msg.Value) parser, err := h.parserFunc()
if err != nil {
return fmt.Errorf("creating parser: %w", err)
}
metrics, err := parser.Parse(msg.Value)
if err != nil { if err != nil {
h.release() h.release()
return err return err

View File

@ -293,12 +293,16 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
func TestConsumerGroupHandler_Lifecycle(t *testing.T) { func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
parser := value.Parser{
MetricName: "cpu", parserFunc := func() (telegraf.Parser, error) {
DataType: "int", parser := &value.Parser{
MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
} }
require.NoError(t, parser.Init()) cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -326,12 +330,15 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) { func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
parser := value.Parser{ parserFunc := func() (telegraf.Parser, error) {
MetricName: "cpu", parser := &value.Parser{
DataType: "int", MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
} }
require.NoError(t, parser.Init()) cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -444,12 +451,15 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
parser := value.Parser{ parserFunc := func() (telegraf.Parser, error) {
MetricName: "cpu", parser := &value.Parser{
DataType: "int", MetricName: "cpu",
DataType: "int",
}
err := parser.Init()
return parser, err
} }
require.NoError(t, parser.Init()) cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
cg.MaxMessageLen = tt.maxMessageLen cg.MaxMessageLen = tt.maxMessageLen
cg.TopicTag = tt.topicTag cg.TopicTag = tt.topicTag
@ -563,9 +573,12 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
MaxUndeliveredMessages: 1, MaxUndeliveredMessages: 1,
ConnectionStrategy: tt.connectionStrategy, ConnectionStrategy: tt.connectionStrategy,
} }
parser := &influx.Parser{} parserFunc := func() (telegraf.Parser, error) {
require.NoError(t, parser.Init()) parser := &influx.Parser{}
input.SetParser(parser) err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)
require.NoError(t, input.Init()) require.NoError(t, input.Init())
acc := testutil.Accumulator{} acc := testutil.Accumulator{}
@ -621,9 +634,12 @@ func TestExponentialBackoff(t *testing.T) {
}, },
}, },
} }
parser := &influx.Parser{} parserFunc := func() (telegraf.Parser, error) {
require.NoError(t, parser.Init()) parser := &influx.Parser{}
input.SetParser(parser) err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)
//time how long initialization (connection) takes //time how long initialization (connection) takes
start := time.Now() start := time.Now()
@ -666,9 +682,12 @@ func TestExponentialBackoffDefault(t *testing.T) {
}, },
}, },
} }
parser := &influx.Parser{} parserFunc := func() (telegraf.Parser, error) {
require.NoError(t, parser.Init()) parser := &influx.Parser{}
input.SetParser(parser) err := parser.Init()
return parser, err
}
input.SetParserFunc(parserFunc)
require.NoError(t, input.Init()) require.NoError(t, input.Init())