feat: New Intel DLB input plugin (#11824)

bkotlowski 2022-10-17 21:03:48 +02:00 committed by GitHub
parent 61611e8ede
commit f44e0d148c
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1830 additions and 0 deletions


@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.intel_dlb
package all
import _ "github.com/influxdata/telegraf/plugins/inputs/intel_dlb" // register plugin


@ -0,0 +1,97 @@
# Intel® Dynamic Load Balancer (Intel® DLB) Input Plugin
The `Intel DLB` plugin collects metrics exposed by applications built with the
[Data Plane Development Kit](https://www.dpdk.org/) (DPDK), an extensive
set of open source libraries designed for accelerating packet processing
workloads. More specifically, it targets applications that use Intel DLB as
eventdev devices accessed via the bifurcated driver (which allows access from
both kernel and user space).
## Metrics
There are two sources of metrics:
- DPDK-based app for detailed eventdev metrics per device, per port and per queue
- Sysfs entries from kernel driver for RAS metrics
## About Intel® Dynamic Load Balancer (Intel® DLB)
The Intel® Dynamic Load Balancer (Intel® DLB) is a PCIe device that provides
load-balanced, prioritized scheduling of events (that is, packets) across
CPU cores enabling efficient core-to-core communication. It is a hardware
accelerator located inside the latest Intel® Xeon® devices offered by Intel.
It supports the event-driven programming model of DPDK's Event Device Library.
This library is used in packet processing pipelines for multi-core scalability,
dynamic load-balancing, and a variety of packet distribution and synchronization
schemes.
## About DPDK Event Device Library
The DPDK Event device library is an abstraction that provides the application
with features to schedule events. The eventdev framework introduces the event
driven programming model. In a polling model, lcores poll ethdev ports and
associated Rx queues directly to look for a packet. By contrast, in an event
driven model, lcores call the scheduler that selects packets for them based on
programmer-specified criteria. The Eventdev library adds support for an event
driven programming model, which offers applications automatic multicore scaling,
dynamic load balancing, pipelining, packet ingress order maintenance and
synchronization services to simplify application packet processing.
By introducing an event driven programming model, DPDK can support
both polling and event driven programming models for packet processing,
and applications are free to choose whatever model (or combination of the two)
best suits their needs.
## Prerequisites
- [DLB >= v7.4](https://www.intel.com/content/www/us/en/download/686372/intel-dynamic-load-balancer.html)
- [DPDK >= 20.11.3](http://core.dpdk.org/download/)
- Linux kernel >= 5.12
> **NOTE:** The sysfs entries or the telemetry socket exposed by the
> DPDK-based app may require root access. In that case, either adjust the
> access permissions of the sysfs entries / telemetry socket so that Telegraf
> can read them, or run Telegraf with root privileges.
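
To check up front whether the telemetry socket is reachable by the user that runs Telegraf, something like the following sketch may help. `checkTelemetrySocket` is a hypothetical helper, not part of the plugin, and the path in `main` is simply the default `socket_path` from the sample configuration. Note that the check opens and immediately closes a real connection to the DPDK application.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"time"
)

// checkTelemetrySocket is a hypothetical helper (not part of the plugin).
// It verifies that path exists, is a socket, and accepts a connection.
func checkTelemetrySocket(path string) error {
	info, err := os.Lstat(path)
	if err != nil {
		return fmt.Errorf("cannot stat %q: %w", path, err)
	}
	if info.Mode()&os.ModeSocket == 0 {
		return fmt.Errorf("%q is not a socket", path)
	}
	// The plugin source dials the socket with the "unixpacket" network.
	conn, err := net.DialTimeout("unixpacket", path, 2*time.Second)
	if err != nil {
		if errors.Is(err, os.ErrPermission) {
			return fmt.Errorf("permission denied on %q, adjust permissions or run with elevated privileges: %w", path, err)
		}
		return fmt.Errorf("cannot connect to %q: %w", path, err)
	}
	return conn.Close()
}

func main() {
	// Assumption: the default socket_path from the sample configuration.
	if err := checkTelemetrySocket("/var/run/dpdk/rte/dpdk_telemetry.v2"); err != nil {
		fmt.Println("telemetry socket check failed:", err)
		return
	}
	fmt.Println("telemetry socket is reachable")
}
```

Running this as the Telegraf user should show whether the failure is a missing socket, a wrong file type, or a permission problem.
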
## Configuration
```toml @sample.conf
## Reads metrics from DPDK using v2 telemetry interface.
[[inputs.intel_dlb]]
## Path to DPDK telemetry socket.
# socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2"
## Default eventdev command list, it gathers metrics from socket by given commands.
## Supported options:
## "/eventdev/dev_xstats", "/eventdev/port_xstats",
## "/eventdev/queue_xstats", "/eventdev/queue_links"
# eventdev_commands = ["/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_xstats", "/eventdev/queue_links"]
## Detect DLB devices based on device id.
## Currently, only supported and tested device id is `0x2710`.
## Configuration added to support forward compatibility.
# dlb_device_types = ["0x2710"]
## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet).
## Available choices:
## - error: Telegraf will return an error on startup if socket is unreachable
## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather
# unreachable_socket_behavior = "error"
```
The default configuration collects all metrics reported by the following
`/eventdev/` commands:
- `/eventdev/dev_xstats`
- `/eventdev/port_xstats`
- `/eventdev/queue_xstats`
- `/eventdev/queue_links`
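
Before a command is sent to the telemetry socket, the plugin source appends indexes to it: a device index for `dev_` commands, and a device index plus a port or queue index for the others. The following is a minimal sketch of that expansion, assuming the index lists have already been fetched from the socket. `expandCommands` and its parameters are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// expandCommands is a hypothetical helper (not part of the plugin). It
// appends the device index, and for port/queue commands a second index,
// to each configured eventdev command.
func expandCommands(commands []string, dev int, ports, queues []int) []string {
	var out []string
	for _, cmd := range commands {
		switch {
		case strings.HasPrefix(cmd, "/eventdev/dev_"):
			out = append(out, fmt.Sprintf("%s,%d", cmd, dev))
		case strings.HasPrefix(cmd, "/eventdev/port_"):
			for _, p := range ports {
				out = append(out, fmt.Sprintf("%s,%d,%d", cmd, dev, p))
			}
		case strings.HasPrefix(cmd, "/eventdev/queue_"):
			for _, q := range queues {
				out = append(out, fmt.Sprintf("%s,%d,%d", cmd, dev, q))
			}
		}
	}
	return out
}

func main() {
	cmds := []string{"/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_links"}
	// Assumed topology: device 0 with ports 0 and 1 and a single queue 0.
	for _, c := range expandCommands(cmds, 0, []int{0, 1}, []int{0}) {
		fmt.Println(c)
	}
}
```

The expanded string is what ends up in the `command` tag of the `intel_dlb` measurement.
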
## Example Output
```text
intel_dlb,command=/eventdev/dev_xstats\,0,host=controller1 dev_dir_pool_size=0i,dev_inflight_events=8192i,dev_ldb_pool_size=8192i,dev_nb_events_limit=8192i,dev_pool_size=0i,dev_rx_drop=0i,dev_rx_interrupt_wait=0i,dev_rx_ok=463126660i,dev_rx_umonitor_umwait=0i,dev_total_polls=78422946i,dev_tx_nospc_dir_hw_credits=0i,dev_tx_nospc_hw_credits=584614i,dev_tx_nospc_inflight_credits=0i,dev_tx_nospc_inflight_max=0i,dev_tx_nospc_ldb_hw_credits=584614i,dev_tx_nospc_new_event_limit=59331982i,dev_tx_ok=694694059i,dev_zero_polls=29667908i 1641996791000000000
intel_dlb,command=/eventdev/queue_links\,0\,1,host=controller1 qid_0=128i,qid_1=128i 1641996791000000000
intel_dlb_ras,device=pci0000:6d,host=controller1,metric_file=aer_dev_correctable BadDLLP=0i,BadTLP=0i,CorrIntErr=0i,HeaderOF=0i,NonFatalErr=0i,Rollover=0i,RxErr=0i,TOTAL_ERR_COR=0i,Timeout=0i 1641996791000000000
intel_dlb_ras,device=pci0000:6d,host=controller1,metric_file=aer_dev_fatal ACSViol=0i,AtomicOpBlocked=0i,BlockedTLP=0i,CmpltAbrt=0i,CmpltTO=0i,DLP=0i,ECRC=0i,FCP=0i,MalfTLP=0i,PoisonTLPBlocked=0i,RxOF=0i,SDES=0i,TLP=0i,TLPBlockedErr=0i,TOTAL_ERR_FATAL=0i,UncorrIntErr=0i,Undefined=0i,UnsupReq=0i,UnxCmplt=0i 1641996791000000000
```
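
The `intel_dlb_ras` fields come from the sysfs AER files (`aer_dev_correctable`, `aer_dev_fatal`, `aer_dev_nonfatal`), which the plugin source reads as one `name value` pair per line. A minimal sketch of that parsing is shown below. `parseAER` is a hypothetical name, and the sample content in `main` is an assumed illustration rather than captured output.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseAER is a hypothetical helper (not part of the plugin). It turns
// "name value" lines, as found in sysfs AER files, into a field map.
func parseAER(content string) (map[string]uint64, error) {
	fields := make(map[string]uint64)
	for _, line := range strings.Split(strings.TrimSpace(content), "\n") {
		parts := strings.Fields(line)
		if len(parts) < 2 {
			return nil, fmt.Errorf("no value to parse in line %q", line)
		}
		value, err := strconv.ParseUint(parts[1], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("failed to parse value %q: %w", parts[1], err)
		}
		fields[parts[0]] = value
	}
	return fields, nil
}

func main() {
	// Assumed sample content, shaped like an aer_dev_correctable file.
	sample := "RxErr 0\nBadTLP 0\nBadDLLP 0\nTOTAL_ERR_COR 0\n"
	fields, err := parseAER(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(fields), "fields parsed")
}
```

Each key in the resulting map becomes one field of the measurement, while the file name is reported in the `metric_file` tag.
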


@ -0,0 +1,476 @@
//go:generate ../../../tools/readme_config_includer/generator
//go:build linux
// +build linux
package intel_dlb
import (
_ "embed"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
var unreachableSocketBehaviors = []string{"error", "ignore"}
type IntelDLB struct {
SocketPath string `toml:"socket_path"`
EventdevCommands []string `toml:"eventdev_commands"`
DLBDeviceIDs []string `toml:"dlb_device_types"`
UnreachableSocketBehavior string `toml:"unreachable_socket_behavior"`
Log telegraf.Logger `toml:"-"`
connection net.Conn
devicesDir []string
rasReader rasReader
maxInitMessageLength uint32
}
const (
defaultSocketPath = "/var/run/dpdk/rte/dpdk_telemetry.v2"
pluginName = "intel_dlb"
eventdevListCommand = "/eventdev/dev_list"
dlbDeviceIDLocation = "/sys/devices/*/*/device"
aerCorrectableFileName = "aer_dev_correctable"
aerFatalFileName = "aer_dev_fatal"
aerNonFatalFileName = "aer_dev_nonfatal"
defaultDLBDevice = "0x2710"
)
// SampleConfig returns sample config
func (d *IntelDLB) SampleConfig() string {
return sampleConfig
}
// Init performs validation of all parameters from configuration.
func (d *IntelDLB) Init() error {
var err error
if d.UnreachableSocketBehavior == "" {
d.UnreachableSocketBehavior = "error"
}
if err = choice.Check(d.UnreachableSocketBehavior, unreachableSocketBehaviors); err != nil {
return fmt.Errorf("unreachable_socket_behavior: %w", err)
}
if d.SocketPath == "" {
d.SocketPath = defaultSocketPath
d.Log.Debugf("Using default '%v' path for socket_path", defaultSocketPath)
}
err = checkSocketPath(d.SocketPath)
if err != nil {
if d.UnreachableSocketBehavior == "error" {
return err
}
d.Log.Warn(err)
}
if len(d.EventdevCommands) == 0 {
eventdevDefaultCommands := []string{"/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_xstats", "/eventdev/queue_links"}
d.EventdevCommands = eventdevDefaultCommands
d.Log.Debugf("Using default eventdev commands '%v'", eventdevDefaultCommands)
}
if err = validateEventdevCommands(d.EventdevCommands); err != nil {
return err
}
if len(d.DLBDeviceIDs) == 0 {
d.DLBDeviceIDs = []string{defaultDLBDevice}
d.Log.Debugf("Using default DLB Device ID '%v'", defaultDLBDevice)
}
err = d.checkAndAddDLBDevice()
if err != nil {
return err
}
d.maxInitMessageLength = 1024
return nil
}
// Gather all unique commands and process each command sequentially.
func (d *IntelDLB) Gather(acc telegraf.Accumulator) error {
err := d.gatherMetricsFromSocket(acc)
if err != nil {
socketErr := fmt.Errorf("gathering metrics from socket by given commands failed: %w", err)
if d.UnreachableSocketBehavior == "error" {
return socketErr
}
d.Log.Debug(socketErr)
}
err = d.gatherRasMetrics(acc)
if err != nil {
return fmt.Errorf("gathering RAS metrics failed: %w", err)
}
return nil
}
func (d *IntelDLB) gatherRasMetrics(acc telegraf.Accumulator) error {
for _, devicePath := range d.devicesDir {
rasTags := map[string]string{
"device": filepath.Base(filepath.Dir(devicePath)),
}
aerFilesName := []string{aerCorrectableFileName, aerFatalFileName, aerNonFatalFileName}
for _, fileName := range aerFilesName {
rasTags["metric_file"] = fileName
rasMetrics, err := d.readRasMetrics(devicePath, fileName)
if err != nil {
return err
}
acc.AddFields("intel_dlb_ras", rasMetrics, rasTags)
}
}
return nil
}
func (d *IntelDLB) readRasMetrics(devicePath, metricPath string) (map[string]interface{}, error) {
deviceMetricPath := filepath.Join(devicePath, metricPath)
data, err := d.rasReader.readFromFile(deviceMetricPath)
if err != nil {
return nil, err
}
metrics := strings.Split(strings.TrimSpace(string(data)), "\n")
rasMetric := make(map[string]interface{})
for _, metric := range metrics {
metricPart := strings.Split(metric, " ")
if len(metricPart) < 2 {
return nil, fmt.Errorf("error occurred: no value to parse - %+q", metricPart)
}
// Use a 64-bit size: a bitSize of 10 would reject any counter above 1023.
metricVal, err := strconv.ParseUint(metricPart[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("error occurred: failed to parse value '%s': '%s'", metricPart[1], err)
}
rasMetric[metricPart[0]] = metricVal
}
return rasMetric, nil
}
func (d *IntelDLB) gatherMetricsFromSocket(acc telegraf.Accumulator) error {
// Get device indexes and append those indexes to the available commands
commandsWithIndex, err := d.gatherCommandsWithDeviceIndex()
if err != nil {
return err
}
for _, command := range commandsWithIndex {
// Write message to socket, e.g.: "/eventdev/dev_xstats,0", then process result and parse it to variable.
var parsedDeviceXstats map[string]map[string]int
err := d.gatherCommandsResult(command, &parsedDeviceXstats)
if err != nil {
return err
}
var statsWithValue = make(map[string]interface{})
for _, commandBody := range parsedDeviceXstats {
for metricName, metricValue := range commandBody {
statsWithValue[metricName] = metricValue
}
}
var tags = map[string]string{
"command": command,
}
acc.AddFields(pluginName, statsWithValue, tags)
}
return nil
}
func (d *IntelDLB) gatherCommandsWithDeviceIndex() ([]string, error) {
// Parse message from JSON format to map e.g.: key = "/eventdev/dev_list", and value = [0, 1]
var parsedDeviceIndexes map[string][]int
err := d.gatherCommandsResult(eventdevListCommand, &parsedDeviceIndexes)
if err != nil {
return nil, err
}
var commandsWithIndex []string
for _, deviceIndexes := range parsedDeviceIndexes {
for _, index := range deviceIndexes {
for _, command := range d.EventdevCommands {
if !strings.Contains(command, "dev_") {
secondDeviceIndexes, err := d.gatherSecondDeviceIndex(index, command)
if err != nil {
return nil, err
}
commandsWithIndex = append(commandsWithIndex, secondDeviceIndexes...)
} else {
// Append the device index to the command, e.g.: "/eventdev/dev_xstats" + "," + "0"
commandWithIndex := fmt.Sprintf("%s,%d", command, index)
commandsWithIndex = append(commandsWithIndex, commandWithIndex)
}
}
}
}
return commandsWithIndex, nil
}
func (d *IntelDLB) gatherCommandsResult(command string, deviceToParse interface{}) error {
err := d.ensureConnected()
if err != nil {
return err
}
replyMsgLen, socketReply, err := d.writeReadSocketMessage(command)
if err != nil {
return err
}
err = d.parseJSON(replyMsgLen, socketReply, &deviceToParse)
if err != nil {
return err
}
return nil
}
func (d *IntelDLB) gatherSecondDeviceIndex(index int, command string) ([]string, error) {
eventdevListWithSecondIndex := []string{"/eventdev/port_list", "/eventdev/queue_list"}
var commandsWithIndex []string
for _, commandToGatherSecondIndex := range eventdevListWithSecondIndex {
// get command type, e.g.: "/eventdev/port_xstats" gives "/eventdev/port"
commandType := strings.Split(command, "_")
if len(commandType) != 2 {
return nil, d.closeSocketAndThrowError("custom", fmt.Errorf("cannot split command - %s", commandType))
}
if strings.Contains(commandToGatherSecondIndex, commandType[0]) {
var parsedDeviceSecondIndexes map[string][]int
commandToGatherWithIndex := fmt.Sprintf("%s,%d", commandToGatherSecondIndex, index)
err := d.gatherCommandsResult(commandToGatherWithIndex, &parsedDeviceSecondIndexes)
if err != nil {
return nil, err
}
for _, indexArray := range parsedDeviceSecondIndexes {
for _, secondIndex := range indexArray {
commandWithIndex := fmt.Sprintf("%s,%d,%d", command, index, secondIndex)
commandsWithIndex = append(commandsWithIndex, commandWithIndex)
}
}
}
}
return commandsWithIndex, nil
}
func (d *IntelDLB) ensureConnected() error {
var err error
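if d.connection == nil {
// Reset the buffer size only when (re)connecting, so that the max_output_len
// negotiated in setInitMessageLength is kept for subsequent commands.
d.maxInitMessageLength = uint32(1024)
d.connection, err = net.Dial("unixpacket", d.SocketPath)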
if err != nil {
return err
}
err = d.setInitMessageLength()
if err != nil {
return err
}
}
return nil
}
func (d *IntelDLB) setInitMessageLength() error {
type initMessage struct {
Version string `json:"version"`
Pid int `json:"pid"`
MaxOutputLen uint32 `json:"max_output_len"`
}
buf := make([]byte, d.maxInitMessageLength)
messageLength, err := d.connection.Read(buf)
if err != nil {
return d.closeSocketAndThrowError("custom", fmt.Errorf("failed to read InitMessage from socket - %v", err))
}
if messageLength > len(buf) {
return d.closeSocketAndThrowError("custom", fmt.Errorf("socket reply length is bigger than default buffer length"))
}
var initMsg initMessage
err = json.Unmarshal(buf[:messageLength], &initMsg)
if err != nil {
return d.closeSocketAndThrowError("json", err)
}
if initMsg.MaxOutputLen == 0 {
return d.closeSocketAndThrowError("message", fmt.Errorf("max_output_len in init message is 0"))
}
d.maxInitMessageLength = initMsg.MaxOutputLen
return nil
}
func (d *IntelDLB) writeReadSocketMessage(messageToWrite string) (int, []byte, error) {
_, writeErr := d.connection.Write([]byte(messageToWrite))
if writeErr != nil {
return 0, nil, d.closeSocketAndThrowError("write", writeErr)
}
// Read reply, and obtain length of it.
socketReply := make([]byte, d.maxInitMessageLength)
replyMsgLen, readErr := d.connection.Read(socketReply)
if readErr != nil {
return 0, nil, d.closeSocketAndThrowError("read", readErr)
}
if replyMsgLen == 0 {
return 0, nil, d.closeSocketAndThrowError("message", fmt.Errorf("message length is empty"))
}
return replyMsgLen, socketReply, nil
}
func (d *IntelDLB) parseJSON(replyMsgLen int, socketReply []byte, parsedDeviceInfo interface{}) error {
if len(socketReply) == 0 {
return d.closeSocketAndThrowError("json", fmt.Errorf("socket reply is empty"))
}
if replyMsgLen > len(socketReply) {
return d.closeSocketAndThrowError("json", fmt.Errorf("socket reply length is bigger than it should be"))
}
if replyMsgLen == 0 {
return d.closeSocketAndThrowError("json", fmt.Errorf("socket reply message is empty"))
}
// Assign reply to variable, e.g.: {"/eventdev/dev_list": [0, 1]}
jsonDeviceIndexes := socketReply[:replyMsgLen]
// Parse message from JSON format to map, e.g.: map[string][]int. Key = "/eventdev/dev_list", Value = []int{0, 1}
jsonParseErr := json.Unmarshal(jsonDeviceIndexes, &parsedDeviceInfo)
if jsonParseErr != nil {
return d.closeSocketAndThrowError("json", jsonParseErr)
}
return nil
}
func (d *IntelDLB) closeSocketAndThrowError(errType string, err error) error {
const (
writeErrMsg = "failed to send command to socket: '%v'"
readErrMsg = "failed to read response from socket: '%v'"
msgLenErr = "got empty response from socket: '%v'"
jsonParseErr = "failed to parse json: '%v'"
failedConErr = " - and failed to close connection '%v'"
customErr = "error occurred: '%v'"
)
var errMsg string
switch errType {
case "write":
errMsg = writeErrMsg
case "read":
errMsg = readErrMsg
case "message":
errMsg = msgLenErr
case "json":
errMsg = jsonParseErr
case "custom":
errMsg = customErr
}
if d.connection != nil {
closeConnectionErr := d.connection.Close()
d.connection = nil
if closeConnectionErr != nil {
errCloseMsg := errMsg + failedConErr
return fmt.Errorf(errCloseMsg, err, closeConnectionErr)
}
}
return fmt.Errorf(errMsg, err)
}
func (d *IntelDLB) checkAndAddDLBDevice() error {
if d.rasReader == nil {
return fmt.Errorf("rasreader was not initialized")
}
filePaths, err := d.rasReader.gatherPaths(dlbDeviceIDLocation)
if err != nil {
return err
}
deviceIDToDirs := make(map[string][]string)
for _, path := range filePaths {
fileData, err := d.rasReader.readFromFile(path)
if err != nil {
return err
}
// check if it is DLB device
trimmedDeviceID := strings.TrimSpace(string(fileData))
if !choice.Contains(trimmedDeviceID, d.DLBDeviceIDs) {
continue
}
deviceDir := filepath.Dir(path)
deviceIDToDirs[trimmedDeviceID] = append(deviceIDToDirs[trimmedDeviceID], deviceDir)
d.devicesDir = append(d.devicesDir, deviceDir)
}
if len(d.devicesDir) == 0 {
return fmt.Errorf("cannot find any of provided IDs on the system - %+q", d.DLBDeviceIDs)
}
for _, deviceID := range d.DLBDeviceIDs {
if len(deviceIDToDirs[deviceID]) == 0 {
d.Log.Debugf("Device %s was not found on system", deviceID)
}
}
return nil
}
func checkSocketPath(path string) error {
pathInfo, err := os.Lstat(path)
if os.IsNotExist(err) {
return fmt.Errorf("provided path does not exist: '%v'", path)
}
if err != nil {
return fmt.Errorf("cannot get system information of '%v' file: %v", path, err)
}
if pathInfo.Mode()&os.ModeSocket != os.ModeSocket {
return fmt.Errorf("provided path does not point to a socket file: '%v'", path)
}
return nil
}
func validateEventdevCommands(commands []string) error {
eventdevCommandRegex := regexp.MustCompile("^/eventdev/[a-z_]+$")
for _, command := range commands {
if !eventdevCommandRegex.Match([]byte(command)) {
return fmt.Errorf("provided command is not valid - %v", command)
}
}
return nil
}
func init() {
inputs.Add(pluginName, func() telegraf.Input {
return &IntelDLB{
rasReader: rasReaderImpl{},
}
})
}


@ -0,0 +1,4 @@
//go:build !linux
// +build !linux
package intel_dlb

File diff suppressed because it is too large


@ -0,0 +1,71 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
package intel_dlb
import mock "github.com/stretchr/testify/mock"
// mockRasReader is an autogenerated mock type for the rasReader type
type mockRasReader struct {
mock.Mock
}
// gatherPaths provides a mock function with given fields: path
func (_m *mockRasReader) gatherPaths(path string) ([]string, error) {
ret := _m.Called(path)
var r0 []string
if rf, ok := ret.Get(0).(func(string) []string); ok {
r0 = rf(path)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(path)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// readFromFile provides a mock function with given fields: filePath
func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) {
ret := _m.Called(filePath)
var r0 []byte
if rf, ok := ret.Get(0).(func(string) []byte); ok {
r0 = rf(filePath)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]byte)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(filePath)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTnewMockRasReader interface {
mock.TestingT
Cleanup(func())
}
// newMockRasReader creates a new instance of mockRasReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func newMockRasReader(t mockConstructorTestingTnewMockRasReader) *mockRasReader {
mock := &mockRasReader{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}


@ -0,0 +1,37 @@
//go:build linux
// +build linux
package intel_dlb
import (
"fmt"
"os"
"path/filepath"
)
type rasReader interface {
gatherPaths(path string) ([]string, error)
readFromFile(filePath string) ([]byte, error)
}
type rasReaderImpl struct {
}
// gatherPaths gathers all paths based on provided pattern
func (rasReaderImpl) gatherPaths(pattern string) ([]string, error) {
filePaths, err := filepath.Glob(pattern)
if err != nil {
return nil, fmt.Errorf("glob failed for pattern: %s: %v", pattern, err)
}
if len(filePaths) == 0 {
return nil, fmt.Errorf("no candidates for given pattern: %s", pattern)
}
return filePaths, nil
}
// readFromFile reads file content.
func (rasReaderImpl) readFromFile(filePath string) ([]byte, error) {
return os.ReadFile(filePath)
}


@ -0,0 +1,21 @@
## Reads metrics from DPDK using v2 telemetry interface.
[[inputs.intel_dlb]]
## Path to DPDK telemetry socket.
# socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2"
## Default eventdev command list, it gathers metrics from socket by given commands.
## Supported options:
## "/eventdev/dev_xstats", "/eventdev/port_xstats",
## "/eventdev/queue_xstats", "/eventdev/queue_links"
# eventdev_commands = ["/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_xstats", "/eventdev/queue_links"]
## Detect DLB devices based on device id.
## Currently, only supported and tested device id is `0x2710`.
## Configuration added to support forward compatibility.
# dlb_device_types = ["0x2710"]
## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet).
## Available choices:
## - error: Telegraf will return an error on startup if socket is unreachable
## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather
# unreachable_socket_behavior = "error"