diff --git a/plugins/outputs/mqtt/README.md b/plugins/outputs/mqtt/README.md index 2db4cbee9..70e368830 100644 --- a/plugins/outputs/mqtt/README.md +++ b/plugins/outputs/mqtt/README.md @@ -105,4 +105,19 @@ to use them. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" + + ## Optional MQTT 5 publish properties + ## These setting only apply if the "protocol" property is set to 5. This must + ## be defined at the end of the plugin settings, otherwise TOML will assume + ## anything else is part of this table. For more details on publish properties + ## see the spec: + ## https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109 + # [outputs.mqtt.v5] + # content_type = "" + # response_topic = "" + # message_expiry = "0s" + # topic_alias = 0 + # [outputs.mqtt.v5.user_properties] + # "key1" = "value 1" + # "key2" = "value 2" ``` diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 8a2fbf4c7..84226cb9e 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -35,10 +35,11 @@ type MQTT struct { QoS int `toml:"qos"` ClientID string `toml:"client_id"` tls.ClientConfig - BatchMessage bool `toml:"batch"` - Retain bool `toml:"retain"` - KeepAlive int64 `toml:"keep_alive"` - Log telegraf.Logger `toml:"-"` + BatchMessage bool `toml:"batch"` + Retain bool `toml:"retain"` + KeepAlive int64 `toml:"keep_alive"` + V5PublishProperties *mqttv5PublishProperties `toml:"v5"` + Log telegraf.Logger `toml:"-"` client Client serializer serializers.Serializer diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index 11ed6a9e1..a13d7617e 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -2,7 +2,6 @@ package mqtt import ( "fmt" - "github.com/influxdata/telegraf/metric" "path/filepath" "testing" "time" @@ -10,6 +9,8 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" ) @@ -98,20 +99,55 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) s := serializers.NewInfluxSerializer() - m := &MQTT{ - Servers: []string{url}, - Protocol: "5", - serializer: s, - KeepAlive: 30, - Log: testutil.Logger{Name: "mqttv5-integration-test"}, + + tests := []struct { + name string + properties *mqttv5PublishProperties + }{ + { + name: "no publish properties", + properties: nil, + }, + { + name: "content type set", + properties: &mqttv5PublishProperties{ContentType: "text/plain"}, + }, + { + name: "response topic set", + properties: &mqttv5PublishProperties{ResponseTopic: "test/topic"}, + }, + { + name: "message expiry set", + properties: &mqttv5PublishProperties{MessageExpiry: config.Duration(10 * time.Minute)}, + }, + { + name: "topic alias set", + properties: &mqttv5PublishProperties{TopicAlias: new(uint16)}, + }, + { + name: "user properties set", + properties: &mqttv5PublishProperties{UserProperties: map[string]string{"key": "value"}}, + }, } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &MQTT{ + Servers: []string{url}, + Protocol: "5", + serializer: s, + KeepAlive: 30, + Log: testutil.Logger{Name: "mqttv5-integration-test"}, + V5PublishProperties: tt.properties, + } - // Verify that we can connect to the MQTT broker - require.NoError(t, m.Init()) - require.NoError(t, m.Connect()) + // Verify that we can connect to the MQTT broker + require.NoError(t, m.Init()) + require.NoError(t, m.Connect()) - // Verify that we can successfully write data to the mqtt broker - require.NoError(t, m.Write(testutil.MockMetrics())) + // Verify that we can successfully write data to the mqtt broker + require.NoError(t, m.Write(testutil.MockMetrics())) + }) + } } func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) { diff --git a/plugins/outputs/mqtt/mqtt_v5.go b/plugins/outputs/mqtt/mqtt_v5.go index 6976c32ae..b80282d34 100644 --- a/plugins/outputs/mqtt/mqtt_v5.go +++ b/plugins/outputs/mqtt/mqtt_v5.go @@ -13,13 +13,54 @@ import ( "github.com/influxdata/telegraf/internal" ) +// mqtt v5-specific publish properties. +// See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109 +type mqttv5PublishProperties struct { + ContentType string `toml:"content_type"` + ResponseTopic string `toml:"response_topic"` + MessageExpiry config.Duration `toml:"message_expiry"` + TopicAlias *uint16 `toml:"topic_alias"` + UserProperties map[string]string `toml:"user_properties"` +} + type mqttv5Client struct { *MQTT - client *mqttv5auto.ConnectionManager + client *mqttv5auto.ConnectionManager + publishProperties *mqttv5.PublishProperties } func newMQTTv5Client(cfg *MQTT) *mqttv5Client { - return &mqttv5Client{MQTT: cfg} + return &mqttv5Client{ + MQTT: cfg, + publishProperties: buildPublishProperties(cfg), + } +} + +// Build the v5 specific publish properties if they are present in the +// config. +// These should not change during the lifecycle of the client. +func buildPublishProperties(cfg *MQTT) *mqttv5.PublishProperties { + if cfg.V5PublishProperties == nil { + return nil + } + + publishProperties := &mqttv5.PublishProperties{ + ContentType: cfg.V5PublishProperties.ContentType, + ResponseTopic: cfg.V5PublishProperties.ResponseTopic, + TopicAlias: cfg.V5PublishProperties.TopicAlias, + User: make([]mqttv5.UserProperty, 0, len(cfg.V5PublishProperties.UserProperties)), + } + + messageExpiry := time.Duration(cfg.V5PublishProperties.MessageExpiry) + if expirySeconds := uint32(messageExpiry.Seconds()); expirySeconds > 0 { + publishProperties.MessageExpiry = &expirySeconds + } + + for k, v := range cfg.V5PublishProperties.UserProperties { + publishProperties.User.Add(k, v) + } + + return publishProperties } func (m *mqttv5Client) Connect() error { @@ -92,10 +133,11 @@ func (m *mqttv5Client) Publish(topic string, body []byte) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.Timeout)) defer cancel() _, err := m.client.Publish(ctx, &mqttv5.Publish{ - Topic: topic, - QoS: byte(m.QoS), - Retain: m.Retain, - Payload: body, + Topic: topic, + QoS: byte(m.QoS), + Retain: m.Retain, + Payload: body, + Properties: m.publishProperties, }) if err != nil { return err diff --git a/plugins/outputs/mqtt/sample.conf b/plugins/outputs/mqtt/sample.conf index 9d87e94f1..2117ef474 100644 --- a/plugins/outputs/mqtt/sample.conf +++ b/plugins/outputs/mqtt/sample.conf @@ -70,3 +70,18 @@ ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md data_format = "influx" + + ## Optional MQTT 5 publish properties + ## These setting only apply if the "protocol" property is set to 5. This must + ## be defined at the end of the plugin settings, otherwise TOML will assume + ## anything else is part of this table. For more details on publish properties + ## see the spec: + ## https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109 + # [outputs.mqtt.v5] + # content_type = "" + # response_topic = "" + # message_expiry = "0s" + # topic_alias = 0 + # [outputs.mqtt.v5.user_properties] + # "key1" = "value 1" + # "key2" = "value 2"