diff --git a/README.md b/README.md index 45bdc43ba..b2d8e6a54 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,7 @@ For documentation on the latest development code see the [documentation index][d * [docker](./plugins/inputs/docker) * [docker_log](./plugins/inputs/docker_log) * [dovecot](./plugins/inputs/dovecot) +* [dpdk](./plugins/inputs/dpdk) * [aws ecs](./plugins/inputs/ecs) (Amazon Elastic Container Service, Fargate) * [elasticsearch](./plugins/inputs/elasticsearch) * [ethtool](./plugins/inputs/ethtool) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 5f7e81648..3beb30cb4 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -41,6 +41,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/docker" _ "github.com/influxdata/telegraf/plugins/inputs/docker_log" _ "github.com/influxdata/telegraf/plugins/inputs/dovecot" + _ "github.com/influxdata/telegraf/plugins/inputs/dpdk" _ "github.com/influxdata/telegraf/plugins/inputs/ecs" _ "github.com/influxdata/telegraf/plugins/inputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/inputs/ethtool" diff --git a/plugins/inputs/dpdk/README.md b/plugins/inputs/dpdk/README.md new file mode 100644 index 000000000..bd98af050 --- /dev/null +++ b/plugins/inputs/dpdk/README.md @@ -0,0 +1,200 @@ +# DPDK Input Plugin +The `dpdk` plugin collects metrics exposed by applications built with [Data Plane Development Kit](https://www.dpdk.org/) +which is an extensive set of open source libraries designed for accelerating packet processing workloads. + +DPDK provides APIs that enable exposing various statistics from the devices used by DPDK applications and enable exposing +KPI metrics directly from applications. Device statistics include e.g. common statistics available across NICs, like: +received and sent packets, received and sent bytes etc. In addition to this generic statistics, an extended statistics API +is available that allows providing more detailed, driver-specific metrics that are not available as generic statistics. + +[DPDK Release 20.05](https://doc.dpdk.org/guides/rel_notes/release_20_05.html) introduced updated telemetry interface +that enables DPDK libraries and applications to provide their telemetry. This is referred to as `v2` version of this +socket-based telemetry interface. This release enabled e.g. reading driver-specific extended stats (`/ethdev/xstats`) +via this new interface. + +[DPDK Release 20.11](https://doc.dpdk.org/guides/rel_notes/release_20_11.html) introduced reading via `v2` interface +common statistics (`/ethdev/stats`) in addition to existing (`/ethdev/xstats`). + +The example usage of `v2` telemetry interface can be found in [Telemetry User Guide](https://doc.dpdk.org/guides/howto/telemetry.html). +A variety of [DPDK Sample Applications](https://doc.dpdk.org/guides/sample_app_ug/index.html) is also available for users +to discover and test the capabilities of DPDK libraries and to explore the exposed metrics. + +> **DPDK Version Info:** This plugin uses this `v2` interface to read telemetry data from applications build with +> `DPDK version >= 20.05`. The default configuration include reading common statistics from `/ethdev/stats` that is +> available from `DPDK version >= 20.11`. When using `DPDK 20.05 <= version < DPDK 20.11` it is recommended to disable +> querying `/ethdev/stats` by setting corresponding `exclude_commands` configuration option. + +> **NOTE:** Since DPDK will most likely run with root privileges, the socket telemetry interface exposed by DPDK +> will also require root access. This means that either access permissions have to be adjusted for socket telemetry +> interface to allow Telegraf to access it, or Telegraf should run with root privileges. + +## Configuration +This plugin offers multiple configuration options, please review examples below for additional usage information. +```toml +# Reads metrics from DPDK applications using v2 telemetry interface. +[[inputs.dpdk]] + ## Path to DPDK telemetry socket. This shall point to v2 version of DPDK telemetry interface. + # socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2" + + ## Duration that defines how long the connected socket client will wait for a response before terminating connection. + ## This includes both writing to and reading from socket. Since it's local socket access + ## to a fast packet processing application, the timeout should be sufficient for most users. + ## Setting the value to 0 disables the timeout (not recommended) + # socket_access_timeout = "200ms" + + ## Enables telemetry data collection for selected device types. + ## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status). + ## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats). + # device_types = ["ethdev"] + + ## List of custom, application-specific telemetry commands to query + ## The list of available commands depend on the application deployed. Applications can register their own commands + ## via telemetry library API http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands + ## For e.g. L3 Forwarding with Power Management Sample Application this could be: + ## additional_commands = ["/l3fwd-power/stats"] + # additional_commands = [] + + ## Allows turning off collecting data for individual "ethdev" commands. + ## Remove "/ethdev/link_status" from list to start getting link status metrics. + [inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/link_status"] + + ## When running multiple instances of the plugin it's recommended to add a unique tag to each instance to identify + ## metrics exposed by an instance of DPDK application. This is useful when multiple DPDK apps run on a single host. + ## [inputs.dpdk.tags] + ## dpdk_instance = "my-fwd-app" +``` + +### Example: Minimal Configuration for NIC metrics +This configuration allows getting metrics for all devices reported via `/ethdev/list` command: +* `/ethdev/stats` - basic device statistics (since `DPDK 20.11`) +* `/ethdev/xstats` - extended device statistics +* `/ethdev/link_status` - up/down link status +```toml +[[inputs.dpdk]] + device_types = ["ethdev"] +``` +Since this configuration will query `/ethdev/link_status` it's recommended to increase timeout to `socket_access_timeout = "10s"`. + +The [plugin collecting interval](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#input-plugins) +should be adjusted accordingly (e.g. `interval = "30s"`). + +### Example: Excluding NIC link status from being collected +Checking link status depending on underlying implementation may take more time to complete. +This configuration can be used to exclude this telemetry command to allow faster response for metrics. +```toml +[[inputs.dpdk]] + device_types = ["ethdev"] + + [inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/link_status"] +``` +A separate plugin instance with higher timeout settings can be used to get `/ethdev/link_status` independently. +Consult [Independent NIC link status configuration](#example-independent-nic-link-status-configuration) +and [Getting metrics from multiple DPDK instances running on same host](#example-getting-metrics-from-multiple-dpdk-instances-running-on-same-host) +examples for further details. + +### Example: Independent NIC link status configuration +This configuration allows getting `/ethdev/link_status` using separate configuration, with higher timeout. +```toml +[[inputs.dpdk]] + interval = "30s" + socket_access_timeout = "10s" + device_types = ["ethdev"] + + [inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/stats", "/ethdev/xstats"] +``` + +### Example: Getting application-specific metrics +This configuration allows reading custom metrics exposed by applications. Example telemetry command obtained from +[L3 Forwarding with Power Management Sample Application](https://doc.dpdk.org/guides/sample_app_ug/l3_forward_power_man.html). +```toml +[[inputs.dpdk]] + device_types = ["ethdev"] + additional_commands = ["/l3fwd-power/stats"] + + [inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/link_status"] +``` +Command entries specified in `additional_commands` should match DPDK command format: +* Command entry format: either `command` or `command,params` for commands that expect parameters, where comma (`,`) separates command from params. +* Command entry length (command with params) should be `< 1024` characters. +* Command length (without params) should be `< 56` characters. +* Commands have to start with `/`. + +Providing invalid commands will prevent the plugin from starting. Additional commands allow duplicates, but they +will be removed during execution so each command will be executed only once during each metric gathering interval. + +### Example: Getting metrics from multiple DPDK instances running on same host +This configuration allows getting metrics from two separate applications exposing their telemetry interfaces +via separate sockets. For each plugin instance a unique tag `[inputs.dpdk.tags]` allows distinguishing between them. +```toml +# Instance #1 - L3 Forwarding with Power Management Application +[[inputs.dpdk]] + socket_path = "/var/run/dpdk/rte/l3fwd-power_telemetry.v2" + device_types = ["ethdev"] + additional_commands = ["/l3fwd-power/stats"] + + [inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/link_status"] + + [inputs.dpdk.tags] + dpdk_instance = "l3fwd-power" + +# Instance #2 - L2 Forwarding with Intel Cache Allocation Technology (CAT) Application +[[inputs.dpdk]] + socket_path = "/var/run/dpdk/rte/l2fwd-cat_telemetry.v2" + device_types = ["ethdev"] + +[inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/link_status"] + + [inputs.dpdk.tags] + dpdk_instance = "l2fwd-cat" +``` +This utilizes Telegraf's standard capability of [adding custom tags](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#input-plugins) +to input plugin's measurements. + +## Metrics +The DPDK socket accepts `command,params` requests and returns metric data in JSON format. All metrics from DPDK socket +become flattened using [Telegraf's JSON Flattener](../../parsers/json/README.md) and exposed as fields. +If DPDK response contains no information (is empty or is null) then such response will be discarded. + +> **NOTE:** Since DPDK allows registering custom metrics in its telemetry framework the JSON response from DPDK +> may contain various sets of metrics. While metrics from `/ethdev/stats` should be most stable, the `/ethdev/xstats` +> may contain driver-specific metrics (depending on DPDK application configuration). The application-specific commands +> like `/l3fwd-power/stats` can return their own specific set of metrics. + +## Example output +The output consists of plugin name (`dpdk`), and a set of tags that identify querying hierarchy: +``` +dpdk,host=dpdk-host,dpdk_instance=l3fwd-power,command=/ethdev/stats,params=0 [fields] [timestamp] +``` + +| Tag | Description | +|-----|-------------| +| `host` | hostname of the machine (consult [Telegraf Agent configuration](https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#agent) for additional details) | +| `dpdk_instance` | custom tag from `[inputs.dpdk.tags]` (optional) | +| `command` | executed command (without params) | +| `params` | command parameter, e.g. for `/ethdev/stats` it is the id of NIC as exposed by `/ethdev/list`
For DPDK app that uses 2 NICs the metrics will output e.g. `params=0`, `params=1`. | + +When running plugin configuration below... +```toml +[[inputs.dpdk]] + device_types = ["ethdev"] + additional_commands = ["/l3fwd-power/stats"] + [inputs.dpdk.tags] + dpdk_instance = "l3fwd-power" +``` + +...expected output for `dpdk` plugin instance running on host named `host=dpdk-host`: +``` +dpdk,command=/ethdev/stats,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 q_opackets_0=0,q_ipackets_5=0,q_errors_11=0,ierrors=0,q_obytes_5=0,q_obytes_10=0,q_opackets_10=0,q_ipackets_4=0,q_ipackets_7=0,q_ipackets_15=0,q_ibytes_5=0,q_ibytes_6=0,q_ibytes_9=0,obytes=0,q_opackets_1=0,q_opackets_11=0,q_obytes_7=0,q_errors_5=0,q_errors_10=0,q_ibytes_4=0,q_obytes_6=0,q_errors_1=0,q_opackets_5=0,q_errors_3=0,q_errors_12=0,q_ipackets_11=0,q_ipackets_12=0,q_obytes_14=0,q_opackets_15=0,q_obytes_2=0,q_errors_8=0,q_opackets_12=0,q_errors_0=0,q_errors_9=0,q_opackets_14=0,q_ibytes_3=0,q_ibytes_15=0,q_ipackets_13=0,q_ipackets_14=0,q_obytes_3=0,q_errors_13=0,q_opackets_3=0,q_ibytes_0=7092,q_ibytes_2=0,q_ibytes_8=0,q_ipackets_8=0,q_ipackets_10=0,q_obytes_4=0,q_ibytes_10=0,q_ibytes_13=0,q_ibytes_1=0,q_ibytes_12=0,opackets=0,q_obytes_1=0,q_errors_15=0,q_opackets_2=0,oerrors=0,rx_nombuf=0,q_opackets_8=0,q_ibytes_11=0,q_ipackets_3=0,q_obytes_0=0,q_obytes_12=0,q_obytes_11=0,q_obytes_13=0,q_errors_6=0,q_ipackets_1=0,q_ipackets_6=0,q_ipackets_9=0,q_obytes_15=0,q_opackets_7=0,q_ibytes_14=0,ipackets=98,q_ipackets_2=0,q_opackets_6=0,q_ibytes_7=0,imissed=0,q_opackets_4=0,q_opackets_9=0,q_obytes_8=0,q_obytes_9=0,q_errors_4=0,q_errors_14=0,q_opackets_13=0,ibytes=7092,q_ipackets_0=98,q_errors_2=0,q_errors_7=0 1606310780000000000 +dpdk,command=/ethdev/stats,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 q_opackets_0=0,q_ipackets_5=0,q_errors_11=0,ierrors=0,q_obytes_5=0,q_obytes_10=0,q_opackets_10=0,q_ipackets_4=0,q_ipackets_7=0,q_ipackets_15=0,q_ibytes_5=0,q_ibytes_6=0,q_ibytes_9=0,obytes=0,q_opackets_1=0,q_opackets_11=0,q_obytes_7=0,q_errors_5=0,q_errors_10=0,q_ibytes_4=0,q_obytes_6=0,q_errors_1=0,q_opackets_5=0,q_errors_3=0,q_errors_12=0,q_ipackets_11=0,q_ipackets_12=0,q_obytes_14=0,q_opackets_15=0,q_obytes_2=0,q_errors_8=0,q_opackets_12=0,q_errors_0=0,q_errors_9=0,q_opackets_14=0,q_ibytes_3=0,q_ibytes_15=0,q_ipackets_13=0,q_ipackets_14=0,q_obytes_3=0,q_errors_13=0,q_opackets_3=0,q_ibytes_0=7092,q_ibytes_2=0,q_ibytes_8=0,q_ipackets_8=0,q_ipackets_10=0,q_obytes_4=0,q_ibytes_10=0,q_ibytes_13=0,q_ibytes_1=0,q_ibytes_12=0,opackets=0,q_obytes_1=0,q_errors_15=0,q_opackets_2=0,oerrors=0,rx_nombuf=0,q_opackets_8=0,q_ibytes_11=0,q_ipackets_3=0,q_obytes_0=0,q_obytes_12=0,q_obytes_11=0,q_obytes_13=0,q_errors_6=0,q_ipackets_1=0,q_ipackets_6=0,q_ipackets_9=0,q_obytes_15=0,q_opackets_7=0,q_ibytes_14=0,ipackets=98,q_ipackets_2=0,q_opackets_6=0,q_ibytes_7=0,imissed=0,q_opackets_4=0,q_opackets_9=0,q_obytes_8=0,q_obytes_9=0,q_errors_4=0,q_errors_14=0,q_opackets_13=0,ibytes=7092,q_ipackets_0=98,q_errors_2=0,q_errors_7=0 1606310780000000000 +dpdk,command=/ethdev/xstats,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 out_octets_encrypted=0,rx_fcoe_mbuf_allocation_errors=0,tx_q1packets=0,rx_priority0_xoff_packets=0,rx_priority7_xoff_packets=0,rx_errors=0,mac_remote_errors=0,in_pkts_invalid=0,tx_priority3_xoff_packets=0,tx_errors=0,rx_fcoe_bytes=0,rx_flow_control_xon_packets=0,rx_priority4_xoff_packets=0,tx_priority2_xoff_packets=0,rx_illegal_byte_errors=0,rx_xoff_packets=0,rx_management_packets=0,rx_priority7_dropped=0,rx_priority4_dropped=0,in_pkts_unchecked=0,rx_error_bytes=0,rx_size_256_to_511_packets=0,tx_priority4_xoff_packets=0,rx_priority6_xon_packets=0,tx_priority4_xon_to_xoff_packets=0,in_pkts_delayed=0,rx_priority0_mbuf_allocation_errors=0,out_octets_protected=0,tx_priority7_xon_to_xoff_packets=0,tx_priority1_xon_to_xoff_packets=0,rx_fcoe_no_direct_data_placement_ext_buff=0,tx_priority6_xon_to_xoff_packets=0,flow_director_filter_add_errors=0,rx_total_packets=99,rx_crc_errors=0,flow_director_filter_remove_errors=0,rx_missed_errors=0,tx_size_64_packets=0,rx_priority3_dropped=0,flow_director_matched_filters=0,tx_priority2_xon_to_xoff_packets=0,rx_priority1_xon_packets=0,rx_size_65_to_127_packets=99,rx_fragment_errors=0,in_pkts_notusingsa=0,rx_q0bytes=7162,rx_fcoe_dropped=0,rx_priority1_dropped=0,rx_fcoe_packets=0,rx_priority5_xoff_packets=0,out_pkts_protected=0,tx_total_packets=0,rx_priority2_dropped=0,in_pkts_late=0,tx_q1bytes=0,in_pkts_badtag=0,rx_multicast_packets=99,rx_priority6_xoff_packets=0,tx_flow_control_xoff_packets=0,rx_flow_control_xoff_packets=0,rx_priority0_xon_packets=0,in_pkts_untagged=0,tx_fcoe_packets=0,rx_priority7_mbuf_allocation_errors=0,tx_priority0_xon_to_xoff_packets=0,tx_priority5_xon_to_xoff_packets=0,tx_flow_control_xon_packets=0,tx_q0packets=0,tx_xoff_packets=0,rx_size_512_to_1023_packets=0,rx_priority3_xon_packets=0,rx_q0errors=0,rx_oversize_errors=0,tx_priority4_xon_packets=0,tx_priority5_xoff_packets=0,rx_priority5_xon_packets=0,rx_total_missed_packets=0,rx_priority4_mbuf_allocation_errors=0,tx_priority1_xon_packets=0,tx_management_packets=0,rx_priority5_mbuf_allocation_errors=0,rx_fcoe_no_direct_data_placement=0,rx_undersize_errors=0,tx_priority1_xoff_packets=0,rx_q0packets=99,tx_q2packets=0,tx_priority6_xon_packets=0,rx_good_packets=99,tx_priority5_xon_packets=0,tx_size_256_to_511_packets=0,rx_priority6_dropped=0,rx_broadcast_packets=0,tx_size_512_to_1023_packets=0,tx_priority3_xon_to_xoff_packets=0,in_pkts_unknownsci=0,in_octets_validated=0,tx_priority6_xoff_packets=0,tx_priority7_xoff_packets=0,rx_jabber_errors=0,tx_priority7_xon_packets=0,tx_priority0_xon_packets=0,in_pkts_unusedsa=0,tx_priority0_xoff_packets=0,mac_local_errors=33,rx_total_bytes=7162,in_pkts_notvalid=0,rx_length_errors=0,in_octets_decrypted=0,rx_size_128_to_255_packets=0,rx_good_bytes=7162,tx_size_65_to_127_packets=0,rx_mac_short_packet_dropped=0,tx_size_1024_to_max_packets=0,rx_priority2_mbuf_allocation_errors=0,flow_director_added_filters=0,tx_multicast_packets=0,rx_fcoe_crc_errors=0,rx_priority1_xoff_packets=0,flow_director_missed_filters=0,rx_xon_packets=0,tx_size_128_to_255_packets=0,out_pkts_encrypted=0,rx_priority4_xon_packets=0,rx_priority0_dropped=0,rx_size_1024_to_max_packets=0,tx_good_bytes=0,rx_management_dropped=0,rx_mbuf_allocation_errors=0,tx_xon_packets=0,rx_priority3_xoff_packets=0,tx_good_packets=0,tx_fcoe_bytes=0,rx_priority6_mbuf_allocation_errors=0,rx_priority2_xon_packets=0,tx_broadcast_packets=0,tx_q2bytes=0,rx_priority7_xon_packets=0,out_pkts_untagged=0,rx_priority2_xoff_packets=0,rx_priority1_mbuf_allocation_errors=0,tx_q0bytes=0,rx_size_64_packets=0,rx_priority5_dropped=0,tx_priority2_xon_packets=0,in_pkts_nosci=0,flow_director_removed_filters=0,in_pkts_ok=0,rx_l3_l4_xsum_error=0,rx_priority3_mbuf_allocation_errors=0,tx_priority3_xon_packets=0 1606310780000000000 +dpdk,command=/ethdev/xstats,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 tx_priority5_xoff_packets=0,in_pkts_unknownsci=0,tx_q0packets=0,tx_total_packets=0,rx_crc_errors=0,rx_priority4_xoff_packets=0,rx_priority5_dropped=0,tx_size_65_to_127_packets=0,rx_good_packets=98,tx_priority6_xoff_packets=0,tx_fcoe_bytes=0,out_octets_protected=0,out_pkts_encrypted=0,rx_priority1_xon_packets=0,tx_size_128_to_255_packets=0,rx_flow_control_xoff_packets=0,rx_priority7_xoff_packets=0,tx_priority0_xon_to_xoff_packets=0,rx_broadcast_packets=0,tx_priority1_xon_packets=0,rx_xon_packets=0,rx_fragment_errors=0,tx_flow_control_xoff_packets=0,tx_q0bytes=0,out_pkts_untagged=0,rx_priority4_xon_packets=0,tx_priority5_xon_packets=0,rx_priority1_xoff_packets=0,rx_good_bytes=7092,rx_priority4_mbuf_allocation_errors=0,in_octets_decrypted=0,tx_priority2_xon_to_xoff_packets=0,rx_priority3_dropped=0,tx_multicast_packets=0,mac_local_errors=33,in_pkts_ok=0,rx_illegal_byte_errors=0,rx_xoff_packets=0,rx_q0errors=0,flow_director_added_filters=0,rx_size_256_to_511_packets=0,rx_priority3_xon_packets=0,rx_l3_l4_xsum_error=0,rx_priority6_dropped=0,in_pkts_notvalid=0,rx_size_64_packets=0,tx_management_packets=0,rx_length_errors=0,tx_priority7_xon_to_xoff_packets=0,rx_mbuf_allocation_errors=0,rx_missed_errors=0,rx_priority1_mbuf_allocation_errors=0,rx_fcoe_no_direct_data_placement=0,tx_priority3_xoff_packets=0,in_pkts_delayed=0,tx_errors=0,rx_size_512_to_1023_packets=0,tx_priority4_xon_packets=0,rx_q0bytes=7092,in_pkts_unchecked=0,tx_size_512_to_1023_packets=0,rx_fcoe_packets=0,in_pkts_nosci=0,rx_priority6_mbuf_allocation_errors=0,rx_priority1_dropped=0,tx_q2packets=0,rx_priority7_dropped=0,tx_size_1024_to_max_packets=0,rx_management_packets=0,rx_multicast_packets=98,rx_total_bytes=7092,mac_remote_errors=0,tx_priority3_xon_packets=0,rx_priority2_mbuf_allocation_errors=0,rx_priority5_mbuf_allocation_errors=0,tx_q2bytes=0,rx_size_128_to_255_packets=0,in_pkts_badtag=0,out_pkts_protected=0,rx_management_dropped=0,rx_fcoe_bytes=0,flow_director_removed_filters=0,tx_priority2_xoff_packets=0,rx_fcoe_crc_errors=0,rx_priority0_mbuf_allocation_errors=0,rx_priority0_xon_packets=0,rx_fcoe_dropped=0,tx_priority1_xon_to_xoff_packets=0,rx_size_65_to_127_packets=98,rx_q0packets=98,tx_priority0_xoff_packets=0,rx_priority6_xon_packets=0,rx_total_packets=98,rx_undersize_errors=0,flow_director_missed_filters=0,rx_jabber_errors=0,in_pkts_invalid=0,in_pkts_late=0,rx_priority5_xon_packets=0,tx_priority4_xoff_packets=0,out_octets_encrypted=0,tx_q1packets=0,rx_priority5_xoff_packets=0,rx_priority6_xoff_packets=0,rx_errors=0,in_octets_validated=0,rx_priority3_xoff_packets=0,tx_priority4_xon_to_xoff_packets=0,tx_priority5_xon_to_xoff_packets=0,tx_flow_control_xon_packets=0,rx_priority0_dropped=0,flow_director_filter_add_errors=0,tx_q1bytes=0,tx_priority6_xon_to_xoff_packets=0,flow_director_matched_filters=0,tx_priority2_xon_packets=0,rx_fcoe_mbuf_allocation_errors=0,rx_priority2_xoff_packets=0,tx_priority7_xoff_packets=0,rx_priority0_xoff_packets=0,rx_oversize_errors=0,in_pkts_notusingsa=0,tx_size_64_packets=0,rx_size_1024_to_max_packets=0,tx_priority6_xon_packets=0,rx_priority2_dropped=0,rx_priority4_dropped=0,rx_priority7_mbuf_allocation_errors=0,rx_flow_control_xon_packets=0,tx_good_bytes=0,tx_priority3_xon_to_xoff_packets=0,rx_total_missed_packets=0,rx_error_bytes=0,tx_priority7_xon_packets=0,rx_mac_short_packet_dropped=0,tx_priority1_xoff_packets=0,tx_good_packets=0,tx_broadcast_packets=0,tx_xon_packets=0,in_pkts_unusedsa=0,rx_priority2_xon_packets=0,in_pkts_untagged=0,tx_fcoe_packets=0,flow_director_filter_remove_errors=0,rx_priority3_mbuf_allocation_errors=0,tx_priority0_xon_packets=0,rx_priority7_xon_packets=0,rx_fcoe_no_direct_data_placement_ext_buff=0,tx_xoff_packets=0,tx_size_256_to_511_packets=0 1606310780000000000 +dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=0 status="UP",speed=10000,duplex="full-duplex" 1606310780000000000 +dpdk,command=/ethdev/link_status,dpdk_instance=l3fwd-power,host=dpdk-host,params=1 status="UP",speed=10000,duplex="full-duplex" 1606310780000000000 +dpdk,command=/l3fwd-power/stats,dpdk_instance=l3fwd-power,host=dpdk-host empty_poll=49506395979901,full_poll=0,busy_percent=0 1606310780000000000 +``` diff --git a/plugins/inputs/dpdk/dpdk.go b/plugins/inputs/dpdk/dpdk.go new file mode 100644 index 000000000..293dbee90 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk.go @@ -0,0 +1,263 @@ +// +build linux + +package dpdk + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/plugins/inputs" + jsonparser "github.com/influxdata/telegraf/plugins/parsers/json" +) + +const ( + description = "Reads metrics from DPDK applications using v2 telemetry interface." + sampleConfig = ` + ## Path to DPDK telemetry socket. This shall point to v2 version of DPDK telemetry interface. + # socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2" + + ## Duration that defines how long the connected socket client will wait for a response before terminating connection. + ## This includes both writing to and reading from socket. Since it's local socket access + ## to a fast packet processing application, the timeout should be sufficient for most users. + ## Setting the value to 0 disables the timeout (not recommended) + # socket_access_timeout = "200ms" + + ## Enables telemetry data collection for selected device types. + ## Adding "ethdev" enables collection of telemetry from DPDK NICs (stats, xstats, link_status). + ## Adding "rawdev" enables collection of telemetry from DPDK Raw Devices (xstats). + # device_types = ["ethdev"] + + ## List of custom, application-specific telemetry commands to query + ## The list of available commands depend on the application deployed. Applications can register their own commands + ## via telemetry library API http://doc.dpdk.org/guides/prog_guide/telemetry_lib.html#registering-commands + ## For e.g. L3 Forwarding with Power Management Sample Application this could be: + ## additional_commands = ["/l3fwd-power/stats"] + # additional_commands = [] + + ## Allows turning off collecting data for individual "ethdev" commands. + ## Remove "/ethdev/link_status" from list to start getting link status metrics. + [inputs.dpdk.ethdev] + exclude_commands = ["/ethdev/link_status"] + + ## When running multiple instances of the plugin it's recommended to add a unique tag to each instance to identify + ## metrics exposed by an instance of DPDK application. This is useful when multiple DPDK apps run on a single host. + ## [inputs.dpdk.tags] + ## dpdk_instance = "my-fwd-app" +` + defaultPathToSocket = "/var/run/dpdk/rte/dpdk_telemetry.v2" + defaultAccessTimeout = config.Duration(200 * time.Millisecond) + maxCommandLength = 56 + maxCommandLengthWithParams = 1024 + pluginName = "dpdk" + ethdevListCommand = "/ethdev/list" + rawdevListCommand = "/rawdev/list" +) + +type dpdk struct { + SocketPath string `toml:"socket_path"` + AccessTimeout config.Duration `toml:"socket_access_timeout"` + DeviceTypes []string `toml:"device_types"` + EthdevConfig ethdevConfig `toml:"ethdev"` + AdditionalCommands []string `toml:"additional_commands"` + Log telegraf.Logger `toml:"-"` + + connector *dpdkConnector + rawdevCommands []string + ethdevCommands []string + ethdevExcludedCommandsFilter filter.Filter +} + +type ethdevConfig struct { + EthdevExcludeCommands []string `toml:"exclude_commands"` +} + +func init() { + inputs.Add(pluginName, func() telegraf.Input { + dpdk := &dpdk{ + // Setting it here (rather than in `Init()`) to distinguish between "zero" value, + // default value and don't having value in config at all. + AccessTimeout: defaultAccessTimeout, + } + return dpdk + }) +} + +func (dpdk *dpdk) SampleConfig() string { + return sampleConfig +} + +func (dpdk *dpdk) Description() string { + return description +} + +// Performs validation of all parameters from configuration +func (dpdk *dpdk) Init() error { + if dpdk.SocketPath == "" { + dpdk.SocketPath = defaultPathToSocket + dpdk.Log.Debugf("using default '%v' path for socket_path", defaultPathToSocket) + } + + if dpdk.DeviceTypes == nil { + dpdk.DeviceTypes = []string{"ethdev"} + } + + var err error + if err = isSocket(dpdk.SocketPath); err != nil { + return err + } + + dpdk.rawdevCommands = []string{"/rawdev/xstats"} + dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/link_status"} + + if err = dpdk.validateCommands(); err != nil { + return err + } + + if dpdk.AccessTimeout < 0 { + return fmt.Errorf("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)") + } + + if len(dpdk.AdditionalCommands) == 0 && len(dpdk.DeviceTypes) == 0 { + return fmt.Errorf("plugin was configured with nothing to read") + } + + dpdk.ethdevExcludedCommandsFilter, err = filter.Compile(dpdk.EthdevConfig.EthdevExcludeCommands) + if err != nil { + return fmt.Errorf("error occurred during filter prepation for ethdev excluded commands - %v", err) + } + + dpdk.connector = newDpdkConnector(dpdk.SocketPath, dpdk.AccessTimeout) + initMessage, err := dpdk.connector.connect() + if initMessage != nil { + dpdk.Log.Debugf("Successfully connected to %v running as process with PID %v with len %v", + initMessage.Version, initMessage.Pid, initMessage.MaxOutputLen) + } + return err +} + +// Checks that user-supplied commands are unique and match DPDK commands format +func (dpdk *dpdk) validateCommands() error { + dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands) + + for _, commandWithParams := range dpdk.AdditionalCommands { + if len(commandWithParams) == 0 { + return fmt.Errorf("got empty command") + } + + if commandWithParams[0] != '/' { + return fmt.Errorf("'%v' command should start with '/'", commandWithParams) + } + + if commandWithoutParams := stripParams(commandWithParams); len(commandWithoutParams) >= maxCommandLength { + return fmt.Errorf("'%v' command is too long. It shall be less than %v characters", commandWithoutParams, maxCommandLength) + } + + if len(commandWithParams) >= maxCommandLengthWithParams { + return fmt.Errorf("command with parameters '%v' shall be less than %v characters", commandWithParams, maxCommandLengthWithParams) + } + } + + return nil +} + +// Gathers all unique commands and processes each command sequentially +// Parallel processing could be achieved by running several instances of this plugin with different settings +func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error { + // This needs to be done during every `Gather(...)`, because DPDK can be restarted between consecutive + // `Gather(...)` cycles which can cause that it will be exposing different set of metrics. + commands := dpdk.gatherCommands(acc) + + for _, command := range commands { + dpdk.processCommand(acc, command) + } + + return nil +} + +// Gathers all unique commands +func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator) []string { + var commands []string + if choice.Contains("ethdev", dpdk.DeviceTypes) { + ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter) + ethdevCommands, err := dpdk.appendCommandsWithParamsFromList(ethdevListCommand, ethdevCommands) + if err != nil { + acc.AddError(fmt.Errorf("error occurred during fetching of %v params - %v", ethdevListCommand, err)) + } + + commands = append(commands, ethdevCommands...) + } + + if choice.Contains("rawdev", dpdk.DeviceTypes) { + rawdevCommands, err := dpdk.appendCommandsWithParamsFromList(rawdevListCommand, dpdk.rawdevCommands) + if err != nil { + acc.AddError(fmt.Errorf("error occurred during fetching of %v params - %v", rawdevListCommand, err)) + } + + commands = append(commands, rawdevCommands...) + } + + commands = append(commands, dpdk.AdditionalCommands...) + return uniqueValues(commands) +} + +// Fetches all identifiers of devices and then creates all possible combinations of commands for each device +func (dpdk *dpdk) appendCommandsWithParamsFromList(listCommand string, commands []string) ([]string, error) { + response, err := dpdk.connector.getCommandResponse(listCommand) + if err != nil { + return nil, err + } + + params, err := jsonToArray(response, listCommand) + if err != nil { + return nil, err + } + + result := make([]string, 0, len(commands)*len(params)) + for _, command := range commands { + for _, param := range params { + result = append(result, commandWithParams(command, param)) + } + } + + return result, nil +} + +// Executes command, parses response and creates/writes metric from response +func (dpdk *dpdk) processCommand(acc telegraf.Accumulator, commandWithParams string) { + buf, err := dpdk.connector.getCommandResponse(commandWithParams) + if err != nil { + acc.AddError(err) + return + } + + var parsedResponse map[string]interface{} + err = json.Unmarshal(buf, &parsedResponse) + if err != nil { + acc.AddError(fmt.Errorf("failed to unmarshall json response from %v command - %v", commandWithParams, err)) + return + } + + command := stripParams(commandWithParams) + value := parsedResponse[command] + if isEmpty(value) { + acc.AddError(fmt.Errorf("got empty json on '%v' command", commandWithParams)) + return + } + + jf := jsonparser.JSONFlattener{} + err = jf.FullFlattenJSON("", value, true, true) + if err != nil { + acc.AddError(fmt.Errorf("failed to flatten response - %v", err)) + return + } + + acc.AddFields(pluginName, jf.Fields, map[string]string{ + "command": command, + "params": getParams(commandWithParams), + }) +} diff --git a/plugins/inputs/dpdk/dpdk_connector.go b/plugins/inputs/dpdk/dpdk_connector.go new file mode 100644 index 000000000..1129d16d3 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_connector.go @@ -0,0 +1,162 @@ +// +build linux + +package dpdk + +import ( + "encoding/json" + "fmt" + "net" + "time" + + "github.com/influxdata/telegraf/config" +) + +const maxInitMessageLength = 1024 + +type initMessage struct { + Version string `json:"version"` + Pid int `json:"pid"` + MaxOutputLen uint32 `json:"max_output_len"` +} + +type dpdkConnector struct { + pathToSocket string + maxOutputLen uint32 + messageShowed bool + accessTimeout time.Duration + connection net.Conn +} + +func newDpdkConnector(pathToSocket string, accessTimeout config.Duration) *dpdkConnector { + return &dpdkConnector{ + pathToSocket: pathToSocket, + messageShowed: false, + accessTimeout: time.Duration(accessTimeout), + } +} + +// Connects to the socket +// Since DPDK is a local unix socket, it is instantly returns error or connection, so there's no need to set timeout for it +func (conn *dpdkConnector) connect() (*initMessage, error) { + connection, err := net.Dial("unixpacket", conn.pathToSocket) + if err != nil { + return nil, fmt.Errorf("failed to connect to the socket - %v", err) + } + + conn.connection = connection + result, err := conn.readMaxOutputLen() + if err != nil { + if closeErr := conn.tryClose(); closeErr != nil { + return nil, fmt.Errorf("%v and failed to close connection - %v", err, closeErr) + } + return nil, err + } + + return result, nil +} + +// Executes command using provided connection and returns response +// If error (such as timeout) occurred, then connection is discarded and recreated +// because otherwise behaviour of connection is undefined (e.g. it could return result of timed out command instead of latest) +func (conn *dpdkConnector) getCommandResponse(fullCommand string) ([]byte, error) { + connection, err := conn.getConnection() + if err != nil { + return nil, fmt.Errorf("failed to get connection to execute %v command - %v", fullCommand, err) + } + + err = conn.setTimeout() + if err != nil { + return nil, fmt.Errorf("failed to set timeout for %v command - %v", fullCommand, err) + } + + _, err = connection.Write([]byte(fullCommand)) + if err != nil { + if closeErr := conn.tryClose(); closeErr != nil { + return nil, fmt.Errorf("failed to send '%v' command - %v and failed to close connection - %v", + fullCommand, err, closeErr) + } + return nil, fmt.Errorf("failed to send '%v' command - %v", fullCommand, err) + } + + buf := make([]byte, conn.maxOutputLen) + messageLength, err := connection.Read(buf) + if err != nil { + if closeErr := conn.tryClose(); closeErr != nil { + return nil, fmt.Errorf("failed read response of '%v' command - %v and failed to close connection - %v", + fullCommand, err, closeErr) + } + return nil, fmt.Errorf("failed to read response of '%v' command - %v", fullCommand, err) + } + + if messageLength == 0 { + return nil, fmt.Errorf("got empty response during execution of '%v' command", fullCommand) + } + return buf[:messageLength], nil +} + +func (conn *dpdkConnector) tryClose() error { + if conn.connection == nil { + return nil + } + + err := conn.connection.Close() + conn.connection = nil + if err != nil { + return err + } + return nil +} + +func (conn *dpdkConnector) setTimeout() error { + if conn.connection == nil { + return fmt.Errorf("connection had not been established before") + } + + if conn.accessTimeout == 0 { + return conn.connection.SetDeadline(time.Time{}) + } + return conn.connection.SetDeadline(time.Now().Add(conn.accessTimeout)) +} + +// Returns connections, if connection is not created then function tries to recreate it +func (conn *dpdkConnector) getConnection() (net.Conn, error) { + if conn.connection == nil { + _, err := conn.connect() + if err != nil { + return nil, err + } + } + return conn.connection, nil +} + +// Reads InitMessage for connection. Should be read for each connection, otherwise InitMessage is returned as response for first command. +func (conn *dpdkConnector) readMaxOutputLen() (*initMessage, error) { + buf := make([]byte, maxInitMessageLength) + err := conn.setTimeout() + if err != nil { + return nil, fmt.Errorf("failed to set timeout - %v", err) + } + + messageLength, err := conn.connection.Read(buf) + if err != nil { + return nil, fmt.Errorf("failed to read InitMessage - %v", err) + } + + var initMessage initMessage + err = json.Unmarshal(buf[:messageLength], &initMessage) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response - %v", err) + } + + if initMessage.MaxOutputLen == 0 { + return nil, fmt.Errorf("failed to read maxOutputLen information") + } + + if !conn.messageShowed { + conn.maxOutputLen = initMessage.MaxOutputLen + conn.messageShowed = true + return &initMessage, nil + } + + return nil, nil +} diff --git a/plugins/inputs/dpdk/dpdk_connector_test.go b/plugins/inputs/dpdk/dpdk_connector_test.go new file mode 100644 index 000000000..a32296497 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_connector_test.go @@ -0,0 +1,182 @@ +// +build linux + +package dpdk + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/plugins/inputs/dpdk/mocks" +) + +func Test_readMaxOutputLen(t *testing.T) { + t.Run("should return error if timeout occurred", func(t *testing.T) { + conn := &mocks.Conn{} + conn.On("Read", mock.Anything).Return(0, fmt.Errorf("timeout")) + conn.On("SetDeadline", mock.Anything).Return(nil) + connector := dpdkConnector{connection: conn} + + _, err := connector.readMaxOutputLen() + + require.Error(t, err) + require.Contains(t, err.Error(), "timeout") + }) + + t.Run("should pass and set maxOutputLen if provided with valid InitMessage", func(t *testing.T) { + maxOutputLen := uint32(4567) + initMessage := initMessage{ + Version: "DPDK test version", + Pid: 1234, + MaxOutputLen: maxOutputLen, + } + message, err := json.Marshal(initMessage) + require.NoError(t, err) + conn := &mocks.Conn{} + conn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, message) + }).Return(len(message), nil) + conn.On("SetDeadline", mock.Anything).Return(nil) + connector := dpdkConnector{connection: conn} + + _, err = connector.readMaxOutputLen() + + require.NoError(t, err) + require.Equal(t, maxOutputLen, connector.maxOutputLen) + }) + + t.Run("should fail if received invalid json", func(t *testing.T) { + message := `{notAJson}` + conn := &mocks.Conn{} + conn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, message) + }).Return(len(message), nil) + conn.On("SetDeadline", mock.Anything).Return(nil) + connector := dpdkConnector{connection: conn} + + _, err := connector.readMaxOutputLen() + + require.Error(t, err) + require.Contains(t, err.Error(), "looking for beginning of object key string") + }) + + t.Run("should fail if received maxOutputLen equals to 0", func(t *testing.T) { + message, err := json.Marshal(initMessage{ + Version: "test", + Pid: 1, + MaxOutputLen: 0, + }) + require.NoError(t, err) + conn := &mocks.Conn{} + conn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, message) + }).Return(len(message), nil) + conn.On("SetDeadline", mock.Anything).Return(nil) + connector := dpdkConnector{connection: conn} + + _, err = connector.readMaxOutputLen() + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to read maxOutputLen information") + }) +} + +func Test_connect(t *testing.T) { + t.Run("should pass if PathToSocket points to socket", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + connector: newDpdkConnector(pathToSocket, 0), + } + go simulateSocketResponse(socket, t) + + _, err := dpdk.connector.connect() + + require.NoError(t, err) + }) +} + +func Test_getCommandResponse(t *testing.T) { + command := "/" + response := "myResponseString" + + t.Run("should return proper buffer size and value if no error occurred", func(t *testing.T) { + mockConn, dpdk, _ := prepareEnvironment() + defer mockConn.AssertExpectations(t) + simulateResponse(mockConn, response, nil) + + buf, err := dpdk.connector.getCommandResponse(command) + + require.NoError(t, err) + require.Equal(t, len(response), len(buf)) + require.Equal(t, response, string(buf)) + }) + + t.Run("should return error if failed to get connection handler", func(t *testing.T) { + _, dpdk, _ := prepareEnvironment() + dpdk.connector.connection = nil + + buf, err := dpdk.connector.getCommandResponse(command) + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to get connection to execute / command") + require.Equal(t, 0, len(buf)) + }) + + t.Run("should return error if failed to set timeout duration", func(t *testing.T) { + mockConn, dpdk, _ := prepareEnvironment() + defer mockConn.AssertExpectations(t) + mockConn.On("SetDeadline", mock.Anything).Return(fmt.Errorf("deadline error")) + + buf, err := dpdk.connector.getCommandResponse(command) + + require.Error(t, err) + require.Contains(t, err.Error(), "deadline error") + require.Equal(t, 0, len(buf)) + }) + + t.Run("should return error if timeout occurred during Write operation", func(t *testing.T) { + mockConn, dpdk, _ := prepareEnvironment() + defer mockConn.AssertExpectations(t) + mockConn.On("Write", mock.Anything).Return(0, fmt.Errorf("write timeout")) + mockConn.On("SetDeadline", mock.Anything).Return(nil) + mockConn.On("Close").Return(nil) + + buf, err := dpdk.connector.getCommandResponse(command) + + require.Error(t, err) + require.Contains(t, err.Error(), "write timeout") + require.Equal(t, 0, len(buf)) + }) + + t.Run("should return error if timeout occurred during Read operation", func(t *testing.T) { + mockConn, dpdk, _ := prepareEnvironment() + defer mockConn.AssertExpectations(t) + simulateResponse(mockConn, "", fmt.Errorf("read timeout")) + + buf, err := dpdk.connector.getCommandResponse(command) + + require.Error(t, err) + require.Contains(t, err.Error(), "read timeout") + require.Equal(t, 0, len(buf)) + }) + + t.Run("should return error if got empty response", func(t *testing.T) { + mockConn, dpdk, _ := prepareEnvironment() + defer mockConn.AssertExpectations(t) + simulateResponse(mockConn, "", nil) + + buf, err := dpdk.connector.getCommandResponse(command) + + require.Error(t, err) + require.Equal(t, 0, len(buf)) + require.Contains(t, err.Error(), "got empty response during execution of") + }) +} diff --git a/plugins/inputs/dpdk/dpdk_notlinux.go b/plugins/inputs/dpdk/dpdk_notlinux.go new file mode 100644 index 000000000..a86625ff5 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_notlinux.go @@ -0,0 +1,3 @@ +// +build !linux + +package dpdk diff --git a/plugins/inputs/dpdk/dpdk_test.go b/plugins/inputs/dpdk/dpdk_test.go new file mode 100644 index 000000000..cfee021e9 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_test.go @@ -0,0 +1,398 @@ +// +build linux + +package dpdk + +import ( + "encoding/json" + "fmt" + "net" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/plugins/inputs/dpdk/mocks" + "github.com/influxdata/telegraf/testutil" +) + +func Test_Init(t *testing.T) { + t.Run("when SocketPath field isn't set then it should be set to default value", func(t *testing.T) { + _, dpdk, _ := prepareEnvironment() + dpdk.SocketPath = "" + require.Equal(t, "", dpdk.SocketPath) + + _ = dpdk.Init() + + require.Equal(t, defaultPathToSocket, dpdk.SocketPath) + }) + + t.Run("when commands are in invalid format (doesn't start with '/') then error should be returned", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + AdditionalCommands: []string{"invalid"}, + } + + err := dpdk.Init() + + require.Error(t, err) + require.Contains(t, err.Error(), "command should start with '/'") + }) + + t.Run("when all values are valid, then no error should be returned", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + DeviceTypes: []string{"ethdev"}, + Log: testutil.Logger{}, + } + go simulateSocketResponse(socket, t) + + err := dpdk.Init() + + require.NoError(t, err) + }) + + t.Run("when device_types and additional_commands are empty, then error should be returned", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dpdk := dpdk{ + SocketPath: pathToSocket, + DeviceTypes: []string{}, + AdditionalCommands: []string{}, + Log: testutil.Logger{}, + } + + err := dpdk.Init() + + require.Error(t, err) + require.Contains(t, err.Error(), "plugin was configured with nothing to read") + }) +} + +func Test_validateCommands(t *testing.T) { + t.Run("when validating commands in correct format then no error should be returned", func(t *testing.T) { + dpdk := dpdk{ + AdditionalCommands: []string{"/test", "/help"}, + } + + err := dpdk.validateCommands() + + require.NoError(t, err) + }) + + t.Run("when validating command that doesn't begin with slash then error should be returned", func(t *testing.T) { + dpdk := dpdk{ + AdditionalCommands: []string{ + "/test", "commandWithoutSlash", + }, + } + + err := dpdk.validateCommands() + + require.Error(t, err) + require.Contains(t, err.Error(), "command should start with '/'") + }) + + t.Run("when validating long command (without parameters) then error should be returned", func(t *testing.T) { + dpdk := dpdk{ + AdditionalCommands: []string{ + "/test", "/" + strings.Repeat("a", maxCommandLength), + }, + } + + err := dpdk.validateCommands() + + require.Error(t, err) + require.Contains(t, err.Error(), "command is too long") + }) + + t.Run("when validating long command (with params) then error should be returned", func(t *testing.T) { + dpdk := dpdk{ + AdditionalCommands: []string{ + "/test", "/," + strings.Repeat("a", maxCommandLengthWithParams), + }, + } + + err := dpdk.validateCommands() + + require.Error(t, err) + require.Contains(t, err.Error(), "shall be less than 1024 characters") + }) + + t.Run("when validating empty command then error should be returned", func(t *testing.T) { + dpdk := dpdk{ + AdditionalCommands: []string{ + "/test", "", + }, + } + + err := dpdk.validateCommands() + + require.Error(t, err) + require.Contains(t, err.Error(), "got empty command") + }) + + t.Run("when validating commands with duplicates then duplicates should be removed and no error should be returned", func(t *testing.T) { + dpdk := dpdk{ + AdditionalCommands: []string{ + "/test", "/test", + }, + } + require.Equal(t, 2, len(dpdk.AdditionalCommands)) + + err := dpdk.validateCommands() + + require.Equal(t, 1, len(dpdk.AdditionalCommands)) + require.NoError(t, err) + }) +} + +func Test_dpdkPluginDescriber(t *testing.T) { + dpdk := dpdk{} + t.Run("sampleConfig function should return value from constant", func(t *testing.T) { + require.Equal(t, sampleConfig, dpdk.SampleConfig()) + }) + + t.Run("description function should return value from constant", func(t *testing.T) { + require.Equal(t, description, dpdk.Description()) + }) +} + +func prepareEnvironment() (*mocks.Conn, dpdk, *testutil.Accumulator) { + mockConnection := &mocks.Conn{} + dpdk := dpdk{ + connector: &dpdkConnector{ + connection: mockConnection, + maxOutputLen: 1024, + accessTimeout: 2 * time.Second, + }, + Log: testutil.Logger{}, + } + mockAcc := &testutil.Accumulator{} + return mockConnection, dpdk, mockAcc +} + +func Test_processCommand(t *testing.T) { + t.Run("should pass if received valid response", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := `{"/": ["/", "/eal/app_params", "/eal/params", "/ethdev/link_status"]}` + simulateResponse(mockConn, response, nil) + + dpdk.processCommand(mockAcc, "/") + + require.Equal(t, 0, len(mockAcc.Errors)) + }) + + t.Run("if received a non-JSON object then should return error", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := `notAJson` + simulateResponse(mockConn, response, nil) + + dpdk.processCommand(mockAcc, "/") + + require.Equal(t, 1, len(mockAcc.Errors)) + require.Contains(t, mockAcc.Errors[0].Error(), "invalid character") + }) + + t.Run("if failed to get command response then accumulator should contain error", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + mockConn.On("Write", mock.Anything).Return(0, fmt.Errorf("deadline exceeded")) + mockConn.On("SetDeadline", mock.Anything).Return(nil) + mockConn.On("Close").Return(nil) + + dpdk.processCommand(mockAcc, "/") + + require.Equal(t, 1, len(mockAcc.Errors)) + require.Contains(t, mockAcc.Errors[0].Error(), "deadline exceeded") + }) + + t.Run("if response contains nil or empty value then error should be returned in accumulator", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := `{"/test": null}` + simulateResponse(mockConn, response, nil) + + dpdk.processCommand(mockAcc, "/test,param") + + require.Equal(t, 1, len(mockAcc.Errors)) + require.Contains(t, mockAcc.Errors[0].Error(), "got empty json on") + }) +} + +func Test_appendCommandsWithParams(t *testing.T) { + t.Run("when got valid data, then valid commands with params should be created", func(t *testing.T) { + mockConn, dpdk, _ := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := `{"/testendpoint": [1,123]}` + simulateResponse(mockConn, response, nil) + expectedCommands := []string{"/action1,1", "/action1,123", "/action2,1", "/action2,123"} + + result, err := dpdk.appendCommandsWithParamsFromList("/testendpoint", []string{"/action1", "/action2"}) + + require.NoError(t, err) + require.Equal(t, 4, len(result)) + require.ElementsMatch(t, result, expectedCommands) + }) +} + +func Test_getCommandsAndParamsCombinations(t *testing.T) { + t.Run("when 2 ethdev commands are enabled, then 2*numberOfIds new commands should be appended", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{"%s": [1, 123]}`, ethdevListCommand) + simulateResponse(mockConn, response, nil) + expectedCommands := []string{"/ethdev/stats,1", "/ethdev/stats,123", "/ethdev/xstats,1", "/ethdev/xstats,123"} + + dpdk.DeviceTypes = []string{"ethdev"} + dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"} + dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{}) + dpdk.AdditionalCommands = []string{} + commands := dpdk.gatherCommands(mockAcc) + + require.ElementsMatch(t, commands, expectedCommands) + require.Equal(t, 0, len(mockAcc.Errors)) + }) + + t.Run("when 1 rawdev command is enabled, then 2*numberOfIds new commands should be appended", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{"%s": [1, 123]}`, rawdevListCommand) + simulateResponse(mockConn, response, nil) + expectedCommands := []string{"/rawdev/xstats,1", "/rawdev/xstats,123"} + + dpdk.DeviceTypes = []string{"rawdev"} + dpdk.rawdevCommands = []string{"/rawdev/xstats"} + dpdk.AdditionalCommands = []string{} + commands := dpdk.gatherCommands(mockAcc) + + require.ElementsMatch(t, commands, expectedCommands) + require.Equal(t, 0, len(mockAcc.Errors)) + }) + + t.Run("when 2 ethdev commands are enabled but one command is disabled, then numberOfIds new commands should be appended", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + response := fmt.Sprintf(`{"%s": [1, 123]}`, ethdevListCommand) + simulateResponse(mockConn, response, nil) + expectedCommands := []string{"/ethdev/stats,1", "/ethdev/stats,123"} + + dpdk.DeviceTypes = []string{"ethdev"} + dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"} + dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{"/ethdev/xstats"}) + dpdk.AdditionalCommands = []string{} + commands := dpdk.gatherCommands(mockAcc) + + require.ElementsMatch(t, commands, expectedCommands) + require.Equal(t, 0, len(mockAcc.Errors)) + }) + + t.Run("when ethdev commands are enabled but params fetching command returns error then error should be logged in accumulator", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + simulateResponse(mockConn, `{notAJson}`, fmt.Errorf("some error")) + + dpdk.DeviceTypes = []string{"ethdev"} + dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats"} + dpdk.ethdevExcludedCommandsFilter, _ = filter.Compile([]string{}) + dpdk.AdditionalCommands = []string{} + commands := dpdk.gatherCommands(mockAcc) + + require.Equal(t, 0, len(commands)) + require.Equal(t, 1, len(mockAcc.Errors)) + }) +} + +func Test_Gather(t *testing.T) { + t.Run("When parsing a plain json without nested object, then its key should be equal to \"\"", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + dpdk.AdditionalCommands = []string{"/endpoint1"} + simulateResponse(mockConn, `{"/endpoint1":"myvalue"}`, nil) + + err := dpdk.Gather(mockAcc) + + require.NoError(t, err) + require.Equal(t, 0, len(mockAcc.Errors)) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": "/endpoint1", + "params": "", + }, + map[string]interface{}{ + "": "myvalue", + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) + + t.Run("When parsing a list of value in nested object then list should be flattened", func(t *testing.T) { + mockConn, dpdk, mockAcc := prepareEnvironment() + defer mockConn.AssertExpectations(t) + dpdk.AdditionalCommands = []string{"/endpoint1"} + simulateResponse(mockConn, `{"/endpoint1":{"myvalue":[0,1,123]}}`, nil) + + err := dpdk.Gather(mockAcc) + require.NoError(t, err) + require.Equal(t, 0, len(mockAcc.Errors)) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "dpdk", + map[string]string{ + "command": "/endpoint1", + "params": "", + }, + map[string]interface{}{ + "myvalue_0": float64(0), + "myvalue_1": float64(1), + "myvalue_2": float64(123), + }, + time.Unix(0, 0), + ), + } + + actual := mockAcc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + }) +} + +func simulateResponse(mockConn *mocks.Conn, response string, readErr error) { + mockConn.On("Write", mock.Anything).Return(0, nil) + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, response) + }).Return(len(response), readErr) + mockConn.On("SetDeadline", mock.Anything).Return(nil) + + if readErr != nil { + mockConn.On("Close").Return(nil) + } +} + +func simulateSocketResponse(socket net.Listener, t *testing.T) { + conn, err := socket.Accept() + require.NoError(t, err) + + initMessage, err := json.Marshal(initMessage{MaxOutputLen: 1}) + require.NoError(t, err) + + _, err = conn.Write(initMessage) + require.NoError(t, err) +} diff --git a/plugins/inputs/dpdk/dpdk_utils.go b/plugins/inputs/dpdk/dpdk_utils.go new file mode 100644 index 000000000..962186a42 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_utils.go @@ -0,0 +1,116 @@ +// +build linux + +package dpdk + +import ( + "encoding/json" + "fmt" + "os" + "reflect" + "strconv" + "strings" + + "github.com/influxdata/telegraf/filter" +) + +func commandWithParams(command string, params string) string { + if params != "" { + return command + "," + params + } + return command +} + +func stripParams(command string) string { + index := strings.IndexRune(command, ',') + if index == -1 { + return command + } + return command[:index] +} + +// Since DPDK is an open-source project, developers can use their own format of params +// so it could "/command,1,3,5,123" or "/command,userId=1, count=1234". +// To avoid issues with different formats of params, all params are returned as single string +func getParams(command string) string { + index := strings.IndexRune(command, ',') + if index == -1 { + return "" + } + return command[index+1:] +} + +// Checks if provided path points to socket +func isSocket(path string) error { + pathInfo, err := os.Lstat(path) + if os.IsNotExist(err) { + return fmt.Errorf("provided path does not exist: '%v'", path) + } + + if err != nil { + return fmt.Errorf("cannot get system information of '%v' file: %v", path, err) + } + + if pathInfo.Mode()&os.ModeSocket != os.ModeSocket { + return fmt.Errorf("provided path does not point to a socket file: '%v'", path) + } + + return nil +} + +// Converts JSON array containing devices identifiers from DPDK response to string slice +func jsonToArray(input []byte, command string) ([]string, error) { + if len(input) == 0 { + return nil, fmt.Errorf("got empty object instead of json") + } + + var rawMessage map[string]json.RawMessage + err := json.Unmarshal(input, &rawMessage) + if err != nil { + return nil, err + } + + var intArray []int64 + var stringArray []string + err = json.Unmarshal(rawMessage[command], &intArray) + if err != nil { + return nil, fmt.Errorf("failed to unmarshall json response - %v", err) + } + + for _, value := range intArray { + stringArray = append(stringArray, strconv.FormatInt(value, 10)) + } + + return stringArray, nil +} + +func removeSubset(elements []string, excludedFilter filter.Filter) []string { + if excludedFilter == nil { + return elements + } + + var result []string + for _, element := range elements { + if !excludedFilter.Match(element) { + result = append(result, element) + } + } + + return result +} + +func uniqueValues(values []string) []string { + in := make(map[string]bool) + result := make([]string, 0, len(values)) + + for _, value := range values { + if !in[value] { + in[value] = true + result = append(result, value) + } + } + return result +} + +func isEmpty(value interface{}) bool { + return value == nil || (reflect.ValueOf(value).Kind() == reflect.Ptr && reflect.ValueOf(value).IsNil()) +} diff --git a/plugins/inputs/dpdk/dpdk_utils_test.go b/plugins/inputs/dpdk/dpdk_utils_test.go new file mode 100644 index 000000000..6697e9ab3 --- /dev/null +++ b/plugins/inputs/dpdk/dpdk_utils_test.go @@ -0,0 +1,137 @@ +// +build linux + +package dpdk + +import ( + "fmt" + "net" + "os" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_isSocket(t *testing.T) { + t.Run("when path points to non-existing file then error should be returned", func(t *testing.T) { + err := isSocket("/tmp/file-that-doesnt-exists") + + require.Error(t, err) + require.Contains(t, err.Error(), "provided path does not exist") + }) + + t.Run("should pass if path points to socket", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + + err := isSocket(pathToSocket) + + require.NoError(t, err) + }) + + t.Run("if path points to regular file instead of socket then error should be returned", func(t *testing.T) { + pathToFile := "/tmp/dpdk-text-file.txt" + _, err := os.Create(pathToFile) + require.NoError(t, err) + defer os.Remove(pathToFile) + + err = isSocket(pathToFile) + + require.Error(t, err) + require.Contains(t, err.Error(), "provided path does not point to a socket file") + }) +} + +func Test_stripParams(t *testing.T) { + command := "/mycommand" + params := "myParams" + t.Run("when passed string without params then passed string should be returned", func(t *testing.T) { + strippedCommand := stripParams(command) + + require.Equal(t, command, strippedCommand) + }) + + t.Run("when passed string with params then string without params should be returned", func(t *testing.T) { + strippedCommand := stripParams(commandWithParams(command, params)) + + require.Equal(t, command, strippedCommand) + }) +} + +func Test_commandWithParams(t *testing.T) { + command := "/mycommand" + params := "myParams" + t.Run("when passed string with params then command with comma should be returned", func(t *testing.T) { + commandWithParams := commandWithParams(command, params) + + require.Equal(t, command+","+params, commandWithParams) + }) + + t.Run("when passed command with no params then command should be returned", func(t *testing.T) { + commandWithParams := commandWithParams(command, "") + + require.Equal(t, command, commandWithParams) + }) +} + +func Test_getParams(t *testing.T) { + command := "/mycommand" + params := "myParams" + t.Run("when passed string with params then command with comma should be returned", func(t *testing.T) { + commandParams := getParams(commandWithParams(command, params)) + + require.Equal(t, params, commandParams) + }) + + t.Run("when passed command with no params then empty string (representing empty params) should be returned", func(t *testing.T) { + commandParams := getParams(commandWithParams(command, "")) + + require.Equal(t, "", commandParams) + }) +} + +func Test_jsonToArray(t *testing.T) { + key := "/ethdev/list" + t.Run("when got numeric array then string array should be returned", func(t *testing.T) { + firstValue := int64(0) + secondValue := int64(1) + jsonString := fmt.Sprintf(`{"%s": [%d, %d]}`, key, firstValue, secondValue) + + arr, err := jsonToArray([]byte(jsonString), key) + + require.NoError(t, err) + require.Equal(t, strconv.FormatInt(firstValue, 10), arr[0]) + require.Equal(t, strconv.FormatInt(secondValue, 10), arr[1]) + }) + + t.Run("if non-json string is supplied as input then error should be returned", func(t *testing.T) { + _, err := jsonToArray([]byte("{notAJson}"), key) + + require.Error(t, err) + }) + + t.Run("when empty string is supplied as input then error should be returned", func(t *testing.T) { + jsonString := "" + + _, err := jsonToArray([]byte(jsonString), key) + + require.Error(t, err) + require.Contains(t, err.Error(), "got empty object instead of json") + }) + + t.Run("when valid json with json-object is supplied as input then error should be returned", func(t *testing.T) { + jsonString := fmt.Sprintf(`{"%s": {"testKey": "testValue"}}`, key) + + _, err := jsonToArray([]byte(jsonString), key) + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to unmarshall json response") + }) +} + +func createSocketForTest(t *testing.T) (string, net.Listener) { + pathToSocket := "/tmp/dpdk-test-socket" + socket, err := net.Listen("unixpacket", pathToSocket) + require.NoError(t, err) + return pathToSocket, socket +} diff --git a/plugins/inputs/dpdk/mocks/conn.go b/plugins/inputs/dpdk/mocks/conn.go new file mode 100644 index 000000000..58961039d --- /dev/null +++ b/plugins/inputs/dpdk/mocks/conn.go @@ -0,0 +1,146 @@ +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. + +package mocks + +import ( + net "net" + + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// Conn is an autogenerated mock type for the Conn type +type Conn struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Conn) Close() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// LocalAddr provides a mock function with given fields: +func (_m *Conn) LocalAddr() net.Addr { + ret := _m.Called() + + var r0 net.Addr + if rf, ok := ret.Get(0).(func() net.Addr); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Addr) + } + } + + return r0 +} + +// Read provides a mock function with given fields: b +func (_m *Conn) Read(b []byte) (int, error) { + ret := _m.Called(b) + + var r0 int + if rf, ok := ret.Get(0).(func([]byte) int); ok { + r0 = rf(b) + } else { + r0 = ret.Get(0).(int) + } + + var r1 error + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(b) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RemoteAddr provides a mock function with given fields: +func (_m *Conn) RemoteAddr() net.Addr { + ret := _m.Called() + + var r0 net.Addr + if rf, ok := ret.Get(0).(func() net.Addr); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(net.Addr) + } + } + + return r0 +} + +// SetDeadline provides a mock function with given fields: t +func (_m *Conn) SetDeadline(t time.Time) error { + ret := _m.Called(t) + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(t) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetReadDeadline provides a mock function with given fields: t +func (_m *Conn) SetReadDeadline(t time.Time) error { + ret := _m.Called(t) + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(t) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetWriteDeadline provides a mock function with given fields: t +func (_m *Conn) SetWriteDeadline(t time.Time) error { + ret := _m.Called(t) + + var r0 error + if rf, ok := ret.Get(0).(func(time.Time) error); ok { + r0 = rf(t) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Write provides a mock function with given fields: b +func (_m *Conn) Write(b []byte) (int, error) { + ret := _m.Called(b) + + var r0 int + if rf, ok := ret.Get(0).(func([]byte) int); ok { + r0 = rf(b) + } else { + r0 = ret.Get(0).(int) + } + + var r1 error + if rf, ok := ret.Get(1).(func([]byte) error); ok { + r1 = rf(b) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +}