fix: add keep alive config option, add documentation around issue with eclipse/mosquitto version combined with this plugin, update test (#9803)
This commit is contained in:
parent
70afc94d12
commit
3990ab5eb9
|
|
@ -40,6 +40,12 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
|
||||||
## When true, messages will have RETAIN flag set.
|
## When true, messages will have RETAIN flag set.
|
||||||
# retain = false
|
# retain = false
|
||||||
|
|
||||||
|
## Defines the maximum length of time that the broker and client may not communicate.
|
||||||
|
## Defaults to 0 which turns the feature off. For version v2.0.12 mosquitto there is a
|
||||||
|
## [bug](https://github.com/eclipse/mosquitto/issues/2117) which requires keep_alive to be set.
|
||||||
|
## As a reference eclipse/paho.mqtt.golang v1.3.0 defaults to 30.
|
||||||
|
# keep_alive = 0
|
||||||
|
|
||||||
## Data format to output.
|
## Data format to output.
|
||||||
# data_format = "influx"
|
# data_format = "influx"
|
||||||
```
|
```
|
||||||
|
|
@ -62,3 +68,4 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
|
||||||
* `batch`: When true, metrics will be sent in one MQTT message per flush. Otherwise, metrics are written one metric per MQTT message.
|
* `batch`: When true, metrics will be sent in one MQTT message per flush. Otherwise, metrics are written one metric per MQTT message.
|
||||||
* `retain`: Set `retain` flag when publishing
|
* `retain`: Set `retain` flag when publishing
|
||||||
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
|
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
|
||||||
|
* `keep_alive`: Defines the maximum length of time that the broker and client may not communicate with each other. Defaults to 0 which deactivates this feature.
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,10 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultKeepAlive = 0
|
||||||
|
)
|
||||||
|
|
||||||
var sampleConfig = `
|
var sampleConfig = `
|
||||||
servers = ["localhost:1883"] # required.
|
servers = ["localhost:1883"] # required.
|
||||||
|
|
||||||
|
|
@ -55,6 +59,12 @@ var sampleConfig = `
|
||||||
## actually reads it
|
## actually reads it
|
||||||
# retain = false
|
# retain = false
|
||||||
|
|
||||||
|
## Defines the maximum length of time that the broker and client may not communicate.
|
||||||
|
## Defaults to 0 which turns the feature off. For version v2.0.12 of eclipse/mosquitto there is a
|
||||||
|
## [bug](https://github.com/eclipse/mosquitto/issues/2117) which requires keep_alive to be set.
|
||||||
|
## As a reference eclipse/paho.mqtt.golang v1.3.0 defaults to 30.
|
||||||
|
# keep_alive = 0
|
||||||
|
|
||||||
## Data format to output.
|
## Data format to output.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
@ -72,8 +82,9 @@ type MQTT struct {
|
||||||
QoS int `toml:"qos"`
|
QoS int `toml:"qos"`
|
||||||
ClientID string `toml:"client_id"`
|
ClientID string `toml:"client_id"`
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
BatchMessage bool `toml:"batch"`
|
BatchMessage bool `toml:"batch"`
|
||||||
Retain bool `toml:"retain"`
|
Retain bool `toml:"retain"`
|
||||||
|
KeepAlive int64 `toml:"keep_alive"`
|
||||||
|
|
||||||
client paho.Client
|
client paho.Client
|
||||||
opts *paho.ClientOptions
|
opts *paho.ClientOptions
|
||||||
|
|
@ -190,7 +201,7 @@ func (m *MQTT) publish(topic string, body []byte) error {
|
||||||
|
|
||||||
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
|
func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
|
||||||
opts := paho.NewClientOptions()
|
opts := paho.NewClientOptions()
|
||||||
opts.KeepAlive = 0
|
opts.KeepAlive = m.KeepAlive
|
||||||
|
|
||||||
if m.Timeout < config.Duration(time.Second) {
|
if m.Timeout < config.Duration(time.Second) {
|
||||||
m.Timeout = config.Duration(5 * time.Second)
|
m.Timeout = config.Duration(5 * time.Second)
|
||||||
|
|
@ -237,6 +248,8 @@ func (m *MQTT) createOpts() (*paho.ClientOptions, error) {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("mqtt", func() telegraf.Output {
|
outputs.Add("mqtt", func() telegraf.Output {
|
||||||
return &MQTT{}
|
return &MQTT{
|
||||||
|
KeepAlive: defaultKeepAlive,
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
|
||||||
m := &MQTT{
|
m := &MQTT{
|
||||||
Servers: []string{url},
|
Servers: []string{url},
|
||||||
serializer: s,
|
serializer: s,
|
||||||
|
KeepAlive: 30,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify that we can connect to the MQTT broker
|
// Verify that we can connect to the MQTT broker
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue