chore(outputs.mqtt): Rework templating to use TemplateMetric (#16560)

This commit is contained in:
Thomas Casteleyn 2025-05-05 00:27:29 +02:00 committed by GitHub
parent b49810d337
commit 548c064cc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 93 additions and 84 deletions

View File

@ -52,14 +52,14 @@ to use them.
## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:
## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }}
## prefix/{{ .Tag "host" }}/{{ .Name }}/{{ .Tag "tag_key" }}
## (e.g. prefix/web01.example.com/mem/some_tag_value)
## Each path segment accepts either a template placeholder, an environment variable, or a tag key
## of the form `{{.Tag "tag_key_name"}}`. All the functions provided by the Sprig library
## (http://masterminds.github.io/sprig/) are available. Empty path elements as well as special MQTT
## characters (such as `+` or `#`) are invalid to form the topic name and will lead to an error.
## In case a tag is missing in the metric, that path segment omitted for the final topic.
topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}"
topic = 'telegraf/{{ .Tag "host" }}/{{ .Name }}'
## QoS policy for messages
## The mqtt QoS policy for sending messages.
@ -126,8 +126,8 @@ to use them.
## HOMIE specific settings
## The following options provide templates for setting the device name
## and the node-ID for the topics. Both options are MANDATORY and can contain
## {{ .PluginName }} (metric name), {{ .Tag "key"}} (tag reference to 'key')
## or constant strings. The templays MAY NOT contain slashes!
## {{ .Name }} (metric name), {{ .Tag "key"}} (tag reference to 'key') or
## constant strings. The templates MAY NOT contain slashes!
# homie_device_name = ""
# homie_node_id = ""
@ -172,7 +172,7 @@ with configuration
```toml
[[outputs.mqtt]]
topic = 'telegraf/{{ .PluginName }}/{{ .Tag "source" }}'
topic = 'telegraf/{{ .Name }}/{{ .Tag "source" }}'
layout = "field"
...
```
@ -199,8 +199,8 @@ used to specify the `device-id` path. The __mandatory__ options
`homie_device_name` will specify the content of the `$name` topic of the device,
while `homie_node_id` will provide a template for the `node-id` part of the
topic. Both options can contain [Go templates][GoTemplates] similar to `topic`
with `{{ .PluginName }}` referencing the metric name and `{{ .Tag "key"}}`
referencing the tag with the name `key`.
with `{{ .Name }}` referencing the metric name and `{{ .Tag "key"}}` referencing
the tag with the name `key`.
[Sprig](http://masterminds.github.io/sprig/) helper functions are available.
For example writing the metrics
@ -215,11 +215,11 @@ with configuration
```toml
[[outputs.mqtt]]
topic = 'telegraf/{{ .PluginName }}'
topic = 'telegraf/{{ .Name }}'
layout = "homie-v4"
homie_device_name ='{{.PluginName}} plugin'
homie_node_id = '{{.Tag "source"}}'
homie_device_name ='{{ .Name }} plugin'
homie_node_id = '{{ .Tag "source" }}'
...
```

View File

@ -8,8 +8,6 @@ import (
"strings"
"text/template"
"github.com/Masterminds/sprig/v3"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)
@ -21,7 +19,7 @@ func (m *MQTT) collectHomieDeviceMessages(topic string, metric telegraf.Metric)
// Check if the device-id is already registered
if _, found := m.homieSeen[topic]; !found {
deviceName, err := m.homieDeviceNameGenerator.Generate(metric)
deviceName, err := homieGenerate(m.homieDeviceNameGenerator, metric)
if err != nil {
return nil, "", fmt.Errorf("generating device name failed: %w", err)
}
@ -34,7 +32,7 @@ func (m *MQTT) collectHomieDeviceMessages(topic string, metric telegraf.Metric)
}
// Generate the node-ID from the metric and fixup invalid characters
nodeName, err := m.homieNodeIDGenerator.Generate(metric)
nodeName, err := homieGenerate(m.homieNodeIDGenerator, metric)
if err != nil {
return nil, "", fmt.Errorf("generating device ID failed: %w", err)
}
@ -97,32 +95,9 @@ func convertType(value interface{}) (val, dtype string, err error) {
return "", "", fmt.Errorf("unknown type %T", value)
}
type HomieGenerator struct {
PluginName string
metric telegraf.Metric
template *template.Template
}
func NewHomieGenerator(tmpl string) (*HomieGenerator, error) {
tt, err := template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(tmpl)
if err != nil {
return nil, err
}
return &HomieGenerator{template: tt}, nil
}
func (t *HomieGenerator) Tag(key string) string {
tagString, _ := t.metric.GetTag(key)
return tagString
}
func (t *HomieGenerator) Generate(m telegraf.Metric) (string, error) {
t.PluginName = m.Name()
t.metric = m
func homieGenerate(t *template.Template, m telegraf.Metric) (string, error) {
var b strings.Builder
if err := t.template.Execute(&b, t); err != nil {
if err := t.Execute(&b, m.(telegraf.TemplateMetric)); err != nil {
return "", err
}

View File

@ -6,9 +6,13 @@ import (
_ "embed"
"errors"
"fmt"
"regexp"
"sync"
"text/template"
"time"
"github.com/Masterminds/sprig/v3"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
@ -19,6 +23,9 @@ import (
//go:embed sample.conf
var sampleConfig string
var pluginNameRe = regexp.MustCompile(`({{.*\B)\.PluginName(\b[^}]*}})`)
var hostnameRe = regexp.MustCompile(`({{.*\B)\.Hostname(\b[^}]*}})`)
type message struct {
topic string
payload []byte
@ -38,8 +45,8 @@ type MQTT struct {
serializer telegraf.Serializer
generator *TopicNameGenerator
homieDeviceNameGenerator *HomieGenerator
homieNodeIDGenerator *HomieGenerator
homieDeviceNameGenerator *template.Template
homieNodeIDGenerator *template.Template
homieSeen map[string]map[string]bool
sync.Mutex
@ -81,7 +88,8 @@ func (m *MQTT) Init() error {
return errors.New("missing 'homie_device_name' option")
}
m.homieDeviceNameGenerator, err = NewHomieGenerator(m.HomieDeviceName)
m.HomieDeviceName = pluginNameRe.ReplaceAllString(m.HomieDeviceName, `$1.Name$2`)
m.homieDeviceNameGenerator, err = template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(m.HomieDeviceName)
if err != nil {
return fmt.Errorf("creating device name generator failed: %w", err)
}
@ -90,7 +98,8 @@ func (m *MQTT) Init() error {
return errors.New("missing 'homie_node_id' option")
}
m.homieNodeIDGenerator, err = NewHomieGenerator(m.HomieNodeID)
m.HomieNodeID = pluginNameRe.ReplaceAllString(m.HomieNodeID, `$1.Name$2`)
m.homieNodeIDGenerator, err = template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(m.HomieNodeID)
if err != nil {
return fmt.Errorf("creating node ID name generator failed: %w", err)
}
@ -144,22 +153,17 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
return nil
}
hostname, ok := metrics[0].Tags()["host"]
if !ok {
hostname = ""
}
// Group the metrics to topics and serialize them
var topicMessages []message
switch m.Layout {
case "batch":
topicMessages = m.collectBatch(hostname, metrics)
topicMessages = m.collectBatch(metrics)
case "non-batch":
topicMessages = m.collectNonBatch(hostname, metrics)
topicMessages = m.collectNonBatch(metrics)
case "field":
topicMessages = m.collectField(hostname, metrics)
topicMessages = m.collectField(metrics)
case "homie-v4":
topicMessages = m.collectHomieV4(hostname, metrics)
topicMessages = m.collectHomieV4(metrics)
default:
return fmt.Errorf("unknown layout %q", m.Layout)
}
@ -178,10 +182,10 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
return nil
}
func (m *MQTT) collectNonBatch(hostname string, metrics []telegraf.Metric) []message {
func (m *MQTT) collectNonBatch(metrics []telegraf.Metric) []message {
collection := make([]message, 0, len(metrics))
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
topic, err := m.generateTopic(metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %v", err)
m.Log.Debugf("metric was: %v", metric)
@ -200,10 +204,10 @@ func (m *MQTT) collectNonBatch(hostname string, metrics []telegraf.Metric) []mes
return collection
}
func (m *MQTT) collectBatch(hostname string, metrics []telegraf.Metric) []message {
func (m *MQTT) collectBatch(metrics []telegraf.Metric) []message {
metricsCollection := make(map[string][]telegraf.Metric)
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
topic, err := m.generateTopic(metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %v", err)
m.Log.Debugf("metric was: %v", metric)
@ -224,10 +228,10 @@ func (m *MQTT) collectBatch(hostname string, metrics []telegraf.Metric) []messag
return collection
}
func (m *MQTT) collectField(hostname string, metrics []telegraf.Metric) []message {
func (m *MQTT) collectField(metrics []telegraf.Metric) []message {
var collection []message
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
topic, err := m.generateTopic(metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %v", err)
m.Log.Debugf("metric was: %v", metric)
@ -248,10 +252,10 @@ func (m *MQTT) collectField(hostname string, metrics []telegraf.Metric) []messag
return collection
}
func (m *MQTT) collectHomieV4(hostname string, metrics []telegraf.Metric) []message {
func (m *MQTT) collectHomieV4(metrics []telegraf.Metric) []message {
var collection []message
for _, metric := range metrics {
topic, err := m.generator.Generate(hostname, metric)
topic, err := m.generateTopic(metric)
if err != nil {
m.Log.Warnf("Generating topic name failed: %v", err)
m.Log.Debugf("metric was: %v", metric)

View File

@ -169,7 +169,7 @@ func TestIntegrationMQTTv3(t *testing.T) {
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
Topic: topic + "/{{.Name}}",
Layout: "non-batch",
Log: testutil.Logger{Name: "mqttv3-integration-test"},
}
@ -321,7 +321,7 @@ func TestIntegrationMQTTLayoutNonBatch(t *testing.T) {
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
Topic: topic + "/{{.Name}}",
Layout: "non-batch",
Log: testutil.Logger{Name: "mqttv3-integration-test"},
}
@ -408,7 +408,7 @@ func TestIntegrationMQTTLayoutBatch(t *testing.T) {
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
Topic: topic + "/{{.Name}}",
Layout: "batch",
Log: testutil.Logger{Name: "mqttv3-integration-test-"},
}
@ -492,7 +492,7 @@ func TestIntegrationMQTTLayoutField(t *testing.T) {
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + `/{{.PluginName}}/{{.Tag "source"}}`,
Topic: topic + `/{{.Name}}/{{.Tag "source"}}`,
Layout: "field",
Log: testutil.Logger{Name: "mqttv3-integration-test-"},
}
@ -607,8 +607,8 @@ func TestIntegrationMQTTLayoutHomieV4(t *testing.T) {
Timeout: config.Duration(5 * time.Second),
AutoReconnect: true,
},
Topic: topic + "/{{.PluginName}}",
HomieDeviceName: `{{.PluginName}}`,
Topic: topic + "/{{.Name}}",
HomieDeviceName: `{{.Name}}`,
HomieNodeID: `{{.Tag "source"}}`,
Layout: "homie-v4",
Log: testutil.Logger{Name: "mqttv3-integration-test-"},
@ -883,6 +883,11 @@ func TestGenerateTopicName(t *testing.T) {
pattern: "telegraf/{{ .Hostname }}/{{ .PluginName }}",
want: "telegraf/hostname/metric-name",
},
{
name: "matches default format",
pattern: `telegraf/{{ .Tag "host" }}/{{ .Name }}`,
want: "telegraf/hostname/metric-name",
},
{
name: "respect hardcoded strings",
pattern: "this/is/a/topic",
@ -920,12 +925,12 @@ func TestGenerateTopicName(t *testing.T) {
m.TopicPrefix = "prefix"
met := metric.New(
"metric-name",
map[string]string{"tag1": "value1"},
map[string]string{"tag1": "value1", "host": "hostname"},
map[string]interface{}{"value": 123},
time.Date(2022, time.November, 10, 23, 0, 0, 0, time.UTC),
)
require.NoError(t, m.Init())
actual, err := m.generator.Generate("hostname", met)
actual, err := m.generateTopic(met)
require.NoError(t, err)
require.Equal(t, tt.want, actual)
})

View File

@ -13,14 +13,14 @@
## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:
## {{ .TopicPrefix }}/{{ .Hostname }}/{{ .PluginName }}/{{ .Tag "tag_key" }}
## prefix/{{ .Tag "host" }}/{{ .Name }}/{{ .Tag "tag_key" }}
## (e.g. prefix/web01.example.com/mem/some_tag_value)
## Each path segment accepts either a template placeholder, an environment variable, or a tag key
## of the form `{{.Tag "tag_key_name"}}`. All the functions provided by the Sprig library
## (http://masterminds.github.io/sprig/) are available. Empty path elements as well as special MQTT
## characters (such as `+` or `#`) are invalid to form the topic name and will lead to an error.
## In case a tag is missing in the metric, that path segment omitted for the final topic.
topic = "telegraf/{{ .Hostname }}/{{ .PluginName }}"
topic = 'telegraf/{{ .Tag "host" }}/{{ .Name }}'
## QoS policy for messages
## The mqtt QoS policy for sending messages.
@ -87,8 +87,8 @@
## HOMIE specific settings
## The following options provide templates for setting the device name
## and the node-ID for the topics. Both options are MANDATORY and can contain
## {{ .PluginName }} (metric name), {{ .Tag "key"}} (tag reference to 'key')
## or constant strings. The templays MAY NOT contain slashes!
## {{ .Name }} (metric name), {{ .Tag "key"}} (tag reference to 'key') or
## constant strings. The templates MAY NOT contain slashes!
# homie_device_name = ""
# homie_node_id = ""

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"text/template"
"time"
"github.com/Masterminds/sprig/v3"
@ -11,14 +12,15 @@ import (
)
type TopicNameGenerator struct {
Hostname string
TopicPrefix string
PluginName string
metric telegraf.Metric
metric telegraf.TemplateMetric
template *template.Template
}
func NewTopicNameGenerator(topicPrefix, topic string) (*TopicNameGenerator, error) {
topic = hostnameRe.ReplaceAllString(topic, `$1.Tag "host"$2`)
topic = pluginNameRe.ReplaceAllString(topic, `$1.Name$2`)
tt, err := template.New("topic_name").Funcs(sprig.TxtFuncMap()).Parse(topic)
if err != nil {
return nil, err
@ -31,17 +33,40 @@ func NewTopicNameGenerator(topicPrefix, topic string) (*TopicNameGenerator, erro
return &TopicNameGenerator{TopicPrefix: topicPrefix, template: tt}, nil
}
func (t *TopicNameGenerator) Tag(key string) string {
tagString, _ := t.metric.GetTag(key)
return tagString
func (t *TopicNameGenerator) Name() string {
return t.metric.Name()
}
func (t *TopicNameGenerator) Generate(hostname string, m telegraf.Metric) (string, error) {
t.Hostname = hostname
t.metric = m
t.PluginName = m.Name()
func (t *TopicNameGenerator) Tag(key string) string {
return t.metric.Tag(key)
}
func (t *TopicNameGenerator) Field(key string) interface{} {
return t.metric.Field(key)
}
func (t *TopicNameGenerator) Time() time.Time {
return t.metric.Time()
}
func (t *TopicNameGenerator) Tags() map[string]string {
return t.metric.Tags()
}
func (t *TopicNameGenerator) Fields() map[string]interface{} {
return t.metric.Fields()
}
func (t *TopicNameGenerator) String() string {
return t.metric.String()
}
func (m *MQTT) generateTopic(metric telegraf.Metric) (string, error) {
m.generator.metric = metric.(telegraf.TemplateMetric)
// Cannot directly pass TemplateMetric since TopicNameGenerator still contains TopicPrefix (until v1.35.0)
var b strings.Builder
err := t.template.Execute(&b, t)
err := m.generator.template.Execute(&b, m.generator)
if err != nil {
return "", err
}
@ -54,7 +79,7 @@ func (t *TopicNameGenerator) Generate(hostname string, m telegraf.Metric) (strin
topic := strings.Join(ts, "/")
// This is to keep backward compatibility with previous behaviour where the plugin name was always present
if topic == "" {
return m.Name(), nil
return metric.Name(), nil
}
if strings.HasPrefix(b.String(), "/") {
topic = "/" + topic