chore(mqtt): unify input and output plugin's MQTT client (#12683)
This commit is contained in:
parent
da675d4788
commit
75cbda186c
|
|
@ -0,0 +1,94 @@
|
|||
package mqtt
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
paho "github.com/eclipse/paho.mqtt.golang"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
)
|
||||
|
||||
// mqtt v5-specific publish properties.
|
||||
// See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109
|
||||
type PublishProperties struct {
|
||||
ContentType string `toml:"content_type"`
|
||||
ResponseTopic string `toml:"response_topic"`
|
||||
MessageExpiry config.Duration `toml:"message_expiry"`
|
||||
TopicAlias *uint16 `toml:"topic_alias"`
|
||||
UserProperties map[string]string `toml:"user_properties"`
|
||||
}
|
||||
|
||||
type MqttConfig struct {
|
||||
Servers []string `toml:"servers"`
|
||||
Protocol string `toml:"protocol"`
|
||||
Username config.Secret `toml:"username"`
|
||||
Password config.Secret `toml:"password"`
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
ConnectionTimeout config.Duration `toml:"connection_timeout"`
|
||||
QoS int `toml:"qos"`
|
||||
ClientID string `toml:"client_id"`
|
||||
Retain bool `toml:"retain"`
|
||||
KeepAlive int64 `toml:"keep_alive"`
|
||||
PersistentSession bool `toml:"persistent_session"`
|
||||
PublishPropertiesV5 *PublishProperties `toml:"v5"`
|
||||
|
||||
tls.ClientConfig
|
||||
|
||||
AutoReconnect bool `toml:"-"`
|
||||
OnConnectionLost func(error) `toml:"-"`
|
||||
}
|
||||
|
||||
// Client is a protocol neutral MQTT client for connecting,
|
||||
// disconnecting, and publishing data to a topic.
|
||||
// The protocol specific clients must implement this interface
|
||||
type Client interface {
|
||||
Connect() (bool, error)
|
||||
Publish(topic string, data []byte) error
|
||||
SubscribeMultiple(filters map[string]byte, callback paho.MessageHandler) error
|
||||
AddRoute(topic string, callback paho.MessageHandler)
|
||||
Close() error
|
||||
}
|
||||
|
||||
func NewClient(cfg *MqttConfig) (Client, error) {
|
||||
if len(cfg.Servers) == 0 {
|
||||
return nil, errors.New("no servers specified")
|
||||
}
|
||||
|
||||
if cfg.PersistentSession && cfg.ClientID == "" {
|
||||
return nil, errors.New("persistent_session requires client_id")
|
||||
}
|
||||
|
||||
if cfg.QoS > 2 || cfg.QoS < 0 {
|
||||
return nil, fmt.Errorf("invalid QoS value %d; must be 0, 1 or 2", cfg.QoS)
|
||||
}
|
||||
|
||||
switch cfg.Protocol {
|
||||
case "", "3.1.1":
|
||||
return NewMQTTv311Client(cfg)
|
||||
case "5":
|
||||
return NewMQTTv5Client(cfg)
|
||||
}
|
||||
return nil, fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", cfg.Protocol)
|
||||
}
|
||||
|
||||
func parseServers(servers []string) ([]*url.URL, error) {
|
||||
urls := make([]*url.URL, 0, len(servers))
|
||||
for _, svr := range servers {
|
||||
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
|
||||
if !strings.Contains(svr, "://") {
|
||||
urls = append(urls, &url.URL{Scheme: "tcp", Host: svr})
|
||||
continue
|
||||
}
|
||||
|
||||
u, err := url.Parse(svr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
urls = append(urls, u)
|
||||
}
|
||||
return urls, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
package mqtt
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// Test that default client has random ID
|
||||
func TestRandomClientID(t *testing.T) {
|
||||
var err error
|
||||
|
||||
cfg := &MqttConfig{
|
||||
Servers: []string{"tcp://localhost:1883"},
|
||||
}
|
||||
|
||||
client1, err := NewMQTTv311Client(cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
client2, err := NewMQTTv311Client(cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
options1 := client1.client.OptionsReader()
|
||||
options2 := client2.client.OptionsReader()
|
||||
require.NotEqual(t, options1.ClientID(), options2.ClientID())
|
||||
}
|
||||
|
|
@ -0,0 +1,129 @@
|
|||
package mqtt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
)
|
||||
|
||||
type mqttv311Client struct {
|
||||
client mqttv3.Client
|
||||
timeout time.Duration
|
||||
qos int
|
||||
retain bool
|
||||
}
|
||||
|
||||
func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) {
|
||||
opts := mqttv3.NewClientOptions()
|
||||
opts.KeepAlive = cfg.KeepAlive
|
||||
opts.WriteTimeout = time.Duration(cfg.Timeout)
|
||||
if time.Duration(cfg.ConnectionTimeout) >= 1*time.Second {
|
||||
opts.ConnectTimeout = time.Duration(cfg.ConnectionTimeout)
|
||||
}
|
||||
opts.SetCleanSession(!cfg.PersistentSession)
|
||||
if cfg.OnConnectionLost != nil {
|
||||
onConnectionLost := func(_ mqttv3.Client, err error) {
|
||||
cfg.OnConnectionLost(err)
|
||||
}
|
||||
opts.SetConnectionLostHandler(onConnectionLost)
|
||||
}
|
||||
opts.SetAutoReconnect(cfg.AutoReconnect)
|
||||
|
||||
if cfg.ClientID != "" {
|
||||
opts.SetClientID(cfg.ClientID)
|
||||
} else {
|
||||
id, err := internal.RandomString(5)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generating random client ID failed: %w", err)
|
||||
}
|
||||
opts.SetClientID("Telegraf-Output-" + id)
|
||||
}
|
||||
|
||||
tlsCfg, err := cfg.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opts.SetTLSConfig(tlsCfg)
|
||||
|
||||
if !cfg.Username.Empty() {
|
||||
user, err := cfg.Username.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
opts.SetUsername(string(user))
|
||||
config.ReleaseSecret(user)
|
||||
}
|
||||
if !cfg.Password.Empty() {
|
||||
password, err := cfg.Password.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
opts.SetPassword(string(password))
|
||||
config.ReleaseSecret(password)
|
||||
}
|
||||
|
||||
servers, err := parseServers(cfg.Servers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, server := range servers {
|
||||
if tlsCfg != nil {
|
||||
server.Scheme = "tls"
|
||||
}
|
||||
broker := server.String()
|
||||
opts.AddBroker(broker)
|
||||
}
|
||||
|
||||
return &mqttv311Client{
|
||||
client: mqttv3.NewClient(opts),
|
||||
timeout: time.Duration(cfg.Timeout),
|
||||
qos: cfg.QoS,
|
||||
retain: cfg.Retain,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) Connect() (bool, error) {
|
||||
token := m.client.Connect()
|
||||
|
||||
if token.Wait() && token.Error() != nil {
|
||||
return false, token.Error()
|
||||
}
|
||||
|
||||
// Persistent sessions should skip subscription if a session is present, as
|
||||
// the subscriptions are stored by the server.
|
||||
type sessionPresent interface {
|
||||
SessionPresent() bool
|
||||
}
|
||||
if t, ok := token.(sessionPresent); ok {
|
||||
return t.SessionPresent(), nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) Publish(topic string, body []byte) error {
|
||||
token := m.client.Publish(topic, byte(m.qos), m.retain, body)
|
||||
token.WaitTimeout(m.timeout)
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) SubscribeMultiple(filters map[string]byte, callback mqttv3.MessageHandler) error {
|
||||
token := m.client.SubscribeMultiple(filters, callback)
|
||||
token.Wait()
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) AddRoute(topic string, callback mqttv3.MessageHandler) {
|
||||
m.client.AddRoute(topic, callback)
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) Close() error {
|
||||
if m.client.IsConnected() {
|
||||
m.client.Disconnect(20)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
package mqtt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
mqttv5auto "github.com/eclipse/paho.golang/autopaho"
|
||||
mqttv5 "github.com/eclipse/paho.golang/paho"
|
||||
paho "github.com/eclipse/paho.mqtt.golang"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
)
|
||||
|
||||
type mqttv5Client struct {
|
||||
client *mqttv5auto.ConnectionManager
|
||||
options mqttv5auto.ClientConfig
|
||||
timeout time.Duration
|
||||
qos int
|
||||
retain bool
|
||||
properties *mqttv5.PublishProperties
|
||||
}
|
||||
|
||||
func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
|
||||
opts := mqttv5auto.ClientConfig{
|
||||
KeepAlive: uint16(cfg.KeepAlive),
|
||||
OnConnectError: cfg.OnConnectionLost,
|
||||
}
|
||||
opts.SetConnectPacketConfigurator(func(c *mqttv5.Connect) *mqttv5.Connect {
|
||||
c.CleanStart = cfg.PersistentSession
|
||||
return c
|
||||
})
|
||||
|
||||
if time.Duration(cfg.ConnectionTimeout) >= 1*time.Second {
|
||||
opts.ConnectTimeout = time.Duration(cfg.ConnectionTimeout)
|
||||
}
|
||||
|
||||
if cfg.ClientID != "" {
|
||||
opts.ClientID = cfg.ClientID
|
||||
} else {
|
||||
id, err := internal.RandomString(5)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("generating random client ID failed: %w", err)
|
||||
}
|
||||
opts.ClientID = "Telegraf-Output-" + id
|
||||
}
|
||||
|
||||
user, err := cfg.Username.Get()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
pass, err := cfg.Password.Get()
|
||||
if err != nil {
|
||||
config.ReleaseSecret(user)
|
||||
return nil, fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
opts.SetUsernamePassword(string(user), pass)
|
||||
config.ReleaseSecret(user)
|
||||
config.ReleaseSecret(pass)
|
||||
|
||||
tlsCfg, err := cfg.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tlsCfg != nil {
|
||||
opts.TlsCfg = tlsCfg
|
||||
}
|
||||
|
||||
brokers := make([]*url.URL, 0)
|
||||
servers, err := parseServers(cfg.Servers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, server := range servers {
|
||||
if tlsCfg != nil {
|
||||
server.Scheme = "tls"
|
||||
}
|
||||
brokers = append(brokers, server)
|
||||
}
|
||||
opts.BrokerUrls = brokers
|
||||
|
||||
// Build the v5 specific publish properties if they are present in the config.
|
||||
// These should not change during the lifecycle of the client.
|
||||
var properties *mqttv5.PublishProperties
|
||||
if cfg.PublishPropertiesV5 != nil {
|
||||
properties = &mqttv5.PublishProperties{
|
||||
ContentType: cfg.PublishPropertiesV5.ContentType,
|
||||
ResponseTopic: cfg.PublishPropertiesV5.ResponseTopic,
|
||||
TopicAlias: cfg.PublishPropertiesV5.TopicAlias,
|
||||
}
|
||||
|
||||
messageExpiry := time.Duration(cfg.PublishPropertiesV5.MessageExpiry)
|
||||
if expirySeconds := uint32(messageExpiry.Seconds()); expirySeconds > 0 {
|
||||
properties.MessageExpiry = &expirySeconds
|
||||
}
|
||||
|
||||
properties.User = make([]mqttv5.UserProperty, 0, len(cfg.PublishPropertiesV5.UserProperties))
|
||||
for k, v := range cfg.PublishPropertiesV5.UserProperties {
|
||||
properties.User.Add(k, v)
|
||||
}
|
||||
}
|
||||
|
||||
return &mqttv5Client{
|
||||
options: opts,
|
||||
timeout: time.Duration(cfg.Timeout),
|
||||
qos: cfg.QoS,
|
||||
retain: cfg.Retain,
|
||||
properties: properties,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) Connect() (bool, error) {
|
||||
client, err := mqttv5auto.NewConnection(context.Background(), m.options)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
m.client = client
|
||||
return false, client.AwaitConnection(context.Background())
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) Publish(topic string, body []byte) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := m.client.Publish(ctx, &mqttv5.Publish{
|
||||
Topic: topic,
|
||||
QoS: byte(m.qos),
|
||||
Retain: m.retain,
|
||||
Payload: body,
|
||||
Properties: m.properties,
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) SubscribeMultiple(filters map[string]byte, callback paho.MessageHandler) error {
|
||||
_, _ = filters, callback
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) AddRoute(topic string, callback paho.MessageHandler) {
|
||||
_, _ = topic, callback
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) Close() error {
|
||||
return m.client.Disconnect(context.Background())
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
listener 1883 0.0.0.0
|
||||
allow_anonymous true
|
||||
connection_messages true
|
||||
|
|
@ -4,14 +4,14 @@ package mqtt
|
|||
import (
|
||||
// Blank import to support go:embed compile directive
|
||||
_ "embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
"github.com/influxdata/telegraf/plugins/common/mqtt"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
)
|
||||
|
|
@ -19,74 +19,54 @@ import (
|
|||
//go:embed sample.conf
|
||||
var sampleConfig string
|
||||
|
||||
const (
|
||||
defaultKeepAlive = 30
|
||||
)
|
||||
|
||||
type MQTT struct {
|
||||
Servers []string `toml:"servers"`
|
||||
Protocol string `toml:"protocol"`
|
||||
Username config.Secret `toml:"username"`
|
||||
Password config.Secret `toml:"password"`
|
||||
Database string
|
||||
Timeout config.Duration `toml:"timeout"`
|
||||
TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"`
|
||||
Topic string `toml:"topic"`
|
||||
QoS int `toml:"qos"`
|
||||
ClientID string `toml:"client_id"`
|
||||
tls.ClientConfig
|
||||
BatchMessage bool `toml:"batch"`
|
||||
Retain bool `toml:"retain"`
|
||||
KeepAlive int64 `toml:"keep_alive"`
|
||||
V5PublishProperties *mqttv5PublishProperties `toml:"v5"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
TopicPrefix string `toml:"topic_prefix" deprecated:"1.25.0;use 'topic' instead"`
|
||||
Topic string `toml:"topic"`
|
||||
BatchMessage bool `toml:"batch"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
mqtt.MqttConfig
|
||||
|
||||
client Client
|
||||
client mqtt.Client
|
||||
serializer serializers.Serializer
|
||||
generator *TopicNameGenerator
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// Client is a protocol neutral MQTT client for connecting,
|
||||
// disconnecting, and publishing data to a topic.
|
||||
// The protocol specific clients must implement this interface
|
||||
type Client interface {
|
||||
Connect() error
|
||||
Publish(topic string, data []byte) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
func (*MQTT) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
func (m *MQTT) Init() error {
|
||||
if len(m.Servers) == 0 {
|
||||
return errors.New("no servers specified")
|
||||
}
|
||||
|
||||
if m.PersistentSession && m.ClientID == "" {
|
||||
return errors.New("persistent_session requires client_id")
|
||||
}
|
||||
|
||||
if m.QoS > 2 || m.QoS < 0 {
|
||||
return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS)
|
||||
}
|
||||
|
||||
var err error
|
||||
m.generator, err = NewTopicNameGenerator(m.TopicPrefix, m.Topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *MQTT) Connect() error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
if m.QoS > 2 || m.QoS < 0 {
|
||||
return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS)
|
||||
}
|
||||
|
||||
switch m.Protocol {
|
||||
case "", "3.1.1":
|
||||
m.client = newMQTTv311Client(m)
|
||||
case "5":
|
||||
m.client = newMQTTv5Client(m)
|
||||
default:
|
||||
return fmt.Errorf("unsuported protocol %q: must be \"3.1.1\" or \"5\"", m.Protocol)
|
||||
client, err := mqtt.NewClient(&m.MqttConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.client = client
|
||||
|
||||
return m.client.Connect()
|
||||
_, err = m.client.Connect()
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *MQTT) SetSerializer(serializer serializers.Serializer) {
|
||||
|
|
@ -146,26 +126,14 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parseServers(servers []string) ([]*url.URL, error) {
|
||||
urls := make([]*url.URL, 0, len(servers))
|
||||
for _, svr := range servers {
|
||||
if !strings.Contains(svr, "://") {
|
||||
urls = append(urls, &url.URL{Scheme: "tcp", Host: svr})
|
||||
} else {
|
||||
u, err := url.Parse(svr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
urls = append(urls, u)
|
||||
}
|
||||
}
|
||||
return urls, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("mqtt", func() telegraf.Output {
|
||||
return &MQTT{
|
||||
KeepAlive: defaultKeepAlive,
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
KeepAlive: 30,
|
||||
Timeout: config.Duration(5 * time.Second),
|
||||
AutoReconnect: true,
|
||||
},
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,11 +6,15 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
paho "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/common/mqtt"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
|
@ -45,9 +49,12 @@ func TestConnectAndWriteIntegration(t *testing.T) {
|
|||
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
|
||||
s := serializers.NewInfluxSerializer()
|
||||
m := &MQTT{
|
||||
Servers: []string{url},
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
Servers: []string{url},
|
||||
KeepAlive: 30,
|
||||
Timeout: config.Duration(5 * time.Second),
|
||||
},
|
||||
serializer: s,
|
||||
KeepAlive: 30,
|
||||
Log: testutil.Logger{Name: "mqtt-default-integration-test"},
|
||||
}
|
||||
|
||||
|
|
@ -72,10 +79,13 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
|
|||
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
|
||||
s := serializers.NewInfluxSerializer()
|
||||
m := &MQTT{
|
||||
Servers: []string{url},
|
||||
Protocol: "3.1.1",
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
Servers: []string{url},
|
||||
Protocol: "3.1.1",
|
||||
KeepAlive: 30,
|
||||
Timeout: config.Duration(5 * time.Second),
|
||||
},
|
||||
serializer: s,
|
||||
KeepAlive: 30,
|
||||
Log: testutil.Logger{Name: "mqttv311-integration-test"},
|
||||
}
|
||||
|
||||
|
|
@ -98,11 +108,127 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
|
|||
defer container.Terminate()
|
||||
|
||||
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
|
||||
s := serializers.NewInfluxSerializer()
|
||||
m := &MQTT{
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
Servers: []string{url},
|
||||
Protocol: "5",
|
||||
KeepAlive: 30,
|
||||
Timeout: config.Duration(5 * time.Second),
|
||||
},
|
||||
serializer: serializers.NewInfluxSerializer(),
|
||||
Log: testutil.Logger{Name: "mqttv5-integration-test"},
|
||||
}
|
||||
|
||||
// Verify that we can connect to the MQTT broker
|
||||
require.NoError(t, m.Init())
|
||||
require.NoError(t, m.Connect())
|
||||
|
||||
// Verify that we can successfully write data to the mqtt broker
|
||||
require.NoError(t, m.Write(testutil.MockMetrics()))
|
||||
}
|
||||
|
||||
func TestIntegrationMQTTv3(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
|
||||
require.NoError(t, err, "missing file mosquitto.conf")
|
||||
|
||||
container := testutil.Container{
|
||||
Image: "eclipse-mosquitto:2",
|
||||
ExposedPorts: []string{servicePort},
|
||||
WaitingFor: wait.ForListeningPort(servicePort),
|
||||
BindMounts: map[string]string{
|
||||
"/mosquitto/config/mosquitto.conf": conf,
|
||||
},
|
||||
}
|
||||
require.NoError(t, container.Start(), "failed to start container")
|
||||
defer container.Terminate()
|
||||
|
||||
// Setup the parser / serializer pair
|
||||
parser := &influx.Parser{}
|
||||
require.NoError(t, parser.Init())
|
||||
serializer := serializers.NewInfluxSerializer()
|
||||
|
||||
// Setup the plugin
|
||||
url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
|
||||
topic := "testv3"
|
||||
plugin := &MQTT{
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
Servers: []string{url},
|
||||
KeepAlive: 30,
|
||||
Timeout: config.Duration(5 * time.Second),
|
||||
AutoReconnect: true,
|
||||
},
|
||||
Topic: topic + "/{{.PluginName}}",
|
||||
Log: testutil.Logger{Name: "mqttv3-integration-test"},
|
||||
}
|
||||
plugin.SetSerializer(serializer)
|
||||
require.NoError(t, plugin.Init())
|
||||
|
||||
// Prepare the receiver message
|
||||
var acc testutil.Accumulator
|
||||
onMessage := func(_ paho.Client, msg paho.Message) {
|
||||
metrics, err := parser.Parse(msg.Payload())
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, m := range metrics {
|
||||
m.AddTag("topic", msg.Topic())
|
||||
acc.AddMetric(m)
|
||||
}
|
||||
}
|
||||
|
||||
// Startup the plugin and subscribe to the topic
|
||||
require.NoError(t, plugin.Connect())
|
||||
defer plugin.Close()
|
||||
|
||||
// Add routing for the messages
|
||||
subscriptionPattern := topic + "/#"
|
||||
plugin.client.AddRoute(subscriptionPattern, onMessage)
|
||||
|
||||
// Subscribe to the topic
|
||||
topics := map[string]byte{subscriptionPattern: byte(plugin.QoS)}
|
||||
require.NoError(t, plugin.client.SubscribeMultiple(topics, onMessage))
|
||||
|
||||
// Setup and execute the test case
|
||||
input := make([]telegraf.Metric, 0, 3)
|
||||
expected := make([]telegraf.Metric, 0, len(input))
|
||||
for i := 0; i < cap(input); i++ {
|
||||
name := fmt.Sprintf("test%d", i)
|
||||
m := testutil.TestMetric(i, name)
|
||||
input = append(input, m)
|
||||
|
||||
e := m.Copy()
|
||||
e.AddTag("topic", topic+"/"+name)
|
||||
expected = append(expected, e)
|
||||
}
|
||||
require.NoError(t, plugin.Write(input))
|
||||
|
||||
// Verify the result
|
||||
require.Eventually(t, func() bool {
|
||||
return acc.NMetrics() >= uint64(len(expected))
|
||||
}, time.Second, 100*time.Millisecond)
|
||||
require.NoError(t, plugin.Close())
|
||||
|
||||
require.Empty(t, acc.Errors)
|
||||
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
|
||||
}
|
||||
|
||||
func TestMQTTv5Properties(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping integration test in short mode")
|
||||
}
|
||||
|
||||
container := launchTestContainer(t)
|
||||
defer container.Terminate()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
properties *mqttv5PublishProperties
|
||||
properties *mqtt.PublishProperties
|
||||
}{
|
||||
{
|
||||
name: "no publish properties",
|
||||
|
|
@ -110,46 +236,61 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "content type set",
|
||||
properties: &mqttv5PublishProperties{ContentType: "text/plain"},
|
||||
properties: &mqtt.PublishProperties{ContentType: "text/plain"},
|
||||
},
|
||||
{
|
||||
name: "response topic set",
|
||||
properties: &mqttv5PublishProperties{ResponseTopic: "test/topic"},
|
||||
properties: &mqtt.PublishProperties{ResponseTopic: "test/topic"},
|
||||
},
|
||||
{
|
||||
name: "message expiry set",
|
||||
properties: &mqttv5PublishProperties{MessageExpiry: config.Duration(10 * time.Minute)},
|
||||
properties: &mqtt.PublishProperties{MessageExpiry: config.Duration(10 * time.Minute)},
|
||||
},
|
||||
{
|
||||
name: "topic alias set",
|
||||
properties: &mqttv5PublishProperties{TopicAlias: new(uint16)},
|
||||
properties: &mqtt.PublishProperties{TopicAlias: new(uint16)},
|
||||
},
|
||||
{
|
||||
name: "user properties set",
|
||||
properties: &mqttv5PublishProperties{UserProperties: map[string]string{"key": "value"}},
|
||||
properties: &mqtt.PublishProperties{UserProperties: map[string]string{"key": "value"}},
|
||||
},
|
||||
}
|
||||
|
||||
topic := "testv3"
|
||||
url := fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
m := &MQTT{
|
||||
Servers: []string{url},
|
||||
Protocol: "5",
|
||||
serializer: s,
|
||||
KeepAlive: 30,
|
||||
Log: testutil.Logger{Name: "mqttv5-integration-test"},
|
||||
V5PublishProperties: tt.properties,
|
||||
plugin := &MQTT{
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
Servers: []string{url},
|
||||
Protocol: "5",
|
||||
KeepAlive: 30,
|
||||
Timeout: config.Duration(5 * time.Second),
|
||||
AutoReconnect: true,
|
||||
},
|
||||
Topic: topic,
|
||||
Log: testutil.Logger{Name: "mqttv5-integration-test"},
|
||||
}
|
||||
|
||||
// Setup the metric serializer
|
||||
serializer := serializers.NewInfluxSerializer()
|
||||
plugin.SetSerializer(serializer)
|
||||
|
||||
// Verify that we can connect to the MQTT broker
|
||||
require.NoError(t, m.Init())
|
||||
require.NoError(t, m.Connect())
|
||||
require.NoError(t, plugin.Init())
|
||||
require.NoError(t, plugin.Connect())
|
||||
|
||||
// Verify that we can successfully write data to the mqtt broker
|
||||
require.NoError(t, m.Write(testutil.MockMetrics()))
|
||||
require.NoError(t, plugin.Write(testutil.MockMetrics()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMissingServers(t *testing.T) {
|
||||
plugin := &MQTT{}
|
||||
require.ErrorContains(t, plugin.Init(), "no servers specified")
|
||||
}
|
||||
|
||||
func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
@ -176,6 +317,9 @@ func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) {
|
|||
t.Run(tt.name, func(t *testing.T) {
|
||||
m := &MQTT{
|
||||
Topic: tt.topic,
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
Servers: []string{"tcp://localhost:1883"},
|
||||
},
|
||||
}
|
||||
err := m.Init()
|
||||
if tt.expectedError != "" {
|
||||
|
|
@ -190,9 +334,12 @@ func TestMQTTTopicGenerationTemplateIsValid(t *testing.T) {
|
|||
func TestGenerateTopicName(t *testing.T) {
|
||||
s := serializers.NewInfluxSerializer()
|
||||
m := &MQTT{
|
||||
Servers: []string{"tcp://localhost:502"},
|
||||
MqttConfig: mqtt.MqttConfig{
|
||||
Servers: []string{"tcp://localhost:1883"},
|
||||
KeepAlive: 30,
|
||||
Timeout: config.Duration(5 * time.Second),
|
||||
},
|
||||
serializer: s,
|
||||
KeepAlive: 30,
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
tests := []struct {
|
||||
|
|
|
|||
|
|
@ -1,104 +0,0 @@
|
|||
package mqtt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
)
|
||||
|
||||
type mqttv311Client struct {
|
||||
*MQTT
|
||||
client mqttv3.Client
|
||||
}
|
||||
|
||||
func newMQTTv311Client(cfg *MQTT) *mqttv311Client {
|
||||
return &mqttv311Client{MQTT: cfg}
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) Connect() error {
|
||||
opts := mqttv3.NewClientOptions()
|
||||
opts.KeepAlive = m.KeepAlive
|
||||
|
||||
if m.Timeout < config.Duration(time.Second) {
|
||||
m.Timeout = config.Duration(5 * time.Second)
|
||||
}
|
||||
opts.WriteTimeout = time.Duration(m.Timeout)
|
||||
|
||||
if m.ClientID != "" {
|
||||
opts.SetClientID(m.ClientID)
|
||||
} else {
|
||||
randomString, err := internal.RandomString(5)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating random string for client ID failed: %w", err)
|
||||
}
|
||||
opts.SetClientID("Telegraf-Output-" + randomString)
|
||||
}
|
||||
|
||||
tlsCfg, err := m.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
opts.SetTLSConfig(tlsCfg)
|
||||
|
||||
if !m.Username.Empty() {
|
||||
user, err := m.Username.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
opts.SetUsername(string(user))
|
||||
config.ReleaseSecret(user)
|
||||
}
|
||||
if !m.Password.Empty() {
|
||||
password, err := m.Password.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
opts.SetPassword(string(password))
|
||||
config.ReleaseSecret(password)
|
||||
}
|
||||
|
||||
if len(m.Servers) == 0 {
|
||||
return fmt.Errorf("could not get server informations")
|
||||
}
|
||||
|
||||
servers, err := parseServers(m.Servers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, server := range servers {
|
||||
if tlsCfg != nil {
|
||||
server.Scheme = "tls"
|
||||
}
|
||||
broker := server.String()
|
||||
opts.AddBroker(broker)
|
||||
m.MQTT.Log.Debugf("registered mqtt broker: %v", broker)
|
||||
}
|
||||
|
||||
opts.SetAutoReconnect(true)
|
||||
m.client = mqttv3.NewClient(opts)
|
||||
if token := m.client.Connect(); token.Wait() && token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) Publish(topic string, body []byte) error {
|
||||
token := m.client.Publish(topic, byte(m.QoS), m.Retain, body)
|
||||
token.WaitTimeout(time.Duration(m.Timeout))
|
||||
if token.Error() != nil {
|
||||
return token.Error()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mqttv311Client) Close() error {
|
||||
if m.client.IsConnected() {
|
||||
m.client.Disconnect(20)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -1,150 +0,0 @@
|
|||
package mqtt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
mqttv5auto "github.com/eclipse/paho.golang/autopaho"
|
||||
mqttv5 "github.com/eclipse/paho.golang/paho"
|
||||
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
)
|
||||
|
||||
// mqtt v5-specific publish properties.
|
||||
// See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901109
|
||||
type mqttv5PublishProperties struct {
|
||||
ContentType string `toml:"content_type"`
|
||||
ResponseTopic string `toml:"response_topic"`
|
||||
MessageExpiry config.Duration `toml:"message_expiry"`
|
||||
TopicAlias *uint16 `toml:"topic_alias"`
|
||||
UserProperties map[string]string `toml:"user_properties"`
|
||||
}
|
||||
|
||||
type mqttv5Client struct {
|
||||
*MQTT
|
||||
client *mqttv5auto.ConnectionManager
|
||||
publishProperties *mqttv5.PublishProperties
|
||||
}
|
||||
|
||||
func newMQTTv5Client(cfg *MQTT) *mqttv5Client {
|
||||
return &mqttv5Client{
|
||||
MQTT: cfg,
|
||||
publishProperties: buildPublishProperties(cfg),
|
||||
}
|
||||
}
|
||||
|
||||
// Build the v5 specific publish properties if they are present in the
|
||||
// config.
|
||||
// These should not change during the lifecycle of the client.
|
||||
func buildPublishProperties(cfg *MQTT) *mqttv5.PublishProperties {
|
||||
if cfg.V5PublishProperties == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
publishProperties := &mqttv5.PublishProperties{
|
||||
ContentType: cfg.V5PublishProperties.ContentType,
|
||||
ResponseTopic: cfg.V5PublishProperties.ResponseTopic,
|
||||
TopicAlias: cfg.V5PublishProperties.TopicAlias,
|
||||
User: make([]mqttv5.UserProperty, 0, len(cfg.V5PublishProperties.UserProperties)),
|
||||
}
|
||||
|
||||
messageExpiry := time.Duration(cfg.V5PublishProperties.MessageExpiry)
|
||||
if expirySeconds := uint32(messageExpiry.Seconds()); expirySeconds > 0 {
|
||||
publishProperties.MessageExpiry = &expirySeconds
|
||||
}
|
||||
|
||||
for k, v := range cfg.V5PublishProperties.UserProperties {
|
||||
publishProperties.User.Add(k, v)
|
||||
}
|
||||
|
||||
return publishProperties
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) Connect() error {
|
||||
opts := mqttv5auto.ClientConfig{}
|
||||
if m.ClientID != "" {
|
||||
opts.ClientID = m.ClientID
|
||||
} else {
|
||||
randomString, err := internal.RandomString(5)
|
||||
if err != nil {
|
||||
return fmt.Errorf("generating random string for client ID failed: %w", err)
|
||||
}
|
||||
opts.ClientID = "Telegraf-Output-" + randomString
|
||||
}
|
||||
|
||||
user, err := m.Username.Get()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting username failed: %w", err)
|
||||
}
|
||||
pass, err := m.Password.Get()
|
||||
if err != nil {
|
||||
config.ReleaseSecret(user)
|
||||
return fmt.Errorf("getting password failed: %w", err)
|
||||
}
|
||||
opts.SetUsernamePassword(string(user), pass)
|
||||
config.ReleaseSecret(user)
|
||||
config.ReleaseSecret(pass)
|
||||
|
||||
tlsCfg, err := m.ClientConfig.TLSConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if tlsCfg != nil {
|
||||
opts.TlsCfg = tlsCfg
|
||||
}
|
||||
|
||||
if len(m.Servers) == 0 {
|
||||
return fmt.Errorf("could not get host informations")
|
||||
}
|
||||
|
||||
brokers := make([]*url.URL, 0)
|
||||
servers, err := parseServers(m.Servers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, server := range servers {
|
||||
if tlsCfg != nil {
|
||||
server.Scheme = "tls"
|
||||
}
|
||||
brokers = append(brokers, server)
|
||||
m.MQTT.Log.Debugf("registered mqtt broker: %s", server.String())
|
||||
}
|
||||
opts.BrokerUrls = brokers
|
||||
|
||||
opts.KeepAlive = uint16(m.KeepAlive)
|
||||
if m.Timeout < config.Duration(time.Second) {
|
||||
m.Timeout = config.Duration(5 * time.Second)
|
||||
}
|
||||
|
||||
client, err := mqttv5auto.NewConnection(context.Background(), opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.client = client
|
||||
return client.AwaitConnection(context.Background())
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) Publish(topic string, body []byte) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.Timeout))
|
||||
defer cancel()
|
||||
_, err := m.client.Publish(ctx, &mqttv5.Publish{
|
||||
Topic: topic,
|
||||
QoS: byte(m.QoS),
|
||||
Retain: m.Retain,
|
||||
Payload: body,
|
||||
Properties: m.publishProperties,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mqttv5Client) Close() error {
|
||||
return m.client.Disconnect(context.Background())
|
||||
}
|
||||
Loading…
Reference in New Issue