feat(outputs.mqtt): enhance routing capabilities (#12224)

This commit is contained in:
Sebastian Machuca 2023-01-18 05:45:45 +11:00 committed by GitHub
parent a586101d84
commit 74ed28938a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 202 additions and 28 deletions

View File

@ -39,8 +39,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:
## <topic_prefix>/<hostname>/<pluginname>/ (e.g. prefix/web01.example.com/mem)
topic_prefix = "telegraf"
## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }}
## (e.g. prefix/web01.example.com/mem/some_tag_value)
## Each path segment accepts either a template placeholder, an environment variable, or a tag key
## of the form `{{.Tag "tag_key_name"]]`. Empty path elements as well as special MQTT characters
## (such as `+` or `#`) are invalid to form the topic name and will lead to an error.
## In case a tag is missing in the metric, that path segment omitted for the final topic.
topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}"
## QoS policy for messages
## The mqtt QoS policy for sending messages.

View File

@ -30,7 +30,8 @@ type MQTT struct {
Password string `toml:"password"`
Database string
Timeout config.Duration `toml:"timeout"`
TopicPrefix string `toml:"topic_prefix"`
TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"`
Topic string `toml:"topic"`
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
tls.ClientConfig
@ -41,6 +42,7 @@ type MQTT struct {
client Client
serializer serializers.Serializer
generator *TopicNameGenerator
sync.Mutex
}
@ -58,6 +60,15 @@ func (*MQTT) SampleConfig() string {
return sampleConfig
}
func (m *MQTT) Init() error {
var err error
m.generator, err = NewTopicNameGenerator(m.TopicPrefix, m.Topic)
if err != nil {
return err
}
return nil
}
func (m *MQTT) Connect() error {
m.Lock()
defer m.Unlock()
@ -91,24 +102,17 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
hostname, ok := metrics[0].Tags()["host"]
if !ok {
hostname = ""
}
metricsmap := make(map[string][]telegraf.Metric)
for _, metric := range metrics {
var t []string
if m.TopicPrefix != "" {
t = append(t, m.TopicPrefix)
topic, err := m.generator.Generate(hostname, metric)
if err != nil {
return fmt.Errorf("topic name couldn't be generated due to error: %w", err)
}
if hostname != "" {
t = append(t, hostname)
}
t = append(t, metric.Name())
topic := strings.Join(t, "/")
if m.BatchMessage {
metricsmap[topic] = append(metricsmap[topic], metric)

View File

@ -2,8 +2,10 @@ package mqtt
import (
"fmt"
"github.com/influxdata/telegraf/metric"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
@ -49,12 +51,13 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
// Verify that we can connect to the MQTT broker
err := m.Connect()
require.NoError(t, err)
require.NoError(t, m.Init())
// Verify that we can connect to the MQTT broker
require.NoError(t, m.Connect())
// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
require.NoError(t, m.Write(testutil.MockMetrics()))
}
func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
@ -76,12 +79,13 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
}
// Verify that we can connect to the MQTT broker
err := m.Connect()
require.NoError(t, err)
require.NoError(t, m.Init())
// Verify that we can connect to the MQTT broker
require.NoError(t, m.Connect())
// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
require.NoError(t, m.Write(testutil.MockMetrics()))
}
func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
@ -103,10 +107,108 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
}
// Verify that we can connect to the MQTT broker
err := m.Connect()
require.NoError(t, err)
require.NoError(t, m.Init())
require.NoError(t, m.Connect())
// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
require.NoError(t, m.Write(testutil.MockMetrics()))
}
func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) {
tests := []struct {
name string
topic string
expectedError string
}{
{
name: "a valid pattern is accepted",
topic: "this/is/valid",
expectedError: "",
},
{
name: "an invalid pattern is rejected",
topic: "this/is/#/invalid",
expectedError: "found forbidden character # in the topic name this/is/#/invalid",
},
{
name: "an invalid pattern is rejected",
topic: "this/is/+/invalid",
expectedError: "found forbidden character + in the topic name this/is/+/invalid",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &MQTT{
Topic: tt.topic,
}
err := m.Init()
if tt.expectedError != "" {
require.ErrorContains(t, err, tt.expectedError)
} else {
require.NoError(t, err)
}
})
}
}
func TestGenerateTopicName(t *testing.T) {
s := serializers.NewInfluxSerializer()
m := &MQTT{
Servers: []string{"tcp://localhost:502"},
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{},
}
tests := []struct {
name string
pattern string
want string
}{
{
name: "matches default legacy format",
pattern: "telegraf/{{ .Hostname }}/{{ .PluginName }}",
want: "telegraf/hostname/metric-name",
},
{
name: "respect hardcoded strings",
pattern: "this/is/a/topic",
want: "this/is/a/topic",
},
{
name: "allows the use of tags",
pattern: "{{ .TopicPrefix }}/{{ .Tag \"tag1\" }}",
want: "prefix/value1",
},
{
name: "uses the plugin name when no pattern is provided",
pattern: "",
want: "metric-name",
},
{
name: "ignores tag when tag does not exists",
pattern: "{{ .TopicPrefix }}/{{ .Tag \"not-a-tag\" }}",
want: "prefix",
},
{
name: "ignores empty forward slashes",
pattern: "double//slashes//are//ignored",
want: "double/slashes/are/ignored",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m.Topic = tt.pattern
m.TopicPrefix = "prefix"
met := metric.New(
"metric-name",
map[string]string{"tag1": "value1"},
map[string]interface{}{"value": 123},
time.Date(2022, time.November, 10, 23, 0, 0, 0, time.UTC),
)
require.NoError(t, m.Init())
actual, err := m.generator.Generate("hostname", met)
require.NoError(t, err)
require.Equal(t, tt.want, actual)
})
}
}

View File

@ -13,8 +13,13 @@
## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:
## <topic_prefix>/<hostname>/<pluginname>/ (e.g. prefix/web01.example.com/mem)
topic_prefix = "telegraf"
## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }}
## (e.g. prefix/web01.example.com/mem/some_tag_value)
## Each path segment accepts either a template placeholder, an environment variable, or a tag key
## of the form `{{.Tag "tag_key_name"]]`. Empty path elements as well as special MQTT characters
## (such as `+` or `#`) are invalid to form the topic name and will lead to an error.
## In case a tag is missing in the metric, that path segment omitted for the final topic.
topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}"
## QoS policy for messages
## The mqtt QoS policy for sending messages.

View File

@ -0,0 +1,58 @@
package mqtt
import (
"fmt"
"strings"
"text/template"
"github.com/influxdata/telegraf"
)
type TopicNameGenerator struct {
Hostname string
TopicPrefix string
PluginName string
metric telegraf.Metric
template *template.Template
}
func NewTopicNameGenerator(topicPrefix string, topic string) (*TopicNameGenerator, error) {
tt, err := template.New("topic_name").Parse(topic)
if err != nil {
return nil, err
}
for _, p := range strings.Split(topic, "/") {
if strings.ContainsAny(p, "#+") {
return nil, fmt.Errorf("found forbidden character %s in the topic name %s", p, topic)
}
}
return &TopicNameGenerator{TopicPrefix: topicPrefix, template: tt}, nil
}
func (t *TopicNameGenerator) Tag(key string) string {
tagString, _ := t.metric.GetTag(key)
return tagString
}
func (t *TopicNameGenerator) Generate(hostname string, m telegraf.Metric) (string, error) {
t.Hostname = hostname
t.metric = m
t.PluginName = m.Name()
var b strings.Builder
err := t.template.Execute(&b, t)
if err != nil {
return "", err
}
var ts []string
for _, p := range strings.Split(b.String(), "/") {
if p != "" {
ts = append(ts, p)
}
}
topic := strings.Join(ts, "/")
// This is to keep backward compatibility with previous behaviour where the plugin name was always present
if topic == "" {
return m.Name(), nil
}
return topic, nil
}