feat(inputs.dpdk): Add options to customize error-behavior and metric layout (#14308)

This commit is contained in:
PanKaker 2023-12-04 20:07:45 +01:00 committed by GitHub
parent d570f015df
commit f654d9236b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1352 additions and 339 deletions

View File

@ -22,6 +22,14 @@ driver-specific extended stats (`/ethdev/xstats`) via this new interface.
introduced reading via `v2` interface common statistics (`/ethdev/stats`) in
addition to existing (`/ethdev/xstats`).
[DPDK Release 21.11](https://doc.dpdk.org/guides/rel_notes/release_21_11.html)
introduced reading via `v2` interface additional ethernet device information
(`/ethdev/info`).
This version also adds support for exposing telemetry from multiple
`--in-memory` instances of DPDK via dedicated sockets.
The plugin supports reading from those sockets when `in_memory`
option is set.
The example usage of `v2` telemetry interface can be found in [Telemetry User
Guide](https://doc.dpdk.org/guides/howto/telemetry.html). A variety of [DPDK
Sample Applications](https://doc.dpdk.org/guides/sample_app_ug/index.html) is
@ -35,14 +43,17 @@ and to explore the exposed metrics.
> `DPDK 20.05 <= version < DPDK 20.11` it is recommended to disable querying
> `/ethdev/stats` by setting corresponding `exclude_commands` configuration
> option.
>
> **NOTE:** Since DPDK will most likely run with root privileges, the socket
> telemetry interface exposed by DPDK will also require root access. This means
> that either access permissions have to be adjusted for socket telemetry
> interface to allow Telegraf to access it, or Telegraf should run with root
> privileges.
> **NOTE:** The DPDK socket must exist for Telegraf to start successfully.
> Telegraf will attempt to connect to the DPDK socket during the initialization
> phase.
>
> **NOTE:** There are known issues with exposing telemetry from multiple
> `--in-memory` instances while using `DPDK 21.11.1`. The recommended version
> to use in conjunction with `in_memory` plugin option is `DPDK 21.11.2`
> or higher.
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
@ -72,20 +83,37 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# socket_access_timeout = "200ms"
## Enables telemetry data collection for selected device types.
## Adding "ethdev" enables collection of telemetry from DPDK NICs
## (stats, xstats, link_status).
## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices
## (xstats).
## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status, info).
## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats).
# device_types = ["ethdev"]
## List of custom, application-specific telemetry commands to query
## The list of available commands depend on the application deployed.
## Applications can register their own commands via telemetry library API
## http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands
## https://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands
## For L3 Forwarding with Power Management Sample Application this could be:
## additional_commands = ["/l3fwd-power/stats"]
# additional_commands = []
## List of plugin options.
## Supported options:
## - "in_memory" option enables reading for multiple sockets when a dpdk application is running with --in-memory option.
## When option is enabled plugin will try to find additional socket paths related to provided socket_path.
## Details: https://doc.dpdk.org/guides/howto/telemetry.html#connecting-to-different-dpdk-processes
# plugin_options = ["in_memory"]
## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet).
## Available choices:
## - error: Telegraf will return an error during the startup and gather phases if socket is unreachable
## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather
# unreachable_socket_behavior = "error"
## List of metadata fields which will be added to every metric produced by the plugin.
## Supported options:
## - "pid" - exposes PID of DPDK process. Example: pid=2179660i
## - "version" - exposes version of DPDK. Example: version="DPDK 21.11.2"
# metadata_fields = ["pid", "version"]
## Allows turning off collecting data for individual "ethdev" commands.
## Remove "/ethdev/link_status" from list to gather link status metrics.
[inputs.dpdk.ethdev]
@ -107,6 +135,7 @@ for additional usage information.
This configuration allows getting metrics for all devices reported via
`/ethdev/list` command:
* `/ethdev/info` - device information: name, MAC address, buffers size, etc. (since `DPDK 21.11`)
* `/ethdev/stats` - basic device statistics (since `DPDK 20.11`)
* `/ethdev/xstats` - extended device statistics
* `/ethdev/link_status` - up/down link status
@ -125,7 +154,7 @@ should be adjusted accordingly (e.g. `interval = "30s"`).
### Example: Excluding NIC link status from being collected
Checking link status depending on underlying implementation may take more time
to complete. This configuration can be used to exclude this telemetry command
to complete. This configuration can be used to exclude this telemetry command
to allow faster response for metrics.
```toml
@ -155,7 +184,7 @@ configuration, with higher timeout.
device_types = ["ethdev"]
[inputs.dpdk.ethdev]
exclude_commands = ["/ethdev/stats", "/ethdev/xstats"]
exclude_commands = ["/ethdev/info", "/ethdev/stats", "/ethdev/xstats"]
```
### Example: Getting application-specific metrics
@ -183,7 +212,7 @@ format:
* Commands have to start with `/`.
Providing invalid commands will prevent the plugin from starting. Additional
commands allow duplicates, but they will be removed during execution so each
commands allow duplicates, but they will be removed during execution, so each
command will be executed only once during each metric gathering interval.
[sample-app]: https://doc.dpdk.org/guides/sample_app_ug/l3_forward_power_man.html
@ -232,7 +261,7 @@ JSON Flattener](../../parsers/json/README.md) and exposed as fields. If DPDK
response contains no information (is empty or is null) then such response will
be discarded.
> **NOTE:** Since DPDK allows registering custom metrics in its telemetry
> **NOTE:** Since DPDK allows registering custom metrics in its telemetry
> framework the JSON response from DPDK may contain various sets of metrics.
> While metrics from `/ethdev/stats` should be most stable, the `/ethdev/xstats`
> may contain driver-specific metrics (depending on DPDK application
@ -253,7 +282,7 @@ dpdk,host=dpdk-host,dpdk_instance=l3fwd-power,command=/ethdev/stats,params=0 [fi
| `host` | hostname of the machine (consult [Telegraf Agent configuration](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#agent) for additional details) |
| `dpdk_instance` | custom tag from `[inputs.dpdk.tags]` (optional) |
| `command` | executed command (without params) |
| `params` | command parameter, e.g. for `/ethdev/stats` it is the id of NIC as exposed by `/ethdev/list`. For DPDK app that uses 2 NICs the metrics will output e.g. `params=0`, `params=1`. |
| `params` | command parameter, e.g. for `/ethdev/stats` it is the ID of NIC as exposed by `/ethdev/list`. For DPDK app that uses 2 NICs the metrics will output e.g. `params=0`, `params=1`. |
When running plugin configuration below...
@ -261,6 +290,7 @@ When running plugin configuration below...
[[inputs.dpdk]]
device_types = ["ethdev"]
additional_commands = ["/l3fwd-power/stats"]
metadata_fields = []
[inputs.dpdk.tags]
dpdk_instance = "l3fwd-power"
```
@ -269,11 +299,35 @@ When running plugin configuration below...
`host=dpdk-host`:
```text
dpdk,command=/ethdev/info,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 all_multicast=0,dev_configured=1,dev_flags=74,dev_started=1,ethdev_rss_hf=0,lro=0,mac_addr="E4:3D:1A:DD:13:31",mtu=1500,name="0000:ca:00.1",nb_rx_queues=1,nb_tx_queues=1,numa_node=1,port_id=0,promiscuous=1,rx_mbuf_alloc_fail=0,rx_mbuf_size_min=2176,rx_offloads=0,rxq_state_0=1,scattered_rx=0,state=1,tx_offloads=65536,txq_state_0=1 1659017414000000000
dpdk,command=/ethdev/stats,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 q_opackets_0=0,q_ipackets_5=0,q_errors_11=0,ierrors=0,q_obytes_5=0,q_obytes_10=0,q_opackets_10=0,q_ipackets_4=0,q_ipackets_7=0,q_ipackets_15=0,q_ibytes_5=0,q_ibytes_6=0,q_ibytes_9=0,obytes=0,q_opackets_1=0,q_opackets_11=0,q_obytes_7=0,q_errors_5=0,q_errors_10=0,q_ibytes_4=0,q_obytes_6=0,q_errors_1=0,q_opackets_5=0,q_errors_3=0,q_errors_12=0,q_ipackets_11=0,q_ipackets_12=0,q_obytes_14=0,q_opackets_15=0,q_obytes_2=0,q_errors_8=0,q_opackets_12=0,q_errors_0=0,q_errors_9=0,q_opackets_14=0,q_ibytes_3=0,q_ibytes_15=0,q_ipackets_13=0,q_ipackets_14=0,q_obytes_3=0,q_errors_13=0,q_opackets_3=0,q_ibytes_0=7092,q_ibytes_2=0,q_ibytes_8=0,q_ipackets_8=0,q_ipackets_10=0,q_obytes_4=0,q_ibytes_10=0,q_ibytes_13=0,q_ibytes_1=0,q_ibytes_12=0,opackets=0,q_obytes_1=0,q_errors_15=0,q_opackets_2=0,oerrors=0,rx_nombuf=0,q_opackets_8=0,q_ibytes_11=0,q_ipackets_3=0,q_obytes_0=0,q_obytes_12=0,q_obytes_11=0,q_obytes_13=0,q_errors_6=0,q_ipackets_1=0,q_ipackets_6=0,q_ipackets_9=0,q_obytes_15=0,q_opackets_7=0,q_ibytes_14=0,ipackets=98,q_ipackets_2=0,q_opackets_6=0,q_ibytes_7=0,imissed=0,q_opackets_4=0,q_opackets_9=0,q_obytes_8=0,q_obytes_9=0,q_errors_4=0,q_errors_14=0,q_opackets_13=0,ibytes=7092,q_ipackets_0=98,q_errors_2=0,q_errors_7=0 1606310780000000000
dpdk,command=/ethdev/stats,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 q_opackets_0=0,q_ipackets_5=0,q_errors_11=0,ierrors=0,q_obytes_5=0,q_obytes_10=0,q_opackets_10=0,q_ipackets_4=0,q_ipackets_7=0,q_ipackets_15=0,q_ibytes_5=0,q_ibytes_6=0,q_ibytes_9=0,obytes=0,q_opackets_1=0,q_opackets_11=0,q_obytes_7=0,q_errors_5=0,q_errors_10=0,q_ibytes_4=0,q_obytes_6=0,q_errors_1=0,q_opackets_5=0,q_errors_3=0,q_errors_12=0,q_ipackets_11=0,q_ipackets_12=0,q_obytes_14=0,q_opackets_15=0,q_obytes_2=0,q_errors_8=0,q_opackets_12=0,q_errors_0=0,q_errors_9=0,q_opackets_14=0,q_ibytes_3=0,q_ibytes_15=0,q_ipackets_13=0,q_ipackets_14=0,q_obytes_3=0,q_errors_13=0,q_opackets_3=0,q_ibytes_0=7092,q_ibytes_2=0,q_ibytes_8=0,q_ipackets_8=0,q_ipackets_10=0,q_obytes_4=0,q_ibytes_10=0,q_ibytes_13=0,q_ibytes_1=0,q_ibytes_12=0,opackets=0,q_obytes_1=0,q_errors_15=0,q_opackets_2=0,oerrors=0,rx_nombuf=0,q_opackets_8=0,q_ibytes_11=0,q_ipackets_3=0,q_obytes_0=0,q_obytes_12=0,q_obytes_11=0,q_obytes_13=0,q_errors_6=0,q_ipackets_1=0,q_ipackets_6=0,q_ipackets_9=0,q_obytes_15=0,q_opackets_7=0,q_ibytes_14=0,ipackets=98,q_ipackets_2=0,q_opackets_6=0,q_ibytes_7=0,imissed=0,q_opackets_4=0,q_opackets_9=0,q_obytes_8=0,q_obytes_9=0,q_errors_4=0,q_errors_14=0,q_opackets_13=0,ibytes=7092,q_ipackets_0=98,q_errors_2=0,q_errors_7=0 1606310780000000000
dpdk,command=/ethdev/xstats,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 out_octets_encrypted=0,rx_fcoe_mbuf_allocation_errors=0,tx_q1packets=0,rx_priority0_xoff_packets=0,rx_priority7_xoff_packets=0,rx_errors=0,mac_remote_errors=0,in_pkts_invalid=0,tx_priority3_xoff_packets=0,tx_errors=0,rx_fcoe_bytes=0,rx_flow_control_xon_packets=0,rx_priority4_xoff_packets=0,tx_priority2_xoff_packets=0,rx_illegal_byte_errors=0,rx_xoff_packets=0,rx_management_packets=0,rx_priority7_dropped=0,rx_priority4_dropped=0,in_pkts_unchecked=0,rx_error_bytes=0,rx_size_256_to_511_packets=0,tx_priority4_xoff_packets=0,rx_priority6_xon_packets=0,tx_priority4_xon_to_xoff_packets=0,in_pkts_delayed=0,rx_priority0_mbuf_allocation_errors=0,out_octets_protected=0,tx_priority7_xon_to_xoff_packets=0,tx_priority1_xon_to_xoff_packets=0,rx_fcoe_no_direct_data_placement_ext_buff=0,tx_priority6_xon_to_xoff_packets=0,flow_director_filter_add_errors=0,rx_total_packets=99,rx_crc_errors=0,flow_director_filter_remove_errors=0,rx_missed_errors=0,tx_size_64_packets=0,rx_priority3_dropped=0,flow_director_matched_filters=0,tx_priority2_xon_to_xoff_packets=0,rx_priority1_xon_packets=0,rx_size_65_to_127_packets=99,rx_fragment_errors=0,in_pkts_notusingsa=0,rx_q0bytes=7162,rx_fcoe_dropped=0,rx_priority1_dropped=0,rx_fcoe_packets=0,rx_priority5_xoff_packets=0,out_pkts_protected=0,tx_total_packets=0,rx_priority2_dropped=0,in_pkts_late=0,tx_q1bytes=0,in_pkts_badtag=0,rx_multicast_packets=99,rx_priority6_xoff_packets=0,tx_flow_control_xoff_packets=0,rx_flow_control_xoff_packets=0,rx_priority0_xon_packets=0,in_pkts_untagged=0,tx_fcoe_packets=0,rx_priority7_mbuf_allocation_errors=0,tx_priority0_xon_to_xoff_packets=0,tx_priority5_xon_to_xoff_packets=0,tx_flow_control_xon_packets=0,tx_q0packets=0,tx_xoff_packets=0,rx_size_512_to_1023_packets=0,rx_priority3_xon_packets=0,rx_q0errors=0,rx_oversize_errors=0,tx_priority4_xon_packets=0,tx_priority5_xoff_packets=0,rx_priority5_xon_packets=0,rx_total_missed_packets=0,rx_priority4_mbuf_allocation_errors=0,tx_priority1_xon_packets=0,tx_management_packets=0,rx_priority5_mbuf_allocation_errors=0,rx_fcoe_no_direct_data_placement=0,rx_undersize_errors=0,tx_priority1_xoff_packets=0,rx_q0packets=99,tx_q2packets=0,tx_priority6_xon_packets=0,rx_good_packets=99,tx_priority5_xon_packets=0,tx_size_256_to_511_packets=0,rx_priority6_dropped=0,rx_broadcast_packets=0,tx_size_512_to_1023_packets=0,tx_priority3_xon_to_xoff_packets=0,in_pkts_unknownsci=0,in_octets_validated=0,tx_priority6_xoff_packets=0,tx_priority7_xoff_packets=0,rx_jabber_errors=0,tx_priority7_xon_packets=0,tx_priority0_xon_packets=0,in_pkts_unusedsa=0,tx_priority0_xoff_packets=0,mac_local_errors=33,rx_total_bytes=7162,in_pkts_notvalid=0,rx_length_errors=0,in_octets_decrypted=0,rx_size_128_to_255_packets=0,rx_good_bytes=7162,tx_size_65_to_127_packets=0,rx_mac_short_packet_dropped=0,tx_size_1024_to_max_packets=0,rx_priority2_mbuf_allocation_errors=0,flow_director_added_filters=0,tx_multicast_packets=0,rx_fcoe_crc_errors=0,rx_priority1_xoff_packets=0,flow_director_missed_filters=0,rx_xon_packets=0,tx_size_128_to_255_packets=0,out_pkts_encrypted=0,rx_priority4_xon_packets=0,rx_priority0_dropped=0,rx_size_1024_to_max_packets=0,tx_good_bytes=0,rx_management_dropped=0,rx_mbuf_allocation_errors=0,tx_xon_packets=0,rx_priority3_xoff_packets=0,tx_good_packets=0,tx_fcoe_bytes=0,rx_priority6_mbuf_allocation_errors=0,rx_priority2_xon_packets=0,tx_broadcast_packets=0,tx_q2bytes=0,rx_priority7_xon_packets=0,out_pkts_untagged=0,rx_priority2_xoff_packets=0,rx_priority1_mbuf_allocation_errors=0,tx_q0bytes=0,rx_size_64_packets=0,rx_priority5_dropped=0,tx_priority2_xon_packets=0,in_pkts_nosci=0,flow_director_removed_filters=0,in_pkts_ok=0,rx_l3_l4_xsum_error=0,rx_priority3_mbuf_allocation_errors=0,tx_priority3_xon_packets=0 1606310780000000000
dpdk,command=/ethdev/xstats,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 tx_priority5_xoff_packets=0,in_pkts_unknownsci=0,tx_q0packets=0,tx_total_packets=0,rx_crc_errors=0,rx_priority4_xoff_packets=0,rx_priority5_dropped=0,tx_size_65_to_127_packets=0,rx_good_packets=98,tx_priority6_xoff_packets=0,tx_fcoe_bytes=0,out_octets_protected=0,out_pkts_encrypted=0,rx_priority1_xon_packets=0,tx_size_128_to_255_packets=0,rx_flow_control_xoff_packets=0,rx_priority7_xoff_packets=0,tx_priority0_xon_to_xoff_packets=0,rx_broadcast_packets=0,tx_priority1_xon_packets=0,rx_xon_packets=0,rx_fragment_errors=0,tx_flow_control_xoff_packets=0,tx_q0bytes=0,out_pkts_untagged=0,rx_priority4_xon_packets=0,tx_priority5_xon_packets=0,rx_priority1_xoff_packets=0,rx_good_bytes=7092,rx_priority4_mbuf_allocation_errors=0,in_octets_decrypted=0,tx_priority2_xon_to_xoff_packets=0,rx_priority3_dropped=0,tx_multicast_packets=0,mac_local_errors=33,in_pkts_ok=0,rx_illegal_byte_errors=0,rx_xoff_packets=0,rx_q0errors=0,flow_director_added_filters=0,rx_size_256_to_511_packets=0,rx_priority3_xon_packets=0,rx_l3_l4_xsum_error=0,rx_priority6_dropped=0,in_pkts_notvalid=0,rx_size_64_packets=0,tx_management_packets=0,rx_length_errors=0,tx_priority7_xon_to_xoff_packets=0,rx_mbuf_allocation_errors=0,rx_missed_errors=0,rx_priority1_mbuf_allocation_errors=0,rx_fcoe_no_direct_data_placement=0,tx_priority3_xoff_packets=0,in_pkts_delayed=0,tx_errors=0,rx_size_512_to_1023_packets=0,tx_priority4_xon_packets=0,rx_q0bytes=7092,in_pkts_unchecked=0,tx_size_512_to_1023_packets=0,rx_fcoe_packets=0,in_pkts_nosci=0,rx_priority6_mbuf_allocation_errors=0,rx_priority1_dropped=0,tx_q2packets=0,rx_priority7_dropped=0,tx_size_1024_to_max_packets=0,rx_management_packets=0,rx_multicast_packets=98,rx_total_bytes=7092,mac_remote_errors=0,tx_priority3_xon_packets=0,rx_priority2_mbuf_allocation_errors=0,rx_priority5_mbuf_allocation_errors=0,tx_q2bytes=0,rx_size_128_to_255_packets=0,in_pkts_badtag=0,out_pkts_protected=0,rx_management_dropped=0,rx_fcoe_bytes=0,flow_director_removed_filters=0,tx_priority2_xoff_packets=0,rx_fcoe_crc_errors=0,rx_priority0_mbuf_allocation_errors=0,rx_priority0_xon_packets=0,rx_fcoe_dropped=0,tx_priority1_xon_to_xoff_packets=0,rx_size_65_to_127_packets=98,rx_q0packets=98,tx_priority0_xoff_packets=0,rx_priority6_xon_packets=0,rx_total_packets=98,rx_undersize_errors=0,flow_director_missed_filters=0,rx_jabber_errors=0,in_pkts_invalid=0,in_pkts_late=0,rx_priority5_xon_packets=0,tx_priority4_xoff_packets=0,out_octets_encrypted=0,tx_q1packets=0,rx_priority5_xoff_packets=0,rx_priority6_xoff_packets=0,rx_errors=0,in_octets_validated=0,rx_priority3_xoff_packets=0,tx_priority4_xon_to_xoff_packets=0,tx_priority5_xon_to_xoff_packets=0,tx_flow_control_xon_packets=0,rx_priority0_dropped=0,flow_director_filter_add_errors=0,tx_q1bytes=0,tx_priority6_xon_to_xoff_packets=0,flow_director_matched_filters=0,tx_priority2_xon_packets=0,rx_fcoe_mbuf_allocation_errors=0,rx_priority2_xoff_packets=0,tx_priority7_xoff_packets=0,rx_priority0_xoff_packets=0,rx_oversize_errors=0,in_pkts_notusingsa=0,tx_size_64_packets=0,rx_size_1024_to_max_packets=0,tx_priority6_xon_packets=0,rx_priority2_dropped=0,rx_priority4_dropped=0,rx_priority7_mbuf_allocation_errors=0,rx_flow_control_xon_packets=0,tx_good_bytes=0,tx_priority3_xon_to_xoff_packets=0,rx_total_missed_packets=0,rx_error_bytes=0,tx_priority7_xon_packets=0,rx_mac_short_packet_dropped=0,tx_priority1_xoff_packets=0,tx_good_packets=0,tx_broadcast_packets=0,tx_xon_packets=0,in_pkts_unusedsa=0,rx_priority2_xon_packets=0,in_pkts_untagged=0,tx_fcoe_packets=0,flow_director_filter_remove_errors=0,rx_priority3_mbuf_allocation_errors=0,tx_priority0_xon_packets=0,rx_priority7_xon_packets=0,rx_fcoe_no_direct_data_placement_ext_buff=0,tx_xoff_packets=0,tx_size_256_to_511_packets=0 1606310780000000000
dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 status="UP",speed=10000,duplex="full-duplex" 1606310780000000000
dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 status="UP",speed=10000,duplex="full-duplex" 1606310780000000000
dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 status="UP",link_status=1,speed=10000,duplex="full-duplex" 1606310780000000000
dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 status="UP",link_status=1,speed=10000,duplex="full-duplex" 1606310780000000000
dpdk,command=/l3fwd-power/stats,dpdk_instance=l3fwd-power,host=dpdk-host empty_poll=49506395979901,full_poll=0,busy_percent=0 1606310780000000000
```
When running plugin configuration below...
```toml
[[inputs.dpdk]]
interval = "30s"
socket_access_timeout = "10s"
device_types = ["ethdev"]
metadata_fields = ["version", "pid"]
plugin_options = ["in_memory"]
[inputs.dpdk.ethdev]
exclude_commands = ["/ethdev/info", "/ethdev/stats", "/ethdev/xstats"]
```
Expected output for `dpdk` plugin instance running with `link_status` command
and all metadata fields enabled, additionally `link_status` field will be
exposed to represent string value of `status` field (`DOWN`=0,`UP`=1):
```text
dpdk,command=/ethdev/link_status,host=dpdk-host,params=0 pid=100988i,version="DPDK 21.11.2",status="DOWN",link_status=0i 1660295749000000000
dpdk,command=/ethdev/link_status,host=dpdk-host,params=0 pid=2401624i,version="DPDK 21.11.2",status="UP",link_status=1i 1660295749000000000
```

View File

@ -5,16 +5,18 @@ package dpdk
import (
_ "embed"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
//go:embed sample.conf
@ -28,26 +30,258 @@ const (
pluginName = "dpdk"
ethdevListCommand = "/ethdev/list"
rawdevListCommand = "/rawdev/list"
dpdkMetadataFieldPidName = "pid"
dpdkMetadataFieldVersionName = "version"
dpdkPluginOptionInMemory = "in_memory"
unreachableSocketBehaviorIgnore = "ignore"
unreachableSocketBehaviorError = "error"
)
type dpdk struct {
SocketPath string `toml:"socket_path"`
AccessTimeout config.Duration `toml:"socket_access_timeout"`
DeviceTypes []string `toml:"device_types"`
EthdevConfig ethdevConfig `toml:"ethdev"`
AdditionalCommands []string `toml:"additional_commands"`
Log telegraf.Logger `toml:"-"`
SocketPath string `toml:"socket_path"`
AccessTimeout config.Duration `toml:"socket_access_timeout"`
DeviceTypes []string `toml:"device_types"`
EthdevConfig ethdevConfig `toml:"ethdev"`
AdditionalCommands []string `toml:"additional_commands"`
MetadataFields []string `toml:"metadata_fields"`
PluginOptions []string `toml:"plugin_options"`
UnreachableSocketBehavior string `toml:"unreachable_socket_behavior"`
Log telegraf.Logger `toml:"-"`
connector *dpdkConnector
connectors []*dpdkConnector
rawdevCommands []string
ethdevCommands []string
ethdevExcludedCommandsFilter filter.Filter
socketGlobPath *globpath.GlobPath
}
type ethdevConfig struct {
EthdevExcludeCommands []string `toml:"exclude_commands"`
}
func (*dpdk) SampleConfig() string {
return sampleConfig
}
// Init performs validation of all parameters from configuration
func (dpdk *dpdk) Init() error {
dpdk.setupDefaultValues()
err := dpdk.validateAdditionalCommands()
if err != nil {
return err
}
if dpdk.AccessTimeout < 0 {
return errors.New("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)")
}
if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 {
return errors.New("plugin was configured with nothing to read")
}
dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands)
if err != nil {
return fmt.Errorf("error occurred during filter preparation for ethdev excluded commands: %w", err)
}
if err = choice.Check(dpdk.UnreachableSocketBehavior, []string{unreachableSocketBehaviorError, unreachableSocketBehaviorIgnore}); err != nil {
return fmt.Errorf("unreachable_socket_behavior: %w", err)
}
glob, err := globpath.Compile(dpdk.SocketPath + "*")
if err != nil {
return err
}
dpdk.socketGlobPath = glob
return nil
}
// Start implements ServiceInput interface
func (dpdk *dpdk) Start(telegraf.Accumulator) error {
return dpdk.maintainConnections()
}
func (dpdk *dpdk) Stop() {
for _, connector := range dpdk.connectors {
if err := connector.tryClose(); err != nil {
dpdk.Log.Warnf("Couldn't close connection for %q: %v", connector.pathToSocket, err)
}
}
dpdk.connectors = nil
}
// Gather function gathers all unique commands and processes each command sequentially
// Parallel processing could be achieved by running several instances of this plugin with different settings
func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error {
if err := dpdk.Start(acc); err != nil {
return err
}
for _, dpdkConn := range dpdk.connectors {
commands := dpdk.gatherCommands(acc, dpdkConn)
for _, command := range commands {
dpdkConn.processCommand(acc, dpdk.Log, command, dpdk.MetadataFields)
}
}
return nil
}
// Setup default values for dpdk
func (dpdk *dpdk) setupDefaultValues() {
if dpdk.SocketPath == "" {
dpdk.SocketPath = defaultPathToSocket
}
if dpdk.DeviceTypes == nil {
dpdk.DeviceTypes = []string{"ethdev"}
}
if dpdk.MetadataFields == nil {
dpdk.MetadataFields = []string{dpdkMetadataFieldPidName, dpdkMetadataFieldVersionName}
}
if dpdk.PluginOptions == nil {
dpdk.PluginOptions = []string{dpdkPluginOptionInMemory}
}
if len(dpdk.UnreachableSocketBehavior) == 0 {
dpdk.UnreachableSocketBehavior = unreachableSocketBehaviorError
}
dpdk.rawdevCommands = []string{"/rawdev/xstats"}
dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/info", ethdevLinkStatusCommand}
}
func (dpdk *dpdk) getDpdkInMemorySocketPaths() []string {
filePaths := dpdk.socketGlobPath.Match()
var results []string
for _, filePath := range filePaths {
fileInfo, err := os.Stat(filePath)
if err != nil || fileInfo.IsDir() || !strings.Contains(filePath, dpdkSocketTemplateName) {
continue
}
if isInMemorySocketPath(filePath, dpdk.SocketPath) {
results = append(results, filePath)
}
}
return results
}
// Checks that user-supplied commands are unique and match DPDK commands format
func (dpdk *dpdk) validateAdditionalCommands() error {
dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands)
for _, cmd := range dpdk.AdditionalCommands {
if len(cmd) == 0 {
return errors.New("got empty command")
}
if cmd[0] != '/' {
return fmt.Errorf("%q command should start with slash", cmd)
}
if commandWithoutParams := stripParams(cmd); len(commandWithoutParams) >= maxCommandLength {
return fmt.Errorf("%q command is too long. It shall be less than %v characters", commandWithoutParams, maxCommandLength)
}
if len(cmd) >= maxCommandLengthWithParams {
return fmt.Errorf("command with parameters %q shall be less than %v characters", cmd, maxCommandLengthWithParams)
}
}
return nil
}
// Establishes connections do DPDK telemetry sockets
func (dpdk *dpdk) maintainConnections() error {
candidates := []string{dpdk.SocketPath}
if choice.Contains(dpdkPluginOptionInMemory, dpdk.PluginOptions) {
candidates = dpdk.getDpdkInMemorySocketPaths()
}
// Find sockets in the connected-sockets list that are not among
// the candidates anymore and thus need to be removed.
for i := 0; i < len(dpdk.connectors); i++ {
connector := dpdk.connectors[i]
if !choice.Contains(connector.pathToSocket, candidates) {
dpdk.Log.Debugf("Close unused connection: %s", connector.pathToSocket)
if closeErr := connector.tryClose(); closeErr != nil {
dpdk.Log.Warnf("Failed to close unused connection: %v", closeErr)
}
dpdk.connectors = append(dpdk.connectors[:i], dpdk.connectors[i+1:]...)
i--
}
}
// Find candidates that are not yet in the connected-sockets list as we
// need to connect to those.
for _, candidate := range candidates {
var found bool
for _, connector := range dpdk.connectors {
if candidate == connector.pathToSocket {
found = true
break
}
}
if !found {
connector := newDpdkConnector(candidate, dpdk.AccessTimeout)
connectionInitMessage, err := connector.connect()
if err != nil {
if dpdk.UnreachableSocketBehavior == unreachableSocketBehaviorError {
return fmt.Errorf("couldn't connect to socket %s: %w", candidate, err)
}
dpdk.Log.Warnf("Couldn't connect to socket %s: %v", candidate, err)
continue
}
dpdk.Log.Debugf("Successfully connected to the socket: %s. Version: %v running as process with PID %v with len %v",
candidate, connectionInitMessage.Version, connectionInitMessage.Pid, connectionInitMessage.MaxOutputLen)
dpdk.connectors = append(dpdk.connectors, connector)
}
}
if len(dpdk.connectors) == 0 {
errMsg := "no active sockets connections present"
if dpdk.UnreachableSocketBehavior == unreachableSocketBehaviorError {
return errors.New(errMsg)
}
dpdk.Log.Warnf("Unreachable socket issue occurred: %v", errMsg)
}
return nil
}
// Gathers all unique commands
func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkConnector) []string {
var commands []string
if choice.Contains("ethdev", dpdk.DeviceTypes) {
ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter)
ethdevCommands, err := dpdkConnector.appendCommandsWithParamsFromList(ethdevListCommand, ethdevCommands)
if err != nil {
acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", ethdevListCommand, err))
}
commands = append(commands, ethdevCommands...)
}
if choice.Contains("rawdev", dpdk.DeviceTypes) {
rawdevCommands, err := dpdkConnector.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands)
if err != nil {
acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", rawdevListCommand, err))
}
commands = append(commands, rawdevCommands...)
}
commands = append(commands, dpdk.AdditionalCommands...)
return uniqueValues(commands)
}
func init() {
inputs.Add(pluginName, func() telegraf.Input {
dpdk := &dpdk{
@ -58,174 +292,3 @@ func init() {
return dpdk
})
}
func (*dpdk) SampleConfig() string {
return sampleConfig
}
// Performs validation of all parameters from configuration
func (dpdk *dpdk) Init() error {
if dpdk.SocketPath == "" {
dpdk.SocketPath = defaultPathToSocket
dpdk.Log.Debugf("using default '%v' path for socket_path", defaultPathToSocket)
}
if dpdk.DeviceTypes == nil {
dpdk.DeviceTypes = []string{"ethdev"}
}
var err error
if err := isSocket(dpdk.SocketPath); err != nil {
return err
}
dpdk.rawdevCommands = []string{"/rawdev/xstats"}
dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/link_status"}
if err := dpdk.validateCommands(); err != nil {
return err
}
if dpdk.AccessTimeout < 0 {
return fmt.Errorf("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)")
}
if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 {
return fmt.Errorf("plugin was configured with nothing to read")
}
dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands)
if err != nil {
return fmt.Errorf("error occurred during filter prepation for ethdev excluded commands: %w", err)
}
dpdk.connector = newDpdkConnector(dpdk.SocketPath, dpdk.AccessTimeout)
initMessage, err := dpdk.connector.connect()
if initMessage != nil {
dpdk.Log.Debugf("Successfully connected to %v running as process with PID %v with len %v",
initMessage.Version, initMessage.Pid, initMessage.MaxOutputLen)
}
return err
}
// Checks that user-supplied commands are unique and match DPDK commands format
func (dpdk *dpdk) validateCommands() error {
dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands)
for _, commandWithParams := range dpdk.AdditionalCommands {
if len(commandWithParams) == 0 {
return fmt.Errorf("got empty command")
}
if commandWithParams[0] != '/' {
return fmt.Errorf("%q command should start with '/'", commandWithParams)
}
if commandWithoutParams := stripParams(commandWithParams); len(commandWithoutParams) >= maxCommandLength {
return fmt.Errorf("%q command is too long. It shall be less than %v characters", commandWithoutParams, maxCommandLength)
}
if len(commandWithParams) >= maxCommandLengthWithParams {
return fmt.Errorf("command with parameters %q shall be less than %v characters", commandWithParams, maxCommandLengthWithParams)
}
}
return nil
}
// Gathers all unique commands and processes each command sequentially
// Parallel processing could be achieved by running several instances of this plugin with different settings
func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error {
// This needs to be done during every `Gather(...)`, because DPDK can be restarted between consecutive
// `Gather(...)` cycles which can cause that it will be exposing different set of metrics.
commands := dpdk.gatherCommands(acc)
for _, command := range commands {
dpdk.processCommand(acc, command)
}
return nil
}
// Gathers all unique commands
func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator) []string {
var commands []string
if choice.Contains("ethdev", dpdk.DeviceTypes) {
ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter)
ethdevCommands, err := dpdk.appendCommandsWithParamsFromList(ethdevListCommand, ethdevCommands)
if err != nil {
acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", ethdevListCommand, err))
}
commands = append(commands, ethdevCommands...)
}
if choice.Contains("rawdev", dpdk.DeviceTypes) {
rawdevCommands, err := dpdk.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands)
if err != nil {
acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", rawdevListCommand, err))
}
commands = append(commands, rawdevCommands...)
}
commands = append(commands, dpdk.AdditionalCommands...)
return uniqueValues(commands)
}
// Fetches all identifiers of devices and then creates all possible combinations of commands for each device
func (dpdk *dpdk) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) {
response, err := dpdk.connector.getCommandResponse(listCommand)
if err != nil {
return nil, err
}
params, err := jsonToArray(response, listCommand)
if err != nil {
return nil, err
}
result := make([]string, 0, len(commands)*len(params))
for _, command := range commands {
for _, param := range params {
result = append(result, commandWithParams(command, param))
}
}
return result, nil
}
// Executes command, parses response and creates/writes metric from response
func (dpdk *dpdk) processCommand(acc telegraf.Accumulator, commandWithParams string) {
buf, err := dpdk.connector.getCommandResponse(commandWithParams)
if err != nil {
acc.AddError(err)
return
}
var parsedResponse map[string]interface{}
err = json.Unmarshal(buf, &parsedResponse)
if err != nil {
acc.AddError(fmt.Errorf("failed to unmarshall json response from %q command: %w", commandWithParams, err))
return
}
command := stripParams(commandWithParams)
value := parsedResponse[command]
if isEmpty(value) {
acc.AddError(fmt.Errorf("got empty json on %q command", commandWithParams))
return
}
jf := jsonparser.JSONFlattener{}
err = jf.FullFlattenJSON("", value, true, true)
if err != nil {
acc.AddError(fmt.Errorf("failed to flatten response: %w", err))
return
}
acc.AddFields(pluginName, jf.Fields, map[string]string{
"command": command,
"params": getParams(commandWithParams),
})
}

View File

@ -0,0 +1,55 @@
//go:build linux
package dpdk
import (
"fmt"
"strings"
)
type linkStatus int64
const (
DOWN linkStatus = iota
UP
)
const (
ethdevLinkStatusCommand = "/ethdev/link_status"
linkStatusStringFieldName = "status"
linkStatusIntegerFieldName = "link_status"
)
var (
linkStatusMap = map[string]linkStatus{
"down": DOWN,
"up": UP,
}
)
func processCommandResponse(command string, data map[string]interface{}) error {
if command == ethdevLinkStatusCommand {
return processLinkStatusCmd(data)
}
return nil
}
func processLinkStatusCmd(data map[string]interface{}) error {
status, ok := data[linkStatusStringFieldName].(string)
if !ok {
return fmt.Errorf("can't find or parse %q field", linkStatusStringFieldName)
}
parsedLinkStatus, ok := parseLinkStatus(status)
if !ok {
return fmt.Errorf("can't parse linkStatus: unknown value: %q", status)
}
data[linkStatusIntegerFieldName] = int64(parsedLinkStatus)
return nil
}
func parseLinkStatus(s string) (linkStatus, bool) {
value, ok := linkStatusMap[strings.ToLower(s)]
return value, ok
}

View File

@ -0,0 +1,131 @@
//go:build linux
package dpdk
import (
"fmt"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
func Test_LinkStatusCommand(t *testing.T) {
t.Run("when 'status' field is DOWN then return 'link_status'=0", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{%q: "DOWN"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
linkStatusStringFieldName: "DOWN",
linkStatusIntegerFieldName: int64(0),
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
t.Run("when 'status' field is UP then return 'link_status'=1", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{%q: "UP"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
linkStatusStringFieldName: "UP",
linkStatusIntegerFieldName: int64(1),
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
t.Run("when link status output doesn't have any fields then don't return 'link_status' field", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{}}`, ethdevLinkStatusCommand)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, nil, actual, testutil.IgnoreTime())
})
t.Run("when link status output doesn't have status field then don't return 'link_status' field", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{"tag1": 1}}`, ethdevLinkStatusCommand)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
"tag1": float64(1),
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
t.Run("when link status output is invalid then don't return 'link_status' field", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := fmt.Sprintf(`{%q:{%q: "BOB"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName)
simulateResponse(mockConn, response, nil)
dpdkConn := dpdk.connectors[0]
dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": ethdevLinkStatusCommand,
"params": "1",
},
map[string]interface{}{
linkStatusStringFieldName: "BOB",
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
}

View File

@ -4,14 +4,20 @@ package dpdk
import (
"encoding/json"
"errors"
"fmt"
"net"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
const maxInitMessageLength = 1024
const (
maxInitMessageLength = 1024 // based on https://github.com/DPDK/dpdk/blob/v22.07/lib/telemetry/telemetry.c#L352
dpdkSocketTemplateName = "dpdk_telemetry"
)
type initMessage struct {
Version string `json:"version"`
@ -21,16 +27,14 @@ type initMessage struct {
type dpdkConnector struct {
pathToSocket string
maxOutputLen uint32
messageShowed bool
accessTimeout time.Duration
connection net.Conn
initMessage *initMessage
}
func newDpdkConnector(pathToSocket string, accessTimeout config.Duration) *dpdkConnector {
return &dpdkConnector{
pathToSocket: pathToSocket,
messageShowed: false,
accessTimeout: time.Duration(accessTimeout),
}
}
@ -38,26 +42,67 @@ func newDpdkConnector(pathToSocket string, accessTimeout config.Duration) *dpdkC
// Connects to the socket
// Since DPDK is a local unix socket, it is instantly returns error or connection, so there's no need to set timeout for it
func (conn *dpdkConnector) connect() (*initMessage, error) {
if err := isSocket(conn.pathToSocket); err != nil {
return nil, err
}
connection, err := net.Dial("unixpacket", conn.pathToSocket)
if err != nil {
return nil, fmt.Errorf("failed to connect to the socket: %w", err)
}
conn.connection = connection
result, err := conn.readMaxOutputLen()
conn.initMessage, err = conn.readInitMessage()
if err != nil {
if closeErr := conn.tryClose(); closeErr != nil {
return nil, fmt.Errorf("%w and failed to close connection: %w", err, closeErr)
}
return nil, err
}
return conn.initMessage, nil
}
// Add metadata fields to data
func (conn *dpdkConnector) addMetadataFields(metadataFields []string, data map[string]interface{}) {
if conn.initMessage == nil {
return
}
for _, field := range metadataFields {
switch field {
case dpdkMetadataFieldPidName:
data[dpdkMetadataFieldPidName] = conn.initMessage.Pid
case dpdkMetadataFieldVersionName:
data[dpdkMetadataFieldVersionName] = conn.initMessage.Version
}
}
}
// Fetches all identifiers of devices and then creates all possible combinations of commands for each device
func (conn *dpdkConnector) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) {
response, err := conn.getCommandResponse(listCommand)
if err != nil {
return nil, err
}
params, err := jsonToArray(response, listCommand)
if err != nil {
return nil, err
}
result := make([]string, 0, len(commands)*len(params))
for _, command := range commands {
for _, param := range params {
result = append(result, commandWithParams(command, param))
}
}
return result, nil
}
// Executes command using provided connection and returns response
// If error (such as timeout) occurred, then connection is discarded and recreated
// because otherwise behaviour of connection is undefined (e.g. it could return result of timed out command instead of latest)
// because otherwise behavior of connection is undefined (e.g. it could return result of timed out command instead of latest)
func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error) {
connection, err := conn.getConnection()
if err != nil {
@ -77,7 +122,7 @@ func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error
return nil, fmt.Errorf("failed to send %q command: %w", fullCommand, err)
}
buf := make([]byte, conn.maxOutputLen)
buf := make([]byte, conn.initMessage.MaxOutputLen)
messageLength, err := connection.Read(buf)
if err != nil {
if closeErr := conn.tryClose(); closeErr != nil {
@ -92,6 +137,50 @@ func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error
return buf[:messageLength], nil
}
// Executes command, parses response and creates/writes metrics from response to accumulator
func (conn *dpdkConnector) processCommand(acc telegraf.Accumulator, log telegraf.Logger, commandWithParams string, metadataFields []string) {
buf, err := conn.getCommandResponse(commandWithParams)
if err != nil {
acc.AddError(err)
return
}
var parsedResponse map[string]interface{}
err = json.Unmarshal(buf, &parsedResponse)
if err != nil {
acc.AddError(fmt.Errorf("failed to unmarshal json response from %q command: %w", commandWithParams, err))
return
}
command := stripParams(commandWithParams)
value := parsedResponse[command]
if isEmpty(value) {
log.Warnf("got empty json on %q command", commandWithParams)
return
}
jf := jsonparser.JSONFlattener{}
err = jf.FullFlattenJSON("", value, true, true)
if err != nil {
acc.AddError(fmt.Errorf("failed to flatten response: %w", err))
return
}
err = processCommandResponse(command, jf.Fields)
if err != nil {
log.Warnf("Failed to process a response of the command: %s. Error: %v. Continue to handle data", command, err)
}
// Add metadata fields if required
conn.addMetadataFields(metadataFields, jf.Fields)
// Add common fields
acc.AddFields(pluginName, jf.Fields, map[string]string{
"command": command,
"params": getParams(commandWithParams),
})
}
func (conn *dpdkConnector) tryClose() error {
if conn.connection == nil {
return nil
@ -107,7 +196,7 @@ func (conn *dpdkConnector) tryClose() error {
func (conn *dpdkConnector) setTimeout() error {
if conn.connection == nil {
return fmt.Errorf("connection had not been established before")
return errors.New("connection had not been established before")
}
if conn.accessTimeout == 0 {
@ -128,7 +217,7 @@ func (conn *dpdkConnector) getConnection() (net.Conn, error) {
}
// Reads InitMessage for connection. Should be read for each connection, otherwise InitMessage is returned as response for first command.
func (conn *dpdkConnector) readMaxOutputLen() (*initMessage, error) {
func (conn *dpdkConnector) readInitMessage() (*initMessage, error) {
buf := make([]byte, maxInitMessageLength)
err := conn.setTimeout()
if err != nil {
@ -140,21 +229,15 @@ func (conn *dpdkConnector) readMaxOutputLen() (*initMessage, error) {
return nil, fmt.Errorf("failed to read InitMessage: %w", err)
}
var initMessage initMessage
err = json.Unmarshal(buf[:messageLength], &initMessage)
var connectionInitMessage initMessage
err = json.Unmarshal(buf[:messageLength], &connectionInitMessage)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}
if initMessage.MaxOutputLen == 0 {
return nil, fmt.Errorf("failed to read maxOutputLen information")
if connectionInitMessage.MaxOutputLen == 0 {
return nil, errors.New("failed to read maxOutputLen information")
}
if !conn.messageShowed {
conn.maxOutputLen = initMessage.MaxOutputLen
conn.messageShowed = true
return &initMessage, nil
}
return nil, nil
return &connectionInitMessage, nil
}

View File

@ -4,26 +4,28 @@ package dpdk
import (
"encoding/json"
"fmt"
"errors"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/dpdk/mocks"
"github.com/influxdata/telegraf/testutil"
)
func Test_readMaxOutputLen(t *testing.T) {
t.Run("should return error if timeout occurred", func(t *testing.T) {
conn := &mocks.Conn{}
conn.On("Read", mock.Anything).Return(0, fmt.Errorf("timeout"))
conn.On("Read", mock.Anything).Return(0, errors.New("timeout"))
conn.On("SetDeadline", mock.Anything).Return(nil)
connector := dpdkConnector{connection: conn}
_, err := connector.readMaxOutputLen()
initMessage, err := connector.readInitMessage()
require.Error(t, err)
require.Contains(t, err.Error(), "timeout")
require.Empty(t, initMessage)
})
t.Run("should pass and set maxOutputLen if provided with valid InitMessage", func(t *testing.T) {
@ -43,10 +45,10 @@ func Test_readMaxOutputLen(t *testing.T) {
conn.On("SetDeadline", mock.Anything).Return(nil)
connector := dpdkConnector{connection: conn}
_, err = connector.readMaxOutputLen()
initMsg, err := connector.readInitMessage()
require.NoError(t, err)
require.Equal(t, maxOutputLen, connector.maxOutputLen)
require.Equal(t, maxOutputLen, initMsg.MaxOutputLen)
})
t.Run("should fail if received invalid json", func(t *testing.T) {
@ -59,7 +61,7 @@ func Test_readMaxOutputLen(t *testing.T) {
conn.On("SetDeadline", mock.Anything).Return(nil)
connector := dpdkConnector{connection: conn}
_, err := connector.readMaxOutputLen()
_, err := connector.readInitMessage()
require.Error(t, err)
require.Contains(t, err.Error(), "looking for beginning of object key string")
@ -80,7 +82,7 @@ func Test_readMaxOutputLen(t *testing.T) {
conn.On("SetDeadline", mock.Anything).Return(nil)
connector := dpdkConnector{connection: conn}
_, err = connector.readMaxOutputLen()
_, err = connector.readInitMessage()
require.Error(t, err)
require.Contains(t, err.Error(), "failed to read maxOutputLen information")
@ -89,15 +91,15 @@ func Test_readMaxOutputLen(t *testing.T) {
func Test_connect(t *testing.T) {
t.Run("should pass if PathToSocket points to socket", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t)
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
SocketPath: pathToSocket,
connector: newDpdkConnector(pathToSocket, 0),
connectors: []*dpdkConnector{newDpdkConnector(pathToSocket, 0)},
}
go simulateSocketResponse(socket, t)
_, err := dpdk.connector.connect()
_, err := dpdk.connectors[0].connect()
require.NoError(t, err)
})
@ -112,18 +114,20 @@ func Test_getCommandResponse(t *testing.T) {
defer mockConn.AssertExpectations(t)
simulateResponse(mockConn, response, nil)
buf, err := dpdk.connector.getCommandResponse(command)
for _, connector := range dpdk.connectors {
buf, err := connector.getCommandResponse(command)
require.NoError(t, err)
require.Equal(t, len(response), len(buf))
require.Equal(t, response, string(buf))
require.NoError(t, err)
require.Equal(t, len(response), len(buf))
require.Equal(t, response, string(buf))
}
})
t.Run("should return error if failed to get connection handler", func(t *testing.T) {
_, dpdk, _ := prepareEnvironment()
dpdk.connector.connection = nil
dpdk.connectors[0].connection = nil
buf, err := dpdk.connector.getCommandResponse(command)
buf, err := dpdk.connectors[0].getCommandResponse(command)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to get connection to execute \"/\" command")
@ -133,9 +137,9 @@ func Test_getCommandResponse(t *testing.T) {
t.Run("should return error if failed to set timeout duration", func(t *testing.T) {
mockConn, dpdk, _ := prepareEnvironment()
defer mockConn.AssertExpectations(t)
mockConn.On("SetDeadline", mock.Anything).Return(fmt.Errorf("deadline error"))
mockConn.On("SetDeadline", mock.Anything).Return(errors.New("deadline error"))
buf, err := dpdk.connector.getCommandResponse(command)
buf, err := dpdk.connectors[0].getCommandResponse(command)
require.Error(t, err)
require.Contains(t, err.Error(), "deadline error")
@ -145,11 +149,11 @@ func Test_getCommandResponse(t *testing.T) {
t.Run("should return error if timeout occurred during Write operation", func(t *testing.T) {
mockConn, dpdk, _ := prepareEnvironment()
defer mockConn.AssertExpectations(t)
mockConn.On("Write", mock.Anything).Return(0, fmt.Errorf("write timeout"))
mockConn.On("Write", mock.Anything).Return(0, errors.New("write timeout"))
mockConn.On("SetDeadline", mock.Anything).Return(nil)
mockConn.On("Close").Return(nil)
buf, err := dpdk.connector.getCommandResponse(command)
buf, err := dpdk.connectors[0].getCommandResponse(command)
require.Error(t, err)
require.Contains(t, err.Error(), "write timeout")
@ -159,9 +163,9 @@ func Test_getCommandResponse(t *testing.T) {
t.Run("should return error if timeout occurred during Read operation", func(t *testing.T) {
mockConn, dpdk, _ := prepareEnvironment()
defer mockConn.AssertExpectations(t)
simulateResponse(mockConn, "", fmt.Errorf("read timeout"))
simulateResponse(mockConn, "", errors.New("read timeout"))
buf, err := dpdk.connector.getCommandResponse(command)
buf, err := dpdk.connectors[0].getCommandResponse(command)
require.Error(t, err)
require.Contains(t, err.Error(), "read timeout")
@ -173,10 +177,65 @@ func Test_getCommandResponse(t *testing.T) {
defer mockConn.AssertExpectations(t)
simulateResponse(mockConn, "", nil)
buf, err := dpdk.connector.getCommandResponse(command)
buf, err := dpdk.connectors[0].getCommandResponse(command)
require.Error(t, err)
require.Empty(t, buf)
require.Contains(t, err.Error(), "got empty response during execution of")
})
}
func Test_processCommand(t *testing.T) {
t.Run("should pass if received valid response", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := `{"/": ["/", "/eal/app_params", "/eal/params", "/ethdev/link_status, /ethdev/info"]}`
simulateResponse(mockConn, response, nil)
for _, dpdkConn := range dpdk.connectors {
dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/", nil)
}
require.Empty(t, mockAcc.Errors)
})
t.Run("if received a non-JSON object then should return error", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := `notAJson`
simulateResponse(mockConn, response, nil)
for _, dpdkConn := range dpdk.connectors {
dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/", nil)
}
require.Len(t, mockAcc.Errors, 1)
require.Contains(t, mockAcc.Errors[0].Error(), "invalid character")
})
t.Run("if failed to get command response then accumulator should contain error", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
mockConn.On("Write", mock.Anything).Return(0, errors.New("deadline exceeded"))
mockConn.On("SetDeadline", mock.Anything).Return(nil)
mockConn.On("Close").Return(nil)
for _, dpdkConn := range dpdk.connectors {
dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/", nil)
}
require.Len(t, mockAcc.Errors, 1)
require.Contains(t, mockAcc.Errors[0].Error(), "deadline exceeded")
})
t.Run("if response contains nil or empty value then error shouldn't be returned in accumulator", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := `{"/test": null}`
simulateResponse(mockConn, response, nil)
for _, dpdkConn := range dpdk.connectors {
dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/test,param", nil)
}
require.Empty(t, mockAcc.Errors)
})
}

View File

@ -5,7 +5,10 @@ package dpdk
import (
"encoding/json"
"fmt"
"math/rand"
"net"
"os"
"path/filepath"
"strings"
"testing"
"time"
@ -15,14 +18,18 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs/dpdk/mocks"
"github.com/influxdata/telegraf/testutil"
)
func Test_Init(t *testing.T) {
t.Run("when SocketPath field isn't set then it should be set to default value", func(t *testing.T) {
_, dpdk, _ := prepareEnvironment()
dpdk.SocketPath = ""
dpdk := dpdk{
Log: testutil.Logger{},
SocketPath: "",
}
require.Equal(t, "", dpdk.SocketPath)
_ = dpdk.Init()
@ -30,10 +37,31 @@ func Test_Init(t *testing.T) {
require.Equal(t, defaultPathToSocket, dpdk.SocketPath)
})
t.Run("when Metadata Fields isn't set then it should be set to default value (dpdk_pid)", func(t *testing.T) {
dpdk := dpdk{
Log: testutil.Logger{},
}
require.Nil(t, dpdk.MetadataFields)
_ = dpdk.Init()
require.Equal(t, []string{dpdkMetadataFieldPidName, dpdkMetadataFieldVersionName}, dpdk.MetadataFields)
})
t.Run("when PluginOptions field isn't set then it should be set to default value (in_memory)", func(t *testing.T) {
dpdk := dpdk{
Log: testutil.Logger{},
}
require.Nil(t, dpdk.PluginOptions)
_ = dpdk.Init()
require.Equal(t, []string{dpdkPluginOptionInMemory}, dpdk.PluginOptions)
})
t.Run("when commands are in invalid format (doesn't start with '/') then error should be returned", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t)
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
Log: testutil.Logger{},
SocketPath: pathToSocket,
AdditionalCommands: []string{"invalid"},
}
@ -41,26 +69,22 @@ func Test_Init(t *testing.T) {
err := dpdk.Init()
require.Error(t, err)
require.Contains(t, err.Error(), "command should start with '/'")
require.Contains(t, err.Error(), "command should start with slash")
})
t.Run("when all values are valid, then no error should be returned", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t)
defer socket.Close()
t.Run("when AccessTime is < 0 then error should be returned", func(t *testing.T) {
dpdk := dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
Log: testutil.Logger{},
AccessTimeout: -1,
}
go simulateSocketResponse(socket, t)
err := dpdk.Init()
require.NoError(t, err)
require.Error(t, err)
require.Contains(t, err.Error(), "socket_access_timeout should be positive number")
})
t.Run("when device_types and additional_commands are empty, then error should be returned", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t)
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
SocketPath: pathToSocket,
@ -74,15 +98,241 @@ func Test_Init(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "plugin was configured with nothing to read")
})
t.Run("when UnreachableSocketBehavior specified with unknown value - err should be returned", func(t *testing.T) {
dpdk := dpdk{
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
UnreachableSocketBehavior: "whatisthat",
}
err := dpdk.Init()
require.Error(t, err)
require.Contains(t, err.Error(), "unreachable_socket_behavior")
})
}
func Test_validateCommands(t *testing.T) {
func Test_Start(t *testing.T) {
t.Run("when socket doesn't exist err should be returned", func(t *testing.T) {
dpdk := dpdk{
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
}
err := dpdk.Init()
require.NoError(t, err)
err = dpdk.Start(nil)
require.Error(t, err)
require.Contains(t, err.Error(), "no active sockets connections present")
})
t.Run("when socket doesn't exist, but UnreachableSocketBehavior is Ignore err shouldn't be returned", func(t *testing.T) {
dpdk := dpdk{
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
UnreachableSocketBehavior: unreachableSocketBehaviorIgnore,
}
err := dpdk.Init()
require.NoError(t, err)
err = dpdk.Start(nil)
require.NoError(t, err)
})
t.Run("when all values are valid, then no error should be returned", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
}
err := dpdk.Init()
require.NoError(t, err)
go simulateSocketResponse(socket, t)
err = dpdk.Start(nil)
require.NoError(t, err)
})
}
func TestMaintainConnections(t *testing.T) {
t.Run("maintainConnections should return the error if socket doesn't exist", func(t *testing.T) {
dpdk := dpdk{
SocketPath: "/tmp/justrandompath",
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
UnreachableSocketBehavior: unreachableSocketBehaviorError,
}
require.Empty(t, dpdk.connectors)
err := dpdk.maintainConnections()
defer dpdk.Stop()
require.Error(t, err)
require.Contains(t, err.Error(), "couldn't connect to socket")
})
t.Run("maintainConnections should return the error if socket not found with dpdkPluginOptionInMemory", func(t *testing.T) {
dpdk := dpdk{
SocketPath: defaultPathToSocket,
Log: testutil.Logger{},
PluginOptions: []string{dpdkPluginOptionInMemory},
UnreachableSocketBehavior: unreachableSocketBehaviorError,
}
var err error
dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath)
require.NoError(t, err)
require.Empty(t, dpdk.connectors)
err = dpdk.maintainConnections()
require.Error(t, err)
require.Contains(t, err.Error(), "no active sockets connections present")
})
t.Run("maintainConnections shouldn't return error with 1 socket", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
}
go simulateSocketResponse(socket, t)
require.Empty(t, dpdk.connectors)
err := dpdk.maintainConnections()
defer dpdk.Stop()
require.NoError(t, err)
require.Len(t, dpdk.connectors, 1)
})
t.Run("maintainConnections shouldn't return error with multiple sockets", func(t *testing.T) {
numSockets := rand.Intn(5) + 1
pathToSockets, sockets := createMultipleSocketsForTest(t, numSockets, "")
defer func() {
for _, socket := range sockets {
socket.Close()
}
}()
dpdk := dpdk{
SocketPath: pathToSockets[0],
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
PluginOptions: []string{dpdkPluginOptionInMemory},
}
var err error
dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath)
require.NoError(t, err)
for _, socket := range sockets {
go simulateSocketResponse(socket, t)
}
require.Empty(t, dpdk.connectors)
err = dpdk.maintainConnections()
defer dpdk.Stop()
require.NoError(t, err)
require.Len(t, dpdk.connectors, numSockets)
})
t.Run("Test maintainConnections without dpdkPluginOptionInMemory option", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
}
go simulateSocketResponse(socket, t)
require.Empty(t, dpdk.connectors)
err := dpdk.maintainConnections()
require.NoError(t, err)
require.Len(t, dpdk.connectors, 1)
dpdk.Stop()
require.Empty(t, dpdk.connectors)
})
t.Run("Test maintainConnections with dpdkPluginOptionInMemory option", func(t *testing.T) {
pathToSocket1, socket1 := createSocketForTest(t, "")
defer socket1.Close()
go simulateSocketResponse(socket1, t)
dpdk := dpdk{
SocketPath: pathToSocket1,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
PluginOptions: []string{dpdkPluginOptionInMemory},
}
var err error
dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath)
require.NoError(t, err)
require.Empty(t, dpdk.connectors)
err = dpdk.maintainConnections()
require.NoError(t, err)
require.Len(t, dpdk.connectors, 1)
// Adding 2 sockets more
pathToSocket2, socket2 := createSocketForTest(t, filepath.Dir(pathToSocket1))
pathToSocket3, socket3 := createSocketForTest(t, filepath.Dir(pathToSocket1))
require.NotEqual(t, pathToSocket2, pathToSocket3)
go simulateSocketResponse(socket2, t)
go simulateSocketResponse(socket3, t)
err = dpdk.maintainConnections()
require.NoError(t, err)
require.Len(t, dpdk.connectors, 3)
// Close 2 new sockets
socket2.Close()
socket3.Close()
err = dpdk.maintainConnections()
require.NoError(t, err)
require.Len(t, dpdk.connectors, 1)
require.Equal(t, pathToSocket1, dpdk.connectors[0].pathToSocket)
dpdk.Stop()
require.Empty(t, dpdk.connectors)
})
}
func TestClose(t *testing.T) {
t.Run("Num of connections should be 0 after Stop func", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
}
go simulateSocketResponse(socket, t)
require.Empty(t, dpdk.connectors)
err := dpdk.maintainConnections()
require.NoError(t, err)
require.Len(t, dpdk.connectors, 1)
dpdk.Stop()
require.Empty(t, dpdk.connectors)
})
}
func Test_validateAdditionalCommands(t *testing.T) {
t.Run("when validating commands in correct format then no error should be returned", func(t *testing.T) {
dpdk := dpdk{
AdditionalCommands: []string{"/test", "/help"},
}
err := dpdk.validateCommands()
err := dpdk.validateAdditionalCommands()
require.NoError(t, err)
})
@ -94,10 +344,10 @@ func Test_validateCommands(t *testing.T) {
},
}
err := dpdk.validateCommands()
err := dpdk.validateAdditionalCommands()
require.Error(t, err)
require.Contains(t, err.Error(), "command should start with '/'")
require.Contains(t, err.Error(), "command should start with slash")
})
t.Run("when validating long command (without parameters) then error should be returned", func(t *testing.T) {
@ -107,7 +357,7 @@ func Test_validateCommands(t *testing.T) {
},
}
err := dpdk.validateCommands()
err := dpdk.validateAdditionalCommands()
require.Error(t, err)
require.Contains(t, err.Error(), "command is too long")
@ -120,7 +370,7 @@ func Test_validateCommands(t *testing.T) {
},
}
err := dpdk.validateCommands()
err := dpdk.validateAdditionalCommands()
require.Error(t, err)
require.Contains(t, err.Error(), "shall be less than 1024 characters")
@ -133,7 +383,7 @@ func Test_validateCommands(t *testing.T) {
},
}
err := dpdk.validateCommands()
err := dpdk.validateAdditionalCommands()
require.Error(t, err)
require.Contains(t, err.Error(), "got empty command")
@ -147,7 +397,7 @@ func Test_validateCommands(t *testing.T) {
}
require.Len(t, dpdk.AdditionalCommands, 2)
err := dpdk.validateCommands()
err := dpdk.validateAdditionalCommands()
require.Len(t, dpdk.AdditionalCommands, 1)
require.NoError(t, err)
@ -157,65 +407,62 @@ func Test_validateCommands(t *testing.T) {
func prepareEnvironment() (*mocks.Conn, dpdk, *testutil.Accumulator) {
mockConnection := &mocks.Conn{}
dpdk := dpdk{
connector: &dpdkConnector{
connection: mockConnection,
maxOutputLen: 1024,
connectors: []*dpdkConnector{{
connection: mockConnection,
initMessage: &initMessage{
Version: "mockedDPDK",
Pid: 1,
MaxOutputLen: 1024,
},
accessTimeout: 2 * time.Second,
},
}},
Log: testutil.Logger{},
}
mockAcc := &testutil.Accumulator{}
return mockConnection, dpdk, mockAcc
}
func Test_processCommand(t *testing.T) {
t.Run("should pass if received valid response", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := `{"/": ["/", "/eal/app_params", "/eal/params", "/ethdev/link_status"]}`
simulateResponse(mockConn, response, nil)
func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, dpdk, *testutil.Accumulator) {
mockConnections := []*mocks.Conn{{}, {}}
dpdk := dpdk{
connectors: []*dpdkConnector{
{
connection: mockConnections[0],
initMessage: &initMessage{
Version: "mockedDPDK",
Pid: 1,
MaxOutputLen: 1024,
},
accessTimeout: 2 * time.Second,
},
{
connection: mockConnections[1],
initMessage: &initMessage{
Version: "mockedDPDK",
Pid: 2,
MaxOutputLen: 1024,
},
accessTimeout: 2 * time.Second,
},
},
Log: testutil.Logger{},
}
mockAcc := &testutil.Accumulator{}
return mockConnections, dpdk, mockAcc
}
dpdk.processCommand(mockAcc, "/")
require.Empty(t, mockAcc.Errors)
})
t.Run("if received a non-JSON object then should return error", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := `notAJson`
simulateResponse(mockConn, response, nil)
dpdk.processCommand(mockAcc, "/")
require.Len(t, mockAcc.Errors, 1)
require.Contains(t, mockAcc.Errors[0].Error(), "invalid character")
})
t.Run("if failed to get command response then accumulator should contain error", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
mockConn.On("Write", mock.Anything).Return(0, fmt.Errorf("deadline exceeded"))
mockConn.On("SetDeadline", mock.Anything).Return(nil)
mockConn.On("Close").Return(nil)
dpdk.processCommand(mockAcc, "/")
require.Len(t, mockAcc.Errors, 1)
require.Contains(t, mockAcc.Errors[0].Error(), "deadline exceeded")
})
t.Run("if response contains nil or empty value then error should be returned in accumulator", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
response := `{"/test": null}`
simulateResponse(mockConn, response, nil)
dpdk.processCommand(mockAcc, "/test,param")
require.Len(t, mockAcc.Errors, 1)
require.Contains(t, mockAcc.Errors[0].Error(), "got empty json on")
})
func prepareEnvironmentWithInitializedMessage(initMsg *initMessage) (*mocks.Conn, dpdk, *testutil.Accumulator) {
mockConnection := &mocks.Conn{}
dpdk := dpdk{
connectors: []*dpdkConnector{{
connection: mockConnection,
accessTimeout: 2 * time.Second,
initMessage: initMsg,
}},
Log: testutil.Logger{},
}
mockAcc := &testutil.Accumulator{}
return mockConnection, dpdk, mockAcc
}
func Test_appendCommandsWithParams(t *testing.T) {
@ -226,11 +473,12 @@ func Test_appendCommandsWithParams(t *testing.T) {
simulateResponse(mockConn, response, nil)
expectedCommands := []string{"/action1,1", "/action1,123", "/action2,1", "/action2,123"}
result, err := dpdk.appendCommandsWithParamsFromList("/testendpoint", []string{"/action1", "/action2"})
require.NoError(t, err)
require.Len(t, result, 4)
require.ElementsMatch(t, result, expectedCommands)
for _, dpdkConn := range dpdk.connectors {
result, err := dpdkConn.appendCommandsWithParamsFromList("/testendpoint", []string{"/action1", "/action2"})
require.NoError(t, err)
require.Len(t, result, 4)
require.ElementsMatch(t, result, expectedCommands)
}
})
}
@ -246,7 +494,7 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) {
dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"}
dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{})
dpdk.AdditionalCommands = []string{}
commands := dpdk.gatherCommands(mockAcc)
commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0])
require.ElementsMatch(t, commands, expectedCommands)
require.Empty(t, mockAcc.Errors)
@ -262,7 +510,7 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) {
dpdk.DeviceTypes = []string{"rawdev"}
dpdk.rawdevCommands = []string{"/rawdev/xstats"}
dpdk.AdditionalCommands = []string{}
commands := dpdk.gatherCommands(mockAcc)
commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0])
require.ElementsMatch(t, commands, expectedCommands)
require.Empty(t, mockAcc.Errors)
@ -279,7 +527,7 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) {
dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"}
dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{"/ethdev/xstats"})
dpdk.AdditionalCommands = []string{}
commands := dpdk.gatherCommands(mockAcc)
commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0])
require.ElementsMatch(t, commands, expectedCommands)
require.Empty(t, mockAcc.Errors)
@ -294,14 +542,105 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) {
dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"}
dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{})
dpdk.AdditionalCommands = []string{}
commands := dpdk.gatherCommands(mockAcc)
commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0])
require.Empty(t, commands)
require.Len(t, mockAcc.Errors, 1)
})
}
func Test_getDpdkInMemorySocketPaths(t *testing.T) {
var err error
t.Run("Should return nil if path doesn't exist", func(t *testing.T) {
dpdk := dpdk{
SocketPath: "/tmp/nothing-should-exist-here/test.socket",
Log: testutil.Logger{},
}
dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath)
require.NoError(t, err)
socketsPaths := dpdk.getDpdkInMemorySocketPaths()
require.Nil(t, socketsPaths)
})
t.Run("Should return nil if can't read the dir", func(t *testing.T) {
dpdk := dpdk{
SocketPath: "/root/no_access",
Log: testutil.Logger{},
}
dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath)
require.NoError(t, err)
socketsPaths := dpdk.getDpdkInMemorySocketPaths()
require.Nil(t, socketsPaths)
})
t.Run("Should return one socket from socket path", func(t *testing.T) {
socketPath, socket := createSocketForTest(t, "")
defer socket.Close()
dpdk := dpdk{
SocketPath: socketPath,
Log: testutil.Logger{},
}
dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath)
require.NoError(t, err)
socketsPaths := dpdk.getDpdkInMemorySocketPaths()
require.Len(t, socketsPaths, 1)
require.Equal(t, socketPath, socketsPaths[0])
})
t.Run("Should return 2 sockets from socket path", func(t *testing.T) {
socketPaths, sockets := createMultipleSocketsForTest(t, 2, "")
defer func() {
for _, socket := range sockets {
socket.Close()
}
}()
dpdk := dpdk{
SocketPath: socketPaths[0],
Log: testutil.Logger{},
}
dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath)
require.NoError(t, err)
socketsPathsFromFunc := dpdk.getDpdkInMemorySocketPaths()
require.Len(t, socketsPathsFromFunc, 2)
require.Equal(t, socketPaths, socketsPathsFromFunc)
})
}
func Test_Gather(t *testing.T) {
t.Run("Gather should return error, because socket weren't created", func(t *testing.T) {
mockAcc := &testutil.Accumulator{}
dpdk := dpdk{
Log: testutil.Logger{},
PluginOptions: []string{},
}
_ = dpdk.Init()
err := dpdk.Gather(mockAcc)
require.Error(t, err)
require.Contains(t, err.Error(), "couldn't connect to socket")
})
t.Run("Gather shouldn't return error with UnreachableSocketBehavior: Ignore option, because socket weren't created", func(t *testing.T) {
mockAcc := &testutil.Accumulator{}
dpdk := dpdk{
Log: testutil.Logger{},
PluginOptions: []string{},
UnreachableSocketBehavior: unreachableSocketBehaviorIgnore,
}
_ = dpdk.Init()
err := dpdk.Gather(mockAcc)
require.NoError(t, err)
})
t.Run("When parsing a plain json without nested object, then its key should be equal to \"\"", func(t *testing.T) {
mockConn, dpdk, mockAcc := prepareEnvironment()
defer mockConn.AssertExpectations(t)
@ -360,6 +699,164 @@ func Test_Gather(t *testing.T) {
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
t.Run("Test Gather with Metadata Fields dpdk pid and version", func(t *testing.T) {
testInitMessage := &initMessage{
Pid: 100,
Version: "DPDK 21.11.11",
MaxOutputLen: 1024,
}
mockConn, dpdk, mockAcc := prepareEnvironmentWithInitializedMessage(testInitMessage)
dpdk.MetadataFields = []string{dpdkMetadataFieldPidName, dpdkMetadataFieldVersionName}
defer mockConn.AssertExpectations(t)
dpdk.AdditionalCommands = []string{"/endpoint1"}
simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil)
err := dpdk.Gather(mockAcc)
require.NoError(t, err)
require.Empty(t, mockAcc.Errors)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": "/endpoint1",
"params": "",
},
map[string]interface{}{
"": "myvalue",
dpdkMetadataFieldPidName: testInitMessage.Pid,
dpdkMetadataFieldVersionName: testInitMessage.Version,
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
t.Run("Test Gather with Metadata Fields dpdk_pid", func(t *testing.T) {
testInitMessage := &initMessage{
Pid: 100,
Version: "DPDK 21.11.11",
MaxOutputLen: 1024,
}
mockConn, dpdk, mockAcc := prepareEnvironmentWithInitializedMessage(testInitMessage)
dpdk.MetadataFields = []string{dpdkMetadataFieldPidName}
defer mockConn.AssertExpectations(t)
dpdk.AdditionalCommands = []string{"/endpoint1"}
simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil)
err := dpdk.Gather(mockAcc)
require.NoError(t, err)
require.Empty(t, mockAcc.Errors)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": "/endpoint1",
"params": "",
},
map[string]interface{}{
"": "myvalue",
dpdkMetadataFieldPidName: testInitMessage.Pid,
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
t.Run("Test Gather without Metadata Fields", func(t *testing.T) {
testInitMessage := &initMessage{
Pid: 100,
Version: "DPDK 21.11.11",
MaxOutputLen: 1024,
}
mockConn, dpdk, mockAcc := prepareEnvironmentWithInitializedMessage(testInitMessage)
dpdk.MetadataFields = []string{}
defer mockConn.AssertExpectations(t)
dpdk.AdditionalCommands = []string{"/endpoint1"}
simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil)
err := dpdk.Gather(mockAcc)
require.NoError(t, err)
require.Empty(t, mockAcc.Errors)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": "/endpoint1",
"params": "",
},
map[string]interface{}{
"": "myvalue",
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
}
func Test_Gather_MultiSocket(t *testing.T) {
t.Run("Test Gather without Metadata Fields", func(t *testing.T) {
mockConns, dpdk, mockAcc := prepareEnvironmentWithMultiSockets()
dpdk.MetadataFields = []string{}
defer func() {
for _, mockConn := range mockConns {
mockConn.AssertExpectations(t)
}
}()
dpdk.AdditionalCommands = []string{"/endpoint1"}
for _, mockConn := range mockConns {
simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil)
}
err := dpdk.Gather(mockAcc)
require.NoError(t, err)
require.Empty(t, mockAcc.Errors)
expected := []telegraf.Metric{
testutil.MustMetric(
"dpdk",
map[string]string{
"command": "/endpoint1",
"params": "",
},
map[string]interface{}{
"": "myvalue",
},
time.Unix(0, 0),
),
testutil.MustMetric(
"dpdk",
map[string]string{
"command": "/endpoint1",
"params": "",
},
map[string]interface{}{
"": "myvalue",
},
time.Unix(0, 0),
),
}
actual := mockAcc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
})
}
func simulateResponse(mockConn *mocks.Conn, response string, readErr error) {
@ -375,6 +872,44 @@ func simulateResponse(mockConn *mocks.Conn, response string, readErr error) {
}
}
func createSocketForTest(t *testing.T, dirPath string) (string, net.Listener) {
var err error
var pathToSocket string
if len(dirPath) == 0 {
dirPath, err = os.MkdirTemp("", "dpdk-test-socket")
require.NoError(t, err)
pathToSocket = filepath.Join(dirPath, dpdkSocketTemplateName)
} else {
pathToSocket = fmt.Sprintf("%s:%d", filepath.Join(dirPath, dpdkSocketTemplateName), rand.Intn(100)+1)
}
socket, err := net.Listen("unixpacket", pathToSocket)
require.NoError(t, err)
return pathToSocket, socket
}
func createMultipleSocketsForTest(t *testing.T, numSockets int, dirPath string) (socketsPaths []string, sockets []net.Listener) {
var err error
if len(dirPath) == 0 {
dirPath, err = os.MkdirTemp("", "dpdk-test-socket")
}
require.NoError(t, err)
for i := 0; i < numSockets; i++ {
var pathToSocket string
if i == 0 {
pathToSocket = filepath.Join(dirPath, dpdkSocketTemplateName)
} else {
pathToSocket = filepath.Join(dirPath, fmt.Sprintf("%s:%d", dpdkSocketTemplateName, 1000+i))
}
socket, err := net.Listen("unixpacket", pathToSocket)
require.NoError(t, err)
socketsPaths = append(socketsPaths, pathToSocket)
sockets = append(sockets, socket)
}
return socketsPaths, sockets
}
func simulateSocketResponse(socket net.Listener, t *testing.T) {
conn, err := socket.Accept()
require.NoError(t, err)
@ -385,3 +920,7 @@ func simulateSocketResponse(socket net.Listener, t *testing.T) {
_, err = conn.Write(initMessage)
require.NoError(t, err)
}
func prepareGlob(path string) (*globpath.GlobPath, error) {
return globpath.Compile(path + "*")
}

View File

@ -4,6 +4,7 @@ package dpdk
import (
"encoding/json"
"errors"
"fmt"
"os"
"reflect"
@ -39,6 +40,25 @@ func getParams(command string) string {
return command[index+1:]
}
// Checks if the provided filePath contains in-memory socket
func isInMemorySocketPath(filePath, socketPath string) bool {
if filePath == socketPath {
return true
}
socketPathPrefix := fmt.Sprintf("%s:", socketPath)
if strings.HasPrefix(filePath, socketPathPrefix) {
suffix := filePath[len(socketPathPrefix):]
if number, err := strconv.Atoi(suffix); err == nil {
if number > 0 {
return true
}
}
}
return false
}
// Checks if provided path points to socket
func isSocket(path string) error {
pathInfo, err := os.Lstat(path)
@ -60,7 +80,7 @@ func isSocket(path string) error {
// Converts JSON array containing devices identifiers from DPDK response to string slice
func jsonToArray(input []byte, command string) ([]string, error) {
if len(input) == 0 {
return nil, fmt.Errorf("got empty object instead of json")
return nil, errors.New("got empty object instead of json")
}
var rawMessage map[string]json.RawMessage
@ -72,7 +92,7 @@ func jsonToArray(input []byte, command string) ([]string, error) {
var intArray []int64
err = json.Unmarshal(rawMessage[command], &intArray)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall json response: %w", err)
return nil, fmt.Errorf("failed to unmarshal json response: %w", err)
}
stringArray := make([]string, 0, len(intArray))

View File

@ -4,7 +4,6 @@ package dpdk
import (
"fmt"
"net"
"os"
"strconv"
"testing"
@ -20,8 +19,8 @@ func Test_isSocket(t *testing.T) {
require.Contains(t, err.Error(), "provided path does not exist")
})
t.Run("should pass if path points to socket", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t)
t.Run("Should pass if path points to socket", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
defer socket.Close()
err := isSocket(pathToSocket)
@ -125,13 +124,6 @@ func Test_jsonToArray(t *testing.T) {
_, err := jsonToArray([]byte(jsonString), key)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to unmarshall json response")
require.Contains(t, err.Error(), "failed to unmarshal json response")
})
}
func createSocketForTest(t *testing.T) (string, net.Listener) {
pathToSocket := "/tmp/dpdk-test-socket"
socket, err := net.Listen("unixpacket", pathToSocket)
require.NoError(t, err)
return pathToSocket, socket
}

View File

@ -14,20 +14,37 @@
# socket_access_timeout = "200ms"
## Enables telemetry data collection for selected device types.
## Adding "ethdev" enables collection of telemetry from DPDK NICs
## (stats, xstats, link_status).
## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices
## (xstats).
## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status, info).
## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats).
# device_types = ["ethdev"]
## List of custom, application-specific telemetry commands to query
## The list of available commands depend on the application deployed.
## Applications can register their own commands via telemetry library API
## http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands
## https://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands
## For L3 Forwarding with Power Management Sample Application this could be:
## additional_commands = ["/l3fwd-power/stats"]
# additional_commands = []
## List of plugin options.
## Supported options:
## - "in_memory" option enables reading for multiple sockets when a dpdk application is running with --in-memory option.
## When option is enabled plugin will try to find additional socket paths related to provided socket_path.
## Details: https://doc.dpdk.org/guides/howto/telemetry.html#connecting-to-different-dpdk-processes
# plugin_options = ["in_memory"]
## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet).
## Available choices:
## - error: Telegraf will return an error during the startup and gather phases if socket is unreachable
## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather
# unreachable_socket_behavior = "error"
## List of metadata fields which will be added to every metric produced by the plugin.
## Supported options:
## - "pid" - exposes PID of DPDK process. Example: pid=2179660i
## - "version" - exposes version of DPDK. Example: version="DPDK 21.11.2"
# metadata_fields = ["pid", "version"]
## Allows turning off collecting data for individual "ethdev" commands.
## Remove "/ethdev/link_status" from list to gather link status metrics.
[inputs.dpdk.ethdev]