diff --git a/plugins/inputs/dpdk/README.md b/plugins/inputs/dpdk/README.md index 2c7a0d1dc..639485773 100644 --- a/plugins/inputs/dpdk/README.md +++ b/plugins/inputs/dpdk/README.md @@ -22,6 +22,14 @@ driver-specific extended stats (`/ethdev/xstats`) via this new interface. introduced reading via `v2` interface common statistics (`/ethdev/stats`) in addition to existing (`/ethdev/xstats`). +[DPDK Release 21.11](https://doc.dpdk.org/guides/rel_notes/release_21_11.html) +introduced reading via `v2` interface additional ethernet device information +(`/ethdev/info`). +This version also adds support for exposing telemetry from multiple +`--in-memory` instances of DPDK via dedicated sockets. +The plugin supports reading from those sockets when `in_memory` +option is set. + The example usage of `v2` telemetry interface can be found in [Telemetry User Guide](https://doc.dpdk.org/guides/howto/telemetry.html). A variety of [DPDK Sample Applications](https://doc.dpdk.org/guides/sample_app_ug/index.html) is @@ -35,14 +43,17 @@ and to explore the exposed metrics. > `DPDK 20.05 <= version < DPDK 20.11` it is recommended to disable querying > `/ethdev/stats` by setting corresponding `exclude_commands` configuration > option. +> > **NOTE:** Since DPDK will most likely run with root privileges, the socket > telemetry interface exposed by DPDK will also require root access. This means > that either access permissions have to be adjusted for socket telemetry > interface to allow Telegraf to access it, or Telegraf should run with root > privileges. -> **NOTE:** The DPDK socket must exist for Telegraf to start successfully. -> Telegraf will attempt to connect to the DPDK socket during the initialization -> phase. +> +> **NOTE:** There are known issues with exposing telemetry from multiple +> `--in-memory` instances while using `DPDK 21.11.1`. The recommended version +> to use in conjunction with `in_memory` plugin option is `DPDK 21.11.2` +> or higher. ## Global configuration options @@ -72,20 +83,37 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. # socket_access_timeout = "200ms" ## Enables telemetry data collection for selected device types. - ## Adding "ethdev" enables collection of telemetry from DPDK NICs - ## (stats, xstats, link_status). - ## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices - ## (xstats). + ## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status, info). + ## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats). # device_types = ["ethdev"] ## List of custom, application-specific telemetry commands to query ## The list of available commands depend on the application deployed. ## Applications can register their own commands via telemetry library API - ## http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands + ## https://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands ## For L3 Forwarding with Power Management Sample Application this could be: ## additional_commands = ["/l3fwd-power/stats"] # additional_commands = [] + ## List of plugin options. + ## Supported options: + ## - "in_memory" option enables reading for multiple sockets when a dpdk application is running with --in-memory option. + ## When option is enabled plugin will try to find additional socket paths related to provided socket_path. + ## Details: https://doc.dpdk.org/guides/howto/telemetry.html#connecting-to-different-dpdk-processes + # plugin_options = ["in_memory"] + + ## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet). + ## Available choices: + ## - error: Telegraf will return an error during the startup and gather phases if socket is unreachable + ## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather + # unreachable_socket_behavior = "error" + + ## List of metadata fields which will be added to every metric produced by the plugin. + ## Supported options: + ## - "pid" - exposes PID of DPDK process. Example: pid=2179660i + ## - "version" - exposes version of DPDK. Example: version="DPDK 21.11.2" + # metadata_fields = ["pid", "version"] + ## Allows turning off collecting data for individual "ethdev" commands. ## Remove "/ethdev/link_status" from list to gather link status metrics. [inputs.dpdk.ethdev] @@ -107,6 +135,7 @@ for additional usage information. This configuration allows getting metrics for all devices reported via `/ethdev/list` command: +* `/ethdev/info` - device information: name, MAC address, buffers size, etc. (since `DPDK 21.11`) * `/ethdev/stats` - basic device statistics (since `DPDK 20.11`) * `/ethdev/xstats` - extended device statistics * `/ethdev/link_status` - up/down link status @@ -125,7 +154,7 @@ should be adjusted accordingly (e.g. `interval = "30s"`). ### Example: Excluding NIC link status from being collected Checking link status depending on underlying implementation may take more time -to complete. This configuration can be used to exclude this telemetry command +to complete. This configuration can be used to exclude this telemetry command to allow faster response for metrics. ```toml @@ -155,7 +184,7 @@ configuration, with higher timeout. device_types = ["ethdev"] [inputs.dpdk.ethdev] - exclude_commands = ["/ethdev/stats", "/ethdev/xstats"] + exclude_commands = ["/ethdev/info", "/ethdev/stats", "/ethdev/xstats"] ``` ### Example: Getting application-specific metrics @@ -183,7 +212,7 @@ format: * Commands have to start with `/`. Providing invalid commands will prevent the plugin from starting. Additional -commands allow duplicates, but they will be removed during execution so each +commands allow duplicates, but they will be removed during execution, so each command will be executed only once during each metric gathering interval. [sample-app]: https://doc.dpdk.org/guides/sample_app_ug/l3_forward_power_man.html @@ -232,7 +261,7 @@ JSON Flattener](../../parsers/json/README.md) and exposed as fields. If DPDK response contains no information (is empty or is null) then such response will be discarded. -> **NOTE:** Since DPDK allows registering custom metrics in its telemetry +> **NOTE:** Since DPDK allows registering custom metrics in its telemetry > framework the JSON response from DPDK may contain various sets of metrics. > While metrics from `/ethdev/stats` should be most stable, the `/ethdev/xstats` > may contain driver-specific metrics (depending on DPDK application @@ -253,7 +282,7 @@ dpdk,host=dpdk-host,dpdk_instance=l3fwd-power,command=/ethdev/stats,params=0 [fi | `host` | hostname of the machine (consult [Telegraf Agent configuration](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#agent) for additional details) | | `dpdk_instance` | custom tag from `[inputs.dpdk.tags]` (optional) | | `command` | executed command (without params) | -| `params` | command parameter, e.g. for `/ethdev/stats` it is the id of NIC as exposed by `/ethdev/list`. For DPDK app that uses 2 NICs the metrics will output e.g. `params=0`, `params=1`. | +| `params` | command parameter, e.g. for `/ethdev/stats` it is the ID of NIC as exposed by `/ethdev/list`. For DPDK app that uses 2 NICs the metrics will output e.g. `params=0`, `params=1`. | When running plugin configuration below... @@ -261,6 +290,7 @@ When running plugin configuration below... [[inputs.dpdk]] device_types = ["ethdev"] additional_commands = ["/l3fwd-power/stats"] + metadata_fields = [] [inputs.dpdk.tags] dpdk_instance = "l3fwd-power" ``` @@ -269,11 +299,35 @@ When running plugin configuration below... `host=dpdk-host`: ```text +dpdk,command=/ethdev/info,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 all_multicast=0,dev_configured=1,dev_flags=74,dev_started=1,ethdev_rss_hf=0,lro=0,mac_addr="E4:3D:1A:DD:13:31",mtu=1500,name="0000:ca:00.1",nb_rx_queues=1,nb_tx_queues=1,numa_node=1,port_id=0,promiscuous=1,rx_mbuf_alloc_fail=0,rx_mbuf_size_min=2176,rx_offloads=0,rxq_state_0=1,scattered_rx=0,state=1,tx_offloads=65536,txq_state_0=1 1659017414000000000 dpdk,command=/ethdev/stats,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 q_opackets_0=0,q_ipackets_5=0,q_errors_11=0,ierrors=0,q_obytes_5=0,q_obytes_10=0,q_opackets_10=0,q_ipackets_4=0,q_ipackets_7=0,q_ipackets_15=0,q_ibytes_5=0,q_ibytes_6=0,q_ibytes_9=0,obytes=0,q_opackets_1=0,q_opackets_11=0,q_obytes_7=0,q_errors_5=0,q_errors_10=0,q_ibytes_4=0,q_obytes_6=0,q_errors_1=0,q_opackets_5=0,q_errors_3=0,q_errors_12=0,q_ipackets_11=0,q_ipackets_12=0,q_obytes_14=0,q_opackets_15=0,q_obytes_2=0,q_errors_8=0,q_opackets_12=0,q_errors_0=0,q_errors_9=0,q_opackets_14=0,q_ibytes_3=0,q_ibytes_15=0,q_ipackets_13=0,q_ipackets_14=0,q_obytes_3=0,q_errors_13=0,q_opackets_3=0,q_ibytes_0=7092,q_ibytes_2=0,q_ibytes_8=0,q_ipackets_8=0,q_ipackets_10=0,q_obytes_4=0,q_ibytes_10=0,q_ibytes_13=0,q_ibytes_1=0,q_ibytes_12=0,opackets=0,q_obytes_1=0,q_errors_15=0,q_opackets_2=0,oerrors=0,rx_nombuf=0,q_opackets_8=0,q_ibytes_11=0,q_ipackets_3=0,q_obytes_0=0,q_obytes_12=0,q_obytes_11=0,q_obytes_13=0,q_errors_6=0,q_ipackets_1=0,q_ipackets_6=0,q_ipackets_9=0,q_obytes_15=0,q_opackets_7=0,q_ibytes_14=0,ipackets=98,q_ipackets_2=0,q_opackets_6=0,q_ibytes_7=0,imissed=0,q_opackets_4=0,q_opackets_9=0,q_obytes_8=0,q_obytes_9=0,q_errors_4=0,q_errors_14=0,q_opackets_13=0,ibytes=7092,q_ipackets_0=98,q_errors_2=0,q_errors_7=0 1606310780000000000 dpdk,command=/ethdev/stats,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 q_opackets_0=0,q_ipackets_5=0,q_errors_11=0,ierrors=0,q_obytes_5=0,q_obytes_10=0,q_opackets_10=0,q_ipackets_4=0,q_ipackets_7=0,q_ipackets_15=0,q_ibytes_5=0,q_ibytes_6=0,q_ibytes_9=0,obytes=0,q_opackets_1=0,q_opackets_11=0,q_obytes_7=0,q_errors_5=0,q_errors_10=0,q_ibytes_4=0,q_obytes_6=0,q_errors_1=0,q_opackets_5=0,q_errors_3=0,q_errors_12=0,q_ipackets_11=0,q_ipackets_12=0,q_obytes_14=0,q_opackets_15=0,q_obytes_2=0,q_errors_8=0,q_opackets_12=0,q_errors_0=0,q_errors_9=0,q_opackets_14=0,q_ibytes_3=0,q_ibytes_15=0,q_ipackets_13=0,q_ipackets_14=0,q_obytes_3=0,q_errors_13=0,q_opackets_3=0,q_ibytes_0=7092,q_ibytes_2=0,q_ibytes_8=0,q_ipackets_8=0,q_ipackets_10=0,q_obytes_4=0,q_ibytes_10=0,q_ibytes_13=0,q_ibytes_1=0,q_ibytes_12=0,opackets=0,q_obytes_1=0,q_errors_15=0,q_opackets_2=0,oerrors=0,rx_nombuf=0,q_opackets_8=0,q_ibytes_11=0,q_ipackets_3=0,q_obytes_0=0,q_obytes_12=0,q_obytes_11=0,q_obytes_13=0,q_errors_6=0,q_ipackets_1=0,q_ipackets_6=0,q_ipackets_9=0,q_obytes_15=0,q_opackets_7=0,q_ibytes_14=0,ipackets=98,q_ipackets_2=0,q_opackets_6=0,q_ibytes_7=0,imissed=0,q_opackets_4=0,q_opackets_9=0,q_obytes_8=0,q_obytes_9=0,q_errors_4=0,q_errors_14=0,q_opackets_13=0,ibytes=7092,q_ipackets_0=98,q_errors_2=0,q_errors_7=0 1606310780000000000 dpdk,command=/ethdev/xstats,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 out_octets_encrypted=0,rx_fcoe_mbuf_allocation_errors=0,tx_q1packets=0,rx_priority0_xoff_packets=0,rx_priority7_xoff_packets=0,rx_errors=0,mac_remote_errors=0,in_pkts_invalid=0,tx_priority3_xoff_packets=0,tx_errors=0,rx_fcoe_bytes=0,rx_flow_control_xon_packets=0,rx_priority4_xoff_packets=0,tx_priority2_xoff_packets=0,rx_illegal_byte_errors=0,rx_xoff_packets=0,rx_management_packets=0,rx_priority7_dropped=0,rx_priority4_dropped=0,in_pkts_unchecked=0,rx_error_bytes=0,rx_size_256_to_511_packets=0,tx_priority4_xoff_packets=0,rx_priority6_xon_packets=0,tx_priority4_xon_to_xoff_packets=0,in_pkts_delayed=0,rx_priority0_mbuf_allocation_errors=0,out_octets_protected=0,tx_priority7_xon_to_xoff_packets=0,tx_priority1_xon_to_xoff_packets=0,rx_fcoe_no_direct_data_placement_ext_buff=0,tx_priority6_xon_to_xoff_packets=0,flow_director_filter_add_errors=0,rx_total_packets=99,rx_crc_errors=0,flow_director_filter_remove_errors=0,rx_missed_errors=0,tx_size_64_packets=0,rx_priority3_dropped=0,flow_director_matched_filters=0,tx_priority2_xon_to_xoff_packets=0,rx_priority1_xon_packets=0,rx_size_65_to_127_packets=99,rx_fragment_errors=0,in_pkts_notusingsa=0,rx_q0bytes=7162,rx_fcoe_dropped=0,rx_priority1_dropped=0,rx_fcoe_packets=0,rx_priority5_xoff_packets=0,out_pkts_protected=0,tx_total_packets=0,rx_priority2_dropped=0,in_pkts_late=0,tx_q1bytes=0,in_pkts_badtag=0,rx_multicast_packets=99,rx_priority6_xoff_packets=0,tx_flow_control_xoff_packets=0,rx_flow_control_xoff_packets=0,rx_priority0_xon_packets=0,in_pkts_untagged=0,tx_fcoe_packets=0,rx_priority7_mbuf_allocation_errors=0,tx_priority0_xon_to_xoff_packets=0,tx_priority5_xon_to_xoff_packets=0,tx_flow_control_xon_packets=0,tx_q0packets=0,tx_xoff_packets=0,rx_size_512_to_1023_packets=0,rx_priority3_xon_packets=0,rx_q0errors=0,rx_oversize_errors=0,tx_priority4_xon_packets=0,tx_priority5_xoff_packets=0,rx_priority5_xon_packets=0,rx_total_missed_packets=0,rx_priority4_mbuf_allocation_errors=0,tx_priority1_xon_packets=0,tx_management_packets=0,rx_priority5_mbuf_allocation_errors=0,rx_fcoe_no_direct_data_placement=0,rx_undersize_errors=0,tx_priority1_xoff_packets=0,rx_q0packets=99,tx_q2packets=0,tx_priority6_xon_packets=0,rx_good_packets=99,tx_priority5_xon_packets=0,tx_size_256_to_511_packets=0,rx_priority6_dropped=0,rx_broadcast_packets=0,tx_size_512_to_1023_packets=0,tx_priority3_xon_to_xoff_packets=0,in_pkts_unknownsci=0,in_octets_validated=0,tx_priority6_xoff_packets=0,tx_priority7_xoff_packets=0,rx_jabber_errors=0,tx_priority7_xon_packets=0,tx_priority0_xon_packets=0,in_pkts_unusedsa=0,tx_priority0_xoff_packets=0,mac_local_errors=33,rx_total_bytes=7162,in_pkts_notvalid=0,rx_length_errors=0,in_octets_decrypted=0,rx_size_128_to_255_packets=0,rx_good_bytes=7162,tx_size_65_to_127_packets=0,rx_mac_short_packet_dropped=0,tx_size_1024_to_max_packets=0,rx_priority2_mbuf_allocation_errors=0,flow_director_added_filters=0,tx_multicast_packets=0,rx_fcoe_crc_errors=0,rx_priority1_xoff_packets=0,flow_director_missed_filters=0,rx_xon_packets=0,tx_size_128_to_255_packets=0,out_pkts_encrypted=0,rx_priority4_xon_packets=0,rx_priority0_dropped=0,rx_size_1024_to_max_packets=0,tx_good_bytes=0,rx_management_dropped=0,rx_mbuf_allocation_errors=0,tx_xon_packets=0,rx_priority3_xoff_packets=0,tx_good_packets=0,tx_fcoe_bytes=0,rx_priority6_mbuf_allocation_errors=0,rx_priority2_xon_packets=0,tx_broadcast_packets=0,tx_q2bytes=0,rx_priority7_xon_packets=0,out_pkts_untagged=0,rx_priority2_xoff_packets=0,rx_priority1_mbuf_allocation_errors=0,tx_q0bytes=0,rx_size_64_packets=0,rx_priority5_dropped=0,tx_priority2_xon_packets=0,in_pkts_nosci=0,flow_director_removed_filters=0,in_pkts_ok=0,rx_l3_l4_xsum_error=0,rx_priority3_mbuf_allocation_errors=0,tx_priority3_xon_packets=0 1606310780000000000 dpdk,command=/ethdev/xstats,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 tx_priority5_xoff_packets=0,in_pkts_unknownsci=0,tx_q0packets=0,tx_total_packets=0,rx_crc_errors=0,rx_priority4_xoff_packets=0,rx_priority5_dropped=0,tx_size_65_to_127_packets=0,rx_good_packets=98,tx_priority6_xoff_packets=0,tx_fcoe_bytes=0,out_octets_protected=0,out_pkts_encrypted=0,rx_priority1_xon_packets=0,tx_size_128_to_255_packets=0,rx_flow_control_xoff_packets=0,rx_priority7_xoff_packets=0,tx_priority0_xon_to_xoff_packets=0,rx_broadcast_packets=0,tx_priority1_xon_packets=0,rx_xon_packets=0,rx_fragment_errors=0,tx_flow_control_xoff_packets=0,tx_q0bytes=0,out_pkts_untagged=0,rx_priority4_xon_packets=0,tx_priority5_xon_packets=0,rx_priority1_xoff_packets=0,rx_good_bytes=7092,rx_priority4_mbuf_allocation_errors=0,in_octets_decrypted=0,tx_priority2_xon_to_xoff_packets=0,rx_priority3_dropped=0,tx_multicast_packets=0,mac_local_errors=33,in_pkts_ok=0,rx_illegal_byte_errors=0,rx_xoff_packets=0,rx_q0errors=0,flow_director_added_filters=0,rx_size_256_to_511_packets=0,rx_priority3_xon_packets=0,rx_l3_l4_xsum_error=0,rx_priority6_dropped=0,in_pkts_notvalid=0,rx_size_64_packets=0,tx_management_packets=0,rx_length_errors=0,tx_priority7_xon_to_xoff_packets=0,rx_mbuf_allocation_errors=0,rx_missed_errors=0,rx_priority1_mbuf_allocation_errors=0,rx_fcoe_no_direct_data_placement=0,tx_priority3_xoff_packets=0,in_pkts_delayed=0,tx_errors=0,rx_size_512_to_1023_packets=0,tx_priority4_xon_packets=0,rx_q0bytes=7092,in_pkts_unchecked=0,tx_size_512_to_1023_packets=0,rx_fcoe_packets=0,in_pkts_nosci=0,rx_priority6_mbuf_allocation_errors=0,rx_priority1_dropped=0,tx_q2packets=0,rx_priority7_dropped=0,tx_size_1024_to_max_packets=0,rx_management_packets=0,rx_multicast_packets=98,rx_total_bytes=7092,mac_remote_errors=0,tx_priority3_xon_packets=0,rx_priority2_mbuf_allocation_errors=0,rx_priority5_mbuf_allocation_errors=0,tx_q2bytes=0,rx_size_128_to_255_packets=0,in_pkts_badtag=0,out_pkts_protected=0,rx_management_dropped=0,rx_fcoe_bytes=0,flow_director_removed_filters=0,tx_priority2_xoff_packets=0,rx_fcoe_crc_errors=0,rx_priority0_mbuf_allocation_errors=0,rx_priority0_xon_packets=0,rx_fcoe_dropped=0,tx_priority1_xon_to_xoff_packets=0,rx_size_65_to_127_packets=98,rx_q0packets=98,tx_priority0_xoff_packets=0,rx_priority6_xon_packets=0,rx_total_packets=98,rx_undersize_errors=0,flow_director_missed_filters=0,rx_jabber_errors=0,in_pkts_invalid=0,in_pkts_late=0,rx_priority5_xon_packets=0,tx_priority4_xoff_packets=0,out_octets_encrypted=0,tx_q1packets=0,rx_priority5_xoff_packets=0,rx_priority6_xoff_packets=0,rx_errors=0,in_octets_validated=0,rx_priority3_xoff_packets=0,tx_priority4_xon_to_xoff_packets=0,tx_priority5_xon_to_xoff_packets=0,tx_flow_control_xon_packets=0,rx_priority0_dropped=0,flow_director_filter_add_errors=0,tx_q1bytes=0,tx_priority6_xon_to_xoff_packets=0,flow_director_matched_filters=0,tx_priority2_xon_packets=0,rx_fcoe_mbuf_allocation_errors=0,rx_priority2_xoff_packets=0,tx_priority7_xoff_packets=0,rx_priority0_xoff_packets=0,rx_oversize_errors=0,in_pkts_notusingsa=0,tx_size_64_packets=0,rx_size_1024_to_max_packets=0,tx_priority6_xon_packets=0,rx_priority2_dropped=0,rx_priority4_dropped=0,rx_priority7_mbuf_allocation_errors=0,rx_flow_control_xon_packets=0,tx_good_bytes=0,tx_priority3_xon_to_xoff_packets=0,rx_total_missed_packets=0,rx_error_bytes=0,tx_priority7_xon_packets=0,rx_mac_short_packet_dropped=0,tx_priority1_xoff_packets=0,tx_good_packets=0,tx_broadcast_packets=0,tx_xon_packets=0,in_pkts_unusedsa=0,rx_priority2_xon_packets=0,in_pkts_untagged=0,tx_fcoe_packets=0,flow_director_filter_remove_errors=0,rx_priority3_mbuf_allocation_errors=0,tx_priority0_xon_packets=0,rx_priority7_xon_packets=0,rx_fcoe_no_direct_data_placement_ext_buff=0,tx_xoff_packets=0,tx_size_256_to_511_packets=0 1606310780000000000 -dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 status="UP",speed=10000,duplex="full-duplex" 1606310780000000000 -dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 status="UP",speed=10000,duplex="full-duplex" 1606310780000000000 +dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 status="UP",link_status=1,speed=10000,duplex="full-duplex" 1606310780000000000 +dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 status="UP",link_status=1,speed=10000,duplex="full-duplex" 1606310780000000000 dpdk,command=/l3fwd-power/stats,dpdk_instance=l3fwd-power,host=dpdk-host empty_poll=49506395979901,full_poll=0,busy_percent=0 1606310780000000000 ``` + +When running plugin configuration below... + +```toml +[[inputs.dpdk]] + interval = "30s" + socket_access_timeout = "10s" + device_types = ["ethdev"] + metadata_fields = ["version", "pid"] + plugin_options = ["in_memory"] + + [inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/info", "/ethdev/stats", "/ethdev/xstats"] +``` + +Expected output for `dpdk` plugin instance running with `link_status` command +and all metadata fields enabled, additionally `link_status` field will be +exposed to represent string value of `status` field (`DOWN`=0,`UP`=1): + +```text +dpdk,command=/ethdev/link_status,host=dpdk-host,params=0 pid=100988i,version="DPDK 21.11.2",status="DOWN",link_status=0i 1660295749000000000 +dpdk,command=/ethdev/link_status,host=dpdk-host,params=0 pid=2401624i,version="DPDK 21.11.2",status="UP",link_status=1i 1660295749000000000 +``` diff --git a/plugins/inputs/dpdk/dpdk.go b/plugins/inputs/dpdk/dpdk.go index 726adb1fd..2007b512f 100644 --- a/plugins/inputs/dpdk/dpdk.go +++ b/plugins/inputs/dpdk/dpdk.go @@ -5,16 +5,18 @@ package dpdk import ( _ "embed" - "encoding/json" + "errors" "fmt" + "os" + "strings" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" - jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) //go:embed sample.conf @@ -28,26 +30,258 @@ const ( pluginName = "dpdk" ethdevListCommand = "/ethdev/list" rawdevListCommand = "/rawdev/list" + + dpdkMetadataFieldPidName = "pid" + dpdkMetadataFieldVersionName = "version" + + dpdkPluginOptionInMemory = "in_memory" + + unreachableSocketBehaviorIgnore = "ignore" + unreachableSocketBehaviorError = "error" ) type dpdk struct { - SocketPath string `toml:"socket_path"` - AccessTimeout config.Duration `toml:"socket_access_timeout"` - DeviceTypes []string `toml:"device_types"` - EthdevConfig ethdevConfig `toml:"ethdev"` - AdditionalCommands []string `toml:"additional_commands"` - Log telegraf.Logger `toml:"-"` + SocketPath string `toml:"socket_path"` + AccessTimeout config.Duration `toml:"socket_access_timeout"` + DeviceTypes []string `toml:"device_types"` + EthdevConfig ethdevConfig `toml:"ethdev"` + AdditionalCommands []string `toml:"additional_commands"` + MetadataFields []string `toml:"metadata_fields"` + PluginOptions []string `toml:"plugin_options"` + UnreachableSocketBehavior string `toml:"unreachable_socket_behavior"` + Log telegraf.Logger `toml:"-"` - connector *dpdkConnector + connectors []*dpdkConnector rawdevCommands []string ethdevCommands []string ethdevExcludedCommandsFilter filter.Filter + socketGlobPath *globpath.GlobPath } type ethdevConfig struct { EthdevExcludeCommands []string `toml:"exclude_commands"` } +func (*dpdk) SampleConfig() string { + return sampleConfig +} + +// Init performs validation of all parameters from configuration +func (dpdk *dpdk) Init() error { + dpdk.setupDefaultValues() + + err := dpdk.validateAdditionalCommands() + if err != nil { + return err + } + + if dpdk.AccessTimeout < 0 { + return errors.New("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)") + } + + if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 { + return errors.New("plugin was configured with nothing to read") + } + + dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands) + if err != nil { + return fmt.Errorf("error occurred during filter preparation for ethdev excluded commands: %w", err) + } + + if err = choice.Check(dpdk.UnreachableSocketBehavior, []string{unreachableSocketBehaviorError, unreachableSocketBehaviorIgnore}); err != nil { + return fmt.Errorf("unreachable_socket_behavior: %w", err) + } + + glob, err := globpath.Compile(dpdk.SocketPath + "*") + if err != nil { + return err + } + dpdk.socketGlobPath = glob + return nil +} + +// Start implements ServiceInput interface +func (dpdk *dpdk) Start(telegraf.Accumulator) error { + return dpdk.maintainConnections() +} + +func (dpdk *dpdk) Stop() { + for _, connector := range dpdk.connectors { + if err := connector.tryClose(); err != nil { + dpdk.Log.Warnf("Couldn't close connection for %q: %v", connector.pathToSocket, err) + } + } + dpdk.connectors = nil +} + +// Gather function gathers all unique commands and processes each command sequentially +// Parallel processing could be achieved by running several instances of this plugin with different settings +func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error { + if err := dpdk.Start(acc); err != nil { + return err + } + + for _, dpdkConn := range dpdk.connectors { + commands := dpdk.gatherCommands(acc, dpdkConn) + for _, command := range commands { + dpdkConn.processCommand(acc, dpdk.Log, command, dpdk.MetadataFields) + } + } + return nil +} + +// Setup default values for dpdk +func (dpdk *dpdk) setupDefaultValues() { + if dpdk.SocketPath == "" { + dpdk.SocketPath = defaultPathToSocket + } + + if dpdk.DeviceTypes == nil { + dpdk.DeviceTypes = []string{"ethdev"} + } + + if dpdk.MetadataFields == nil { + dpdk.MetadataFields = []string{dpdkMetadataFieldPidName, dpdkMetadataFieldVersionName} + } + + if dpdk.PluginOptions == nil { + dpdk.PluginOptions = []string{dpdkPluginOptionInMemory} + } + + if len(dpdk.UnreachableSocketBehavior) == 0 { + dpdk.UnreachableSocketBehavior = unreachableSocketBehaviorError + } + + dpdk.rawdevCommands = []string{"/rawdev/xstats"} + dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/info", ethdevLinkStatusCommand} +} + +func (dpdk *dpdk) getDpdkInMemorySocketPaths() []string { + filePaths := dpdk.socketGlobPath.Match() + + var results []string + for _, filePath := range filePaths { + fileInfo, err := os.Stat(filePath) + if err != nil || fileInfo.IsDir() || !strings.Contains(filePath, dpdkSocketTemplateName) { + continue + } + + if isInMemorySocketPath(filePath, dpdk.SocketPath) { + results = append(results, filePath) + } + } + + return results +} + +// Checks that user-supplied commands are unique and match DPDK commands format +func (dpdk *dpdk) validateAdditionalCommands() error { + dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands) + + for _, cmd := range dpdk.AdditionalCommands { + if len(cmd) == 0 { + return errors.New("got empty command") + } + + if cmd[0] != '/' { + return fmt.Errorf("%q command should start with slash", cmd) + } + + if commandWithoutParams := stripParams(cmd); len(commandWithoutParams) >= maxCommandLength { + return fmt.Errorf("%q command is too long. It shall be less than %v characters", commandWithoutParams, maxCommandLength) + } + + if len(cmd) >= maxCommandLengthWithParams { + return fmt.Errorf("command with parameters %q shall be less than %v characters", cmd, maxCommandLengthWithParams) + } + } + + return nil +} + +// Establishes connections do DPDK telemetry sockets +func (dpdk *dpdk) maintainConnections() error { + candidates := []string{dpdk.SocketPath} + if choice.Contains(dpdkPluginOptionInMemory, dpdk.PluginOptions) { + candidates = dpdk.getDpdkInMemorySocketPaths() + } + + // Find sockets in the connected-sockets list that are not among + // the candidates anymore and thus need to be removed. + for i := 0; i < len(dpdk.connectors); i++ { + connector := dpdk.connectors[i] + if !choice.Contains(connector.pathToSocket, candidates) { + dpdk.Log.Debugf("Close unused connection: %s", connector.pathToSocket) + if closeErr := connector.tryClose(); closeErr != nil { + dpdk.Log.Warnf("Failed to close unused connection: %v", closeErr) + } + dpdk.connectors = append(dpdk.connectors[:i], dpdk.connectors[i+1:]...) + i-- + } + } + + // Find candidates that are not yet in the connected-sockets list as we + // need to connect to those. + for _, candidate := range candidates { + var found bool + for _, connector := range dpdk.connectors { + if candidate == connector.pathToSocket { + found = true + break + } + } + if !found { + connector := newDpdkConnector(candidate, dpdk.AccessTimeout) + connectionInitMessage, err := connector.connect() + if err != nil { + if dpdk.UnreachableSocketBehavior == unreachableSocketBehaviorError { + return fmt.Errorf("couldn't connect to socket %s: %w", candidate, err) + } + dpdk.Log.Warnf("Couldn't connect to socket %s: %v", candidate, err) + continue + } + + dpdk.Log.Debugf("Successfully connected to the socket: %s. Version: %v running as process with PID %v with len %v", + candidate, connectionInitMessage.Version, connectionInitMessage.Pid, connectionInitMessage.MaxOutputLen) + dpdk.connectors = append(dpdk.connectors, connector) + } + } + + if len(dpdk.connectors) == 0 { + errMsg := "no active sockets connections present" + if dpdk.UnreachableSocketBehavior == unreachableSocketBehaviorError { + return errors.New(errMsg) + } + dpdk.Log.Warnf("Unreachable socket issue occurred: %v", errMsg) + } + + return nil +} + +// Gathers all unique commands +func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkConnector) []string { + var commands []string + if choice.Contains("ethdev", dpdk.DeviceTypes) { + ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter) + ethdevCommands, err := dpdkConnector.appendCommandsWithParamsFromList(ethdevListCommand, ethdevCommands) + if err != nil { + acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", ethdevListCommand, err)) + } + commands = append(commands, ethdevCommands...) + } + + if choice.Contains("rawdev", dpdk.DeviceTypes) { + rawdevCommands, err := dpdkConnector.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands) + if err != nil { + acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", rawdevListCommand, err)) + } + commands = append(commands, rawdevCommands...) + } + + commands = append(commands, dpdk.AdditionalCommands...) + return uniqueValues(commands) +} + func init() { inputs.Add(pluginName, func() telegraf.Input { dpdk := &dpdk{ @@ -58,174 +292,3 @@ func init() { return dpdk }) } - -func (*dpdk) SampleConfig() string { - return sampleConfig -} - -// Performs validation of all parameters from configuration -func (dpdk *dpdk) Init() error { - if dpdk.SocketPath == "" { - dpdk.SocketPath = defaultPathToSocket - dpdk.Log.Debugf("using default '%v' path for socket_path", defaultPathToSocket) - } - - if dpdk.DeviceTypes == nil { - dpdk.DeviceTypes = []string{"ethdev"} - } - - var err error - if err := isSocket(dpdk.SocketPath); err != nil { - return err - } - - dpdk.rawdevCommands = []string{"/rawdev/xstats"} - dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/link_status"} - - if err := dpdk.validateCommands(); err != nil { - return err - } - - if dpdk.AccessTimeout < 0 { - return fmt.Errorf("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)") - } - - if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 { - return fmt.Errorf("plugin was configured with nothing to read") - } - - dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands) - if err != nil { - return fmt.Errorf("error occurred during filter prepation for ethdev excluded commands: %w", err) - } - - dpdk.connector = newDpdkConnector(dpdk.SocketPath, dpdk.AccessTimeout) - initMessage, err := dpdk.connector.connect() - if initMessage != nil { - dpdk.Log.Debugf("Successfully connected to %v running as process with PID %v with len %v", - initMessage.Version, initMessage.Pid, initMessage.MaxOutputLen) - } - return err -} - -// Checks that user-supplied commands are unique and match DPDK commands format -func (dpdk *dpdk) validateCommands() error { - dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands) - - for _, commandWithParams := range dpdk.AdditionalCommands { - if len(commandWithParams) == 0 { - return fmt.Errorf("got empty command") - } - - if commandWithParams[0] != '/' { - return fmt.Errorf("%q command should start with '/'", commandWithParams) - } - - if commandWithoutParams := stripParams(commandWithParams); len(commandWithoutParams) >= maxCommandLength { - return fmt.Errorf("%q command is too long. It shall be less than %v characters", commandWithoutParams, maxCommandLength) - } - - if len(commandWithParams) >= maxCommandLengthWithParams { - return fmt.Errorf("command with parameters %q shall be less than %v characters", commandWithParams, maxCommandLengthWithParams) - } - } - - return nil -} - -// Gathers all unique commands and processes each command sequentially -// Parallel processing could be achieved by running several instances of this plugin with different settings -func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error { - // This needs to be done during every `Gather(...)`, because DPDK can be restarted between consecutive - // `Gather(...)` cycles which can cause that it will be exposing different set of metrics. - commands := dpdk.gatherCommands(acc) - - for _, command := range commands { - dpdk.processCommand(acc, command) - } - - return nil -} - -// Gathers all unique commands -func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator) []string { - var commands []string - if choice.Contains("ethdev", dpdk.DeviceTypes) { - ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter) - ethdevCommands, err := dpdk.appendCommandsWithParamsFromList(ethdevListCommand, ethdevCommands) - if err != nil { - acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", ethdevListCommand, err)) - } - - commands = append(commands, ethdevCommands...) - } - - if choice.Contains("rawdev", dpdk.DeviceTypes) { - rawdevCommands, err := dpdk.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands) - if err != nil { - acc.AddError(fmt.Errorf("error occurred during fetching of %q params: %w", rawdevListCommand, err)) - } - - commands = append(commands, rawdevCommands...) - } - - commands = append(commands, dpdk.AdditionalCommands...) - return uniqueValues(commands) -} - -// Fetches all identifiers of devices and then creates all possible combinations of commands for each device -func (dpdk *dpdk) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) { - response, err := dpdk.connector.getCommandResponse(listCommand) - if err != nil { - return nil, err - } - - params, err := jsonToArray(response, listCommand) - if err != nil { - return nil, err - } - - result := make([]string, 0, len(commands)*len(params)) - for _, command := range commands { - for _, param := range params { - result = append(result, commandWithParams(command, param)) - } - } - - return result, nil -} - -// Executes command, parses response and creates/writes metric from response -func (dpdk *dpdk) processCommand(acc telegraf.Accumulator, commandWithParams string) { - buf, err := dpdk.connector.getCommandResponse(commandWithParams) - if err != nil { - acc.AddError(err) - return - } - - var parsedResponse map[string]interface{} - err = json.Unmarshal(buf, &parsedResponse) - if err != nil { - acc.AddError(fmt.Errorf("failed to unmarshall json response from %q command: %w", commandWithParams, err)) - return - } - - command := stripParams(commandWithParams) - value := parsedResponse[command] - if isEmpty(value) { - acc.AddError(fmt.Errorf("got empty json on %q command", commandWithParams)) - return - } - - jf := jsonparser.JSONFlattener{} - err = jf.FullFlattenJSON("", value, true, true) - if err != nil { - acc.AddError(fmt.Errorf("failed to flatten response: %w", err)) - return - } - - acc.AddFields(pluginName, jf.Fields, map[string]string{ - "command": command, - "params": getParams(commandWithParams), - }) -} diff --git a/plugins/inputs/dpdk/dpdk_cmds.go b/plugins/inputs/dpdk/dpdk_cmds.go new file mode 100644 index 000000000..857db89d8 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_cmds.go @@ -0,0 +1,55 @@ +//go:build linux + +package dpdk + +import ( + "fmt" + "strings" +) + +type linkStatus int64 + +const ( + DOWN linkStatus = iota + UP +) + +const ( + ethdevLinkStatusCommand = "/ethdev/link_status" + linkStatusStringFieldName = "status" + linkStatusIntegerFieldName = "link_status" +) + +var ( + linkStatusMap = map[string]linkStatus{ + "down": DOWN, + "up": UP, + } +) + +func processCommandResponse(command string, data map[string]interface{}) error { + if command == ethdevLinkStatusCommand { + return processLinkStatusCmd(data) + } + return nil +} + +func processLinkStatusCmd(data map[string]interface{}) error { + status, ok := data[linkStatusStringFieldName].(string) + if !ok { + return fmt.Errorf("can't find or parse %q field", linkStatusStringFieldName) + } + + parsedLinkStatus, ok := parseLinkStatus(status) + if !ok { + return fmt.Errorf("can't parse linkStatus: unknown value: %q", status) + } + + data[linkStatusIntegerFieldName] = int64(parsedLinkStatus) + return nil +} + +func parseLinkStatus(s string) (linkStatus, bool) { + value, ok := linkStatusMap[strings.ToLower(s)] + return value, ok +} diff --git a/plugins/inputs/dpdk/dpdk_cmds_test.go b/plugins/inputs/dpdk/dpdk_cmds_test.go new file mode 100644 index 000000000..7b0ae4821 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_cmds_test.go @@ -0,0 +1,131 @@ +//go:build linux + +package dpdk + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" +) + +func Test_LinkStatusCommand(t *testing.T) { + t.Run("when 'status' field is DOWN then return 'link_status'=0", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{%q:{%q: "DOWN"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName) + simulateResponse(mockConn, response, nil) + dpdkConn := dpdk.connectors[0] + dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": ethdevLinkStatusCommand, + "params": "1", + }, + map[string]interface{}{ + linkStatusStringFieldName: "DOWN", + linkStatusIntegerFieldName: int64(0), + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) + + t.Run("when 'status' field is UP then return 'link_status'=1", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{%q:{%q: "UP"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName) + simulateResponse(mockConn, response, nil) + dpdkConn := dpdk.connectors[0] + dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": ethdevLinkStatusCommand, + "params": "1", + }, + map[string]interface{}{ + linkStatusStringFieldName: "UP", + linkStatusIntegerFieldName: int64(1), + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) + + t.Run("when link status output doesn't have any fields then don't return 'link_status' field", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{%q:{}}`, ethdevLinkStatusCommand) + simulateResponse(mockConn, response, nil) + dpdkConn := dpdk.connectors[0] + dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil) + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, nil, actual, testutil.IgnoreTime()) + }) + + t.Run("when link status output doesn't have status field then don't return 'link_status' field", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{%q:{"tag1": 1}}`, ethdevLinkStatusCommand) + simulateResponse(mockConn, response, nil) + dpdkConn := dpdk.connectors[0] + dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil) + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": ethdevLinkStatusCommand, + "params": "1", + }, + map[string]interface{}{ + "tag1": float64(1), + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) + + t.Run("when link status output is invalid then don't return 'link_status' field", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{%q:{%q: "BOB"}}`, ethdevLinkStatusCommand, linkStatusStringFieldName) + simulateResponse(mockConn, response, nil) + dpdkConn := dpdk.connectors[0] + dpdkConn.processCommand(mockAcc, testutil.Logger{}, fmt.Sprintf("%s,1", ethdevLinkStatusCommand), nil) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": ethdevLinkStatusCommand, + "params": "1", + }, + map[string]interface{}{ + linkStatusStringFieldName: "BOB", + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) +} diff --git a/plugins/inputs/dpdk/dpdk_connector.go b/plugins/inputs/dpdk/dpdk_connector.go index 40e9a0091..b87c67eda 100644 --- a/plugins/inputs/dpdk/dpdk_connector.go +++ b/plugins/inputs/dpdk/dpdk_connector.go @@ -4,14 +4,20 @@ package dpdk import ( "encoding/json" + "errors" "fmt" "net" "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" ) -const maxInitMessageLength = 1024 +const ( + maxInitMessageLength = 1024 // based on https://github.com/DPDK/dpdk/blob/v22.07/lib/telemetry/telemetry.c#L352 + dpdkSocketTemplateName = "dpdk_telemetry" +) type initMessage struct { Version string `json:"version"` @@ -21,16 +27,14 @@ type initMessage struct { type dpdkConnector struct { pathToSocket string - maxOutputLen uint32 - messageShowed bool accessTimeout time.Duration connection net.Conn + initMessage *initMessage } func newDpdkConnector(pathToSocket string, accessTimeout config.Duration) *dpdkConnector { return &dpdkConnector{ pathToSocket: pathToSocket, - messageShowed: false, accessTimeout: time.Duration(accessTimeout), } } @@ -38,26 +42,67 @@ func newDpdkConnector(pathToSocket string, accessTimeout config.Duration) *dpdkC // Connects to the socket // Since DPDK is a local unix socket, it is instantly returns error or connection, so there's no need to set timeout for it func (conn *dpdkConnector) connect() (*initMessage, error) { + if err := isSocket(conn.pathToSocket); err != nil { + return nil, err + } + connection, err := net.Dial("unixpacket", conn.pathToSocket) if err != nil { return nil, fmt.Errorf("failed to connect to the socket: %w", err) } - conn.connection = connection - result, err := conn.readMaxOutputLen() + + conn.initMessage, err = conn.readInitMessage() if err != nil { if closeErr := conn.tryClose(); closeErr != nil { return nil, fmt.Errorf("%w and failed to close connection: %w", err, closeErr) } return nil, err } + return conn.initMessage, nil +} + +// Add metadata fields to data +func (conn *dpdkConnector) addMetadataFields(metadataFields []string, data map[string]interface{}) { + if conn.initMessage == nil { + return + } + + for _, field := range metadataFields { + switch field { + case dpdkMetadataFieldPidName: + data[dpdkMetadataFieldPidName] = conn.initMessage.Pid + case dpdkMetadataFieldVersionName: + data[dpdkMetadataFieldVersionName] = conn.initMessage.Version + } + } +} + +// Fetches all identifiers of devices and then creates all possible combinations of commands for each device +func (conn *dpdkConnector) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) { + response, err := conn.getCommandResponse(listCommand) + if err != nil { + return nil, err + } + + params, err := jsonToArray(response, listCommand) + if err != nil { + return nil, err + } + + result := make([]string, 0, len(commands)*len(params)) + for _, command := range commands { + for _, param := range params { + result = append(result, commandWithParams(command, param)) + } + } return result, nil } // Executes command using provided connection and returns response // If error (such as timeout) occurred, then connection is discarded and recreated -// because otherwise behaviour of connection is undefined (e.g. it could return result of timed out command instead of latest) +// because otherwise behavior of connection is undefined (e.g. it could return result of timed out command instead of latest) func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error) { connection, err := conn.getConnection() if err != nil { @@ -77,7 +122,7 @@ func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error return nil, fmt.Errorf("failed to send %q command: %w", fullCommand, err) } - buf := make([]byte, conn.maxOutputLen) + buf := make([]byte, conn.initMessage.MaxOutputLen) messageLength, err := connection.Read(buf) if err != nil { if closeErr := conn.tryClose(); closeErr != nil { @@ -92,6 +137,50 @@ func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error return buf[:messageLength], nil } +// Executes command, parses response and creates/writes metrics from response to accumulator +func (conn *dpdkConnector) processCommand(acc telegraf.Accumulator, log telegraf.Logger, commandWithParams string, metadataFields []string) { + buf, err := conn.getCommandResponse(commandWithParams) + if err != nil { + acc.AddError(err) + return + } + + var parsedResponse map[string]interface{} + err = json.Unmarshal(buf, &parsedResponse) + if err != nil { + acc.AddError(fmt.Errorf("failed to unmarshal json response from %q command: %w", commandWithParams, err)) + return + } + + command := stripParams(commandWithParams) + value := parsedResponse[command] + if isEmpty(value) { + log.Warnf("got empty json on %q command", commandWithParams) + return + } + + jf := jsonparser.JSONFlattener{} + err = jf.FullFlattenJSON("", value, true, true) + if err != nil { + acc.AddError(fmt.Errorf("failed to flatten response: %w", err)) + return + } + + err = processCommandResponse(command, jf.Fields) + if err != nil { + log.Warnf("Failed to process a response of the command: %s. Error: %v. Continue to handle data", command, err) + } + + // Add metadata fields if required + conn.addMetadataFields(metadataFields, jf.Fields) + + // Add common fields + acc.AddFields(pluginName, jf.Fields, map[string]string{ + "command": command, + "params": getParams(commandWithParams), + }) +} + func (conn *dpdkConnector) tryClose() error { if conn.connection == nil { return nil @@ -107,7 +196,7 @@ func (conn *dpdkConnector) tryClose() error { func (conn *dpdkConnector) setTimeout() error { if conn.connection == nil { - return fmt.Errorf("connection had not been established before") + return errors.New("connection had not been established before") } if conn.accessTimeout == 0 { @@ -128,7 +217,7 @@ func (conn *dpdkConnector) getConnection() (net.Conn, error) { } // Reads InitMessage for connection. Should be read for each connection, otherwise InitMessage is returned as response for first command. -func (conn *dpdkConnector) readMaxOutputLen() (*initMessage, error) { +func (conn *dpdkConnector) readInitMessage() (*initMessage, error) { buf := make([]byte, maxInitMessageLength) err := conn.setTimeout() if err != nil { @@ -140,21 +229,15 @@ func (conn *dpdkConnector) readMaxOutputLen() (*initMessage, error) { return nil, fmt.Errorf("failed to read InitMessage: %w", err) } - var initMessage initMessage - err = json.Unmarshal(buf[:messageLength], &initMessage) + var connectionInitMessage initMessage + err = json.Unmarshal(buf[:messageLength], &connectionInitMessage) if err != nil { return nil, fmt.Errorf("failed to unmarshal response: %w", err) } - if initMessage.MaxOutputLen == 0 { - return nil, fmt.Errorf("failed to read maxOutputLen information") + if connectionInitMessage.MaxOutputLen == 0 { + return nil, errors.New("failed to read maxOutputLen information") } - if !conn.messageShowed { - conn.maxOutputLen = initMessage.MaxOutputLen - conn.messageShowed = true - return &initMessage, nil - } - - return nil, nil + return &connectionInitMessage, nil } diff --git a/plugins/inputs/dpdk/dpdk_connector_test.go b/plugins/inputs/dpdk/dpdk_connector_test.go index 19841e034..3819f1398 100644 --- a/plugins/inputs/dpdk/dpdk_connector_test.go +++ b/plugins/inputs/dpdk/dpdk_connector_test.go @@ -4,26 +4,28 @@ package dpdk import ( "encoding/json" - "fmt" + "errors" "testing" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/inputs/dpdk/mocks" + "github.com/influxdata/telegraf/testutil" ) func Test_readMaxOutputLen(t *testing.T) { t.Run("should return error if timeout occurred", func(t *testing.T) { conn := &mocks.Conn{} - conn.On("Read", mock.Anything).Return(0, fmt.Errorf("timeout")) + conn.On("Read", mock.Anything).Return(0, errors.New("timeout")) conn.On("SetDeadline", mock.Anything).Return(nil) connector := dpdkConnector{connection: conn} - _, err := connector.readMaxOutputLen() + initMessage, err := connector.readInitMessage() require.Error(t, err) require.Contains(t, err.Error(), "timeout") + require.Empty(t, initMessage) }) t.Run("should pass and set maxOutputLen if provided with valid InitMessage", func(t *testing.T) { @@ -43,10 +45,10 @@ func Test_readMaxOutputLen(t *testing.T) { conn.On("SetDeadline", mock.Anything).Return(nil) connector := dpdkConnector{connection: conn} - _, err = connector.readMaxOutputLen() + initMsg, err := connector.readInitMessage() require.NoError(t, err) - require.Equal(t, maxOutputLen, connector.maxOutputLen) + require.Equal(t, maxOutputLen, initMsg.MaxOutputLen) }) t.Run("should fail if received invalid json", func(t *testing.T) { @@ -59,7 +61,7 @@ func Test_readMaxOutputLen(t *testing.T) { conn.On("SetDeadline", mock.Anything).Return(nil) connector := dpdkConnector{connection: conn} - _, err := connector.readMaxOutputLen() + _, err := connector.readInitMessage() require.Error(t, err) require.Contains(t, err.Error(), "looking for beginning of object key string") @@ -80,7 +82,7 @@ func Test_readMaxOutputLen(t *testing.T) { conn.On("SetDeadline", mock.Anything).Return(nil) connector := dpdkConnector{connection: conn} - _, err = connector.readMaxOutputLen() + _, err = connector.readInitMessage() require.Error(t, err) require.Contains(t, err.Error(), "failed to read maxOutputLen information") @@ -89,15 +91,15 @@ func Test_readMaxOutputLen(t *testing.T) { func Test_connect(t *testing.T) { t.Run("should pass if PathToSocket points to socket", func(t *testing.T) { - pathToSocket, socket := createSocketForTest(t) + pathToSocket, socket := createSocketForTest(t, "") defer socket.Close() dpdk := dpdk{ SocketPath: pathToSocket, - connector: newDpdkConnector(pathToSocket, 0), + connectors: []*dpdkConnector{newDpdkConnector(pathToSocket, 0)}, } go simulateSocketResponse(socket, t) - _, err := dpdk.connector.connect() + _, err := dpdk.connectors[0].connect() require.NoError(t, err) }) @@ -112,18 +114,20 @@ func Test_getCommandResponse(t *testing.T) { defer mockConn.AssertExpectations(t) simulateResponse(mockConn, response, nil) - buf, err := dpdk.connector.getCommandResponse(command) + for _, connector := range dpdk.connectors { + buf, err := connector.getCommandResponse(command) - require.NoError(t, err) - require.Equal(t, len(response), len(buf)) - require.Equal(t, response, string(buf)) + require.NoError(t, err) + require.Equal(t, len(response), len(buf)) + require.Equal(t, response, string(buf)) + } }) t.Run("should return error if failed to get connection handler", func(t *testing.T) { _, dpdk, _ := prepareEnvironment() - dpdk.connector.connection = nil + dpdk.connectors[0].connection = nil - buf, err := dpdk.connector.getCommandResponse(command) + buf, err := dpdk.connectors[0].getCommandResponse(command) require.Error(t, err) require.Contains(t, err.Error(), "failed to get connection to execute \"/\" command") @@ -133,9 +137,9 @@ func Test_getCommandResponse(t *testing.T) { t.Run("should return error if failed to set timeout duration", func(t *testing.T) { mockConn, dpdk, _ := prepareEnvironment() defer mockConn.AssertExpectations(t) - mockConn.On("SetDeadline", mock.Anything).Return(fmt.Errorf("deadline error")) + mockConn.On("SetDeadline", mock.Anything).Return(errors.New("deadline error")) - buf, err := dpdk.connector.getCommandResponse(command) + buf, err := dpdk.connectors[0].getCommandResponse(command) require.Error(t, err) require.Contains(t, err.Error(), "deadline error") @@ -145,11 +149,11 @@ func Test_getCommandResponse(t *testing.T) { t.Run("should return error if timeout occurred during Write operation", func(t *testing.T) { mockConn, dpdk, _ := prepareEnvironment() defer mockConn.AssertExpectations(t) - mockConn.On("Write", mock.Anything).Return(0, fmt.Errorf("write timeout")) + mockConn.On("Write", mock.Anything).Return(0, errors.New("write timeout")) mockConn.On("SetDeadline", mock.Anything).Return(nil) mockConn.On("Close").Return(nil) - buf, err := dpdk.connector.getCommandResponse(command) + buf, err := dpdk.connectors[0].getCommandResponse(command) require.Error(t, err) require.Contains(t, err.Error(), "write timeout") @@ -159,9 +163,9 @@ func Test_getCommandResponse(t *testing.T) { t.Run("should return error if timeout occurred during Read operation", func(t *testing.T) { mockConn, dpdk, _ := prepareEnvironment() defer mockConn.AssertExpectations(t) - simulateResponse(mockConn, "", fmt.Errorf("read timeout")) + simulateResponse(mockConn, "", errors.New("read timeout")) - buf, err := dpdk.connector.getCommandResponse(command) + buf, err := dpdk.connectors[0].getCommandResponse(command) require.Error(t, err) require.Contains(t, err.Error(), "read timeout") @@ -173,10 +177,65 @@ func Test_getCommandResponse(t *testing.T) { defer mockConn.AssertExpectations(t) simulateResponse(mockConn, "", nil) - buf, err := dpdk.connector.getCommandResponse(command) + buf, err := dpdk.connectors[0].getCommandResponse(command) require.Error(t, err) require.Empty(t, buf) require.Contains(t, err.Error(), "got empty response during execution of") }) } + +func Test_processCommand(t *testing.T) { + t.Run("should pass if received valid response", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := `{"/": ["/", "/eal/app_params", "/eal/params", "/ethdev/link_status, /ethdev/info"]}` + simulateResponse(mockConn, response, nil) + + for _, dpdkConn := range dpdk.connectors { + dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/", nil) + } + + require.Empty(t, mockAcc.Errors) + }) + + t.Run("if received a non-JSON object then should return error", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := `notAJson` + simulateResponse(mockConn, response, nil) + + for _, dpdkConn := range dpdk.connectors { + dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/", nil) + } + + require.Len(t, mockAcc.Errors, 1) + require.Contains(t, mockAcc.Errors[0].Error(), "invalid character") + }) + + t.Run("if failed to get command response then accumulator should contain error", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + mockConn.On("Write", mock.Anything).Return(0, errors.New("deadline exceeded")) + mockConn.On("SetDeadline", mock.Anything).Return(nil) + mockConn.On("Close").Return(nil) + for _, dpdkConn := range dpdk.connectors { + dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/", nil) + } + + require.Len(t, mockAcc.Errors, 1) + require.Contains(t, mockAcc.Errors[0].Error(), "deadline exceeded") + }) + + t.Run("if response contains nil or empty value then error shouldn't be returned in accumulator", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := `{"/test": null}` + simulateResponse(mockConn, response, nil) + for _, dpdkConn := range dpdk.connectors { + dpdkConn.processCommand(mockAcc, testutil.Logger{}, "/test,param", nil) + } + + require.Empty(t, mockAcc.Errors) + }) +} diff --git a/plugins/inputs/dpdk/dpdk_test.go b/plugins/inputs/dpdk/dpdk_test.go index 8df467315..e2c94e5cd 100644 --- a/plugins/inputs/dpdk/dpdk_test.go +++ b/plugins/inputs/dpdk/dpdk_test.go @@ -5,7 +5,10 @@ package dpdk import ( "encoding/json" "fmt" + "math/rand" "net" + "os" + "path/filepath" "strings" "testing" "time" @@ -15,14 +18,18 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs/dpdk/mocks" "github.com/influxdata/telegraf/testutil" ) func Test_Init(t *testing.T) { t.Run("when SocketPath field isn't set then it should be set to default value", func(t *testing.T) { - _, dpdk, _ := prepareEnvironment() - dpdk.SocketPath = "" + dpdk := dpdk{ + Log: testutil.Logger{}, + SocketPath: "", + } + require.Equal(t, "", dpdk.SocketPath) _ = dpdk.Init() @@ -30,10 +37,31 @@ func Test_Init(t *testing.T) { require.Equal(t, defaultPathToSocket, dpdk.SocketPath) }) + t.Run("when Metadata Fields isn't set then it should be set to default value (dpdk_pid)", func(t *testing.T) { + dpdk := dpdk{ + Log: testutil.Logger{}, + } + require.Nil(t, dpdk.MetadataFields) + + _ = dpdk.Init() + require.Equal(t, []string{dpdkMetadataFieldPidName, dpdkMetadataFieldVersionName}, dpdk.MetadataFields) + }) + + t.Run("when PluginOptions field isn't set then it should be set to default value (in_memory)", func(t *testing.T) { + dpdk := dpdk{ + Log: testutil.Logger{}, + } + require.Nil(t, dpdk.PluginOptions) + + _ = dpdk.Init() + require.Equal(t, []string{dpdkPluginOptionInMemory}, dpdk.PluginOptions) + }) + t.Run("when commands are in invalid format (doesn't start with '/') then error should be returned", func(t *testing.T) { - pathToSocket, socket := createSocketForTest(t) + pathToSocket, socket := createSocketForTest(t, "") defer socket.Close() dpdk := dpdk{ + Log: testutil.Logger{}, SocketPath: pathToSocket, AdditionalCommands: []string{"invalid"}, } @@ -41,26 +69,22 @@ func Test_Init(t *testing.T) { err := dpdk.Init() require.Error(t, err) - require.Contains(t, err.Error(), "command should start with '/'") + require.Contains(t, err.Error(), "command should start with slash") }) - t.Run("when all values are valid, then no error should be returned", func(t *testing.T) { - pathToSocket, socket := createSocketForTest(t) - defer socket.Close() + t.Run("when AccessTime is < 0 then error should be returned", func(t *testing.T) { dpdk := dpdk{ - SocketPath: pathToSocket, - DeviceTypes: []string{"ethdev"}, - Log: testutil.Logger{}, + Log: testutil.Logger{}, + AccessTimeout: -1, } - go simulateSocketResponse(socket, t) - err := dpdk.Init() - require.NoError(t, err) + require.Error(t, err) + require.Contains(t, err.Error(), "socket_access_timeout should be positive number") }) t.Run("when device_types and additional_commands are empty, then error should be returned", func(t *testing.T) { - pathToSocket, socket := createSocketForTest(t) + pathToSocket, socket := createSocketForTest(t, "") defer socket.Close() dpdk := dpdk{ SocketPath: pathToSocket, @@ -74,15 +98,241 @@ func Test_Init(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "plugin was configured with nothing to read") }) + + t.Run("when UnreachableSocketBehavior specified with unknown value - err should be returned", func(t *testing.T) { + dpdk := dpdk{ + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + UnreachableSocketBehavior: "whatisthat", + } + err := dpdk.Init() + + require.Error(t, err) + require.Contains(t, err.Error(), "unreachable_socket_behavior") + }) } -func Test_validateCommands(t *testing.T) { +func Test_Start(t *testing.T) { + t.Run("when socket doesn't exist err should be returned", func(t *testing.T) { + dpdk := dpdk{ + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + } + err := dpdk.Init() + require.NoError(t, err) + + err = dpdk.Start(nil) + require.Error(t, err) + require.Contains(t, err.Error(), "no active sockets connections present") + }) + + t.Run("when socket doesn't exist, but UnreachableSocketBehavior is Ignore err shouldn't be returned", func(t *testing.T) { + dpdk := dpdk{ + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + UnreachableSocketBehavior: unreachableSocketBehaviorIgnore, + } + err := dpdk.Init() + require.NoError(t, err) + + err = dpdk.Start(nil) + require.NoError(t, err) + }) + + t.Run("when all values are valid, then no error should be returned", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t, "") + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + } + err := dpdk.Init() + require.NoError(t, err) + + go simulateSocketResponse(socket, t) + + err = dpdk.Start(nil) + require.NoError(t, err) + }) +} + +func TestMaintainConnections(t *testing.T) { + t.Run("maintainConnections should return the error if socket doesn't exist", func(t *testing.T) { + dpdk := dpdk{ + SocketPath: "/tmp/justrandompath", + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + UnreachableSocketBehavior: unreachableSocketBehaviorError, + } + + require.Empty(t, dpdk.connectors) + err := dpdk.maintainConnections() + defer dpdk.Stop() + + require.Error(t, err) + require.Contains(t, err.Error(), "couldn't connect to socket") + }) + + t.Run("maintainConnections should return the error if socket not found with dpdkPluginOptionInMemory", func(t *testing.T) { + dpdk := dpdk{ + SocketPath: defaultPathToSocket, + Log: testutil.Logger{}, + PluginOptions: []string{dpdkPluginOptionInMemory}, + UnreachableSocketBehavior: unreachableSocketBehaviorError, + } + var err error + dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath) + require.NoError(t, err) + + require.Empty(t, dpdk.connectors) + err = dpdk.maintainConnections() + require.Error(t, err) + require.Contains(t, err.Error(), "no active sockets connections present") + }) + + t.Run("maintainConnections shouldn't return error with 1 socket", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t, "") + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + } + + go simulateSocketResponse(socket, t) + + require.Empty(t, dpdk.connectors) + err := dpdk.maintainConnections() + defer dpdk.Stop() + + require.NoError(t, err) + require.Len(t, dpdk.connectors, 1) + }) + + t.Run("maintainConnections shouldn't return error with multiple sockets", func(t *testing.T) { + numSockets := rand.Intn(5) + 1 + + pathToSockets, sockets := createMultipleSocketsForTest(t, numSockets, "") + defer func() { + for _, socket := range sockets { + socket.Close() + } + }() + + dpdk := dpdk{ + SocketPath: pathToSockets[0], + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + PluginOptions: []string{dpdkPluginOptionInMemory}, + } + var err error + dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath) + require.NoError(t, err) + + for _, socket := range sockets { + go simulateSocketResponse(socket, t) + } + + require.Empty(t, dpdk.connectors) + err = dpdk.maintainConnections() + defer dpdk.Stop() + + require.NoError(t, err) + require.Len(t, dpdk.connectors, numSockets) + }) + + t.Run("Test maintainConnections without dpdkPluginOptionInMemory option", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t, "") + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + } + + go simulateSocketResponse(socket, t) + + require.Empty(t, dpdk.connectors) + err := dpdk.maintainConnections() + require.NoError(t, err) + require.Len(t, dpdk.connectors, 1) + + dpdk.Stop() + require.Empty(t, dpdk.connectors) + }) + + t.Run("Test maintainConnections with dpdkPluginOptionInMemory option", func(t *testing.T) { + pathToSocket1, socket1 := createSocketForTest(t, "") + defer socket1.Close() + go simulateSocketResponse(socket1, t) + dpdk := dpdk{ + SocketPath: pathToSocket1, + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + PluginOptions: []string{dpdkPluginOptionInMemory}, + } + var err error + dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath) + require.NoError(t, err) + + require.Empty(t, dpdk.connectors) + err = dpdk.maintainConnections() + require.NoError(t, err) + require.Len(t, dpdk.connectors, 1) + + // Adding 2 sockets more + pathToSocket2, socket2 := createSocketForTest(t, filepath.Dir(pathToSocket1)) + pathToSocket3, socket3 := createSocketForTest(t, filepath.Dir(pathToSocket1)) + require.NotEqual(t, pathToSocket2, pathToSocket3) + go simulateSocketResponse(socket2, t) + go simulateSocketResponse(socket3, t) + err = dpdk.maintainConnections() + require.NoError(t, err) + require.Len(t, dpdk.connectors, 3) + + // Close 2 new sockets + socket2.Close() + socket3.Close() + err = dpdk.maintainConnections() + require.NoError(t, err) + require.Len(t, dpdk.connectors, 1) + require.Equal(t, pathToSocket1, dpdk.connectors[0].pathToSocket) + + dpdk.Stop() + require.Empty(t, dpdk.connectors) + }) +} + +func TestClose(t *testing.T) { + t.Run("Num of connections should be 0 after Stop func", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t, "") + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + } + + go simulateSocketResponse(socket, t) + + require.Empty(t, dpdk.connectors) + err := dpdk.maintainConnections() + require.NoError(t, err) + require.Len(t, dpdk.connectors, 1) + + dpdk.Stop() + require.Empty(t, dpdk.connectors) + }) +} + +func Test_validateAdditionalCommands(t *testing.T) { t.Run("when validating commands in correct format then no error should be returned", func(t *testing.T) { dpdk := dpdk{ AdditionalCommands: []string{"/test", "/help"}, } - err := dpdk.validateCommands() + err := dpdk.validateAdditionalCommands() require.NoError(t, err) }) @@ -94,10 +344,10 @@ func Test_validateCommands(t *testing.T) { }, } - err := dpdk.validateCommands() + err := dpdk.validateAdditionalCommands() require.Error(t, err) - require.Contains(t, err.Error(), "command should start with '/'") + require.Contains(t, err.Error(), "command should start with slash") }) t.Run("when validating long command (without parameters) then error should be returned", func(t *testing.T) { @@ -107,7 +357,7 @@ func Test_validateCommands(t *testing.T) { }, } - err := dpdk.validateCommands() + err := dpdk.validateAdditionalCommands() require.Error(t, err) require.Contains(t, err.Error(), "command is too long") @@ -120,7 +370,7 @@ func Test_validateCommands(t *testing.T) { }, } - err := dpdk.validateCommands() + err := dpdk.validateAdditionalCommands() require.Error(t, err) require.Contains(t, err.Error(), "shall be less than 1024 characters") @@ -133,7 +383,7 @@ func Test_validateCommands(t *testing.T) { }, } - err := dpdk.validateCommands() + err := dpdk.validateAdditionalCommands() require.Error(t, err) require.Contains(t, err.Error(), "got empty command") @@ -147,7 +397,7 @@ func Test_validateCommands(t *testing.T) { } require.Len(t, dpdk.AdditionalCommands, 2) - err := dpdk.validateCommands() + err := dpdk.validateAdditionalCommands() require.Len(t, dpdk.AdditionalCommands, 1) require.NoError(t, err) @@ -157,65 +407,62 @@ func Test_validateCommands(t *testing.T) { func prepareEnvironment() (*mocks.Conn, dpdk, *testutil.Accumulator) { mockConnection := &mocks.Conn{} dpdk := dpdk{ - connector: &dpdkConnector{ - connection: mockConnection, - maxOutputLen: 1024, + connectors: []*dpdkConnector{{ + connection: mockConnection, + initMessage: &initMessage{ + Version: "mockedDPDK", + Pid: 1, + MaxOutputLen: 1024, + }, accessTimeout: 2 * time.Second, - }, + }}, Log: testutil.Logger{}, } mockAcc := &testutil.Accumulator{} return mockConnection, dpdk, mockAcc } -func Test_processCommand(t *testing.T) { - t.Run("should pass if received valid response", func(t *testing.T) { - mockConn, dpdk, mockAcc := prepareEnvironment() - defer mockConn.AssertExpectations(t) - response := `{"/": ["/", "/eal/app_params", "/eal/params", "/ethdev/link_status"]}` - simulateResponse(mockConn, response, nil) +func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, dpdk, *testutil.Accumulator) { + mockConnections := []*mocks.Conn{{}, {}} + dpdk := dpdk{ + connectors: []*dpdkConnector{ + { + connection: mockConnections[0], + initMessage: &initMessage{ + Version: "mockedDPDK", + Pid: 1, + MaxOutputLen: 1024, + }, + accessTimeout: 2 * time.Second, + }, + { + connection: mockConnections[1], + initMessage: &initMessage{ + Version: "mockedDPDK", + Pid: 2, + MaxOutputLen: 1024, + }, + accessTimeout: 2 * time.Second, + }, + }, + Log: testutil.Logger{}, + } + mockAcc := &testutil.Accumulator{} + return mockConnections, dpdk, mockAcc +} - dpdk.processCommand(mockAcc, "/") - - require.Empty(t, mockAcc.Errors) - }) - - t.Run("if received a non-JSON object then should return error", func(t *testing.T) { - mockConn, dpdk, mockAcc := prepareEnvironment() - defer mockConn.AssertExpectations(t) - response := `notAJson` - simulateResponse(mockConn, response, nil) - - dpdk.processCommand(mockAcc, "/") - - require.Len(t, mockAcc.Errors, 1) - require.Contains(t, mockAcc.Errors[0].Error(), "invalid character") - }) - - t.Run("if failed to get command response then accumulator should contain error", func(t *testing.T) { - mockConn, dpdk, mockAcc := prepareEnvironment() - defer mockConn.AssertExpectations(t) - mockConn.On("Write", mock.Anything).Return(0, fmt.Errorf("deadline exceeded")) - mockConn.On("SetDeadline", mock.Anything).Return(nil) - mockConn.On("Close").Return(nil) - - dpdk.processCommand(mockAcc, "/") - - require.Len(t, mockAcc.Errors, 1) - require.Contains(t, mockAcc.Errors[0].Error(), "deadline exceeded") - }) - - t.Run("if response contains nil or empty value then error should be returned in accumulator", func(t *testing.T) { - mockConn, dpdk, mockAcc := prepareEnvironment() - defer mockConn.AssertExpectations(t) - response := `{"/test": null}` - simulateResponse(mockConn, response, nil) - - dpdk.processCommand(mockAcc, "/test,param") - - require.Len(t, mockAcc.Errors, 1) - require.Contains(t, mockAcc.Errors[0].Error(), "got empty json on") - }) +func prepareEnvironmentWithInitializedMessage(initMsg *initMessage) (*mocks.Conn, dpdk, *testutil.Accumulator) { + mockConnection := &mocks.Conn{} + dpdk := dpdk{ + connectors: []*dpdkConnector{{ + connection: mockConnection, + accessTimeout: 2 * time.Second, + initMessage: initMsg, + }}, + Log: testutil.Logger{}, + } + mockAcc := &testutil.Accumulator{} + return mockConnection, dpdk, mockAcc } func Test_appendCommandsWithParams(t *testing.T) { @@ -226,11 +473,12 @@ func Test_appendCommandsWithParams(t *testing.T) { simulateResponse(mockConn, response, nil) expectedCommands := []string{"/action1,1", "/action1,123", "/action2,1", "/action2,123"} - result, err := dpdk.appendCommandsWithParamsFromList("/testendpoint", []string{"/action1", "/action2"}) - - require.NoError(t, err) - require.Len(t, result, 4) - require.ElementsMatch(t, result, expectedCommands) + for _, dpdkConn := range dpdk.connectors { + result, err := dpdkConn.appendCommandsWithParamsFromList("/testendpoint", []string{"/action1", "/action2"}) + require.NoError(t, err) + require.Len(t, result, 4) + require.ElementsMatch(t, result, expectedCommands) + } }) } @@ -246,7 +494,7 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) { dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"} dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{}) dpdk.AdditionalCommands = []string{} - commands := dpdk.gatherCommands(mockAcc) + commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0]) require.ElementsMatch(t, commands, expectedCommands) require.Empty(t, mockAcc.Errors) @@ -262,7 +510,7 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) { dpdk.DeviceTypes = []string{"rawdev"} dpdk.rawdevCommands = []string{"/rawdev/xstats"} dpdk.AdditionalCommands = []string{} - commands := dpdk.gatherCommands(mockAcc) + commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0]) require.ElementsMatch(t, commands, expectedCommands) require.Empty(t, mockAcc.Errors) @@ -279,7 +527,7 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) { dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"} dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{"/ethdev/xstats"}) dpdk.AdditionalCommands = []string{} - commands := dpdk.gatherCommands(mockAcc) + commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0]) require.ElementsMatch(t, commands, expectedCommands) require.Empty(t, mockAcc.Errors) @@ -294,14 +542,105 @@ func Test_getCommandsAndParamsCombinations(t *testing.T) { dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"} dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{}) dpdk.AdditionalCommands = []string{} - commands := dpdk.gatherCommands(mockAcc) + commands := dpdk.gatherCommands(mockAcc, dpdk.connectors[0]) require.Empty(t, commands) require.Len(t, mockAcc.Errors, 1) }) } +func Test_getDpdkInMemorySocketPaths(t *testing.T) { + var err error + + t.Run("Should return nil if path doesn't exist", func(t *testing.T) { + dpdk := dpdk{ + SocketPath: "/tmp/nothing-should-exist-here/test.socket", + Log: testutil.Logger{}, + } + dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath) + require.NoError(t, err) + + socketsPaths := dpdk.getDpdkInMemorySocketPaths() + require.Nil(t, socketsPaths) + }) + + t.Run("Should return nil if can't read the dir", func(t *testing.T) { + dpdk := dpdk{ + SocketPath: "/root/no_access", + Log: testutil.Logger{}, + } + dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath) + require.NoError(t, err) + + socketsPaths := dpdk.getDpdkInMemorySocketPaths() + require.Nil(t, socketsPaths) + }) + + t.Run("Should return one socket from socket path", func(t *testing.T) { + socketPath, socket := createSocketForTest(t, "") + defer socket.Close() + + dpdk := dpdk{ + SocketPath: socketPath, + Log: testutil.Logger{}, + } + dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath) + require.NoError(t, err) + + socketsPaths := dpdk.getDpdkInMemorySocketPaths() + require.Len(t, socketsPaths, 1) + require.Equal(t, socketPath, socketsPaths[0]) + }) + + t.Run("Should return 2 sockets from socket path", func(t *testing.T) { + socketPaths, sockets := createMultipleSocketsForTest(t, 2, "") + defer func() { + for _, socket := range sockets { + socket.Close() + } + }() + + dpdk := dpdk{ + SocketPath: socketPaths[0], + Log: testutil.Logger{}, + } + dpdk.socketGlobPath, err = prepareGlob(dpdk.SocketPath) + require.NoError(t, err) + + socketsPathsFromFunc := dpdk.getDpdkInMemorySocketPaths() + require.Len(t, socketsPathsFromFunc, 2) + require.Equal(t, socketPaths, socketsPathsFromFunc) + }) +} + func Test_Gather(t *testing.T) { + t.Run("Gather should return error, because socket weren't created", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + dpdk := dpdk{ + Log: testutil.Logger{}, + PluginOptions: []string{}, + } + + _ = dpdk.Init() + + err := dpdk.Gather(mockAcc) + require.Error(t, err) + require.Contains(t, err.Error(), "couldn't connect to socket") + }) + + t.Run("Gather shouldn't return error with UnreachableSocketBehavior: Ignore option, because socket weren't created", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + dpdk := dpdk{ + Log: testutil.Logger{}, + PluginOptions: []string{}, + UnreachableSocketBehavior: unreachableSocketBehaviorIgnore, + } + _ = dpdk.Init() + + err := dpdk.Gather(mockAcc) + require.NoError(t, err) + }) + t.Run("When parsing a plain json without nested object, then its key should be equal to \"\"", func(t *testing.T) { mockConn, dpdk, mockAcc := prepareEnvironment() defer mockConn.AssertExpectations(t) @@ -360,6 +699,164 @@ func Test_Gather(t *testing.T) { actual := mockAcc.GetTelegrafMetrics() testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) }) + + t.Run("Test Gather with Metadata Fields dpdk pid and version", func(t *testing.T) { + testInitMessage := &initMessage{ + Pid: 100, + Version: "DPDK 21.11.11", + MaxOutputLen: 1024, + } + mockConn, dpdk, mockAcc := prepareEnvironmentWithInitializedMessage(testInitMessage) + dpdk.MetadataFields = []string{dpdkMetadataFieldPidName, dpdkMetadataFieldVersionName} + defer mockConn.AssertExpectations(t) + dpdk.AdditionalCommands = []string{"/endpoint1"} + simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil) + + err := dpdk.Gather(mockAcc) + + require.NoError(t, err) + require.Empty(t, mockAcc.Errors) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": "/endpoint1", + "params": "", + }, + map[string]interface{}{ + "": "myvalue", + dpdkMetadataFieldPidName: testInitMessage.Pid, + dpdkMetadataFieldVersionName: testInitMessage.Version, + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) + + t.Run("Test Gather with Metadata Fields dpdk_pid", func(t *testing.T) { + testInitMessage := &initMessage{ + Pid: 100, + Version: "DPDK 21.11.11", + MaxOutputLen: 1024, + } + mockConn, dpdk, mockAcc := prepareEnvironmentWithInitializedMessage(testInitMessage) + dpdk.MetadataFields = []string{dpdkMetadataFieldPidName} + defer mockConn.AssertExpectations(t) + dpdk.AdditionalCommands = []string{"/endpoint1"} + simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil) + + err := dpdk.Gather(mockAcc) + + require.NoError(t, err) + require.Empty(t, mockAcc.Errors) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": "/endpoint1", + "params": "", + }, + map[string]interface{}{ + "": "myvalue", + dpdkMetadataFieldPidName: testInitMessage.Pid, + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) + + t.Run("Test Gather without Metadata Fields", func(t *testing.T) { + testInitMessage := &initMessage{ + Pid: 100, + Version: "DPDK 21.11.11", + MaxOutputLen: 1024, + } + mockConn, dpdk, mockAcc := prepareEnvironmentWithInitializedMessage(testInitMessage) + dpdk.MetadataFields = []string{} + defer mockConn.AssertExpectations(t) + dpdk.AdditionalCommands = []string{"/endpoint1"} + simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil) + + err := dpdk.Gather(mockAcc) + + require.NoError(t, err) + require.Empty(t, mockAcc.Errors) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": "/endpoint1", + "params": "", + }, + map[string]interface{}{ + "": "myvalue", + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) +} + +func Test_Gather_MultiSocket(t *testing.T) { + t.Run("Test Gather without Metadata Fields", func(t *testing.T) { + mockConns, dpdk, mockAcc := prepareEnvironmentWithMultiSockets() + dpdk.MetadataFields = []string{} + defer func() { + for _, mockConn := range mockConns { + mockConn.AssertExpectations(t) + } + }() + dpdk.AdditionalCommands = []string{"/endpoint1"} + + for _, mockConn := range mockConns { + simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil) + } + + err := dpdk.Gather(mockAcc) + + require.NoError(t, err) + require.Empty(t, mockAcc.Errors) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": "/endpoint1", + "params": "", + }, + map[string]interface{}{ + "": "myvalue", + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": "/endpoint1", + "params": "", + }, + map[string]interface{}{ + "": "myvalue", + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) } func simulateResponse(mockConn *mocks.Conn, response string, readErr error) { @@ -375,6 +872,44 @@ func simulateResponse(mockConn *mocks.Conn, response string, readErr error) { } } +func createSocketForTest(t *testing.T, dirPath string) (string, net.Listener) { + var err error + var pathToSocket string + if len(dirPath) == 0 { + dirPath, err = os.MkdirTemp("", "dpdk-test-socket") + require.NoError(t, err) + pathToSocket = filepath.Join(dirPath, dpdkSocketTemplateName) + } else { + pathToSocket = fmt.Sprintf("%s:%d", filepath.Join(dirPath, dpdkSocketTemplateName), rand.Intn(100)+1) + } + + socket, err := net.Listen("unixpacket", pathToSocket) + require.NoError(t, err) + return pathToSocket, socket +} + +func createMultipleSocketsForTest(t *testing.T, numSockets int, dirPath string) (socketsPaths []string, sockets []net.Listener) { + var err error + if len(dirPath) == 0 { + dirPath, err = os.MkdirTemp("", "dpdk-test-socket") + } + require.NoError(t, err) + + for i := 0; i < numSockets; i++ { + var pathToSocket string + if i == 0 { + pathToSocket = filepath.Join(dirPath, dpdkSocketTemplateName) + } else { + pathToSocket = filepath.Join(dirPath, fmt.Sprintf("%s:%d", dpdkSocketTemplateName, 1000+i)) + } + socket, err := net.Listen("unixpacket", pathToSocket) + require.NoError(t, err) + socketsPaths = append(socketsPaths, pathToSocket) + sockets = append(sockets, socket) + } + return socketsPaths, sockets +} + func simulateSocketResponse(socket net.Listener, t *testing.T) { conn, err := socket.Accept() require.NoError(t, err) @@ -385,3 +920,7 @@ func simulateSocketResponse(socket net.Listener, t *testing.T) { _, err = conn.Write(initMessage) require.NoError(t, err) } + +func prepareGlob(path string) (*globpath.GlobPath, error) { + return globpath.Compile(path + "*") +} diff --git a/plugins/inputs/dpdk/dpdk_utils.go b/plugins/inputs/dpdk/dpdk_utils.go index 5625dcaeb..a3a756b12 100644 --- a/plugins/inputs/dpdk/dpdk_utils.go +++ b/plugins/inputs/dpdk/dpdk_utils.go @@ -4,6 +4,7 @@ package dpdk import ( "encoding/json" + "errors" "fmt" "os" "reflect" @@ -39,6 +40,25 @@ func getParams(command string) string { return command[index+1:] } +// Checks if the provided filePath contains in-memory socket +func isInMemorySocketPath(filePath, socketPath string) bool { + if filePath == socketPath { + return true + } + + socketPathPrefix := fmt.Sprintf("%s:", socketPath) + if strings.HasPrefix(filePath, socketPathPrefix) { + suffix := filePath[len(socketPathPrefix):] + if number, err := strconv.Atoi(suffix); err == nil { + if number > 0 { + return true + } + } + } + + return false +} + // Checks if provided path points to socket func isSocket(path string) error { pathInfo, err := os.Lstat(path) @@ -60,7 +80,7 @@ func isSocket(path string) error { // Converts JSON array containing devices identifiers from DPDK response to string slice func jsonToArray(input []byte, command string) ([]string, error) { if len(input) == 0 { - return nil, fmt.Errorf("got empty object instead of json") + return nil, errors.New("got empty object instead of json") } var rawMessage map[string]json.RawMessage @@ -72,7 +92,7 @@ func jsonToArray(input []byte, command string) ([]string, error) { var intArray []int64 err = json.Unmarshal(rawMessage[command], &intArray) if err != nil { - return nil, fmt.Errorf("failed to unmarshall json response: %w", err) + return nil, fmt.Errorf("failed to unmarshal json response: %w", err) } stringArray := make([]string, 0, len(intArray)) diff --git a/plugins/inputs/dpdk/dpdk_utils_test.go b/plugins/inputs/dpdk/dpdk_utils_test.go index 338d7a8f9..4ac43ef0f 100644 --- a/plugins/inputs/dpdk/dpdk_utils_test.go +++ b/plugins/inputs/dpdk/dpdk_utils_test.go @@ -4,7 +4,6 @@ package dpdk import ( "fmt" - "net" "os" "strconv" "testing" @@ -20,8 +19,8 @@ func Test_isSocket(t *testing.T) { require.Contains(t, err.Error(), "provided path does not exist") }) - t.Run("should pass if path points to socket", func(t *testing.T) { - pathToSocket, socket := createSocketForTest(t) + t.Run("Should pass if path points to socket", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t, "") defer socket.Close() err := isSocket(pathToSocket) @@ -125,13 +124,6 @@ func Test_jsonToArray(t *testing.T) { _, err := jsonToArray([]byte(jsonString), key) require.Error(t, err) - require.Contains(t, err.Error(), "failed to unmarshall json response") + require.Contains(t, err.Error(), "failed to unmarshal json response") }) } - -func createSocketForTest(t *testing.T) (string, net.Listener) { - pathToSocket := "/tmp/dpdk-test-socket" - socket, err := net.Listen("unixpacket", pathToSocket) - require.NoError(t, err) - return pathToSocket, socket -} diff --git a/plugins/inputs/dpdk/sample.conf b/plugins/inputs/dpdk/sample.conf index 1199e517d..fd1219b63 100644 --- a/plugins/inputs/dpdk/sample.conf +++ b/plugins/inputs/dpdk/sample.conf @@ -14,20 +14,37 @@ # socket_access_timeout = "200ms" ## Enables telemetry data collection for selected device types. - ## Adding "ethdev" enables collection of telemetry from DPDK NICs - ## (stats, xstats, link_status). - ## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices - ## (xstats). + ## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status, info). + ## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats). # device_types = ["ethdev"] ## List of custom, application-specific telemetry commands to query ## The list of available commands depend on the application deployed. ## Applications can register their own commands via telemetry library API - ## http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands + ## https://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands ## For L3 Forwarding with Power Management Sample Application this could be: ## additional_commands = ["/l3fwd-power/stats"] # additional_commands = [] + ## List of plugin options. + ## Supported options: + ## - "in_memory" option enables reading for multiple sockets when a dpdk application is running with --in-memory option. + ## When option is enabled plugin will try to find additional socket paths related to provided socket_path. + ## Details: https://doc.dpdk.org/guides/howto/telemetry.html#connecting-to-different-dpdk-processes + # plugin_options = ["in_memory"] + + ## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet). + ## Available choices: + ## - error: Telegraf will return an error during the startup and gather phases if socket is unreachable + ## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather + # unreachable_socket_behavior = "error" + + ## List of metadata fields which will be added to every metric produced by the plugin. + ## Supported options: + ## - "pid" - exposes PID of DPDK process. Example: pid=2179660i + ## - "version" - exposes version of DPDK. Example: version="DPDK 21.11.2" + # metadata_fields = ["pid", "version"] + ## Allows turning off collecting data for individual "ethdev" commands. ## Remove "/ethdev/link_status" from list to gather link status metrics. [inputs.dpdk.ethdev]