diff --git a/plugins/outputs/mqtt/README.md b/plugins/outputs/mqtt/README.md index 789bae4eb..5bb4a86c9 100644 --- a/plugins/outputs/mqtt/README.md +++ b/plugins/outputs/mqtt/README.md @@ -39,8 +39,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## MQTT Topic for Producer Messages ## MQTT outputs send metrics to this topic format: - ## /// (e.g. prefix/web01.example.com/mem) - topic_prefix = "telegraf" + ## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }} + ## (e.g. prefix/web01.example.com/mem/some_tag_value) + ## Each path segment accepts either a template placeholder, an environment variable, or a tag key + ## of the form `{{.Tag "tag_key_name"]]`. Empty path elements as well as special MQTT characters + ## (such as `+` or `#`) are invalid to form the topic name and will lead to an error. + ## In case a tag is missing in the metric, that path segment omitted for the final topic. + topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}" ## QoS policy for messages ## The mqtt QoS policy for sending messages. diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index f66f25518..d78cd8887 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -30,7 +30,8 @@ type MQTT struct { Password string `toml:"password"` Database string Timeout config.Duration `toml:"timeout"` - TopicPrefix string `toml:"topic_prefix"` + TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"` + Topic string `toml:"topic"` QoS int `toml:"qos"` ClientID string `toml:"client_id"` tls.ClientConfig @@ -41,6 +42,7 @@ type MQTT struct { client Client serializer serializers.Serializer + generator *TopicNameGenerator sync.Mutex } @@ -58,6 +60,15 @@ func (*MQTT) SampleConfig() string { return sampleConfig } +func (m *MQTT) Init() error { + var err error + m.generator, err = NewTopicNameGenerator(m.TopicPrefix, m.Topic) + if err != nil { + return err + } + return nil +} + func (m *MQTT) Connect() error { m.Lock() defer m.Unlock() @@ -91,24 +102,17 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { if len(metrics) == 0 { return nil } + hostname, ok := metrics[0].Tags()["host"] if !ok { hostname = "" } - metricsmap := make(map[string][]telegraf.Metric) - for _, metric := range metrics { - var t []string - if m.TopicPrefix != "" { - t = append(t, m.TopicPrefix) + topic, err := m.generator.Generate(hostname, metric) + if err != nil { + return fmt.Errorf("topic name couldn't be generated due to error: %w", err) } - if hostname != "" { - t = append(t, hostname) - } - - t = append(t, metric.Name()) - topic := strings.Join(t, "/") if m.BatchMessage { metricsmap[topic] = append(metricsmap[topic], metric) diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index 819fce7da..11ed6a9e1 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -2,8 +2,10 @@ package mqtt import ( "fmt" + "github.com/influxdata/telegraf/metric" "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" @@ -49,12 +51,13 @@ func TestConnectAndWriteIntegration(t *testing.T) { } // Verify that we can connect to the MQTT broker - err := m.Connect() - require.NoError(t, err) + require.NoError(t, m.Init()) + + // Verify that we can connect to the MQTT broker + require.NoError(t, m.Connect()) // Verify that we can successfully write data to the mqtt broker - err = m.Write(testutil.MockMetrics()) - require.NoError(t, err) + require.NoError(t, m.Write(testutil.MockMetrics())) } func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) { @@ -76,12 +79,13 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) { } // Verify that we can connect to the MQTT broker - err := m.Connect() - require.NoError(t, err) + require.NoError(t, m.Init()) + + // Verify that we can connect to the MQTT broker + require.NoError(t, m.Connect()) // Verify that we can successfully write data to the mqtt broker - err = m.Write(testutil.MockMetrics()) - require.NoError(t, err) + require.NoError(t, m.Write(testutil.MockMetrics())) } func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { @@ -103,10 +107,108 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { } // Verify that we can connect to the MQTT broker - err := m.Connect() - require.NoError(t, err) + require.NoError(t, m.Init()) + require.NoError(t, m.Connect()) // Verify that we can successfully write data to the mqtt broker - err = m.Write(testutil.MockMetrics()) - require.NoError(t, err) + require.NoError(t, m.Write(testutil.MockMetrics())) +} + +func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) { + tests := []struct { + name string + topic string + expectedError string + }{ + { + name: "a valid pattern is accepted", + topic: "this/is/valid", + expectedError: "", + }, + { + name: "an invalid pattern is rejected", + topic: "this/is/#/invalid", + expectedError: "found forbidden character # in the topic name this/is/#/invalid", + }, + { + name: "an invalid pattern is rejected", + topic: "this/is/+/invalid", + expectedError: "found forbidden character + in the topic name this/is/+/invalid", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &MQTT{ + Topic: tt.topic, + } + err := m.Init() + if tt.expectedError != "" { + require.ErrorContains(t, err, tt.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestGenerateTopicName(t *testing.T) { + s := serializers.NewInfluxSerializer() + m := &MQTT{ + Servers: []string{"tcp://localhost:502"}, + serializer: s, + KeepAlive: 30, + Log: testutil.Logger{}, + } + tests := []struct { + name string + pattern string + want string + }{ + { + name: "matches default legacy format", + pattern: "telegraf/{{ .Hostname }}/{{ .PluginName }}", + want: "telegraf/hostname/metric-name", + }, + { + name: "respect hardcoded strings", + pattern: "this/is/a/topic", + want: "this/is/a/topic", + }, + { + name: "allows the use of tags", + pattern: "{{ .TopicPrefix }}/{{ .Tag \"tag1\" }}", + want: "prefix/value1", + }, + { + name: "uses the plugin name when no pattern is provided", + pattern: "", + want: "metric-name", + }, + { + name: "ignores tag when tag does not exists", + pattern: "{{ .TopicPrefix }}/{{ .Tag \"not-a-tag\" }}", + want: "prefix", + }, + { + name: "ignores empty forward slashes", + pattern: "double//slashes//are//ignored", + want: "double/slashes/are/ignored", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m.Topic = tt.pattern + m.TopicPrefix = "prefix" + met := metric.New( + "metric-name", + map[string]string{"tag1": "value1"}, + map[string]interface{}{"value": 123}, + time.Date(2022, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + require.NoError(t, m.Init()) + actual, err := m.generator.Generate("hostname", met) + require.NoError(t, err) + require.Equal(t, tt.want, actual) + }) + } } diff --git a/plugins/outputs/mqtt/sample.conf b/plugins/outputs/mqtt/sample.conf index 384564c6a..50d999d42 100644 --- a/plugins/outputs/mqtt/sample.conf +++ b/plugins/outputs/mqtt/sample.conf @@ -13,8 +13,13 @@ ## MQTT Topic for Producer Messages ## MQTT outputs send metrics to this topic format: - ## /// (e.g. prefix/web01.example.com/mem) - topic_prefix = "telegraf" + ## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }} + ## (e.g. prefix/web01.example.com/mem/some_tag_value) + ## Each path segment accepts either a template placeholder, an environment variable, or a tag key + ## of the form `{{.Tag "tag_key_name"]]`. Empty path elements as well as special MQTT characters + ## (such as `+` or `#`) are invalid to form the topic name and will lead to an error. + ## In case a tag is missing in the metric, that path segment omitted for the final topic. + topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}" ## QoS policy for messages ## The mqtt QoS policy for sending messages. diff --git a/plugins/outputs/mqtt/topic_name_generator.go b/plugins/outputs/mqtt/topic_name_generator.go new file mode 100644 index 000000000..cf54eb03e --- /dev/null +++ b/plugins/outputs/mqtt/topic_name_generator.go @@ -0,0 +1,58 @@ +package mqtt + +import ( + "fmt" + "strings" + "text/template" + + "github.com/influxdata/telegraf" +) + +type TopicNameGenerator struct { + Hostname string + TopicPrefix string + PluginName string + metric telegraf.Metric + template *template.Template +} + +func NewTopicNameGenerator(topicPrefix string, topic string) (*TopicNameGenerator, error) { + tt, err := template.New("topic_name").Parse(topic) + if err != nil { + return nil, err + } + for _, p := range strings.Split(topic, "/") { + if strings.ContainsAny(p, "#+") { + return nil, fmt.Errorf("found forbidden character %s in the topic name %s", p, topic) + } + } + return &TopicNameGenerator{TopicPrefix: topicPrefix, template: tt}, nil +} + +func (t *TopicNameGenerator) Tag(key string) string { + tagString, _ := t.metric.GetTag(key) + return tagString +} + +func (t *TopicNameGenerator) Generate(hostname string, m telegraf.Metric) (string, error) { + t.Hostname = hostname + t.metric = m + t.PluginName = m.Name() + var b strings.Builder + err := t.template.Execute(&b, t) + if err != nil { + return "", err + } + var ts []string + for _, p := range strings.Split(b.String(), "/") { + if p != "" { + ts = append(ts, p) + } + } + topic := strings.Join(ts, "/") + // This is to keep backward compatibility with previous behaviour where the plugin name was always present + if topic == "" { + return m.Name(), nil + } + return topic, nil +}