2016-02-10 06:03:46 +08:00
|
|
|
# MQTT Consumer Input Plugin
|
|
|
|
|
|
2018-11-06 05:34:28 +08:00
|
|
|
The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
|
|
|
|
|
and creates metrics using one of the supported [input data formats][].
|
2016-02-10 06:03:46 +08:00
|
|
|
|
2021-11-23 23:20:39 +08:00
|
|
|
## Configuration
|
2016-02-10 06:03:46 +08:00
|
|
|
|
2022-05-24 21:49:47 +08:00
|
|
|
```toml @sample.conf
|
2022-04-12 21:29:02 +08:00
|
|
|
# Read metrics from MQTT topic(s)
|
2016-02-10 06:03:46 +08:00
|
|
|
[[inputs.mqtt_consumer]]
|
2020-06-05 05:43:05 +08:00
|
|
|
## Broker URLs for the MQTT server or cluster. To connect to multiple
|
2021-11-02 22:49:26 +08:00
|
|
|
## clusters or standalone servers, use a separate plugin instance.
|
2020-06-05 05:43:05 +08:00
|
|
|
## example: servers = ["tcp://localhost:1883"]
|
|
|
|
|
## servers = ["ssl://localhost:1883"]
|
|
|
|
|
## servers = ["ws://localhost:1883"]
|
2019-08-15 08:05:34 +08:00
|
|
|
servers = ["tcp://127.0.0.1:1883"]
|
|
|
|
|
|
|
|
|
|
## Topics that will be subscribed to.
|
|
|
|
|
topics = [
|
|
|
|
|
"telegraf/host01/cpu",
|
|
|
|
|
"telegraf/+/mem",
|
|
|
|
|
"sensors/#",
|
|
|
|
|
]
|
2018-10-04 08:21:30 +08:00
|
|
|
|
2019-08-20 10:05:22 +08:00
|
|
|
## The message topic will be stored in a tag specified by this value. If set
|
|
|
|
|
## to the empty string no topic tag will be created.
|
|
|
|
|
# topic_tag = "topic"
|
|
|
|
|
|
2018-10-04 08:21:30 +08:00
|
|
|
## QoS policy for messages
|
|
|
|
|
## 0 = at most once
|
|
|
|
|
## 1 = at least once
|
|
|
|
|
## 2 = exactly once
|
|
|
|
|
##
|
|
|
|
|
## When using a QoS of 1 or 2, you should enable persistent_session to allow
|
|
|
|
|
## resuming unacknowledged messages.
|
2019-08-15 08:05:34 +08:00
|
|
|
# qos = 0
|
2018-10-04 08:21:30 +08:00
|
|
|
|
2017-09-12 03:24:51 +08:00
|
|
|
## Connection timeout for initial connection in seconds
|
2019-08-15 08:05:34 +08:00
|
|
|
# connection_timeout = "30s"
|
2016-02-10 06:03:46 +08:00
|
|
|
|
2018-11-06 05:34:28 +08:00
|
|
|
## Maximum messages to read from the broker that have not been written by an
|
|
|
|
|
## output. For best throughput set based on the number of metrics within
|
|
|
|
|
## each message and the size of the output's metric_batch_size.
|
|
|
|
|
##
|
|
|
|
|
## For example, if each message from the queue contains 10 metrics and the
|
|
|
|
|
## output metric_batch_size is 1000, setting this to 100 will ensure that a
|
|
|
|
|
## full batch is collected and the write is triggered immediately without
|
|
|
|
|
## waiting until the next flush_interval.
|
|
|
|
|
# max_undelivered_messages = 1000
|
|
|
|
|
|
2019-08-15 08:05:34 +08:00
|
|
|
## Persistent session disables clearing of the client session on connection.
|
2019-11-13 06:11:42 +08:00
|
|
|
## In order for this option to work you must also set client_id to identify
|
2019-08-15 08:05:34 +08:00
|
|
|
## the client. To receive messages that arrived while the client is offline,
|
|
|
|
|
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
|
|
|
|
|
## publishing.
|
|
|
|
|
# persistent_session = false
|
2016-02-10 06:03:46 +08:00
|
|
|
|
2019-08-15 08:05:34 +08:00
|
|
|
## If unset, a random client ID will be generated.
|
|
|
|
|
# client_id = ""
|
2016-02-10 06:03:46 +08:00
|
|
|
|
2019-08-15 08:05:34 +08:00
|
|
|
## Username and password to connect to the MQTT server.
|
2016-02-10 06:03:46 +08:00
|
|
|
# username = "telegraf"
|
|
|
|
|
# password = "metricsmetricsmetricsmetrics"
|
|
|
|
|
|
2018-05-05 07:33:23 +08:00
|
|
|
## Optional TLS Config
|
|
|
|
|
# tls_ca = "/etc/telegraf/ca.pem"
|
|
|
|
|
# tls_cert = "/etc/telegraf/cert.pem"
|
|
|
|
|
# tls_key = "/etc/telegraf/key.pem"
|
|
|
|
|
## Use TLS but skip chain & host verification
|
2016-02-10 06:03:46 +08:00
|
|
|
# insecure_skip_verify = false
|
|
|
|
|
|
2016-04-06 04:42:20 +08:00
|
|
|
## Data format to consume.
|
2017-04-28 05:59:18 +08:00
|
|
|
## Each data format has its own unique set of configuration options, read
|
2016-02-19 05:26:51 +08:00
|
|
|
## more about them here:
|
|
|
|
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
2016-02-10 06:03:46 +08:00
|
|
|
data_format = "influx"
|
2021-11-23 23:20:39 +08:00
|
|
|
|
|
|
|
|
## Enable extracting tag values from MQTT topics
|
2022-04-12 21:29:02 +08:00
|
|
|
## _ denotes an ignored entry in the topic path
|
2021-11-23 23:20:39 +08:00
|
|
|
# [[inputs.mqtt_consumer.topic_parsing]]
|
|
|
|
|
# topic = ""
|
|
|
|
|
# measurement = ""
|
|
|
|
|
# tags = ""
|
|
|
|
|
# fields = ""
|
|
|
|
|
## Supported types are int, float, uint
|
|
|
|
|
# [inputs.mqtt_consumer.topic_parsing.types]
|
|
|
|
|
# key = type
|
|
|
|
|
```
|
|
|
|
|
|
2022-07-06 01:27:19 +08:00
|
|
|
## Example Output
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=45i 1653579140440951943
|
|
|
|
|
mqtt_consumer,host=pop-os,topic=telegraf/host01/cpu value=100i 1653579153147395661
|
|
|
|
|
```
|
|
|
|
|
|
2021-11-23 23:20:39 +08:00
|
|
|
## About Topic Parsing
|
|
|
|
|
|
2022-06-09 05:22:56 +08:00
|
|
|
The MQTT topic as a whole is stored as a tag, but this can be far too coarse to
|
|
|
|
|
be easily used when utilizing the data further down the line. Topic parsing allows
|
|
|
|
|
tag values to be extracted from the MQTT topic letting you store the information
|
|
|
|
|
provided in the topic in a meaningful way. An `_` denotes an ignored entry in
|
|
|
|
|
the topic path. Please see the following example.
|
2021-11-23 23:20:39 +08:00
|
|
|
|
2022-07-06 01:27:19 +08:00
|
|
|
### Topic Parsing Example
|
2021-11-23 23:20:39 +08:00
|
|
|
|
|
|
|
|
```toml
|
|
|
|
|
[[inputs.mqtt_consumer]]
|
|
|
|
|
## Broker URLs for the MQTT server or cluster. To connect to multiple
|
|
|
|
|
## clusters or standalone servers, use a separate plugin instance.
|
|
|
|
|
## example: servers = ["tcp://localhost:1883"]
|
|
|
|
|
## servers = ["ssl://localhost:1883"]
|
|
|
|
|
## servers = ["ws://localhost:1883"]
|
|
|
|
|
servers = ["tcp://127.0.0.1:1883"]
|
|
|
|
|
|
|
|
|
|
## Topics that will be subscribed to.
|
|
|
|
|
topics = [
|
|
|
|
|
"telegraf/+/cpu/23",
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
## Data format to consume.
|
|
|
|
|
## Each data format has its own unique set of configuration options, read
|
|
|
|
|
## more about them here:
|
|
|
|
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
|
|
|
|
data_format = "value"
|
|
|
|
|
data_type = "float"
|
|
|
|
|
|
|
|
|
|
[[inputs.mqtt_consumer.topic_parsing]]
|
|
|
|
|
topic = "telegraf/one/cpu/23"
|
|
|
|
|
measurement = "_/_/measurement/_"
|
|
|
|
|
tags = "tag/_/_/_"
|
|
|
|
|
fields = "_/_/_/test"
|
|
|
|
|
[inputs.mqtt_consumer.topic_parsing.types]
|
|
|
|
|
test = "int"
|
|
|
|
|
```
|
|
|
|
|
|
2022-07-06 01:27:19 +08:00
|
|
|
Will result in the following metric:
|
2021-11-23 23:20:39 +08:00
|
|
|
|
2022-07-06 01:27:19 +08:00
|
|
|
```text
|
2021-11-23 23:20:39 +08:00
|
|
|
cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291
|
2016-02-10 06:03:46 +08:00
|
|
|
```
|
|
|
|
|
|
2022-07-06 01:27:19 +08:00
|
|
|
## Field Pivoting Example
|
|
|
|
|
|
|
|
|
|
You can use the pivot processor to rotate single
|
|
|
|
|
valued metrics into a multi field metric.
|
|
|
|
|
For more info check out the pivot processor
|
|
|
|
|
[here][1].
|
|
|
|
|
|
|
|
|
|
For this example these are the topics:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
/sensors/CLE/v1/device5/temp
|
|
|
|
|
/sensors/CLE/v1/device5/rpm
|
|
|
|
|
/sensors/CLE/v1/device5/ph
|
|
|
|
|
/sensors/CLE/v1/device5/spin
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
And these are the metrics:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
sensors,site=CLE,version=v1,device_name=device5,field=temp value=390
|
|
|
|
|
sensors,site=CLE,version=v1,device_name=device5,field=rpm value=45.0
|
|
|
|
|
sensors,site=CLE,version=v1,device_name=device5,field=ph value=1.45
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
Using pivot in the config will rotate the metrics into a multi field metric.
|
|
|
|
|
The config:
|
|
|
|
|
|
|
|
|
|
```toml
|
|
|
|
|
[[inputs.mqtt_consumer]]
|
|
|
|
|
....
|
|
|
|
|
topics = ["/sensors/#"]
|
|
|
|
|
[[inputs.mqtt_consumer.topic_parsing]]
|
|
|
|
|
measurement = "/measurement/_/_/_/_"
|
|
|
|
|
tags = "/_/site/version/device_name/field"
|
|
|
|
|
[[processors.pivot]]
|
|
|
|
|
tag_key = "field"
|
|
|
|
|
value_key = "value"
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
Will result in the following metric:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
sensors,site=CLE,version=v1,device_name=device5 temp=390,rpm=45.0,ph=1.45
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
[1]: <https://github.com/influxdata/telegraf/tree/master/plugins/processors/pivot> "Pivot Processor"
|
|
|
|
|
|
2022-06-09 05:22:56 +08:00
|
|
|
## Metrics
|
2016-02-10 06:03:46 +08:00
|
|
|
|
|
|
|
|
- All measurements are tagged with the incoming topic, e.g.
|
|
|
|
|
`topic=telegraf/host01/cpu`
|
2018-11-06 05:34:28 +08:00
|
|
|
|
2021-11-23 23:20:39 +08:00
|
|
|
- example when `[[inputs.mqtt_consumer.topic_parsing]]` is set
|
|
|
|
|
|
2022-08-23 02:54:38 +08:00
|
|
|
- when `[[inputs.internal]]` is set:
|
|
|
|
|
- payload_size (int): cumulative size in bytes of the incoming messages that have been received
|
|
|
|
|
- messages_received (int): number of messages that have been received from MQTT
|
|
|
|
|
|
|
|
|
|
This will result in the following metric:
|
|
|
|
|
|
|
|
|
|
```text
|
|
|
|
|
internal_mqtt_consumer,host=pop-os,version=1.24.0 messages_received=622i,payload_size=37942i 1657282270000000000
|
|
|
|
|
```
|
|
|
|
|
|
2018-11-06 05:34:28 +08:00
|
|
|
[mqtt]: https://mqtt.org
|
|
|
|
|
[input data formats]: /docs/DATA_FORMATS_INPUT.md
|