feat(inputs.mqtt_consumer): Add incoming mqtt message size calculation (#11426)
This commit is contained in:
parent
f1ce84f02d
commit
1dc617ebdd
|
|
@ -84,4 +84,5 @@ internal_write,output=file,host=tyrion,version=1.99.0 buffer_limit=10000i,write_
|
||||||
internal_gather,input=internal,host=tyrion,version=1.99.0 metrics_gathered=19i,gather_time_ns=442114i 1480682800000000000
|
internal_gather,input=internal,host=tyrion,version=1.99.0 metrics_gathered=19i,gather_time_ns=442114i 1480682800000000000
|
||||||
internal_gather,input=http_listener,host=tyrion,version=1.99.0 metrics_gathered=0i,gather_time_ns=167285i 1480682800000000000
|
internal_gather,input=http_listener,host=tyrion,version=1.99.0 metrics_gathered=0i,gather_time_ns=167285i 1480682800000000000
|
||||||
internal_http_listener,address=:8186,host=tyrion,version=1.99.0 queries_received=0i,writes_received=0i,requests_received=0i,buffers_created=0i,requests_served=0i,pings_received=0i,bytes_received=0i,not_founds_served=0i,pings_served=0i,queries_served=0i,writes_served=0i 1480682800000000000
|
internal_http_listener,address=:8186,host=tyrion,version=1.99.0 queries_received=0i,writes_received=0i,requests_received=0i,buffers_created=0i,requests_served=0i,pings_received=0i,bytes_received=0i,not_founds_served=0i,pings_served=0i,queries_served=0i,writes_served=0i 1480682800000000000
|
||||||
|
internal_mqtt_consumer,host=tyrion,version=1.99.0 messages_received=622i,payload_size=37942i 1657282270000000000
|
||||||
```
|
```
|
||||||
|
|
|
||||||
|
|
@ -194,5 +194,15 @@ sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45
|
||||||
|
|
||||||
- example when [[inputs.mqtt_consumer.topic_parsing]] is set
|
- example when [[inputs.mqtt_consumer.topic_parsing]] is set
|
||||||
|
|
||||||
|
- when [[inputs.internal]] is set:
|
||||||
|
- payload_size (int): get the cumulative size in bytes that have been received from incoming messages
|
||||||
|
- messages_received (int): count of the number of messages that have been received from mqtt
|
||||||
|
|
||||||
|
This will result in the following metric:
|
||||||
|
|
||||||
|
```text
|
||||||
|
internal_mqtt_consumer host=pop-os version=1.24.0 messages_received=622i payload_size=37942i 1657282270000000000
|
||||||
|
```
|
||||||
|
|
||||||
[mqtt]: https://mqtt.org
|
[mqtt]: https://mqtt.org
|
||||||
[input data formats]: /docs/DATA_FORMATS_INPUT.md
|
[input data formats]: /docs/DATA_FORMATS_INPUT.md
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
"github.com/influxdata/telegraf/selfstat"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
|
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
|
||||||
|
|
@ -90,6 +91,8 @@ type MQTTConsumer struct {
|
||||||
topicTagParse string
|
topicTagParse string
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
payloadSize selfstat.Stat
|
||||||
|
messagesRecv selfstat.Stat
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*MQTTConsumer) SampleConfig() string {
|
func (*MQTTConsumer) SampleConfig() string {
|
||||||
|
|
@ -146,6 +149,8 @@ func (m *MQTTConsumer) Init() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.payloadSize = selfstat.Register("mqtt_consumer", "payload_size", map[string]string{})
|
||||||
|
m.messagesRecv = selfstat.Register("mqtt_consumer", "messages_received", map[string]string{})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
|
func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
|
||||||
|
|
@ -240,6 +245,10 @@ func compareTopics(expected []string, incoming []string) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error {
|
func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error {
|
||||||
|
payloadBytes := len(msg.Payload())
|
||||||
|
m.payloadSize.Incr(int64(payloadBytes))
|
||||||
|
m.messagesRecv.Incr(1)
|
||||||
|
|
||||||
metrics, err := m.parser.Parse(msg.Payload())
|
metrics, err := m.parser.Parse(msg.Payload())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -156,6 +156,7 @@ func TestPersistentClientIDFail(t *testing.T) {
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
topic string
|
topic string
|
||||||
|
qos byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Message) Duplicate() bool {
|
func (m *Message) Duplicate() bool {
|
||||||
|
|
@ -163,7 +164,7 @@ func (m *Message) Duplicate() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Message) Qos() byte {
|
func (m *Message) Qos() byte {
|
||||||
panic("not implemented")
|
return m.qos
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Message) Retained() bool {
|
func (m *Message) Retained() bool {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue