From 75cbda186ca1e81a4acdd364c2eb1d96d27c5e16 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Fri, 17 Feb 2023 20:59:02 +0100 Subject: [PATCH] chore(mqtt): unify input and output plugin's MQTT client (#12683) --- plugins/common/mqtt/mqtt.go | 94 +++++++++ plugins/common/mqtt/mqtt_test.go | 26 +++ plugins/common/mqtt/mqtt_v3.go | 129 ++++++++++++ plugins/common/mqtt/mqtt_v5.go | 152 ++++++++++++++ .../mqtt_consumer/testdata/mosquitto.conf | 3 + plugins/outputs/mqtt/mqtt.go | 98 +++------ plugins/outputs/mqtt/mqtt_test.go | 195 +++++++++++++++--- plugins/outputs/mqtt/mqtt_v3.go | 104 ---------- plugins/outputs/mqtt/mqtt_v5.go | 150 -------------- 9 files changed, 608 insertions(+), 343 deletions(-) create mode 100644 plugins/common/mqtt/mqtt.go create mode 100644 plugins/common/mqtt/mqtt_test.go create mode 100644 plugins/common/mqtt/mqtt_v3.go create mode 100644 plugins/common/mqtt/mqtt_v5.go create mode 100644 plugins/inputs/mqtt_consumer/testdata/mosquitto.conf delete mode 100644 plugins/outputs/mqtt/mqtt_v3.go delete mode 100644 plugins/outputs/mqtt/mqtt_v5.go diff --git a/plugins/common/mqtt/mqtt.go b/plugins/common/mqtt/mqtt.go new file mode 100644 index 000000000..892eec39c --- /dev/null +++ b/plugins/common/mqtt/mqtt.go @@ -0,0 +1,94 @@ +package mqtt + +import ( + "errors" + "fmt" + "net/url" + "strings" + + paho "github.com/eclipse/paho.mqtt.golang" + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/tls" +) + +// mqtt v5-specific publish properties. +// See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109 +type PublishProperties struct { + ContentType string `toml:"content_type"` + ResponseTopic string `toml:"response_topic"` + MessageExpiry config.Duration `toml:"message_expiry"` + TopicAlias *uint16 `toml:"topic_alias"` + UserProperties map[string]string `toml:"user_properties"` +} + +type MqttConfig struct { + Servers []string `toml:"servers"` + Protocol string `toml:"protocol"` + Username config.Secret `toml:"username"` + Password config.Secret `toml:"password"` + Timeout config.Duration `toml:"timeout"` + ConnectionTimeout config.Duration `toml:"connection_timeout"` + QoS int `toml:"qos"` + ClientID string `toml:"client_id"` + Retain bool `toml:"retain"` + KeepAlive int64 `toml:"keep_alive"` + PersistentSession bool `toml:"persistent_session"` + PublishPropertiesV5 *PublishProperties `toml:"v5"` + + tls.ClientConfig + + AutoReconnect bool `toml:"-"` + OnConnectionLost func(error) `toml:"-"` +} + +// Client is a protocol neutral MQTT client for connecting, +// disconnecting, and publishing data to a topic. +// The protocol specific clients must implement this interface +type Client interface { + Connect() (bool, error) + Publish(topic string, data []byte) error + SubscribeMultiple(filters map[string]byte, callback paho.MessageHandler) error + AddRoute(topic string, callback paho.MessageHandler) + Close() error +} + +func NewClient(cfg *MqttConfig) (Client, error) { + if len(cfg.Servers) == 0 { + return nil, errors.New("no servers specified") + } + + if cfg.PersistentSession && cfg.ClientID == "" { + return nil, errors.New("persistent_session requires client_id") + } + + if cfg.QoS > 2 || cfg.QoS < 0 { + return nil, fmt.Errorf("invalid QoS value %d; must be 0, 1 or 2", cfg.QoS) + } + + switch cfg.Protocol { + case "", "3.1.1": + return NewMQTTv311Client(cfg) + case "5": + return NewMQTTv5Client(cfg) + } + return nil, fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", cfg.Protocol) +} + +func parseServers(servers []string) ([]*url.URL, error) { + urls := make([]*url.URL, 0, len(servers)) + for _, svr := range servers { + // Preserve support for host:port style servers; deprecated in Telegraf 1.4.4 + if !strings.Contains(svr, "://") { + urls = append(urls, &url.URL{Scheme: "tcp", Host: svr}) + continue + } + + u, err := url.Parse(svr) + if err != nil { + return nil, err + } + urls = append(urls, u) + } + return urls, nil +} diff --git a/plugins/common/mqtt/mqtt_test.go b/plugins/common/mqtt/mqtt_test.go new file mode 100644 index 000000000..dd1150d5e --- /dev/null +++ b/plugins/common/mqtt/mqtt_test.go @@ -0,0 +1,26 @@ +package mqtt + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// Test that default client has random ID +func TestRandomClientID(t *testing.T) { + var err error + + cfg := &MqttConfig{ + Servers: []string{"tcp://localhost:1883"}, + } + + client1, err := NewMQTTv311Client(cfg) + require.NoError(t, err) + + client2, err := NewMQTTv311Client(cfg) + require.NoError(t, err) + + options1 := client1.client.OptionsReader() + options2 := client2.client.OptionsReader() + require.NotEqual(t, options1.ClientID(), options2.ClientID()) +} diff --git a/plugins/common/mqtt/mqtt_v3.go b/plugins/common/mqtt/mqtt_v3.go new file mode 100644 index 000000000..b0d61e04c --- /dev/null +++ b/plugins/common/mqtt/mqtt_v3.go @@ -0,0 +1,129 @@ +package mqtt + +import ( + "fmt" + "time" + + mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1 + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" +) + +type mqttv311Client struct { + client mqttv3.Client + timeout time.Duration + qos int + retain bool +} + +func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) { + opts := mqttv3.NewClientOptions() + opts.KeepAlive = cfg.KeepAlive + opts.WriteTimeout = time.Duration(cfg.Timeout) + if time.Duration(cfg.ConnectionTimeout) >= 1*time.Second { + opts.ConnectTimeout = time.Duration(cfg.ConnectionTimeout) + } + opts.SetCleanSession(!cfg.PersistentSession) + if cfg.OnConnectionLost != nil { + onConnectionLost := func(_ mqttv3.Client, err error) { + cfg.OnConnectionLost(err) + } + opts.SetConnectionLostHandler(onConnectionLost) + } + opts.SetAutoReconnect(cfg.AutoReconnect) + + if cfg.ClientID != "" { + opts.SetClientID(cfg.ClientID) + } else { + id, err := internal.RandomString(5) + if err != nil { + return nil, fmt.Errorf("generating random client ID failed: %w", err) + } + opts.SetClientID("Telegraf-Output-" + id) + } + + tlsCfg, err := cfg.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + opts.SetTLSConfig(tlsCfg) + + if !cfg.Username.Empty() { + user, err := cfg.Username.Get() + if err != nil { + return nil, fmt.Errorf("getting username failed: %w", err) + } + opts.SetUsername(string(user)) + config.ReleaseSecret(user) + } + if !cfg.Password.Empty() { + password, err := cfg.Password.Get() + if err != nil { + return nil, fmt.Errorf("getting password failed: %w", err) + } + opts.SetPassword(string(password)) + config.ReleaseSecret(password) + } + + servers, err := parseServers(cfg.Servers) + if err != nil { + return nil, err + } + for _, server := range servers { + if tlsCfg != nil { + server.Scheme = "tls" + } + broker := server.String() + opts.AddBroker(broker) + } + + return &mqttv311Client{ + client: mqttv3.NewClient(opts), + timeout: time.Duration(cfg.Timeout), + qos: cfg.QoS, + retain: cfg.Retain, + }, nil +} + +func (m *mqttv311Client) Connect() (bool, error) { + token := m.client.Connect() + + if token.Wait() && token.Error() != nil { + return false, token.Error() + } + + // Persistent sessions should skip subscription if a session is present, as + // the subscriptions are stored by the server. + type sessionPresent interface { + SessionPresent() bool + } + if t, ok := token.(sessionPresent); ok { + return t.SessionPresent(), nil + } + + return false, nil +} + +func (m *mqttv311Client) Publish(topic string, body []byte) error { + token := m.client.Publish(topic, byte(m.qos), m.retain, body) + token.WaitTimeout(m.timeout) + return token.Error() +} + +func (m *mqttv311Client) SubscribeMultiple(filters map[string]byte, callback mqttv3.MessageHandler) error { + token := m.client.SubscribeMultiple(filters, callback) + token.Wait() + return token.Error() +} + +func (m *mqttv311Client) AddRoute(topic string, callback mqttv3.MessageHandler) { + m.client.AddRoute(topic, callback) +} + +func (m *mqttv311Client) Close() error { + if m.client.IsConnected() { + m.client.Disconnect(20) + } + return nil +} diff --git a/plugins/common/mqtt/mqtt_v5.go b/plugins/common/mqtt/mqtt_v5.go new file mode 100644 index 000000000..e6b25dd57 --- /dev/null +++ b/plugins/common/mqtt/mqtt_v5.go @@ -0,0 +1,152 @@ +package mqtt + +import ( + "context" + "fmt" + "net/url" + "time" + + mqttv5auto "github.com/eclipse/paho.golang/autopaho" + mqttv5 "github.com/eclipse/paho.golang/paho" + paho "github.com/eclipse/paho.mqtt.golang" + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" +) + +type mqttv5Client struct { + client *mqttv5auto.ConnectionManager + options mqttv5auto.ClientConfig + timeout time.Duration + qos int + retain bool + properties *mqttv5.PublishProperties +} + +func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) { + opts := mqttv5auto.ClientConfig{ + KeepAlive: uint16(cfg.KeepAlive), + OnConnectError: cfg.OnConnectionLost, + } + opts.SetConnectPacketConfigurator(func(c *mqttv5.Connect) *mqttv5.Connect { + c.CleanStart = cfg.PersistentSession + return c + }) + + if time.Duration(cfg.ConnectionTimeout) >= 1*time.Second { + opts.ConnectTimeout = time.Duration(cfg.ConnectionTimeout) + } + + if cfg.ClientID != "" { + opts.ClientID = cfg.ClientID + } else { + id, err := internal.RandomString(5) + if err != nil { + return nil, fmt.Errorf("generating random client ID failed: %w", err) + } + opts.ClientID = "Telegraf-Output-" + id + } + + user, err := cfg.Username.Get() + if err != nil { + return nil, fmt.Errorf("getting username failed: %w", err) + } + pass, err := cfg.Password.Get() + if err != nil { + config.ReleaseSecret(user) + return nil, fmt.Errorf("getting password failed: %w", err) + } + opts.SetUsernamePassword(string(user), pass) + config.ReleaseSecret(user) + config.ReleaseSecret(pass) + + tlsCfg, err := cfg.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + if tlsCfg != nil { + opts.TlsCfg = tlsCfg + } + + brokers := make([]*url.URL, 0) + servers, err := parseServers(cfg.Servers) + if err != nil { + return nil, err + } + + for _, server := range servers { + if tlsCfg != nil { + server.Scheme = "tls" + } + brokers = append(brokers, server) + } + opts.BrokerUrls = brokers + + // Build the v5 specific publish properties if they are present in the config. + // These should not change during the lifecycle of the client. + var properties *mqttv5.PublishProperties + if cfg.PublishPropertiesV5 != nil { + properties = &mqttv5.PublishProperties{ + ContentType: cfg.PublishPropertiesV5.ContentType, + ResponseTopic: cfg.PublishPropertiesV5.ResponseTopic, + TopicAlias: cfg.PublishPropertiesV5.TopicAlias, + } + + messageExpiry := time.Duration(cfg.PublishPropertiesV5.MessageExpiry) + if expirySeconds := uint32(messageExpiry.Seconds()); expirySeconds > 0 { + properties.MessageExpiry = &expirySeconds + } + + properties.User = make([]mqttv5.UserProperty, 0, len(cfg.PublishPropertiesV5.UserProperties)) + for k, v := range cfg.PublishPropertiesV5.UserProperties { + properties.User.Add(k, v) + } + } + + return &mqttv5Client{ + options: opts, + timeout: time.Duration(cfg.Timeout), + qos: cfg.QoS, + retain: cfg.Retain, + properties: properties, + }, nil +} + +func (m *mqttv5Client) Connect() (bool, error) { + client, err := mqttv5auto.NewConnection(context.Background(), m.options) + if err != nil { + return false, err + } + m.client = client + return false, client.AwaitConnection(context.Background()) +} + +func (m *mqttv5Client) Publish(topic string, body []byte) error { + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + + _, err := m.client.Publish(ctx, &mqttv5.Publish{ + Topic: topic, + QoS: byte(m.qos), + Retain: m.retain, + Payload: body, + Properties: m.properties, + }) + + return err +} + +func (m *mqttv5Client) SubscribeMultiple(filters map[string]byte, callback paho.MessageHandler) error { + _, _ = filters, callback + panic("not implemented") +} + +func (m *mqttv5Client) AddRoute(topic string, callback paho.MessageHandler) { + _, _ = topic, callback + panic("not implemented") +} + +func (m *mqttv5Client) Close() error { + return m.client.Disconnect(context.Background()) +} diff --git a/plugins/inputs/mqtt_consumer/testdata/mosquitto.conf b/plugins/inputs/mqtt_consumer/testdata/mosquitto.conf new file mode 100644 index 000000000..60c189a8c --- /dev/null +++ b/plugins/inputs/mqtt_consumer/testdata/mosquitto.conf @@ -0,0 +1,3 @@ +listener 1883 0.0.0.0 +allow_anonymous true +connection_messages true diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 84226cb9e..7cb410286 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -4,14 +4,14 @@ package mqtt import ( // Blank import to support go:embed compile directive _ "embed" + "errors" "fmt" - "net/url" - "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/common/mqtt" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" ) @@ -19,74 +19,54 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - defaultKeepAlive = 30 -) - type MQTT struct { - Servers []string `toml:"servers"` - Protocol string `toml:"protocol"` - Username config.Secret `toml:"username"` - Password config.Secret `toml:"password"` - Database string - Timeout config.Duration `toml:"timeout"` - TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"` - Topic string `toml:"topic"` - QoS int `toml:"qos"` - ClientID string `toml:"client_id"` - tls.ClientConfig - BatchMessage bool `toml:"batch"` - Retain bool `toml:"retain"` - KeepAlive int64 `toml:"keep_alive"` - V5PublishProperties *mqttv5PublishProperties `toml:"v5"` - Log telegraf.Logger `toml:"-"` + TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"` + Topic string `toml:"topic"` + BatchMessage bool `toml:"batch"` + Log telegraf.Logger `toml:"-"` + mqtt.MqttConfig - client Client + client mqtt.Client serializer serializers.Serializer generator *TopicNameGenerator sync.Mutex } -// Client is a protocol neutral MQTT client for connecting, -// disconnecting, and publishing data to a topic. -// The protocol specific clients must implement this interface -type Client interface { - Connect() error - Publish(topic string, data []byte) error - Close() error -} - func (*MQTT) SampleConfig() string { return sampleConfig } func (m *MQTT) Init() error { + if len(m.Servers) == 0 { + return errors.New("no servers specified") + } + + if m.PersistentSession && m.ClientID == "" { + return errors.New("persistent_session requires client_id") + } + + if m.QoS > 2 || m.QoS < 0 { + return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS) + } + var err error m.generator, err = NewTopicNameGenerator(m.TopicPrefix, m.Topic) - if err != nil { - return err - } - return nil + return err } func (m *MQTT) Connect() error { m.Lock() defer m.Unlock() - if m.QoS > 2 || m.QoS < 0 { - return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS) - } - switch m.Protocol { - case "", "3.1.1": - m.client = newMQTTv311Client(m) - case "5": - m.client = newMQTTv5Client(m) - default: - return fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", m.Protocol) + client, err := mqtt.NewClient(&m.MqttConfig) + if err != nil { + return err } + m.client = client - return m.client.Connect() + _, err = m.client.Connect() + return err } func (m *MQTT) SetSerializer(serializer serializers.Serializer) { @@ -146,26 +126,14 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { return nil } -func parseServers(servers []string) ([]*url.URL, error) { - urls := make([]*url.URL, 0, len(servers)) - for _, svr := range servers { - if !strings.Contains(svr, "://") { - urls = append(urls, &url.URL{Scheme: "tcp", Host: svr}) - } else { - u, err := url.Parse(svr) - if err != nil { - return nil, err - } - urls = append(urls, u) - } - } - return urls, nil -} - func init() { outputs.Add("mqtt", func() telegraf.Output { return &MQTT{ - KeepAlive: defaultKeepAlive, + MqttConfig: mqtt.MqttConfig{ + KeepAlive: 30, + Timeout: config.Duration(5 * time.Second), + AutoReconnect: true, + }, } }) } diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index a13d7617e..e4e3b7c7c 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -6,11 +6,15 @@ import ( "testing" "time" + paho "github.com/eclipse/paho.mqtt.golang" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/common/mqtt" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" ) @@ -45,9 +49,12 @@ func TestConnectAndWriteIntegration(t *testing.T) { var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) s := serializers.NewInfluxSerializer() m := &MQTT{ - Servers: []string{url}, + MqttConfig: mqtt.MqttConfig{ + Servers: []string{url}, + KeepAlive: 30, + Timeout: config.Duration(5 * time.Second), + }, serializer: s, - KeepAlive: 30, Log: testutil.Logger{Name: "mqtt-default-integration-test"}, } @@ -72,10 +79,13 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) { var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) s := serializers.NewInfluxSerializer() m := &MQTT{ - Servers: []string{url}, - Protocol: "3.1.1", + MqttConfig: mqtt.MqttConfig{ + Servers: []string{url}, + Protocol: "3.1.1", + KeepAlive: 30, + Timeout: config.Duration(5 * time.Second), + }, serializer: s, - KeepAlive: 30, Log: testutil.Logger{Name: "mqttv311-integration-test"}, } @@ -98,11 +108,127 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { defer container.Terminate() var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) - s := serializers.NewInfluxSerializer() + m := &MQTT{ + MqttConfig: mqtt.MqttConfig{ + Servers: []string{url}, + Protocol: "5", + KeepAlive: 30, + Timeout: config.Duration(5 * time.Second), + }, + serializer: serializers.NewInfluxSerializer(), + Log: testutil.Logger{Name: "mqttv5-integration-test"}, + } + + // Verify that we can connect to the MQTT broker + require.NoError(t, m.Init()) + require.NoError(t, m.Connect()) + + // Verify that we can successfully write data to the mqtt broker + require.NoError(t, m.Write(testutil.MockMetrics())) +} + +func TestIntegrationMQTTv3(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf")) + require.NoError(t, err, "missing file mosquitto.conf") + + container := testutil.Container{ + Image: "eclipse-mosquitto:2", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(servicePort), + BindMounts: map[string]string{ + "/mosquitto/config/mosquitto.conf": conf, + }, + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Setup the parser / serializer pair + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + serializer := serializers.NewInfluxSerializer() + + // Setup the plugin + url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) + topic := "testv3" + plugin := &MQTT{ + MqttConfig: mqtt.MqttConfig{ + Servers: []string{url}, + KeepAlive: 30, + Timeout: config.Duration(5 * time.Second), + AutoReconnect: true, + }, + Topic: topic + "/{{.PluginName}}", + Log: testutil.Logger{Name: "mqttv3-integration-test"}, + } + plugin.SetSerializer(serializer) + require.NoError(t, plugin.Init()) + + // Prepare the receiver message + var acc testutil.Accumulator + onMessage := func(_ paho.Client, msg paho.Message) { + metrics, err := parser.Parse(msg.Payload()) + if err != nil { + acc.AddError(err) + return + } + + for _, m := range metrics { + m.AddTag("topic", msg.Topic()) + acc.AddMetric(m) + } + } + + // Startup the plugin and subscribe to the topic + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Add routing for the messages + subscriptionPattern := topic + "/#" + plugin.client.AddRoute(subscriptionPattern, onMessage) + + // Subscribe to the topic + topics := map[string]byte{subscriptionPattern: byte(plugin.QoS)} + require.NoError(t, plugin.client.SubscribeMultiple(topics, onMessage)) + + // Setup and execute the test case + input := make([]telegraf.Metric, 0, 3) + expected := make([]telegraf.Metric, 0, len(input)) + for i := 0; i < cap(input); i++ { + name := fmt.Sprintf("test%d", i) + m := testutil.TestMetric(i, name) + input = append(input, m) + + e := m.Copy() + e.AddTag("topic", topic+"/"+name) + expected = append(expected, e) + } + require.NoError(t, plugin.Write(input)) + + // Verify the result + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, time.Second, 100*time.Millisecond) + require.NoError(t, plugin.Close()) + + require.Empty(t, acc.Errors) + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestMQTTv5Properties(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer container.Terminate() tests := []struct { name string - properties *mqttv5PublishProperties + properties *mqtt.PublishProperties }{ { name: "no publish properties", @@ -110,46 +236,61 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { }, { name: "content type set", - properties: &mqttv5PublishProperties{ContentType: "text/plain"}, + properties: &mqtt.PublishProperties{ContentType: "text/plain"}, }, { name: "response topic set", - properties: &mqttv5PublishProperties{ResponseTopic: "test/topic"}, + properties: &mqtt.PublishProperties{ResponseTopic: "test/topic"}, }, { name: "message expiry set", - properties: &mqttv5PublishProperties{MessageExpiry: config.Duration(10 * time.Minute)}, + properties: &mqtt.PublishProperties{MessageExpiry: config.Duration(10 * time.Minute)}, }, { name: "topic alias set", - properties: &mqttv5PublishProperties{TopicAlias: new(uint16)}, + properties: &mqtt.PublishProperties{TopicAlias: new(uint16)}, }, { name: "user properties set", - properties: &mqttv5PublishProperties{UserProperties: map[string]string{"key": "value"}}, + properties: &mqtt.PublishProperties{UserProperties: map[string]string{"key": "value"}}, }, } + + topic := "testv3" + url := fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - m := &MQTT{ - Servers: []string{url}, - Protocol: "5", - serializer: s, - KeepAlive: 30, - Log: testutil.Logger{Name: "mqttv5-integration-test"}, - V5PublishProperties: tt.properties, + plugin := &MQTT{ + MqttConfig: mqtt.MqttConfig{ + Servers: []string{url}, + Protocol: "5", + KeepAlive: 30, + Timeout: config.Duration(5 * time.Second), + AutoReconnect: true, + }, + Topic: topic, + Log: testutil.Logger{Name: "mqttv5-integration-test"}, } + // Setup the metric serializer + serializer := serializers.NewInfluxSerializer() + plugin.SetSerializer(serializer) + // Verify that we can connect to the MQTT broker - require.NoError(t, m.Init()) - require.NoError(t, m.Connect()) + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) // Verify that we can successfully write data to the mqtt broker - require.NoError(t, m.Write(testutil.MockMetrics())) + require.NoError(t, plugin.Write(testutil.MockMetrics())) }) } } +func TestMissingServers(t *testing.T) { + plugin := &MQTT{} + require.ErrorContains(t, plugin.Init(), "no servers specified") +} + func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) { tests := []struct { name string @@ -176,6 +317,9 @@ func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) { t.Run(tt.name, func(t *testing.T) { m := &MQTT{ Topic: tt.topic, + MqttConfig: mqtt.MqttConfig{ + Servers: []string{"tcp://localhost:1883"}, + }, } err := m.Init() if tt.expectedError != "" { @@ -190,9 +334,12 @@ func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) { func TestGenerateTopicName(t *testing.T) { s := serializers.NewInfluxSerializer() m := &MQTT{ - Servers: []string{"tcp://localhost:502"}, + MqttConfig: mqtt.MqttConfig{ + Servers: []string{"tcp://localhost:1883"}, + KeepAlive: 30, + Timeout: config.Duration(5 * time.Second), + }, serializer: s, - KeepAlive: 30, Log: testutil.Logger{}, } tests := []struct { diff --git a/plugins/outputs/mqtt/mqtt_v3.go b/plugins/outputs/mqtt/mqtt_v3.go deleted file mode 100644 index 765e057ef..000000000 --- a/plugins/outputs/mqtt/mqtt_v3.go +++ /dev/null @@ -1,104 +0,0 @@ -package mqtt - -import ( - "fmt" - "time" - - mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1 - - "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal" -) - -type mqttv311Client struct { - *MQTT - client mqttv3.Client -} - -func newMQTTv311Client(cfg *MQTT) *mqttv311Client { - return &mqttv311Client{MQTT: cfg} -} - -func (m *mqttv311Client) Connect() error { - opts := mqttv3.NewClientOptions() - opts.KeepAlive = m.KeepAlive - - if m.Timeout < config.Duration(time.Second) { - m.Timeout = config.Duration(5 * time.Second) - } - opts.WriteTimeout = time.Duration(m.Timeout) - - if m.ClientID != "" { - opts.SetClientID(m.ClientID) - } else { - randomString, err := internal.RandomString(5) - if err != nil { - return fmt.Errorf("generating random string for client ID failed: %w", err) - } - opts.SetClientID("Telegraf-Output-" + randomString) - } - - tlsCfg, err := m.ClientConfig.TLSConfig() - if err != nil { - return err - } - opts.SetTLSConfig(tlsCfg) - - if !m.Username.Empty() { - user, err := m.Username.Get() - if err != nil { - return fmt.Errorf("getting username failed: %w", err) - } - opts.SetUsername(string(user)) - config.ReleaseSecret(user) - } - if !m.Password.Empty() { - password, err := m.Password.Get() - if err != nil { - return fmt.Errorf("getting password failed: %w", err) - } - opts.SetPassword(string(password)) - config.ReleaseSecret(password) - } - - if len(m.Servers) == 0 { - return fmt.Errorf("could not get server informations") - } - - servers, err := parseServers(m.Servers) - if err != nil { - return err - } - for _, server := range servers { - if tlsCfg != nil { - server.Scheme = "tls" - } - broker := server.String() - opts.AddBroker(broker) - m.MQTT.Log.Debugf("registered mqtt broker: %v", broker) - } - - opts.SetAutoReconnect(true) - m.client = mqttv3.NewClient(opts) - if token := m.client.Connect(); token.Wait() && token.Error() != nil { - return token.Error() - } - - return nil -} - -func (m *mqttv311Client) Publish(topic string, body []byte) error { - token := m.client.Publish(topic, byte(m.QoS), m.Retain, body) - token.WaitTimeout(time.Duration(m.Timeout)) - if token.Error() != nil { - return token.Error() - } - return nil -} - -func (m *mqttv311Client) Close() error { - if m.client.IsConnected() { - m.client.Disconnect(20) - } - return nil -} diff --git a/plugins/outputs/mqtt/mqtt_v5.go b/plugins/outputs/mqtt/mqtt_v5.go deleted file mode 100644 index b80282d34..000000000 --- a/plugins/outputs/mqtt/mqtt_v5.go +++ /dev/null @@ -1,150 +0,0 @@ -package mqtt - -import ( - "context" - "fmt" - "net/url" - "time" - - mqttv5auto "github.com/eclipse/paho.golang/autopaho" - mqttv5 "github.com/eclipse/paho.golang/paho" - - "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal" -) - -// mqtt v5-specific publish properties. -// See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109 -type mqttv5PublishProperties struct { - ContentType string `toml:"content_type"` - ResponseTopic string `toml:"response_topic"` - MessageExpiry config.Duration `toml:"message_expiry"` - TopicAlias *uint16 `toml:"topic_alias"` - UserProperties map[string]string `toml:"user_properties"` -} - -type mqttv5Client struct { - *MQTT - client *mqttv5auto.ConnectionManager - publishProperties *mqttv5.PublishProperties -} - -func newMQTTv5Client(cfg *MQTT) *mqttv5Client { - return &mqttv5Client{ - MQTT: cfg, - publishProperties: buildPublishProperties(cfg), - } -} - -// Build the v5 specific publish properties if they are present in the -// config. -// These should not change during the lifecycle of the client. -func buildPublishProperties(cfg *MQTT) *mqttv5.PublishProperties { - if cfg.V5PublishProperties == nil { - return nil - } - - publishProperties := &mqttv5.PublishProperties{ - ContentType: cfg.V5PublishProperties.ContentType, - ResponseTopic: cfg.V5PublishProperties.ResponseTopic, - TopicAlias: cfg.V5PublishProperties.TopicAlias, - User: make([]mqttv5.UserProperty, 0, len(cfg.V5PublishProperties.UserProperties)), - } - - messageExpiry := time.Duration(cfg.V5PublishProperties.MessageExpiry) - if expirySeconds := uint32(messageExpiry.Seconds()); expirySeconds > 0 { - publishProperties.MessageExpiry = &expirySeconds - } - - for k, v := range cfg.V5PublishProperties.UserProperties { - publishProperties.User.Add(k, v) - } - - return publishProperties -} - -func (m *mqttv5Client) Connect() error { - opts := mqttv5auto.ClientConfig{} - if m.ClientID != "" { - opts.ClientID = m.ClientID - } else { - randomString, err := internal.RandomString(5) - if err != nil { - return fmt.Errorf("generating random string for client ID failed: %w", err) - } - opts.ClientID = "Telegraf-Output-" + randomString - } - - user, err := m.Username.Get() - if err != nil { - return fmt.Errorf("getting username failed: %w", err) - } - pass, err := m.Password.Get() - if err != nil { - config.ReleaseSecret(user) - return fmt.Errorf("getting password failed: %w", err) - } - opts.SetUsernamePassword(string(user), pass) - config.ReleaseSecret(user) - config.ReleaseSecret(pass) - - tlsCfg, err := m.ClientConfig.TLSConfig() - if err != nil { - return err - } - - if tlsCfg != nil { - opts.TlsCfg = tlsCfg - } - - if len(m.Servers) == 0 { - return fmt.Errorf("could not get host informations") - } - - brokers := make([]*url.URL, 0) - servers, err := parseServers(m.Servers) - if err != nil { - return err - } - - for _, server := range servers { - if tlsCfg != nil { - server.Scheme = "tls" - } - brokers = append(brokers, server) - m.MQTT.Log.Debugf("registered mqtt broker: %s", server.String()) - } - opts.BrokerUrls = brokers - - opts.KeepAlive = uint16(m.KeepAlive) - if m.Timeout < config.Duration(time.Second) { - m.Timeout = config.Duration(5 * time.Second) - } - - client, err := mqttv5auto.NewConnection(context.Background(), opts) - if err != nil { - return err - } - m.client = client - return client.AwaitConnection(context.Background()) -} - -func (m *mqttv5Client) Publish(topic string, body []byte) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.Timeout)) - defer cancel() - _, err := m.client.Publish(ctx, &mqttv5.Publish{ - Topic: topic, - QoS: byte(m.QoS), - Retain: m.Retain, - Payload: body, - Properties: m.publishProperties, - }) - if err != nil { - return err - } - return nil -} - -func (m *mqttv5Client) Close() error { - return m.client.Disconnect(context.Background()) -}