feat(inputs.mqtt_consumer): Add variable length topic parsing (#15528)

This commit is contained in:
Sven Rebhan 2024-06-18 12:15:28 -04:00 committed by GitHub
parent 3ee61ffc4e
commit 784ede96f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 319 additions and 140 deletions

View File

@ -136,7 +136,8 @@ to use them.
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## _ denotes an ignored entry in the topic path,
## # denotes a variable length path element (can only be used once per setting)
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""

View File

@ -6,7 +6,6 @@ import (
_ "embed"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
@ -46,24 +45,11 @@ type Client interface {
type ClientFactory func(o *mqtt.ClientOptions) Client
type TopicParsingConfig struct {
Topic string `toml:"topic"`
Measurement string `toml:"measurement"`
Tags string `toml:"tags"`
Fields string `toml:"fields"`
FieldTypes map[string]string `toml:"types"`
// cached split of user given information
MeasurementIndex int
SplitTags []string
SplitFields []string
SplitTopic []string
}
type MQTTConsumer struct {
Servers []string `toml:"servers"`
Topics []string `toml:"topics"`
TopicTag *string `toml:"topic_tag"`
TopicParsing []TopicParsingConfig `toml:"topic_parsing"`
TopicParserConfig []TopicParsingConfig `toml:"topic_parsing"`
Username config.Secret `toml:"username"`
Password config.Secret `toml:"password"`
QoS int `toml:"qos"`
@ -85,6 +71,7 @@ type MQTTConsumer struct {
messages map[telegraf.TrackingID]mqtt.Message
messagesMutex sync.Mutex
topicTagParse string
topicParsers []*TopicParser
ctx context.Context
cancel context.CancelFunc
payloadSize selfstat.Stat
@ -120,29 +107,13 @@ func (m *MQTTConsumer) Init() error {
m.opts = opts
m.messages = map[telegraf.TrackingID]mqtt.Message{}
for i, p := range m.TopicParsing {
splitMeasurement := strings.Split(p.Measurement, "/")
for j := range splitMeasurement {
if splitMeasurement[j] != "_" && splitMeasurement[j] != "" {
m.TopicParsing[i].MeasurementIndex = j
break
}
}
m.TopicParsing[i].SplitTags = strings.Split(p.Tags, "/")
m.TopicParsing[i].SplitFields = strings.Split(p.Fields, "/")
m.TopicParsing[i].SplitTopic = strings.Split(p.Topic, "/")
if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) && len(splitMeasurement) != 1 {
return errors.New("config error topic parsing: measurement length does not equal topic length")
}
if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) && p.Fields != "" {
return errors.New("config error topic parsing: fields length does not equal topic length")
}
if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) && p.Tags != "" {
return errors.New("config error topic parsing: tags length does not equal topic length")
m.topicParsers = make([]*TopicParser, 0, len(m.TopicParserConfig))
for _, cfg := range m.TopicParserConfig {
p, err := cfg.NewParser()
if err != nil {
return fmt.Errorf("config error topic parsing: %w", err)
}
m.topicParsers = append(m.topicParsers, p)
}
m.payloadSize = selfstat.Register("mqtt_consumer", "payload_size", map[string]string{})
@ -223,21 +194,6 @@ func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
m.Log.Debugf("Disconnected %v", m.Servers)
}
// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value
func compareTopics(expected []string, incoming []string) bool {
if len(expected) != len(incoming) {
return false
}
for i, expected := range expected {
if incoming[i] != expected && expected != "+" {
return false
}
}
return true
}
func (m *MQTTConsumer) onDelivered(track telegraf.DeliveryInfo) {
<-m.sem
@ -284,36 +240,14 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
if m.topicTagParse != "" {
metric.AddTag(m.topicTagParse, msg.Topic())
}
for _, p := range m.TopicParsing {
values := strings.Split(msg.Topic(), "/")
if !compareTopics(p.SplitTopic, values) {
continue
}
if p.Measurement != "" {
metric.SetName(values[p.MeasurementIndex])
}
if p.Tags != "" {
err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric)
if err != nil {
if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
}
}
if p.Fields != "" {
err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric)
if err != nil {
if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
for _, p := range m.topicParsers {
if err := p.Parse(metric, msg.Topic()); err != nil {
if m.PersistentSession {
msg.Ack()
}
m.acc.AddError(err)
<-m.sem
return
}
}
}
@ -399,57 +333,6 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return opts, nil
}
// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields)
func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error {
for i, k := range keys {
if k == "_" || k == "" {
continue
}
if isTag {
metric.AddTag(k, values[i])
} else {
newType, err := typeConvert(types, values[i], k)
if err != nil {
return err
}
metric.AddField(k, newType)
}
}
return nil
}
func typeConvert(types map[string]string, topicValue string, key string) (interface{}, error) {
var newType interface{}
var err error
// If the user configured inputs.mqtt_consumer.topic.types, check for the desired type
if desiredType, ok := types[key]; ok {
switch desiredType {
case "uint":
newType, err = strconv.ParseUint(topicValue, 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to convert field %q to type uint: %w", topicValue, err)
}
case "int":
newType, err = strconv.ParseInt(topicValue, 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to convert field %q to type int: %w", topicValue, err)
}
case "float":
newType, err = strconv.ParseFloat(topicValue, 64)
if err != nil {
return nil, fmt.Errorf("unable to convert field %q to type float: %w", topicValue, err)
}
default:
return nil, fmt.Errorf("converting to the type %s is not supported: use int, uint, or float", desiredType)
}
} else {
newType = topicValue
}
return newType, nil
}
func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},

View File

@ -1,7 +1,6 @@
package mqtt_consumer
import (
"errors"
"fmt"
"path/filepath"
"testing"
@ -200,7 +199,7 @@ func TestTopicTag(t *testing.T) {
name string
topic string
topicTag func() *string
expectedError error
expectedError string
topicParsing []TopicParsingConfig
expected []telegraf.Metric
}{
@ -333,7 +332,7 @@ func TestTopicTag(t *testing.T) {
tag := ""
return &tag
},
expectedError: errors.New("config error topic parsing: fields length does not equal topic length"),
expectedError: "config error topic parsing: fields length does not equal topic length",
topicParsing: []TopicParsingConfig{
{
Topic: "telegraf/+/test/hello",
@ -455,6 +454,69 @@ func TestTopicTag(t *testing.T) {
),
},
},
{
name: "topic parsing with variable length",
topic: "/telegraf/123/foo/test/hello",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "/telegraf/#/test/hello",
Measurement: "/#/measurement/_",
Tags: "/testTag/#/moreTag/_/_",
Fields: "/_/testNumber/#/testString",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{
"testTag": "telegraf",
"moreTag": "foo",
},
map[string]interface{}{
"testNumber": 123,
"testString": "hello",
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
{
name: "topic parsing with variable length too short",
topic: "/telegraf/123",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "/telegraf/#",
Measurement: "/#/measurement/_",
Tags: "/testTag/#/moreTag/_/_",
Fields: "/_/testNumber/#/testString",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -479,16 +541,18 @@ func TestTopicTag(t *testing.T) {
plugin.Log = testutil.Logger{}
plugin.Topics = []string{tt.topic}
plugin.TopicTag = tt.topicTag()
plugin.TopicParsing = tt.topicParsing
plugin.TopicParserConfig = tt.topicParsing
parser := &influx.Parser{}
require.NoError(t, parser.Init())
plugin.SetParser(parser)
require.Equal(t, tt.expectedError, plugin.Init())
if tt.expectedError != nil {
err := plugin.Init()
if tt.expectedError != "" {
require.ErrorContains(t, err, tt.expectedError)
return
}
require.NoError(t, err)
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))

View File

@ -85,7 +85,8 @@
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## _ denotes an ignored entry in the topic path,
## # denotes a variable length path element (can only be used once per setting)
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""

View File

@ -0,0 +1,230 @@
package mqtt_consumer
import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/influxdata/telegraf"
)
type TopicParsingConfig struct {
Topic string `toml:"topic"`
Measurement string `toml:"measurement"`
Tags string `toml:"tags"`
Fields string `toml:"fields"`
FieldTypes map[string]string `toml:"types"`
}
type TopicParser struct {
topicIndices map[string]int
topicVarLength bool
topicMinLength int
extractMeasurement bool
measurementIndex int
tagIndices map[string]int
fieldIndices map[string]int
fieldTypes map[string]string
}
func (cfg *TopicParsingConfig) NewParser() (*TopicParser, error) {
p := &TopicParser{
fieldTypes: cfg.FieldTypes,
}
// Build a check list for topic elements
var topicMinLength int
var topicInvert bool
topicParts := strings.Split(cfg.Topic, "/")
p.topicIndices = make(map[string]int, len(topicParts))
for i, k := range topicParts {
switch k {
case "+":
topicMinLength++
case "#":
if p.topicVarLength {
return nil, errors.New("topic can only contain one hash")
}
p.topicVarLength = true
topicInvert = true
default:
if !topicInvert {
p.topicIndices[k] = i
} else {
p.topicIndices[k] = i - len(topicParts)
}
topicMinLength++
}
}
// Determine metric name selection
var measurementMinLength int
var measurementInvert bool
measurementParts := strings.Split(cfg.Measurement, "/")
for i, k := range measurementParts {
if k == "_" || k == "" {
measurementMinLength++
continue
}
if k == "#" {
measurementInvert = true
continue
}
if p.extractMeasurement {
return nil, errors.New("measurement can only contain one element")
}
if !measurementInvert {
p.measurementIndex = i
} else {
p.measurementIndex = i - len(measurementParts)
}
p.extractMeasurement = true
measurementMinLength++
}
// Determine tag selections
var tagMinLength int
var tagInvert bool
tagParts := strings.Split(cfg.Tags, "/")
p.tagIndices = make(map[string]int, len(tagParts))
for i, k := range tagParts {
if k == "_" || k == "" {
tagMinLength++
continue
}
if k == "#" {
tagInvert = true
continue
}
if !tagInvert {
p.tagIndices[k] = i
} else {
p.tagIndices[k] = i - len(tagParts)
}
tagMinLength++
}
// Determine tag selections
var fieldMinLength int
var fieldInvert bool
fieldParts := strings.Split(cfg.Fields, "/")
p.fieldIndices = make(map[string]int, len(fieldParts))
for i, k := range fieldParts {
if k == "_" || k == "" {
fieldMinLength++
continue
}
if k == "#" {
fieldInvert = true
continue
}
if !fieldInvert {
p.fieldIndices[k] = i
} else {
p.fieldIndices[k] = i - len(fieldParts)
}
fieldMinLength++
}
if !p.topicVarLength {
if measurementMinLength != topicMinLength && p.extractMeasurement {
return nil, errors.New("measurement length does not equal topic length")
}
if fieldMinLength != topicMinLength && cfg.Fields != "" {
return nil, errors.New("fields length does not equal topic length")
}
if tagMinLength != topicMinLength && cfg.Tags != "" {
return nil, errors.New("tags length does not equal topic length")
}
}
p.topicMinLength = max(topicMinLength, measurementMinLength, tagMinLength, fieldMinLength)
return p, nil
}
func (p *TopicParser) Parse(metric telegraf.Metric, topic string) error {
// Split the actual topic into its elements and check for a match
topicParts := strings.Split(topic, "/")
if p.topicVarLength && len(topicParts) < p.topicMinLength || !p.topicVarLength && len(topicParts) != p.topicMinLength {
return nil
}
for expected, i := range p.topicIndices {
if i >= 0 && topicParts[i] != expected || i < 0 && topicParts[len(topicParts)+i] != expected {
return nil
}
}
// Extract the measurement name
var measurement string
if p.extractMeasurement {
if p.measurementIndex >= 0 {
measurement = topicParts[p.measurementIndex]
} else {
measurement = topicParts[len(topicParts)+p.measurementIndex]
}
metric.SetName(measurement)
}
// Extract the tags
for k, i := range p.tagIndices {
if i >= 0 {
metric.AddTag(k, topicParts[i])
} else {
metric.AddTag(k, topicParts[len(topicParts)+i])
}
}
// Extract the fields
for k, i := range p.fieldIndices {
var raw string
if i >= 0 {
raw = topicParts[i]
} else {
raw = topicParts[len(topicParts)+i]
}
v, err := p.convertToFieldType(raw, k)
if err != nil {
return err
}
metric.AddField(k, v)
}
return nil
}
func (p *TopicParser) convertToFieldType(value string, key string) (interface{}, error) {
// If the user configured inputs.mqtt_consumer.topic.types, check for the desired type
desiredType, ok := p.fieldTypes[key]
if !ok {
return value, nil
}
var v interface{}
var err error
switch desiredType {
case "uint":
if v, err = strconv.ParseUint(value, 10, 64); err != nil {
return nil, fmt.Errorf("unable to convert field %q to type uint: %w", value, err)
}
case "int":
if v, err = strconv.ParseInt(value, 10, 64); err != nil {
return nil, fmt.Errorf("unable to convert field %q to type int: %w", value, err)
}
case "float":
if v, err = strconv.ParseFloat(value, 64); err != nil {
return nil, fmt.Errorf("unable to convert field %q to type float: %w", value, err)
}
default:
return nil, fmt.Errorf("converting to the type %s is not supported: use int, uint, or float", desiredType)
}
return v, nil
}