New input plugin for Intel RDT (Intel Resource Director Technology) (#8150)
Co-authored-by: Trojan, Kuba <kuba.trojan@intel.com>
This commit is contained in:
parent
c8a412e995
commit
5f02c69da5
|
|
@ -211,6 +211,7 @@ For documentation on the latest development code see the [documentation index][d
|
|||
* [infiniband](./plugins/inputs/infiniband)
|
||||
* [influxdb](./plugins/inputs/influxdb)
|
||||
* [influxdb_listener](./plugins/inputs/influxdb_listener)
|
||||
* [intel_rdt](./plugins/inputs/intel_rdt)
|
||||
* [internal](./plugins/inputs/internal)
|
||||
* [interrupts](./plugins/inputs/interrupts)
|
||||
* [ipmi_sensor](./plugins/inputs/ipmi_sensor)
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -109,6 +109,7 @@ require (
|
|||
github.com/prometheus/client_golang v1.5.1
|
||||
github.com/prometheus/client_model v0.2.0
|
||||
github.com/prometheus/common v0.9.1
|
||||
github.com/prometheus/procfs v0.0.8
|
||||
github.com/safchain/ethtool v0.0.0-20200218184317-f459e2d13664
|
||||
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec // indirect
|
||||
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
|
||||
|
|
|
|||
|
|
@ -63,6 +63,7 @@ import (
|
|||
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb_listener"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb_v2_listener"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/intel_rdt"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/internal"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/interrupts"
|
||||
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,108 @@
|
|||
# Intel RDT Input Plugin
|
||||
The intel_rdt plugin collects information provided by monitoring features of
|
||||
Intel Resource Director Technology (Intel(R) RDT) like Cache Monitoring Technology (CMT),
|
||||
Memory Bandwidth Monitoring (MBM), Cache Allocation Technology (CAT) and Code
|
||||
and Data Prioritization (CDP) Technology provide the hardware framework to monitor
|
||||
and control the utilization of shared resources, like last level cache, memory bandwidth.
|
||||
These Technologies comprise Intel’s Resource Director Technology (RDT).
|
||||
As multithreaded and multicore platform architectures emerge,
|
||||
running workloads in single-threaded, multithreaded, or complex virtual machine environment,
|
||||
the last level cache and memory bandwidth are key resources to manage. Intel introduces CMT,
|
||||
MBM, CAT and CDP to manage these various workloads across shared resources.
|
||||
|
||||
To gather Intel RDT metrics plugin uses _pqos_ cli tool which is a part of [Intel(R) RDT Software Package](https://github.com/intel/intel-cmt-cat).
|
||||
Before using this plugin, make sure that _pqos_ is properly installed and configured, and that the plugin
runs _pqos_ in `OS Interface` mode. This plugin supports _pqos_ version 4.0.0 and above.
|
||||
Be aware pqos tool needs root privileges to work properly.
|
||||
|
||||
Metrics will be constantly reported from the following `pqos` commands within the given interval:
|
||||
|
||||
#### In case of cores monitoring:
|
||||
```
|
||||
pqos -r --iface-os --mon-file-type=csv --mon-interval=INTERVAL --mon-core=all:[CORES]\;mbt:[CORES]
|
||||
```
|
||||
where `CORES` is equal to group of cores provided in config. User can provide many groups.
|
||||
|
||||
#### In case of process monitoring:
|
||||
```
|
||||
pqos -r --iface-os --mon-file-type=csv --mon-interval=INTERVAL --mon-pid=all:[PIDS]\;mbt:[PIDS]
|
||||
```
|
||||
where `PIDS` is a group of process IDs whose names match a process name provided in the config.
Users can provide many process names, each of which creates a separate process group.
|
||||
|
||||
In both cases `INTERVAL` is equal to sampling_interval from config.
|
||||
|
||||
Because PIDs association within system could change in every moment, Intel RDT plugin provides a
|
||||
functionality to check on every interval if desired processes change their PIDs association.
|
||||
If a change is reported, the plugin will restart the _pqos_ tool with new arguments. If a process name
provided by the user does not match any available process, it will be omitted, and the plugin will
continually check for its availability.
|
||||
|
||||
### Useful links
|
||||
Pqos installation process: https://github.com/intel/intel-cmt-cat/blob/master/INSTALL
|
||||
Enabling OS interface: https://github.com/intel/intel-cmt-cat/wiki, https://github.com/intel/intel-cmt-cat/wiki/resctrl
|
||||
More about Intel RDT: https://www.intel.com/content/www/us/en/architecture-and-technology/resource-director-technology.html
|
||||
|
||||
### Configuration
|
||||
```toml
|
||||
# Read Intel RDT metrics
|
||||
[[inputs.IntelRDT]]
|
||||
## Optionally set sampling interval to Nx100ms.
|
||||
## This value is propagated to pqos tool. Interval format is defined by pqos itself.
|
||||
## If not provided or provided 0, will be set to 10 = 10x100ms = 1s.
|
||||
# sampling_interval = "10"
|
||||
|
||||
## Optionally specify the path to pqos executable.
|
||||
## If not provided, auto discovery will be performed.
|
||||
# pqos_path = "/usr/local/bin/pqos"
|
||||
|
||||
## Optionally specify if IPC and LLC_Misses metrics shouldn't be propagated.
|
||||
## If not provided, default value is false.
|
||||
# shortened_metrics = false
|
||||
|
||||
## Specify the list of groups of CPU core(s) to be provided as pqos input.
|
||||
## Mandatory if processes aren't set and forbidden if processes are specified.
|
||||
## e.g. ["0-3", "4,5,6"] or ["1-3,4"]
|
||||
# cores = ["0-3"]
|
||||
|
||||
## Specify the list of processes for which Metrics will be collected.
|
||||
## Mandatory if cores aren't set and forbidden if cores are specified.
|
||||
## e.g. ["qemu", "pmd"]
|
||||
# processes = ["process"]
|
||||
```
|
||||
|
||||
### Exposed metrics
|
||||
| Name | Full name | Description |
|
||||
|---------------|-----------------------------------------------|-------------|
|
||||
| MBL | Memory Bandwidth on Local NUMA Node | Memory bandwidth utilization by the relevant CPU core/process on the local NUMA memory channel |
|
||||
| MBR | Memory Bandwidth on Remote NUMA Node | Memory bandwidth utilization by the relevant CPU core/process on the remote NUMA memory channel |
|
||||
| MBT | Total Memory Bandwidth | Total memory bandwidth utilized by a CPU core/process on local and remote NUMA memory channels |
|
||||
| LLC | L3 Cache Occupancy | Total Last Level Cache occupancy by a CPU core/process |
|
||||
| *LLC_Misses | L3 Cache Misses | Total Last Level Cache misses by a CPU core/process |
|
||||
| *IPC | Instructions Per Cycle | Total instructions per cycle executed by a CPU core/process |
|
||||
|
||||
*optional
|
||||
|
||||
### Troubleshooting
|
||||
Pointing to non-existing core will lead to throwing an error by _pqos_ and plugin will not work properly.
|
||||
Be sure to check if provided core number exists within desired system.
|
||||
|
||||
Be aware reading Intel RDT metrics by _pqos_ cannot be done simultaneously on the same resource.
|
||||
So be sure to not use any other _pqos_ instance which is monitoring the same cores or PIDs within working system.
|
||||
Also there is no possibility to monitor same cores or PIDs on different groups.
|
||||
|
||||
Pids association for the given process could be manually checked by `pidof` command. E.g:
|
||||
```
|
||||
pidof PROCESS
|
||||
```
|
||||
where `PROCESS` is process name.
|
||||
|
||||
### Example Output
|
||||
```
|
||||
> rdt_metric,cores=12\,19,host=r2-compute-20,name=IPC,process=top value=0 1598962030000000000
|
||||
> rdt_metric,cores=12\,19,host=r2-compute-20,name=LLC_Misses,process=top value=0 1598962030000000000
|
||||
> rdt_metric,cores=12\,19,host=r2-compute-20,name=LLC,process=top value=0 1598962030000000000
|
||||
> rdt_metric,cores=12\,19,host=r2-compute-20,name=MBL,process=top value=0 1598962030000000000
|
||||
> rdt_metric,cores=12\,19,host=r2-compute-20,name=MBR,process=top value=0 1598962030000000000
|
||||
> rdt_metric,cores=12\,19,host=r2-compute-20,name=MBT,process=top value=0 1598962030000000000
|
||||
```
|
||||
|
|
@ -0,0 +1,552 @@
|
|||
// +build !windows
|
||||
|
||||
package intel_rdt
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal/choice"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
)
|
||||
|
||||
const (
	// Layout used to parse timestamps produced by pqos CSV output.
	timestampFormat = "2006-01-02 15:04:05"
	// Default sampling interval, expressed in Nx100ms units (10 = 1s).
	defaultSamplingInterval = 10
	// Number of initial pqos output lines (banner + header) to skip.
	pqosInitOutputLinesNumber = 4
	// Number of metric columns at the end of each pqos CSV line.
	numberOfMetrics = 6
	// Divisor converting the 100ms-based sampling interval to seconds.
	secondsDenominator = 10
)
|
||||
|
||||
// pqosMetricOrder maps a metric's column position within the metric
// section of a pqos CSV line to its published metric name.
var pqosMetricOrder = map[int]string{
	0: "IPC",        // Instructions Per Cycle
	1: "LLC_Misses", // Cache Misses
	2: "LLC",        // L3 Cache Occupancy
	3: "MBL",        // Memory Bandwidth on Local NUMA Node
	4: "MBR",        // Memory Bandwidth on Remote NUMA Node
	5: "MBT",        // Total Memory Bandwidth
}
|
||||
|
||||
// IntelRDT is a telegraf service input that collects Intel Resource
// Director Technology metrics by running the external pqos tool and
// parsing its CSV output.
type IntelRDT struct {
	PqosPath         string   `toml:"pqos_path"`         // path to the pqos executable
	Cores            []string `toml:"cores"`             // core groups to monitor (mutually exclusive with Processes)
	Processes        []string `toml:"processes"`         // process names to monitor (mutually exclusive with Cores)
	SamplingInterval int32    `toml:"sampling_interval"` // pqos interval in Nx100ms units
	ShortenedMetrics bool     `toml:"shortened_metrics"` // when true, IPC and LLC_Misses are not propagated

	Log       telegraf.Logger  `toml:"-"`
	Publisher Publisher        `toml:"-"` // forwards parsed measurements to the accumulator
	Processor ProcessesHandler `toml:"-"` // abstraction over process listing (mockable in tests)

	stopPQOSChan     chan bool          // asks the running pqos reader goroutine to stop the process
	quitChan         chan struct{}      // signals the scheduler to cancel everything
	errorChan        chan error         // errors reported by background goroutines
	parsedCores      []string           // normalized core groups derived from Cores
	processesPIDsMap map[string]string  // process name -> comma-separated PID list
	cancel           context.CancelFunc // cancels the plugin-wide context created in Start
	wg               sync.WaitGroup     // tracks background goroutines for Stop
}
|
||||
|
||||
// processMeasurement couples one raw pqos CSV line with the name of
// the monitored process it belongs to.
type processMeasurement struct {
	name        string // configured process name
	measurement string // raw CSV line from pqos
}
|
||||
|
||||
// Gather is a no-op: this is a service input, so all collection
// happens in the goroutines launched by Start.
func (r *IntelRDT) Gather(_ telegraf.Accumulator) error {
	return nil
}
|
||||
|
||||
// Description returns a one-line description of the plugin.
func (r *IntelRDT) Description() string {
	return "Intel Resource Director Technology plugin"
}
|
||||
|
||||
// SampleConfig returns the example configuration snippet shown in the
// generated telegraf configuration.
func (r *IntelRDT) SampleConfig() string {
	return `
	## Optionally set sampling interval to Nx100ms.
	## This value is propagated to pqos tool. Interval format is defined by pqos itself.
	## If not provided or provided 0, will be set to 10 = 10x100ms = 1s.
	# sampling_interval = "10"

	## Optionally specify the path to pqos executable.
	## If not provided, auto discovery will be performed.
	# pqos_path = "/usr/local/bin/pqos"

	## Optionally specify if IPC and LLC_Misses metrics shouldn't be propagated.
	## If not provided, default value is false.
	# shortened_metrics = false

	## Specify the list of groups of CPU core(s) to be provided as pqos input.
	## Mandatory if processes aren't set and forbidden if processes are specified.
	## e.g. ["0-3", "4,5,6"] or ["1-3,4"]
	# cores = ["0-3"]

	## Specify the list of processes for which Metrics will be collected.
	## Mandatory if cores aren't set and forbidden if cores are specified.
	## e.g. ["qemu", "pmd"]
	# processes = ["process"]
`
}
|
||||
|
||||
// Start initializes the plugin and launches the background goroutines
// that run pqos and publish its measurements (telegraf service-input
// lifecycle entry point).
func (r *IntelRDT) Start(acc telegraf.Accumulator) error {
	ctx, cancel := context.WithCancel(context.Background())
	r.cancel = cancel

	r.Processor = NewProcessor()
	r.Publisher = NewPublisher(acc, r.Log, r.ShortenedMetrics)

	err := r.Initialize()
	if err != nil {
		return err
	}

	// Start the publisher; presumably it consumes the measurement
	// channels until ctx is cancelled — confirm in publisher source.
	r.Publisher.publish(ctx)
	// NOTE(review): errorHandler and scheduler call wg.Add inside the
	// goroutine itself, so Stop's wg.Wait may run before registration;
	// confirm this race is acceptable.
	go r.errorHandler(ctx)
	go r.scheduler(ctx)

	return nil
}
|
||||
|
||||
// Initialize validates the configuration (pqos path, mutually
// exclusive cores/processes settings, sampling interval), applies
// defaults, and resolves core groups and process-PID associations.
// Called from Start.
func (r *IntelRDT) Initialize() error {
	r.stopPQOSChan = make(chan bool)
	r.quitChan = make(chan struct{})
	r.errorChan = make(chan error)

	err := validatePqosPath(r.PqosPath)
	if err != nil {
		return err
	}
	// Exactly one of cores or processes monitoring must be configured.
	if len(r.Cores) != 0 && len(r.Processes) != 0 {
		return fmt.Errorf("monitoring start error, process and core tracking can not be done simultaneously")
	}
	if len(r.Cores) == 0 && len(r.Processes) == 0 {
		return fmt.Errorf("monitoring start error, at least one of cores or processes must be provided in config")
	}
	// Zero means "not set"; fall back to the default (10x100ms = 1s).
	if r.SamplingInterval == 0 {
		r.SamplingInterval = defaultSamplingInterval
	}
	if err = validateInterval(r.SamplingInterval); err != nil {
		return err
	}
	r.parsedCores, err = parseCoresConfig(r.Cores)
	if err != nil {
		return err
	}
	r.processesPIDsMap, err = r.associateProcessesWithPIDs(r.Processes)
	if err != nil {
		return err
	}
	return nil
}
|
||||
|
||||
// errorHandler drains errorChan for the lifetime of ctx. Any non-nil
// error is logged and converted into a shutdown request on quitChan.
func (r *IntelRDT) errorHandler(ctx context.Context) {
	// NOTE(review): wg.Add runs inside the goroutine rather than
	// before `go`, so Stop's wg.Wait can race with registration.
	r.wg.Add(1)
	defer r.wg.Done()
	for {
		select {
		case err := <-r.errorChan:
			if err != nil {
				r.Log.Error(fmt.Sprintf("Error: %v", err))
				// Ask the scheduler to cancel everything.
				r.quitChan <- struct{}{}
			}
		case <-ctx.Done():
			return
		}
	}
}
|
||||
|
||||
func (r *IntelRDT) scheduler(ctx context.Context) {
|
||||
r.wg.Add(1)
|
||||
defer r.wg.Done()
|
||||
interval := time.Duration(r.SamplingInterval)
|
||||
ticker := time.NewTicker(interval * time.Second / secondsDenominator)
|
||||
|
||||
r.createArgsAndStartPQOS(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if len(r.Processes) != 0 {
|
||||
err := r.checkPIDsAssociation(ctx)
|
||||
if err != nil {
|
||||
r.errorChan <- err
|
||||
}
|
||||
}
|
||||
case <-r.quitChan:
|
||||
r.cancel()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop cancels the plugin-wide context and waits for all background
// goroutines to finish (telegraf service-input lifecycle exit point).
func (r *IntelRDT) Stop() {
	r.cancel()
	r.wg.Wait()
}
|
||||
|
||||
// checkPIDsAssociation re-resolves the configured process names to
// their current PIDs and, if the association changed since the last
// check, restarts pqos with arguments built from the new PID set.
func (r *IntelRDT) checkPIDsAssociation(ctx context.Context) error {
	newProcessesPIDsMap, err := r.associateProcessesWithPIDs(r.Processes)
	if err != nil {
		return err
	}
	// change in PIDs association appears
	if !cmp.Equal(newProcessesPIDsMap, r.processesPIDsMap) {
		r.Log.Warnf("PIDs association has changed. Refreshing...")
		if len(r.processesPIDsMap) != 0 {
			// Ask the currently running pqos reader to shut down the
			// process before starting a new one.
			r.stopPQOSChan <- true
		}
		r.processesPIDsMap = newProcessesPIDsMap
		r.createArgsAndStartPQOS(ctx)
	}
	return nil
}
|
||||
|
||||
func (r *IntelRDT) associateProcessesWithPIDs(providedProcesses []string) (map[string]string, error) {
|
||||
mapProcessPIDs := map[string]string{}
|
||||
|
||||
availableProcesses, err := r.Processor.getAllProcesses()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot gather information of all available processes")
|
||||
}
|
||||
for _, availableProcess := range availableProcesses {
|
||||
if choice.Contains(availableProcess.Name, providedProcesses) {
|
||||
PID := availableProcess.PID
|
||||
mapProcessPIDs[availableProcess.Name] = mapProcessPIDs[availableProcess.Name] + fmt.Sprintf("%d", PID) + ","
|
||||
}
|
||||
}
|
||||
for key := range mapProcessPIDs {
|
||||
mapProcessPIDs[key] = strings.TrimSuffix(mapProcessPIDs[key], ",")
|
||||
}
|
||||
return mapProcessPIDs, nil
|
||||
}
|
||||
|
||||
func (r *IntelRDT) createArgsAndStartPQOS(ctx context.Context) {
|
||||
args := []string{"-r", "--iface-os", "--mon-file-type=csv", fmt.Sprintf("--mon-interval=%d", r.SamplingInterval)}
|
||||
|
||||
if len(r.parsedCores) != 0 {
|
||||
coresArg := createArgCores(r.parsedCores)
|
||||
args = append(args, coresArg)
|
||||
go r.readData(args, nil, ctx)
|
||||
|
||||
} else if len(r.processesPIDsMap) != 0 {
|
||||
processArg := createArgProcess(r.processesPIDsMap)
|
||||
args = append(args, processArg)
|
||||
go r.readData(args, r.processesPIDsMap, ctx)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *IntelRDT) readData(args []string, processesPIDsAssociation map[string]string, ctx context.Context) {
|
||||
r.wg.Add(1)
|
||||
defer r.wg.Done()
|
||||
|
||||
cmd := exec.Command(r.PqosPath, append(args)...)
|
||||
|
||||
cmdReader, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
r.errorChan <- err
|
||||
}
|
||||
go r.processOutput(cmdReader, processesPIDsAssociation)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-r.stopPQOSChan:
|
||||
if err := shutDownPqos(cmd); err != nil {
|
||||
r.Log.Error(err)
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
if err := shutDownPqos(cmd); err != nil {
|
||||
r.Log.Error(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
r.errorChan <- fmt.Errorf("pqos: %v", err)
|
||||
return
|
||||
}
|
||||
err = cmd.Wait()
|
||||
if err != nil {
|
||||
r.errorChan <- fmt.Errorf("pqos: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// processOutput consumes pqos CSV lines from cmdReader and forwards
// them to the Publisher: raw lines for core monitoring, or lines
// paired with their process name for PID monitoring.
func (r *IntelRDT) processOutput(cmdReader io.ReadCloser, processesPIDsAssociation map[string]string) {
	reader := bufio.NewScanner(cmdReader)
	/*
		Omit constant, first 4 lines :
		"NOTE: Mixed use of MSR and kernel interfaces to manage
		CAT or CMT & MBM may lead to unexpected behavior.\n"
		CMT/MBM reset successful
		"Time,Core,IPC,LLC Misses,LLC[KB],MBL[MB/s],MBR[MB/s],MBT[MB/s]\n"
	*/
	toOmit := pqosInitOutputLinesNumber

	// omit first measurements which are zeroes
	if len(r.parsedCores) != 0 {
		toOmit = toOmit + len(r.parsedCores)
		// specify how many lines should pass before stopping
	} else if len(processesPIDsAssociation) != 0 {
		toOmit = toOmit + len(processesPIDsAssociation)
	}
	// Discard the banner/header lines plus one zeroed measurement per
	// monitored group before publishing anything.
	for omitCounter := 0; omitCounter < toOmit; omitCounter++ {
		reader.Scan()
	}
	for reader.Scan() {
		out := reader.Text()
		// to handle situation when monitored PID disappear and "err" is shown in output
		if strings.Contains(out, "err") {
			continue
		}
		if len(r.Processes) != 0 {
			newMetric := processMeasurement{}

			// Match the quoted PID list in the line back to the
			// process name it was resolved from.
			PIDs, err := findPIDsInMeasurement(out)
			if err != nil {
				r.errorChan <- err
				break
			}
			for processName, PIDsProcess := range processesPIDsAssociation {
				if PIDs == PIDsProcess {
					newMetric.name = processName
					newMetric.measurement = out
				}
			}
			r.Publisher.BufferChanProcess <- newMetric
		} else {
			r.Publisher.BufferChanCores <- out
		}
	}
}
|
||||
|
||||
func shutDownPqos(pqos *exec.Cmd) error {
|
||||
if pqos.Process != nil {
|
||||
err := pqos.Process.Signal(os.Interrupt)
|
||||
if err != nil {
|
||||
err = pqos.Process.Kill()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to shut down pqos: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func createArgCores(cores []string) string {
|
||||
allGroupsArg := "--mon-core="
|
||||
for _, coreGroup := range cores {
|
||||
argGroup := createArgsForGroups(strings.Split(coreGroup, ","))
|
||||
allGroupsArg = allGroupsArg + argGroup
|
||||
}
|
||||
return allGroupsArg
|
||||
}
|
||||
|
||||
func createArgProcess(processPIDs map[string]string) string {
|
||||
allPIDsArg := "--mon-pid="
|
||||
for _, PIDs := range processPIDs {
|
||||
argPIDs := createArgsForGroups(strings.Split(PIDs, ","))
|
||||
allPIDsArg = allPIDsArg + argPIDs
|
||||
}
|
||||
return allPIDsArg
|
||||
}
|
||||
|
||||
// createArgsForGroups renders one monitoring group in pqos syntax,
// e.g. ["1","2"] -> "all:[1,2];mbt:[1,2];". An empty group yields "".
func createArgsForGroups(coresOrPIDs []string) string {
	group := strings.Join(coresOrPIDs, ",")
	if group == "" {
		return ""
	}
	return fmt.Sprintf("all:[%s];mbt:[%s];", group, group)
}
|
||||
|
||||
// validatePqosPath checks that pqosPath is non-empty and points to an
// existing regular file.
func validatePqosPath(pqosPath string) error {
	if len(pqosPath) == 0 {
		return fmt.Errorf("monitoring start error, can not find pqos executable")
	}
	pathInfo, err := os.Stat(pqosPath)
	if os.IsNotExist(err) {
		return fmt.Errorf("monitoring start error, provided pqos path not exist")
	}
	// Fix: any other Stat failure (e.g. permission denied) previously
	// fell through and dereferenced a nil FileInfo, panicking.
	if err != nil {
		return fmt.Errorf("monitoring start error, cannot stat provided pqos path: %v", err)
	}
	if mode := pathInfo.Mode(); !mode.IsRegular() {
		return fmt.Errorf("monitoring start error, provided pqos path does not point to a regular file")
	}
	return nil
}
|
||||
|
||||
func parseCoresConfig(cores []string) ([]string, error) {
|
||||
var parsedCores []string
|
||||
var allCores []int
|
||||
configError := fmt.Errorf("wrong cores input config data format")
|
||||
|
||||
for _, singleCoreGroup := range cores {
|
||||
var actualGroupOfCores []int
|
||||
separatedCores := strings.Split(singleCoreGroup, ",")
|
||||
|
||||
for _, coreStr := range separatedCores {
|
||||
actualCores, err := validateAndParseCores(coreStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v: %v", configError, err)
|
||||
}
|
||||
if checkForDuplicates(allCores, actualCores) {
|
||||
return nil, fmt.Errorf("%v: %v", configError, "core value cannot be duplicated")
|
||||
}
|
||||
actualGroupOfCores = append(actualGroupOfCores, actualCores...)
|
||||
allCores = append(allCores, actualGroupOfCores...)
|
||||
}
|
||||
parsedCores = append(parsedCores, arrayToString(actualGroupOfCores))
|
||||
}
|
||||
return parsedCores, nil
|
||||
}
|
||||
|
||||
// validateAndParseCores expands a single core entry — either a plain
// number ("5") or an inclusive range ("2-4") — into a list of cores.
func validateAndParseCores(coreStr string) ([]int, error) {
	// Plain single-core entry: just parse the number.
	if !strings.Contains(coreStr, "-") {
		core, err := strconv.Atoi(coreStr)
		if err != nil {
			return nil, err
		}
		return []int{core}, nil
	}

	// Range entry: exactly two numeric endpoints, low-high order.
	rangeValues := strings.Split(coreStr, "-")
	if len(rangeValues) != 2 {
		return nil, fmt.Errorf("more than two values in range")
	}
	startValue, err := strconv.Atoi(rangeValues[0])
	if err != nil {
		return nil, err
	}
	stopValue, err := strconv.Atoi(rangeValues[1])
	if err != nil {
		return nil, err
	}
	if startValue > stopValue {
		return nil, fmt.Errorf("first value cannot be higher than second")
	}

	// Expand the inclusive range [startValue, stopValue].
	var processedCores []int
	for core := startValue; core <= stopValue; core++ {
		processedCores = append(processedCores, core)
	}
	return processedCores, nil
}
|
||||
|
||||
// pidsInQuotesRegex matches the double-quoted PID list inside a pqos
// measurement line. Fix: compiled once at package scope instead of on
// every call.
var pidsInQuotesRegex = regexp.MustCompile(`"(.*?)"`)

// findPIDsInMeasurement extracts the comma-separated PID list from a
// pqos CSV measurement line. PIDs are distinguished from core numbers
// by being wrapped in double quotes.
func findPIDsInMeasurement(measurements string) (string, error) {
	PIDsMatch := pidsInQuotesRegex.FindStringSubmatch(measurements)
	if len(PIDsMatch) < 2 {
		return "", fmt.Errorf("cannot find PIDs in measurement line")
	}
	PIDs := PIDsMatch[1]
	return PIDs, nil
}
|
||||
|
||||
func splitCSVLineIntoValues(line string) (timeValue string, metricsValues, coreOrPIDsValues []string, err error) {
|
||||
values, err := splitMeasurementLine(line)
|
||||
if err != nil {
|
||||
return "", nil, nil, err
|
||||
}
|
||||
|
||||
timeValue = values[0]
|
||||
// Because pqos csv format is broken when many cores are involved in PID or
|
||||
// group of PIDs, there is need to work around it. E.g.:
|
||||
// Time,PID,Core,IPC,LLC Misses,LLC[KB],MBL[MB/s],MBR[MB/s],MBT[MB/s]
|
||||
// 2020-08-12 13:34:36,"45417,29170,",37,44,0.00,0,0.0,0.0,0.0,0.0
|
||||
metricsValues = values[len(values)-numberOfMetrics:]
|
||||
coreOrPIDsValues = values[1 : len(values)-numberOfMetrics]
|
||||
|
||||
return timeValue, metricsValues, coreOrPIDsValues, nil
|
||||
}
|
||||
|
||||
// validateInterval rejects negative sampling intervals; zero is
// allowed here because it is replaced by the default before this
// check runs.
func validateInterval(interval int32) error {
	if interval >= 0 {
		return nil
	}
	return fmt.Errorf("interval cannot be lower than 0")
}
|
||||
|
||||
// splitMeasurementLine splits a pqos CSV line on commas and verifies
// it carries at least the minimum number of fields (timestamp, entity
// identifiers and six metric values).
func splitMeasurementLine(line string) ([]string, error) {
	values := strings.Split(line, ",")
	if len(values) < 8 {
		// Fix: pass the format string straight to Errorf instead of
		// wrapping it in a redundant Sprintf (non-constant format
		// string flagged by go vet).
		return nil, fmt.Errorf("not valid line format from pqos: %s", values)
	}
	return values, nil
}
|
||||
|
||||
func parseTime(value string) (time.Time, error) {
|
||||
timestamp, err := time.Parse(timestampFormat, value)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return timestamp, nil
|
||||
}
|
||||
|
||||
// parseFloat converts a pqos metric string into a float64; it is a
// thin wrapper around strconv.ParseFloat with 64-bit precision.
func parseFloat(value string) (float64, error) {
	return strconv.ParseFloat(value, 64)
}
|
||||
|
||||
// arrayToString renders an int slice as a comma-separated list,
// e.g. [1 2 3] -> "1,2,3". An empty slice yields "".
func arrayToString(array []int) string {
	parts := make([]string, 0, len(array))
	for _, value := range array {
		parts = append(parts, strconv.Itoa(value))
	}
	return strings.Join(parts, ",")
}
|
||||
|
||||
// checkForDuplicates reports whether any element of valuesToCheck is
// already present in values.
func checkForDuplicates(values []int, valuesToCheck []int) bool {
	seen := make(map[int]struct{}, len(values))
	for _, value := range values {
		seen[value] = struct{}{}
	}
	for _, candidate := range valuesToCheck {
		if _, ok := seen[candidate]; ok {
			return true
		}
	}
	return false
}
|
||||
|
||||
// makeRange returns the inclusive sequence of integers [min, max].
func makeRange(min, max int) []int {
	result := make([]int, 0, max-min+1)
	for value := min; value <= max; value++ {
		result = append(result, value)
	}
	return result
}
|
||||
|
||||
// init registers the plugin factory with telegraf. The pqos binary is
// auto-discovered on PATH as the default PqosPath; users may override
// it via the pqos_path config option.
// NOTE(review): the registered name "IntelRDT" is CamelCase while the
// README config sample also uses [[inputs.IntelRDT]] — most telegraf
// inputs use snake_case ("intel_rdt"); confirm this is intended.
func init() {
	inputs.Add("IntelRDT", func() telegraf.Input {
		rdt := IntelRDT{}
		// Ignore LookPath errors: an empty path is caught later by
		// validatePqosPath during Initialize.
		pathPqos, _ := exec.LookPath("pqos")
		if len(pathPqos) > 0 {
			rdt.PqosPath = pathPqos
		}
		return &rdt
	})
}
|
||||
|
|
@ -0,0 +1,277 @@
|
|||
// +build !windows
|
||||
|
||||
package intel_rdt
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// MockProc is a test double for the ProcessesHandler interface that
// returns a fixed process list.
type MockProc struct{}

// getAllProcesses returns three canned processes: one named "process"
// and two sharing the name "process2".
func (m *MockProc) getAllProcesses() ([]Process, error) {
	procs := []Process{
		{Name: "process", PID: 1000},
		{Name: "process2", PID: 1002},
		{Name: "process2", PID: 1003},
	}
	return procs, nil
}
|
||||
|
||||
// TestAssociateProcessesWithPIDs checks name->PIDs resolution against
// the MockProc fixture: a single match, multiple PIDs joined with
// commas, and an unknown name yielding an empty map.
func TestAssociateProcessesWithPIDs(t *testing.T) {
	log := testutil.Logger{}
	proc := &MockProc{}
	rdt := IntelRDT{
		Log:       log,
		Processor: proc,
	}
	processes := []string{"process"}
	expectedPID := "1000"
	result, err := rdt.associateProcessesWithPIDs(processes)
	assert.Nil(t, err)
	assert.Equal(t, expectedPID, result[processes[0]])

	// Two PIDs share the name "process2" and are comma-joined.
	processes = []string{"process2"}
	expectedPID = "1002,1003"
	result, err = rdt.associateProcessesWithPIDs(processes)
	assert.Nil(t, err)
	assert.Equal(t, expectedPID, result[processes[0]])

	// Unknown process names are simply absent from the result.
	processes = []string{"process1"}
	result, err = rdt.associateProcessesWithPIDs(processes)
	assert.Nil(t, err)
	assert.Len(t, result, 0)
}
|
||||
|
||||
func TestSplitCSVLineIntoValues(t *testing.T) {
|
||||
line := "2020-08-12 13:34:36,\"45417,29170\",37,44,0.00,0,0.0,0.0,0.0,0.0"
|
||||
expectedTimeValue := "2020-08-12 13:34:36"
|
||||
expectedMetricsValue := []string{"0.00", "0", "0.0", "0.0", "0.0", "0.0"}
|
||||
expectedCoreOrPidsValue := []string{"\"45417", "29170\"", "37", "44"}
|
||||
|
||||
timeValue, metricsValue, coreOrPidsValue, err := splitCSVLineIntoValues(line)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, expectedTimeValue, timeValue)
|
||||
assert.Equal(t, expectedMetricsValue, metricsValue)
|
||||
assert.Equal(t, expectedCoreOrPidsValue, coreOrPidsValue)
|
||||
|
||||
wrongLine := "2020-08-12 13:34:36,37,44,0.00,0,0.0"
|
||||
timeValue, metricsValue, coreOrPidsValue, err = splitCSVLineIntoValues(wrongLine)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "", timeValue)
|
||||
assert.Nil(t, nil, metricsValue)
|
||||
assert.Nil(t, nil, coreOrPidsValue)
|
||||
}
|
||||
|
||||
// TestFindPIDsInMeasurement checks extraction of the quoted PID list
// from a measurement line, and the error path for a line without a
// quoted section.
func TestFindPIDsInMeasurement(t *testing.T) {
	line := "2020-08-12 13:34:36,\"45417,29170\""
	expected := "45417,29170"
	result, err := findPIDsInMeasurement(line)
	assert.Nil(t, err)
	assert.Equal(t, expected, result)

	line = "pids not included"
	result, err = findPIDsInMeasurement(line)
	assert.NotNil(t, err)
	assert.Equal(t, "", result)
}
|
||||
|
||||
// TestCreateArgsProcesses verifies --mon-pid argument construction
// for a single process group and, since map iteration order is
// random, uses substring checks for multiple groups.
func TestCreateArgsProcesses(t *testing.T) {
	processesPIDs := map[string]string{
		"process": "12345, 99999",
	}
	expected := "--mon-pid=all:[12345, 99999];mbt:[12345, 99999];"
	result := createArgProcess(processesPIDs)
	assert.EqualValues(t, expected, result)

	processesPIDs = map[string]string{
		"process":  "12345, 99999",
		"process2": "44444, 11111",
	}
	expectedPrefix := "--mon-pid="
	expectedSubstring := "all:[12345, 99999];mbt:[12345, 99999];"
	expectedSubstring2 := "all:[44444, 11111];mbt:[44444, 11111];"
	result = createArgProcess(processesPIDs)
	assert.Contains(t, result, expectedPrefix)
	assert.Contains(t, result, expectedSubstring)
	assert.Contains(t, result, expectedSubstring2)
}
|
||||
|
||||
func TestCreateArgsCores(t *testing.T) {
|
||||
cores := []string{"1,2,3"}
|
||||
expected := "--mon-core=all:[1,2,3];mbt:[1,2,3];"
|
||||
result := createArgCores(cores)
|
||||
assert.EqualValues(t, expected, result)
|
||||
|
||||
cores = []string{"1,2,3", "4,5,6"}
|
||||
expected = "--mon-core="
|
||||
expectedPrefix := "--mon-core="
|
||||
expectedSubstring := "all:[1,2,3];mbt:[1,2,3];"
|
||||
expectedSubstring2 := "all:[4,5,6];mbt:[4,5,6];"
|
||||
result = createArgCores(cores)
|
||||
assert.Contains(t, result, expectedPrefix)
|
||||
assert.Contains(t, result, expectedSubstring)
|
||||
assert.Contains(t, result, expectedSubstring2)
|
||||
}
|
||||
|
||||
func TestParseCoresConfig(t *testing.T) {
|
||||
t.Run("empty slice", func(t *testing.T) {
|
||||
var configCores []string
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("empty string in slice", func(t *testing.T) {
|
||||
configCores := []string{""}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("not correct string", func(t *testing.T) {
|
||||
configCores := []string{"wrong string"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("not correct string", func(t *testing.T) {
|
||||
configCores := []string{"1,2", "wasd:#$!;"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("not correct string", func(t *testing.T) {
|
||||
configCores := []string{"1,2,2"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("coma separated cores - positive", func(t *testing.T) {
|
||||
configCores := []string{"0,1,2,3,4,5"}
|
||||
expected := []string{"0,1,2,3,4,5"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
|
||||
configCores = []string{"0,1,2", "3,4,5"}
|
||||
expected = []string{"0,1,2", "3,4,5"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
|
||||
configCores = []string{"0,4,1", "2,3,5", "9"}
|
||||
expected = []string{"0,4,1", "2,3,5", "9"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
})
|
||||
|
||||
t.Run("coma separated cores - negative", func(t *testing.T) {
|
||||
// cannot monitor same cores in different groups
|
||||
configCores := []string{"0,1,2", "2"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
|
||||
configCores = []string{"0,1,2", "2,3,4"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
|
||||
configCores = []string{"0,-1,2", "2,3,4"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("dash separated cores - positive", func(t *testing.T) {
|
||||
configCores := []string{"0-5"}
|
||||
expected := []string{"0,1,2,3,4,5"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
|
||||
configCores = []string{"0-5", "7-10"}
|
||||
expected = []string{"0,1,2,3,4,5", "7,8,9,10"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
|
||||
configCores = []string{"5-5"}
|
||||
expected = []string{"5"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
})
|
||||
|
||||
t.Run("dash separated cores - negative", func(t *testing.T) {
|
||||
// cannot monitor same cores in different groups
|
||||
configCores := []string{"0-5", "2-7"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
|
||||
// more than two values in range
|
||||
configCores = []string{"0-5-10"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
|
||||
// first value cannot be higher than second
|
||||
configCores = []string{"12-5"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
|
||||
configCores = []string{"0-"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
|
||||
t.Run("mixed separator - positive", func(t *testing.T) {
|
||||
configCores := []string{"0-5,6,7"}
|
||||
expected := []string{"0,1,2,3,4,5,6,7"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
|
||||
configCores = []string{"0-5,6,7", "8,9,10"}
|
||||
expected = []string{"0,1,2,3,4,5,6,7", "8,9,10"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
|
||||
configCores = []string{"0-7", "8-10"}
|
||||
expected = []string{"0,1,2,3,4,5,6,7", "8,9,10"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, expected, result)
|
||||
})
|
||||
|
||||
t.Run("mixed separator - negative", func(t *testing.T) {
|
||||
// cannot monitor same cores in different groups
|
||||
configCores := []string{"0-5,", "2-7"}
|
||||
result, err := parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
|
||||
// cores cannot be duplicated
|
||||
configCores = []string{"0-5,5"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
|
||||
// more than two values in range
|
||||
configCores = []string{"0-5-6,9"}
|
||||
result, err = parseCoresConfig(configCores)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, result)
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,3 @@
|
|||
// +build windows
|
||||
|
||||
package intel_rdt
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
// +build !windows
|
||||
|
||||
package intel_rdt
|
||||
|
||||
import "github.com/prometheus/procfs"
|
||||
|
||||
// ProcessesHandler lists the processes currently running on the host.
// It exists so that process discovery can be stubbed out in tests.
type ProcessesHandler interface {
	// getAllProcesses returns every process whose command name could be
	// read from procfs.
	getAllProcesses() ([]Process, error)
}

// Process describes a single running process: its command name and PID.
type Process struct {
	Name string
	PID  int
}

// ProcessManager is the procfs-backed implementation of ProcessesHandler.
type ProcessManager struct{}

// NewProcessor returns a ProcessesHandler backed by procfs.
func NewProcessor() ProcessesHandler {
	return &ProcessManager{}
}
|
||||
|
||||
func (p *ProcessManager) getAllProcesses() ([]Process, error) {
|
||||
var processes []Process
|
||||
allProcesses, err := procfs.AllProcs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, proc := range allProcesses {
|
||||
procComm, err := proc.Comm()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
newProcess := Process{
|
||||
PID: proc.PID,
|
||||
Name: procComm,
|
||||
}
|
||||
processes = append(processes, newProcess)
|
||||
}
|
||||
return processes, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,171 @@
|
|||
// +build !windows
|
||||
|
||||
package intel_rdt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
)
|
||||
|
||||
// Publisher publishes new RDT metrics to the telegraf accumulator.
type Publisher struct {
	acc telegraf.Accumulator
	Log telegraf.Logger
	// shortenedMetrics suppresses the first two pqos metrics
	// (IPC and LLC_Misses) when set.
	shortenedMetrics bool
	// BufferChanProcess receives raw per-process pqos measurement lines.
	BufferChanProcess chan processMeasurement
	// BufferChanCores receives raw per-core pqos measurement lines.
	BufferChanCores chan string
	// errChan carries parse errors to the publish loop for logging.
	errChan chan error
	// stopChan is never initialized or used in this file — presumably a
	// shutdown signal; NOTE(review): confirm it is still needed.
	stopChan chan bool
}
|
||||
|
||||
func NewPublisher(acc telegraf.Accumulator, log telegraf.Logger, shortenedMetrics bool) Publisher {
|
||||
return Publisher{
|
||||
acc: acc,
|
||||
Log: log,
|
||||
shortenedMetrics: shortenedMetrics,
|
||||
BufferChanProcess: make(chan processMeasurement),
|
||||
BufferChanCores: make(chan string),
|
||||
errChan: make(chan error),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Publisher) publish(ctx context.Context) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case newMeasurements := <-p.BufferChanCores:
|
||||
p.publishCores(newMeasurements)
|
||||
case newMeasurements := <-p.BufferChanProcess:
|
||||
p.publishProcess(newMeasurements)
|
||||
case err := <-p.errChan:
|
||||
p.Log.Error(err)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Publisher) publishCores(measurement string) {
|
||||
coresString, values, timestamp, err := parseCoresMeasurement(measurement)
|
||||
if err != nil {
|
||||
p.errChan <- err
|
||||
}
|
||||
p.addToAccumulatorCores(coresString, values, timestamp)
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Publisher) publishProcess(measurement processMeasurement) {
|
||||
process, coresString, values, timestamp, err := parseProcessesMeasurement(measurement)
|
||||
if err != nil {
|
||||
p.errChan <- err
|
||||
}
|
||||
p.addToAccumulatorProcesses(process, coresString, values, timestamp)
|
||||
return
|
||||
}
|
||||
|
||||
func parseCoresMeasurement(measurements string) (string, []float64, time.Time, error) {
|
||||
var values []float64
|
||||
timeValue, metricsValues, cores, err := splitCSVLineIntoValues(measurements)
|
||||
if err != nil {
|
||||
return "", nil, time.Time{}, err
|
||||
}
|
||||
timestamp, err := parseTime(timeValue)
|
||||
if err != nil {
|
||||
return "", nil, time.Time{}, err
|
||||
}
|
||||
// change string slice to one string and separate it by coma
|
||||
coresString := strings.Join(cores, ",")
|
||||
// trim unwanted quotes
|
||||
coresString = strings.Trim(coresString, "\"")
|
||||
|
||||
for _, metric := range metricsValues {
|
||||
parsedValue, err := parseFloat(metric)
|
||||
if err != nil {
|
||||
return "", nil, time.Time{}, err
|
||||
}
|
||||
values = append(values, parsedValue)
|
||||
}
|
||||
return coresString, values, timestamp, nil
|
||||
}
|
||||
|
||||
func (p *Publisher) addToAccumulatorCores(cores string, metricsValues []float64, timestamp time.Time) {
|
||||
for i, value := range metricsValues {
|
||||
if p.shortenedMetrics {
|
||||
//0: "IPC"
|
||||
//1: "LLC_Misses"
|
||||
if i == 0 || i == 1 {
|
||||
continue
|
||||
}
|
||||
}
|
||||
tags := map[string]string{}
|
||||
fields := make(map[string]interface{})
|
||||
|
||||
tags["cores"] = cores
|
||||
tags["name"] = pqosMetricOrder[i]
|
||||
fields["value"] = value
|
||||
|
||||
p.acc.AddFields("rdt_metric", fields, tags, timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
func parseProcessesMeasurement(measurement processMeasurement) (string, string, []float64, time.Time, error) {
|
||||
var values []float64
|
||||
timeValue, metricsValues, coreOrPidsValues, pids, err := parseProcessMeasurement(measurement.measurement)
|
||||
if err != nil {
|
||||
return "", "", nil, time.Time{}, err
|
||||
}
|
||||
timestamp, err := parseTime(timeValue)
|
||||
if err != nil {
|
||||
return "", "", nil, time.Time{}, err
|
||||
}
|
||||
actualProcess := measurement.name
|
||||
lenOfPids := len(strings.Split(pids, ","))
|
||||
cores := coreOrPidsValues[lenOfPids:]
|
||||
coresString := strings.Trim(strings.Join(cores, ","), `"`)
|
||||
|
||||
for _, metric := range metricsValues {
|
||||
parsedValue, err := parseFloat(metric)
|
||||
if err != nil {
|
||||
return "", "", nil, time.Time{}, err
|
||||
}
|
||||
values = append(values, parsedValue)
|
||||
}
|
||||
return actualProcess, coresString, values, timestamp, nil
|
||||
}
|
||||
|
||||
func (p *Publisher) addToAccumulatorProcesses(process string, cores string, metricsValues []float64, timestamp time.Time) {
|
||||
for i, value := range metricsValues {
|
||||
if p.shortenedMetrics {
|
||||
//0: "IPC"
|
||||
//1: "LLC_Misses"
|
||||
if i == 0 || i == 1 {
|
||||
continue
|
||||
}
|
||||
}
|
||||
tags := map[string]string{}
|
||||
fields := make(map[string]interface{})
|
||||
|
||||
tags["process"] = process
|
||||
tags["cores"] = cores
|
||||
tags["name"] = pqosMetricOrder[i]
|
||||
fields["value"] = value
|
||||
|
||||
p.acc.AddFields("rdt_metric", fields, tags, timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
func parseProcessMeasurement(measurements string) (string, []string, []string, string, error) {
|
||||
timeValue, metricsValues, coreOrPidsValues, err := splitCSVLineIntoValues(measurements)
|
||||
if err != nil {
|
||||
return "", nil, nil, "", err
|
||||
}
|
||||
pids, err := findPIDsInMeasurement(measurements)
|
||||
if err != nil {
|
||||
return "", nil, nil, "", err
|
||||
}
|
||||
return timeValue, metricsValues, coreOrPidsValues, pids, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,444 @@
|
|||
// +build !windows
|
||||
|
||||
package intel_rdt
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// metricsValues holds one sample value per pqos metric, keyed by metric
// name, and is used to build the CSV fixture lines in the tests below.
var metricsValues = map[string]float64{
	"IPC":        0.5,
	"LLC_Misses": 61650,
	"LLC":        1632,
	"MBL":        0.6,
	"MBR":        0.9,
	"MBT":        1.9,
}
|
||||
|
||||
// TestParseCoresMeasurement covers parseCoresMeasurement with a valid
// pqos CSV line and three failure modes: a malformed line, unparsable
// metric values, and an invalid timestamp. The fixture strings depend
// on the exact fmt format verbs, so they are built inline.
func TestParseCoresMeasurement(t *testing.T) {
	timestamp := "2020-08-12 13:34:36"
	cores := "\"37,44\""

	t.Run("valid measurement string", func(t *testing.T) {
		measurement := fmt.Sprintf("%s,%s,%f,%f,%f,%f,%f,%f",
			timestamp,
			cores,
			metricsValues["IPC"],
			metricsValues["LLC_Misses"],
			metricsValues["LLC"],
			metricsValues["MBL"],
			metricsValues["MBR"],
			metricsValues["MBT"])

		// The quotes wrapped around the core list should be stripped.
		expectedCores := "37,44"
		expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)

		resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)

		assert.Nil(t, err)
		assert.Equal(t, expectedCores, resultCoresString)
		assert.Equal(t, expectedTimestamp, resultTimestamp)
		assert.Equal(t, resultValues[0], metricsValues["IPC"])
		assert.Equal(t, resultValues[1], metricsValues["LLC_Misses"])
		assert.Equal(t, resultValues[2], metricsValues["LLC"])
		assert.Equal(t, resultValues[3], metricsValues["MBL"])
		assert.Equal(t, resultValues[4], metricsValues["MBR"])
		assert.Equal(t, resultValues[5], metricsValues["MBT"])
	})
	t.Run("not valid measurement string", func(t *testing.T) {
		measurement := "not, valid, measurement"

		resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)

		assert.NotNil(t, err)
		assert.Equal(t, "", resultCoresString)
		assert.Nil(t, resultValues)
		assert.Equal(t, time.Time{}, resultTimestamp)
	})
	t.Run("not valid values string", func(t *testing.T) {
		// "%d" and "in" stand in for two metric columns that cannot be
		// parsed as floats.
		measurement := fmt.Sprintf("%s,%s,%s,%s,%f,%f,%f,%f",
			timestamp,
			cores,
			"%d",
			"in",
			metricsValues["LLC"],
			metricsValues["MBL"],
			metricsValues["MBR"],
			metricsValues["MBT"])

		resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)

		assert.NotNil(t, err)
		assert.Equal(t, "", resultCoresString)
		assert.Nil(t, resultValues)
		assert.Equal(t, time.Time{}, resultTimestamp)
	})
	t.Run("not valid timestamp format", func(t *testing.T) {
		invalidTimestamp := "2020-08-12-21 13:34:"
		measurement := fmt.Sprintf("%s,%s,%f,%f,%f,%f,%f,%f",
			invalidTimestamp,
			cores,
			metricsValues["IPC"],
			metricsValues["LLC_Misses"],
			metricsValues["LLC"],
			metricsValues["MBL"],
			metricsValues["MBR"],
			metricsValues["MBT"])

		resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)

		assert.NotNil(t, err)
		assert.Equal(t, "", resultCoresString)
		assert.Nil(t, resultValues)
		assert.Equal(t, time.Time{}, resultTimestamp)
	})
}
|
||||
|
||||
// TestParseProcessesMeasurement covers parseProcessesMeasurement with a
// valid pqos CSV line (which contains a PID column before the cores) and
// three failure modes: a malformed line, an invalid timestamp, and
// unparsable metric values. The fixtures depend on the exact fmt format
// verbs, so they are built inline.
func TestParseProcessesMeasurement(t *testing.T) {
	timestamp := "2020-08-12 13:34:36"
	cores := "\"37,44\""
	pids := "\"12345,9999\""
	processName := "process_name"

	t.Run("valid measurement string", func(t *testing.T) {
		measurement := fmt.Sprintf("%s,%s,%s,%f,%f,%f,%f,%f,%f",
			timestamp,
			pids,
			cores,
			metricsValues["IPC"],
			metricsValues["LLC_Misses"],
			metricsValues["LLC"],
			metricsValues["MBL"],
			metricsValues["MBR"],
			metricsValues["MBT"])

		// The PID columns are skipped and the quotes around the core
		// list are stripped.
		expectedCores := "37,44"
		expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)

		newMeasurement := processMeasurement{
			name:        processName,
			measurement: measurement,
		}
		actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)

		assert.Nil(t, err)
		assert.Equal(t, processName, actualProcess)
		assert.Equal(t, expectedCores, resultCoresString)
		assert.Equal(t, expectedTimestamp, resultTimestamp)
		assert.Equal(t, resultValues[0], metricsValues["IPC"])
		assert.Equal(t, resultValues[1], metricsValues["LLC_Misses"])
		assert.Equal(t, resultValues[2], metricsValues["LLC"])
		assert.Equal(t, resultValues[3], metricsValues["MBL"])
		assert.Equal(t, resultValues[4], metricsValues["MBR"])
		assert.Equal(t, resultValues[5], metricsValues["MBT"])
	})
	t.Run("not valid measurement string", func(t *testing.T) {
		processName := "process_name"
		measurement := "invalid,measurement,format"

		newMeasurement := processMeasurement{
			name:        processName,
			measurement: measurement,
		}
		actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)

		assert.NotNil(t, err)
		assert.Equal(t, "", actualProcess)
		assert.Equal(t, "", resultCoresString)
		assert.Nil(t, resultValues)
		assert.Equal(t, time.Time{}, resultTimestamp)
	})
	t.Run("not valid timestamp format", func(t *testing.T) {
		invalidTimestamp := "2020-20-20-31"
		measurement := fmt.Sprintf("%s,%s,%s,%f,%f,%f,%f,%f,%f",
			invalidTimestamp,
			pids,
			cores,
			metricsValues["IPC"],
			metricsValues["LLC_Misses"],
			metricsValues["LLC"],
			metricsValues["MBL"],
			metricsValues["MBR"],
			metricsValues["MBT"])

		newMeasurement := processMeasurement{
			name:        processName,
			measurement: measurement,
		}
		actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)

		assert.NotNil(t, err)
		assert.Equal(t, "", actualProcess)
		assert.Equal(t, "", resultCoresString)
		assert.Nil(t, resultValues)
		assert.Equal(t, time.Time{}, resultTimestamp)
	})
	t.Run("not valid values string", func(t *testing.T) {
		// "1##" and "da" stand in for two metric columns that cannot be
		// parsed as floats.
		measurement := fmt.Sprintf("%s,%s,%s,%s,%s,%f,%f,%f,%f",
			timestamp,
			pids,
			cores,
			"1##",
			"da",
			metricsValues["LLC"],
			metricsValues["MBL"],
			metricsValues["MBR"],
			metricsValues["MBT"])

		newMeasurement := processMeasurement{
			name:        processName,
			measurement: measurement,
		}
		actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)

		assert.NotNil(t, err)
		assert.Equal(t, "", actualProcess)
		assert.Equal(t, "", resultCoresString)
		assert.Nil(t, resultValues)
		assert.Equal(t, time.Time{}, resultTimestamp)
	})
}
|
||||
|
||||
func TestAddToAccumulatorCores(t *testing.T) {
|
||||
t.Run("shortened false", func(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
publisher := Publisher{acc: &acc}
|
||||
|
||||
cores := "1,2,3"
|
||||
metricsValues := []float64{1, 2, 3, 4, 5, 6}
|
||||
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
|
||||
|
||||
publisher.addToAccumulatorCores(cores, metricsValues, timestamp)
|
||||
|
||||
for _, test := range testCoreMetrics {
|
||||
acc.AssertContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)
|
||||
}
|
||||
})
|
||||
t.Run("shortened true", func(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
publisher := Publisher{acc: &acc, shortenedMetrics: true}
|
||||
|
||||
cores := "1,2,3"
|
||||
metricsValues := []float64{1, 2, 3, 4, 5, 6}
|
||||
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
|
||||
|
||||
publisher.addToAccumulatorCores(cores, metricsValues, timestamp)
|
||||
|
||||
for _, test := range testCoreMetricsShortened {
|
||||
acc.AssertDoesNotContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestAddToAccumulatorProcesses(t *testing.T) {
|
||||
t.Run("shortened false", func(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
publisher := Publisher{acc: &acc}
|
||||
|
||||
process := "process_name"
|
||||
cores := "1,2,3"
|
||||
metricsValues := []float64{1, 2, 3, 4, 5, 6}
|
||||
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
|
||||
|
||||
publisher.addToAccumulatorProcesses(process, cores, metricsValues, timestamp)
|
||||
|
||||
for _, test := range testCoreProcesses {
|
||||
acc.AssertContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)
|
||||
}
|
||||
})
|
||||
t.Run("shortened true", func(t *testing.T) {
|
||||
var acc testutil.Accumulator
|
||||
publisher := Publisher{acc: &acc, shortenedMetrics: true}
|
||||
|
||||
process := "process_name"
|
||||
cores := "1,2,3"
|
||||
metricsValues := []float64{1, 2, 3, 4, 5, 6}
|
||||
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
|
||||
|
||||
publisher.addToAccumulatorProcesses(process, cores, metricsValues, timestamp)
|
||||
|
||||
for _, test := range testCoreProcessesShortened {
|
||||
acc.AssertDoesNotContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// accumulatorExpectation pairs the fields and tags that one accumulator
// entry is expected to carry in the tests above.
type accumulatorExpectation struct {
	fields map[string]interface{}
	tags   map[string]string
}

// expectedEntries builds one expectation per metric name in pqos output
// order ("IPC", "LLC_Misses", "LLC", "MBL", "MBR", "MBT"), with values
// 1..count, cores fixed to "1,2,3", and — when process is non-empty — a
// "process" tag on every entry.
func expectedEntries(count int, process string) []accumulatorExpectation {
	names := []string{"IPC", "LLC_Misses", "LLC", "MBL", "MBR", "MBT"}
	entries := make([]accumulatorExpectation, 0, count)
	for i := 0; i < count; i++ {
		tags := map[string]string{
			"cores": "1,2,3",
			"name":  names[i],
		}
		if process != "" {
			tags["process"] = process
		}
		entries = append(entries, accumulatorExpectation{
			fields: map[string]interface{}{"value": float64(i + 1)},
			tags:   tags,
		})
	}
	return entries
}

var (
	// All six metrics, as emitted with shortenedMetrics disabled.
	testCoreMetrics = expectedEntries(6, "")
	// The two entries (IPC, LLC_Misses) that must NOT appear with
	// shortenedMetrics enabled.
	testCoreMetricsShortened = expectedEntries(2, "")
	// Per-process variants of the above.
	testCoreProcesses          = expectedEntries(6, "process_name")
	testCoreProcessesShortened = expectedEntries(2, "process_name")
)
|
||||
Loading…
Reference in New Issue