feat(inputs.mqtt_consumer): Implement startup error behaviors (#15486)

This commit is contained in:
Sven Rebhan 2024-06-17 13:21:42 -04:00 committed by GitHub
parent 53ae9841d0
commit 986b3856a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 473 additions and 53 deletions

View File

@ -23,6 +23,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins [CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Startup error behavior options <!-- @/docs/includes/startup_error_behavior.md -->
In addition to the plugin-specific and global configuration settings the plugin
supports options for specifying the behavior when experiencing startup errors
using the `startup_error_behavior` setting. Available values are:
- `error`: Telegraf will stop and exit in case of startup errors. This is the
default behavior.
- `ignore`: Telegraf will ignore startup errors for this plugin and disable it
but continue processing for all other plugins.
- `retry`: Telegraf will try to start up the plugin in every gather or write
cycle in case of startup errors. The plugin is disabled until
the startup succeeds.
## Secret-store support ## Secret-store support
This plugin supports secrets from secret-stores for the `username` and This plugin supports secrets from secret-stores for the `username` and
@ -67,6 +81,13 @@ to use them.
## Connection timeout for initial connection in seconds ## Connection timeout for initial connection in seconds
# connection_timeout = "30s" # connection_timeout = "30s"
## Interval and ping timeout for keep-alive messages
## The sum of those options defines when a connection loss is detected.
## Note: The keep-alive interval needs to be greater than or equal to one
## second; fractions of a second are not supported.
# keepalive = "60s"
# ping_timeout = "10s"
## Max undelivered messages ## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to ## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data ## outputs before acknowledging them to the original broker to ensure data

View File

@ -12,6 +12,7 @@ import (
"time" "time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/eclipse/paho.mqtt.golang/packets"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
@ -32,21 +33,15 @@ var (
defaultMaxUndeliveredMessages = 1000 defaultMaxUndeliveredMessages = 1000
) )
type ConnectionState int
type empty struct{} type empty struct{}
type semaphore chan empty type semaphore chan empty
const (
Disconnected ConnectionState = iota
Connecting
Connected
)
type Client interface { type Client interface {
Connect() mqtt.Token Connect() mqtt.Token
SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token
AddRoute(topic string, callback mqtt.MessageHandler) AddRoute(topic string, callback mqtt.MessageHandler)
Disconnect(quiesce uint) Disconnect(quiesce uint)
IsConnected() bool
} }
type ClientFactory func(o *mqtt.ClientOptions) Client type ClientFactory func(o *mqtt.ClientOptions) Client
@ -73,6 +68,8 @@ type MQTTConsumer struct {
Password config.Secret `toml:"password"` Password config.Secret `toml:"password"`
QoS int `toml:"qos"` QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"` ConnectionTimeout config.Duration `toml:"connection_timeout"`
KeepAliveInterval config.Duration `toml:"keepalive"`
PingTimeout config.Duration `toml:"ping_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
PersistentSession bool `toml:"persistent_session"` PersistentSession bool `toml:"persistent_session"`
ClientID string `toml:"client_id"` ClientID string `toml:"client_id"`
@ -84,7 +81,6 @@ type MQTTConsumer struct {
client Client client Client
opts *mqtt.ClientOptions opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore sem semaphore
messages map[telegraf.TrackingID]mqtt.Message messages map[telegraf.TrackingID]mqtt.Message
messagesMutex sync.Mutex messagesMutex sync.Mutex
@ -104,7 +100,6 @@ func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
m.parser = parser m.parser = parser
} }
func (m *MQTTConsumer) Init() error { func (m *MQTTConsumer) Init() error {
m.state = Disconnected
if m.PersistentSession && m.ClientID == "" { if m.PersistentSession && m.ClientID == "" {
return errors.New("persistent_session requires client_id") return errors.New("persistent_session requires client_id")
} }
@ -155,7 +150,6 @@ func (m *MQTTConsumer) Init() error {
return nil return nil
} }
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.state = Disconnected
m.acc = acc.WithTracking(m.MaxUndeliveredMessages) m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.sem = make(semaphore, m.MaxUndeliveredMessages) m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background()) m.ctx, m.cancel = context.WithCancel(context.Background())
@ -176,7 +170,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
return m.connect() return m.connect()
} }
func (m *MQTTConsumer) connect() error { func (m *MQTTConsumer) connect() error {
m.state = Connecting
m.client = m.clientFactory(m.opts) m.client = m.clientFactory(m.opts)
// AddRoute sets up the function for handling messages. These need to be // AddRoute sets up the function for handling messages. These need to be
// added in case we find a persistent session containing subscriptions so we // added in case we find a persistent session containing subscriptions so we
@ -187,12 +180,22 @@ func (m *MQTTConsumer) connect() error {
} }
token := m.client.Connect() token := m.client.Connect()
if token.Wait() && token.Error() != nil { if token.Wait() && token.Error() != nil {
err := token.Error() if ct, ok := token.(*mqtt.ConnectToken); ok && ct.ReturnCode() == packets.ErrNetworkError {
m.state = Disconnected // Network errors might be retryable, stop the metric-tracking
return err // goroutine and return a retryable error.
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
return &internal.StartupError{
Err: token.Error(),
Retry: true,
}
}
return token.Error()
} }
m.Log.Infof("Connected %v", m.Servers) m.Log.Infof("Connected %v", m.Servers)
m.state = Connected
// Persistent sessions should skip subscription if a session is present, as // Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server. // the subscriptions are stored by the server.
type sessionPresent interface { type sessionPresent interface {
@ -218,7 +221,6 @@ func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
m.client.Disconnect(5) m.client.Disconnect(5)
m.acc.AddError(fmt.Errorf("connection lost: %w", err)) m.acc.AddError(fmt.Errorf("connection lost: %w", err))
m.Log.Debugf("Disconnected %v", m.Servers) m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
} }
// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value // compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value
@ -321,16 +323,17 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) {
m.messagesMutex.Unlock() m.messagesMutex.Unlock()
} }
func (m *MQTTConsumer) Stop() { func (m *MQTTConsumer) Stop() {
if m.state == Connected { if m.client.IsConnected() {
m.Log.Debugf("Disconnecting %v", m.Servers) m.Log.Debugf("Disconnecting %v", m.Servers)
m.client.Disconnect(200) m.client.Disconnect(200)
m.Log.Debugf("Disconnected %v", m.Servers) m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
} }
m.cancel() if m.cancel != nil {
m.cancel()
}
} }
func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error { func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if m.state == Disconnected { if !m.client.IsConnected() {
m.Log.Debugf("Connecting %v", m.Servers) m.Log.Debugf("Connecting %v", m.Servers)
return m.connect() return m.connect()
} }
@ -388,7 +391,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts.AddBroker(server) opts.AddBroker(server)
} }
opts.SetAutoReconnect(false) opts.SetAutoReconnect(false)
opts.SetKeepAlive(time.Second * 60) opts.SetKeepAlive(time.Duration(m.KeepAliveInterval))
opts.SetPingTimeout(time.Duration(m.PingTimeout))
opts.SetCleanSession(!m.PersistentSession) opts.SetCleanSession(!m.PersistentSession)
opts.SetAutoAckDisabled(m.PersistentSession) opts.SetAutoAckDisabled(m.PersistentSession)
opts.SetConnectionLostHandler(m.onConnectionLost) opts.SetConnectionLostHandler(m.onConnectionLost)
@ -449,10 +453,11 @@ func typeConvert(types map[string]string, topicValue string, key string) (interf
func New(factory ClientFactory) *MQTTConsumer { func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{ return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"}, Servers: []string{"tcp://127.0.0.1:1883"},
ConnectionTimeout: defaultConnectionTimeout,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages, MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
ConnectionTimeout: defaultConnectionTimeout,
KeepAliveInterval: config.Duration(60 * time.Second),
PingTimeout: config.Duration(10 * time.Second),
clientFactory: factory, clientFactory: factory,
state: Disconnected,
} }
} }
func init() { func init() {

View File

@ -2,13 +2,19 @@ package mqtt_consumer
import ( import (
"errors" "errors"
"fmt"
"path/filepath"
"testing" "testing"
"time" "time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -23,11 +29,15 @@ type FakeClient struct {
subscribeCallCount int subscribeCallCount int
addRouteCallCount int addRouteCallCount int
disconnectCallCount int disconnectCallCount int
connected bool
} }
func (c *FakeClient) Connect() mqtt.Token { func (c *FakeClient) Connect() mqtt.Token {
c.connectCallCount++ c.connectCallCount++
return c.ConnectF() token := c.ConnectF()
c.connected = token.Error() == nil
return token
} }
func (c *FakeClient) SubscribeMultiple(map[string]byte, mqtt.MessageHandler) mqtt.Token { func (c *FakeClient) SubscribeMultiple(map[string]byte, mqtt.MessageHandler) mqtt.Token {
@ -43,11 +53,15 @@ func (c *FakeClient) AddRoute(_ string, callback mqtt.MessageHandler) {
func (c *FakeClient) Disconnect(uint) { func (c *FakeClient) Disconnect(uint) {
c.disconnectCallCount++ c.disconnectCallCount++
c.DisconnectF() c.DisconnectF()
c.connected = false
} }
type FakeParser struct { func (c *FakeClient) IsConnected() bool {
return c.connected
} }
type FakeParser struct{}
// FakeParser satisfies telegraf.Parser // FakeParser satisfies telegraf.Parser
var _ telegraf.Parser = &FakeParser{} var _ telegraf.Parser = &FakeParser{}
@ -115,15 +129,9 @@ func TestLifecycleSanity(t *testing.T) {
parser := &FakeParser{} parser := &FakeParser{}
plugin.SetParser(parser) plugin.SetParser(parser)
err := plugin.Init() require.NoError(t, plugin.Init())
require.NoError(t, err) require.NoError(t, plugin.Start(&acc))
require.NoError(t, plugin.Gather(&acc))
err = plugin.Start(&acc)
require.NoError(t, err)
err = plugin.Gather(&acc)
require.NoError(t, err)
plugin.Stop() plugin.Stop()
} }
@ -477,15 +485,13 @@ func TestTopicTag(t *testing.T) {
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
plugin.SetParser(parser) plugin.SetParser(parser)
err := plugin.Init() require.Equal(t, tt.expectedError, plugin.Init())
require.Equal(t, tt.expectedError, err)
if tt.expectedError != nil { if tt.expectedError != nil {
return return
} }
var acc testutil.Accumulator var acc testutil.Accumulator
err = plugin.Start(&acc) require.NoError(t, plugin.Start(&acc))
require.NoError(t, err)
var m Message var m Message
m.topic = tt.topic m.topic = tt.topic
@ -494,8 +500,7 @@ func TestTopicTag(t *testing.T) {
plugin.Stop() plugin.Stop()
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
testutil.IgnoreTime())
}) })
} }
} }
@ -519,12 +524,10 @@ func TestAddRouteCalledForEachTopic(t *testing.T) {
plugin.Log = testutil.Logger{} plugin.Log = testutil.Logger{}
plugin.Topics = []string{"a", "b"} plugin.Topics = []string{"a", "b"}
err := plugin.Init() require.NoError(t, plugin.Init())
require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = plugin.Start(&acc) require.NoError(t, plugin.Start(&acc))
require.NoError(t, err)
plugin.Stop() plugin.Stop()
@ -550,12 +553,10 @@ func TestSubscribeCalledIfNoSession(t *testing.T) {
plugin.Log = testutil.Logger{} plugin.Log = testutil.Logger{}
plugin.Topics = []string{"b"} plugin.Topics = []string{"b"}
err := plugin.Init() require.NoError(t, plugin.Init())
require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = plugin.Start(&acc) require.NoError(t, plugin.Start(&acc))
require.NoError(t, err)
plugin.Stop() plugin.Stop()
@ -581,14 +582,395 @@ func TestSubscribeNotCalledIfSession(t *testing.T) {
plugin.Log = testutil.Logger{} plugin.Log = testutil.Logger{}
plugin.Topics = []string{"b"} plugin.Topics = []string{"b"}
err := plugin.Init() require.NoError(t, plugin.Init())
require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = plugin.Start(&acc) require.NoError(t, plugin.Start(&acc))
require.NoError(t, err)
plugin.Stop() plugin.Stop()
require.Equal(t, 0, client.subscribeCallCount) require.Equal(t, 0, client.subscribeCallCount)
} }
// TestIntegration runs the plugin against a Mosquitto broker started in a
// container. A second MQTT client publishes three influx line-protocol
// messages to the subscribed topic and the test requires the accumulator to
// end up with exactly those metrics, each tagged with the topic.
//
// The test is skipped in short mode.
func TestIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	// Startup the container
	conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
	require.NoError(t, err, "missing file mosquitto.conf")

	const servicePort = "1883"
	container := testutil.Container{
		Image:        "eclipse-mosquitto:2",
		ExposedPorts: []string{servicePort},
		WaitingFor:   wait.ForListeningPort(servicePort),
		Files: map[string]string{
			"/mosquitto/config/mosquitto.conf": conf,
		},
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	// Setup the plugin and connect to the broker
	url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
	topic := "/telegraf/test"
	factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
	plugin := &MQTTConsumer{
		Servers:                []string{url},
		Topics:                 []string{topic},
		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
		ConnectionTimeout:      config.Duration(5 * time.Second),
		KeepAliveInterval:      config.Duration(1 * time.Second),
		PingTimeout:            config.Duration(100 * time.Millisecond),
		Log:                    testutil.Logger{Name: "mqtt-integration-test"},
		clientFactory:          factory,
	}
	parser := &influx.Parser{}
	require.NoError(t, parser.Init())
	plugin.SetParser(parser)
	require.NoError(t, plugin.Init())

	var acc testutil.Accumulator
	require.NoError(t, plugin.Start(&acc))
	defer plugin.Stop()

	// Setup a producer to send some metrics to the broker. It reuses the
	// client options generated by the plugin itself.
	cfg, err := plugin.createOpts()
	require.NoError(t, err)
	client := mqtt.NewClient(cfg)
	token := client.Connect()
	token.Wait()
	require.NoError(t, token.Error())
	defer client.Disconnect(100)

	// Setup the metrics
	metrics := []string{
		"test,source=A value=0i 1712780301000000000",
		"test,source=B value=1i 1712780301000000100",
		"test,source=C value=2i 1712780301000000200",
	}
	expected := make([]telegraf.Metric, 0, len(metrics))
	for _, x := range metrics {
		// Check the parse error before touching the result.
		parsed, err := parser.Parse([]byte(x))
		require.NoError(t, err)
		for i := range parsed {
			parsed[i].AddTag("topic", topic)
		}
		expected = append(expected, parsed...)
	}

	// Write metrics
	for _, x := range metrics {
		xtoken := client.Publish(topic, byte(plugin.QoS), false, []byte(x))
		// Wait for the publish to finish before inspecting its error, the
		// same way the connect token is handled above.
		xtoken.Wait()
		require.NoError(t, xtoken.Error())
	}

	// Verify that the metrics were actually written
	require.Eventually(t, func() bool {
		return acc.NMetrics() >= uint64(len(expected))
	}, 3*time.Second, 100*time.Millisecond)

	// Shut down producer and plugin explicitly before comparing; the deferred
	// calls above cover the early-exit paths.
	client.Disconnect(100)
	plugin.Stop()
	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
// TestStartupErrorBehaviorErrorIntegration starts the input, wrapped in a
// running-input model with no startup-error behavior configured, against a
// paused broker container and requires Start to return an error mentioning
// the network failure.
//
// The test is skipped in short mode.
func TestStartupErrorBehaviorErrorIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	// Resolve the broker configuration mounted into the container
	confPath, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
	require.NoError(t, err, "missing file mosquitto.conf")

	// Bring up the broker
	const servicePort = "1883"
	broker := testutil.Container{
		Image:        "eclipse-mosquitto:2",
		ExposedPorts: []string{servicePort},
		WaitingFor:   wait.ForListeningPort(servicePort),
		Files: map[string]string{
			"/mosquitto/config/mosquitto.conf": confPath,
		},
	}
	require.NoError(t, broker.Start(), "failed to start container")
	defer broker.Terminate()

	// Freeze the broker so that connection attempts cannot complete
	require.NoError(t, broker.Pause())
	defer broker.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway

	// Configure the consumer to talk to the (paused) broker
	brokerURL := fmt.Sprintf("tcp://%s:%s", broker.Address, broker.Ports[servicePort])
	consumer := &MQTTConsumer{
		Servers:                []string{brokerURL},
		Topics:                 []string{"/telegraf/test"},
		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
		ConnectionTimeout:      config.Duration(5 * time.Second),
		KeepAliveInterval:      config.Duration(1 * time.Second),
		PingTimeout:            config.Duration(100 * time.Millisecond),
		Log:                    testutil.Logger{Name: "mqtt-integration-test"},
		clientFactory:          func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) },
	}
	lineParser := &influx.Parser{}
	require.NoError(t, lineParser.Init())
	consumer.SetParser(lineParser)

	// Wrap the consumer in a running-input model so the startup-error
	// handling of the model is exercised.
	inputCfg := &models.InputConfig{
		Name:  "mqtt_consumer",
		Alias: "error-test", // required to get a unique error stats instance
	}
	input := models.NewRunningInput(consumer, inputCfg)
	input.StartupErrors.Set(0)
	require.NoError(t, input.Init())

	// With the broker paused, starting must surface the network error
	var acc testutil.Accumulator
	startErr := input.Start(&acc)
	require.ErrorContains(t, startErr, "network Error")
}
// TestStartupErrorBehaviorIgnoreIntegration starts the input with
// StartupErrorBehavior set to "ignore" against a paused broker container. It
// requires Start to return an error that mentions the network failure and
// that can be unwrapped to *internal.FatalError.
//
// The test is skipped in short mode.
func TestStartupErrorBehaviorIgnoreIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	// Resolve the broker configuration mounted into the container
	confPath, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
	require.NoError(t, err, "missing file mosquitto.conf")

	// Bring up the broker
	const servicePort = "1883"
	broker := testutil.Container{
		Image:        "eclipse-mosquitto:2",
		ExposedPorts: []string{servicePort},
		WaitingFor:   wait.ForListeningPort(servicePort),
		Files: map[string]string{
			"/mosquitto/config/mosquitto.conf": confPath,
		},
	}
	require.NoError(t, broker.Start(), "failed to start container")
	defer broker.Terminate()

	// Freeze the broker so that connection attempts cannot complete
	require.NoError(t, broker.Pause())
	defer broker.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway

	// Configure the consumer to talk to the (paused) broker
	brokerURL := fmt.Sprintf("tcp://%s:%s", broker.Address, broker.Ports[servicePort])
	consumer := &MQTTConsumer{
		Servers:                []string{brokerURL},
		Topics:                 []string{"/telegraf/test"},
		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
		ConnectionTimeout:      config.Duration(5 * time.Second),
		KeepAliveInterval:      config.Duration(1 * time.Second),
		PingTimeout:            config.Duration(100 * time.Millisecond),
		Log:                    testutil.Logger{Name: "mqtt-integration-test"},
		clientFactory:          func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) },
	}
	lineParser := &influx.Parser{}
	require.NoError(t, lineParser.Init())
	consumer.SetParser(lineParser)

	// Wrap the consumer in a running-input model configured to ignore
	// startup errors.
	inputCfg := &models.InputConfig{
		Name:                 "mqtt_consumer",
		Alias:                "ignore-test", // required to get a unique error stats instance
		StartupErrorBehavior: "ignore",
	}
	input := models.NewRunningInput(consumer, inputCfg)
	input.StartupErrors.Set(0)
	require.NoError(t, input.Init())

	// Starting fails because the broker is paused. The model is expected to
	// report this as a fatal error so the agent can remove the plugin.
	var acc testutil.Accumulator
	startErr := input.Start(&acc)
	require.ErrorContains(t, startErr, "network Error")

	var fatal *internal.FatalError
	require.ErrorAs(t, startErr, &fatal)
}
// TestStartupErrorBehaviorRetryIntegration starts the input with
// StartupErrorBehavior set to "retry" against a paused broker container.
// Start must succeed without producing metrics, the first Gather must report
// internal.ErrNotConnected, and the startup-error counter is expected to be
// two at that point. After resuming the broker the next Gather must succeed
// and published messages must arrive as metrics tagged with the topic.
//
// The test is skipped in short mode.
func TestStartupErrorBehaviorRetryIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	// Startup the container
	conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
	require.NoError(t, err, "missing file mosquitto.conf")

	const servicePort = "1883"
	container := testutil.Container{
		Image:        "eclipse-mosquitto:2",
		ExposedPorts: []string{servicePort},
		WaitingFor:   wait.ForListeningPort(servicePort),
		Files: map[string]string{
			"/mosquitto/config/mosquitto.conf": conf,
		},
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	// Pause the container for simulating connectivity issues
	require.NoError(t, container.Pause())
	defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway

	// Setup the plugin and connect to the broker
	url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])
	topic := "/telegraf/test"
	factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) }
	plugin := &MQTTConsumer{
		Servers:                []string{url},
		Topics:                 []string{topic},
		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
		ConnectionTimeout:      config.Duration(5 * time.Second),
		KeepAliveInterval:      config.Duration(1 * time.Second),
		PingTimeout:            config.Duration(100 * time.Millisecond),
		Log:                    testutil.Logger{Name: "mqtt-integration-test"},
		clientFactory:          factory,
	}
	parser := &influx.Parser{}
	require.NoError(t, parser.Init())
	plugin.SetParser(parser)

	// Create a model to be able to use the startup retry strategy
	model := models.NewRunningInput(
		plugin,
		&models.InputConfig{
			Name:                 "mqtt_consumer",
			Alias:                "retry-test", // required to get a unique error stats instance
			StartupErrorBehavior: "retry",
		},
	)
	model.StartupErrors.Set(0)
	require.NoError(t, model.Init())

	// With the retry behavior the failed connection attempt must not make
	// Start itself fail.
	var acc testutil.Accumulator
	require.NoError(t, model.Start(&acc))

	// There should be no metrics as the plugin is not fully started up yet
	require.Empty(t, acc.GetTelegrafMetrics())
	require.ErrorIs(t, model.Gather(&acc), internal.ErrNotConnected)
	require.Equal(t, int64(2), model.StartupErrors.Get())

	// Unpause the container, now the next gather cycle should succeed
	require.NoError(t, container.Resume())
	require.NoError(t, model.Gather(&acc))
	defer model.Stop()

	// Setup a producer to send some metrics to the broker. It reuses the
	// client options generated by the plugin itself.
	cfg, err := plugin.createOpts()
	require.NoError(t, err)
	client := mqtt.NewClient(cfg)
	token := client.Connect()
	token.Wait()
	require.NoError(t, token.Error())
	defer client.Disconnect(100)

	// Setup the metrics
	metrics := []string{
		"test,source=A value=0i 1712780301000000000",
		"test,source=B value=1i 1712780301000000100",
		"test,source=C value=2i 1712780301000000200",
	}
	expected := make([]telegraf.Metric, 0, len(metrics))
	for _, x := range metrics {
		// Check the parse error before touching the result.
		parsed, err := parser.Parse([]byte(x))
		require.NoError(t, err)
		for i := range parsed {
			parsed[i].AddTag("topic", topic)
		}
		expected = append(expected, parsed...)
	}

	// Write metrics
	for _, x := range metrics {
		xtoken := client.Publish(topic, byte(plugin.QoS), false, []byte(x))
		// Wait for the publish to finish before inspecting its error, the
		// same way the connect token is handled above.
		xtoken.Wait()
		require.NoError(t, xtoken.Error())
	}

	// Verify that the metrics were actually written
	require.Eventually(t, func() bool {
		return acc.NMetrics() >= uint64(len(expected))
	}, 3*time.Second, 100*time.Millisecond)

	// Shut down producer and plugin explicitly before comparing; the deferred
	// calls above cover the early-exit paths.
	client.Disconnect(100)
	plugin.Stop()
	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
// TestReconnectIntegration connects the plugin to a running broker container,
// pauses the container until the client reports the connection as lost, and
// requires Gather to fail with a network error while the broker is paused.
// After resuming the container, Gather must succeed again.
//
// The test is skipped in short mode.
func TestReconnectIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	// Resolve the broker configuration mounted into the container
	confPath, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf"))
	require.NoError(t, err, "missing file mosquitto.conf")

	// Bring up the broker
	const servicePort = "1883"
	broker := testutil.Container{
		Image:        "eclipse-mosquitto:2",
		ExposedPorts: []string{servicePort},
		WaitingFor:   wait.ForListeningPort(servicePort),
		Files: map[string]string{
			"/mosquitto/config/mosquitto.conf": confPath,
		},
	}
	require.NoError(t, broker.Start(), "failed to start container")
	defer broker.Terminate()

	// Configure the consumer and establish the initial connection
	brokerURL := fmt.Sprintf("tcp://%s:%s", broker.Address, broker.Ports[servicePort])
	consumer := &MQTTConsumer{
		Servers:                []string{brokerURL},
		Topics:                 []string{"/telegraf/test"},
		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
		ConnectionTimeout:      config.Duration(5 * time.Second),
		KeepAliveInterval:      config.Duration(1 * time.Second),
		PingTimeout:            config.Duration(100 * time.Millisecond),
		Log:                    testutil.Logger{Name: "mqtt-integration-test"},
		clientFactory:          func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) },
	}
	lineParser := &influx.Parser{}
	require.NoError(t, lineParser.Init())
	consumer.SetParser(lineParser)
	require.NoError(t, consumer.Init())

	var acc testutil.Accumulator
	require.NoError(t, consumer.Start(&acc))
	defer consumer.Stop()

	// Pause the container to simulate losing the connection
	require.NoError(t, broker.Pause())
	defer broker.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway

	// Wait until the client itself reports the connection as lost
	connectionLost := func() bool {
		return !consumer.client.IsConnected()
	}
	require.Eventually(t, connectionLost, 5*time.Second, 100*time.Millisecond)

	// Gathering while the broker is paused must fail with a network error
	// and leave the client disconnected
	gatherErr := consumer.Gather(&acc)
	require.ErrorContains(t, gatherErr, "network Error")
	require.False(t, consumer.client.IsConnected())

	// Unpause the container, now we should be able to reconnect
	require.NoError(t, broker.Resume())
	require.NoError(t, consumer.Gather(&acc))

	gatherSucceeds := func() bool {
		return consumer.Gather(&acc) == nil
	}
	require.Eventually(t, gatherSucceeds, 5*time.Second, 200*time.Millisecond)
}

View File

@ -30,6 +30,13 @@
## Connection timeout for initial connection in seconds ## Connection timeout for initial connection in seconds
# connection_timeout = "30s" # connection_timeout = "30s"
## Interval and ping timeout for keep-alive messages
## The sum of those options defines when a connection loss is detected.
## Note: The keep-alive interval needs to be greater than or equal to one
## second; fractions of a second are not supported.
# keepalive = "60s"
# ping_timeout = "10s"
## Max undelivered messages ## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to ## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data ## outputs before acknowledging them to the original broker to ensure data

View File

@ -40,6 +40,7 @@ type Accumulator struct {
TimeFunc func() time.Time TimeFunc func() time.Time
trackingMutex sync.Mutex
sync.Mutex sync.Mutex
*sync.Cond *sync.Cond
} }
@ -214,6 +215,8 @@ func (a *Accumulator) AddMetric(m telegraf.Metric) {
} }
func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator {
a.trackingMutex.Lock()
defer a.trackingMutex.Unlock()
a.deliverChan = make(chan telegraf.DeliveryInfo, maxTracked) a.deliverChan = make(chan telegraf.DeliveryInfo, maxTracked)
a.delivered = make([]telegraf.DeliveryInfo, 0, maxTracked) a.delivered = make([]telegraf.DeliveryInfo, 0, maxTracked)
return a return a
@ -244,6 +247,8 @@ func (a *Accumulator) onDelivery(info telegraf.DeliveryInfo) {
} }
func (a *Accumulator) Delivered() <-chan telegraf.DeliveryInfo { func (a *Accumulator) Delivered() <-chan telegraf.DeliveryInfo {
a.trackingMutex.Lock()
defer a.trackingMutex.Unlock()
return a.deliverChan return a.deliverChan
} }