diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index 3190f216a..e035414cb 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -23,6 +23,20 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. [CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins +## Startup error behavior options + +In addition to the plugin-specific and global configuration settings the plugin +supports options for specifying the behavior when experiencing startup errors +using the `startup_error_behavior` setting. Available values are: + +- `error`: Telegraf with stop and exit in case of startup errors. This is the + default behavior. +- `ignore`: Telegraf will ignore startup errors for this plugin and disables it + but continues processing for all other plugins. +- `retry`: Telegraf will try to startup the plugin in every gather or write + cycle in case of startup errors. The plugin is disabled until + the startup succeeds. + ## Secret-store support This plugin supports secrets from secret-stores for the `username` and @@ -67,6 +81,13 @@ to use them. ## Connection timeout for initial connection in seconds # connection_timeout = "30s" + ## Interval and ping timeout for keep-alive messages + ## The sum of those options defines when a connection loss is detected. + ## Note: The keep-alive interval needs to be greater or equal one second and + ## fractions of a second are not supported. + # keepalive = "60s" + # ping_timeout = "10s" + ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to ## outputs before acknowledging them to the original broker to ensure data diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 73e72dddb..6ed5d0507 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -12,6 +12,7 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/eclipse/paho.mqtt.golang/packets" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" @@ -32,21 +33,15 @@ var ( defaultMaxUndeliveredMessages = 1000 ) -type ConnectionState int type empty struct{} type semaphore chan empty -const ( - Disconnected ConnectionState = iota - Connecting - Connected -) - type Client interface { Connect() mqtt.Token SubscribeMultiple(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token AddRoute(topic string, callback mqtt.MessageHandler) Disconnect(quiesce uint) + IsConnected() bool } type ClientFactory func(o *mqtt.ClientOptions) Client @@ -73,6 +68,8 @@ type MQTTConsumer struct { Password config.Secret `toml:"password"` QoS int `toml:"qos"` ConnectionTimeout config.Duration `toml:"connection_timeout"` + KeepAliveInterval config.Duration `toml:"keepalive"` + PingTimeout config.Duration `toml:"ping_timeout"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` PersistentSession bool `toml:"persistent_session"` ClientID string `toml:"client_id"` @@ -84,7 +81,6 @@ type MQTTConsumer struct { client Client opts *mqtt.ClientOptions acc telegraf.TrackingAccumulator - state ConnectionState sem semaphore messages map[telegraf.TrackingID]mqtt.Message messagesMutex sync.Mutex @@ -104,7 +100,6 @@ func (m *MQTTConsumer) SetParser(parser telegraf.Parser) { m.parser = parser } func (m *MQTTConsumer) Init() error { - m.state = Disconnected if m.PersistentSession && m.ClientID == "" { return errors.New("persistent_session requires client_id") } @@ -155,7 +150,6 @@ func (m *MQTTConsumer) Init() error { return nil } func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { - m.state = Disconnected m.acc = acc.WithTracking(m.MaxUndeliveredMessages) m.sem = make(semaphore, m.MaxUndeliveredMessages) m.ctx, m.cancel = context.WithCancel(context.Background()) @@ -176,7 +170,6 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { return m.connect() } func (m *MQTTConsumer) connect() error { - m.state = Connecting m.client = m.clientFactory(m.opts) // AddRoute sets up the function for handling messages. These need to be // added in case we find a persistent session containing subscriptions so we @@ -187,12 +180,22 @@ func (m *MQTTConsumer) connect() error { } token := m.client.Connect() if token.Wait() && token.Error() != nil { - err := token.Error() - m.state = Disconnected - return err + if ct, ok := token.(*mqtt.ConnectToken); ok && ct.ReturnCode() == packets.ErrNetworkError { + // Network errors might be retryable, stop the metric-tracking + // goroutine and return a retryable error. + if m.cancel != nil { + m.cancel() + m.cancel = nil + } + return &internal.StartupError{ + Err: token.Error(), + Retry: true, + } + } + return token.Error() } m.Log.Infof("Connected %v", m.Servers) - m.state = Connected + // Persistent sessions should skip subscription if a session is present, as // the subscriptions are stored by the server. type sessionPresent interface { @@ -218,7 +221,6 @@ func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) { m.client.Disconnect(5) m.acc.AddError(fmt.Errorf("connection lost: %w", err)) m.Log.Debugf("Disconnected %v", m.Servers) - m.state = Disconnected } // compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value @@ -321,16 +323,17 @@ func (m *MQTTConsumer) onMessage(_ mqtt.Client, msg mqtt.Message) { m.messagesMutex.Unlock() } func (m *MQTTConsumer) Stop() { - if m.state == Connected { + if m.client.IsConnected() { m.Log.Debugf("Disconnecting %v", m.Servers) m.client.Disconnect(200) m.Log.Debugf("Disconnected %v", m.Servers) - m.state = Disconnected } - m.cancel() + if m.cancel != nil { + m.cancel() + } } func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error { - if m.state == Disconnected { + if !m.client.IsConnected() { m.Log.Debugf("Connecting %v", m.Servers) return m.connect() } @@ -388,7 +391,8 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { opts.AddBroker(server) } opts.SetAutoReconnect(false) - opts.SetKeepAlive(time.Second * 60) + opts.SetKeepAlive(time.Duration(m.KeepAliveInterval)) + opts.SetPingTimeout(time.Duration(m.PingTimeout)) opts.SetCleanSession(!m.PersistentSession) opts.SetAutoAckDisabled(m.PersistentSession) opts.SetConnectionLostHandler(m.onConnectionLost) @@ -449,10 +453,11 @@ func typeConvert(types map[string]string, topicValue string, key string) (interf func New(factory ClientFactory) *MQTTConsumer { return &MQTTConsumer{ Servers: []string{"tcp://127.0.0.1:1883"}, - ConnectionTimeout: defaultConnectionTimeout, MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + ConnectionTimeout: defaultConnectionTimeout, + KeepAliveInterval: config.Duration(60 * time.Second), + PingTimeout: config.Duration(10 * time.Second), clientFactory: factory, - state: Disconnected, } } func init() { diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 8bbca1cc2..8b4665375 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -2,13 +2,19 @@ package mqtt_consumer import ( "errors" + "fmt" + "path/filepath" "testing" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -23,11 +29,15 @@ type FakeClient struct { subscribeCallCount int addRouteCallCount int disconnectCallCount int + + connected bool } func (c *FakeClient) Connect() mqtt.Token { c.connectCallCount++ - return c.ConnectF() + token := c.ConnectF() + c.connected = token.Error() == nil + return token } func (c *FakeClient) SubscribeMultiple(map[string]byte, mqtt.MessageHandler) mqtt.Token { @@ -43,11 +53,15 @@ func (c *FakeClient) AddRoute(_ string, callback mqtt.MessageHandler) { func (c *FakeClient) Disconnect(uint) { c.disconnectCallCount++ c.DisconnectF() + c.connected = false } -type FakeParser struct { +func (c *FakeClient) IsConnected() bool { + return c.connected } +type FakeParser struct{} + // FakeParser satisfies telegraf.Parser var _ telegraf.Parser = &FakeParser{} @@ -115,15 +129,9 @@ func TestLifecycleSanity(t *testing.T) { parser := &FakeParser{} plugin.SetParser(parser) - err := plugin.Init() - require.NoError(t, err) - - err = plugin.Start(&acc) - require.NoError(t, err) - - err = plugin.Gather(&acc) - require.NoError(t, err) - + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Start(&acc)) + require.NoError(t, plugin.Gather(&acc)) plugin.Stop() } @@ -477,15 +485,13 @@ func TestTopicTag(t *testing.T) { require.NoError(t, parser.Init()) plugin.SetParser(parser) - err := plugin.Init() - require.Equal(t, tt.expectedError, err) + require.Equal(t, tt.expectedError, plugin.Init()) if tt.expectedError != nil { return } var acc testutil.Accumulator - err = plugin.Start(&acc) - require.NoError(t, err) + require.NoError(t, plugin.Start(&acc)) var m Message m.topic = tt.topic @@ -494,8 +500,7 @@ func TestTopicTag(t *testing.T) { plugin.Stop() - testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), - testutil.IgnoreTime()) + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) }) } } @@ -519,12 +524,10 @@ func TestAddRouteCalledForEachTopic(t *testing.T) { plugin.Log = testutil.Logger{} plugin.Topics = []string{"a", "b"} - err := plugin.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) var acc testutil.Accumulator - err = plugin.Start(&acc) - require.NoError(t, err) + require.NoError(t, plugin.Start(&acc)) plugin.Stop() @@ -550,12 +553,10 @@ func TestSubscribeCalledIfNoSession(t *testing.T) { plugin.Log = testutil.Logger{} plugin.Topics = []string{"b"} - err := plugin.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) var acc testutil.Accumulator - err = plugin.Start(&acc) - require.NoError(t, err) + require.NoError(t, plugin.Start(&acc)) plugin.Stop() @@ -581,14 +582,395 @@ func TestSubscribeNotCalledIfSession(t *testing.T) { plugin.Log = testutil.Logger{} plugin.Topics = []string{"b"} - err := plugin.Init() - require.NoError(t, err) + require.NoError(t, plugin.Init()) var acc testutil.Accumulator - err = plugin.Start(&acc) - require.NoError(t, err) - + require.NoError(t, plugin.Start(&acc)) plugin.Stop() require.Equal(t, 0, client.subscribeCallCount) } + +func TestIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Startup the container + conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf")) + require.NoError(t, err, "missing file mosquitto.conf") + + const servicePort = "1883" + container := testutil.Container{ + Image: "eclipse-mosquitto:2", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(servicePort), + Files: map[string]string{ + "/mosquitto/config/mosquitto.conf": conf, + }, + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Setup the plugin and connect to the broker + url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) + topic := "/telegraf/test" + factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + plugin := &MQTTConsumer{ + Servers: []string{url}, + Topics: []string{topic}, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + ConnectionTimeout: config.Duration(5 * time.Second), + KeepAliveInterval: config.Duration(1 * time.Second), + PingTimeout: config.Duration(100 * time.Millisecond), + Log: testutil.Logger{Name: "mqtt-integration-test"}, + clientFactory: factory, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Setup a producer to send some metrics to the broker + cfg, err := plugin.createOpts() + require.NoError(t, err) + client := mqtt.NewClient(cfg) + token := client.Connect() + token.Wait() + require.NoError(t, token.Error()) + defer client.Disconnect(100) + + // Setup the metrics + metrics := []string{ + "test,source=A value=0i 1712780301000000000", + "test,source=B value=1i 1712780301000000100", + "test,source=C value=2i 1712780301000000200", + } + expected := make([]telegraf.Metric, 0, len(metrics)) + for _, x := range metrics { + metrics, err := parser.Parse([]byte(x)) + for i := range metrics { + metrics[i].AddTag("topic", topic) + } + require.NoError(t, err) + expected = append(expected, metrics...) + } + + // Write metrics + for _, x := range metrics { + xtoken := client.Publish(topic, byte(plugin.QoS), false, []byte(x)) + require.NoError(t, xtoken.Error()) + } + + // Verify that the metrics were actually written + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, 3*time.Second, 100*time.Millisecond) + + client.Disconnect(100) + plugin.Stop() + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestStartupErrorBehaviorErrorIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Startup the container + conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf")) + require.NoError(t, err, "missing file mosquitto.conf") + + const servicePort = "1883" + container := testutil.Container{ + Image: "eclipse-mosquitto:2", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(servicePort), + Files: map[string]string{ + "/mosquitto/config/mosquitto.conf": conf, + }, + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Pause the container for simulating connectivity issues + require.NoError(t, container.Pause()) + defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway + + // Setup the plugin and connect to the broker + url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) + topic := "/telegraf/test" + factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + plugin := &MQTTConsumer{ + Servers: []string{url}, + Topics: []string{topic}, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + ConnectionTimeout: config.Duration(5 * time.Second), + KeepAliveInterval: config.Duration(1 * time.Second), + PingTimeout: config.Duration(100 * time.Millisecond), + Log: testutil.Logger{Name: "mqtt-integration-test"}, + clientFactory: factory, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a model to be able to use the startup retry strategy + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "mqtt_consumer", + Alias: "error-test", // required to get a unique error stats instance + }, + ) + model.StartupErrors.Set(0) + require.NoError(t, model.Init()) + + // Starting the plugin will fail with an error because the container is paused. + var acc testutil.Accumulator + require.ErrorContains(t, model.Start(&acc), "network Error") +} + +func TestStartupErrorBehaviorIgnoreIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Startup the container + conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf")) + require.NoError(t, err, "missing file mosquitto.conf") + + const servicePort = "1883" + container := testutil.Container{ + Image: "eclipse-mosquitto:2", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(servicePort), + Files: map[string]string{ + "/mosquitto/config/mosquitto.conf": conf, + }, + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Pause the container for simulating connectivity issues + require.NoError(t, container.Pause()) + defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway + + // Setup the plugin and connect to the broker + url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) + topic := "/telegraf/test" + factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + plugin := &MQTTConsumer{ + Servers: []string{url}, + Topics: []string{topic}, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + ConnectionTimeout: config.Duration(5 * time.Second), + KeepAliveInterval: config.Duration(1 * time.Second), + PingTimeout: config.Duration(100 * time.Millisecond), + Log: testutil.Logger{Name: "mqtt-integration-test"}, + clientFactory: factory, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a model to be able to use the startup retry strategy + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "mqtt_consumer", + Alias: "ignore-test", // required to get a unique error stats instance + StartupErrorBehavior: "ignore", + }, + ) + model.StartupErrors.Set(0) + require.NoError(t, model.Init()) + + // Starting the plugin will fail because the container is paused. + // The model code should convert it to a fatal error for the agent to remove + // the plugin. + var acc testutil.Accumulator + err = model.Start(&acc) + require.ErrorContains(t, err, "network Error") + var fatalErr *internal.FatalError + require.ErrorAs(t, err, &fatalErr) +} + +func TestStartupErrorBehaviorRetryIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Startup the container + conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf")) + require.NoError(t, err, "missing file mosquitto.conf") + + const servicePort = "1883" + container := testutil.Container{ + Image: "eclipse-mosquitto:2", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(servicePort), + Files: map[string]string{ + "/mosquitto/config/mosquitto.conf": conf, + }, + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Pause the container for simulating connectivity issues + require.NoError(t, container.Pause()) + defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway + + // Setup the plugin and connect to the broker + url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) + topic := "/telegraf/test" + factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + plugin := &MQTTConsumer{ + Servers: []string{url}, + Topics: []string{topic}, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + ConnectionTimeout: config.Duration(5 * time.Second), + KeepAliveInterval: config.Duration(1 * time.Second), + PingTimeout: config.Duration(100 * time.Millisecond), + Log: testutil.Logger{Name: "mqtt-integration-test"}, + clientFactory: factory, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Create a model to be able to use the startup retry strategy + model := models.NewRunningInput( + plugin, + &models.InputConfig{ + Name: "mqtt_consumer", + Alias: "retry-test", // required to get a unique error stats instance + StartupErrorBehavior: "retry", + }, + ) + model.StartupErrors.Set(0) + require.NoError(t, model.Init()) + + var acc testutil.Accumulator + require.NoError(t, model.Start(&acc)) + + // There should be no metrics as the plugin is not fully started up yet + require.Empty(t, acc.GetTelegrafMetrics()) + require.ErrorIs(t, model.Gather(&acc), internal.ErrNotConnected) + require.Equal(t, int64(2), model.StartupErrors.Get()) + + // Unpause the container, now writes should succeed + require.NoError(t, container.Resume()) + require.NoError(t, model.Gather(&acc)) + defer model.Stop() + + // Setup a producer to send some metrics to the broker + cfg, err := plugin.createOpts() + require.NoError(t, err) + client := mqtt.NewClient(cfg) + token := client.Connect() + token.Wait() + require.NoError(t, token.Error()) + defer client.Disconnect(100) + + // Setup the metrics + metrics := []string{ + "test,source=A value=0i 1712780301000000000", + "test,source=B value=1i 1712780301000000100", + "test,source=C value=2i 1712780301000000200", + } + expected := make([]telegraf.Metric, 0, len(metrics)) + for _, x := range metrics { + metrics, err := parser.Parse([]byte(x)) + for i := range metrics { + metrics[i].AddTag("topic", topic) + } + require.NoError(t, err) + expected = append(expected, metrics...) + } + + // Write metrics + for _, x := range metrics { + xtoken := client.Publish(topic, byte(plugin.QoS), false, []byte(x)) + require.NoError(t, xtoken.Error()) + } + + // Verify that the metrics were actually written + require.Eventually(t, func() bool { + return acc.NMetrics() >= uint64(len(expected)) + }, 3*time.Second, 100*time.Millisecond) + + client.Disconnect(100) + plugin.Stop() + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestReconnectIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Startup the container + conf, err := filepath.Abs(filepath.Join("testdata", "mosquitto.conf")) + require.NoError(t, err, "missing file mosquitto.conf") + + const servicePort = "1883" + container := testutil.Container{ + Image: "eclipse-mosquitto:2", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(servicePort), + Files: map[string]string{ + "/mosquitto/config/mosquitto.conf": conf, + }, + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + // Setup the plugin and connect to the broker + url := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]) + topic := "/telegraf/test" + factory := func(o *mqtt.ClientOptions) Client { return mqtt.NewClient(o) } + plugin := &MQTTConsumer{ + Servers: []string{url}, + Topics: []string{topic}, + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + ConnectionTimeout: config.Duration(5 * time.Second), + KeepAliveInterval: config.Duration(1 * time.Second), + PingTimeout: config.Duration(100 * time.Millisecond), + Log: testutil.Logger{Name: "mqtt-integration-test"}, + clientFactory: factory, + } + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Pause the container for simulating loosing connection + require.NoError(t, container.Pause()) + defer container.Resume() //nolint:errcheck // Ignore the returned error as we cannot do anything about it anyway + + // Wait until we really lost the connection + require.Eventually(t, func() bool { + return !plugin.client.IsConnected() + }, 5*time.Second, 100*time.Millisecond) + + // There should be no metrics as the plugin is not fully started up yet + require.ErrorContains(t, plugin.Gather(&acc), "network Error") + require.False(t, plugin.client.IsConnected()) + + // Unpause the container, now we should be able to reconnect + require.NoError(t, container.Resume()) + require.NoError(t, plugin.Gather(&acc)) + + require.Eventually(t, func() bool { + return plugin.Gather(&acc) == nil + }, 5*time.Second, 200*time.Millisecond) +} diff --git a/plugins/inputs/mqtt_consumer/sample.conf b/plugins/inputs/mqtt_consumer/sample.conf index 1583d0ac7..d1f792893 100644 --- a/plugins/inputs/mqtt_consumer/sample.conf +++ b/plugins/inputs/mqtt_consumer/sample.conf @@ -30,6 +30,13 @@ ## Connection timeout for initial connection in seconds # connection_timeout = "30s" + ## Interval and ping timeout for keep-alive messages + ## The sum of those options defines when a connection loss is detected. + ## Note: The keep-alive interval needs to be greater or equal one second and + ## fractions of a second are not supported. + # keepalive = "60s" + # ping_timeout = "10s" + ## Max undelivered messages ## This plugin uses tracking metrics, which ensure messages are read to ## outputs before acknowledging them to the original broker to ensure data diff --git a/testutil/accumulator.go b/testutil/accumulator.go index be1f92b3c..5360e57a8 100644 --- a/testutil/accumulator.go +++ b/testutil/accumulator.go @@ -40,6 +40,7 @@ type Accumulator struct { TimeFunc func() time.Time + trackingMutex sync.Mutex sync.Mutex *sync.Cond } @@ -214,6 +215,8 @@ func (a *Accumulator) AddMetric(m telegraf.Metric) { } func (a *Accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { + a.trackingMutex.Lock() + defer a.trackingMutex.Unlock() a.deliverChan = make(chan telegraf.DeliveryInfo, maxTracked) a.delivered = make([]telegraf.DeliveryInfo, 0, maxTracked) return a @@ -244,6 +247,8 @@ func (a *Accumulator) onDelivery(info telegraf.DeliveryInfo) { } func (a *Accumulator) Delivered() <-chan telegraf.DeliveryInfo { + a.trackingMutex.Lock() + defer a.trackingMutex.Unlock() return a.deliverChan }