feat(outputs.mqtt): Add client trace logging, resolve MQTT5 reconnect login (#15429)
This commit is contained in:
parent
079c9d285a
commit
f0c72586cc
|
|
@ -35,6 +35,7 @@ type MqttConfig struct {
|
||||||
KeepAlive int64 `toml:"keep_alive"`
|
KeepAlive int64 `toml:"keep_alive"`
|
||||||
PersistentSession bool `toml:"persistent_session"`
|
PersistentSession bool `toml:"persistent_session"`
|
||||||
PublishPropertiesV5 *PublishProperties `toml:"v5"`
|
PublishPropertiesV5 *PublishProperties `toml:"v5"`
|
||||||
|
ClientTrace bool `toml:"client_trace"`
|
||||||
|
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package mqtt_consumer
|
package mqtt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1
|
mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mqttv311Client struct {
|
type mqttv311Client struct {
|
||||||
|
|
@ -77,6 +78,14 @@ func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) {
|
||||||
opts.AddBroker(broker)
|
opts.AddBroker(broker)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.ClientTrace {
|
||||||
|
log := &mqttLogger{logger.NewLogger("paho", "", "")}
|
||||||
|
mqttv3.ERROR = log
|
||||||
|
mqttv3.CRITICAL = log
|
||||||
|
mqttv3.WARN = log
|
||||||
|
mqttv3.DEBUG = log
|
||||||
|
}
|
||||||
|
|
||||||
return &mqttv311Client{
|
return &mqttv311Client{
|
||||||
client: mqttv3.NewClient(opts),
|
client: mqttv3.NewClient(opts),
|
||||||
timeout: time.Duration(cfg.Timeout),
|
timeout: time.Duration(cfg.Timeout),
|
||||||
|
|
|
||||||
|
|
@ -12,17 +12,19 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mqttv5Client struct {
|
type mqttv5Client struct {
|
||||||
client *mqttv5auto.ConnectionManager
|
client *mqttv5auto.ConnectionManager
|
||||||
options mqttv5auto.ClientConfig
|
options mqttv5auto.ClientConfig
|
||||||
username config.Secret
|
username config.Secret
|
||||||
password config.Secret
|
password config.Secret
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
qos int
|
qos int
|
||||||
retain bool
|
retain bool
|
||||||
properties *mqttv5.PublishProperties
|
clientTrace bool
|
||||||
|
properties *mqttv5.PublishProperties
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
|
func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
|
||||||
|
|
@ -94,13 +96,14 @@ func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mqttv5Client{
|
return &mqttv5Client{
|
||||||
options: opts,
|
options: opts,
|
||||||
timeout: time.Duration(cfg.Timeout),
|
timeout: time.Duration(cfg.Timeout),
|
||||||
username: cfg.Username,
|
username: cfg.Username,
|
||||||
password: cfg.Password,
|
password: cfg.Password,
|
||||||
qos: cfg.QoS,
|
qos: cfg.QoS,
|
||||||
retain: cfg.Retain,
|
retain: cfg.Retain,
|
||||||
properties: properties,
|
properties: properties,
|
||||||
|
clientTrace: cfg.ClientTrace,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -115,8 +118,14 @@ func (m *mqttv5Client) Connect() (bool, error) {
|
||||||
return false, fmt.Errorf("getting password failed: %w", err)
|
return false, fmt.Errorf("getting password failed: %w", err)
|
||||||
}
|
}
|
||||||
defer pass.Destroy()
|
defer pass.Destroy()
|
||||||
m.options.ConnectUsername = user.TemporaryString()
|
m.options.ConnectUsername = user.String()
|
||||||
m.options.ConnectPassword = pass.Bytes()
|
m.options.ConnectPassword = []byte(pass.String())
|
||||||
|
|
||||||
|
if m.clientTrace {
|
||||||
|
log := mqttLogger{logger.NewLogger("paho", "", "")}
|
||||||
|
m.options.Debug = log
|
||||||
|
m.options.Errors = log
|
||||||
|
}
|
||||||
|
|
||||||
client, err := mqttv5auto.NewConnection(context.Background(), m.options)
|
client, err := mqttv5auto.NewConnection(context.Background(), m.options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -73,7 +73,6 @@ type MQTTConsumer struct {
|
||||||
Password config.Secret `toml:"password"`
|
Password config.Secret `toml:"password"`
|
||||||
QoS int `toml:"qos"`
|
QoS int `toml:"qos"`
|
||||||
ConnectionTimeout config.Duration `toml:"connection_timeout"`
|
ConnectionTimeout config.Duration `toml:"connection_timeout"`
|
||||||
ClientTrace bool `toml:"client_trace"`
|
|
||||||
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
|
||||||
PersistentSession bool `toml:"persistent_session"`
|
PersistentSession bool `toml:"persistent_session"`
|
||||||
ClientID string `toml:"client_id"`
|
ClientID string `toml:"client_id"`
|
||||||
|
|
@ -105,14 +104,6 @@ func (m *MQTTConsumer) SetParser(parser telegraf.Parser) {
|
||||||
m.parser = parser
|
m.parser = parser
|
||||||
}
|
}
|
||||||
func (m *MQTTConsumer) Init() error {
|
func (m *MQTTConsumer) Init() error {
|
||||||
if m.ClientTrace {
|
|
||||||
log := &mqttLogger{m.Log}
|
|
||||||
mqtt.ERROR = log
|
|
||||||
mqtt.CRITICAL = log
|
|
||||||
mqtt.WARN = log
|
|
||||||
mqtt.DEBUG = log
|
|
||||||
}
|
|
||||||
|
|
||||||
m.state = Disconnected
|
m.state = Disconnected
|
||||||
if m.PersistentSession && m.ClientID == "" {
|
if m.PersistentSession && m.ClientID == "" {
|
||||||
return errors.New("persistent_session requires client_id")
|
return errors.New("persistent_session requires client_id")
|
||||||
|
|
|
||||||
|
|
@ -102,6 +102,12 @@ to use them.
|
||||||
## actually reads it
|
## actually reads it
|
||||||
# retain = false
|
# retain = false
|
||||||
|
|
||||||
|
## Client trace messages
|
||||||
|
## When set to true, and debug mode enabled in the agent settings, the MQTT
|
||||||
|
## client's messages are included in telegraf logs. These messages are very
|
||||||
|
## noisey, but essential for debugging issues.
|
||||||
|
# client_trace = false
|
||||||
|
|
||||||
## Layout of the topics published.
|
## Layout of the topics published.
|
||||||
## The following choices are available:
|
## The following choices are available:
|
||||||
## non-batch -- send individual messages, one for each metric
|
## non-batch -- send individual messages, one for each metric
|
||||||
|
|
|
||||||
|
|
@ -67,6 +67,12 @@
|
||||||
## actually reads it
|
## actually reads it
|
||||||
# retain = false
|
# retain = false
|
||||||
|
|
||||||
|
## Client trace messages
|
||||||
|
## When set to true, and debug mode enabled in the agent settings, the MQTT
|
||||||
|
## client's messages are included in telegraf logs. These messages are very
|
||||||
|
## noisey, but essential for debugging issues.
|
||||||
|
# client_trace = false
|
||||||
|
|
||||||
## Layout of the topics published.
|
## Layout of the topics published.
|
||||||
## The following choices are available:
|
## The following choices are available:
|
||||||
## non-batch -- send individual messages, one for each metric
|
## non-batch -- send individual messages, one for each metric
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue