diff --git a/README.md b/README.md index c9a6b70f7..7bcbd9111 100644 --- a/README.md +++ b/README.md @@ -211,6 +211,7 @@ For documentation on the latest development code see the [documentation index][d * [infiniband](./plugins/inputs/infiniband) * [influxdb](./plugins/inputs/influxdb) * [influxdb_listener](./plugins/inputs/influxdb_listener) +* [intel_rdt](./plugins/inputs/intel_rdt) * [internal](./plugins/inputs/internal) * [interrupts](./plugins/inputs/interrupts) * [ipmi_sensor](./plugins/inputs/ipmi_sensor) diff --git a/go.mod b/go.mod index 4b02fbf16..2e340a635 100644 --- a/go.mod +++ b/go.mod @@ -109,6 +109,7 @@ require ( github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 + github.com/prometheus/procfs v0.0.8 github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664 github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 1f0b228f9..d25d329d4 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -63,6 +63,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/influxdb" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb_listener" _ "github.com/influxdata/telegraf/plugins/inputs/influxdb_v2_listener" + _ "github.com/influxdata/telegraf/plugins/inputs/intel_rdt" _ "github.com/influxdata/telegraf/plugins/inputs/internal" _ "github.com/influxdata/telegraf/plugins/inputs/interrupts" _ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor" diff --git a/plugins/inputs/intel_rdt/README.md b/plugins/inputs/intel_rdt/README.md new file mode 100644 index 000000000..1a6e55f6a --- /dev/null +++ b/plugins/inputs/intel_rdt/README.md @@ -0,0 +1,108 @@ +# Intel RDT Input Plugin +The intel_rdt plugin collects information provided by monitoring features of +Intel Resource Director Technology (Intel(R) RDT) like Cache Monitoring Technology (CMT), +Memory Bandwidth Monitoring (MBM), Cache Allocation Technology (CAT) and Code +and Data Prioritization (CDP) Technology provide the hardware framework to monitor +and control the utilization of shared resources, like last level cache, memory bandwidth. +These Technologies comprise Intel’s Resource Director Technology (RDT). +As multithreaded and multicore platform architectures emerge, +running workloads in single-threaded, multithreaded, or complex virtual machine environment, +the last level cache and memory bandwidth are key resources to manage. Intel introduces CMT, +MBM, CAT and CDP to manage these various workloads across shared resources. + +To gather Intel RDT metrics plugin uses _pqos_ cli tool which is a part of [Intel(R) RDT Software Package](https://github.com/intel/intel-cmt-cat). +Before using this plugin please be sure _pqos_ is properly installed and configured regarding that the plugin +run _pqos_ to work with `OS Interface` mode. This plugin supports _pqos_ version 4.0.0 and above. +Be aware pqos tool needs root privileges to work properly. + +Metrics will be constantly reported from the following `pqos` commands within the given interval: + +#### In case of cores monitoring: +``` +pqos -r --iface-os --mon-file-type=csv --mon-interval=INTERVAL --mon-core=all:[CORES]\;mbt:[CORES] +``` +where `CORES` is equal to group of cores provided in config. User can provide many groups. + +#### In case of process monitoring: +``` +pqos -r --iface-os --mon-file-type=csv --mon-interval=INTERVAL --mon-pid=all:[PIDS]\;mbt:[PIDS] +``` +where `PIDS` is group of processes IDs which name are equal to provided process name in a config. +User can provide many process names which lead to create many processes groups. + +In both cases `INTERVAL` is equal to sampling_interval from config. + +Because PIDs association within system could change in every moment, Intel RDT plugin provides a +functionality to check on every interval if desired processes change their PIDs association. +If some change is reported, plugin will restart _pqos_ tool with new arguments. If provided by user +process name is not equal to any of available processes, will be omitted and plugin will constantly +check for process availability. + +### Useful links +Pqos installation process: https://github.com/intel/intel-cmt-cat/blob/master/INSTALL +Enabling OS interface: https://github.com/intel/intel-cmt-cat/wiki, https://github.com/intel/intel-cmt-cat/wiki/resctrl +More about Intel RDT: https://www.intel.com/content/www/us/en/architecture-and-technology/resource-director-technology.html + +### Configuration +```toml +# Read Intel RDT metrics +[[inputs.IntelRDT]] + ## Optionally set sampling interval to Nx100ms. + ## This value is propagated to pqos tool. Interval format is defined by pqos itself. + ## If not provided or provided 0, will be set to 10 = 10x100ms = 1s. + # sampling_interval = "10" + + ## Optionally specify the path to pqos executable. + ## If not provided, auto discovery will be performed. + # pqos_path = "/usr/local/bin/pqos" + + ## Optionally specify if IPC and LLC_Misses metrics shouldn't be propagated. + ## If not provided, default value is false. + # shortened_metrics = false + + ## Specify the list of groups of CPU core(s) to be provided as pqos input. + ## Mandatory if processes aren't set and forbidden if processes are specified. + ## e.g. ["0-3", "4,5,6"] or ["1-3,4"] + # cores = ["0-3"] + + ## Specify the list of processes for which Metrics will be collected. + ## Mandatory if cores aren't set and forbidden if cores are specified. + ## e.g. ["qemu", "pmd"] + # processes = ["process"] +``` + +### Exposed metrics +| Name | Full name | Description | +|---------------|-----------------------------------------------|-------------| +| MBL | Memory Bandwidth on Local NUMA Node | Memory bandwidth utilization by the relevant CPU core/process on the local NUMA memory channel | +| MBR | Memory Bandwidth on Remote NUMA Node | Memory bandwidth utilization by the relevant CPU core/process on the remote NUMA memory channel | +| MBT | Total Memory Bandwidth | Total memory bandwidth utilized by a CPU core/process on local and remote NUMA memory channels | +| LLC | L3 Cache Occupancy | Total Last Level Cache occupancy by a CPU core/process | +| *LLC_Misses | L3 Cache Misses | Total Last Level Cache misses by a CPU core/process | +| *IPC | Instructions Per Cycle | Total instructions per cycle executed by a CPU core/process | + +*optional + +### Troubleshooting +Pointing to non-existing core will lead to throwing an error by _pqos_ and plugin will not work properly. +Be sure to check if provided core number exists within desired system. + +Be aware reading Intel RDT metrics by _pqos_ cannot be done simultaneously on the same resource. +So be sure to not use any other _pqos_ instance which is monitoring the same cores or PIDs within working system. +Also there is no possibility to monitor same cores or PIDs on different groups. + +Pids association for the given process could be manually checked by `pidof` command. E.g: +``` +pidof PROCESS +``` +where `PROCESS` is process name. + +### Example Output +``` +> rdt_metric,cores=12\,19,host=r2-compute-20,name=IPC,process=top value=0 1598962030000000000 +> rdt_metric,cores=12\,19,host=r2-compute-20,name=LLC_Misses,process=top value=0 1598962030000000000 +> rdt_metric,cores=12\,19,host=r2-compute-20,name=LLC,process=top value=0 1598962030000000000 +> rdt_metric,cores=12\,19,host=r2-compute-20,name=MBL,process=top value=0 1598962030000000000 +> rdt_metric,cores=12\,19,host=r2-compute-20,name=MBR,process=top value=0 1598962030000000000 +> rdt_metric,cores=12\,19,host=r2-compute-20,name=MBT,process=top value=0 1598962030000000000 +``` \ No newline at end of file diff --git a/plugins/inputs/intel_rdt/intel_rdt.go b/plugins/inputs/intel_rdt/intel_rdt.go new file mode 100644 index 000000000..e61266c0a --- /dev/null +++ b/plugins/inputs/intel_rdt/intel_rdt.go @@ -0,0 +1,552 @@ +// +build !windows + +package intel_rdt + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "os/exec" + "regexp" + "strconv" + "strings" + "sync" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + timestampFormat = "2006-01-02 15:04:05" + defaultSamplingInterval = 10 + pqosInitOutputLinesNumber = 4 + numberOfMetrics = 6 + secondsDenominator = 10 +) + +var pqosMetricOrder = map[int]string{ + 0: "IPC", // Instructions Per Cycle + 1: "LLC_Misses", // Cache Misses + 2: "LLC", // L3 Cache Occupancy + 3: "MBL", // Memory Bandwidth on Local NUMA Node + 4: "MBR", // Memory Bandwidth on Remote NUMA Node + 5: "MBT", // Total Memory Bandwidth +} + +type IntelRDT struct { + PqosPath string `toml:"pqos_path"` + Cores []string `toml:"cores"` + Processes []string `toml:"processes"` + SamplingInterval int32 `toml:"sampling_interval"` + ShortenedMetrics bool `toml:"shortened_metrics"` + + Log telegraf.Logger `toml:"-"` + Publisher Publisher `toml:"-"` + Processor ProcessesHandler `toml:"-"` + stopPQOSChan chan bool + quitChan chan struct{} + errorChan chan error + parsedCores []string + processesPIDsMap map[string]string + cancel context.CancelFunc + wg sync.WaitGroup +} + +type processMeasurement struct { + name string + measurement string +} + +// All gathering is done in the Start function +func (r *IntelRDT) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (r *IntelRDT) Description() string { + return "Intel Resource Director Technology plugin" +} + +func (r *IntelRDT) SampleConfig() string { + return ` + ## Optionally set sampling interval to Nx100ms. + ## This value is propagated to pqos tool. Interval format is defined by pqos itself. + ## If not provided or provided 0, will be set to 10 = 10x100ms = 1s. + # sampling_interval = "10" + + ## Optionally specify the path to pqos executable. + ## If not provided, auto discovery will be performed. + # pqos_path = "/usr/local/bin/pqos" + + ## Optionally specify if IPC and LLC_Misses metrics shouldn't be propagated. + ## If not provided, default value is false. + # shortened_metrics = false + + ## Specify the list of groups of CPU core(s) to be provided as pqos input. + ## Mandatory if processes aren't set and forbidden if processes are specified. + ## e.g. ["0-3", "4,5,6"] or ["1-3,4"] + # cores = ["0-3"] + + ## Specify the list of processes for which Metrics will be collected. + ## Mandatory if cores aren't set and forbidden if cores are specified. + ## e.g. ["qemu", "pmd"] + # processes = ["process"] +` +} + +func (r *IntelRDT) Start(acc telegraf.Accumulator) error { + ctx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel + + r.Processor = NewProcessor() + r.Publisher = NewPublisher(acc, r.Log, r.ShortenedMetrics) + + err := r.Initialize() + if err != nil { + return err + } + + r.Publisher.publish(ctx) + go r.errorHandler(ctx) + go r.scheduler(ctx) + + return nil +} + +func (r *IntelRDT) Initialize() error { + r.stopPQOSChan = make(chan bool) + r.quitChan = make(chan struct{}) + r.errorChan = make(chan error) + + err := validatePqosPath(r.PqosPath) + if err != nil { + return err + } + if len(r.Cores) != 0 && len(r.Processes) != 0 { + return fmt.Errorf("monitoring start error, process and core tracking can not be done simultaneously") + } + if len(r.Cores) == 0 && len(r.Processes) == 0 { + return fmt.Errorf("monitoring start error, at least one of cores or processes must be provided in config") + } + if r.SamplingInterval == 0 { + r.SamplingInterval = defaultSamplingInterval + } + if err = validateInterval(r.SamplingInterval); err != nil { + return err + } + r.parsedCores, err = parseCoresConfig(r.Cores) + if err != nil { + return err + } + r.processesPIDsMap, err = r.associateProcessesWithPIDs(r.Processes) + if err != nil { + return err + } + return nil +} + +func (r *IntelRDT) errorHandler(ctx context.Context) { + r.wg.Add(1) + defer r.wg.Done() + for { + select { + case err := <-r.errorChan: + if err != nil { + r.Log.Error(fmt.Sprintf("Error: %v", err)) + r.quitChan <- struct{}{} + } + case <-ctx.Done(): + return + } + } +} + +func (r *IntelRDT) scheduler(ctx context.Context) { + r.wg.Add(1) + defer r.wg.Done() + interval := time.Duration(r.SamplingInterval) + ticker := time.NewTicker(interval * time.Second / secondsDenominator) + + r.createArgsAndStartPQOS(ctx) + + for { + select { + case <-ticker.C: + if len(r.Processes) != 0 { + err := r.checkPIDsAssociation(ctx) + if err != nil { + r.errorChan <- err + } + } + case <-r.quitChan: + r.cancel() + return + case <-ctx.Done(): + return + } + } +} + +func (r *IntelRDT) Stop() { + r.cancel() + r.wg.Wait() +} + +func (r *IntelRDT) checkPIDsAssociation(ctx context.Context) error { + newProcessesPIDsMap, err := r.associateProcessesWithPIDs(r.Processes) + if err != nil { + return err + } + // change in PIDs association appears + if !cmp.Equal(newProcessesPIDsMap, r.processesPIDsMap) { + r.Log.Warnf("PIDs association has changed. Refreshing...") + if len(r.processesPIDsMap) != 0 { + r.stopPQOSChan <- true + } + r.processesPIDsMap = newProcessesPIDsMap + r.createArgsAndStartPQOS(ctx) + } + return nil +} + +func (r *IntelRDT) associateProcessesWithPIDs(providedProcesses []string) (map[string]string, error) { + mapProcessPIDs := map[string]string{} + + availableProcesses, err := r.Processor.getAllProcesses() + if err != nil { + return nil, fmt.Errorf("cannot gather information of all available processes") + } + for _, availableProcess := range availableProcesses { + if choice.Contains(availableProcess.Name, providedProcesses) { + PID := availableProcess.PID + mapProcessPIDs[availableProcess.Name] = mapProcessPIDs[availableProcess.Name] + fmt.Sprintf("%d", PID) + "," + } + } + for key := range mapProcessPIDs { + mapProcessPIDs[key] = strings.TrimSuffix(mapProcessPIDs[key], ",") + } + return mapProcessPIDs, nil +} + +func (r *IntelRDT) createArgsAndStartPQOS(ctx context.Context) { + args := []string{"-r", "--iface-os", "--mon-file-type=csv", fmt.Sprintf("--mon-interval=%d", r.SamplingInterval)} + + if len(r.parsedCores) != 0 { + coresArg := createArgCores(r.parsedCores) + args = append(args, coresArg) + go r.readData(args, nil, ctx) + + } else if len(r.processesPIDsMap) != 0 { + processArg := createArgProcess(r.processesPIDsMap) + args = append(args, processArg) + go r.readData(args, r.processesPIDsMap, ctx) + } + return +} + +func (r *IntelRDT) readData(args []string, processesPIDsAssociation map[string]string, ctx context.Context) { + r.wg.Add(1) + defer r.wg.Done() + + cmd := exec.Command(r.PqosPath, append(args)...) + + cmdReader, err := cmd.StdoutPipe() + if err != nil { + r.errorChan <- err + } + go r.processOutput(cmdReader, processesPIDsAssociation) + + go func() { + for { + select { + case <-r.stopPQOSChan: + if err := shutDownPqos(cmd); err != nil { + r.Log.Error(err) + } + return + case <-ctx.Done(): + if err := shutDownPqos(cmd); err != nil { + r.Log.Error(err) + } + return + } + } + }() + err = cmd.Start() + if err != nil { + r.errorChan <- fmt.Errorf("pqos: %v", err) + return + } + err = cmd.Wait() + if err != nil { + r.errorChan <- fmt.Errorf("pqos: %v", err) + } +} + +func (r *IntelRDT) processOutput(cmdReader io.ReadCloser, processesPIDsAssociation map[string]string) { + reader := bufio.NewScanner(cmdReader) + /* + Omit constant, first 4 lines : + "NOTE: Mixed use of MSR and kernel interfaces to manage + CAT or CMT & MBM may lead to unexpected behavior.\n" + CMT/MBM reset successful + "Time,Core,IPC,LLC Misses,LLC[KB],MBL[MB/s],MBR[MB/s],MBT[MB/s]\n" + */ + toOmit := pqosInitOutputLinesNumber + + // omit first measurements which are zeroes + if len(r.parsedCores) != 0 { + toOmit = toOmit + len(r.parsedCores) + // specify how many lines should pass before stopping + } else if len(processesPIDsAssociation) != 0 { + toOmit = toOmit + len(processesPIDsAssociation) + } + for omitCounter := 0; omitCounter < toOmit; omitCounter++ { + reader.Scan() + } + for reader.Scan() { + out := reader.Text() + // to handle situation when monitored PID disappear and "err" is shown in output + if strings.Contains(out, "err") { + continue + } + if len(r.Processes) != 0 { + newMetric := processMeasurement{} + + PIDs, err := findPIDsInMeasurement(out) + if err != nil { + r.errorChan <- err + break + } + for processName, PIDsProcess := range processesPIDsAssociation { + if PIDs == PIDsProcess { + newMetric.name = processName + newMetric.measurement = out + } + } + r.Publisher.BufferChanProcess <- newMetric + } else { + r.Publisher.BufferChanCores <- out + } + } +} + +func shutDownPqos(pqos *exec.Cmd) error { + if pqos.Process != nil { + err := pqos.Process.Signal(os.Interrupt) + if err != nil { + err = pqos.Process.Kill() + if err != nil { + return fmt.Errorf("failed to shut down pqos: %v", err) + } + } + } + return nil +} + +func createArgCores(cores []string) string { + allGroupsArg := "--mon-core=" + for _, coreGroup := range cores { + argGroup := createArgsForGroups(strings.Split(coreGroup, ",")) + allGroupsArg = allGroupsArg + argGroup + } + return allGroupsArg +} + +func createArgProcess(processPIDs map[string]string) string { + allPIDsArg := "--mon-pid=" + for _, PIDs := range processPIDs { + argPIDs := createArgsForGroups(strings.Split(PIDs, ",")) + allPIDsArg = allPIDsArg + argPIDs + } + return allPIDsArg +} + +func createArgsForGroups(coresOrPIDs []string) string { + template := "all:[%s];mbt:[%s];" + group := "" + + for _, coreOrPID := range coresOrPIDs { + group = group + coreOrPID + "," + } + if group != "" { + group = strings.TrimSuffix(group, ",") + return fmt.Sprintf(template, group, group) + } + return "" +} + +func validatePqosPath(pqosPath string) error { + if len(pqosPath) == 0 { + return fmt.Errorf("monitoring start error, can not find pqos executable") + } + pathInfo, err := os.Stat(pqosPath) + if os.IsNotExist(err) { + return fmt.Errorf("monitoring start error, provided pqos path not exist") + } + if mode := pathInfo.Mode(); !mode.IsRegular() { + return fmt.Errorf("monitoring start error, provided pqos path does not point to a regular file") + } + return nil +} + +func parseCoresConfig(cores []string) ([]string, error) { + var parsedCores []string + var allCores []int + configError := fmt.Errorf("wrong cores input config data format") + + for _, singleCoreGroup := range cores { + var actualGroupOfCores []int + separatedCores := strings.Split(singleCoreGroup, ",") + + for _, coreStr := range separatedCores { + actualCores, err := validateAndParseCores(coreStr) + if err != nil { + return nil, fmt.Errorf("%v: %v", configError, err) + } + if checkForDuplicates(allCores, actualCores) { + return nil, fmt.Errorf("%v: %v", configError, "core value cannot be duplicated") + } + actualGroupOfCores = append(actualGroupOfCores, actualCores...) + allCores = append(allCores, actualGroupOfCores...) + } + parsedCores = append(parsedCores, arrayToString(actualGroupOfCores)) + } + return parsedCores, nil +} + +func validateAndParseCores(coreStr string) ([]int, error) { + var processedCores []int + if strings.Contains(coreStr, "-") { + rangeValues := strings.Split(coreStr, "-") + + if len(rangeValues) != 2 { + return nil, fmt.Errorf("more than two values in range") + } + + startValue, err := strconv.Atoi(rangeValues[0]) + if err != nil { + return nil, err + } + stopValue, err := strconv.Atoi(rangeValues[1]) + if err != nil { + return nil, err + } + + if startValue > stopValue { + return nil, fmt.Errorf("first value cannot be higher than second") + } + + rangeOfCores := makeRange(startValue, stopValue) + processedCores = append(processedCores, rangeOfCores...) + } else { + newCore, err := strconv.Atoi(coreStr) + if err != nil { + return nil, err + } + processedCores = append(processedCores, newCore) + } + return processedCores, nil +} + +func findPIDsInMeasurement(measurements string) (string, error) { + // to distinguish PIDs from Cores (PIDs should be in quotes) + var insideQuoteRegex = regexp.MustCompile(`"(.*?)"`) + PIDsMatch := insideQuoteRegex.FindStringSubmatch(measurements) + if len(PIDsMatch) < 2 { + return "", fmt.Errorf("cannot find PIDs in measurement line") + } + PIDs := PIDsMatch[1] + return PIDs, nil +} + +func splitCSVLineIntoValues(line string) (timeValue string, metricsValues, coreOrPIDsValues []string, err error) { + values, err := splitMeasurementLine(line) + if err != nil { + return "", nil, nil, err + } + + timeValue = values[0] + // Because pqos csv format is broken when many cores are involved in PID or + // group of PIDs, there is need to work around it. E.g.: + // Time,PID,Core,IPC,LLC Misses,LLC[KB],MBL[MB/s],MBR[MB/s],MBT[MB/s] + // 2020-08-12 13:34:36,"45417,29170,",37,44,0.00,0,0.0,0.0,0.0,0.0 + metricsValues = values[len(values)-numberOfMetrics:] + coreOrPIDsValues = values[1 : len(values)-numberOfMetrics] + + return timeValue, metricsValues, coreOrPIDsValues, nil +} + +func validateInterval(interval int32) error { + if interval < 0 { + return fmt.Errorf("interval cannot be lower than 0") + } + return nil +} + +func splitMeasurementLine(line string) ([]string, error) { + values := strings.Split(line, ",") + if len(values) < 8 { + return nil, fmt.Errorf(fmt.Sprintf("not valid line format from pqos: %s", values)) + } + return values, nil +} + +func parseTime(value string) (time.Time, error) { + timestamp, err := time.Parse(timestampFormat, value) + if err != nil { + return time.Time{}, err + } + return timestamp, nil +} + +func parseFloat(value string) (float64, error) { + result, err := strconv.ParseFloat(value, 64) + if err != nil { + return result, err + } + return result, nil +} + +func arrayToString(array []int) string { + result := "" + for _, value := range array { + result = fmt.Sprintf("%s%d,", result, value) + } + return strings.TrimSuffix(result, ",") +} + +func checkForDuplicates(values []int, valuesToCheck []int) bool { + for _, value := range values { + for _, valueToCheck := range valuesToCheck { + if value == valueToCheck { + return true + } + } + } + return false +} + +func makeRange(min, max int) []int { + a := make([]int, max-min+1) + for i := range a { + a[i] = min + i + } + return a +} + +func init() { + inputs.Add("IntelRDT", func() telegraf.Input { + rdt := IntelRDT{} + pathPqos, _ := exec.LookPath("pqos") + if len(pathPqos) > 0 { + rdt.PqosPath = pathPqos + } + return &rdt + }) +} diff --git a/plugins/inputs/intel_rdt/intel_rdt_test.go b/plugins/inputs/intel_rdt/intel_rdt_test.go new file mode 100644 index 000000000..7e8764257 --- /dev/null +++ b/plugins/inputs/intel_rdt/intel_rdt_test.go @@ -0,0 +1,277 @@ +// +build !windows + +package intel_rdt + +import ( + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +type MockProc struct{} + +func (m *MockProc) getAllProcesses() ([]Process, error) { + procs := []Process{ + {Name: "process", PID: 1000}, + {Name: "process2", PID: 1002}, + {Name: "process2", PID: 1003}, + } + return procs, nil +} + +func TestAssociateProcessesWithPIDs(t *testing.T) { + log := testutil.Logger{} + proc := &MockProc{} + rdt := IntelRDT{ + Log: log, + Processor: proc, + } + processes := []string{"process"} + expectedPID := "1000" + result, err := rdt.associateProcessesWithPIDs(processes) + assert.Nil(t, err) + assert.Equal(t, expectedPID, result[processes[0]]) + + processes = []string{"process2"} + expectedPID = "1002,1003" + result, err = rdt.associateProcessesWithPIDs(processes) + assert.Nil(t, err) + assert.Equal(t, expectedPID, result[processes[0]]) + + processes = []string{"process1"} + result, err = rdt.associateProcessesWithPIDs(processes) + assert.Nil(t, err) + assert.Len(t, result, 0) +} + +func TestSplitCSVLineIntoValues(t *testing.T) { + line := "2020-08-12 13:34:36,\"45417,29170\",37,44,0.00,0,0.0,0.0,0.0,0.0" + expectedTimeValue := "2020-08-12 13:34:36" + expectedMetricsValue := []string{"0.00", "0", "0.0", "0.0", "0.0", "0.0"} + expectedCoreOrPidsValue := []string{"\"45417", "29170\"", "37", "44"} + + timeValue, metricsValue, coreOrPidsValue, err := splitCSVLineIntoValues(line) + assert.Nil(t, err) + assert.Equal(t, expectedTimeValue, timeValue) + assert.Equal(t, expectedMetricsValue, metricsValue) + assert.Equal(t, expectedCoreOrPidsValue, coreOrPidsValue) + + wrongLine := "2020-08-12 13:34:36,37,44,0.00,0,0.0" + timeValue, metricsValue, coreOrPidsValue, err = splitCSVLineIntoValues(wrongLine) + assert.NotNil(t, err) + assert.Equal(t, "", timeValue) + assert.Nil(t, nil, metricsValue) + assert.Nil(t, nil, coreOrPidsValue) +} + +func TestFindPIDsInMeasurement(t *testing.T) { + line := "2020-08-12 13:34:36,\"45417,29170\"" + expected := "45417,29170" + result, err := findPIDsInMeasurement(line) + assert.Nil(t, err) + assert.Equal(t, expected, result) + + line = "pids not included" + result, err = findPIDsInMeasurement(line) + assert.NotNil(t, err) + assert.Equal(t, "", result) +} + +func TestCreateArgsProcesses(t *testing.T) { + processesPIDs := map[string]string{ + "process": "12345, 99999", + } + expected := "--mon-pid=all:[12345, 99999];mbt:[12345, 99999];" + result := createArgProcess(processesPIDs) + assert.EqualValues(t, expected, result) + + processesPIDs = map[string]string{ + "process": "12345, 99999", + "process2": "44444, 11111", + } + expectedPrefix := "--mon-pid=" + expectedSubstring := "all:[12345, 99999];mbt:[12345, 99999];" + expectedSubstring2 := "all:[44444, 11111];mbt:[44444, 11111];" + result = createArgProcess(processesPIDs) + assert.Contains(t, result, expectedPrefix) + assert.Contains(t, result, expectedSubstring) + assert.Contains(t, result, expectedSubstring2) +} + +func TestCreateArgsCores(t *testing.T) { + cores := []string{"1,2,3"} + expected := "--mon-core=all:[1,2,3];mbt:[1,2,3];" + result := createArgCores(cores) + assert.EqualValues(t, expected, result) + + cores = []string{"1,2,3", "4,5,6"} + expected = "--mon-core=" + expectedPrefix := "--mon-core=" + expectedSubstring := "all:[1,2,3];mbt:[1,2,3];" + expectedSubstring2 := "all:[4,5,6];mbt:[4,5,6];" + result = createArgCores(cores) + assert.Contains(t, result, expectedPrefix) + assert.Contains(t, result, expectedSubstring) + assert.Contains(t, result, expectedSubstring2) +} + +func TestParseCoresConfig(t *testing.T) { + t.Run("empty slice", func(t *testing.T) { + var configCores []string + result, err := parseCoresConfig(configCores) + assert.Nil(t, err) + assert.Nil(t, result) + }) + + t.Run("empty string in slice", func(t *testing.T) { + configCores := []string{""} + result, err := parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + }) + + t.Run("not correct string", func(t *testing.T) { + configCores := []string{"wrong string"} + result, err := parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + }) + + t.Run("not correct string", func(t *testing.T) { + configCores := []string{"1,2", "wasd:#$!;"} + result, err := parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + }) + + t.Run("not correct string", func(t *testing.T) { + configCores := []string{"1,2,2"} + result, err := parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + }) + + t.Run("coma separated cores - positive", func(t *testing.T) { + configCores := []string{"0,1,2,3,4,5"} + expected := []string{"0,1,2,3,4,5"} + result, err := parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + + configCores = []string{"0,1,2", "3,4,5"} + expected = []string{"0,1,2", "3,4,5"} + result, err = parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + + configCores = []string{"0,4,1", "2,3,5", "9"} + expected = []string{"0,4,1", "2,3,5", "9"} + result, err = parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + }) + + t.Run("coma separated cores - negative", func(t *testing.T) { + // cannot monitor same cores in different groups + configCores := []string{"0,1,2", "2"} + result, err := parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + + configCores = []string{"0,1,2", "2,3,4"} + result, err = parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + + configCores = []string{"0,-1,2", "2,3,4"} + result, err = parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + }) + + t.Run("dash separated cores - positive", func(t *testing.T) { + configCores := []string{"0-5"} + expected := []string{"0,1,2,3,4,5"} + result, err := parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + + configCores = []string{"0-5", "7-10"} + expected = []string{"0,1,2,3,4,5", "7,8,9,10"} + result, err = parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + + configCores = []string{"5-5"} + expected = []string{"5"} + result, err = parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + }) + + t.Run("dash separated cores - negative", func(t *testing.T) { + // cannot monitor same cores in different groups + configCores := []string{"0-5", "2-7"} + result, err := parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + + // more than two values in range + configCores = []string{"0-5-10"} + result, err = parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + + // first value cannot be higher than second + configCores = []string{"12-5"} + result, err = parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + + configCores = []string{"0-"} + result, err = parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + }) + + t.Run("mixed separator - positive", func(t *testing.T) { + configCores := []string{"0-5,6,7"} + expected := []string{"0,1,2,3,4,5,6,7"} + result, err := parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + + configCores = []string{"0-5,6,7", "8,9,10"} + expected = []string{"0,1,2,3,4,5,6,7", "8,9,10"} + result, err = parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + + configCores = []string{"0-7", "8-10"} + expected = []string{"0,1,2,3,4,5,6,7", "8,9,10"} + result, err = parseCoresConfig(configCores) + assert.Nil(t, err) + assert.EqualValues(t, expected, result) + }) + + t.Run("mixed separator - negative", func(t *testing.T) { + // cannot monitor same cores in different groups + configCores := []string{"0-5,", "2-7"} + result, err := parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + + // cores cannot be duplicated + configCores = []string{"0-5,5"} + result, err = parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + + // more than two values in range + configCores = []string{"0-5-6,9"} + result, err = parseCoresConfig(configCores) + assert.NotNil(t, err) + assert.Nil(t, result) + }) +} diff --git a/plugins/inputs/intel_rdt/intel_rdt_windows.go b/plugins/inputs/intel_rdt/intel_rdt_windows.go new file mode 100644 index 000000000..e3ab0978f --- /dev/null +++ b/plugins/inputs/intel_rdt/intel_rdt_windows.go @@ -0,0 +1,3 @@ +// +build windows + +package intel_rdt diff --git a/plugins/inputs/intel_rdt/processes.go b/plugins/inputs/intel_rdt/processes.go new file mode 100644 index 000000000..ff86a4e6b --- /dev/null +++ b/plugins/inputs/intel_rdt/processes.go @@ -0,0 +1,40 @@ +// +build !windows + +package intel_rdt + +import "github.com/prometheus/procfs" + +type ProcessesHandler interface { + getAllProcesses() ([]Process, error) +} + +type Process struct { + Name string + PID int +} + +type ProcessManager struct{} + +func NewProcessor() ProcessesHandler { + return &ProcessManager{} +} + +func (p *ProcessManager) getAllProcesses() ([]Process, error) { + var processes []Process + allProcesses, err := procfs.AllProcs() + if err != nil { + return nil, err + } + for _, proc := range allProcesses { + procComm, err := proc.Comm() + if err != nil { + continue + } + newProcess := Process{ + PID: proc.PID, + Name: procComm, + } + processes = append(processes, newProcess) + } + return processes, nil +} diff --git a/plugins/inputs/intel_rdt/publisher.go b/plugins/inputs/intel_rdt/publisher.go new file mode 100644 index 000000000..5ca989047 --- /dev/null +++ b/plugins/inputs/intel_rdt/publisher.go @@ -0,0 +1,171 @@ +// +build !windows + +package intel_rdt + +import ( + "context" + "strings" + "time" + + "github.com/influxdata/telegraf" +) + +// Publisher for publish new RDT metrics to telegraf accumulator +type Publisher struct { + acc telegraf.Accumulator + Log telegraf.Logger + shortenedMetrics bool + BufferChanProcess chan processMeasurement + BufferChanCores chan string + errChan chan error + stopChan chan bool +} + +func NewPublisher(acc telegraf.Accumulator, log telegraf.Logger, shortenedMetrics bool) Publisher { + return Publisher{ + acc: acc, + Log: log, + shortenedMetrics: shortenedMetrics, + BufferChanProcess: make(chan processMeasurement), + BufferChanCores: make(chan string), + errChan: make(chan error), + } +} + +func (p *Publisher) publish(ctx context.Context) { + go func() { + for { + select { + case newMeasurements := <-p.BufferChanCores: + p.publishCores(newMeasurements) + case newMeasurements := <-p.BufferChanProcess: + p.publishProcess(newMeasurements) + case err := <-p.errChan: + p.Log.Error(err) + case <-ctx.Done(): + return + } + } + }() +} + +func (p *Publisher) publishCores(measurement string) { + coresString, values, timestamp, err := parseCoresMeasurement(measurement) + if err != nil { + p.errChan <- err + } + p.addToAccumulatorCores(coresString, values, timestamp) + return +} + +func (p *Publisher) publishProcess(measurement processMeasurement) { + process, coresString, values, timestamp, err := parseProcessesMeasurement(measurement) + if err != nil { + p.errChan <- err + } + p.addToAccumulatorProcesses(process, coresString, values, timestamp) + return +} + +func parseCoresMeasurement(measurements string) (string, []float64, time.Time, error) { + var values []float64 + timeValue, metricsValues, cores, err := splitCSVLineIntoValues(measurements) + if err != nil { + return "", nil, time.Time{}, err + } + timestamp, err := parseTime(timeValue) + if err != nil { + return "", nil, time.Time{}, err + } + // change string slice to one string and separate it by coma + coresString := strings.Join(cores, ",") + // trim unwanted quotes + coresString = strings.Trim(coresString, "\"") + + for _, metric := range metricsValues { + parsedValue, err := parseFloat(metric) + if err != nil { + return "", nil, time.Time{}, err + } + values = append(values, parsedValue) + } + return coresString, values, timestamp, nil +} + +func (p *Publisher) addToAccumulatorCores(cores string, metricsValues []float64, timestamp time.Time) { + for i, value := range metricsValues { + if p.shortenedMetrics { + //0: "IPC" + //1: "LLC_Misses" + if i == 0 || i == 1 { + continue + } + } + tags := map[string]string{} + fields := make(map[string]interface{}) + + tags["cores"] = cores + tags["name"] = pqosMetricOrder[i] + fields["value"] = value + + p.acc.AddFields("rdt_metric", fields, tags, timestamp) + } +} + +func parseProcessesMeasurement(measurement processMeasurement) (string, string, []float64, time.Time, error) { + var values []float64 + timeValue, metricsValues, coreOrPidsValues, pids, err := parseProcessMeasurement(measurement.measurement) + if err != nil { + return "", "", nil, time.Time{}, err + } + timestamp, err := parseTime(timeValue) + if err != nil { + return "", "", nil, time.Time{}, err + } + actualProcess := measurement.name + lenOfPids := len(strings.Split(pids, ",")) + cores := coreOrPidsValues[lenOfPids:] + coresString := strings.Trim(strings.Join(cores, ","), `"`) + + for _, metric := range metricsValues { + parsedValue, err := parseFloat(metric) + if err != nil { + return "", "", nil, time.Time{}, err + } + values = append(values, parsedValue) + } + return actualProcess, coresString, values, timestamp, nil +} + +func (p *Publisher) addToAccumulatorProcesses(process string, cores string, metricsValues []float64, timestamp time.Time) { + for i, value := range metricsValues { + if p.shortenedMetrics { + //0: "IPC" + //1: "LLC_Misses" + if i == 0 || i == 1 { + continue + } + } + tags := map[string]string{} + fields := make(map[string]interface{}) + + tags["process"] = process + tags["cores"] = cores + tags["name"] = pqosMetricOrder[i] + fields["value"] = value + + p.acc.AddFields("rdt_metric", fields, tags, timestamp) + } +} + +func parseProcessMeasurement(measurements string) (string, []string, []string, string, error) { + timeValue, metricsValues, coreOrPidsValues, err := splitCSVLineIntoValues(measurements) + if err != nil { + return "", nil, nil, "", err + } + pids, err := findPIDsInMeasurement(measurements) + if err != nil { + return "", nil, nil, "", err + } + return timeValue, metricsValues, coreOrPidsValues, pids, nil +} diff --git a/plugins/inputs/intel_rdt/publisher_test.go b/plugins/inputs/intel_rdt/publisher_test.go new file mode 100644 index 000000000..5248ede7a --- /dev/null +++ b/plugins/inputs/intel_rdt/publisher_test.go @@ -0,0 +1,444 @@ +// +build !windows + +package intel_rdt + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +var metricsValues = map[string]float64{ + "IPC": 0.5, + "LLC_Misses": 61650, + "LLC": 1632, + "MBL": 0.6, + "MBR": 0.9, + "MBT": 1.9, +} + +func TestParseCoresMeasurement(t *testing.T) { + timestamp := "2020-08-12 13:34:36" + cores := "\"37,44\"" + + t.Run("valid measurement string", func(t *testing.T) { + measurement := fmt.Sprintf("%s,%s,%f,%f,%f,%f,%f,%f", + timestamp, + cores, + metricsValues["IPC"], + metricsValues["LLC_Misses"], + metricsValues["LLC"], + metricsValues["MBL"], + metricsValues["MBR"], + metricsValues["MBT"]) + + expectedCores := "37,44" + expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC) + + resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement) + + assert.Nil(t, err) + assert.Equal(t, expectedCores, resultCoresString) + assert.Equal(t, expectedTimestamp, resultTimestamp) + assert.Equal(t, resultValues[0], metricsValues["IPC"]) + assert.Equal(t, resultValues[1], metricsValues["LLC_Misses"]) + assert.Equal(t, resultValues[2], metricsValues["LLC"]) + assert.Equal(t, resultValues[3], metricsValues["MBL"]) + assert.Equal(t, resultValues[4], metricsValues["MBR"]) + assert.Equal(t, resultValues[5], metricsValues["MBT"]) + }) + t.Run("not valid measurement string", func(t *testing.T) { + measurement := "not, valid, measurement" + + resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement) + + assert.NotNil(t, err) + assert.Equal(t, "", resultCoresString) + assert.Nil(t, resultValues) + assert.Equal(t, time.Time{}, resultTimestamp) + }) + t.Run("not valid values string", func(t *testing.T) { + measurement := fmt.Sprintf("%s,%s,%s,%s,%f,%f,%f,%f", + timestamp, + cores, + "%d", + "in", + metricsValues["LLC"], + metricsValues["MBL"], + metricsValues["MBR"], + metricsValues["MBT"]) + + resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement) + + assert.NotNil(t, err) + assert.Equal(t, "", resultCoresString) + assert.Nil(t, resultValues) + assert.Equal(t, time.Time{}, resultTimestamp) + }) + t.Run("not valid timestamp format", func(t *testing.T) { + invalidTimestamp := "2020-08-12-21 13:34:" + measurement := fmt.Sprintf("%s,%s,%f,%f,%f,%f,%f,%f", + invalidTimestamp, + cores, + metricsValues["IPC"], + metricsValues["LLC_Misses"], + metricsValues["LLC"], + metricsValues["MBL"], + metricsValues["MBR"], + metricsValues["MBT"]) + + resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement) + + assert.NotNil(t, err) + assert.Equal(t, "", resultCoresString) + assert.Nil(t, resultValues) + assert.Equal(t, time.Time{}, resultTimestamp) + }) +} + +func TestParseProcessesMeasurement(t *testing.T) { + timestamp := "2020-08-12 13:34:36" + cores := "\"37,44\"" + pids := "\"12345,9999\"" + processName := "process_name" + + t.Run("valid measurement string", func(t *testing.T) { + measurement := fmt.Sprintf("%s,%s,%s,%f,%f,%f,%f,%f,%f", + timestamp, + pids, + cores, + metricsValues["IPC"], + metricsValues["LLC_Misses"], + metricsValues["LLC"], + metricsValues["MBL"], + metricsValues["MBR"], + metricsValues["MBT"]) + + expectedCores := "37,44" + expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC) + + newMeasurement := processMeasurement{ + name: processName, + measurement: measurement, + } + actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement) + + assert.Nil(t, err) + assert.Equal(t, processName, actualProcess) + assert.Equal(t, expectedCores, resultCoresString) + assert.Equal(t, expectedTimestamp, resultTimestamp) + assert.Equal(t, resultValues[0], metricsValues["IPC"]) + assert.Equal(t, resultValues[1], metricsValues["LLC_Misses"]) + assert.Equal(t, resultValues[2], metricsValues["LLC"]) + assert.Equal(t, resultValues[3], metricsValues["MBL"]) + assert.Equal(t, resultValues[4], metricsValues["MBR"]) + assert.Equal(t, resultValues[5], metricsValues["MBT"]) + }) + t.Run("not valid measurement string", func(t *testing.T) { + processName := "process_name" + measurement := "invalid,measurement,format" + + newMeasurement := processMeasurement{ + name: processName, + measurement: measurement, + } + actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement) + + assert.NotNil(t, err) + assert.Equal(t, "", actualProcess) + assert.Equal(t, "", resultCoresString) + assert.Nil(t, resultValues) + assert.Equal(t, time.Time{}, resultTimestamp) + }) + t.Run("not valid timestamp format", func(t *testing.T) { + invalidTimestamp := "2020-20-20-31" + measurement := fmt.Sprintf("%s,%s,%s,%f,%f,%f,%f,%f,%f", + invalidTimestamp, + pids, + cores, + metricsValues["IPC"], + metricsValues["LLC_Misses"], + metricsValues["LLC"], + metricsValues["MBL"], + metricsValues["MBR"], + metricsValues["MBT"]) + + newMeasurement := processMeasurement{ + name: processName, + measurement: measurement, + } + actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement) + + assert.NotNil(t, err) + assert.Equal(t, "", actualProcess) + assert.Equal(t, "", resultCoresString) + assert.Nil(t, resultValues) + assert.Equal(t, time.Time{}, resultTimestamp) + }) + t.Run("not valid values string", func(t *testing.T) { + measurement := fmt.Sprintf("%s,%s,%s,%s,%s,%f,%f,%f,%f", + timestamp, + pids, + cores, + "1##", + "da", + metricsValues["LLC"], + metricsValues["MBL"], + metricsValues["MBR"], + metricsValues["MBT"]) + + newMeasurement := processMeasurement{ + name: processName, + measurement: measurement, + } + actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement) + + assert.NotNil(t, err) + assert.Equal(t, "", actualProcess) + assert.Equal(t, "", resultCoresString) + assert.Nil(t, resultValues) + assert.Equal(t, time.Time{}, resultTimestamp) + }) +} + +func TestAddToAccumulatorCores(t *testing.T) { + t.Run("shortened false", func(t *testing.T) { + var acc testutil.Accumulator + publisher := Publisher{acc: &acc} + + cores := "1,2,3" + metricsValues := []float64{1, 2, 3, 4, 5, 6} + timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC) + + publisher.addToAccumulatorCores(cores, metricsValues, timestamp) + + for _, test := range testCoreMetrics { + acc.AssertContainsTaggedFields(t, "rdt_metric", test.fields, test.tags) + } + }) + t.Run("shortened true", func(t *testing.T) { + var acc testutil.Accumulator + publisher := Publisher{acc: &acc, shortenedMetrics: true} + + cores := "1,2,3" + metricsValues := []float64{1, 2, 3, 4, 5, 6} + timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC) + + publisher.addToAccumulatorCores(cores, metricsValues, timestamp) + + for _, test := range testCoreMetricsShortened { + acc.AssertDoesNotContainsTaggedFields(t, "rdt_metric", test.fields, test.tags) + } + }) +} + +func TestAddToAccumulatorProcesses(t *testing.T) { + t.Run("shortened false", func(t *testing.T) { + var acc testutil.Accumulator + publisher := Publisher{acc: &acc} + + process := "process_name" + cores := "1,2,3" + metricsValues := []float64{1, 2, 3, 4, 5, 6} + timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC) + + publisher.addToAccumulatorProcesses(process, cores, metricsValues, timestamp) + + for _, test := range testCoreProcesses { + acc.AssertContainsTaggedFields(t, "rdt_metric", test.fields, test.tags) + } + }) + t.Run("shortened true", func(t *testing.T) { + var acc testutil.Accumulator + publisher := Publisher{acc: &acc, shortenedMetrics: true} + + process := "process_name" + cores := "1,2,3" + metricsValues := []float64{1, 2, 3, 4, 5, 6} + timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC) + + publisher.addToAccumulatorProcesses(process, cores, metricsValues, timestamp) + + for _, test := range testCoreProcessesShortened { + acc.AssertDoesNotContainsTaggedFields(t, "rdt_metric", test.fields, test.tags) + } + }) +} + +var ( + testCoreMetrics = []struct { + fields map[string]interface{} + tags map[string]string + }{ + { + map[string]interface{}{ + "value": float64(1), + }, + map[string]string{ + "cores": "1,2,3", + "name": "IPC", + }, + }, + { + map[string]interface{}{ + "value": float64(2), + }, + map[string]string{ + "cores": "1,2,3", + "name": "LLC_Misses", + }, + }, + { + map[string]interface{}{ + "value": float64(3), + }, + map[string]string{ + "cores": "1,2,3", + "name": "LLC", + }, + }, + { + map[string]interface{}{ + "value": float64(4), + }, + map[string]string{ + "cores": "1,2,3", + "name": "MBL", + }, + }, + { + map[string]interface{}{ + "value": float64(5), + }, + map[string]string{ + "cores": "1,2,3", + "name": "MBR", + }, + }, + { + map[string]interface{}{ + "value": float64(6), + }, + map[string]string{ + "cores": "1,2,3", + "name": "MBT", + }, + }, + } + testCoreMetricsShortened = []struct { + fields map[string]interface{} + tags map[string]string + }{ + { + map[string]interface{}{ + "value": float64(1), + }, + map[string]string{ + "cores": "1,2,3", + "name": "IPC", + }, + }, + { + map[string]interface{}{ + "value": float64(2), + }, + map[string]string{ + "cores": "1,2,3", + "name": "LLC_Misses", + }, + }, + } + testCoreProcesses = []struct { + fields map[string]interface{} + tags map[string]string + }{ + { + map[string]interface{}{ + "value": float64(1), + }, + map[string]string{ + "cores": "1,2,3", + "name": "IPC", + "process": "process_name", + }, + }, + { + map[string]interface{}{ + "value": float64(2), + }, + map[string]string{ + "cores": "1,2,3", + "name": "LLC_Misses", + "process": "process_name", + }, + }, + { + map[string]interface{}{ + "value": float64(3), + }, + map[string]string{ + "cores": "1,2,3", + "name": "LLC", + "process": "process_name", + }, + }, + { + map[string]interface{}{ + "value": float64(4), + }, + map[string]string{ + "cores": "1,2,3", + "name": "MBL", + "process": "process_name", + }, + }, + { + map[string]interface{}{ + "value": float64(5), + }, + map[string]string{ + "cores": "1,2,3", + "name": "MBR", + "process": "process_name", + }, + }, + { + map[string]interface{}{ + "value": float64(6), + }, + map[string]string{ + "cores": "1,2,3", + "name": "MBT", + "process": "process_name", + }, + }, + } + testCoreProcessesShortened = []struct { + fields map[string]interface{} + tags map[string]string + }{ + { + map[string]interface{}{ + "value": float64(1), + }, + map[string]string{ + "cores": "1,2,3", + "name": "IPC", + "process": "process_name", + }, + }, + { + map[string]interface{}{ + "value": float64(2), + }, + map[string]string{ + "cores": "1,2,3", + "name": "LLC_Misses", + "process": "process_name", + }, + }, + } +)