From 3990ab5eb9047c99b03a40afd3f02a90e7aabdb2 Mon Sep 17 00:00:00 2001 From: Helen Weller <38860767+helenosheaa@users.noreply.github.com> Date: Fri, 1 Oct 2021 11:10:30 -0400 Subject: [PATCH] fix: add keep alive config option, add documentation around issue with eclipse/mosquitto version combined with this plugin, update test (#9803) --- plugins/outputs/mqtt/README.md | 7 +++++++ plugins/outputs/mqtt/mqtt.go | 21 +++++++++++++++++---- plugins/outputs/mqtt/mqtt_test.go | 1 + 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/plugins/outputs/mqtt/README.md b/plugins/outputs/mqtt/README.md index abb770f06..f82d7597c 100644 --- a/plugins/outputs/mqtt/README.md +++ b/plugins/outputs/mqtt/README.md @@ -40,6 +40,12 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt ## When true, messages will have RETAIN flag set. # retain = false + ## Defines the maximum length of time that the broker and client may not communicate. + ## Defaults to 0 which turns the feature off. For version v2.0.12 mosquitto there is a + ## [bug](https://github.com/eclipse/mosquitto/issues/2117) which requires keep_alive to be set. + ## As a reference eclipse/paho.mqtt.golang v1.3.0 defaults to 30. + # keep_alive = 0 + ## Data format to output. # data_format = "influx" ``` @@ -62,3 +68,4 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt * `batch`: When true, metrics will be sent in one MQTT message per flush. Otherwise, metrics are written one metric per MQTT message. * `retain`: Set `retain` flag when publishing * `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md) +* `keep_alive`: Defines the maximum length of time that the broker and client may not communicate with each other. Defaults to 0 which deactivates this feature. diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 584a79ffd..54203ee0d 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -16,6 +16,10 @@ import ( "github.com/influxdata/telegraf/plugins/serializers" ) +const ( + defaultKeepAlive = 0 +) + var sampleConfig = ` servers = ["localhost:1883"] # required. @@ -55,6 +59,12 @@ var sampleConfig = ` ## actually reads it # retain = false + ## Defines the maximum length of time that the broker and client may not communicate. + ## Defaults to 0 which turns the feature off. For version v2.0.12 of eclipse/mosquitto there is a + ## [bug](https://github.com/eclipse/mosquitto/issues/2117) which requires keep_alive to be set. + ## As a reference eclipse/paho.mqtt.golang v1.3.0 defaults to 30. + # keep_alive = 0 + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -72,8 +82,9 @@ type MQTT struct { QoS int `toml:"qos"` ClientID string `toml:"client_id"` tls.ClientConfig - BatchMessage bool `toml:"batch"` - Retain bool `toml:"retain"` + BatchMessage bool `toml:"batch"` + Retain bool `toml:"retain"` + KeepAlive int64 `toml:"keep_alive"` client paho.Client opts *paho.ClientOptions @@ -190,7 +201,7 @@ func (m *MQTT) publish(topic string, body []byte) error { func (m *MQTT) createOpts() (*paho.ClientOptions, error) { opts := paho.NewClientOptions() - opts.KeepAlive = 0 + opts.KeepAlive = m.KeepAlive if m.Timeout < config.Duration(time.Second) { m.Timeout = config.Duration(5 * time.Second) @@ -237,6 +248,8 @@ func (m *MQTT) createOpts() (*paho.ClientOptions, error) { func init() { outputs.Add("mqtt", func() telegraf.Output { - return &MQTT{} + return &MQTT{ + KeepAlive: defaultKeepAlive, + } }) } diff --git a/plugins/outputs/mqtt/mqtt_test.go b/plugins/outputs/mqtt/mqtt_test.go index 8affce1c9..fd36d6d05 100644 --- a/plugins/outputs/mqtt/mqtt_test.go +++ b/plugins/outputs/mqtt/mqtt_test.go @@ -19,6 +19,7 @@ func TestConnectAndWriteIntegration(t *testing.T) { m := &MQTT{ Servers: []string{url}, serializer: s, + KeepAlive: 30, } // Verify that we can connect to the MQTT broker