fix(inputs.mqtt_consumer): topic parsing error when topic having prefix(/) (#11527)
This commit is contained in:
parent
ae695e8736
commit
cff5c0e852
|
|
@ -124,7 +124,7 @@ func (m *MQTTConsumer) Init() error {
|
|||
for i, p := range m.TopicParsing {
|
||||
splitMeasurement := strings.Split(p.Measurement, "/")
|
||||
for j := range splitMeasurement {
|
||||
if splitMeasurement[j] != "_" {
|
||||
if splitMeasurement[j] != "_" && splitMeasurement[j] != "" {
|
||||
m.TopicParsing[i].MeasurementIndex = j
|
||||
break
|
||||
}
|
||||
|
|
@ -343,7 +343,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
|||
// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields)
|
||||
func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error {
|
||||
for i, k := range keys {
|
||||
if k == "_" {
|
||||
if k == "_" || k == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -413,6 +413,39 @@ func TestTopicTag(t *testing.T) {
|
|||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "topic parsing configured topic with a prefix `/`",
|
||||
topic: "/telegraf/123/test/hello",
|
||||
topicTag: func() *string {
|
||||
tag := ""
|
||||
return &tag
|
||||
},
|
||||
topicParsing: []TopicParsingConfig{
|
||||
{
|
||||
Topic: "/telegraf/+/test/hello",
|
||||
Measurement: "/_/_/measurement/_",
|
||||
Tags: "/testTag/_/_/_",
|
||||
Fields: "/_/testNumber/_/testString",
|
||||
FieldTypes: map[string]string{
|
||||
"testNumber": "int",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"test",
|
||||
map[string]string{
|
||||
"testTag": "telegraf",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"testNumber": 123,
|
||||
"testString": "hello",
|
||||
"time_idle": 42,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue