feat(output.mqtt): Add support for MQTT protocol version 5 (#11284)

This commit is contained in:
Cole Mackenzie 2022-07-27 08:36:57 -07:00 committed by GitHub
parent b87d06eb69
commit af43d0183c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 342 additions and 104 deletions

View File

@ -95,6 +95,7 @@ following works:
- github.com/eapache/go-resiliency [MIT License](https://github.com/eapache/go-resiliency/blob/master/LICENSE)
- github.com/eapache/go-xerial-snappy [MIT License](https://github.com/eapache/go-xerial-snappy/blob/master/LICENSE)
- github.com/eapache/queue [MIT License](https://github.com/eapache/queue/blob/master/LICENSE)
- github.com/eclipse/paho.golang [Eclipse Public License - v 2.0](https://github.com/eclipse/paho.golang/blob/master/LICENSE)
- github.com/eclipse/paho.mqtt.golang [Eclipse Public License - v 1.0](https://github.com/eclipse/paho.mqtt.golang/blob/master/LICENSE)
- github.com/emicklei/go-restful [MIT License](https://github.com/emicklei/go-restful/blob/v3/LICENSE)
- github.com/fatih/color [MIT License](https://github.com/fatih/color/blob/master/LICENSE.md)

1
go.mod
View File

@ -55,6 +55,7 @@ require (
github.com/docker/go-connections v0.4.0
github.com/doclambda/protobufquery v0.0.0-20210317203640-88ffabe06a60
github.com/dynatrace-oss/dynatrace-metric-utils-go v0.5.0
github.com/eclipse/paho.golang v0.10.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/fatih/color v1.13.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32

2
go.sum
View File

@ -706,6 +706,8 @@ github.com/echlebek/crock v1.0.1 h1:KbzamClMIfVIkkjq/GTXf+N16KylYBpiaTitO3f1ujg=
github.com/echlebek/crock v1.0.1/go.mod h1:/kvwHRX3ZXHj/kHWJkjXDmzzRow54EJuHtQ/PapL/HI=
github.com/echlebek/timeproxy v1.0.0 h1:V41/v8tmmMDNMA2GrBPI45nlXb3F7+OY+nJz1BqKsCk=
github.com/echlebek/timeproxy v1.0.0/go.mod h1:0dg2Lnb8no/jFwoMQKMTU6iAivgoMptGqSTprhnrRtk=
github.com/eclipse/paho.golang v0.10.0 h1:oUGPjRwWcZQRgDD9wVDV7y7i7yBSxts3vcvcNJo8B4Q=
github.com/eclipse/paho.golang v0.10.0/go.mod h1:rhrV37IEwauUyx8FHrvmXOKo+QRKng5ncoN1vJiJMcs=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=

View File

@ -1,7 +1,7 @@
# MQTT Producer Output Plugin
This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
Producer.
Producer. It supports MQTT protocols `3.1.1` and `5`.
## Mosquitto v2.0.12+ and `identifier rejected`
@ -10,7 +10,7 @@ In v2.0.12+ of the mosquitto MQTT server, there is a
`keep_alive` value to be set non-zero in your telegraf configuration. If not
set, the server will return with `identifier rejected`.
As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30.
As a reference `eclipse/paho.golang` sets the `keep_alive` to 30.
## Configuration
@ -19,9 +19,14 @@ As a reference `eclipse/paho.mqtt.golang` sets the `keep_alive` to 30.
[[outputs.mqtt]]
## MQTT Brokers
## The list of brokers should only include the hostname or IP address and the
## port to the broker. This should follow the format '{host}:{port}'. For
## example, "localhost:1883" or "127.0.0.1:8883".
servers = ["localhost:1883"]
## port to the broker. This should follow the format `[{scheme}://]{host}:{port}`. For
## example, `localhost:1883` or `mqtt://localhost:1883`.
## Scheme can be any of the following: tcp://, mqtt://, tls://, mqtts://
## non-TLS and TLS servers can not be mix-and-matched.
servers = ["localhost:1883", ] # or ["mqtts://tls.example.com:1883"]
## Protocol can be `3.1.1` or `5`. Default is `3.1.1`
# procotol = "3.1.1"
## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:

View File

@ -2,17 +2,15 @@
package mqtt
import (
// Blank import to support go:embed compile directive
_ "embed"
"fmt"
"net/url"
"strings"
"sync"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
@ -23,55 +21,61 @@ import (
var sampleConfig string
const (
defaultKeepAlive = 0
defaultKeepAlive = 30
)
type MQTT struct {
Servers []string `toml:"servers"`
Username string
Password string
Protocol string `toml:"protocol"`
Username string `toml:"username"`
Password string `toml:"password"`
Database string
Timeout config.Duration
TopicPrefix string
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
Timeout config.Duration `toml:"timeout"`
TopicPrefix string `toml:"topic_prefix"`
QoS int `toml:"qos"`
ClientID string `toml:"client_id"`
tls.ClientConfig
BatchMessage bool `toml:"batch"`
Retain bool `toml:"retain"`
KeepAlive int64 `toml:"keep_alive"`
Log telegraf.Logger `toml:"-"`
client paho.Client
opts *paho.ClientOptions
client Client
serializer serializers.Serializer
sync.Mutex
}
// Client is a protocol neutral MQTT client for connecting,
// disconnecting, and publishing data to a topic.
// The protocol specific clients must implement this interface
type Client interface {
Connect() error
Publish(topic string, data []byte) error
Close() error
}
func (*MQTT) SampleConfig() string {
return sampleConfig
}
func (m *MQTT) Connect() error {
var err error
m.Lock()
defer m.Unlock()
if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS)
}
m.opts, err = m.createOpts()
if err != nil {
return err
switch m.Protocol {
case "", "3.1.1":
m.client = newMQTTv311Client(m)
case "5":
m.client = newMQTTv5Client(m)
default:
return fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", m.Protocol)
}
m.client = paho.NewClient(m.opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
return m.client.Connect()
}
func (m *MQTT) SetSerializer(serializer serializers.Serializer) {
@ -79,10 +83,7 @@ func (m *MQTT) SetSerializer(serializer serializers.Serializer) {
}
func (m *MQTT) Close() error {
if m.client.IsConnected() {
m.client.Disconnect(20)
}
return nil
return m.client.Close()
}
func (m *MQTT) Write(metrics []telegraf.Metric) error {
@ -119,7 +120,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
continue
}
err = m.publish(topic, buf)
err = m.client.Publish(topic, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server, %s", err)
}
@ -132,69 +133,29 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
if err != nil {
return err
}
publisherr := m.publish(key, buf)
if publisherr != nil {
return fmt.Errorf("could not write to MQTT server, %s", publisherr)
err = m.client.Publish(key, buf)
if err != nil {
return fmt.Errorf("could not write to MQTT server, %s", err)
}
}
return nil
}
func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), m.Retain, body)
token.WaitTimeout(time.Duration(m.Timeout))
if token.Error() != nil {
return token.Error()
func parseServers(servers []string) ([]*url.URL, error) {
urls := make([]*url.URL, 0, len(servers))
for _, svr := range servers {
if !strings.Contains(svr, "://") {
urls = append(urls, &url.URL{Scheme: "tcp", Host: svr})
} else {
u, err := url.Parse(svr)
if err != nil {
return nil, err
}
urls = append(urls, u)
}
}
return nil
}
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions()
opts.KeepAlive = m.KeepAlive
if m.Timeout < config.Duration(time.Second) {
m.Timeout = config.Duration(5 * time.Second)
}
opts.WriteTimeout = time.Duration(m.Timeout)
if m.ClientID != "" {
opts.SetClientID(m.ClientID)
} else {
opts.SetClientID("Telegraf-Output-" + internal.RandomString(5))
}
tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
scheme := "tcp"
if tlsCfg != nil {
scheme = "ssl"
opts.SetTLSConfig(tlsCfg)
}
user := m.Username
if user != "" {
opts.SetUsername(user)
}
password := m.Password
if password != "" {
opts.SetPassword(password)
}
if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host informations")
}
for _, host := range m.Servers {
server := fmt.Sprintf("%s://%s", scheme, host)
opts.AddBroker(server)
}
opts.SetAutoReconnect(true)
return opts, nil
return urls, nil
}
func init() {

View File

@ -2,9 +2,9 @@ package mqtt
import (
"fmt"
"path/filepath"
"testing"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/testcontainers/testcontainers-go/wait"
@ -12,29 +12,103 @@ import (
"github.com/stretchr/testify/require"
)
const servicePort = "1883"
func launchTestContainer(t *testing.T) *testutil.Container {
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
require.NoError(t, err, "missing file mosquitto.conf")
container := testutil.Container{
Image: "eclipse-mosquitto:2",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(servicePort),
BindMounts: map[string]string{
"/mosquitto/config/mosquitto.conf": conf,
},
}
err = container.Start()
require.NoError(t, err, "failed to start container")
return &container
}
func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
servicePort := "1883"
container := testutil.Container{
Image: "ncarlier/mqtt",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
}
err := container.Start()
require.NoError(t, err, "failed to start container")
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, _ := serializers.NewInfluxSerializer()
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqtt-default-integration-test"},
}
// Verify that we can connect to the MQTT broker
err = m.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
Protocol: "3.1.1",
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqttv311-integration-test"},
}
// Verify that we can connect to the MQTT broker
err = m.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the mqtt broker
err = m.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
m := &MQTT{
Servers: []string{url},
Protocol: "5",
serializer: s,
KeepAlive: 30,
Log: testutil.Logger{Name: "mqttv5-integration-test"},
}
// Verify that we can connect to the MQTT broker

View File

@ -0,0 +1,92 @@
package mqtt
import (
"fmt"
"time"
// Library that supports v3.1.1
mqttv3 "github.com/eclipse/paho.mqtt.golang"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
)
type mqttv311Client struct {
*MQTT
client mqttv3.Client
}
func newMQTTv311Client(cfg *MQTT) *mqttv311Client {
return &mqttv311Client{MQTT: cfg}
}
func (m *mqttv311Client) Connect() error {
opts := mqttv3.NewClientOptions()
opts.KeepAlive = m.KeepAlive
if m.Timeout < config.Duration(time.Second) {
m.Timeout = config.Duration(5 * time.Second)
}
opts.WriteTimeout = time.Duration(m.Timeout)
if m.ClientID != "" {
opts.SetClientID(m.ClientID)
} else {
opts.SetClientID("Telegraf-Output-" + internal.RandomString(5))
}
tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return err
}
opts.SetTLSConfig(tlsCfg)
user := m.Username
if user != "" {
opts.SetUsername(user)
}
password := m.Password
if password != "" {
opts.SetPassword(password)
}
if len(m.Servers) == 0 {
return fmt.Errorf("could not get server informations")
}
servers, err := parseServers(m.Servers)
if err != nil {
return err
}
for _, server := range servers {
if tlsCfg != nil {
server.Scheme = "tls"
}
broker := server.String()
opts.AddBroker(broker)
m.MQTT.Log.Debugf("registered mqtt broker: %v", broker)
}
opts.SetAutoReconnect(true)
m.client = mqttv3.NewClient(opts)
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
func (m *mqttv311Client) Publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), m.Retain, body)
token.WaitTimeout(time.Duration(m.Timeout))
if token.Error() != nil {
return token.Error()
}
return nil
}
func (m *mqttv311Client) Close() error {
if m.client.IsConnected() {
m.client.Disconnect(20)
}
return nil
}

View File

@ -0,0 +1,94 @@
package mqtt
import (
"context"
"fmt"
"net/url"
"time"
mqttv5auto "github.com/eclipse/paho.golang/autopaho"
mqttv5 "github.com/eclipse/paho.golang/paho"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
)
type mqttv5Client struct {
*MQTT
client *mqttv5auto.ConnectionManager
}
func newMQTTv5Client(cfg *MQTT) *mqttv5Client {
return &mqttv5Client{MQTT: cfg}
}
func (m *mqttv5Client) Connect() error {
opts := mqttv5auto.ClientConfig{}
if m.ClientID != "" {
opts.ClientID = m.ClientID
} else {
opts.ClientID = "Telegraf-Output-" + internal.RandomString(5)
}
user := m.Username
pass := m.Password
opts.SetUsernamePassword(user, []byte(pass))
tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return err
}
if tlsCfg != nil {
opts.TlsCfg = tlsCfg
}
if len(m.Servers) == 0 {
return fmt.Errorf("could not get host informations")
}
brokers := make([]*url.URL, 0)
servers, err := parseServers(m.Servers)
if err != nil {
return err
}
for _, server := range servers {
if tlsCfg != nil {
server.Scheme = "tls"
}
brokers = append(brokers, server)
m.MQTT.Log.Debugf("registered mqtt broker: %s", server.String())
}
opts.BrokerUrls = brokers
opts.KeepAlive = uint16(m.KeepAlive)
if m.Timeout < config.Duration(time.Second) {
m.Timeout = config.Duration(5 * time.Second)
}
client, err := mqttv5auto.NewConnection(context.Background(), opts)
if err != nil {
return err
}
m.client = client
return client.AwaitConnection(context.Background())
}
func (m *mqttv5Client) Publish(topic string, body []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.Timeout))
defer cancel()
_, err := m.client.Publish(ctx, &mqttv5.Publish{
Topic: topic,
QoS: byte(m.QoS),
Retain: m.Retain,
Payload: body,
})
if err != nil {
return err
}
return nil
}
func (m *mqttv5Client) Close() error {
return m.client.Disconnect(context.Background())
}

View File

@ -2,9 +2,14 @@
[[outputs.mqtt]]
## MQTT Brokers
## The list of brokers should only include the hostname or IP address and the
## port to the broker. This should follow the format '{host}:{port}'. For
## example, "localhost:1883" or "127.0.0.1:8883".
servers = ["localhost:1883"]
## port to the broker. This should follow the format `[{scheme}://]{host}:{port}`. For
## example, `localhost:1883` or `mqtt://localhost:1883`.
## Scheme can be any of the following: tcp://, mqtt://, tls://, mqtts://
## non-TLS and TLS servers can not be mix-and-matched.
servers = ["localhost:1883", ] # or ["mqtts://tls.example.com:1883"]
## Protocol can be `3.1.1` or `5`. Default is `3.1.1`
# procotol = "3.1.1"
## MQTT Topic for Producer Messages
## MQTT outputs send metrics to this topic format:

View File

@ -0,0 +1,3 @@
listener 1883 0.0.0.0
allow_anonymous true
connection_messages true