From 784ede96f843d163ebbd4dc2cb1b5c89a17ac16d Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Tue, 18 Jun 2024 12:15:28 -0400 Subject: [PATCH] feat(inputs.mqtt_consumer): Add variable length topic parsing (#15528) --- plugins/inputs/mqtt_consumer/README.md | 3 +- plugins/inputs/mqtt_consumer/mqtt_consumer.go | 147 ++--------- .../mqtt_consumer/mqtt_consumer_test.go | 76 +++++- plugins/inputs/mqtt_consumer/sample.conf | 3 +- plugins/inputs/mqtt_consumer/topic_parser.go | 230 ++++++++++++++++++ 5 files changed, 319 insertions(+), 140 deletions(-) create mode 100644 plugins/inputs/mqtt_consumer/topic_parser.go diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index e035414cb..6b8065d39 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -136,7 +136,8 @@ to use them. data_format = "influx" ## Enable extracting tag values from MQTT topics - ## _ denotes an ignored entry in the topic path + ## _ denotes an ignored entry in the topic path, + ## # denotes a variable length path element (can only be used once per setting) # [[inputs.mqtt_consumer.topic_parsing]] # topic = "" # measurement = "" diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 6ed5d0507..5992b7be3 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -6,7 +6,6 @@ import ( _ "embed" "errors" "fmt" - "strconv" "strings" "sync" "time" @@ -46,24 +45,11 @@ type Client interface { type ClientFactory func(o *mqtt.ClientOptions) Client -type TopicParsingConfig struct { - Topic string `toml:"topic"` - Measurement string `toml:"measurement"` - Tags string `toml:"tags"` - Fields string `toml:"fields"` - FieldTypes map[string]string `toml:"types"` - // cached split of user given information - MeasurementIndex int - SplitTags []string - SplitFields []string - SplitTopic []string -} - type MQTTConsumer struct { Servers []string `toml:"servers"` Topics []string `toml:"topics"` TopicTag *string `toml:"topic_tag"` - TopicParsing []TopicParsingConfig `toml:"topic_parsing"` + TopicParserConfig []TopicParsingConfig `toml:"topic_parsing"` Username config.Secret `toml:"username"` Password config.Secret `toml:"password"` QoS int `toml:"qos"` @@ -85,6 +71,7 @@ type MQTTConsumer struct { messages map[telegraf.TrackingID]mqtt.Message messagesMutex sync.Mutex topicTagParse string + topicParsers []*TopicParser ctx context.Context cancel context.CancelFunc payloadSize selfstat.Stat @@ -120,29 +107,13 @@ func (m *MQTTConsumer) Init() error { m.opts = opts m.messages = map[telegraf.TrackingID]mqtt.Message{} - for i, p := range m.TopicParsing { - splitMeasurement := strings.Split(p.Measurement, "/") - for j := range splitMeasurement { - if splitMeasurement[j] != "_" && splitMeasurement[j] != "" { - m.TopicParsing[i].MeasurementIndex = j - break - } - } - m.TopicParsing[i].SplitTags = strings.Split(p.Tags, "/") - m.TopicParsing[i].SplitFields = strings.Split(p.Fields, "/") - m.TopicParsing[i].SplitTopic = strings.Split(p.Topic, "/") - - if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) && len(splitMeasurement) != 1 { - return errors.New("config error topic parsing: measurement length does not equal topic length") - } - - if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) && p.Fields != "" { - return errors.New("config error topic parsing: fields length does not equal topic length") - } - - if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) && p.Tags != "" { - return errors.New("config error topic parsing: tags length does not equal topic length") + m.topicParsers = make([]*TopicParser, 0, len(m.TopicParserConfig)) + for _, cfg := range m.TopicParserConfig { + p, err := cfg.NewParser() + if err != nil { + return fmt.Errorf("config error topic parsing: %w", err) } + m.topicParsers = append(m.topicParsers, p) } m.payloadSize = selfstat.Register("mqtt_consumer", "payload_size", map[string]string{}) @@ -223,21 +194,6 @@ func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) { m.Log.Debugf("Disconnected %v", m.Servers) } -// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value -func compareTopics(expected []string, incoming []string) bool { - if len(expected) != len(incoming) { - return false - } - - for i, expected := range expected { - if incoming[i] != expected && expected != "+" { - return false - } - } - - return true -} - func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) { <-m.sem @@ -284,36 +240,14 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { if m.topicTagParse != "" { metric.AddTag(m.topicTagParse, msg.Topic()) } - for _, p := range m.TopicParsing { - values := strings.Split(msg.Topic(), "/") - if !compareTopics(p.SplitTopic, values) { - continue - } - - if p.Measurement != "" { - metric.SetName(values[p.MeasurementIndex]) - } - if p.Tags != "" { - err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric) - if err != nil { - if m.PersistentSession { - msg.Ack() - } - m.acc.AddError(err) - <-m.sem - return - } - } - if p.Fields != "" { - err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric) - if err != nil { - if m.PersistentSession { - msg.Ack() - } - m.acc.AddError(err) - <-m.sem - return + for _, p := range m.topicParsers { + if err := p.Parse(metric, msg.Topic()); err != nil { + if m.PersistentSession { + msg.Ack() } + m.acc.AddError(err) + <-m.sem + return } } } @@ -399,57 +333,6 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { return opts, nil } -// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields) -func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error { - for i, k := range keys { - if k == "_" || k == "" { - continue - } - - if isTag { - metric.AddTag(k, values[i]) - } else { - newType, err := typeConvert(types, values[i], k) - if err != nil { - return err - } - metric.AddField(k, newType) - } - } - return nil -} - -func typeConvert(types map[string]string, topicValue string, key string) (interface{}, error) { - var newType interface{} - var err error - // If the user configured inputs.mqtt_consumer.topic.types, check for the desired type - if desiredType, ok := types[key]; ok { - switch desiredType { - case "uint": - newType, err = strconv.ParseUint(topicValue, 10, 64) - if err != nil { - return nil, fmt.Errorf("unable to convert field %q to type uint: %w", topicValue, err) - } - case "int": - newType, err = strconv.ParseInt(topicValue, 10, 64) - if err != nil { - return nil, fmt.Errorf("unable to convert field %q to type int: %w", topicValue, err) - } - case "float": - newType, err = strconv.ParseFloat(topicValue, 64) - if err != nil { - return nil, fmt.Errorf("unable to convert field %q to type float: %w", topicValue, err) - } - default: - return nil, fmt.Errorf("converting to the type %s is not supported: use int, uint, or float", desiredType) - } - } else { - newType = topicValue - } - - return newType, nil -} - func New(factory ClientFactory) *MQTTConsumer { return &MQTTConsumer{ Servers: []string{"tcp://127.0.0.1:1883"}, diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 8b4665375..cb15e2ac0 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -1,7 +1,6 @@ package mqtt_consumer import ( - "errors" "fmt" "path/filepath" "testing" @@ -200,7 +199,7 @@ func TestTopicTag(t *testing.T) { name string topic string topicTag func() *string - expectedError error + expectedError string topicParsing []TopicParsingConfig expected []telegraf.Metric }{ @@ -333,7 +332,7 @@ func TestTopicTag(t *testing.T) { tag := "" return &tag }, - expectedError: errors.New("config error topic parsing: fields length does not equal topic length"), + expectedError: "config error topic parsing: fields length does not equal topic length", topicParsing: []TopicParsingConfig{ { Topic: "telegraf/+/test/hello", @@ -455,6 +454,69 @@ func TestTopicTag(t *testing.T) { ), }, }, + { + name: "topic parsing with variable length", + topic: "/telegraf/123/foo/test/hello", + topicTag: func() *string { + tag := "" + return &tag + }, + topicParsing: []TopicParsingConfig{ + { + Topic: "/telegraf/#/test/hello", + Measurement: "/#/measurement/_", + Tags: "/testTag/#/moreTag/_/_", + Fields: "/_/testNumber/#/testString", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "testTag": "telegraf", + "moreTag": "foo", + }, + map[string]interface{}{ + "testNumber": 123, + "testString": "hello", + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "topic parsing with variable length too short", + topic: "/telegraf/123", + topicTag: func() *string { + tag := "" + return &tag + }, + topicParsing: []TopicParsingConfig{ + { + Topic: "/telegraf/#", + Measurement: "/#/measurement/_", + Tags: "/testTag/#/moreTag/_/_", + Fields: "/_/testNumber/#/testString", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -479,16 +541,18 @@ func TestTopicTag(t *testing.T) { plugin.Log = testutil.Logger{} plugin.Topics = []string{tt.topic} plugin.TopicTag = tt.topicTag() - plugin.TopicParsing = tt.topicParsing + plugin.TopicParserConfig = tt.topicParsing parser := &influx.Parser{} require.NoError(t, parser.Init()) plugin.SetParser(parser) - require.Equal(t, tt.expectedError, plugin.Init()) - if tt.expectedError != nil { + err := plugin.Init() + if tt.expectedError != "" { + require.ErrorContains(t, err, tt.expectedError) return } + require.NoError(t, err) var acc testutil.Accumulator require.NoError(t, plugin.Start(&acc)) diff --git a/plugins/inputs/mqtt_consumer/sample.conf b/plugins/inputs/mqtt_consumer/sample.conf index d1f792893..5d8104c7a 100644 --- a/plugins/inputs/mqtt_consumer/sample.conf +++ b/plugins/inputs/mqtt_consumer/sample.conf @@ -85,7 +85,8 @@ data_format = "influx" ## Enable extracting tag values from MQTT topics - ## _ denotes an ignored entry in the topic path + ## _ denotes an ignored entry in the topic path, + ## # denotes a variable length path element (can only be used once per setting) # [[inputs.mqtt_consumer.topic_parsing]] # topic = "" # measurement = "" diff --git a/plugins/inputs/mqtt_consumer/topic_parser.go b/plugins/inputs/mqtt_consumer/topic_parser.go new file mode 100644 index 000000000..ac739adfd --- /dev/null +++ b/plugins/inputs/mqtt_consumer/topic_parser.go @@ -0,0 +1,230 @@ +package mqtt_consumer + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/influxdata/telegraf" +) + +type TopicParsingConfig struct { + Topic string `toml:"topic"` + Measurement string `toml:"measurement"` + Tags string `toml:"tags"` + Fields string `toml:"fields"` + FieldTypes map[string]string `toml:"types"` +} + +type TopicParser struct { + topicIndices map[string]int + topicVarLength bool + topicMinLength int + + extractMeasurement bool + measurementIndex int + tagIndices map[string]int + fieldIndices map[string]int + fieldTypes map[string]string +} + +func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error) { + p := &TopicParser{ + fieldTypes: cfg.FieldTypes, + } + + // Build a check list for topic elements + var topicMinLength int + var topicInvert bool + topicParts := strings.Split(cfg.Topic, "/") + p.topicIndices = make(map[string]int, len(topicParts)) + for i, k := range topicParts { + switch k { + case "+": + topicMinLength++ + case "#": + if p.topicVarLength { + return nil, errors.New("topic can only contain one hash") + } + p.topicVarLength = true + topicInvert = true + default: + if !topicInvert { + p.topicIndices[k] = i + } else { + p.topicIndices[k] = i - len(topicParts) + } + topicMinLength++ + } + } + + // Determine metric name selection + var measurementMinLength int + var measurementInvert bool + measurementParts := strings.Split(cfg.Measurement, "/") + for i, k := range measurementParts { + if k == "_" || k == "" { + measurementMinLength++ + continue + } + + if k == "#" { + measurementInvert = true + continue + } + + if p.extractMeasurement { + return nil, errors.New("measurement can only contain one element") + } + + if !measurementInvert { + p.measurementIndex = i + } else { + p.measurementIndex = i - len(measurementParts) + } + p.extractMeasurement = true + measurementMinLength++ + } + + // Determine tag selections + var tagMinLength int + var tagInvert bool + tagParts := strings.Split(cfg.Tags, "/") + p.tagIndices = make(map[string]int, len(tagParts)) + for i, k := range tagParts { + if k == "_" || k == "" { + tagMinLength++ + continue + } + if k == "#" { + tagInvert = true + continue + } + if !tagInvert { + p.tagIndices[k] = i + } else { + p.tagIndices[k] = i - len(tagParts) + } + tagMinLength++ + } + + // Determine tag selections + var fieldMinLength int + var fieldInvert bool + fieldParts := strings.Split(cfg.Fields, "/") + p.fieldIndices = make(map[string]int, len(fieldParts)) + for i, k := range fieldParts { + if k == "_" || k == "" { + fieldMinLength++ + continue + } + if k == "#" { + fieldInvert = true + continue + } + if !fieldInvert { + p.fieldIndices[k] = i + } else { + p.fieldIndices[k] = i - len(fieldParts) + } + fieldMinLength++ + } + + if !p.topicVarLength { + if measurementMinLength != topicMinLength && p.extractMeasurement { + return nil, errors.New("measurement length does not equal topic length") + } + + if fieldMinLength != topicMinLength && cfg.Fields != "" { + return nil, errors.New("fields length does not equal topic length") + } + + if tagMinLength != topicMinLength && cfg.Tags != "" { + return nil, errors.New("tags length does not equal topic length") + } + } + + p.topicMinLength = max(topicMinLength, measurementMinLength, tagMinLength, fieldMinLength) + + return p, nil +} + +func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error { + // Split the actual topic into its elements and check for a match + topicParts := strings.Split(topic, "/") + if p.topicVarLength && len(topicParts) < p.topicMinLength || !p.topicVarLength && len(topicParts) != p.topicMinLength { + return nil + } + for expected, i := range p.topicIndices { + if i >= 0 && topicParts[i] != expected || i < 0 && topicParts[len(topicParts)+i] != expected { + return nil + } + } + + // Extract the measurement name + var measurement string + if p.extractMeasurement { + if p.measurementIndex >= 0 { + measurement = topicParts[p.measurementIndex] + } else { + measurement = topicParts[len(topicParts)+p.measurementIndex] + } + metric.SetName(measurement) + } + + // Extract the tags + for k, i := range p.tagIndices { + if i >= 0 { + metric.AddTag(k, topicParts[i]) + } else { + metric.AddTag(k, topicParts[len(topicParts)+i]) + } + } + + // Extract the fields + for k, i := range p.fieldIndices { + var raw string + if i >= 0 { + raw = topicParts[i] + } else { + raw = topicParts[len(topicParts)+i] + } + v, err := p.convertToFieldType(raw, k) + if err != nil { + return err + } + metric.AddField(k, v) + } + + return nil +} + +func (p *TopicParser) convertToFieldType(value string, key string) (interface{}, error) { + // If the user configured inputs.mqtt_consumer.topic.types, check for the desired type + desiredType, ok := p.fieldTypes[key] + if !ok { + return value, nil + } + + var v interface{} + var err error + switch desiredType { + case "uint": + if v, err = strconv.ParseUint(value, 10, 64); err != nil { + return nil, fmt.Errorf("unable to convert field %q to type uint: %w", value, err) + } + case "int": + if v, err = strconv.ParseInt(value, 10, 64); err != nil { + return nil, fmt.Errorf("unable to convert field %q to type int: %w", value, err) + } + case "float": + if v, err = strconv.ParseFloat(value, 64); err != nil { + return nil, fmt.Errorf("unable to convert field %q to type float: %w", value, err) + } + default: + return nil, fmt.Errorf("converting to the type %s is not supported: use int, uint, or float", desiredType) + } + + return v, nil +}