fix: mqtt topic extracting no longer requires all three fields (#10208)
This commit is contained in:
parent
b954e565cc
commit
d15cf791db
|
|
@ -96,10 +96,6 @@ var sampleConfig = `
|
||||||
"telegraf/+/mem",
|
"telegraf/+/mem",
|
||||||
"sensors/#",
|
"sensors/#",
|
||||||
]
|
]
|
||||||
## Enable extracting tag values from MQTT topics
|
|
||||||
## _ denotes an ignored entry in the topic path
|
|
||||||
# topic_tags = "_/format/client/_"
|
|
||||||
# topic_measurement = "measurement/_/_/_"
|
|
||||||
# topic_fields = "_/_/_/temperature"
|
# topic_fields = "_/_/_/temperature"
|
||||||
## The message topic will be stored in a tag specified by this value. If set
|
## The message topic will be stored in a tag specified by this value. If set
|
||||||
## to the empty string no topic tag will be created.
|
## to the empty string no topic tag will be created.
|
||||||
|
|
@ -199,15 +195,15 @@ func (m *MQTTConsumer) Init() error {
|
||||||
m.TopicParsing[i].SplitFields = strings.Split(p.Fields, "/")
|
m.TopicParsing[i].SplitFields = strings.Split(p.Fields, "/")
|
||||||
m.TopicParsing[i].SplitTopic = strings.Split(p.Topic, "/")
|
m.TopicParsing[i].SplitTopic = strings.Split(p.Topic, "/")
|
||||||
|
|
||||||
if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) {
|
if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) && len(splitMeasurement) != 1 {
|
||||||
return fmt.Errorf("config error topic parsing: measurement length does not equal topic length")
|
return fmt.Errorf("config error topic parsing: measurement length does not equal topic length")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) {
|
if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) && p.Fields != "" {
|
||||||
return fmt.Errorf("config error topic parsing: fields length does not equal topic length")
|
return fmt.Errorf("config error topic parsing: fields length does not equal topic length")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) {
|
if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) && p.Tags != "" {
|
||||||
return fmt.Errorf("config error topic parsing: tags length does not equal topic length")
|
return fmt.Errorf("config error topic parsing: tags length does not equal topic length")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -408,7 +404,6 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
|
||||||
|
|
||||||
// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields)
|
// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields)
|
||||||
func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error {
|
func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error {
|
||||||
var metricFound bool
|
|
||||||
for i, k := range keys {
|
for i, k := range keys {
|
||||||
if k == "_" {
|
if k == "_" {
|
||||||
continue
|
continue
|
||||||
|
|
@ -416,19 +411,14 @@ func parseMetric(keys []string, values []string, types map[string]string, isTag
|
||||||
|
|
||||||
if isTag {
|
if isTag {
|
||||||
metric.AddTag(k, values[i])
|
metric.AddTag(k, values[i])
|
||||||
metricFound = true
|
|
||||||
} else {
|
} else {
|
||||||
newType, err := typeConvert(types, values[i], k)
|
newType, err := typeConvert(types, values[i], k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
metric.AddField(k, newType)
|
metric.AddField(k, newType)
|
||||||
metricFound = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !metricFound {
|
|
||||||
return fmt.Errorf("no fields or tags found")
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -350,6 +350,68 @@ func TestTopicTag(t *testing.T) {
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "topic parsing configured without fields",
|
||||||
|
topic: "telegraf/123/test/hello",
|
||||||
|
topicTag: func() *string {
|
||||||
|
tag := ""
|
||||||
|
return &tag
|
||||||
|
},
|
||||||
|
topicParsing: []TopicParsingConfig{
|
||||||
|
{
|
||||||
|
Topic: "telegraf/+/test/hello",
|
||||||
|
Measurement: "_/_/measurement/_",
|
||||||
|
Tags: "testTag/_/_/_",
|
||||||
|
FieldTypes: map[string]string{
|
||||||
|
"testNumber": "int",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"test",
|
||||||
|
map[string]string{
|
||||||
|
"testTag": "telegraf",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"time_idle": 42,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "topic parsing configured without measurement",
|
||||||
|
topic: "telegraf/123/test/hello",
|
||||||
|
topicTag: func() *string {
|
||||||
|
tag := ""
|
||||||
|
return &tag
|
||||||
|
},
|
||||||
|
topicParsing: []TopicParsingConfig{
|
||||||
|
{
|
||||||
|
Topic: "telegraf/+/test/hello",
|
||||||
|
Tags: "testTag/_/_/_",
|
||||||
|
Fields: "_/testNumber/_/testString",
|
||||||
|
FieldTypes: map[string]string{
|
||||||
|
"testNumber": "int",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"cpu",
|
||||||
|
map[string]string{
|
||||||
|
"testTag": "telegraf",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"testNumber": 123,
|
||||||
|
"testString": "hello",
|
||||||
|
"time_idle": 42,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue