diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index b43c021e6..fc86f56e8 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -65,10 +65,10 @@ type KafkaConsumer struct {
 	ticker      *time.Ticker
 	fingerprint string
 
-	parserFunc telegraf.ParserFunc
-	topicLock  sync.Mutex
-	wg         sync.WaitGroup
-	cancel     context.CancelFunc
+	parser    telegraf.Parser
+	topicLock sync.Mutex
+	wg        sync.WaitGroup
+	cancel    context.CancelFunc
 }
 
 type ConsumerGroup interface {
@@ -91,8 +91,8 @@ func (*KafkaConsumer) SampleConfig() string {
 	return sampleConfig
 }
 
-func (k *KafkaConsumer) SetParserFunc(fn telegraf.ParserFunc) {
-	k.parserFunc = fn
+func (k *KafkaConsumer) SetParser(parser telegraf.Parser) {
+	k.parser = parser
 }
 
 func (k *KafkaConsumer) Init() error {
@@ -318,7 +318,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
 	k.startErrorAdder(acc)
 
 	for ctx.Err() == nil {
-		handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
+		handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
 		handler.MaxMessageLen = k.MaxMessageLen
 		handler.TopicTag = k.TopicTag
 		//if message headers list specified, put it as map to handler
@@ -377,12 +377,12 @@ type Message struct {
 	session sarama.ConsumerGroupSession
 }
 
-func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn telegraf.ParserFunc, log telegraf.Logger) *ConsumerGroupHandler {
+func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, parser telegraf.Parser, log telegraf.Logger) *ConsumerGroupHandler {
 	handler := &ConsumerGroupHandler{
 		acc:         acc.WithTracking(maxUndelivered),
 		sem:         make(chan empty, maxUndelivered),
 		undelivered: make(map[telegraf.TrackingID]Message, maxUndelivered),
-		parserFunc:  fn,
+		parser:      parser,
 		log:         log,
 	}
 	return handler
@@ -394,11 +394,11 @@ type ConsumerGroupHandler struct {
 	TopicTag         string
 	MsgHeadersToTags map[string]bool
 
-	acc        telegraf.TrackingAccumulator
-	sem        semaphore
-	parserFunc telegraf.ParserFunc
-	wg         sync.WaitGroup
-	cancel     context.CancelFunc
+	acc    telegraf.TrackingAccumulator
+	sem    semaphore
+	parser telegraf.Parser
+	wg     sync.WaitGroup
+	cancel context.CancelFunc
 
 	mu          sync.Mutex
 	undelivered map[telegraf.TrackingID]Message
@@ -476,12 +476,7 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error {
 			len(msg.Value), h.MaxMessageLen)
 	}
 
-	parser, err := h.parserFunc()
-	if err != nil {
-		return fmt.Errorf("creating parser: %w", err)
-	}
-
-	metrics, err := parser.Parse(msg.Value)
+	metrics, err := h.parser.Parse(msg.Value)
 	if err != nil {
 		h.release()
 		return err
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index 36077a516..3e40ad5ed 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -294,15 +294,11 @@ func (c *FakeConsumerGroupClaim) Messages() <-chan *sarama.ConsumerMessage {
 func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
 	acc := &testutil.Accumulator{}
 
-	parserFunc := func() (telegraf.Parser, error) {
-		parser := &value.Parser{
-			MetricName: "cpu",
-			DataType:   "int",
-		}
-		err := parser.Init()
-		return parser, err
+	parser := value.Parser{
+		MetricName: "cpu",
+		DataType:   "int",
 	}
-	cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
+	cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -330,15 +326,12 @@ func TestConsumerGroupHandler_Lifecycle(t *testing.T) {
 func TestConsumerGroupHandler_ConsumeClaim(t *testing.T) {
 	acc := &testutil.Accumulator{}
 
-	parserFunc := func() (telegraf.Parser, error) {
-		parser := &value.Parser{
-			MetricName: "cpu",
-			DataType:   "int",
-		}
-		err := parser.Init()
-		return parser, err
+	parser := value.Parser{
+		MetricName: "cpu",
+		DataType:   "int",
 	}
-	cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
+	require.NoError(t, parser.Init())
+	cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -451,15 +444,12 @@ func TestConsumerGroupHandler_Handle(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			acc := &testutil.Accumulator{}
-			parserFunc := func() (telegraf.Parser, error) {
-				parser := &value.Parser{
-					MetricName: "cpu",
-					DataType:   "int",
-				}
-				err := parser.Init()
-				return parser, err
+			parser := value.Parser{
+				MetricName: "cpu",
+				DataType:   "int",
 			}
-			cg := NewConsumerGroupHandler(acc, 1, parserFunc, testutil.Logger{})
+			require.NoError(t, parser.Init())
+			cg := NewConsumerGroupHandler(acc, 1, &parser, testutil.Logger{})
 			cg.MaxMessageLen = tt.maxMessageLen
 			cg.TopicTag = tt.topicTag
 
@@ -573,12 +563,9 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
 				MaxUndeliveredMessages: 1,
 				ConnectionStrategy:     tt.connectionStrategy,
 			}
-			parserFunc := func() (telegraf.Parser, error) {
-				parser := &influx.Parser{}
-				err := parser.Init()
-				return parser, err
-			}
-			input.SetParserFunc(parserFunc)
+			parser := &influx.Parser{}
+			require.NoError(t, parser.Init())
+			input.SetParser(parser)
 			require.NoError(t, input.Init())
 
 			acc := testutil.Accumulator{}
@@ -634,12 +621,9 @@ func TestExponentialBackoff(t *testing.T) {
 			},
 		},
 	}
-	parserFunc := func() (telegraf.Parser, error) {
-		parser := &influx.Parser{}
-		err := parser.Init()
-		return parser, err
-	}
-	input.SetParserFunc(parserFunc)
+	parser := &influx.Parser{}
+	require.NoError(t, parser.Init())
+	input.SetParser(parser)
 
 	//time how long initialization (connection) takes
 	start := time.Now()
@@ -682,13 +666,9 @@ func TestExponentialBackoffDefault(t *testing.T) {
 			},
 		},
 	}
-	parserFunc := func() (telegraf.Parser, error) {
-		parser := &influx.Parser{}
-		err := parser.Init()
-		return parser, err
-	}
-	input.SetParserFunc(parserFunc)
-
+	parser := &influx.Parser{}
+	require.NoError(t, parser.Init())
+	input.SetParser(parser)
 	require.NoError(t, input.Init())
 
 	// We don't need to start the plugin here since we're only testing
diff --git a/plugins/parsers/json_v2/parser.go b/plugins/parsers/json_v2/parser.go
index c195cf103..9c3b2c033 100644
--- a/plugins/parsers/json_v2/parser.go
+++ b/plugins/parsers/json_v2/parser.go
@@ -5,6 +5,7 @@ import (
 	"io"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/dimchansky/utfbom"
@@ -35,6 +36,8 @@ type Parser struct {
 	iterateObjects bool
 	// objectConfig contains the config for an object, some info is needed while iterating over the gjson results
 	objectConfig Object
+	// parseMutex is here because Parse() is not threadsafe. If it is made threadsafe at some point, then we won't need it anymore.
+	parseMutex sync.Mutex
 }
 
 type Config struct {
@@ -114,6 +117,19 @@ func (p *Parser) Init() error {
 }
 
 func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
+	// What we've done here is to put the entire former contents of Parse()
+	// into parseCriticalPath().
+	//
+	// As we determine what bits of parseCriticalPath() are and are not
+	// threadsafe, we can lift the safe pieces back up into Parse(), and
+	// shrink the scope (or scopes, if the critical sections are disjoint)
+	// of those pieces that need to be protected with a mutex.
+	return p.parseCriticalPath(input)
+}
+
+func (p *Parser) parseCriticalPath(input []byte) ([]telegraf.Metric, error) {
+	p.parseMutex.Lock()
+	defer p.parseMutex.Unlock()
 	reader := strings.NewReader(string(input))
 	body, _ := utfbom.Skip(reader)
 	input, err := io.ReadAll(body)
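
For context, a minimal self-contained sketch of the pattern this change relies on, using stand-in types (Metric, sharedParser) rather than Telegraf's real interfaces: once the kafka_consumer handlers share a single parser instead of building a fresh one per message via a ParserFunc, a parser that mutates internal scratch state while parsing (as json_v2 does with objectConfig) has to serialize its Parse() calls internally, which is what parseMutex above does.

// sketch.go - a stand-in illustration, NOT Telegraf code; all types here are hypothetical.
package main

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
)

// Metric is a stand-in for telegraf.Metric.
type Metric struct {
	Name  string
	Value float64
}

// sharedParser mimics the json_v2 change: Parse() delegates to a
// mutex-guarded critical path because the parser mutates internal
// scratch state while parsing.
type sharedParser struct {
	mu    sync.Mutex
	state string // stand-in for json_v2's objectConfig scratch state
}

func (p *sharedParser) Parse(input []byte) ([]Metric, error) {
	return p.parseCriticalPath(input)
}

func (p *sharedParser) parseCriticalPath(input []byte) ([]Metric, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// This mutation is what makes unsynchronized concurrent calls unsafe.
	p.state = strings.TrimSpace(string(input))
	v, err := strconv.ParseFloat(p.state, 64)
	if err != nil {
		return nil, err
	}
	return []Metric{{Name: "cpu", Value: v}}, nil
}

func main() {
	parser := &sharedParser{}
	var wg sync.WaitGroup
	// Several "handler" goroutines share one parser, just as the
	// consumer-group handlers now share k.parser instead of calling
	// a ParserFunc for every message.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			metrics, err := parser.Parse([]byte(fmt.Sprintf("%d", n)))
			if err != nil {
				return
			}
			fmt.Println(metrics[0].Name, metrics[0].Value)
		}(i)
	}
	wg.Wait()
}

Putting the lock inside the parser keeps every caller unchanged, and it matches the plan in the comment: as pieces of parseCriticalPath() are shown to be threadsafe they can be lifted back into Parse(), shrinking the locked region without touching the consumers.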