feat(outputs.mqtt): Add option to specify topic layouts (#12697)

This commit is contained in:
Sven Rebhan 2023-02-28 19:44:35 +01:00 committed by GitHub
parent d8db3ca3a2
commit 6e3af9d06c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1109 additions and 51 deletions

View File

@ -107,7 +107,9 @@ func (m *mqttv311Client) Connect() (bool, error) {
func (m *mqttv311Client) Publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.qos), m.retain, body)
token.WaitTimeout(m.timeout)
if !token.WaitTimeout(m.timeout) {
return internal.ErrTimeout
}
return token.Error()
}
@ -123,7 +125,7 @@ func (m *mqttv311Client) AddRoute(topic string, callback mqttv3.MessageHandler)
func (m *mqttv311Client) Close() error {
if m.client.IsConnected() {
m.client.Disconnect(20)
m.client.Disconnect(100)
}
return nil
}

View File

@ -95,12 +95,31 @@ to use them.
## When true, metrics will be sent in one MQTT message per flush. Otherwise,
## metrics are written one metric per MQTT message.
## DEPRECATED: Use layout option instead
# batch = false
## When true, metric will have RETAIN flag set, making broker cache entries until someone
## actually reads it
# retain = false
## Layout of the topics published.
## The following choices are available:
## non-batch -- send individual messages, one for each metric
## batch -- send all metric as a single message per MQTT topic
## NOTE: The following options will ignore the 'data_format' option and send single values
## field -- send individual messages for each field, appending its name to the metric topic
## homie-v4 -- send metrics with fields and tags according to the 4.0.0 specs
## see https://homieiot.github.io/specification/
# layout = "non-batch"
## HOMIE specific settings
## The following options provide templates for setting the device name
## and the node-ID for the topics. Both options are MANDATORY and can contain
## {{ .PluginName }} (metric name), {{ .Tag "key"}} (tag reference to 'key')
## or constant strings. The templays MAY NOT contain slashes!
# homie_device_name = ""
# homie_node_id = ""
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
@ -121,3 +140,194 @@ to use them.
# "key1" = "value 1"
# "key2" = "value 2"
```
### `field` layout
This layout will publish one topic per metric __field__, only containing the
value as string. This means that the `data_format` option will be ignored.
For example writing the metrics
```text
modbus,location=main\ building,source=device\ 1,status=ok,type=Machine\ A temperature=21.4,serial\ number="324nlk234r5u9834t",working\ hours=123i,supplied=true 1676522982000000000
modbus,location=main\ building,source=device\ 2,status=offline,type=Machine\ B temperature=25.0,supplied=true 1676522982000000000
```
with configuration
```toml
[[outputs.mqtt]]
topic = 'telegraf/{{ .PluginName }}/{{ .Tag "source" }}'
layout = "field"
...
```
will result in the following topics and values
```text
telegraf/modbus/device 1/temperature 21.4
telegraf/modbus/device 1/serial number 324nlk234r5u9834t
telegraf/modbus/device 1/supplied true
telegraf/modbus/device 1/working hours 123
telegraf/modbus/device 2/temperature 25
telegraf/modbus/device 2/supplied false
```
__NOTE__: Only fields will be output, tags and the timestamp are omitted. To
also output those, please convert them to fields first.
### `homie-v4` layout
This layout will publish metrics according to the
[Homie v4.0 specification][HomieSpecV4]. Here, the `topic` template will be
used to specify the `device-id` path. The __mandatory__ options
`homie_device_name` will specify the content of the `$name` topic of the device,
while `homie_node_id` will provide a template for the `node-id` part of the
topic. Both options can contain [Go templates][GoTemplates] similar to `topic`
with `{{ .PluginName }}` referencing the metric name and `{{ .Tag "key"}}`
referencing the tag with the name `key`.
For example writing the metrics
```text
modbus,source=device\ 1,location=main\ building,type=Machine\ A,status=ok temperature=21.4,serial\ number="324nlk234r5u9834t",working\ hours=123i,supplied=true 1676522982000000000
modbus,source=device\ 2,location=main\ building,type=Machine\ B,status=offline supplied=false 1676522982000000000
modbus,source=device\ 2,location=main\ building,type=Machine\ B,status=online supplied=true,Throughput=12345i,Load\ [%]=81.2,account\ no="T3L3GrAf",Temperature=25.38,Voltage=24.1,Current=100 1676542982000000000
```
with configuration
```toml
[[outputs.mqtt]]
topic = 'telegraf/{{ .PluginName }}'
layout = "homie-v4"
homie_device_name ='{{.PluginName}} plugin'
homie_node_id = '{{.Tag "source"}}'
...
```
will result in the following topics and values
```text
telegraf/modbus/$homie 4.0
telegraf/modbus/$name modbus plugin
telegraf/modbus/$state ready
telegraf/modbus/$nodes device-1
telegraf/modbus/device-1/$name device 1
telegraf/modbus/device-1/$properties location,serial-number,source,status,supplied,temperature,type,working-hours
telegraf/modbus/device-1/location main building
telegraf/modbus/device-1/location/$name location
telegraf/modbus/device-1/location/$datatype string
telegraf/modbus/device-1/status ok
telegraf/modbus/device-1/status/$name status
telegraf/modbus/device-1/status/$datatype string
telegraf/modbus/device-1/type Machine A
telegraf/modbus/device-1/type/$name type
telegraf/modbus/device-1/type/$datatype string
telegraf/modbus/device-1/source device 1
telegraf/modbus/device-1/source/$name source
telegraf/modbus/device-1/source/$datatype string
telegraf/modbus/device-1/temperature 21.4
telegraf/modbus/device-1/temperature/$name temperature
telegraf/modbus/device-1/temperature/$datatype float
telegraf/modbus/device-1/serial-number 324nlk234r5u9834t
telegraf/modbus/device-1/serial-number/$name serial number
telegraf/modbus/device-1/serial-number/$datatype string
telegraf/modbus/device-1/working-hours 123
telegraf/modbus/device-1/working-hours/$name working hours
telegraf/modbus/device-1/working-hours/$datatype integer
telegraf/modbus/device-1/supplied true
telegraf/modbus/device-1/supplied/$name supplied
telegraf/modbus/device-1/supplied/$datatype boolean
telegraf/modbus/$nodes device-1,device-2
telegraf/modbus/device-2/$name device 2
telegraf/modbus/device-2/$properties location,source,status,supplied,type
telegraf/modbus/device-2/location main building
telegraf/modbus/device-2/location/$name location
telegraf/modbus/device-2/location/$datatype string
telegraf/modbus/device-2/status offline
telegraf/modbus/device-2/status/$name status
telegraf/modbus/device-2/status/$datatype string
telegraf/modbus/device-2/type Machine B
telegraf/modbus/device-2/type/$name type
telegraf/modbus/device-2/type/$datatype string
telegraf/modbus/device-2/source device 2
telegraf/modbus/device-2/source/$name source
telegraf/modbus/device-2/source/$datatype string
telegraf/modbus/device-2/supplied false
telegraf/modbus/device-2/supplied/$name supplied
telegraf/modbus/device-2/supplied/$datatype boolean
telegraf/modbus/device-2/$properties account-no,current,load,location,source,status,supplied,temperature,throughput,type,voltage
telegraf/modbus/device-2/location main building
telegraf/modbus/device-2/location/$name location
telegraf/modbus/device-2/location/$datatype string
telegraf/modbus/device-2/status online
telegraf/modbus/device-2/status/$name status
telegraf/modbus/device-2/status/$datatype string
telegraf/modbus/device-2/type Machine B
telegraf/modbus/device-2/type/$name type
telegraf/modbus/device-2/type/$datatype string
telegraf/modbus/device-2/source device 2
telegraf/modbus/device-2/source/$name source
telegraf/modbus/device-2/source/$datatype string
telegraf/modbus/device-2/temperature 25.38
telegraf/modbus/device-2/temperature/$name Temperature
telegraf/modbus/device-2/temperature/$datatype float
telegraf/modbus/device-2/voltage 24.1
telegraf/modbus/device-2/voltage/$name Voltage
telegraf/modbus/device-2/voltage/$datatype float
telegraf/modbus/device-2/current 100
telegraf/modbus/device-2/current/$name Current
telegraf/modbus/device-2/current/$datatype float
telegraf/modbus/device-2/throughput 12345
telegraf/modbus/device-2/throughput/$name Throughput
telegraf/modbus/device-2/throughput/$datatype integer
telegraf/modbus/device-2/load 81.2
telegraf/modbus/device-2/load/$name Load [%]
telegraf/modbus/device-2/load/$datatype float
telegraf/modbus/device-2/account-no T3L3GrAf
telegraf/modbus/device-2/account-no/$name account no
telegraf/modbus/device-2/account-no/$datatype string
telegraf/modbus/device-2/supplied true
telegraf/modbus/device-2/supplied/$name supplied
telegraf/modbus/device-2/supplied/$datatype boolean
```
#### Important notes and limitations
It is important to notice that the __"devices" and "nodes" are dynamically
changing__ in Telegraf as the metrics and their structure is not known a-priori.
As a consequence, the content of both `$nodes` and `$properties` topics are
changing as new `device-id`s, `node-id`s and `properties` (i.e. tags and fields)
appear. Best effort is made to limit the number of changes by keeping a
superset of all devices and nodes seen, however especially during startup those
topics will change more often. Both `topic` and `homie_node_id` should be chosen
in a way to group metrics with identical structure!
Furthermore, __lifecycle management of devices is very limited__! Devices will
only be in `ready` state due to the dynamic nature of Telegraf. Due to
limitations in the MQTT client library, it is not possible to set a "will"
dynamically. In consequence, devices are only marked `lost` when exiting
Telegraf normally and might not change in abnormal aborts.
Note that __all field- and tag-names are automatically converted__ to adhere to
the [Homie topic ID specification][HomieSpecV4TopicIDs]. In that process, the
names are converted to lower-case and forbidden character sequences (everything
not being a lower-case character, digit or hyphen) will be replaces by a hyphen.
Finally, leading and trailing hyphens are removed.
This is important as there is a __risk of name collisions__ between fields and
tags of the same node especially after the conversion to ID. Please __make sure
to avoid those collisions__ as otherwise property topics will be sent multiple
times for the colliding items.
[HomieSpecV4]: https://homieiot.github.io/specification/spec-core-v4_0_0
[GoTemplates]: https://pkg.go.dev/text/template
[HomieSpecV4TopicIDs]: https://homieiot.github.io/specification/#topic-ids

View File

@ -0,0 +1,135 @@
package mqtt
import (
"errors"
"fmt"
"regexp"
"sort"
"strings"
"text/template"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)
var idRe = regexp.MustCompile(`([^a-z0-9]+)`)
func (m *MQTT) collectHomieDeviceMessages(topic string, metric telegraf.Metric) ([]message, string, error) {
var messages []message
// Check if the device-id is already registered
if _, found := m.homieSeen[topic]; !found {
deviceName, err := m.homieDeviceNameGenerator.Generate(metric)
if err != nil {
return nil, "", fmt.Errorf("generating device name failed: %w", err)
}
messages = append(messages, message{topic + "/$homie", []byte("4.0")})
messages = append(messages, message{topic + "/$name", []byte(deviceName)})
messages = append(messages, message{topic + "/$state", []byte("ready")})
m.homieSeen[topic] = make(map[string]bool)
}
// Generate the node-ID from the metric and fixup invalid characters
nodeName, err := m.homieNodeIDGenerator.Generate(metric)
if err != nil {
return nil, "", fmt.Errorf("generating device ID failed: %w", err)
}
nodeID := normalizeID(nodeName)
if !m.homieSeen[topic][nodeID] {
m.homieSeen[topic][nodeID] = true
nodeIDs := make([]string, 0, len(m.homieSeen[topic]))
for id := range m.homieSeen[topic] {
nodeIDs = append(nodeIDs, id)
}
sort.Strings(nodeIDs)
messages = append(messages, message{
topic + "/$nodes",
[]byte(strings.Join(nodeIDs, ",")),
})
messages = append(messages, message{
topic + "/" + nodeID + "/$name",
[]byte(nodeName),
})
}
properties := make([]string, 0, len(metric.TagList())+len(metric.FieldList()))
for _, tag := range metric.TagList() {
properties = append(properties, normalizeID(tag.Key))
}
for _, field := range metric.FieldList() {
properties = append(properties, normalizeID(field.Key))
}
sort.Strings(properties)
messages = append(messages, message{
topic + "/" + nodeID + "/$properties",
[]byte(strings.Join(properties, ",")),
})
return messages, nodeID, nil
}
func normalizeID(raw string) string {
// IDs in Home can only contain lowercase letters and hyphens
// see https://homieiot.github.io/specification/#topic-ids
id := strings.ToLower(raw)
id = idRe.ReplaceAllString(id, "-")
return strings.Trim(id, "-")
}
func convertType(value interface{}) (val, dtype string, err error) {
v, err := internal.ToString(value)
if err != nil {
return "", "", err
}
switch value.(type) {
case int8, int16, int32, int64, uint8, uint16, uint32, uint64:
return v, "integer", nil
case float32, float64:
return v, "float", nil
case []byte, string, fmt.Stringer:
return v, "string", nil
case bool:
return v, "boolean", nil
}
return "", "", fmt.Errorf("unknown type %T", value)
}
type HomieGenerator struct {
PluginName string
metric telegraf.Metric
template *template.Template
}
func NewHomieGenerator(tmpl string) (*HomieGenerator, error) {
tt, err := template.New("topic_name").Parse(tmpl)
if err != nil {
return nil, err
}
return &HomieGenerator{template: tt}, nil
}
func (t *HomieGenerator) Tag(key string) string {
tagString, _ := t.metric.GetTag(key)
return tagString
}
func (t *HomieGenerator) Generate(m telegraf.Metric) (string, error) {
t.PluginName = m.Name()
t.metric = m
var b strings.Builder
if err := t.template.Execute(&b, t); err != nil {
return "", err
}
result := b.String()
if strings.Contains(result, "/") {
return "", errors.New("cannot contain /")
}
return result, nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/mqtt"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
@ -19,17 +20,29 @@ import (
//go:embed sample.conf
var sampleConfig string
type message struct {
topic string
payload []byte
}
type MQTT struct {
TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"`
Topic string `toml:"topic"`
BatchMessage bool `toml:"batch"`
Log telegraf.Logger `toml:"-"`
TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"`
Topic string `toml:"topic"`
BatchMessage bool `toml:"batch" deprecated:"1.25.2;use 'layout = \"batch\"' instead"`
Layout string `toml:"layout"`
HomieDeviceName string `toml:"homie_device_name"`
HomieNodeID string `toml:"homie_node_id"`
Log telegraf.Logger `toml:"-"`
mqtt.MqttConfig
client mqtt.Client
serializer serializers.Serializer
generator *TopicNameGenerator
homieDeviceNameGenerator *HomieGenerator
homieNodeIDGenerator *HomieGenerator
homieSeen map[string]map[string]bool
sync.Mutex
}
@ -45,20 +58,56 @@ func (m *MQTT) Init() error {
if m.PersistentSession && m.ClientID == "" {
return errors.New("persistent_session requires client_id")
}
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS)
}
var err error
m.generator, err = NewTopicNameGenerator(m.TopicPrefix, m.Topic)
return err
if err != nil {
return err
}
switch m.Layout {
case "":
// For backward compatibility
if m.BatchMessage {
m.Layout = "batch"
} else {
m.Layout = "non-batch"
}
case "non-batch", "batch", "field":
case "homie-v4":
if m.HomieDeviceName == "" {
return errors.New("missing 'homie_device_name' option")
}
m.homieDeviceNameGenerator, err = NewHomieGenerator(m.HomieDeviceName)
if err != nil {
return fmt.Errorf("creating device name generator failed: %w", err)
}
if m.HomieNodeID == "" {
return errors.New("missing 'homie_node_id' option")
}
m.homieNodeIDGenerator, err = NewHomieGenerator(m.HomieNodeID)
if err != nil {
return fmt.Errorf("creating node ID name generator failed: %w", err)
}
default:
return fmt.Errorf("invalid layout %q", m.Layout)
}
return nil
}
func (m *MQTT) Connect() error {
m.Lock()
defer m.Unlock()
m.homieSeen = make(map[string]map[string]bool)
client, err := mqtt.NewClient(&m.MqttConfig)
if err != nil {
return err
@ -74,6 +123,18 @@ func (m *MQTT) SetSerializer(serializer serializers.Serializer) {
}
func (m *MQTT) Close() error {
// Unregister devices if Homie layout was used. Usually we should do this
// using a "will" message, but this can only be done at connect time where,
// due to the dynamic nature of Telegraf messages, we do not know the topics
// to issue that "will" yet.
if len(m.homieSeen) > 0 {
for topic := range m.homieSeen {
// We will ignore potential errors as we cannot do anything here
_ = m.client.Publish(topic+"/$state", []byte("lost"))
}
// Give the messages some time to settle
time.Sleep(100 * time.Millisecond)
}
return m.client.Close()
}
@ -88,44 +149,149 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
if !ok {
hostname = ""
}
metricsmap := make(map[string][]telegraf.Metric)
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
if err != nil {
return fmt.Errorf("topic name couldn't be generated due to error: %w", err)
}
if m.BatchMessage {
metricsmap[topic] = append(metricsmap[topic], metric)
} else {
buf, err := m.serializer.Serialize(metric)
if err != nil {
m.Log.Debugf("Could not serialize metric: %v", err)
continue
}
err = m.client.Publish(topic, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server: %w", err)
}
}
// Group the metrics to topics and serialize them
var topicMessages []message
switch m.Layout {
case "batch":
topicMessages = m.collectBatch(hostname, metrics)
case "non-batch":
topicMessages = m.collectNonBatch(hostname, metrics)
case "field":
topicMessages = m.collectField(hostname, metrics)
case "homie-v4":
topicMessages = m.collectHomieV4(hostname, metrics)
default:
return fmt.Errorf("unknown layout %q", m.Layout)
}
for key := range metricsmap {
buf, err := m.serializer.SerializeBatch(metricsmap[key])
if err != nil {
return err
}
err = m.client.Publish(key, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server: %w", err)
for _, msg := range topicMessages {
if err := m.client.Publish(msg.topic, msg.payload); err != nil {
m.Log.Warn("Could not publish message to MQTT server, %s", err)
}
}
return nil
}
func (m *MQTT) collectNonBatch(hostname string, metrics []telegraf.Metric) []message {
collection := make([]message, 0, len(metrics))
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %w", err)
m.Log.Debugf("metric was: %v", metric)
continue
}
buf, err := m.serializer.Serialize(metric)
if err != nil {
m.Log.Warnf("Could not serialize metric for topic %q: %v", topic, err)
m.Log.Debugf("metric was: %v", metric)
continue
}
collection = append(collection, message{topic, buf})
}
return collection
}
func (m *MQTT) collectBatch(hostname string, metrics []telegraf.Metric) []message {
metricsCollection := make(map[string][]telegraf.Metric)
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %w", err)
m.Log.Debugf("metric was: %v", metric)
continue
}
metricsCollection[topic] = append(metricsCollection[topic], metric)
}
collection := make([]message, 0, len(metricsCollection))
for topic, ms := range metricsCollection {
buf, err := m.serializer.SerializeBatch(ms)
if err != nil {
m.Log.Warnf("Could not serialize metric batch for topic %q: %v", topic, err)
continue
}
collection = append(collection, message{topic, buf})
}
return collection
}
func (m *MQTT) collectField(hostname string, metrics []telegraf.Metric) []message {
var collection []message
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %w", err)
m.Log.Debugf("metric was: %v", metric)
continue
}
for n, v := range metric.Fields() {
buf, err := internal.ToString(v)
if err != nil {
m.Log.Warnf("Could not serialize metric for topic %q field %q: %v", topic, n, err)
m.Log.Debugf("metric was: %v", metric)
continue
}
collection = append(collection, message{topic + "/" + n, []byte(buf)})
}
}
return collection
}
func (m *MQTT) collectHomieV4(hostname string, metrics []telegraf.Metric) []message {
var collection []message
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %w", err)
m.Log.Debugf("metric was: %v", metric)
continue
}
msgs, nodeID, err := m.collectHomieDeviceMessages(topic, metric)
if err != nil {
m.Log.Warnf(err.Error())
m.Log.Debugf("metric was: %v", metric)
continue
}
path := topic + "/" + nodeID
collection = append(collection, msgs...)
for _, tag := range metric.TagList() {
if err != nil {
m.Log.Warnf("Could not serialize metric for topic %q tag %q: %v", topic, tag.Key, err)
m.Log.Debugf("metric was: %v", metric)
continue
}
propID := normalizeID(tag.Key)
collection = append(collection, message{path + "/" + propID, []byte(tag.Value)})
collection = append(collection, message{path + "/" + propID + "/$name", []byte(tag.Key)})
collection = append(collection, message{path + "/" + propID + "/$datatype", []byte("string")})
}
for _, field := range metric.FieldList() {
v, dt, err := convertType(field.Value)
if err != nil {
m.Log.Warnf("Could not serialize metric for topic %q field %q: %v", topic, field.Key, err)
m.Log.Debugf("metric was: %v", metric)
continue
}
propID := normalizeID(field.Key)
collection = append(collection, message{path + "/" + propID, []byte(v)})
collection = append(collection, message{path + "/" + propID + "/$name", []byte(field.Key)})
collection = append(collection, message{path + "/" + propID + "/$datatype", []byte(dt)})
}
}
return collection
}
func init() {
outputs.Add("mqtt", func() telegraf.Output {
return &MQTT{

View File

@ -3,6 +3,7 @@ package mqtt
import (
"fmt"
"path/filepath"
"sync"
"testing"
"time"
@ -15,6 +16,7 @@ import (
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/mqtt"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
@ -161,26 +163,16 @@ func TestIntegrationMQTTv3(t *testing.T) {
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
Log: testutil.Logger{Name: "mqttv3-integration-test"},
Topic: topic + "/{{.PluginName}}",
Layout: "non-batch",
Log: testutil.Logger{Name: "mqttv3-integration-test"},
}
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Init())
// Prepare the receiver message
var acc testutil.Accumulator
onMessage := func(_ paho.Client, msg paho.Message) {
metrics, err := parser.Parse(msg.Payload())
if err != nil {
acc.AddError(err)
return
}
for _, m := range metrics {
m.AddTag("topic", msg.Topic())
acc.AddMetric(m)
}
}
onMessage := createMetricMessageHandler(&acc, parser)
// Startup the plugin and subscribe to the topic
require.NoError(t, plugin.Connect())
@ -286,6 +278,540 @@ func TestMQTTv5Properties(t *testing.T) {
}
}
func TestIntegrationMQTTLayoutNonBatch(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")
container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
BindMounts: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Setup the parser / serializer pair
parser := &influx.Parser{}
require.NoError(t, parser.Init())
serializer := serializers.NewInfluxSerializer()
// Setup the plugin
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "test_nonbatch"
plugin := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{url},
KeepAlive: 30,
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
Layout: "non-batch",
Log: testutil.Logger{Name: "mqttv3-integration-test"},
}
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Init())
// Prepare the receiver message
var acc testutil.Accumulator
onMessage := createMetricMessageHandler(&acc, parser)
// Startup the plugin and subscribe to the topic
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Add routing for the messages
subscriptionPattern := topic + "/#"
plugin.client.AddRoute(subscriptionPattern, onMessage)
// Subscribe to the topic
topics := map[string]byte{subscriptionPattern: byte(plugin.QoS)}
require.NoError(t, plugin.client.SubscribeMultiple(topics, onMessage))
// Setup and execute the test case
input := make([]telegraf.Metric, 0, 3)
expected := make([]telegraf.Metric, 0, len(input))
for i := 0; i < cap(input); i++ {
name := fmt.Sprintf("test%d", i)
m := metric.New(
name,
map[string]string{"case": "mqtt"},
map[string]interface{}{"value": i},
time.Unix(1676470949, 0),
)
input = append(input, m)
e := m.Copy()
e.AddTag("topic", topic+"/"+name)
expected = append(expected, e)
}
require.NoError(t, plugin.Write(input))
// Verify the result
require.Eventually(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond)
require.NoError(t, plugin.Close())
require.Empty(t, acc.Errors)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
func TestIntegrationMQTTLayoutBatch(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")
container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
BindMounts: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Setup the parser / serializer pair
parser := &influx.Parser{}
require.NoError(t, parser.Init())
serializer := serializers.NewInfluxSerializer()
// Setup the plugin
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "test_batch"
plugin := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{url},
KeepAlive: 30,
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
Layout: "batch",
Log: testutil.Logger{Name: "mqttv3-integration-test-"},
}
plugin.SetSerializer(serializer)
require.NoError(t, plugin.Init())
// Prepare the receiver message
var acc testutil.Accumulator
onMessage := createMetricMessageHandler(&acc, parser)
// Startup the plugin and subscribe to the topic
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Add routing for the messages
subscriptionPattern := topic + "/#"
plugin.client.AddRoute(subscriptionPattern, onMessage)
// Subscribe to the topic
topics := map[string]byte{subscriptionPattern: byte(plugin.QoS)}
require.NoError(t, plugin.client.SubscribeMultiple(topics, onMessage))
// Setup and execute the test case
input := make([]telegraf.Metric, 0, 6)
expected := make([]telegraf.Metric, 0, len(input))
for i := 0; i < cap(input); i++ {
name := fmt.Sprintf("test%d", i%3)
m := metric.New(
name,
map[string]string{
"case": "mqtt",
"id": fmt.Sprintf("test%d", i),
},
map[string]interface{}{"value": i},
time.Unix(1676470949, 0),
)
input = append(input, m)
e := m.Copy()
e.AddTag("topic", topic+"/"+name)
expected = append(expected, e)
}
require.NoError(t, plugin.Write(input))
// Verify the result
require.Eventually(t, func() bool {
return acc.NMetrics() >= uint64(len(expected))
}, time.Second, 100*time.Millisecond)
require.NoError(t, plugin.Close())
require.Empty(t, acc.Errors)
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.SortMetrics())
}
func TestIntegrationMQTTLayoutField(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")
container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
BindMounts: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Setup the plugin
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "test_field"
plugin := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{url},
KeepAlive: 30,
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + `/{{.PluginName}}/{{.Tag "source"}}`,
Layout: "field",
Log: testutil.Logger{Name: "mqttv3-integration-test-"},
}
require.NoError(t, plugin.Init())
// Startup the plugin and subscribe to the topic
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Prepare the message receiver
var received []message
var mtx sync.Mutex
onMessage := func(_ paho.Client, msg paho.Message) {
mtx.Lock()
defer mtx.Unlock()
received = append(received, message{msg.Topic(), msg.Payload()})
}
// Add routing for the messages
subscriptionPattern := topic + "/#"
plugin.client.AddRoute(subscriptionPattern, onMessage)
// Subscribe to the topic
topics := map[string]byte{subscriptionPattern: byte(plugin.QoS)}
require.NoError(t, plugin.client.SubscribeMultiple(topics, onMessage))
// Setup and execute the test case
input := []telegraf.Metric{
metric.New(
"modbus",
map[string]string{
"source": "device 1",
"type": "Machine A",
"location": "main building",
"status": "ok",
},
map[string]interface{}{
"temperature": 21.4,
"serial number": "324nlk234r5u9834t",
"working hours": 123,
"supplied": true,
},
time.Unix(1676522982, 0),
),
metric.New(
"modbus",
map[string]string{
"source": "device 2",
"type": "Machine B",
"location": "main building",
"status": "offline",
},
map[string]interface{}{
"temperature": 25.0,
"supplied": false,
},
time.Unix(1676522982, 0),
),
}
expected := []string{
topic + "/modbus/device 1/temperature" + " " + "21.4",
topic + "/modbus/device 1/serial number" + " " + "324nlk234r5u9834t",
topic + "/modbus/device 1/supplied" + " " + "true",
topic + "/modbus/device 1/working hours" + " " + "123",
topic + "/modbus/device 2/temperature" + " " + "25",
topic + "/modbus/device 2/supplied" + " " + "false",
}
require.NoError(t, plugin.Write(input))
// Verify the result
require.Eventually(t, func() bool {
mtx.Lock()
defer mtx.Unlock()
return len(received) >= len(expected)
}, time.Second, 100*time.Millisecond)
require.NoError(t, plugin.Close())
actual := make([]string, 0, len(received))
for _, msg := range received {
actual = append(actual, msg.topic+" "+string(msg.payload))
}
require.ElementsMatch(t, expected, actual)
}
func TestIntegrationMQTTLayoutHomieV4(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")
container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
BindMounts: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
// Setup the parser / serializer pair
parser := &value.Parser{
MetricName: "test",
DataType: "auto",
}
require.NoError(t, parser.Init())
// Setup the plugin
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
topic := "homie"
plugin := &MQTT{
MqttConfig: mqtt.MqttConfig{
Servers: []string{url},
KeepAlive: 30,
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
HomieDeviceName: `{{.PluginName}}`,
HomieNodeID: `{{.Tag "source"}}`,
Layout: "homie-v4",
Log: testutil.Logger{Name: "mqttv3-integration-test-"},
}
require.NoError(t, plugin.Init())
// Startup the plugin and subscribe to the topic
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Prepare the message receiver
var received []message
var mtx sync.Mutex
onMessage := func(_ paho.Client, msg paho.Message) {
mtx.Lock()
defer mtx.Unlock()
received = append(received, message{msg.Topic(), msg.Payload()})
}
// Add routing for the messages
subscriptionPattern := topic + "/#"
plugin.client.AddRoute(subscriptionPattern, onMessage)
// Subscribe to the topic
topics := map[string]byte{subscriptionPattern: byte(plugin.QoS)}
require.NoError(t, plugin.client.SubscribeMultiple(topics, onMessage))
// Setup and execute the test case
input := []telegraf.Metric{
metric.New(
"modbus",
map[string]string{
"source": "device 1",
"type": "Machine A",
"location": "main building",
"status": "ok",
},
map[string]interface{}{
"temperature": 21.4,
"serial number": "324nlk234r5u9834t",
"working hours": 123,
"supplied": true,
},
time.Unix(1676522982, 0),
),
metric.New(
"modbus",
map[string]string{
"source": "device 2",
"type": "Machine B",
"location": "main building",
"status": "offline",
},
map[string]interface{}{
"supplied": false,
},
time.Unix(1676522982, 0),
),
metric.New(
"modbus",
map[string]string{
"source": "device 2",
"type": "Machine B",
"location": "main building",
"status": "online",
"in operation": "yes",
},
map[string]interface{}{
"Temperature": 25.38,
"Voltage": 24.1,
"Current": 100.0,
"Throughput": 12345,
"Load [%]": 81.2,
"account no": "T3L3GrAf",
"supplied": true,
},
time.Unix(1676542982, 0),
),
}
dev1Props := "location,serial-number,source,status,supplied,temperature,type,working-hours"
dev2Props := "account-no,current,in-operation,load,location,source,status,supplied,temperature,"
dev2Props += "throughput,type,voltage"
expected := []string{
topic + "/modbus/$homie" + " " + "4.0",
topic + "/modbus/$name" + " " + "modbus",
topic + "/modbus/$state" + " " + "ready",
topic + "/modbus/$nodes" + " " + "device-1",
topic + "/modbus/device-1/$name" + " " + "device 1",
topic + "/modbus/device-1/$properties" + " " + dev1Props,
topic + "/modbus/device-1/location" + " " + "main building",
topic + "/modbus/device-1/location/$name" + " " + "location",
topic + "/modbus/device-1/location/$datatype" + " " + "string",
topic + "/modbus/device-1/status" + " " + "ok",
topic + "/modbus/device-1/status/$name" + " " + "status",
topic + "/modbus/device-1/status/$datatype" + " " + "string",
topic + "/modbus/device-1/type" + " " + "Machine A",
topic + "/modbus/device-1/type/$name" + " " + "type",
topic + "/modbus/device-1/type/$datatype" + " " + "string",
topic + "/modbus/device-1/source" + " " + "device 1",
topic + "/modbus/device-1/source/$name" + " " + "source",
topic + "/modbus/device-1/source/$datatype" + " " + "string",
topic + "/modbus/device-1/temperature" + " " + "21.4",
topic + "/modbus/device-1/temperature/$name" + " " + "temperature",
topic + "/modbus/device-1/temperature/$datatype" + " " + "float",
topic + "/modbus/device-1/serial-number" + " " + "324nlk234r5u9834t",
topic + "/modbus/device-1/serial-number/$name" + " " + "serial number",
topic + "/modbus/device-1/serial-number/$datatype" + " " + "string",
topic + "/modbus/device-1/working-hours" + " " + "123",
topic + "/modbus/device-1/working-hours/$name" + " " + "working hours",
topic + "/modbus/device-1/working-hours/$datatype" + " " + "integer",
topic + "/modbus/device-1/supplied" + " " + "true",
topic + "/modbus/device-1/supplied/$name" + " " + "supplied",
topic + "/modbus/device-1/supplied/$datatype" + " " + "boolean",
topic + "/modbus/$nodes" + " " + "device-1,device-2",
topic + "/modbus/device-2/$name" + " " + "device 2",
topic + "/modbus/device-2/$properties" + " " + "location,source,status,supplied,type",
topic + "/modbus/device-2/location" + " " + "main building",
topic + "/modbus/device-2/location/$name" + " " + "location",
topic + "/modbus/device-2/location/$datatype" + " " + "string",
topic + "/modbus/device-2/status" + " " + "offline",
topic + "/modbus/device-2/status/$name" + " " + "status",
topic + "/modbus/device-2/status/$datatype" + " " + "string",
topic + "/modbus/device-2/type" + " " + "Machine B",
topic + "/modbus/device-2/type/$name" + " " + "type",
topic + "/modbus/device-2/type/$datatype" + " " + "string",
topic + "/modbus/device-2/source" + " " + "device 2",
topic + "/modbus/device-2/source/$name" + " " + "source",
topic + "/modbus/device-2/source/$datatype" + " " + "string",
topic + "/modbus/device-2/supplied" + " " + "false",
topic + "/modbus/device-2/supplied/$name" + " " + "supplied",
topic + "/modbus/device-2/supplied/$datatype" + " " + "boolean",
topic + "/modbus/device-2/$properties" + " " + dev2Props,
topic + "/modbus/device-2/location" + " " + "main building",
topic + "/modbus/device-2/location/$name" + " " + "location",
topic + "/modbus/device-2/location/$datatype" + " " + "string",
topic + "/modbus/device-2/in-operation" + " " + "yes",
topic + "/modbus/device-2/in-operation/$name" + " " + "in operation",
topic + "/modbus/device-2/in-operation/$datatype" + " " + "string",
topic + "/modbus/device-2/status" + " " + "online",
topic + "/modbus/device-2/status/$name" + " " + "status",
topic + "/modbus/device-2/status/$datatype" + " " + "string",
topic + "/modbus/device-2/type" + " " + "Machine B",
topic + "/modbus/device-2/type/$name" + " " + "type",
topic + "/modbus/device-2/type/$datatype" + " " + "string",
topic + "/modbus/device-2/source" + " " + "device 2",
topic + "/modbus/device-2/source/$name" + " " + "source",
topic + "/modbus/device-2/source/$datatype" + " " + "string",
topic + "/modbus/device-2/temperature" + " " + "25.38",
topic + "/modbus/device-2/temperature/$name" + " " + "Temperature",
topic + "/modbus/device-2/temperature/$datatype" + " " + "float",
topic + "/modbus/device-2/voltage" + " " + "24.1",
topic + "/modbus/device-2/voltage/$name" + " " + "Voltage",
topic + "/modbus/device-2/voltage/$datatype" + " " + "float",
topic + "/modbus/device-2/current" + " " + "100",
topic + "/modbus/device-2/current/$name" + " " + "Current",
topic + "/modbus/device-2/current/$datatype" + " " + "float",
topic + "/modbus/device-2/throughput" + " " + "12345",
topic + "/modbus/device-2/throughput/$name" + " " + "Throughput",
topic + "/modbus/device-2/throughput/$datatype" + " " + "integer",
topic + "/modbus/device-2/load" + " " + "81.2",
topic + "/modbus/device-2/load/$name" + " " + "Load [%]",
topic + "/modbus/device-2/load/$datatype" + " " + "float",
topic + "/modbus/device-2/account-no" + " " + "T3L3GrAf",
topic + "/modbus/device-2/account-no/$name" + " " + "account no",
topic + "/modbus/device-2/account-no/$datatype" + " " + "string",
topic + "/modbus/device-2/supplied" + " " + "true",
topic + "/modbus/device-2/supplied/$name" + " " + "supplied",
topic + "/modbus/device-2/supplied/$datatype" + " " + "boolean",
topic + "/modbus/$state" + " " + "lost",
}
require.NoError(t, plugin.Write(input))
require.NoError(t, plugin.Close())
// Verify the result
require.Eventually(t, func() bool {
mtx.Lock()
defer mtx.Unlock()
return len(received) >= len(expected)
}, time.Second, 100*time.Millisecond)
actual := make([]string, 0, len(received))
for _, msg := range received {
actual = append(actual, msg.topic+" "+string(msg.payload))
}
require.ElementsMatch(t, expected, actual)
}
func createMetricMessageHandler(acc telegraf.Accumulator, parser telegraf.Parser) paho.MessageHandler {
return func(_ paho.Client, msg paho.Message) {
metrics, err := parser.Parse(msg.Payload())
if err != nil {
acc.AddError(err)
return
}
for _, m := range metrics {
m.AddTag("topic", msg.Topic())
acc.AddMetric(m)
}
}
}
func TestMissingServers(t *testing.T) {
plugin := &MQTT{}
require.ErrorContains(t, plugin.Init(), "no servers specified")

View File

@ -60,12 +60,31 @@
## When true, metrics will be sent in one MQTT message per flush. Otherwise,
## metrics are written one metric per MQTT message.
## DEPRECATED: Use layout option instead
# batch = false
## When true, metric will have RETAIN flag set, making broker cache entries until someone
## actually reads it
# retain = false
## Layout of the topics published.
## The following choices are available:
## non-batch -- send individual messages, one for each metric
## batch -- send all metric as a single message per MQTT topic
## NOTE: The following options will ignore the 'data_format' option and send single values
## field -- send individual messages for each field, appending its name to the metric topic
## homie-v4 -- send metrics with fields and tags according to the 4.0.0 specs
## see https://homieiot.github.io/specification/
# layout = "non-batch"
## HOMIE specific settings
## The following options provide templates for setting the device name
## and the node-ID for the topics. Both options are MANDATORY and can contain
## {{ .PluginName }} (metric name), {{ .Tag "key"}} (tag reference to 'key')
## or constant strings. The templays MAY NOT contain slashes!
# homie_device_name = ""
# homie_node_id = ""
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md