diff --git a/plugins/common/mqtt/mqtt.go b/plugins/common/mqtt/mqtt.go index 3534ad093..84367df52 100644 --- a/plugins/common/mqtt/mqtt.go +++ b/plugins/common/mqtt/mqtt.go @@ -35,6 +35,7 @@ type MqttConfig struct { KeepAlive int64 `toml:"keep_alive"` PersistentSession bool `toml:"persistent_session"` PublishPropertiesV5 *PublishProperties `toml:"v5"` + ClientTrace bool `toml:"client_trace"` tls.ClientConfig diff --git a/plugins/inputs/mqtt_consumer/logger.go b/plugins/common/mqtt/mqtt_logger.go similarity index 92% rename from plugins/inputs/mqtt_consumer/logger.go rename to plugins/common/mqtt/mqtt_logger.go index 94f4886d5..4d681f652 100644 --- a/plugins/inputs/mqtt_consumer/logger.go +++ b/plugins/common/mqtt/mqtt_logger.go @@ -1,4 +1,4 @@ -package mqtt_consumer +package mqtt import ( "github.com/influxdata/telegraf" diff --git a/plugins/common/mqtt/mqtt_v3.go b/plugins/common/mqtt/mqtt_v3.go index a8705b8e3..c4bd0b41b 100644 --- a/plugins/common/mqtt/mqtt_v3.go +++ b/plugins/common/mqtt/mqtt_v3.go @@ -7,6 +7,7 @@ import ( mqttv3 "github.com/eclipse/paho.mqtt.golang" // Library that supports v3.1.1 "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/logger" ) type mqttv311Client struct { @@ -77,6 +78,14 @@ func NewMQTTv311Client(cfg *MqttConfig) (*mqttv311Client, error) { opts.AddBroker(broker) } + if cfg.ClientTrace { + log := &mqttLogger{logger.NewLogger("paho", "", "")} + mqttv3.ERROR = log + mqttv3.CRITICAL = log + mqttv3.WARN = log + mqttv3.DEBUG = log + } + return &mqttv311Client{ client: mqttv3.NewClient(opts), timeout: time.Duration(cfg.Timeout), diff --git a/plugins/common/mqtt/mqtt_v5.go b/plugins/common/mqtt/mqtt_v5.go index ca380a776..c018703ec 100644 --- a/plugins/common/mqtt/mqtt_v5.go +++ b/plugins/common/mqtt/mqtt_v5.go @@ -12,17 +12,19 @@ import ( "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/logger" ) type mqttv5Client struct { - client *mqttv5auto.ConnectionManager - options mqttv5auto.ClientConfig - username config.Secret - password config.Secret - timeout time.Duration - qos int - retain bool - properties *mqttv5.PublishProperties + client *mqttv5auto.ConnectionManager + options mqttv5auto.ClientConfig + username config.Secret + password config.Secret + timeout time.Duration + qos int + retain bool + clientTrace bool + properties *mqttv5.PublishProperties } func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) { @@ -94,13 +96,14 @@ func NewMQTTv5Client(cfg *MqttConfig) (*mqttv5Client, error) { } return &mqttv5Client{ - options: opts, - timeout: time.Duration(cfg.Timeout), - username: cfg.Username, - password: cfg.Password, - qos: cfg.QoS, - retain: cfg.Retain, - properties: properties, + options: opts, + timeout: time.Duration(cfg.Timeout), + username: cfg.Username, + password: cfg.Password, + qos: cfg.QoS, + retain: cfg.Retain, + properties: properties, + clientTrace: cfg.ClientTrace, }, nil } @@ -115,8 +118,14 @@ func (m *mqttv5Client) Connect() (bool, error) { return false, fmt.Errorf("getting password failed: %w", err) } defer pass.Destroy() - m.options.ConnectUsername = user.TemporaryString() - m.options.ConnectPassword = pass.Bytes() + m.options.ConnectUsername = user.String() + m.options.ConnectPassword = []byte(pass.String()) + + if m.clientTrace { + log := mqttLogger{logger.NewLogger("paho", "", "")} + m.options.Debug = log + m.options.Errors = log + } client, err := mqttv5auto.NewConnection(context.Background(), m.options) if err != nil { diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 901bedce9..73e72dddb 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -73,7 +73,6 @@ type MQTTConsumer struct { Password config.Secret `toml:"password"` QoS int `toml:"qos"` ConnectionTimeout config.Duration `toml:"connection_timeout"` - ClientTrace bool `toml:"client_trace"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` PersistentSession bool `toml:"persistent_session"` ClientID string `toml:"client_id"` @@ -105,14 +104,6 @@ func (m *MQTTConsumer) SetParser(parser telegraf.Parser) { m.parser = parser } func (m *MQTTConsumer) Init() error { - if m.ClientTrace { - log := &mqttLogger{m.Log} - mqtt.ERROR = log - mqtt.CRITICAL = log - mqtt.WARN = log - mqtt.DEBUG = log - } - m.state = Disconnected if m.PersistentSession && m.ClientID == "" { return errors.New("persistent_session requires client_id") diff --git a/plugins/outputs/mqtt/README.md b/plugins/outputs/mqtt/README.md index b81408b66..a05fa97e5 100644 --- a/plugins/outputs/mqtt/README.md +++ b/plugins/outputs/mqtt/README.md @@ -102,6 +102,12 @@ to use them. ## actually reads it # retain = false + ## Client trace messages + ## When set to true, and debug mode enabled in the agent settings, the MQTT + ## client's messages are included in telegraf logs. These messages are very + ## noisey, but essential for debugging issues. + # client_trace = false + ## Layout of the topics published. ## The following choices are available: ## non-batch -- send individual messages, one for each metric diff --git a/plugins/outputs/mqtt/sample.conf b/plugins/outputs/mqtt/sample.conf index ecfb47ce8..96f5e037f 100644 --- a/plugins/outputs/mqtt/sample.conf +++ b/plugins/outputs/mqtt/sample.conf @@ -67,6 +67,12 @@ ## actually reads it # retain = false + ## Client trace messages + ## When set to true, and debug mode enabled in the agent settings, the MQTT + ## client's messages are included in telegraf logs. These messages are very + ## noisey, but essential for debugging issues. + # client_trace = false + ## Layout of the topics published. ## The following choices are available: ## non-batch -- send individual messages, one for each metric