diff --git a/plugins/inputs/all/intel_dlb.go b/plugins/inputs/all/intel_dlb.go new file mode 100644 index 000000000..639d4880f --- /dev/null +++ b/plugins/inputs/all/intel_dlb.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.intel_dlb + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/intel_dlb" // register plugin diff --git a/plugins/inputs/intel_dlb/README.md b/plugins/inputs/intel_dlb/README.md new file mode 100644 index 000000000..d825cbe06 --- /dev/null +++ b/plugins/inputs/intel_dlb/README.md @@ -0,0 +1,97 @@ +# Intel® Dynamic Load Balancer (Intel® DLB) Input Plugin + +The `Intel DLB` plugin collects metrics exposed by applications built with +[Data Plane Development Kit](https://www.dpdk.org/) which is an extensive +set of open source libraries designed for accelerating packet processing +workloads, plugin is also using bifurcated driver. More specifically it's +targeted for applications that use Intel DLB as eventdev devices accessed +via bifurcated driver (allowing access from kernel and user-space). + +## Metrics + +There are two sources of metrics: + +- DPDK-based app for detailed eventdev metrics per device, per port and per queue +- Sysfs entries from kernel driver for RAS metrics + +## About Intel® Dynamic Load Balancer (Intel® DLB) + +The Intel® Dynamic Load Balancer (Intel® DLB) is a PCIe device that provides +load-balanced, prioritized scheduling of events (that is, packets) across +CPU cores enabling efficient core-to-core communication. It is a hardware +accelerator located inside the latest Intel® Xeon® devices offered by Intel. +It supports the event-driven programming model of DPDK's Event Device Library. +This library is used in packet processing pipelines for multi-core scalability, +dynamic load-balancing, and variety of packet distribution and synchronization +schemes. + +## About DPDK Event Device Library + +The DPDK Event device library is an abstraction that provides the application +with features to schedule events. The eventdev framework introduces the event +driven programming model. In a polling model, lcores poll ethdev ports and +associated Rx queues directly to look for a packet. By contrast in an event +driven model, lcores call the scheduler that selects packets for them based on +programmer-specified criteria. The Eventdev library adds support for an event +driven programming model, which offers applications automatic multicore scaling, +dynamic load balancing, pipelining, packet ingress order maintenance and +synchronization services to simplify application packet processing. +By introducing an event driven programming model, DPDK can support +both polling and event driven programming models for packet processing, +and applications are free to choose whatever model (or combination of the two) +best suits their needs. + +## Prerequisites + +- [DLB >= v7.4](https://www.intel.com/content/www/us/en/download/686372/intel-dynamic-load-balancer.html) +- [DPDK >= 20.11.3](http://core.dpdk.org/download/) +- Linux kernel >= 5.12 + +> **NOTE:** It may happen that sysfs entries or the socket telemetry interface +> exposed by DPDK-based app will require root access. This means that either +> access permissions have to be adjusted for sysfs / socket telemetry +> interface to allow Telegraf to access it, or Telegraf should run with root +> privileges. + +## Configuration + +```toml @sample.conf +## Reads metrics from DPDK using v2 telemetry interface. +[[inputs.intel_dlb]] + ## Path to DPDK telemetry socket. + # socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2" + + ## Default eventdev command list, it gathers metrics from socket by given commands. + ## Supported options: + ## "/eventdev/dev_xstats", "/eventdev/port_xstats", + ## "/eventdev/queue_xstats", "/eventdev/queue_links" + # eventdev_commands = ["/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_xstats", "/eventdev/queue_links"] + + ## Detect DLB devices based on device id. + ## Currently, only supported and tested device id is `0x2710`. + ## Configuration added to support forward compatibility. + # dlb_device_types = ["0x2710"] + + ## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet). + ## Available choices: + ## - error: Telegraf will return an error on startup if socket is unreachable + ## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather + # unreachable_socket_behavior = "error" +``` + +Default configuration allows getting metrics for all metrics +reported via `/eventdev/` command: + +- `/eventdev/dev_xstats` +- `/eventdev/port_xstats` +- `/eventdev/queue_xstats` +- `/eventdev/queue_links` + +## Example Output + +```text +intel_dlb,command=/eventdev/dev_xstats\,0,host=controller1 dev_dir_pool_size=0i,dev_inflight_events=8192i,dev_ldb_pool_size=8192i,dev_nb_events_limit=8192i,dev_pool_size=0i,dev_rx_drop=0i,dev_rx_interrupt_wait=0i,dev_rx_ok=463126660i,dev_rx_umonitor_umwait=0i,dev_total_polls=78422946i,dev_tx_nospc_dir_hw_credits=0i,dev_tx_nospc_hw_credits=584614i,dev_tx_nospc_inflight_credits=0i,dev_tx_nospc_inflight_max=0i,dev_tx_nospc_ldb_hw_credits=584614i,dev_tx_nospc_new_event_limit=59331982i,dev_tx_ok=694694059i,dev_zero_polls=29667908i 1641996791000000000 +intel_dlb,command=/eventdev/queue_links\,0\,1,host=controller1 qid_0=128i,qid_1=128i 1641996791000000000 +intel_dlb_ras,device=pci0000:6d,host=controller1,metric_file=aer_dev_correctable BadDLLP=0i,BadTLP=0i,CorrIntErr=0i,HeaderOF=0i,NonFatalErr=0i,Rollover=0i,RxErr=0i,TOTAL_ERR_COR=0i,Timeout=0i 1641996791000000000 +intel_dlb_ras,device=pci0000:6d,host=controller1,metric_file=aer_dev_fatal ACSViol=0i,AtomicOpBlocked=0i,BlockedTLP=0i,CmpltAbrt=0i,CmpltTO=0i,DLP=0i,ECRC=0i,FCP=0i,MalfTLP=0i,PoisonTLPBlocked=0i,RxOF=0i,SDES=0i,TLP=0i,TLPBlockedErr=0i,TOTAL_ERR_FATAL=0i,UncorrIntErr=0i,Undefined=0i,UnsupReq=0i,UnxCmplt=0i 1641996791000000000 +``` diff --git a/plugins/inputs/intel_dlb/intel_dlb.go b/plugins/inputs/intel_dlb/intel_dlb.go new file mode 100644 index 000000000..d9aa9d5a0 --- /dev/null +++ b/plugins/inputs/intel_dlb/intel_dlb.go @@ -0,0 +1,476 @@ +//go:generate ../../../tools/readme_config_includer/generator +//go:build linux +// +build linux + +package intel_dlb + +import ( + _ "embed" + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/plugins/inputs" +) + +//go:embed sample.conf +var sampleConfig string + +var unreachableSocketBehaviors = []string{"error", "ignore"} + +type IntelDLB struct { + SocketPath string `toml:"socket_path"` + EventdevCommands []string `toml:"eventdev_commands"` + DLBDeviceIDs []string `toml:"dlb_device_types"` + UnreachableSocketBehavior string `toml:"unreachable_socket_behavior"` + Log telegraf.Logger `toml:"-"` + + connection net.Conn + devicesDir []string + rasReader rasReader + maxInitMessageLength uint32 +} + +const ( + defaultSocketPath = "/var/run/dpdk/rte/dpdk_telemetry.v2" + pluginName = "intel_dlb" + eventdevListCommand = "/eventdev/dev_list" + dlbDeviceIDLocation = "/sys/devices/*/*/device" + aerCorrectableFileName = "aer_dev_correctable" + aerFatalFileName = "aer_dev_fatal" + aerNonFatalFileName = "aer_dev_nonfatal" + defaultDLBDevice = "0x2710" +) + +// SampleConfig returns sample config +func (d *IntelDLB) SampleConfig() string { + return sampleConfig +} + +// Init performs validation of all parameters from configuration. +func (d *IntelDLB) Init() error { + var err error + + if d.UnreachableSocketBehavior == "" { + d.UnreachableSocketBehavior = "error" + } + + if err = choice.Check(d.UnreachableSocketBehavior, unreachableSocketBehaviors); err != nil { + return fmt.Errorf("unreachable_socket_behavior: %w", err) + } + + if d.SocketPath == "" { + d.SocketPath = defaultSocketPath + d.Log.Debugf("Using default '%v' path for socket_path", defaultSocketPath) + } + + err = checkSocketPath(d.SocketPath) + if err != nil { + if d.UnreachableSocketBehavior == "error" { + return err + } + d.Log.Warn(err) + } + + if len(d.EventdevCommands) == 0 { + eventdevDefaultCommands := []string{"/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_xstats", "/eventdev/queue_links"} + d.EventdevCommands = eventdevDefaultCommands + d.Log.Debugf("Using default eventdev commands '%v'", eventdevDefaultCommands) + } + + if err = validateEventdevCommands(d.EventdevCommands); err != nil { + return err + } + + if len(d.DLBDeviceIDs) == 0 { + d.DLBDeviceIDs = []string{defaultDLBDevice} + d.Log.Debugf("Using default DLB Device ID '%v'", defaultDLBDevice) + } + + err = d.checkAndAddDLBDevice() + if err != nil { + return err + } + + d.maxInitMessageLength = 1024 + + return nil +} + +// Gather all unique commands and process each command sequentially. +func (d *IntelDLB) Gather(acc telegraf.Accumulator) error { + err := d.gatherMetricsFromSocket(acc) + if err != nil { + socketErr := fmt.Errorf("gathering metrics from socket by given commands failed: %v", err) + if d.UnreachableSocketBehavior == "error" { + return socketErr + } + d.Log.Debug(socketErr) + } + + err = d.gatherRasMetrics(acc) + if err != nil { + return fmt.Errorf("gathering RAS metrics failed: %v", err) + } + + return nil +} + +func (d *IntelDLB) gatherRasMetrics(acc telegraf.Accumulator) error { + for _, devicePath := range d.devicesDir { + rasTags := map[string]string{ + "device": filepath.Base(filepath.Dir(devicePath)), + } + + aerFilesName := []string{aerCorrectableFileName, aerFatalFileName, aerNonFatalFileName} + for _, fileName := range aerFilesName { + rasTags["metric_file"] = fileName + rasMetrics, err := d.readRasMetrics(devicePath, fileName) + if err != nil { + return err + } + acc.AddFields("intel_dlb_ras", rasMetrics, rasTags) + } + } + return nil +} + +func (d *IntelDLB) readRasMetrics(devicePath, metricPath string) (map[string]interface{}, error) { + deviceMetricPath := filepath.Join(devicePath, metricPath) + + data, err := d.rasReader.readFromFile(deviceMetricPath) + if err != nil { + return nil, err + } + + metrics := strings.Split(strings.TrimSpace(string(data)), "\n") + + rasMetric := make(map[string]interface{}) + for _, metric := range metrics { + metricPart := strings.Split(metric, " ") + if len(metricPart) < 2 { + return nil, fmt.Errorf("error occurred: no value to parse - %+q", metricPart) + } + + metricVal, err := strconv.ParseUint(metricPart[1], 10, 10) + if err != nil { + return nil, fmt.Errorf("error occurred: failed to parse value '%s': '%s'", metricPart[1], err) + } + rasMetric[metricPart[0]] = metricVal + } + + return rasMetric, nil +} + +func (d *IntelDLB) gatherMetricsFromSocket(acc telegraf.Accumulator) error { + // Get device indexes and those indexes to available commands + commandsWithIndex, err := d.gatherCommandsWithDeviceIndex() + if err != nil { + return err + } + + for _, command := range commandsWithIndex { + // Write message to socket, e.g.: "/eventdev/dev_xstats,0", then process result and parse it to variable. + var parsedDeviceXstats map[string]map[string]int + err := d.gatherCommandsResult(command, &parsedDeviceXstats) + if err != nil { + return err + } + var statsWithValue = make(map[string]interface{}) + for _, commandBody := range parsedDeviceXstats { + for metricName, metricValue := range commandBody { + statsWithValue[metricName] = metricValue + } + } + + var tags = map[string]string{ + "command": command, + } + acc.AddFields(pluginName, statsWithValue, tags) + } + + return nil +} + +func (d *IntelDLB) gatherCommandsWithDeviceIndex() ([]string, error) { + // Parse message from JSON format to map e.g.: key = "/eventdev/dev_list", and value = [0, 1] + var parsedDeviceIndexes map[string][]int + err := d.gatherCommandsResult(eventdevListCommand, &parsedDeviceIndexes) + if err != nil { + return nil, err + } + var commandsWithIndex []string + for _, deviceIndexes := range parsedDeviceIndexes { + for _, index := range deviceIndexes { + for _, command := range d.EventdevCommands { + if !strings.Contains(command, "dev_") { + secondDeviceIndexes, err := d.gatherSecondDeviceIndex(index, command) + if err != nil { + return nil, err + } + commandsWithIndex = append(commandsWithIndex, secondDeviceIndexes...) + } else { + // Append to "/eventdev/dev_xstats," device index eg.: "/eventdev/dev_xstats" + "," + "0" + commandWithIndex := fmt.Sprintf("%s,%d", command, index) + commandsWithIndex = append(commandsWithIndex, commandWithIndex) + } + } + } + } + + return commandsWithIndex, nil +} + +func (d *IntelDLB) gatherCommandsResult(command string, deviceToParse interface{}) error { + err := d.ensureConnected() + if err != nil { + return err + } + + replyMsgLen, socketReply, err := d.writeReadSocketMessage(command) + if err != nil { + return err + } + + err = d.parseJSON(replyMsgLen, socketReply, &deviceToParse) + if err != nil { + return err + } + + return nil +} + +func (d *IntelDLB) gatherSecondDeviceIndex(index int, command string) ([]string, error) { + eventdevListWithSecondIndex := []string{"/eventdev/port_list", "/eventdev/queue_list"} + var commandsWithIndex []string + for _, commandToGatherSecondIndex := range eventdevListWithSecondIndex { + // get command type e.g.: "port_xstat" gives "port" + commandType := strings.Split(command, "_") + if len(commandType) != 2 { + return nil, d.closeSocketAndThrowError("custom", fmt.Errorf("cannot split command - %s", commandType)) + } + + if strings.Contains(commandToGatherSecondIndex, commandType[0]) { + var parsedDeviceSecondIndexes map[string][]int + commandToGatherWithIndex := fmt.Sprintf("%s,%d", commandToGatherSecondIndex, index) + + err := d.gatherCommandsResult(commandToGatherWithIndex, &parsedDeviceSecondIndexes) + if err != nil { + return nil, err + } + + for _, indexArray := range parsedDeviceSecondIndexes { + for _, secondIndex := range indexArray { + commandWithIndex := fmt.Sprintf("%s,%d,%d", command, index, secondIndex) + commandsWithIndex = append(commandsWithIndex, commandWithIndex) + } + } + } + } + + return commandsWithIndex, nil +} + +func (d *IntelDLB) ensureConnected() error { + var err error + d.maxInitMessageLength = uint32(1024) + if d.connection == nil { + d.connection, err = net.Dial("unixpacket", d.SocketPath) + if err != nil { + return err + } + + err = d.setInitMessageLength() + if err != nil { + return err + } + } + + return nil +} + +func (d *IntelDLB) setInitMessageLength() error { + type initMessage struct { + Version string `json:"version"` + Pid int `json:"pid"` + MaxOutputLen uint32 `json:"max_output_len"` + } + buf := make([]byte, d.maxInitMessageLength) + messageLength, err := d.connection.Read(buf) + if err != nil { + return d.closeSocketAndThrowError("custom", fmt.Errorf("failed to read InitMessage from socket - %v", err)) + } + if messageLength > len(buf) { + return d.closeSocketAndThrowError("custom", fmt.Errorf("socket reply length is bigger than default buffer length")) + } + + var initMsg initMessage + err = json.Unmarshal(buf[:messageLength], &initMsg) + if err != nil { + return d.closeSocketAndThrowError("json", err) + } + if initMsg.MaxOutputLen == 0 { + return d.closeSocketAndThrowError("message", err) + } + + d.maxInitMessageLength = initMsg.MaxOutputLen + + return nil +} + +func (d *IntelDLB) writeReadSocketMessage(messageToWrite string) (int, []byte, error) { + _, writeErr := d.connection.Write([]byte(messageToWrite)) + if writeErr != nil { + return 0, nil, d.closeSocketAndThrowError("write", writeErr) + } + + // Read reply, and obtain length of it. + socketReply := make([]byte, d.maxInitMessageLength) + replyMsgLen, readErr := d.connection.Read(socketReply) + if readErr != nil { + return 0, nil, d.closeSocketAndThrowError("read", readErr) + } + + if replyMsgLen == 0 { + return 0, nil, d.closeSocketAndThrowError("message", fmt.Errorf("message length is empty")) + } + + return replyMsgLen, socketReply, nil +} + +func (d *IntelDLB) parseJSON(replyMsgLen int, socketReply []byte, parsedDeviceInfo interface{}) error { + if len(socketReply) == 0 { + return d.closeSocketAndThrowError("json", fmt.Errorf("socket reply is empty")) + } + if replyMsgLen > len(socketReply) { + return d.closeSocketAndThrowError("json", fmt.Errorf("socket reply length is bigger than it should be")) + } + if replyMsgLen == 0 { + return d.closeSocketAndThrowError("json", fmt.Errorf("socket reply message is empty")) + } + // Assign reply to variable, e.g.: {"/eventdev/dev_list": [0, 1]} + jsonDeviceIndexes := socketReply[:replyMsgLen] + + // Parse message from JSON format to map, e.g.: map[string]int. Key = "/eventdev/dev_list" Value = Array[int] {0,1} + jsonParseErr := json.Unmarshal(jsonDeviceIndexes, &parsedDeviceInfo) + if jsonParseErr != nil { + return d.closeSocketAndThrowError("json", jsonParseErr) + } + + return nil +} + +func (d *IntelDLB) closeSocketAndThrowError(errType string, err error) error { + const ( + writeErrMsg = "failed to send command to socket: '%v'" + readErrMsg = "failed to read response of from socket: '%v'" + msgLenErr = "got empty response from socket: '%v'" + jsonParseErr = "failed to parse json: '%v'" + failedConErr = " - and failed to close connection '%v'" + customErr = "error occurred: '%v'" + ) + + var errMsg string + switch errType { + case "write": + errMsg = writeErrMsg + case "read": + errMsg = readErrMsg + case "message": + errMsg = msgLenErr + case "json": + errMsg = jsonParseErr + case "custom": + errMsg = customErr + } + + if d.connection != nil { + closeConnectionErr := d.connection.Close() + d.connection = nil + if closeConnectionErr != nil { + errCloseMsg := errMsg + failedConErr + return fmt.Errorf(errCloseMsg, err, closeConnectionErr) + } + } + + return fmt.Errorf(errMsg, err) +} + +func (d *IntelDLB) checkAndAddDLBDevice() error { + if d.rasReader == nil { + return fmt.Errorf("rasreader was not initialized") + } + filePaths, err := d.rasReader.gatherPaths(dlbDeviceIDLocation) + if err != nil { + return err + } + + deviceIDToDirs := make(map[string][]string) + for _, path := range filePaths { + fileData, err := d.rasReader.readFromFile(path) + if err != nil { + return err + } + + // check if it is DLB device + trimmedDeviceID := strings.TrimSpace(string(fileData)) + if !choice.Contains(trimmedDeviceID, d.DLBDeviceIDs) { + continue + } + deviceDir := filepath.Dir(path) + deviceIDToDirs[trimmedDeviceID] = append(deviceIDToDirs[trimmedDeviceID], deviceDir) + d.devicesDir = append(d.devicesDir, deviceDir) + } + if len(d.devicesDir) == 0 { + return fmt.Errorf("cannot find any of provided IDs on the system - %+q", d.DLBDeviceIDs) + } + for _, deviceID := range d.DLBDeviceIDs { + if len(deviceIDToDirs[deviceID]) == 0 { + d.Log.Debugf("Device %s was not found on system", deviceID) + } + } + return nil +} + +func checkSocketPath(path string) error { + pathInfo, err := os.Lstat(path) + if os.IsNotExist(err) { + return fmt.Errorf("provided path does not exist: '%v'", path) + } + + if err != nil { + return fmt.Errorf("cannot get system information of '%v' file: %v", path, err) + } + + if pathInfo.Mode()&os.ModeSocket != os.ModeSocket { + return fmt.Errorf("provided path does not point to a socket file: '%v'", path) + } + + return nil +} + +func validateEventdevCommands(commands []string) error { + eventdevCommandRegex := regexp.MustCompile("^/eventdev/[a-z_]+$") + for _, command := range commands { + if !eventdevCommandRegex.Match([]byte(command)) { + return fmt.Errorf("provided command is not valid - %v", command) + } + } + + return nil +} + +func init() { + inputs.Add(pluginName, func() telegraf.Input { + return &IntelDLB{ + rasReader: rasReaderImpl{}, + } + }) +} diff --git a/plugins/inputs/intel_dlb/intel_dlb_notlinux.go b/plugins/inputs/intel_dlb/intel_dlb_notlinux.go new file mode 100644 index 000000000..8fbb3d038 --- /dev/null +++ b/plugins/inputs/intel_dlb/intel_dlb_notlinux.go @@ -0,0 +1,4 @@ +//go:build !linux +// +build !linux + +package intel_dlb diff --git a/plugins/inputs/intel_dlb/intel_dlb_test.go b/plugins/inputs/intel_dlb/intel_dlb_test.go new file mode 100644 index 000000000..43b87ac7a --- /dev/null +++ b/plugins/inputs/intel_dlb/intel_dlb_test.go @@ -0,0 +1,1119 @@ +//go:build linux +// +build linux + +package intel_dlb + +import ( + "encoding/json" + "fmt" + "net" + "os" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs/dpdk/mocks" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestDLB_Init(t *testing.T) { + t.Run("when SocketPath is empty, then set default value", func(t *testing.T) { + dlb := IntelDLB{ + SocketPath: "", + Log: testutil.Logger{}, + } + require.Equal(t, "", dlb.SocketPath) + + _ = dlb.Init() + + require.Equal(t, defaultSocketPath, dlb.SocketPath) + }) + + t.Run("invalid socket path throws error in Init method when UnreachableSocketBehavior is set to 'error'", func(t *testing.T) { + dlb := IntelDLB{ + SocketPath: "/this/is/wrong/path", + Log: testutil.Logger{}, + UnreachableSocketBehavior: "error", + } + err := dlb.Init() + + require.Error(t, err) + require.Contains(t, err.Error(), "provided path does not exist") + }) + + t.Run("not-existing socket path doesn't throw error in Init method when UnreachableSocketBehavior is set to 'ignore'", func(t *testing.T) { + dlb := IntelDLB{ + SocketPath: "/socket/is/not/there/yet", + Log: testutil.Logger{}, + UnreachableSocketBehavior: "ignore", + } + err := dlb.Init() + + require.Error(t, err) + require.NotContains(t, err.Error(), "provided path does not exist") + }) + + t.Run("wrong UnreachableSocketBehavior option throws error in Init method", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dlb := IntelDLB{ + SocketPath: pathToSocket, + UnreachableSocketBehavior: "DAS BOOT", + Log: testutil.Logger{}, + } + err := dlb.Init() + + require.Error(t, err) + require.Contains(t, err.Error(), "unreachable_socket_behavior: unknown choice DAS BOOT") + }) + + t.Run("wrong eventdev command throws error in Init method", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dlb := IntelDLB{ + SocketPath: pathToSocket, + EventdevCommands: []string{"/noteventdev/dev_xstats"}, + Log: testutil.Logger{}, + } + err := dlb.Init() + + require.Error(t, err) + require.Contains(t, err.Error(), "provided command is not valid - ") + }) + + t.Run("wrong eventdev command throws error", func(t *testing.T) { + dlb := IntelDLB{ + EventdevCommands: []string{"/noteventdev/dev_xstats"}, + } + err := validateEventdevCommands(dlb.EventdevCommands) + + require.Error(t, err) + require.Contains(t, err.Error(), "provided command is not valid - ") + }) + + t.Run("validate eventdev command", func(t *testing.T) { + dlb := IntelDLB{ + EventdevCommands: []string{"/eventdev/dev_xstats"}, + } + err := validateEventdevCommands(dlb.EventdevCommands) + + require.NoError(t, err) + }) + + t.Run("successfully initialize intel_dlb struct", func(t *testing.T) { + pathToSocket, socket := createSocketForTest(t) + fileMock := &mockRasReader{} + defer socket.Close() + dlb := IntelDLB{ + SocketPath: pathToSocket, + Log: testutil.Logger{}, + rasReader: fileMock, + } + const globPath = "/sys/devices/pci0000:00/0000:00:00.0/device" + fileMock.On("gatherPaths", mock.Anything).Return([]string{globPath}, nil).Once(). + On("readFromFile", mock.Anything).Return([]byte("0x2710"), nil).Once() + + err := dlb.Init() + require.NoError(t, err) + require.Equal(t, []string{"/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_xstats", "/eventdev/queue_links"}, dlb.EventdevCommands) + fileMock.AssertExpectations(t) + }) + + t.Run("throw error while initializing dlb plugin when theres no dlb device", func(t *testing.T) { + fileMock := &mockRasReader{} + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dlb := IntelDLB{ + rasReader: fileMock, + SocketPath: pathToSocket, + Log: testutil.Logger{}, + } + const emptyPath = "" + fileMock.On("gatherPaths", mock.Anything).Return([]string{emptyPath}, fmt.Errorf("can't find device folder")).Once() + err := dlb.Init() + require.Error(t, err) + require.Contains(t, err.Error(), "can't find device folder") + fileMock.AssertExpectations(t) + }) +} + +func TestDLB_writeReadSocketMessage(t *testing.T) { + t.Run("throws custom error message when write error occur", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + } + mockConn.On("Write", []byte{}).Return(0, fmt.Errorf("write error")).Once(). + On("Close").Return(nil).Once() + + _, _, err := dlb.writeReadSocketMessage("") + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to send command to socket: 'write error'") + mockConn.AssertExpectations(t) + }) + + t.Run("throws custom error message when read error occur", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + } + simulateResponse(mockConn, "", fmt.Errorf("read error")) + + _, _, err := dlb.writeReadSocketMessage("") + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to read response of from socket: 'read error'") + mockConn.AssertExpectations(t) + }) + + t.Run("throws custom error message when write error occur", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + } + mockConn.On("Write", []byte{}).Return(0, nil).Once(). + On("Read", mock.Anything).Return(0, nil). + On("Close").Return(nil).Once() + + _, _, err := dlb.writeReadSocketMessage("") + + require.Error(t, err) + require.Contains(t, err.Error(), "got empty response from socket: 'message length is empty'") + mockConn.AssertExpectations(t) + }) +} + +func TestDLB_parseJSON(t *testing.T) { + var tests = []struct { + testName string + socketReply []byte + replyMsgLen int + errMsg string + }{ + {"wrong json format", []byte("/wrong/json"), 10, "invalid character '/' looking for beginning of value"}, + {"socket reply length equal to 0 throws error", []byte("/wrong/json"), 0, "socket reply message is empty"}, + {"invalid reply length throws error", []byte("/wrong/json"), 20, "socket reply length is bigger than it should be"}, + {"nil socket reply throws error", nil, 0, "socket reply is empty"}, + } + for _, testCase := range tests { + t.Run(testCase.testName, func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + } + mockConn.On("Close").Return(nil).Once() + + err := dlb.parseJSON(testCase.replyMsgLen, testCase.socketReply, make(map[string]interface{})) + + require.Error(t, err) + require.Contains(t, err.Error(), testCase.errMsg) + mockConn.AssertExpectations(t) + }) + } +} + +func TestDLB_getInitMessageLength(t *testing.T) { + t.Run("trying to unmarshal invalid JSON throws error", func(t *testing.T) { + fileMock := &mockRasReader{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + rasReader: fileMock, + } + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, "") + }).Return(len(""), nil).Once().On("Close").Return(nil).Once() + + err := dlb.setInitMessageLength() + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse json") + fileMock.AssertExpectations(t) + }) + + t.Run("when init message equals 0 throw error", func(t *testing.T) { + fileMock := &mockRasReader{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + rasReader: fileMock, + } + dlb.maxInitMessageLength = 1024 + const initMsgResponse = "{\"version\":\"DPDK 20.11.3\",\"pid\":208361,\"max_output_len\":0}" + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, initMsgResponse) + }).Return(len(initMsgResponse), nil).Once().On("Close").Return(nil).Once() + + err := dlb.setInitMessageLength() + require.Error(t, err) + require.Contains(t, err.Error(), "got empty response from socket") + fileMock.AssertExpectations(t) + }) +} + +func TestDLB_gatherCommandsResult(t *testing.T) { + t.Run("trying connecting to wrong socket throw error", func(t *testing.T) { + pathToSocket := "/tmp/dpdk-test-socket" + socket, err := net.Listen("unix", pathToSocket) + fileMock := &mockRasReader{} + defer socket.Close() + dlb := IntelDLB{ + SocketPath: pathToSocket, + Log: testutil.Logger{}, + rasReader: fileMock, + } + require.NoError(t, err) + + err = dlb.gatherCommandsResult("", nil) + require.Error(t, err) + require.Contains(t, err.Error(), "connect: protocol wrong type for socket") + fileMock.AssertExpectations(t) + }) +} + +func TestDLB_gatherCommandsWithDeviceIndex(t *testing.T) { + t.Run("process wrong commands should throw error", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + EventdevCommands: []string{"/eventdev/dev_xstats"}, + } + response := "/wrong/JSON" + dlb.maxInitMessageLength = 1024 + simulateResponse(mockConn, response, nil) + mockConn.On("Write", mock.Anything).Return(0, nil) + mockConn.On("Close").Return(nil).Once() + + _, err := dlb.gatherCommandsWithDeviceIndex() + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse json") + mockConn.AssertExpectations(t) + }) + + t.Run("process commands should return array with command and device id", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + maxInitMessageLength: 1024, + EventdevCommands: []string{"/eventdev/dev_xstats"}, + } + response := fmt.Sprintf(`{"%s": [0, 1]}`, eventdevListCommand) + simulateResponse(mockConn, response, nil) + + expectedCommands := []string{"/eventdev/dev_xstats,0", "/eventdev/dev_xstats,1"} + + commands, err := dlb.gatherCommandsWithDeviceIndex() + + require.NoError(t, err) + require.Equal(t, expectedCommands, commands) + mockConn.AssertExpectations(t) + }) + + t.Run("process commands should return array with queue and device id", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + maxInitMessageLength: 1024, + EventdevCommands: []string{"/eventdev/queue_links"}, + } + responseDevList := fmt.Sprintf(`{"%s": [0]}`, eventdevListCommand) + simulateResponse(mockConn, responseDevList, nil) + responseQueueLinks := `{"0": [0]}` + simulateResponse(mockConn, responseQueueLinks, nil) + + expectedCommands := []string{"/eventdev/queue_links,0,0"} + + commands, err := dlb.gatherCommandsWithDeviceIndex() + + require.NoError(t, err) + require.Equal(t, expectedCommands, commands) + mockConn.AssertExpectations(t) + }) + + t.Run("process wrong commands should throw error", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + maxInitMessageLength: 1024, + EventdevCommands: []string{"/eventdev/dev_xstats", "/eventdev/wrong"}, + } + response := fmt.Sprintf(`{"%s": [0, 1]}`, eventdevListCommand) + mockConn.On("Write", mock.Anything).Return(0, nil).Once() + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, response) + }).Return(len(response), nil).Once().On("Close").Return(nil).Once() + + _, err := dlb.gatherCommandsWithDeviceIndex() + + require.Error(t, err) + require.Contains(t, err.Error(), "cannot split command") + mockConn.AssertExpectations(t) + }) +} + +func TestDLB_gatherSecondDeviceIndex(t *testing.T) { + t.Run("process wrong commands should return error", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + EventdevCommands: []string{"/eventdev/wrong"}, + } + mockConn.On("Close").Return(nil).Once() + _, err := dlb.gatherSecondDeviceIndex(0, dlb.EventdevCommands[0]) + + require.Error(t, err) + require.Contains(t, err.Error(), "cannot split command -") + mockConn.AssertExpectations(t) + }) + + t.Run("process wrong response commands should throw error", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + EventdevCommands: []string{"/eventdev/port_xstats"}, + } + response := "/wrong/JSON" + + simulateResponse(mockConn, response, nil) + mockConn.On("Close").Return(nil).Once() + + _, err := dlb.gatherSecondDeviceIndex(0, dlb.EventdevCommands[0]) + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse json") + mockConn.AssertExpectations(t) + }) + + t.Run("process wrong response commands should throw error and close socket, after second function call should connect to socket", func(t *testing.T) { + mockConn := &mocks.Conn{} + pathToSocket, socket := createSocketForTest(t) + defer socket.Close() + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + maxInitMessageLength: 1024, + EventdevCommands: []string{"/eventdev/port_xstats"}, + } + + response := "/wrong/JSON" + + simulateResponse(mockConn, response, nil) + mockConn.On("Close").Return(nil).Once() + + _, err := dlb.gatherSecondDeviceIndex(0, dlb.EventdevCommands[0]) + require.Equal(t, nil, dlb.connection) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse json") + dlb.SocketPath = pathToSocket + go simulateSocketResponseForGather(socket, t) + commandDeviceIndexes, err := dlb.gatherSecondDeviceIndex(0, dlb.EventdevCommands[0]) + require.NoError(t, err) + + expectedCommands := []string{"/eventdev/port_xstats,0,0", "/eventdev/port_xstats,0,1"} + commands := commandDeviceIndexes + + require.Equal(t, expectedCommands, commands) + mockConn.AssertExpectations(t) + }) + + t.Run("process commands should return array with command and second device id", func(t *testing.T) { + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + maxInitMessageLength: 1024, + EventdevCommands: []string{"/eventdev/port_xstats"}, + } + eventdevListWithSecondIndex := []string{"/eventdev/port_list", "/eventdev/queue_list"} + response := fmt.Sprintf(`{"%s": [0, 1]}`, eventdevListWithSecondIndex[0]) + simulateResponse(mockConn, response, nil) + + expectedCommands := []string{"/eventdev/port_xstats,0,0", "/eventdev/port_xstats,0,1"} + + commandDeviceIndexes, err := dlb.gatherSecondDeviceIndex(0, dlb.EventdevCommands[0]) + + commands := commandDeviceIndexes + + require.NoError(t, err) + require.Equal(t, expectedCommands, commands) + mockConn.AssertExpectations(t) + }) +} + +func TestDLB_processCommandResult(t *testing.T) { + t.Run("gather xstats info with valid values", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + maxInitMessageLength: 1024, + EventdevCommands: []string{"/eventdev/dev_xstats"}, + } + response := fmt.Sprintf(`{"%s": [0]}`, eventdevListCommand) + simulateResponse(mockConn, response, nil) + + response = `{"/eventdev/dev_xstats": {"dev_rx_ok": 0}}` + simulateResponse(mockConn, response, nil) + err := dlb.gatherMetricsFromSocket(mockAcc) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "intel_dlb", + map[string]string{ + "command": "/eventdev/dev_xstats,0", + }, + map[string]interface{}{ + "dev_rx_ok": int64(0), + }, + time.Unix(0, 0), + ), + } + actual := mockAcc.GetTelegrafMetrics() + + testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) + mockConn.AssertExpectations(t) + }) + + t.Run("successfully gather xstats and aer metrics", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + fileMock := &mockRasReader{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + EventdevCommands: []string{"/eventdev/dev_xstats"}, + devicesDir: []string{"/sys/devices/pci0000:00/0000:00:00.0/device"}, + rasReader: fileMock, + maxInitMessageLength: 1024, + } + responseGather := fmt.Sprintf(`{"%s": [0]}`, eventdevListCommand) + mockConn.On("Write", mock.Anything).Return(0, nil).Twice() + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, responseGather) + }).Return(len(responseGather), nil).Once() + response := `{"/eventdev/dev_xstats": {"dev_rx_ok": 0}}` + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, response) + }).Return(len(response), nil).Once() + fileMock.On("readFromFile", mock.AnythingOfType("string")).Return([]byte(aerCorrectableData), nil).Once(). + On("readFromFile", mock.AnythingOfType("string")).Return([]byte(aerFatalData), nil).Once(). + On("readFromFile", mock.AnythingOfType("string")).Return([]byte(aerNonFatalData), nil).Once() + err := dlb.Gather(mockAcc) + require.NoError(t, err) + actual := mockAcc.GetTelegrafMetrics() + testutil.SortMetrics() + ex := expectedTelegrafMetrics + testutil.RequireMetricsEqual(t, ex, actual, testutil.IgnoreTime()) + mockConn.AssertExpectations(t) + }) + + t.Run("invalid JSON throws error in process result function", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + EventdevCommands: []string{"/eventdev/dev_xstats"}, + } + response := fmt.Sprintf(`{"%s": [0]}`, eventdevListCommand) + simulateResponse(mockConn, response, nil) + + simulateResponse(mockConn, "/wrong/json", nil) + mockConn.On("Close").Return(nil).Once() + + err := dlb.gatherMetricsFromSocket(mockAcc) + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse json") + mockConn.AssertExpectations(t) + }) + + t.Run("throw error when reply message is empty", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + } + const response = "" + mockConn.On("Write", mock.Anything).Return(0, nil) + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, response) + }).Return(len(response), nil).Once() + mockConn.On("Close").Return(nil) + + err := dlb.gatherMetricsFromSocket(mockAcc) + require.Error(t, err) + require.Contains(t, err.Error(), "got empty response from socket") + mockConn.AssertExpectations(t) + }) + + t.Run("throw error when can't read socket reply", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + } + const response = "" + mockConn.On("Write", mock.Anything).Return(0, nil) + mockConn.On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, response) + }).Return(len(response), fmt.Errorf("read error")).Once() + mockConn.On("Close").Return(nil) + + err := dlb.gatherMetricsFromSocket(mockAcc) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to read response of from socket") + mockConn.AssertExpectations(t) + }) + + t.Run("throw error when invalid reply was provided", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + maxInitMessageLength: 1024, + Log: testutil.Logger{}, + } + simulateResponse(mockConn, "\"string reply\"", nil) + mockConn.On("Close").Return(nil).Once() + err := dlb.gatherMetricsFromSocket(mockAcc) + + require.Error(t, err) + require.Contains(t, err.Error(), "json: cannot unmarshal string into Go value of type") + mockConn.AssertExpectations(t) + }) + + t.Run("throw error while processing xstats", func(t *testing.T) { + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + fileMock := &mockRasReader{} + dlb := IntelDLB{ + connection: mockConn, + Log: testutil.Logger{}, + EventdevCommands: []string{"/eventdev/dev_xstats"}, + rasReader: fileMock, + maxInitMessageLength: 1024, + } + mockConn.On("Close").Return(nil) + + responseGather := fmt.Sprintf(`{"%s": [0]}`, eventdevListCommand) + mockConn.On("Write", mock.Anything).Return(0, nil).Once(). + On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, responseGather) + }).Return(len(responseGather), nil).Once() + + wrongResponse := "/wrong/json" + mockConn.On("Write", mock.Anything).Return(0, nil).Once(). + On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, wrongResponse) + }).Return(len(wrongResponse), nil).Once() + + err := dlb.gatherMetricsFromSocket(mockAcc) + + require.Error(t, err) + require.Contains(t, err.Error(), "failed to parse json:") + mockConn.AssertExpectations(t) + }) +} + +func Test_checkAndAddDLBDevice(t *testing.T) { + t.Run("throw error when dlb validation can't find device folder", func(t *testing.T) { + fileMock := &mockRasReader{} + dlb := IntelDLB{ + rasReader: fileMock, + Log: testutil.Logger{}, + } + fileMock.On("gatherPaths", mock.AnythingOfType("string")).Return(nil, fmt.Errorf("can't find device folder")).Once() + + err := dlb.checkAndAddDLBDevice() + + require.Error(t, err) + require.Contains(t, err.Error(), "can't find device folder") + fileMock.AssertExpectations(t) + }) + + t.Run("reading file throws error", func(t *testing.T) { + fileMock := &mockRasReader{} + dlb := IntelDLB{ + rasReader: fileMock, + Log: testutil.Logger{}, + devicesDir: []string{"/sys/devices/pci0000:00/0000:00:00.0/device"}, + } + const globPath = "/sys/devices/pci0000:00/0000:00:00.0/device" + fileMock.On("gatherPaths", mock.Anything).Return([]string{globPath}, nil).Once(). + On("readFromFile", mock.Anything).Return([]byte("0x2710"), fmt.Errorf("read error while getting device folders")).Once() + + err := dlb.checkAndAddDLBDevice() + + require.Error(t, err) + require.Contains(t, err.Error(), "read error while getting device folders") + fileMock.AssertExpectations(t) + }) + + t.Run("reading file with empty rasreader throws error", func(t *testing.T) { + fileMock := &mockRasReader{} + dlb := IntelDLB{ + Log: testutil.Logger{}, + } + err := dlb.checkAndAddDLBDevice() + + require.Error(t, err) + require.Contains(t, err.Error(), "rasreader was not initialized") + fileMock.AssertExpectations(t) + }) + + t.Run("reading file with unused device IDs throws error", func(t *testing.T) { + fileMock := &mockRasReader{} + dlb := IntelDLB{ + rasReader: fileMock, + Log: testutil.Logger{}, + devicesDir: []string{"/sys/devices/pci0000:00/0000:00:00.0/device"}, + DLBDeviceIDs: []string{"0x2710"}, + } + const globPath = "/sys/devices/pci0000:00/0000:00:00.0/device" + fileMock.On("gatherPaths", mock.Anything).Return([]string{globPath}, nil).Once(). + On("readFromFile", mock.Anything).Return([]byte("0x2710"), fmt.Errorf("read error while getting device folders")).Once() + + err := dlb.checkAndAddDLBDevice() + + require.Error(t, err) + require.Contains(t, err.Error(), "read error while getting device folders") + fileMock.AssertExpectations(t) + }) + + t.Run("no errors when dlb device was found while validating", func(t *testing.T) { + fileMock := &mockRasReader{} + dlb := IntelDLB{ + rasReader: fileMock, + Log: testutil.Logger{}, + DLBDeviceIDs: []string{"0x2710"}, + } + const globPath = "/sys/devices/pci0000:00/0000:00:00.0/device" + fileMock.On("gatherPaths", mock.Anything).Return([]string{globPath}, nil).Once(). + On("readFromFile", mock.Anything).Return([]byte("0x2710"), nil).Once() + + err := dlb.checkAndAddDLBDevice() + + require.NoError(t, err) + + expected := []string{"/sys/devices/pci0000:00/0000:00:00.0"} + require.Equal(t, expected, dlb.devicesDir) + fileMock.AssertExpectations(t) + }) + + t.Run("no errors when found unused dlb device", func(t *testing.T) { + fileMock := &mockRasReader{} + dlb := IntelDLB{ + rasReader: fileMock, + Log: testutil.Logger{}, + DLBDeviceIDs: []string{"0x2710", "0x0000"}, + } + const globPath = "/sys/devices/pci0000:00/0000:00:00.0/device" + fileMock.On("gatherPaths", mock.Anything).Return([]string{globPath}, nil).Once(). + On("readFromFile", mock.Anything).Return([]byte("0x2710"), nil).Once() + + err := dlb.checkAndAddDLBDevice() + + require.NoError(t, err) + + expected := []string{"/sys/devices/pci0000:00/0000:00:00.0"} + require.Equal(t, expected, dlb.devicesDir) + fileMock.AssertExpectations(t) + }) + + t.Run("error when dlb device was not found while validating", func(t *testing.T) { + fileMock := &mockRasReader{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + rasReader: fileMock, + Log: testutil.Logger{}, + } + const globPath = "/sys/devices/pci0000:00/0000:00:00.0/device" + fileMock.On("gatherPaths", mock.Anything).Return([]string{globPath}, nil).Once(). + On("readFromFile", mock.Anything).Return([]byte("0x7100"), nil).Once() + + err := dlb.checkAndAddDLBDevice() + + require.Error(t, err) + require.Contains(t, err.Error(), fmt.Sprintf("cannot find any of provided IDs on the system - %+q", dlb.DLBDeviceIDs)) + fileMock.AssertExpectations(t) + mockConn.AssertExpectations(t) + }) +} + +func Test_readRasMetrics(t *testing.T) { + var errorTests = []struct { + name string + returnResponse []byte + err error + errMsg string + }{ + {"error when reading fails", []byte(aerCorrectableData), fmt.Errorf("read error"), "read error"}, + {"error when empty data is given", []byte(""), nil, "no value to parse"}, + {"error when trying to split empty data", []byte("x1 x2"), nil, "failed to parse value"}, + } + + for _, test := range errorTests { + t.Run(test.name, func(t *testing.T) { + fileMock := &mockRasReader{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + rasReader: fileMock, + Log: testutil.Logger{}, + } + mockConn.On("Close").Return(nil).Once() + fileMock.On("readFromFile", mock.AnythingOfType("string")).Return(test.returnResponse, test.err).Once() + + _, err := dlb.readRasMetrics("/dlb", "device") + + require.Error(t, err) + require.Contains(t, err.Error(), test.errMsg) + fileMock.AssertExpectations(t) + }) + } + + t.Run("no error when reading countable error file", func(t *testing.T) { + fileMock := &mockRasReader{} + dlb := IntelDLB{ + rasReader: fileMock, + Log: testutil.Logger{}, + } + + fileMock.On("readFromFile", mock.AnythingOfType("string")).Return([]byte(aerCorrectableData), nil).Once() + + _, err := dlb.readRasMetrics("/dlb", "device") + + require.NoError(t, err) + fileMock.AssertExpectations(t) + }) +} + +func Test_gatherRasMetrics(t *testing.T) { + var errorTests = []struct { + name string + returnResponse []byte + err error + errMsg string + }{ + {"throw error when data in file is invalid", nil, nil, "no value to parse"}, + {"throw error when data in file is invalid", []byte("x1 x2"), nil, "failed to parse value"}, + } + for _, test := range errorTests { + t.Run(test.name, func(t *testing.T) { + fileMock := &mockRasReader{} + mockAcc := &testutil.Accumulator{} + mockConn := &mocks.Conn{} + dlb := IntelDLB{ + connection: mockConn, + rasReader: fileMock, + devicesDir: []string{"/sys/devices/pci0000:00/0000:00:00.0/device"}, + Log: testutil.Logger{}, + } + mockConn.On("Close").Return(nil).Once() + fileMock.On("readFromFile", mock.AnythingOfType("string")).Return(test.returnResponse, test.err).Once() + + err := dlb.gatherRasMetrics(mockAcc) + + require.Error(t, err) + require.Contains(t, err.Error(), test.errMsg) + fileMock.AssertExpectations(t) + }) + } + + t.Run("gather ras metrics and add to accumulator", func(t *testing.T) { + fileMock := &mockRasReader{} + mockAcc := &testutil.Accumulator{} + dlb := IntelDLB{ + rasReader: fileMock, + devicesDir: []string{"/sys/devices/pci0000:00/0000:00:00.0/device"}, + Log: testutil.Logger{}, + } + fileMock.On("readFromFile", mock.AnythingOfType("string")).Return([]byte(aerCorrectableData), nil).Once(). + On("readFromFile", mock.AnythingOfType("string")).Return([]byte(aerFatalData), nil).Once(). + On("readFromFile", mock.AnythingOfType("string")).Return([]byte(aerNonFatalData), nil).Once() + + err := dlb.gatherRasMetrics(mockAcc) + + require.NoError(t, err) + + actual := mockAcc.GetTelegrafMetrics() + testutil.SortMetrics() + testutil.RequireMetricsEqual(t, expectedRasMetrics, actual, testutil.IgnoreTime()) + fileMock.AssertExpectations(t) + }) +} + +func Test_rasReader(t *testing.T) { + file := rasReaderImpl{} + // Create unique temporary file + fileobj, err := os.CreateTemp("", "qat") + require.NoError(t, err) + + t.Run("tests with existing file", func(t *testing.T) { + // Remove the temporary file after this test + defer os.Remove(fileobj.Name()) + + _, err = fileobj.Write([]byte(testFileContent)) + require.NoError(t, err) + err = fileobj.Close() + require.NoError(t, err) + + // Check that content returned by read is equal to provided file. + data, err := file.readFromFile(fileobj.Name()) + require.NoError(t, err) + require.Equal(t, []byte(testFileContent), data) + + // Error if path is malformed. + _, err = file.readFromFile(fileobj.Name() + "/../..") + require.Error(t, err) + require.Contains(t, err.Error(), "not a directory") + }) + + var errorTests = []struct { + name string + filePath string + expectedErrMsg string + }{ + {"error if file does not exist", fileobj.Name(), "no such file or directory"}, + {"error if path does not point to regular file", os.TempDir(), "is a directory"}, + {"error if file does not exist", "/not/path/unreal/path", "no such file or directory"}, + } + + for _, test := range errorTests { + t.Run(test.name, func(t *testing.T) { + _, err = file.readFromFile(test.filePath) + require.Error(t, err) + require.Contains(t, err.Error(), test.expectedErrMsg) + }) + } +} + +func simulateResponse(mockConn *mocks.Conn, response string, readErr error) { + mockConn.On("Write", mock.Anything).Return(0, nil).Once(). + On("Read", mock.Anything).Run(func(arg mock.Arguments) { + elem := arg.Get(0).([]byte) + copy(elem, response) + }).Return(len(response), readErr).Once() + + if readErr != nil { + mockConn.On("Close").Return(nil).Once() + } +} + +func simulateSocketResponseForGather(socket net.Listener, t *testing.T) { + conn, err := socket.Accept() + require.NoError(t, err) + + type initMessage struct { + Version string `json:"version"` + Pid int `json:"pid"` + MaxOutputLen uint32 `json:"max_output_len"` + } + initMsg, _ := json.Marshal(initMessage{ + Version: "", + Pid: 1, + MaxOutputLen: 1024, + }) + _, err = conn.Write(initMsg) + require.NoError(t, err) + + require.NoError(t, err) + eventdevListWithSecondIndex := []string{"/eventdev/port_list", "/eventdev/queue_list"} + _, err = conn.Write([]byte(fmt.Sprintf(`{"%s": [0, 1]}`, eventdevListWithSecondIndex[0]))) + require.NoError(t, err) +} + +func createSocketForTest(t *testing.T) (string, net.Listener) { + pathToSocket := "/tmp/dpdk-test-socket" + socket, err := net.Listen("unixpacket", pathToSocket) + require.NoError(t, err) + return pathToSocket, socket +} + +const ( + testFileContent = ` +line1 +line2 2 +line3 +line4 +line5 +` + aerCorrectableData = ` +RxErr 1 +BadTLP 0 +BadDLLP 0 +Rollover 1 +Timeout 0 +NonFatalErr 0 +CorrIntErr 0 +HeaderOF 0 +TOTAL_ERR_COR 0` + aerFatalData = ` +Undefined 0 +DLP 1 +SDES 0 +TLP 0 +FCP 0 +CmpltTO 0 +CmpltAbrt 0 +UnxCmplt 0 +RxOF 0 +MalfTLP 0 +ECRC 0 +UnsupReq 0 +ACSViol 0 +UncorrIntErr 0 +BlockedTLP 0 +AtomicOpBlocked 0 +TLPBlockedErr 0 +PoisonTLPBlocked 0 +TOTAL_ERR_FATAL 3` + aerNonFatalData = ` +Undefined 0 +DLP 0 +SDES 0 +TLP 0 +FCP 0 +CmpltTO 2 +CmpltAbrt 0 +UnxCmplt 0 +RxOF 0 +MalfTLP 0 +ECRC 0 +UnsupReq 0 +ACSViol 0 +UncorrIntErr 0 +BlockedTLP 0 +AtomicOpBlocked 0 +TLPBlockedErr 0 +PoisonTLPBlocked 0 +TOTAL_ERR_NONFATAL 9` +) + +var ( + expectedRasMetrics = []telegraf.Metric{ + testutil.MustMetric( + "intel_dlb_ras", + map[string]string{ + "device": "0000:00:00.0", + "metric_file": aerCorrectableFileName, + }, + map[string]interface{}{ + "RxErr": uint64(1), + "BadTLP": uint64(0), + "BadDLLP": uint64(0), + "Rollover": uint64(1), + "Timeout": uint64(0), + "NonFatalErr": uint64(0), + "CorrIntErr": uint64(0), + "HeaderOF": uint64(0), + "TOTAL_ERR_COR": uint64(0), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "intel_dlb_ras", + map[string]string{ + "device": "0000:00:00.0", + "metric_file": aerFatalFileName, + }, + map[string]interface{}{ + "Undefined": uint64(0), + "DLP": uint64(1), + "SDES": uint64(0), + "TLP": uint64(0), + "FCP": uint64(0), + "CmpltTO": uint64(0), + "CmpltAbrt": uint64(0), + "UnxCmplt": uint64(0), + "RxOF": uint64(0), + "MalfTLP": uint64(0), + "ECRC": uint64(0), + "UnsupReq": uint64(0), + "ACSViol": uint64(0), + "UncorrIntErr": uint64(0), + "BlockedTLP": uint64(0), + "AtomicOpBlocked": uint64(0), + "TLPBlockedErr": uint64(0), + "PoisonTLPBlocked": uint64(0), + "TOTAL_ERR_FATAL": uint64(3), + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "intel_dlb_ras", + map[string]string{ + "device": "0000:00:00.0", + "metric_file": aerNonFatalFileName, + }, + map[string]interface{}{ + "Undefined": uint64(0), + "DLP": uint64(0), + "SDES": uint64(0), + "TLP": uint64(0), + "FCP": uint64(0), + "CmpltTO": uint64(2), + "CmpltAbrt": uint64(0), + "UnxCmplt": uint64(0), + "RxOF": uint64(0), + "MalfTLP": uint64(0), + "ECRC": uint64(0), + "UnsupReq": uint64(0), + "ACSViol": uint64(0), + "UncorrIntErr": uint64(0), + "BlockedTLP": uint64(0), + "AtomicOpBlocked": uint64(0), + "TLPBlockedErr": uint64(0), + "PoisonTLPBlocked": uint64(0), + "TOTAL_ERR_NONFATAL": uint64(9), + }, + time.Unix(0, 0), + ), + } + + expectedTelegrafMetrics = []telegraf.Metric{ + testutil.MustMetric( + "intel_dlb", + map[string]string{ + "command": "/eventdev/dev_xstats,0", + }, + map[string]interface{}{ + "dev_rx_ok": int64(0), + }, + time.Unix(0, 0), + ), + expectedRasMetrics[0], + expectedRasMetrics[1], + expectedRasMetrics[2], + } +) diff --git a/plugins/inputs/intel_dlb/mock_ras_reader.go b/plugins/inputs/intel_dlb/mock_ras_reader.go new file mode 100644 index 000000000..4d36e54f2 --- /dev/null +++ b/plugins/inputs/intel_dlb/mock_ras_reader.go @@ -0,0 +1,71 @@ +// Code generated by mockery v2.14.0. DO NOT EDIT. + +package intel_dlb + +import mock "github.com/stretchr/testify/mock" + +// mockRasReader is an autogenerated mock type for the rasReader type +type mockRasReader struct { + mock.Mock +} + +// gatherPaths provides a mock function with given fields: path +func (_m *mockRasReader) gatherPaths(path string) ([]string, error) { + ret := _m.Called(path) + + var r0 []string + if rf, ok := ret.Get(0).(func(string) []string); ok { + r0 = rf(path) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(path) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// readFromFile provides a mock function with given fields: filePath +func (_m *mockRasReader) readFromFile(filePath string) ([]byte, error) { + ret := _m.Called(filePath) + + var r0 []byte + if rf, ok := ret.Get(0).(func(string) []byte); ok { + r0 = rf(filePath) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(filePath) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTnewMockRasReader interface { + mock.TestingT + Cleanup(func()) +} + +// newMockRasReader creates a new instance of mockRasReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockRasReader(t mockConstructorTestingTnewMockRasReader) *mockRasReader { + mock := &mockRasReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/plugins/inputs/intel_dlb/ras_reader.go b/plugins/inputs/intel_dlb/ras_reader.go new file mode 100644 index 000000000..9f430c2d9 --- /dev/null +++ b/plugins/inputs/intel_dlb/ras_reader.go @@ -0,0 +1,37 @@ +//go:build linux +// +build linux + +package intel_dlb + +import ( + "fmt" + "os" + "path/filepath" +) + +type rasReader interface { + gatherPaths(path string) ([]string, error) + readFromFile(filePath string) ([]byte, error) +} + +type rasReaderImpl struct { +} + +// gatherPaths gathers all paths based on provided pattern +func (rasReaderImpl) gatherPaths(pattern string) ([]string, error) { + filePaths, err := filepath.Glob(pattern) + if err != nil { + return nil, fmt.Errorf("glob failed for pattern: %s: %v", pattern, err) + } + + if len(filePaths) == 0 { + return nil, fmt.Errorf("no candidates for given pattern: %s", pattern) + } + + return filePaths, nil +} + +// readFromFile reads file content. +func (rasReaderImpl) readFromFile(filePath string) ([]byte, error) { + return os.ReadFile(filePath) +} diff --git a/plugins/inputs/intel_dlb/sample.conf b/plugins/inputs/intel_dlb/sample.conf new file mode 100644 index 000000000..abe7014d1 --- /dev/null +++ b/plugins/inputs/intel_dlb/sample.conf @@ -0,0 +1,21 @@ +## Reads metrics from DPDK using v2 telemetry interface. +[[inputs.intel_dlb]] + ## Path to DPDK telemetry socket. + # socket_path = "/var/run/dpdk/rte/dpdk_telemetry.v2" + + ## Default eventdev command list, it gathers metrics from socket by given commands. + ## Supported options: + ## "/eventdev/dev_xstats", "/eventdev/port_xstats", + ## "/eventdev/queue_xstats", "/eventdev/queue_links" + # eventdev_commands = ["/eventdev/dev_xstats", "/eventdev/port_xstats", "/eventdev/queue_xstats", "/eventdev/queue_links"] + + ## Detect DLB devices based on device id. + ## Currently, only supported and tested device id is `0x2710`. + ## Configuration added to support forward compatibility. + # dlb_device_types = ["0x2710"] + + ## Specifies plugin behavior regarding unreachable socket (which might not have been initialized yet). + ## Available choices: + ## - error: Telegraf will return an error on startup if socket is unreachable + ## - ignore: Telegraf will ignore error regarding unreachable socket on both startup and gather + # unreachable_socket_behavior = "error"