diff --git a/plugins/inputs/internal/README.md b/plugins/inputs/internal/README.md index 40e0482f8..90882d749 100644 --- a/plugins/inputs/internal/README.md +++ b/plugins/inputs/internal/README.md @@ -84,4 +84,5 @@ internal_write,output=file,host=tyrion,version=1.99.0 buffer_limit=10000i,write_ internal_gather,input=internal,host=tyrion,version=1.99.0 metrics_gathered=19i,gather_time_ns=442114i 1480682800000000000 internal_gather,input=http_listener,host=tyrion,version=1.99.0 metrics_gathered=0i,gather_time_ns=167285i 1480682800000000000 internal_http_listener,address=:8186,host=tyrion,version=1.99.0 queries_received=0i,writes_received=0i,requests_received=0i,buffers_created=0i,requests_served=0i,pings_received=0i,bytes_received=0i,not_founds_served=0i,pings_served=0i,queries_served=0i,writes_served=0i 1480682800000000000 +internal_mqtt_consumer,host=tyrion,version=1.99.0 messages_received=622i,payload_size=37942i 1657282270000000000 ``` diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md index e0f8984e1..d4633e854 100644 --- a/plugins/inputs/mqtt_consumer/README.md +++ b/plugins/inputs/mqtt_consumer/README.md @@ -194,5 +194,15 @@ sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45 - example when [[inputs.mqtt_consumer.topic_parsing]] is set +- when [[inputs.internal]] is set: + - payload_size (int): get the cumulative size in bytes that have been received from incoming messages + - messages_received (int): count of the number of messages that have been received from mqtt + +This will result in the following metric: + +```text +internal_mqtt_consumer host=pop-os version=1.24.0 messages_received=622i payload_size=37942i 1657282270000000000 +``` + [mqtt]: https://mqtt.org [input data formats]: /docs/DATA_FORMATS_INPUT.md diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 37fabded7..adc6ff029 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -19,6 +19,7 @@ import ( "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/selfstat" ) // DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data. @@ -90,6 +91,8 @@ type MQTTConsumer struct { topicTagParse string ctx context.Context cancel context.CancelFunc + payloadSize selfstat.Stat + messagesRecv selfstat.Stat } func (*MQTTConsumer) SampleConfig() string { @@ -146,6 +149,8 @@ func (m *MQTTConsumer) Init() error { } } + m.payloadSize = selfstat.Register("mqtt_consumer", "payload_size", map[string]string{}) + m.messagesRecv = selfstat.Register("mqtt_consumer", "messages_received", map[string]string{}) return nil } func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error { @@ -240,6 +245,10 @@ func compareTopics(expected []string, incoming []string) bool { } func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error { + payloadBytes := len(msg.Payload()) + m.payloadSize.Incr(int64(payloadBytes)) + m.messagesRecv.Incr(1) + metrics, err := m.parser.Parse(msg.Payload()) if err != nil { return err diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 3b8780c78..83144f486 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -156,6 +156,7 @@ func TestPersistentClientIDFail(t *testing.T) { type Message struct { topic string + qos byte } func (m *Message) Duplicate() bool { @@ -163,7 +164,7 @@ func (m *Message) Duplicate() bool { } func (m *Message) Qos() byte { - panic("not implemented") + return m.qos } func (m *Message) Retained() bool {