diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index 3fd128eb8..19b57f79a 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -3,7 +3,7 @@ The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics and creates metrics using one of the supported [input data formats][]. -### Configuration +## Configuration ```toml [[inputs.mqtt_consumer]] @@ -73,6 +73,63 @@ and creates metrics using one of the supported [input data formats][]. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" + + ## Enable extracting tag values from MQTT topics + ## _ denotes an ignored entry in the topic path + # [[inputs.mqtt_consumer.topic_parsing]] + # topic = "" + # measurement = "" + # tags = "" + # fields = "" + ## Value supported is int, float, unit + # [[inputs.mqtt_consumer.topic.types]] + # key = type +``` + +## About Topic Parsing + +The MQTT topic as a whole is stored as a tag, but this can be far too coarse +to be easily used when utilizing the data further down the line. This +change allows tag values to be extracted from the MQTT topic letting you +store the information provided in the topic in a meaningful way. An `_` denotes an +ignored entry in the topic path. Please see the following example. + +## Example Configuration for topic parsing + +```toml +[[inputs.mqtt_consumer]] + ## Broker URLs for the MQTT server or cluster. To connect to multiple + ## clusters or standalone servers, use a separate plugin instance. + ## example: servers = ["tcp://localhost:1883"] + ## servers = ["ssl://localhost:1883"] + ## servers = ["ws://localhost:1883"] + servers = ["tcp://127.0.0.1:1883"] + + ## Topics that will be subscribed to. + topics = [ + "telegraf/+/cpu/23", + ] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "value" + data_type = "float" + + [[inputs.mqtt_consumer.topic_parsing]] + topic = "telegraf/one/cpu/23" + measurement = "_/_/measurement/_" + tags = "tag/_/_/_" + fields = "_/_/_/test" + [inputs.mqtt_consumer.topic_parsing.types] + test = "int" +``` + +Result: + +```shell +cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291 ``` ### Metrics @@ -80,5 +137,7 @@ and creates metrics using one of the supported [input data formats][]. - All measurements are tagged with the incoming topic, ie `topic=telegraf/host01/cpu` +- example when [[inputs.mqtt_consumer.topic_parsing]] is set + [mqtt]: https://mqtt.org [input data formats]: /docs/DATA_FORMATS_INPUT.md diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 3e88cecbb..890ed9f5d 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -4,12 +4,12 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" @@ -20,8 +20,7 @@ import ( var ( // 30 Seconds is the default used by paho.mqtt.golang - defaultConnectionTimeout = config.Duration(30 * time.Second) - + defaultConnectionTimeout = config.Duration(30 * time.Second) defaultMaxUndeliveredMessages = 1000 ) @@ -41,42 +40,47 @@ type Client interface { AddRoute(topic string, callback mqtt.MessageHandler) Disconnect(quiesce uint) } - type ClientFactory func(o *mqtt.ClientOptions) Client - +type TopicParsingConfig struct { + Topic string `toml:"topic"` + Measurement string `toml:"measurement"` + Tags string `toml:"tags"` + Fields string `toml:"fields"` + FieldTypes map[string]string `toml:"types"` + // cached split of user given information + MeasurementIndex int + SplitTags []string + SplitFields []string + SplitTopic []string +} type MQTTConsumer struct { - Servers []string `toml:"servers"` - Topics []string `toml:"topics"` - TopicTag *string `toml:"topic_tag"` - Username string `toml:"username"` - Password string `toml:"password"` - QoS int `toml:"qos"` - ConnectionTimeout config.Duration `toml:"connection_timeout"` - MaxUndeliveredMessages int `toml:"max_undelivered_messages"` - - parser parsers.Parser - + Servers []string `toml:"servers"` + Topics []string `toml:"topics"` + TopicTag *string `toml:"topic_tag"` + TopicParsing []TopicParsingConfig `toml:"topic_parsing"` + Username string `toml:"username"` + Password string `toml:"password"` + QoS int `toml:"qos"` + ConnectionTimeout config.Duration `toml:"connection_timeout"` + MaxUndeliveredMessages int `toml:"max_undelivered_messages"` + parser parsers.Parser // Legacy metric buffer support; deprecated in v0.10.3 - MetricBuffer int - + MetricBuffer int PersistentSession bool ClientID string `toml:"client_id"` tls.ClientConfig - - Log telegraf.Logger - - clientFactory ClientFactory - client Client - opts *mqtt.ClientOptions - acc telegraf.TrackingAccumulator - state ConnectionState - sem semaphore - messages map[telegraf.TrackingID]bool - messagesMutex sync.Mutex - chosenTopicTag string - - ctx context.Context - cancel context.CancelFunc + Log telegraf.Logger + clientFactory ClientFactory + client Client + opts *mqtt.ClientOptions + acc telegraf.TrackingAccumulator + state ConnectionState + sem semaphore + messages map[telegraf.TrackingID]bool + messagesMutex sync.Mutex + topicTagParse string + ctx context.Context + cancel context.CancelFunc } var sampleConfig = ` @@ -86,18 +90,20 @@ var sampleConfig = ` ## servers = ["ssl://localhost:1883"] ## servers = ["ws://localhost:1883"] servers = ["tcp://127.0.0.1:1883"] - ## Topics that will be subscribed to. topics = [ "telegraf/host01/cpu", "telegraf/+/mem", "sensors/#", ] - + ## Enable extracting tag values from MQTT topics + ## _ denotes an ignored entry in the topic path + # topic_tags = "_/format/client/_" + # topic_measurement = "measurement/_/_/_" + # topic_fields = "_/_/_/temperature" ## The message topic will be stored in a tag specified by this value. If set ## to the empty string no topic tag will be created. # topic_tag = "topic" - ## QoS policy for messages ## 0 = at most once ## 1 = at least once @@ -106,10 +112,8 @@ var sampleConfig = ` ## When using a QoS of 1 or 2, you should enable persistent_session to allow ## resuming unacknowledged messages. # qos = 0 - ## Connection timeout for initial connection in seconds # connection_timeout = "30s" - ## Maximum messages to read from the broker that have not been written by an ## output. For best throughput set based on the number of metrics within ## each message and the size of the output's metric_batch_size. @@ -119,87 +123,103 @@ var sampleConfig = ` ## full batch is collected and the write is triggered immediately without ## waiting until the next flush_interval. # max_undelivered_messages = 1000 - ## Persistent session disables clearing of the client session on connection. ## In order for this option to work you must also set client_id to identify ## the client. To receive messages that arrived while the client is offline, ## also set the qos option to 1 or 2 and don't forget to also set the QoS when ## publishing. # persistent_session = false - ## If unset, a random client ID will be generated. # client_id = "" - ## Username and password to connect MQTT server. # username = "telegraf" # password = "metricsmetricsmetricsmetrics" - ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification # insecure_skip_verify = false - ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" + ## Enable extracting tag values from MQTT topics + ## _ denotes an ignored entry in the topic path + ## [[inputs.mqtt_consumer.topic_parsing]] + ## topic = "" + ## measurement = "" + ## tags = "" + ## fields = "" + ## [inputs.mqtt_consumer.topic_parsing.types] + ## ` func (m *MQTTConsumer) SampleConfig() string { return sampleConfig } - func (m *MQTTConsumer) Description() string { return "Read metrics from MQTT topic(s)" } - func (m *MQTTConsumer) SetParser(parser parsers.Parser) { m.parser = parser } - func (m *MQTTConsumer) Init() error { m.state = Disconnected - if m.PersistentSession && m.ClientID == "" { return errors.New("persistent_session requires client_id") } - if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS) } - if time.Duration(m.ConnectionTimeout) < 1*time.Second { return fmt.Errorf("connection_timeout must be greater than 1s: %s", time.Duration(m.ConnectionTimeout)) } - - m.chosenTopicTag = "topic" + m.topicTagParse = "topic" if m.TopicTag != nil { - m.chosenTopicTag = *m.TopicTag + m.topicTagParse = *m.TopicTag } - opts, err := m.createOpts() if err != nil { return err } - m.opts = opts m.messages = map[telegraf.TrackingID]bool{} + for i, p := range m.TopicParsing { + splitMeasurement := strings.Split(p.Measurement, "/") + for j := range splitMeasurement { + if splitMeasurement[j] != "_" { + m.TopicParsing[i].MeasurementIndex = j + break + } + } + m.TopicParsing[i].SplitTags = strings.Split(p.Tags, "/") + m.TopicParsing[i].SplitFields = strings.Split(p.Fields, "/") + m.TopicParsing[i].SplitTopic = strings.Split(p.Topic, "/") + + if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) { + return fmt.Errorf("config error topic parsing: measurement length does not equal topic length") + } + + if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) { + return fmt.Errorf("config error topic parsing: fields length does not equal topic length") + } + + if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) { + return fmt.Errorf("config error topic parsing: tags length does not equal topic length") + } + } + return nil } - func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { m.state = Disconnected - m.acc = acc.WithTracking(m.MaxUndeliveredMessages) m.sem = make(semaphore, m.MaxUndeliveredMessages) m.ctx, m.cancel = context.WithCancel(context.Background()) - m.client = m.clientFactory(m.opts) - // AddRoute sets up the function for handling messages. These need to be // added in case we find a persistent session containing subscriptions so we // know where to dispatch persisted and new messages to. In the alternate @@ -207,11 +227,9 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { for _, topic := range m.Topics { m.client.AddRoute(topic, m.recvMessage) } - m.state = Connecting return m.connect() } - func (m *MQTTConsumer) connect() error { token := m.client.Connect() if token.Wait() && token.Error() != nil { @@ -219,10 +237,8 @@ func (m *MQTTConsumer) connect() error { m.state = Disconnected return err } - m.Log.Infof("Connected %v", m.Servers) m.state = Connected - // Persistent sessions should skip subscription if a session is present, as // the subscriptions are stored by the server. type sessionPresent interface { @@ -232,28 +248,23 @@ func (m *MQTTConsumer) connect() error { m.Log.Debugf("Session found %v", m.Servers) return nil } - topics := make(map[string]byte) for _, topic := range m.Topics { topics[topic] = byte(m.QoS) } - subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) subscribeToken.Wait() if subscribeToken.Error() != nil { m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v", strings.Join(m.Topics[:], ","), subscribeToken.Error())) } - return nil } - func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) { m.acc.AddError(fmt.Errorf("connection lost: %v", err)) m.Log.Debugf("Disconnected %v", m.Servers) m.state = Disconnected } - func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) { for { select { @@ -279,26 +290,60 @@ func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) { } } +// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value +func compareTopics(expected []string, incoming []string) bool { + if len(expected) != len(incoming) { + return false + } + + for i, expected := range expected { + if incoming[i] != expected && expected != "+" { + return false + } + } + + return true +} + func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error { metrics, err := m.parser.Parse(msg.Payload()) if err != nil { return err } - if m.chosenTopicTag != "" { - topic := msg.Topic() - for _, metric := range metrics { - metric.AddTag(m.chosenTopicTag, topic) + for _, metric := range metrics { + if m.topicTagParse != "" { + metric.AddTag(m.topicTagParse, msg.Topic()) + } + for _, p := range m.TopicParsing { + values := strings.Split(msg.Topic(), "/") + if !compareTopics(p.SplitTopic, values) { + continue + } + + if p.Measurement != "" { + metric.SetName(values[p.MeasurementIndex]) + } + if p.Tags != "" { + err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric) + if err != nil { + return err + } + } + if p.Fields != "" { + err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric) + if err != nil { + return err + } + } } } - id := acc.AddTrackingMetricGroup(metrics) m.messagesMutex.Lock() m.messages[id] = true m.messagesMutex.Unlock() return nil } - func (m *MQTTConsumer) Stop() { if m.state == Connected { m.Log.Debugf("Disconnecting %v", m.Servers) @@ -308,37 +353,29 @@ func (m *MQTTConsumer) Stop() { } m.cancel() } - func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error { if m.state == Disconnected { m.state = Connecting m.Log.Debugf("Connecting %v", m.Servers) return m.connect() } - return nil } - func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts := mqtt.NewClientOptions() - opts.ConnectTimeout = time.Duration(m.ConnectionTimeout) - if m.ClientID == "" { opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) } else { opts.SetClientID(m.ClientID) } - tlsCfg, err := m.ClientConfig.TLSConfig() if err != nil { return nil, err } - if tlsCfg != nil { opts.SetTLSConfig(tlsCfg) } - user := m.Username if user != "" { opts.SetUsername(user) @@ -347,11 +384,9 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { if password != "" { opts.SetPassword(password) } - if len(m.Servers) == 0 { return opts, fmt.Errorf("could not get host informations") } - for _, server := range m.Servers { // Preserve support for host:port style servers; deprecated in Telegraf 1.4.4 if !strings.Contains(server, "://") { @@ -362,17 +397,72 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { server = "ssl://" + server } } - opts.AddBroker(server) } opts.SetAutoReconnect(false) opts.SetKeepAlive(time.Second * 60) opts.SetCleanSession(!m.PersistentSession) opts.SetConnectionLostHandler(m.onConnectionLost) - return opts, nil } +// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields) +func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error { + var metricFound bool + for i, k := range keys { + if k == "_" { + continue + } + + if isTag { + metric.AddTag(k, values[i]) + metricFound = true + } else { + newType, err := typeConvert(types, values[i], k) + if err != nil { + return err + } + metric.AddField(k, newType) + metricFound = true + } + } + if !metricFound { + return fmt.Errorf("no fields or tags found") + } + return nil +} + +func typeConvert(types map[string]string, topicValue string, key string) (interface{}, error) { + var newType interface{} + var err error + // If the user configured inputs.mqtt_consumer.topic.types, check for the desired type + if desiredType, ok := types[key]; ok { + switch desiredType { + case "uint": + newType, err = strconv.ParseUint(topicValue, 10, 64) + if err != nil { + return nil, fmt.Errorf("unable to convert field '%s' to type uint: %v", topicValue, err) + } + case "int": + newType, err = strconv.ParseInt(topicValue, 10, 64) + if err != nil { + return nil, fmt.Errorf("unable to convert field '%s' to type int: %v", topicValue, err) + } + case "float": + newType, err = strconv.ParseFloat(topicValue, 64) + if err != nil { + return nil, fmt.Errorf("unable to convert field '%s' to type float: %v", topicValue, err) + } + default: + return nil, fmt.Errorf("converting to the type %s is not supported: use int, uint, or float", desiredType) + } + } else { + newType = topicValue + } + + return newType, nil +} + func New(factory ClientFactory) *MQTTConsumer { return &MQTTConsumer{ Servers: []string{"tcp://127.0.0.1:1883"}, @@ -382,7 +472,6 @@ func New(factory ClientFactory) *MQTTConsumer { state: Disconnected, } } - func init() { inputs.Add("mqtt_consumer", func() telegraf.Input { return New(func(o *mqtt.ClientOptions) Client { diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index a9b85c108..7ba560997 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -1,6 +1,7 @@ package mqtt_consumer import ( + "fmt" "testing" "time" @@ -153,6 +154,7 @@ func TestPersistentClientIDFail(t *testing.T) { } type Message struct { + topic string } func (m *Message) Duplicate() bool { @@ -168,7 +170,7 @@ func (m *Message) Retained() bool { } func (m *Message) Topic() string { - return "telegraf" + return m.topic } func (m *Message) MessageID() uint16 { @@ -185,12 +187,16 @@ func (m *Message) Ack() { func TestTopicTag(t *testing.T) { tests := []struct { - name string - topicTag func() *string - expected []telegraf.Metric + name string + topic string + topicTag func() *string + expectedError error + topicParsing []TopicParsingConfig + expected []telegraf.Metric }{ { - name: "default topic when topic tag is unset for backwards compatibility", + name: "default topic when topic tag is unset for backwards compatibility", + topic: "telegraf", topicTag: func() *string { return nil }, @@ -208,7 +214,8 @@ func TestTopicTag(t *testing.T) { }, }, { - name: "use topic tag when set", + name: "use topic tag when set", + topic: "telegraf", topicTag: func() *string { tag := "topic_tag" return &tag @@ -227,7 +234,8 @@ func TestTopicTag(t *testing.T) { }, }, { - name: "no topic tag is added when topic tag is set to the empty string", + name: "no topic tag is added when topic tag is set to the empty string", + topic: "telegraf", topicTag: func() *string { tag := "" return &tag @@ -243,6 +251,105 @@ func TestTopicTag(t *testing.T) { ), }, }, + { + name: "topic parsing configured", + topic: "telegraf/123/test", + topicTag: func() *string { + tag := "" + return &tag + }, + topicParsing: []TopicParsingConfig{ + { + Topic: "telegraf/123/test", + Measurement: "_/_/measurement", + Tags: "testTag/_/_", + Fields: "_/testNumber/_", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "testTag": "telegraf", + }, + map[string]interface{}{ + "testNumber": 123, + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "topic parsing configured with a mqtt wild card `+`", + topic: "telegraf/123/test/hello", + topicTag: func() *string { + tag := "" + return &tag + }, + topicParsing: []TopicParsingConfig{ + { + Topic: "telegraf/+/test/hello", + Measurement: "_/_/measurement/_", + Tags: "testTag/_/_/_", + Fields: "_/testNumber/_/testString", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "testTag": "telegraf", + }, + map[string]interface{}{ + "testNumber": 123, + "testString": "hello", + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "topic parsing configured incorrectly", + topic: "telegraf/123/test/hello", + topicTag: func() *string { + tag := "" + return &tag + }, + expectedError: fmt.Errorf("config error topic parsing: fields length does not equal topic length"), + topicParsing: []TopicParsingConfig{ + { + Topic: "telegraf/+/test/hello", + Measurement: "_/_/measurement/_", + Tags: "testTag/_/_/_", + Fields: "_/_/testNumber:int/_/testString:string", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "testTag": "telegraf", + }, + map[string]interface{}{ + "testNumber": 123, + "testString": "hello", + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -265,21 +372,28 @@ func TestTopicTag(t *testing.T) { return client }) plugin.Log = testutil.Logger{} - plugin.Topics = []string{"telegraf"} + plugin.Topics = []string{tt.topic} plugin.TopicTag = tt.topicTag() + plugin.TopicParsing = tt.topicParsing parser, err := parsers.NewInfluxParser() require.NoError(t, err) plugin.SetParser(parser) err = plugin.Init() - require.NoError(t, err) + require.Equal(t, tt.expectedError, err) + if tt.expectedError != nil { + return + } var acc testutil.Accumulator err = plugin.Start(&acc) require.NoError(t, err) - handler(nil, &Message{}) + var m Message + m.topic = tt.topic + + handler(nil, &m) plugin.Stop()