From cff5c0e8528850f1da2176467f20dbc8e579a2e6 Mon Sep 17 00:00:00 2001 From: wuxingzhong Date: Wed, 20 Jul 2022 21:28:20 +0800 Subject: [PATCH] fix(inputs.mqtt_consumer): topic parsing error when topic having prefix(/) (#11527) --- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 4 +-- .../mqtt_consumer/mqtt_consumer_test.go | 33 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index eee520db5..37fabded7 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -124,7 +124,7 @@ func (m *MQTTConsumer) Init() error { for i, p := range m.TopicParsing { splitMeasurement := strings.Split(p.Measurement, "/") for j := range splitMeasurement { - if splitMeasurement[j] != "_" { + if splitMeasurement[j] != "_" && splitMeasurement[j] != "" { m.TopicParsing[i].MeasurementIndex = j break } @@ -343,7 +343,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { // parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields) func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error { for i, k := range keys { - if k == "_" { + if k == "_" || k == "" { continue } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index da8a376fe..3b8780c78 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -413,6 +413,39 @@ func TestTopicTag(t *testing.T) { ), }, }, + { + name: "topic parsing configured topic with a prefix `/`", + topic: "/telegraf/123/test/hello", + topicTag: func() *string { + tag := "" + return &tag + }, + topicParsing: []TopicParsingConfig{ + { + Topic: "/telegraf/+/test/hello", + Measurement: "/_/_/measurement/_", + Tags: "/testTag/_/_/_", + Fields: "/_/testNumber/_/testString", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "testTag": "telegraf", + }, + map[string]interface{}{ + "testNumber": 123, + "testString": "hello", + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {