feat: enable extracting tag values from MQTT topics (#9995)

This commit is contained in:
Mya 2021-11-23 08:20:39 -07:00 committed by GitHub
parent 5f9bd0d951
commit b89ef94777
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 359 additions and 97 deletions

View File

@ -3,7 +3,7 @@
The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
and creates metrics using one of the supported [input data formats][].
### Configuration
## Configuration
```toml
[[inputs.mqtt_consumer]]
@ -73,6 +73,63 @@ and creates metrics using one of the supported [input data formats][].
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
# tags = ""
# fields = ""
## Value supported is int, float, unit
# [[inputs.mqtt_consumer.topic.types]]
# key = type
```
## About Topic Parsing
The MQTT topic as a whole is stored as a tag, but this can be far too coarse
to be easily used when utilizing the data further down the line. This
change allows tag values to be extracted from the MQTT topic letting you
store the information provided in the topic in a meaningful way. An `_` denotes an
ignored entry in the topic path. Please see the following example.
## Example Configuration for topic parsing
```toml
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]
## Topics that will be subscribed to.
topics = [
"telegraf/+/cpu/23",
]
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "value"
data_type = "float"
[[inputs.mqtt_consumer.topic_parsing]]
topic = "telegraf/one/cpu/23"
measurement = "_/_/measurement/_"
tags = "tag/_/_/_"
fields = "_/_/_/test"
[inputs.mqtt_consumer.topic_parsing.types]
test = "int"
```
Result:
```shell
cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291
```
### Metrics
@ -80,5 +137,7 @@ and creates metrics using one of the supported [input data formats][].
- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`
- example when [[inputs.mqtt_consumer.topic_parsing]] is set
[mqtt]: https://mqtt.org
[input data formats]: /docs/DATA_FORMATS_INPUT.md

View File

@ -4,12 +4,12 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
@ -20,8 +20,7 @@ import (
var (
// 30 Seconds is the default used by paho.mqtt.golang
defaultConnectionTimeout = config.Duration(30 * time.Second)
defaultConnectionTimeout = config.Duration(30 * time.Second)
defaultMaxUndeliveredMessages = 1000
)
@ -41,42 +40,47 @@ type Client interface {
AddRoute(topic string, callback mqtt.MessageHandler)
Disconnect(quiesce uint)
}
type ClientFactory func(o *mqtt.ClientOptions) Client
type TopicParsingConfig struct {
Topic string `toml:"topic"`
Measurement string `toml:"measurement"`
Tags string `toml:"tags"`
Fields string `toml:"fields"`
FieldTypes map[string]string `toml:"types"`
// cached split of user given information
MeasurementIndex int
SplitTags []string
SplitFields []string
SplitTopic []string
}
type MQTTConsumer struct {
Servers []string `toml:"servers"`
Topics []string `toml:"topics"`
TopicTag *string `toml:"topic_tag"`
Username string `toml:"username"`
Password string `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
parser parsers.Parser
Servers []string `toml:"servers"`
Topics []string `toml:"topics"`
TopicTag *string `toml:"topic_tag"`
TopicParsing []TopicParsingConfig `toml:"topic_parsing"`
Username string `toml:"username"`
Password string `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
parser parsers.Parser
// Legacy metric buffer support; deprecated in v0.10.3
MetricBuffer int
MetricBuffer int
PersistentSession bool
ClientID string `toml:"client_id"`
tls.ClientConfig
Log telegraf.Logger
clientFactory ClientFactory
client Client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool
messagesMutex sync.Mutex
chosenTopicTag string
ctx context.Context
cancel context.CancelFunc
Log telegraf.Logger
clientFactory ClientFactory
client Client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool
messagesMutex sync.Mutex
topicTagParse string
ctx context.Context
cancel context.CancelFunc
}
var sampleConfig = `
@ -86,18 +90,20 @@ var sampleConfig = `
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]
## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# topic_tags = "_/format/client/_"
# topic_measurement = "measurement/_/_/_"
# topic_fields = "_/_/_/temperature"
## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"
## QoS policy for messages
## 0 = at most once
## 1 = at least once
@ -106,10 +112,8 @@ var sampleConfig = `
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0
## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
@ -119,87 +123,103 @@ var sampleConfig = `
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false
## If unset, a random client ID will be generated.
# client_id = ""
## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## [[inputs.mqtt_consumer.topic_parsing]]
## topic = ""
## measurement = ""
## tags = ""
## fields = ""
## [inputs.mqtt_consumer.topic_parsing.types]
##
`
func (m *MQTTConsumer) SampleConfig() string {
return sampleConfig
}
func (m *MQTTConsumer) Description() string {
return "Read metrics from MQTT topic(s)"
}
func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
m.parser = parser
}
func (m *MQTTConsumer) Init() error {
m.state = Disconnected
if m.PersistentSession && m.ClientID == "" {
return errors.New("persistent_session requires client_id")
}
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS)
}
if time.Duration(m.ConnectionTimeout) < 1*time.Second {
return fmt.Errorf("connection_timeout must be greater than 1s: %s", time.Duration(m.ConnectionTimeout))
}
m.chosenTopicTag = "topic"
m.topicTagParse = "topic"
if m.TopicTag != nil {
m.chosenTopicTag = *m.TopicTag
m.topicTagParse = *m.TopicTag
}
opts, err := m.createOpts()
if err != nil {
return err
}
m.opts = opts
m.messages = map[telegraf.TrackingID]bool{}
for i, p := range m.TopicParsing {
splitMeasurement := strings.Split(p.Measurement, "/")
for j := range splitMeasurement {
if splitMeasurement[j] != "_" {
m.TopicParsing[i].MeasurementIndex = j
break
}
}
m.TopicParsing[i].SplitTags = strings.Split(p.Tags, "/")
m.TopicParsing[i].SplitFields = strings.Split(p.Fields, "/")
m.TopicParsing[i].SplitTopic = strings.Split(p.Topic, "/")
if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) {
return fmt.Errorf("config error topic parsing: measurement length does not equal topic length")
}
if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) {
return fmt.Errorf("config error topic parsing: fields length does not equal topic length")
}
if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) {
return fmt.Errorf("config error topic parsing: tags length does not equal topic length")
}
}
return nil
}
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.state = Disconnected
m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())
m.client = m.clientFactory(m.opts)
// AddRoute sets up the function for handling messages. These need to be
// added in case we find a persistent session containing subscriptions so we
// know where to dispatch persisted and new messages to. In the alternate
@ -207,11 +227,9 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
for _, topic := range m.Topics {
m.client.AddRoute(topic, m.recvMessage)
}
m.state = Connecting
return m.connect()
}
func (m *MQTTConsumer) connect() error {
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
@ -219,10 +237,8 @@ func (m *MQTTConsumer) connect() error {
m.state = Disconnected
return err
}
m.Log.Infof("Connected %v", m.Servers)
m.state = Connected
// Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
type sessionPresent interface {
@ -232,28 +248,23 @@ func (m *MQTTConsumer) connect() error {
m.Log.Debugf("Session found %v", m.Servers)
return nil
}
topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}
subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}
return nil
}
func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
m.acc.AddError(fmt.Errorf("connection lost: %v", err))
m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
}
func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
for {
select {
@ -279,26 +290,60 @@ func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
}
}
// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value
func compareTopics(expected []string, incoming []string) bool {
if len(expected) != len(incoming) {
return false
}
for i, expected := range expected {
if incoming[i] != expected && expected != "+" {
return false
}
}
return true
}
func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error {
metrics, err := m.parser.Parse(msg.Payload())
if err != nil {
return err
}
if m.chosenTopicTag != "" {
topic := msg.Topic()
for _, metric := range metrics {
metric.AddTag(m.chosenTopicTag, topic)
for _, metric := range metrics {
if m.topicTagParse != "" {
metric.AddTag(m.topicTagParse, msg.Topic())
}
for _, p := range m.TopicParsing {
values := strings.Split(msg.Topic(), "/")
if !compareTopics(p.SplitTopic, values) {
continue
}
if p.Measurement != "" {
metric.SetName(values[p.MeasurementIndex])
}
if p.Tags != "" {
err := parseMetric(p.SplitTags, values, p.FieldTypes, true, metric)
if err != nil {
return err
}
}
if p.Fields != "" {
err := parseMetric(p.SplitFields, values, p.FieldTypes, false, metric)
if err != nil {
return err
}
}
}
}
id := acc.AddTrackingMetricGroup(metrics)
m.messagesMutex.Lock()
m.messages[id] = true
m.messagesMutex.Unlock()
return nil
}
func (m *MQTTConsumer) Stop() {
if m.state == Connected {
m.Log.Debugf("Disconnecting %v", m.Servers)
@ -308,37 +353,29 @@ func (m *MQTTConsumer) Stop() {
}
m.cancel()
}
func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if m.state == Disconnected {
m.state = Connecting
m.Log.Debugf("Connecting %v", m.Servers)
return m.connect()
}
return nil
}
func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions()
opts.ConnectTimeout = time.Duration(m.ConnectionTimeout)
if m.ClientID == "" {
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
} else {
opts.SetClientID(m.ClientID)
}
tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
if tlsCfg != nil {
opts.SetTLSConfig(tlsCfg)
}
user := m.Username
if user != "" {
opts.SetUsername(user)
@ -347,11 +384,9 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
if password != "" {
opts.SetPassword(password)
}
if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host informations")
}
for _, server := range m.Servers {
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
if !strings.Contains(server, "://") {
@ -362,17 +397,72 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
server = "ssl://" + server
}
}
opts.AddBroker(server)
}
opts.SetAutoReconnect(false)
opts.SetKeepAlive(time.Second * 60)
opts.SetCleanSession(!m.PersistentSession)
opts.SetConnectionLostHandler(m.onConnectionLost)
return opts, nil
}
// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields)
func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error {
var metricFound bool
for i, k := range keys {
if k == "_" {
continue
}
if isTag {
metric.AddTag(k, values[i])
metricFound = true
} else {
newType, err := typeConvert(types, values[i], k)
if err != nil {
return err
}
metric.AddField(k, newType)
metricFound = true
}
}
if !metricFound {
return fmt.Errorf("no fields or tags found")
}
return nil
}
func typeConvert(types map[string]string, topicValue string, key string) (interface{}, error) {
var newType interface{}
var err error
// If the user configured inputs.mqtt_consumer.topic.types, check for the desired type
if desiredType, ok := types[key]; ok {
switch desiredType {
case "uint":
newType, err = strconv.ParseUint(topicValue, 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to convert field '%s' to type uint: %v", topicValue, err)
}
case "int":
newType, err = strconv.ParseInt(topicValue, 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to convert field '%s' to type int: %v", topicValue, err)
}
case "float":
newType, err = strconv.ParseFloat(topicValue, 64)
if err != nil {
return nil, fmt.Errorf("unable to convert field '%s' to type float: %v", topicValue, err)
}
default:
return nil, fmt.Errorf("converting to the type %s is not supported: use int, uint, or float", desiredType)
}
} else {
newType = topicValue
}
return newType, nil
}
func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},
@ -382,7 +472,6 @@ func New(factory ClientFactory) *MQTTConsumer {
state: Disconnected,
}
}
func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return New(func(o *mqtt.ClientOptions) Client {

View File

@ -1,6 +1,7 @@
package mqtt_consumer
import (
"fmt"
"testing"
"time"
@ -153,6 +154,7 @@ func TestPersistentClientIDFail(t *testing.T) {
}
type Message struct {
topic string
}
func (m *Message) Duplicate() bool {
@ -168,7 +170,7 @@ func (m *Message) Retained() bool {
}
func (m *Message) Topic() string {
return "telegraf"
return m.topic
}
func (m *Message) MessageID() uint16 {
@ -185,12 +187,16 @@ func (m *Message) Ack() {
func TestTopicTag(t *testing.T) {
tests := []struct {
name string
topicTag func() *string
expected []telegraf.Metric
name string
topic string
topicTag func() *string
expectedError error
topicParsing []TopicParsingConfig
expected []telegraf.Metric
}{
{
name: "default topic when topic tag is unset for backwards compatibility",
name: "default topic when topic tag is unset for backwards compatibility",
topic: "telegraf",
topicTag: func() *string {
return nil
},
@ -208,7 +214,8 @@ func TestTopicTag(t *testing.T) {
},
},
{
name: "use topic tag when set",
name: "use topic tag when set",
topic: "telegraf",
topicTag: func() *string {
tag := "topic_tag"
return &tag
@ -227,7 +234,8 @@ func TestTopicTag(t *testing.T) {
},
},
{
name: "no topic tag is added when topic tag is set to the empty string",
name: "no topic tag is added when topic tag is set to the empty string",
topic: "telegraf",
topicTag: func() *string {
tag := ""
return &tag
@ -243,6 +251,105 @@ func TestTopicTag(t *testing.T) {
),
},
},
{
name: "topic parsing configured",
topic: "telegraf/123/test",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "telegraf/123/test",
Measurement: "_/_/measurement",
Tags: "testTag/_/_",
Fields: "_/testNumber/_",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{
"testTag": "telegraf",
},
map[string]interface{}{
"testNumber": 123,
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
{
name: "topic parsing configured with a mqtt wild card `+`",
topic: "telegraf/123/test/hello",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "telegraf/+/test/hello",
Measurement: "_/_/measurement/_",
Tags: "testTag/_/_/_",
Fields: "_/testNumber/_/testString",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{
"testTag": "telegraf",
},
map[string]interface{}{
"testNumber": 123,
"testString": "hello",
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
{
name: "topic parsing configured incorrectly",
topic: "telegraf/123/test/hello",
topicTag: func() *string {
tag := ""
return &tag
},
expectedError: fmt.Errorf("config error topic parsing: fields length does not equal topic length"),
topicParsing: []TopicParsingConfig{
{
Topic: "telegraf/+/test/hello",
Measurement: "_/_/measurement/_",
Tags: "testTag/_/_/_",
Fields: "_/_/testNumber:int/_/testString:string",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{
"testTag": "telegraf",
},
map[string]interface{}{
"testNumber": 123,
"testString": "hello",
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@ -265,21 +372,28 @@ func TestTopicTag(t *testing.T) {
return client
})
plugin.Log = testutil.Logger{}
plugin.Topics = []string{"telegraf"}
plugin.Topics = []string{tt.topic}
plugin.TopicTag = tt.topicTag()
plugin.TopicParsing = tt.topicParsing
parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
plugin.SetParser(parser)
err = plugin.Init()
require.NoError(t, err)
require.Equal(t, tt.expectedError, err)
if tt.expectedError != nil {
return
}
var acc testutil.Accumulator
err = plugin.Start(&acc)
require.NoError(t, err)
handler(nil, &Message{})
var m Message
m.topic = tt.topic
handler(nil, &m)
plugin.Stop()