fix: correct timezone in intel rdt plugin (#10026)

This commit is contained in:
trojanku 2021-11-01 19:53:23 +01:00 committed by GitHub
parent 43017559fa
commit 317dd38af3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 166 deletions

View File

@ -66,6 +66,12 @@ type processMeasurement struct {
measurement string
}
type splitCSVLine struct {
timeValue string
metricsValues []string
coreOrPIDsValues []string
}
// All gathering is done in the Start function
func (r *IntelRDT) Gather(_ telegraf.Accumulator) error {
return nil
@ -230,8 +236,8 @@ func (r *IntelRDT) associateProcessesWithPIDs(providedProcesses []string) (map[s
}
for _, availableProcess := range availableProcesses {
if choice.Contains(availableProcess.Name, providedProcesses) {
PID := availableProcess.PID
mapProcessPIDs[availableProcess.Name] = mapProcessPIDs[availableProcess.Name] + fmt.Sprintf("%d", PID) + ","
pid := availableProcess.PID
mapProcessPIDs[availableProcess.Name] = mapProcessPIDs[availableProcess.Name] + fmt.Sprintf("%d", pid) + ","
}
}
for key := range mapProcessPIDs {
@ -258,7 +264,7 @@ func (r *IntelRDT) readData(ctx context.Context, args []string, processesPIDsAss
r.wg.Add(1)
defer r.wg.Done()
cmd := exec.Command(r.PqosPath, append(args)...)
cmd := exec.Command(r.PqosPath, args...)
if r.UseSudo {
// run pqos with `/bin/sh -c "sudo /path/to/pqos ..."`
@ -327,13 +333,13 @@ func (r *IntelRDT) processOutput(cmdReader io.ReadCloser, processesPIDsAssociati
if len(r.Processes) != 0 {
newMetric := processMeasurement{}
PIDs, err := findPIDsInMeasurement(out)
pids, err := findPIDsInMeasurement(out)
if err != nil {
r.errorChan <- err
break
}
for processName, PIDsProcess := range processesPIDsAssociation {
if PIDs == PIDsProcess {
if pids == PIDsProcess {
newMetric.name = processName
newMetric.measurement = out
}
@ -482,29 +488,29 @@ func validateAndParseCores(coreStr string) ([]int, error) {
func findPIDsInMeasurement(measurements string) (string, error) {
// to distinguish PIDs from Cores (PIDs should be in quotes)
var insideQuoteRegex = regexp.MustCompile(`"(.*?)"`)
PIDsMatch := insideQuoteRegex.FindStringSubmatch(measurements)
if len(PIDsMatch) < 2 {
pidsMatch := insideQuoteRegex.FindStringSubmatch(measurements)
if len(pidsMatch) < 2 {
return "", fmt.Errorf("cannot find PIDs in measurement line")
}
PIDs := PIDsMatch[1]
return PIDs, nil
pids := pidsMatch[1]
return pids, nil
}
func splitCSVLineIntoValues(line string) (timeValue string, metricsValues, coreOrPIDsValues []string, err error) {
func splitCSVLineIntoValues(line string) (splitCSVLine, error) {
values, err := splitMeasurementLine(line)
if err != nil {
return "", nil, nil, err
return splitCSVLine{}, err
}
timeValue = values[0]
timeValue := values[0]
// Because pqos csv format is broken when many cores are involved in PID or
// group of PIDs, there is need to work around it. E.g.:
// Time,PID,Core,IPC,LLC Misses,LLC[KB],MBL[MB/s],MBR[MB/s],MBT[MB/s]
// 2020-08-12 13:34:36,"45417,29170,",37,44,0.00,0,0.0,0.0,0.0,0.0
metricsValues = values[len(values)-numberOfMetrics:]
coreOrPIDsValues = values[1 : len(values)-numberOfMetrics]
metricsValues := values[len(values)-numberOfMetrics:]
coreOrPIDsValues := values[1 : len(values)-numberOfMetrics]
return timeValue, metricsValues, coreOrPIDsValues, nil
return splitCSVLine{timeValue, metricsValues, coreOrPIDsValues}, nil
}
func validateInterval(interval int32) error {
@ -523,7 +529,7 @@ func splitMeasurementLine(line string) ([]string, error) {
}
func parseTime(value string) (time.Time, error) {
timestamp, err := time.Parse(timestampFormat, value)
timestamp, err := time.ParseInLocation(timestampFormat, value, time.Local)
if err != nil {
return time.Time{}, err
}

View File

@ -52,18 +52,18 @@ func TestSplitCSVLineIntoValues(t *testing.T) {
expectedMetricsValue := []string{"0.00", "0", "0.0", "0.0", "0.0", "0.0"}
expectedCoreOrPidsValue := []string{"\"45417", "29170\"", "37", "44"}
timeValue, metricsValue, coreOrPidsValue, err := splitCSVLineIntoValues(line)
splitCSV, err := splitCSVLineIntoValues(line)
assert.Nil(t, err)
assert.Equal(t, expectedTimeValue, timeValue)
assert.Equal(t, expectedMetricsValue, metricsValue)
assert.Equal(t, expectedCoreOrPidsValue, coreOrPidsValue)
assert.Equal(t, expectedTimeValue, splitCSV.timeValue)
assert.Equal(t, expectedMetricsValue, splitCSV.metricsValues)
assert.Equal(t, expectedCoreOrPidsValue, splitCSV.coreOrPIDsValues)
wrongLine := "2020-08-12 13:34:36,37,44,0.00,0,0.0"
timeValue, metricsValue, coreOrPidsValue, err = splitCSVLineIntoValues(wrongLine)
splitCSV, err = splitCSVLineIntoValues(wrongLine)
assert.NotNil(t, err)
assert.Equal(t, "", timeValue)
assert.Nil(t, nil, metricsValue)
assert.Nil(t, nil, coreOrPidsValue)
assert.Equal(t, "", splitCSV.timeValue)
assert.Nil(t, nil, splitCSV.metricsValues)
assert.Nil(t, nil, splitCSV.coreOrPIDsValues)
}
func TestFindPIDsInMeasurement(t *testing.T) {
@ -107,7 +107,6 @@ func TestCreateArgsCores(t *testing.T) {
assert.EqualValues(t, expected, result)
cores = []string{"1,2,3", "4,5,6"}
expected = "--mon-core="
expectedPrefix := "--mon-core="
expectedSubstring := "all:[1,2,3];mbt:[1,2,3];"
expectedSubstring2 := "all:[4,5,6];mbt:[4,5,6];"

View File

@ -5,12 +5,26 @@ package intel_rdt
import (
"context"
"errors"
"strings"
"time"
"github.com/influxdata/telegraf"
)
type parsedCoresMeasurement struct {
cores string
values []float64
time time.Time
}
type parsedProcessMeasurement struct {
process string
cores string
values []float64
time time.Time
}
// Publisher for publish new RDT metrics to telegraf accumulator
type Publisher struct {
acc telegraf.Accumulator
@ -50,48 +64,48 @@ func (p *Publisher) publish(ctx context.Context) {
}
func (p *Publisher) publishCores(measurement string) {
coresString, values, timestamp, err := parseCoresMeasurement(measurement)
parsedCoresMeasurement, err := parseCoresMeasurement(measurement)
if err != nil {
p.errChan <- err
}
p.addToAccumulatorCores(coresString, values, timestamp)
p.addToAccumulatorCores(parsedCoresMeasurement)
}
func (p *Publisher) publishProcess(measurement processMeasurement) {
process, coresString, values, timestamp, err := parseProcessesMeasurement(measurement)
parsedProcessMeasurement, err := parseProcessesMeasurement(measurement)
if err != nil {
p.errChan <- err
}
p.addToAccumulatorProcesses(process, coresString, values, timestamp)
p.addToAccumulatorProcesses(parsedProcessMeasurement)
}
func parseCoresMeasurement(measurements string) (string, []float64, time.Time, error) {
func parseCoresMeasurement(measurements string) (parsedCoresMeasurement, error) {
var values []float64
timeValue, metricsValues, cores, err := splitCSVLineIntoValues(measurements)
splitCSV, err := splitCSVLineIntoValues(measurements)
if err != nil {
return "", nil, time.Time{}, err
return parsedCoresMeasurement{}, err
}
timestamp, err := parseTime(timeValue)
timestamp, err := parseTime(splitCSV.timeValue)
if err != nil {
return "", nil, time.Time{}, err
return parsedCoresMeasurement{}, err
}
// change string slice to one string and separate it by coma
coresString := strings.Join(cores, ",")
coresString := strings.Join(splitCSV.coreOrPIDsValues, ",")
// trim unwanted quotes
coresString = strings.Trim(coresString, "\"")
for _, metric := range metricsValues {
for _, metric := range splitCSV.metricsValues {
parsedValue, err := parseFloat(metric)
if err != nil {
return "", nil, time.Time{}, err
return parsedCoresMeasurement{}, err
}
values = append(values, parsedValue)
}
return coresString, values, timestamp, nil
return parsedCoresMeasurement{coresString, values, timestamp}, nil
}
func (p *Publisher) addToAccumulatorCores(cores string, metricsValues []float64, timestamp time.Time) {
for i, value := range metricsValues {
func (p *Publisher) addToAccumulatorCores(measurement parsedCoresMeasurement) {
for i, value := range measurement.values {
if p.shortenedMetrics {
//0: "IPC"
//1: "LLC_Misses"
@ -102,41 +116,47 @@ func (p *Publisher) addToAccumulatorCores(cores string, metricsValues []float64,
tags := map[string]string{}
fields := make(map[string]interface{})
tags["cores"] = cores
tags["cores"] = measurement.cores
tags["name"] = pqosMetricOrder[i]
fields["value"] = value
p.acc.AddFields("rdt_metric", fields, tags, timestamp)
p.acc.AddFields("rdt_metric", fields, tags, measurement.time)
}
}
func parseProcessesMeasurement(measurement processMeasurement) (string, string, []float64, time.Time, error) {
var values []float64
timeValue, metricsValues, coreOrPidsValues, pids, err := parseProcessMeasurement(measurement.measurement)
func parseProcessesMeasurement(measurement processMeasurement) (parsedProcessMeasurement, error) {
splitCSV, err := splitCSVLineIntoValues(measurement.measurement)
if err != nil {
return "", "", nil, time.Time{}, err
return parsedProcessMeasurement{}, err
}
timestamp, err := parseTime(timeValue)
pids, err := findPIDsInMeasurement(measurement.measurement)
if err != nil {
return "", "", nil, time.Time{}, err
return parsedProcessMeasurement{}, err
}
lenOfPIDs := len(strings.Split(pids, ","))
if lenOfPIDs > len(splitCSV.coreOrPIDsValues) {
return parsedProcessMeasurement{}, errors.New("detected more pids (quoted) than actual number of pids in csv line")
}
timestamp, err := parseTime(splitCSV.timeValue)
if err != nil {
return parsedProcessMeasurement{}, err
}
actualProcess := measurement.name
lenOfPids := len(strings.Split(pids, ","))
cores := coreOrPidsValues[lenOfPids:]
coresString := strings.Trim(strings.Join(cores, ","), `"`)
cores := strings.Trim(strings.Join(splitCSV.coreOrPIDsValues[lenOfPIDs:], ","), `"`)
for _, metric := range metricsValues {
var values []float64
for _, metric := range splitCSV.metricsValues {
parsedValue, err := parseFloat(metric)
if err != nil {
return "", "", nil, time.Time{}, err
return parsedProcessMeasurement{}, err
}
values = append(values, parsedValue)
}
return actualProcess, coresString, values, timestamp, nil
return parsedProcessMeasurement{actualProcess, cores, values, timestamp}, nil
}
func (p *Publisher) addToAccumulatorProcesses(process string, cores string, metricsValues []float64, timestamp time.Time) {
for i, value := range metricsValues {
func (p *Publisher) addToAccumulatorProcesses(measurement parsedProcessMeasurement) {
for i, value := range measurement.values {
if p.shortenedMetrics {
//0: "IPC"
//1: "LLC_Misses"
@ -147,23 +167,11 @@ func (p *Publisher) addToAccumulatorProcesses(process string, cores string, metr
tags := map[string]string{}
fields := make(map[string]interface{})
tags["process"] = process
tags["cores"] = cores
tags["process"] = measurement.process
tags["cores"] = measurement.cores
tags["name"] = pqosMetricOrder[i]
fields["value"] = value
p.acc.AddFields("rdt_metric", fields, tags, timestamp)
p.acc.AddFields("rdt_metric", fields, tags, measurement.time)
}
}
func parseProcessMeasurement(measurements string) (string, []string, []string, string, error) {
timeValue, metricsValues, coreOrPidsValues, err := splitCSVLineIntoValues(measurements)
if err != nil {
return "", nil, nil, "", err
}
pids, err := findPIDsInMeasurement(measurements)
if err != nil {
return "", nil, nil, "", err
}
return timeValue, metricsValues, coreOrPidsValues, pids, nil
}

View File

@ -37,29 +37,29 @@ func TestParseCoresMeasurement(t *testing.T) {
metricsValues["MBT"])
expectedCores := "37,44"
expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.Local)
resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)
result, err := parseCoresMeasurement(measurement)
assert.Nil(t, err)
assert.Equal(t, expectedCores, resultCoresString)
assert.Equal(t, expectedTimestamp, resultTimestamp)
assert.Equal(t, resultValues[0], metricsValues["IPC"])
assert.Equal(t, resultValues[1], metricsValues["LLC_Misses"])
assert.Equal(t, resultValues[2], metricsValues["LLC"])
assert.Equal(t, resultValues[3], metricsValues["MBL"])
assert.Equal(t, resultValues[4], metricsValues["MBR"])
assert.Equal(t, resultValues[5], metricsValues["MBT"])
assert.Equal(t, expectedCores, result.cores)
assert.Equal(t, expectedTimestamp, result.time)
assert.Equal(t, result.values[0], metricsValues["IPC"])
assert.Equal(t, result.values[1], metricsValues["LLC_Misses"])
assert.Equal(t, result.values[2], metricsValues["LLC"])
assert.Equal(t, result.values[3], metricsValues["MBL"])
assert.Equal(t, result.values[4], metricsValues["MBR"])
assert.Equal(t, result.values[5], metricsValues["MBT"])
})
t.Run("not valid measurement string", func(t *testing.T) {
measurement := "not, valid, measurement"
resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)
result, err := parseCoresMeasurement(measurement)
assert.NotNil(t, err)
assert.Equal(t, "", resultCoresString)
assert.Nil(t, resultValues)
assert.Equal(t, time.Time{}, resultTimestamp)
assert.Equal(t, "", result.cores)
assert.Nil(t, result.values)
assert.Equal(t, time.Time{}, result.time)
})
t.Run("not valid values string", func(t *testing.T) {
measurement := fmt.Sprintf("%s,%s,%s,%s,%f,%f,%f,%f",
@ -72,12 +72,12 @@ func TestParseCoresMeasurement(t *testing.T) {
metricsValues["MBR"],
metricsValues["MBT"])
resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)
result, err := parseCoresMeasurement(measurement)
assert.NotNil(t, err)
assert.Equal(t, "", resultCoresString)
assert.Nil(t, resultValues)
assert.Equal(t, time.Time{}, resultTimestamp)
assert.Equal(t, "", result.cores)
assert.Nil(t, result.values)
assert.Equal(t, time.Time{}, result.time)
})
t.Run("not valid timestamp format", func(t *testing.T) {
invalidTimestamp := "2020-08-12-21 13:34:"
@ -91,12 +91,12 @@ func TestParseCoresMeasurement(t *testing.T) {
metricsValues["MBR"],
metricsValues["MBT"])
resultCoresString, resultValues, resultTimestamp, err := parseCoresMeasurement(measurement)
result, err := parseCoresMeasurement(measurement)
assert.NotNil(t, err)
assert.Equal(t, "", resultCoresString)
assert.Nil(t, resultValues)
assert.Equal(t, time.Time{}, resultTimestamp)
assert.Equal(t, "", result.cores)
assert.Nil(t, result.values)
assert.Equal(t, time.Time{}, result.time)
})
}
@ -119,44 +119,36 @@ func TestParseProcessesMeasurement(t *testing.T) {
metricsValues["MBT"])
expectedCores := "37,44"
expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
expectedTimestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.Local)
newMeasurement := processMeasurement{
name: processName,
measurement: measurement,
}
actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)
result, err := parseProcessesMeasurement(newMeasurement)
assert.Nil(t, err)
assert.Equal(t, processName, actualProcess)
assert.Equal(t, expectedCores, resultCoresString)
assert.Equal(t, expectedTimestamp, resultTimestamp)
assert.Equal(t, resultValues[0], metricsValues["IPC"])
assert.Equal(t, resultValues[1], metricsValues["LLC_Misses"])
assert.Equal(t, resultValues[2], metricsValues["LLC"])
assert.Equal(t, resultValues[3], metricsValues["MBL"])
assert.Equal(t, resultValues[4], metricsValues["MBR"])
assert.Equal(t, resultValues[5], metricsValues["MBT"])
assert.Equal(t, processName, result.process)
assert.Equal(t, expectedCores, result.cores)
assert.Equal(t, expectedTimestamp, result.time)
assert.Equal(t, result.values[0], metricsValues["IPC"])
assert.Equal(t, result.values[1], metricsValues["LLC_Misses"])
assert.Equal(t, result.values[2], metricsValues["LLC"])
assert.Equal(t, result.values[3], metricsValues["MBL"])
assert.Equal(t, result.values[4], metricsValues["MBR"])
assert.Equal(t, result.values[5], metricsValues["MBT"])
})
t.Run("not valid measurement string", func(t *testing.T) {
processName := "process_name"
measurement := "invalid,measurement,format"
newMeasurement := processMeasurement{
name: processName,
measurement: measurement,
}
actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)
assert.NotNil(t, err)
assert.Equal(t, "", actualProcess)
assert.Equal(t, "", resultCoresString)
assert.Nil(t, resultValues)
assert.Equal(t, time.Time{}, resultTimestamp)
})
t.Run("not valid timestamp format", func(t *testing.T) {
invalidTimestamp := "2020-20-20-31"
measurement := fmt.Sprintf("%s,%s,%s,%f,%f,%f,%f,%f,%f",
invalidTimestamp := "2020-20-20-31"
negativeTests := []struct {
name string
measurement string
}{{
name: "not valid measurement string",
measurement: "invalid,measurement,format",
}, {
name: "not valid timestamp format",
measurement: fmt.Sprintf("%s,%s,%s,%f,%f,%f,%f,%f,%f",
invalidTimestamp,
pids,
cores,
@ -165,44 +157,42 @@ func TestParseProcessesMeasurement(t *testing.T) {
metricsValues["LLC"],
metricsValues["MBL"],
metricsValues["MBR"],
metricsValues["MBT"])
metricsValues["MBT"]),
},
{
name: "not valid values string",
measurement: fmt.Sprintf("%s,%s,%s,%s,%s,%f,%f,%f,%f",
timestamp,
pids,
cores,
"1##",
"da",
metricsValues["LLC"],
metricsValues["MBL"],
metricsValues["MBR"],
metricsValues["MBT"]),
},
{
name: "not valid csv line with quotes",
measurement: "0000-08-02 0:00:00,,\",,,,,,,,,,,,,,,,,,,,,,,,\",,",
},
}
newMeasurement := processMeasurement{
name: processName,
measurement: measurement,
}
actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)
for _, test := range negativeTests {
t.Run(test.name, func(t *testing.T) {
newMeasurement := processMeasurement{
name: processName,
measurement: test.measurement,
}
result, err := parseProcessesMeasurement(newMeasurement)
assert.NotNil(t, err)
assert.Equal(t, "", actualProcess)
assert.Equal(t, "", resultCoresString)
assert.Nil(t, resultValues)
assert.Equal(t, time.Time{}, resultTimestamp)
})
t.Run("not valid values string", func(t *testing.T) {
measurement := fmt.Sprintf("%s,%s,%s,%s,%s,%f,%f,%f,%f",
timestamp,
pids,
cores,
"1##",
"da",
metricsValues["LLC"],
metricsValues["MBL"],
metricsValues["MBR"],
metricsValues["MBT"])
newMeasurement := processMeasurement{
name: processName,
measurement: measurement,
}
actualProcess, resultCoresString, resultValues, resultTimestamp, err := parseProcessesMeasurement(newMeasurement)
assert.NotNil(t, err)
assert.Equal(t, "", actualProcess)
assert.Equal(t, "", resultCoresString)
assert.Nil(t, resultValues)
assert.Equal(t, time.Time{}, resultTimestamp)
})
assert.NotNil(t, err)
assert.Equal(t, "", result.process)
assert.Equal(t, "", result.cores)
assert.Nil(t, result.values)
assert.Equal(t, time.Time{}, result.time)
})
}
}
func TestAddToAccumulatorCores(t *testing.T) {
@ -212,9 +202,9 @@ func TestAddToAccumulatorCores(t *testing.T) {
cores := "1,2,3"
metricsValues := []float64{1, 2, 3, 4, 5, 6}
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.Local)
publisher.addToAccumulatorCores(cores, metricsValues, timestamp)
publisher.addToAccumulatorCores(parsedCoresMeasurement{cores, metricsValues, timestamp})
for _, test := range testCoreMetrics {
acc.AssertContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)
@ -226,9 +216,9 @@ func TestAddToAccumulatorCores(t *testing.T) {
cores := "1,2,3"
metricsValues := []float64{1, 2, 3, 4, 5, 6}
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.Local)
publisher.addToAccumulatorCores(cores, metricsValues, timestamp)
publisher.addToAccumulatorCores(parsedCoresMeasurement{cores, metricsValues, timestamp})
for _, test := range testCoreMetricsShortened {
acc.AssertDoesNotContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)
@ -244,9 +234,9 @@ func TestAddToAccumulatorProcesses(t *testing.T) {
process := "process_name"
cores := "1,2,3"
metricsValues := []float64{1, 2, 3, 4, 5, 6}
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.Local)
publisher.addToAccumulatorProcesses(process, cores, metricsValues, timestamp)
publisher.addToAccumulatorProcesses(parsedProcessMeasurement{process, cores, metricsValues, timestamp})
for _, test := range testCoreProcesses {
acc.AssertContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)
@ -259,9 +249,9 @@ func TestAddToAccumulatorProcesses(t *testing.T) {
process := "process_name"
cores := "1,2,3"
metricsValues := []float64{1, 2, 3, 4, 5, 6}
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.UTC)
timestamp := time.Date(2020, 8, 12, 13, 34, 36, 0, time.Local)
publisher.addToAccumulatorProcesses(process, cores, metricsValues, timestamp)
publisher.addToAccumulatorProcesses(parsedProcessMeasurement{process, cores, metricsValues, timestamp})
for _, test := range testCoreProcessesShortened {
acc.AssertDoesNotContainsTaggedFields(t, "rdt_metric", test.fields, test.tags)