diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index ed533ce6a..934abc11d 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -95,6 +95,7 @@ following works: - github.com/eapache/go-resiliency [MIT License](https://github.com/eapache/go-resiliency/blob/master/LICENSE) - github.com/eapache/go-xerial-snappy [MIT License](https://github.com/eapache/go-xerial-snappy/blob/master/LICENSE) - github.com/eapache/queue [MIT License](https://github.com/eapache/queue/blob/master/LICENSE) +- github.com/eclipse/paho.golang [Eclipse Public License - v 2.0](https://github.com/eclipse/paho.golang/blob/master/LICENSE) - github.com/eclipse/paho.mqtt.golang [Eclipse Public License - v 1.0](https://github.com/eclipse/paho.mqtt.golang/blob/master/LICENSE) - github.com/emicklei/go-restful [MIT License](https://github.com/emicklei/go-restful/blob/v3/LICENSE) - github.com/fatih/color [MIT License](https://github.com/fatih/color/blob/master/LICENSE.md) diff --git a/go.mod b/go.mod index 1ca758b01..67af67d5e 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,7 @@ require ( github.com/docker/go-connections v0.4.0 github.com/doclambda/protobufquery v0.0.0-20210317203640-88ffabe06a60 github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0 + github.com/eclipse/paho.golang v0.10.0 github.com/eclipse/paho.mqtt.golang v1.3.5 github.com/fatih/color v1.13.0 github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 diff --git a/go.sum b/go.sum index a3ca8414b..724d70ba8 100644 --- a/go.sum +++ b/go.sum @@ -706,6 +706,8 @@ github.com/echlebek/crock v1.0.1 h1:KbzamClMIfVIkkjq/GTXf+N16KylYBpiaTitO3f1ujg= github.com/echlebek/crock v1.0.1/go.mod h1:/kvwHRX3ZXHj/kHWJkjXDmzzRow54EJuHtQ/PapL/HI= github.com/echlebek/timeproxy v1.0.0 h1:V41/v8tmmMDNMA2GrBPI45nlXb3F7+OY+nJz1BqKsCk= github.com/echlebek/timeproxy v1.0.0/go.mod h1:0dg2Lnb8no/jFwoMQKMTU6iAivgoMptGqSTprhnrRtk= +github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q= +github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs= github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= diff --git a/plugins/outputs/mqtt/README.md b/plugins/outputs/mqtt/README.md index 32142bd3e..2b74fe07e 100644 --- a/plugins/outputs/mqtt/README.md +++ b/plugins/outputs/mqtt/README.md @@ -1,7 +1,7 @@ # MQTT Producer Output Plugin This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt -Producer. +Producer. It supports MQTT protocols `3.1.1` and `5`. ## Mosquitto v2.0.12+ and `identifier rejected` @@ -10,7 +10,7 @@ In v2.0.12+ of the mosquitto MQTT server, there is a `keep_alive` value to be set non-zero in your telegraf configuration. If not set, the server will return with `identifier rejected`. -As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30. +As a reference `eclipse/paho.golang` sets the `keep_alive` to 30. ## Configuration @@ -19,9 +19,14 @@ As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30. [[outputs.mqtt]] ## MQTT Brokers ## The list of brokers should only include the hostname or IP address and the - ## port to the broker. This should follow the format '{host}:{port}'. For - ## example, "localhost:1883" or "127.0.0.1:8883". - servers = ["localhost:1883"] + ## port to the broker. This should follow the format `[{scheme}://]{host}:{port}`. For + ## example, `localhost:1883` or `mqtt://localhost:1883`. + ## Scheme can be any of the following: tcp://, mqtt://, tls://, mqtts:// + ## non-TLS and TLS servers can not be mix-and-matched. + servers = ["localhost:1883", ] # or ["mqtts://tls.example.com:1883"] + + ## Protocol can be `3.1.1` or `5`. Default is `3.1.1` + # procotol = "3.1.1" ## MQTT Topic for Producer Messages ## MQTT outputs send metrics to this topic format: diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 749fcec42..5cd5c9e58 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -2,17 +2,15 @@ package mqtt import ( + // Blank import to support go:embed compile directive _ "embed" "fmt" + "net/url" "strings" "sync" - "time" - - paho "github.com/eclipse/paho.mqtt.golang" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" @@ -23,55 +21,61 @@ import ( var sampleConfig string const ( - defaultKeepAlive = 0 + defaultKeepAlive = 30 ) type MQTT struct { Servers []string `toml:"servers"` - Username string - Password string + Protocol string `toml:"protocol"` + Username string `toml:"username"` + Password string `toml:"password"` Database string - Timeout config.Duration - TopicPrefix string - QoS int `toml:"qos"` - ClientID string `toml:"client_id"` + Timeout config.Duration `toml:"timeout"` + TopicPrefix string `toml:"topic_prefix"` + QoS int `toml:"qos"` + ClientID string `toml:"client_id"` tls.ClientConfig BatchMessage bool `toml:"batch"` Retain bool `toml:"retain"` KeepAlive int64 `toml:"keep_alive"` Log telegraf.Logger `toml:"-"` - client paho.Client - opts *paho.ClientOptions - + client Client serializer serializers.Serializer sync.Mutex } +// Client is a protocol neutral MQTT client for connecting, +// disconnecting, and publishing data to a topic. +// The protocol specific clients must implement this interface +type Client interface { + Connect() error + Publish(topic string, data []byte) error + Close() error +} + func (*MQTT) SampleConfig() string { return sampleConfig } func (m *MQTT) Connect() error { - var err error m.Lock() defer m.Unlock() if m.QoS > 2 || m.QoS < 0 { return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS) } - m.opts, err = m.createOpts() - if err != nil { - return err + switch m.Protocol { + case "", "3.1.1": + m.client = newMQTTv311Client(m) + case "5": + m.client = newMQTTv5Client(m) + default: + return fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", m.Protocol) } - m.client = paho.NewClient(m.opts) - if token := m.client.Connect(); token.Wait() && token.Error() != nil { - return token.Error() - } - - return nil + return m.client.Connect() } func (m *MQTT) SetSerializer(serializer serializers.Serializer) { @@ -79,10 +83,7 @@ func (m *MQTT) SetSerializer(serializer serializers.Serializer) { } func (m *MQTT) Close() error { - if m.client.IsConnected() { - m.client.Disconnect(20) - } - return nil + return m.client.Close() } func (m *MQTT) Write(metrics []telegraf.Metric) error { @@ -119,7 +120,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { continue } - err = m.publish(topic, buf) + err = m.client.Publish(topic, buf) if err != nil { return fmt.Errorf("could not write to MQTT server, %s", err) } @@ -132,69 +133,29 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { if err != nil { return err } - publisherr := m.publish(key, buf) - if publisherr != nil { - return fmt.Errorf("could not write to MQTT server, %s", publisherr) + err = m.client.Publish(key, buf) + if err != nil { + return fmt.Errorf("could not write to MQTT server, %s", err) } } return nil } -func (m *MQTT) publish(topic string, body []byte) error { - token := m.client.Publish(topic, byte(m.QoS), m.Retain, body) - token.WaitTimeout(time.Duration(m.Timeout)) - if token.Error() != nil { - return token.Error() +func parseServers(servers []string) ([]*url.URL, error) { + urls := make([]*url.URL, 0, len(servers)) + for _, svr := range servers { + if !strings.Contains(svr, "://") { + urls = append(urls, &url.URL{Scheme: "tcp", Host: svr}) + } else { + u, err := url.Parse(svr) + if err != nil { + return nil, err + } + urls = append(urls, u) + } } - return nil -} - -func (m *MQTT) createOpts() (*paho.ClientOptions, error) { - opts := paho.NewClientOptions() - opts.KeepAlive = m.KeepAlive - - if m.Timeout < config.Duration(time.Second) { - m.Timeout = config.Duration(5 * time.Second) - } - opts.WriteTimeout = time.Duration(m.Timeout) - - if m.ClientID != "" { - opts.SetClientID(m.ClientID) - } else { - opts.SetClientID("Telegraf-Output-" + internal.RandomString(5)) - } - - tlsCfg, err := m.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - scheme := "tcp" - if tlsCfg != nil { - scheme = "ssl" - opts.SetTLSConfig(tlsCfg) - } - - user := m.Username - if user != "" { - opts.SetUsername(user) - } - password := m.Password - if password != "" { - opts.SetPassword(password) - } - - if len(m.Servers) == 0 { - return opts, fmt.Errorf("could not get host informations") - } - for _, host := range m.Servers { - server := fmt.Sprintf("%s://%s", scheme, host) - - opts.AddBroker(server) - } - opts.SetAutoReconnect(true) - return opts, nil + return urls, nil } func init() { diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index 1ffc063d9..1ef162ac9 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -2,9 +2,9 @@ package mqtt import ( "fmt" + "path/filepath" "testing" - "github.com/docker/go-connections/nat" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" "github.com/testcontainers/testcontainers-go/wait" @@ -12,29 +12,103 @@ import ( "github.com/stretchr/testify/require" ) +const servicePort = "1883" + +func launchTestContainer(t *testing.T) *testutil.Container { + conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf")) + require.NoError(t, err, "missing file mosquitto.conf") + + container := testutil.Container{ + Image: "eclipse-mosquitto:2", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(servicePort), + BindMounts: map[string]string{ + "/mosquitto/config/mosquitto.conf": conf, + }, + } + err = container.Start() + require.NoError(t, err, "failed to start container") + + return &container +} + func TestConnectAndWriteIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } - servicePort := "1883" - container := testutil.Container{ - Image: "ncarlier/mqtt", - ExposedPorts: []string{servicePort}, - WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), - } - err := container.Start() - require.NoError(t, err, "failed to start container") + container := launchTestContainer(t) defer func() { require.NoError(t, container.Terminate(), "terminating container failed") }() - var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) - s, _ := serializers.NewInfluxSerializer() + s, err := serializers.NewInfluxSerializer() + require.NoError(t, err) m := &MQTT{ Servers: []string{url}, serializer: s, KeepAlive: 30, + Log: testutil.Logger{Name: "mqtt-default-integration-test"}, + } + + // Verify that we can connect to the MQTT broker + err = m.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to the mqtt broker + err = m.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) + s, err := serializers.NewInfluxSerializer() + require.NoError(t, err) + m := &MQTT{ + Servers: []string{url}, + Protocol: "3.1.1", + serializer: s, + KeepAlive: 30, + Log: testutil.Logger{Name: "mqttv311-integration-test"}, + } + + // Verify that we can connect to the MQTT broker + err = m.Connect() + require.NoError(t, err) + + // Verify that we can successfully write data to the mqtt broker + err = m.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + +func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := launchTestContainer(t) + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]) + s, err := serializers.NewInfluxSerializer() + require.NoError(t, err) + m := &MQTT{ + Servers: []string{url}, + Protocol: "5", + serializer: s, + KeepAlive: 30, + Log: testutil.Logger{Name: "mqttv5-integration-test"}, } // Verify that we can connect to the MQTT broker diff --git a/plugins/outputs/mqtt/mqtt_v3.go b/plugins/outputs/mqtt/mqtt_v3.go new file mode 100644 index 000000000..cc868c3ef --- /dev/null +++ b/plugins/outputs/mqtt/mqtt_v3.go @@ -0,0 +1,92 @@ +package mqtt + +import ( + "fmt" + "time" + + // Library that supports v3.1.1 + mqttv3 "github.com/eclipse/paho.mqtt.golang" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" +) + +type mqttv311Client struct { + *MQTT + client mqttv3.Client +} + +func newMQTTv311Client(cfg *MQTT) *mqttv311Client { + return &mqttv311Client{MQTT: cfg} +} + +func (m *mqttv311Client) Connect() error { + opts := mqttv3.NewClientOptions() + opts.KeepAlive = m.KeepAlive + + if m.Timeout < config.Duration(time.Second) { + m.Timeout = config.Duration(5 * time.Second) + } + opts.WriteTimeout = time.Duration(m.Timeout) + + if m.ClientID != "" { + opts.SetClientID(m.ClientID) + } else { + opts.SetClientID("Telegraf-Output-" + internal.RandomString(5)) + } + + tlsCfg, err := m.ClientConfig.TLSConfig() + if err != nil { + return err + } + opts.SetTLSConfig(tlsCfg) + + user := m.Username + if user != "" { + opts.SetUsername(user) + } + password := m.Password + if password != "" { + opts.SetPassword(password) + } + + if len(m.Servers) == 0 { + return fmt.Errorf("could not get server informations") + } + + servers, err := parseServers(m.Servers) + if err != nil { + return err + } + for _, server := range servers { + if tlsCfg != nil { + server.Scheme = "tls" + } + broker := server.String() + opts.AddBroker(broker) + m.MQTT.Log.Debugf("registered mqtt broker: %v", broker) + } + + opts.SetAutoReconnect(true) + m.client = mqttv3.NewClient(opts) + if token := m.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + return nil +} + +func (m *mqttv311Client) Publish(topic string, body []byte) error { + token := m.client.Publish(topic, byte(m.QoS), m.Retain, body) + token.WaitTimeout(time.Duration(m.Timeout)) + if token.Error() != nil { + return token.Error() + } + return nil +} + +func (m *mqttv311Client) Close() error { + if m.client.IsConnected() { + m.client.Disconnect(20) + } + return nil +} diff --git a/plugins/outputs/mqtt/mqtt_v5.go b/plugins/outputs/mqtt/mqtt_v5.go new file mode 100644 index 000000000..818a743ad --- /dev/null +++ b/plugins/outputs/mqtt/mqtt_v5.go @@ -0,0 +1,94 @@ +package mqtt + +import ( + "context" + "fmt" + "net/url" + "time" + + mqttv5auto "github.com/eclipse/paho.golang/autopaho" + mqttv5 "github.com/eclipse/paho.golang/paho" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" +) + +type mqttv5Client struct { + *MQTT + client *mqttv5auto.ConnectionManager +} + +func newMQTTv5Client(cfg *MQTT) *mqttv5Client { + return &mqttv5Client{MQTT: cfg} +} + +func (m *mqttv5Client) Connect() error { + opts := mqttv5auto.ClientConfig{} + if m.ClientID != "" { + opts.ClientID = m.ClientID + } else { + opts.ClientID = "Telegraf-Output-" + internal.RandomString(5) + } + + user := m.Username + pass := m.Password + opts.SetUsernamePassword(user, []byte(pass)) + + tlsCfg, err := m.ClientConfig.TLSConfig() + if err != nil { + return err + } + + if tlsCfg != nil { + opts.TlsCfg = tlsCfg + } + + if len(m.Servers) == 0 { + return fmt.Errorf("could not get host informations") + } + + brokers := make([]*url.URL, 0) + servers, err := parseServers(m.Servers) + if err != nil { + return err + } + + for _, server := range servers { + if tlsCfg != nil { + server.Scheme = "tls" + } + brokers = append(brokers, server) + m.MQTT.Log.Debugf("registered mqtt broker: %s", server.String()) + } + opts.BrokerUrls = brokers + + opts.KeepAlive = uint16(m.KeepAlive) + if m.Timeout < config.Duration(time.Second) { + m.Timeout = config.Duration(5 * time.Second) + } + + client, err := mqttv5auto.NewConnection(context.Background(), opts) + if err != nil { + return err + } + m.client = client + return client.AwaitConnection(context.Background()) +} + +func (m *mqttv5Client) Publish(topic string, body []byte) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.Timeout)) + defer cancel() + _, err := m.client.Publish(ctx, &mqttv5.Publish{ + Topic: topic, + QoS: byte(m.QoS), + Retain: m.Retain, + Payload: body, + }) + if err != nil { + return err + } + return nil +} + +func (m *mqttv5Client) Close() error { + return m.client.Disconnect(context.Background()) +} diff --git a/plugins/outputs/mqtt/sample.conf b/plugins/outputs/mqtt/sample.conf index a090c4307..384564c6a 100644 --- a/plugins/outputs/mqtt/sample.conf +++ b/plugins/outputs/mqtt/sample.conf @@ -2,9 +2,14 @@ [[outputs.mqtt]] ## MQTT Brokers ## The list of brokers should only include the hostname or IP address and the - ## port to the broker. This should follow the format '{host}:{port}'. For - ## example, "localhost:1883" or "127.0.0.1:8883". - servers = ["localhost:1883"] + ## port to the broker. This should follow the format `[{scheme}://]{host}:{port}`. For + ## example, `localhost:1883` or `mqtt://localhost:1883`. + ## Scheme can be any of the following: tcp://, mqtt://, tls://, mqtts:// + ## non-TLS and TLS servers can not be mix-and-matched. + servers = ["localhost:1883", ] # or ["mqtts://tls.example.com:1883"] + + ## Protocol can be `3.1.1` or `5`. Default is `3.1.1` + # procotol = "3.1.1" ## MQTT Topic for Producer Messages ## MQTT outputs send metrics to this topic format: diff --git a/plugins/outputs/mqtt/testdata/mosquitto.conf b/plugins/outputs/mqtt/testdata/mosquitto.conf new file mode 100644 index 000000000..60c189a8c --- /dev/null +++ b/plugins/outputs/mqtt/testdata/mosquitto.conf @@ -0,0 +1,3 @@ +listener 1883 0.0.0.0 +allow_anonymous true +connection_messages true