From 548c064cc627f170e4848ea80a16a75aa14637a7 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 5 May 2025 00:27:29 +0200 Subject: [PATCH] chore(outputs.mqtt): Rework templating to use TemplateMetric (#16560) --- plugins/outputs/mqtt/README.md | 20 ++++---- plugins/outputs/mqtt/homie.go | 33 ++----------- plugins/outputs/mqtt/mqtt.go | 46 +++++++++--------- plugins/outputs/mqtt/mqtt_test.go | 21 +++++---- plugins/outputs/mqtt/sample.conf | 8 ++-- plugins/outputs/mqtt/topic_name_generator.go | 49 +++++++++++++++----- 6 files changed, 93 insertions(+), 84 deletions(-) diff --git a/plugins/outputs/mqtt/README.md b/plugins/outputs/mqtt/README.md index c52a1243a..cc5474ff9 100644 --- a/plugins/outputs/mqtt/README.md +++ b/plugins/outputs/mqtt/README.md @@ -52,14 +52,14 @@ to use them. ## MQTT Topic for Producer Messages ## MQTT outputs send metrics to this topic format: - ## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }} + ## prefix/{{ .Tag "host" }}/{{ .Name }}/{{ .Tag "tag_key" }} ## (e.g. prefix/web01.example.com/mem/some_tag_value) ## Each path segment accepts either a template placeholder, an environment variable, or a tag key ## of the form `{{.Tag "tag_key_name"}}`. All the functions provided by the Sprig library ## (http://masterminds.github.io/sprig/) are available. Empty path elements as well as special MQTT ## characters (such as `+` or `#`) are invalid to form the topic name and will lead to an error. ## In case a tag is missing in the metric, that path segment omitted for the final topic. - topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}" + topic = 'telegraf/{{ .Tag "host" }}/{{ .Name }}' ## QoS policy for messages ## The mqtt QoS policy for sending messages. @@ -126,8 +126,8 @@ to use them. ## HOMIE specific settings ## The following options provide templates for setting the device name ## and the node-ID for the topics. Both options are MANDATORY and can contain - ## {{ .PluginName }} (metric name), {{ .Tag "key"}} (tag reference to 'key') - ## or constant strings. The templays MAY NOT contain slashes! + ## {{ .Name }} (metric name), {{ .Tag "key"}} (tag reference to 'key') or + ## constant strings. The templates MAY NOT contain slashes! # homie_device_name = "" # homie_node_id = "" @@ -172,7 +172,7 @@ with configuration ```toml [[outputs.mqtt]] - topic = 'telegraf/{{ .PluginName }}/{{ .Tag "source" }}' + topic = 'telegraf/{{ .Name }}/{{ .Tag "source" }}' layout = "field" ... ``` @@ -199,8 +199,8 @@ used to specify the `device-id` path. The __mandatory__ options `homie_device_name` will specify the content of the `$name` topic of the device, while `homie_node_id` will provide a template for the `node-id` part of the topic. Both options can contain [Go templates][GoTemplates] similar to `topic` -with `{{ .PluginName }}` referencing the metric name and `{{ .Tag "key"}}` -referencing the tag with the name `key`. +with `{{ .Name }}` referencing the metric name and `{{ .Tag "key"}}` referencing +the tag with the name `key`. [Sprig](http://masterminds.github.io/sprig/) helper functions are available. For example writing the metrics @@ -215,11 +215,11 @@ with configuration ```toml [[outputs.mqtt]] - topic = 'telegraf/{{ .PluginName }}' + topic = 'telegraf/{{ .Name }}' layout = "homie-v4" - homie_device_name ='{{.PluginName}} plugin' - homie_node_id = '{{.Tag "source"}}' + homie_device_name ='{{ .Name }} plugin' + homie_node_id = '{{ .Tag "source" }}' ... ``` diff --git a/plugins/outputs/mqtt/homie.go b/plugins/outputs/mqtt/homie.go index 9d250c043..5755f0dc8 100644 --- a/plugins/outputs/mqtt/homie.go +++ b/plugins/outputs/mqtt/homie.go @@ -8,8 +8,6 @@ import ( "strings" "text/template" - "github.com/Masterminds/sprig/v3" - "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" ) @@ -21,7 +19,7 @@ func (m *MQTT) collectHomieDeviceMessages(topic string, metric telegraf.Metric) // Check if the device-id is already registered if _, found := m.homieSeen[topic]; !found { - deviceName, err := m.homieDeviceNameGenerator.Generate(metric) + deviceName, err := homieGenerate(m.homieDeviceNameGenerator, metric) if err != nil { return nil, "", fmt.Errorf("generating device name failed: %w", err) } @@ -34,7 +32,7 @@ func (m *MQTT) collectHomieDeviceMessages(topic string, metric telegraf.Metric) } // Generate the node-ID from the metric and fixup invalid characters - nodeName, err := m.homieNodeIDGenerator.Generate(metric) + nodeName, err := homieGenerate(m.homieNodeIDGenerator, metric) if err != nil { return nil, "", fmt.Errorf("generating device ID failed: %w", err) } @@ -97,32 +95,9 @@ func convertType(value interface{}) (val, dtype string, err error) { return "", "", fmt.Errorf("unknown type %T", value) } -type HomieGenerator struct { - PluginName string - metric telegraf.Metric - template *template.Template -} - -func NewHomieGenerator(tmpl string) (*HomieGenerator, error) { - tt, err := template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(tmpl) - if err != nil { - return nil, err - } - - return &HomieGenerator{template: tt}, nil -} - -func (t *HomieGenerator) Tag(key string) string { - tagString, _ := t.metric.GetTag(key) - return tagString -} - -func (t *HomieGenerator) Generate(m telegraf.Metric) (string, error) { - t.PluginName = m.Name() - t.metric = m - +func homieGenerate(t *template.Template, m telegraf.Metric) (string, error) { var b strings.Builder - if err := t.template.Execute(&b, t); err != nil { + if err := t.Execute(&b, m.(telegraf.TemplateMetric)); err != nil { return "", err } diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 9e710c846..e31085827 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -6,9 +6,13 @@ import ( _ "embed" "errors" "fmt" + "regexp" "sync" + "text/template" "time" + "github.com/Masterminds/sprig/v3" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" @@ -19,6 +23,9 @@ import ( //go:embed sample.conf var sampleConfig string +var pluginNameRe = regexp.MustCompile(`({{.*\B)\.PluginName(\b[^}]*}})`) +var hostnameRe = regexp.MustCompile(`({{.*\B)\.Hostname(\b[^}]*}})`) + type message struct { topic string payload []byte @@ -38,8 +45,8 @@ type MQTT struct { serializer telegraf.Serializer generator *TopicNameGenerator - homieDeviceNameGenerator *HomieGenerator - homieNodeIDGenerator *HomieGenerator + homieDeviceNameGenerator *template.Template + homieNodeIDGenerator *template.Template homieSeen map[string]map[string]bool sync.Mutex @@ -81,7 +88,8 @@ func (m *MQTT) Init() error { return errors.New("missing 'homie_device_name' option") } - m.homieDeviceNameGenerator, err = NewHomieGenerator(m.HomieDeviceName) + m.HomieDeviceName = pluginNameRe.ReplaceAllString(m.HomieDeviceName, `$1.Name$2`) + m.homieDeviceNameGenerator, err = template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(m.HomieDeviceName) if err != nil { return fmt.Errorf("creating device name generator failed: %w", err) } @@ -90,7 +98,8 @@ func (m *MQTT) Init() error { return errors.New("missing 'homie_node_id' option") } - m.homieNodeIDGenerator, err = NewHomieGenerator(m.HomieNodeID) + m.HomieNodeID = pluginNameRe.ReplaceAllString(m.HomieNodeID, `$1.Name$2`) + m.homieNodeIDGenerator, err = template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(m.HomieNodeID) if err != nil { return fmt.Errorf("creating node ID name generator failed: %w", err) } @@ -144,22 +153,17 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { return nil } - hostname, ok := metrics[0].Tags()["host"] - if !ok { - hostname = "" - } - // Group the metrics to topics and serialize them var topicMessages []message switch m.Layout { case "batch": - topicMessages = m.collectBatch(hostname, metrics) + topicMessages = m.collectBatch(metrics) case "non-batch": - topicMessages = m.collectNonBatch(hostname, metrics) + topicMessages = m.collectNonBatch(metrics) case "field": - topicMessages = m.collectField(hostname, metrics) + topicMessages = m.collectField(metrics) case "homie-v4": - topicMessages = m.collectHomieV4(hostname, metrics) + topicMessages = m.collectHomieV4(metrics) default: return fmt.Errorf("unknown layout %q", m.Layout) } @@ -178,10 +182,10 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { return nil } -func (m *MQTT) collectNonBatch(hostname string, metrics []telegraf.Metric) []message { +func (m *MQTT) collectNonBatch(metrics []telegraf.Metric) []message { collection := make([]message, 0, len(metrics)) for _, metric := range metrics { - topic, err := m.generator.Generate(hostname, metric) + topic, err := m.generateTopic(metric) if err != nil { m.Log.Warnf("Generating topic name failed: %v", err) m.Log.Debugf("metric was: %v", metric) @@ -200,10 +204,10 @@ func (m *MQTT) collectNonBatch(hostname string, metrics []telegraf.Metric) []mes return collection } -func (m *MQTT) collectBatch(hostname string, metrics []telegraf.Metric) []message { +func (m *MQTT) collectBatch(metrics []telegraf.Metric) []message { metricsCollection := make(map[string][]telegraf.Metric) for _, metric := range metrics { - topic, err := m.generator.Generate(hostname, metric) + topic, err := m.generateTopic(metric) if err != nil { m.Log.Warnf("Generating topic name failed: %v", err) m.Log.Debugf("metric was: %v", metric) @@ -224,10 +228,10 @@ func (m *MQTT) collectBatch(hostname string, metrics []telegraf.Metric) []messag return collection } -func (m *MQTT) collectField(hostname string, metrics []telegraf.Metric) []message { +func (m *MQTT) collectField(metrics []telegraf.Metric) []message { var collection []message for _, metric := range metrics { - topic, err := m.generator.Generate(hostname, metric) + topic, err := m.generateTopic(metric) if err != nil { m.Log.Warnf("Generating topic name failed: %v", err) m.Log.Debugf("metric was: %v", metric) @@ -248,10 +252,10 @@ func (m *MQTT) collectField(hostname string, metrics []telegraf.Metric) []messag return collection } -func (m *MQTT) collectHomieV4(hostname string, metrics []telegraf.Metric) []message { +func (m *MQTT) collectHomieV4(metrics []telegraf.Metric) []message { var collection []message for _, metric := range metrics { - topic, err := m.generator.Generate(hostname, metric) + topic, err := m.generateTopic(metric) if err != nil { m.Log.Warnf("Generating topic name failed: %v", err) m.Log.Debugf("metric was: %v", metric) diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index ca37dae9d..9e12e77fb 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -169,7 +169,7 @@ func TestIntegrationMQTTv3(t *testing.T) { Timeout: config.Duration(5 * time.Second), AutoReconnect: true, }, - Topic: topic + "/{{.PluginName}}", + Topic: topic + "/{{.Name}}", Layout: "non-batch", Log: testutil.Logger{Name: "mqttv3-integration-test"}, } @@ -321,7 +321,7 @@ func TestIntegrationMQTTLayoutNonBatch(t *testing.T) { Timeout: config.Duration(5 * time.Second), AutoReconnect: true, }, - Topic: topic + "/{{.PluginName}}", + Topic: topic + "/{{.Name}}", Layout: "non-batch", Log: testutil.Logger{Name: "mqttv3-integration-test"}, } @@ -408,7 +408,7 @@ func TestIntegrationMQTTLayoutBatch(t *testing.T) { Timeout: config.Duration(5 * time.Second), AutoReconnect: true, }, - Topic: topic + "/{{.PluginName}}", + Topic: topic + "/{{.Name}}", Layout: "batch", Log: testutil.Logger{Name: "mqttv3-integration-test-"}, } @@ -492,7 +492,7 @@ func TestIntegrationMQTTLayoutField(t *testing.T) { Timeout: config.Duration(5 * time.Second), AutoReconnect: true, }, - Topic: topic + `/{{.PluginName}}/{{.Tag "source"}}`, + Topic: topic + `/{{.Name}}/{{.Tag "source"}}`, Layout: "field", Log: testutil.Logger{Name: "mqttv3-integration-test-"}, } @@ -607,8 +607,8 @@ func TestIntegrationMQTTLayoutHomieV4(t *testing.T) { Timeout: config.Duration(5 * time.Second), AutoReconnect: true, }, - Topic: topic + "/{{.PluginName}}", - HomieDeviceName: `{{.PluginName}}`, + Topic: topic + "/{{.Name}}", + HomieDeviceName: `{{.Name}}`, HomieNodeID: `{{.Tag "source"}}`, Layout: "homie-v4", Log: testutil.Logger{Name: "mqttv3-integration-test-"}, @@ -883,6 +883,11 @@ func TestGenerateTopicName(t *testing.T) { pattern: "telegraf/{{ .Hostname }}/{{ .PluginName }}", want: "telegraf/hostname/metric-name", }, + { + name: "matches default format", + pattern: `telegraf/{{ .Tag "host" }}/{{ .Name }}`, + want: "telegraf/hostname/metric-name", + }, { name: "respect hardcoded strings", pattern: "this/is/a/topic", @@ -920,12 +925,12 @@ func TestGenerateTopicName(t *testing.T) { m.TopicPrefix = "prefix" met := metric.New( "metric-name", - map[string]string{"tag1": "value1"}, + map[string]string{"tag1": "value1", "host": "hostname"}, map[string]interface{}{"value": 123}, time.Date(2022, time.November, 10, 23, 0, 0, 0, time.UTC), ) require.NoError(t, m.Init()) - actual, err := m.generator.Generate("hostname", met) + actual, err := m.generateTopic(met) require.NoError(t, err) require.Equal(t, tt.want, actual) }) diff --git a/plugins/outputs/mqtt/sample.conf b/plugins/outputs/mqtt/sample.conf index 3d1f855b0..502f22e67 100644 --- a/plugins/outputs/mqtt/sample.conf +++ b/plugins/outputs/mqtt/sample.conf @@ -13,14 +13,14 @@ ## MQTT Topic for Producer Messages ## MQTT outputs send metrics to this topic format: - ## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }} + ## prefix/{{ .Tag "host" }}/{{ .Name }}/{{ .Tag "tag_key" }} ## (e.g. prefix/web01.example.com/mem/some_tag_value) ## Each path segment accepts either a template placeholder, an environment variable, or a tag key ## of the form `{{.Tag "tag_key_name"}}`. All the functions provided by the Sprig library ## (http://masterminds.github.io/sprig/) are available. Empty path elements as well as special MQTT ## characters (such as `+` or `#`) are invalid to form the topic name and will lead to an error. ## In case a tag is missing in the metric, that path segment omitted for the final topic. - topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}" + topic = 'telegraf/{{ .Tag "host" }}/{{ .Name }}' ## QoS policy for messages ## The mqtt QoS policy for sending messages. @@ -87,8 +87,8 @@ ## HOMIE specific settings ## The following options provide templates for setting the device name ## and the node-ID for the topics. Both options are MANDATORY and can contain - ## {{ .PluginName }} (metric name), {{ .Tag "key"}} (tag reference to 'key') - ## or constant strings. The templays MAY NOT contain slashes! + ## {{ .Name }} (metric name), {{ .Tag "key"}} (tag reference to 'key') or + ## constant strings. The templates MAY NOT contain slashes! # homie_device_name = "" # homie_node_id = "" diff --git a/plugins/outputs/mqtt/topic_name_generator.go b/plugins/outputs/mqtt/topic_name_generator.go index c51084450..834b183e3 100644 --- a/plugins/outputs/mqtt/topic_name_generator.go +++ b/plugins/outputs/mqtt/topic_name_generator.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "text/template" + "time" "github.com/Masterminds/sprig/v3" @@ -11,14 +12,15 @@ import ( ) type TopicNameGenerator struct { - Hostname string TopicPrefix string - PluginName string - metric telegraf.Metric + metric telegraf.TemplateMetric template *template.Template } func NewTopicNameGenerator(topicPrefix, topic string) (*TopicNameGenerator, error) { + topic = hostnameRe.ReplaceAllString(topic, `$1.Tag "host"$2`) + topic = pluginNameRe.ReplaceAllString(topic, `$1.Name$2`) + tt, err := template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(topic) if err != nil { return nil, err @@ -31,17 +33,40 @@ func NewTopicNameGenerator(topicPrefix, topic string) (*TopicNameGenerator, erro return &TopicNameGenerator{TopicPrefix: topicPrefix, template: tt}, nil } -func (t *TopicNameGenerator) Tag(key string) string { - tagString, _ := t.metric.GetTag(key) - return tagString +func (t *TopicNameGenerator) Name() string { + return t.metric.Name() } -func (t *TopicNameGenerator) Generate(hostname string, m telegraf.Metric) (string, error) { - t.Hostname = hostname - t.metric = m - t.PluginName = m.Name() +func (t *TopicNameGenerator) Tag(key string) string { + return t.metric.Tag(key) +} + +func (t *TopicNameGenerator) Field(key string) interface{} { + return t.metric.Field(key) +} + +func (t *TopicNameGenerator) Time() time.Time { + return t.metric.Time() +} + +func (t *TopicNameGenerator) Tags() map[string]string { + return t.metric.Tags() +} + +func (t *TopicNameGenerator) Fields() map[string]interface{} { + return t.metric.Fields() +} + +func (t *TopicNameGenerator) String() string { + return t.metric.String() +} + +func (m *MQTT) generateTopic(metric telegraf.Metric) (string, error) { + m.generator.metric = metric.(telegraf.TemplateMetric) + + // Cannot directly pass TemplateMetric since TopicNameGenerator still contains TopicPrefix (until v1.35.0) var b strings.Builder - err := t.template.Execute(&b, t) + err := m.generator.template.Execute(&b, m.generator) if err != nil { return "", err } @@ -54,7 +79,7 @@ func (t *TopicNameGenerator) Generate(hostname string, m telegraf.Metric) (strin topic := strings.Join(ts, "/") // This is to keep backward compatibility with previous behaviour where the plugin name was always present if topic == "" { - return m.Name(), nil + return metric.Name(), nil } if strings.HasPrefix(b.String(), "/") { topic = "/" + topic