chore: Fix linter findings for `revive:exported` in `plugins/inputs/i*` (#16075)

This commit is contained in:
Paweł Żak 2024-10-25 12:45:08 +02:00 committed by GitHub
parent 8e3b9aeff5
commit f8999c716b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
49 changed files with 782 additions and 818 deletions

View File

@ -21,22 +21,24 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
var levels = []string{"ok", "warning", "critical", "unknown"}
type Icinga2 struct { type Icinga2 struct {
Server string Server string `toml:"server"`
Objects []string Objects []string `toml:"objects"`
Status []string Status []string `toml:"status"`
ObjectType string `toml:"object_type" deprecated:"1.26.0;1.35.0;use 'objects' instead"` ObjectType string `toml:"object_type" deprecated:"1.26.0;1.35.0;use 'objects' instead"`
Username string Username string `toml:"username"`
Password string Password string `toml:"password"`
ResponseTimeout config.Duration ResponseTimeout config.Duration `toml:"response_timeout"`
tls.ClientConfig tls.ClientConfig
Log telegraf.Logger Log telegraf.Logger `toml:"-"`
client *http.Client client *http.Client
} }
type ResultObject struct { type resultObject struct {
Results []struct { Results []struct {
Attrs struct { Attrs struct {
CheckCommand string `json:"check_command"` CheckCommand string `json:"check_command"`
@ -52,13 +54,13 @@ type ResultObject struct {
} `json:"results"` } `json:"results"`
} }
type ResultCIB struct { type resultCIB struct {
Results []struct { Results []struct {
Status map[string]interface{} `json:"status"` Status map[string]interface{} `json:"status"`
} `json:"results"` } `json:"results"`
} }
type ResultPerfdata struct { type resultPerfdata struct {
Results []struct { Results []struct {
Perfdata []struct { Perfdata []struct {
Label string `json:"label"` Label string `json:"label"`
@ -67,8 +69,6 @@ type ResultPerfdata struct {
} `json:"results"` } `json:"results"`
} }
var levels = []string{"ok", "warning", "critical", "unknown"}
func (*Icinga2) SampleConfig() string { func (*Icinga2) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -102,7 +102,69 @@ func (i *Icinga2) Init() error {
return nil return nil
} }
func (i *Icinga2) gatherObjects(acc telegraf.Accumulator, checks ResultObject, objectType string) { func (i *Icinga2) Gather(acc telegraf.Accumulator) error {
// Collect /v1/objects
for _, objectType := range i.Objects {
requestURL := "%s/v1/objects/%s?attrs=name&attrs=display_name&attrs=state&attrs=check_command"
// Note: attrs=host_name is only valid for 'services' requests, using check.Attrs.HostName for the host
// 'hosts' requests will need to use attrs=name only, using check.Attrs.Name for the host
if objectType == "services" {
requestURL += "&attrs=host_name"
}
address := fmt.Sprintf(requestURL, i.Server, objectType)
resp, err := i.icingaRequest(address)
if err != nil {
return err
}
result := resultObject{}
err = i.parseObjectResponse(resp, &result)
if err != nil {
return fmt.Errorf("could not parse object response: %w", err)
}
i.gatherObjects(acc, result, objectType)
}
// Collect /v1/status
for _, statusType := range i.Status {
address := fmt.Sprintf("%s/v1/status/%s", i.Server, statusType)
resp, err := i.icingaRequest(address)
if err != nil {
return err
}
tags := map[string]string{
"component": statusType,
}
var fields map[string]interface{}
switch statusType {
case "ApiListener":
fields, err = i.parsePerfdataResponse(resp)
case "CIB":
fields, err = i.parseCIBResponse(resp)
case "IdoMysqlConnection":
fields, err = i.parsePerfdataResponse(resp)
case "IdoPgsqlConnection":
fields, err = i.parsePerfdataResponse(resp)
}
if err != nil {
return fmt.Errorf("could not parse %s response: %w", statusType, err)
}
acc.AddFields("icinga2_status", fields, tags)
}
return nil
}
func (i *Icinga2) gatherObjects(acc telegraf.Accumulator, checks resultObject, objectType string) {
for _, check := range checks.Results { for _, check := range checks.Results {
serverURL, err := url.Parse(i.Server) serverURL, err := url.Parse(i.Server)
if err != nil { if err != nil {
@ -171,7 +233,7 @@ func (i *Icinga2) icingaRequest(address string) (*http.Response, error) {
return resp, nil return resp, nil
} }
func (i *Icinga2) parseObjectResponse(resp *http.Response, result *ResultObject) error { func (i *Icinga2) parseObjectResponse(resp *http.Response, result *resultObject) error {
err := json.NewDecoder(resp.Body).Decode(&result) err := json.NewDecoder(resp.Body).Decode(&result)
if err != nil { if err != nil {
return err return err
@ -185,7 +247,7 @@ func (i *Icinga2) parseObjectResponse(resp *http.Response, result *ResultObject)
} }
func (i *Icinga2) parseCIBResponse(resp *http.Response) (map[string]interface{}, error) { func (i *Icinga2) parseCIBResponse(resp *http.Response) (map[string]interface{}, error) {
result := ResultCIB{} result := resultCIB{}
err := json.NewDecoder(resp.Body).Decode(&result) err := json.NewDecoder(resp.Body).Decode(&result)
if err != nil { if err != nil {
@ -201,7 +263,7 @@ func (i *Icinga2) parseCIBResponse(resp *http.Response) (map[string]interface{},
} }
func (i *Icinga2) parsePerfdataResponse(resp *http.Response) (map[string]interface{}, error) { func (i *Icinga2) parsePerfdataResponse(resp *http.Response) (map[string]interface{}, error) {
result := ResultPerfdata{} result := resultPerfdata{}
err := json.NewDecoder(resp.Body).Decode(&result) err := json.NewDecoder(resp.Body).Decode(&result)
if err != nil { if err != nil {
@ -226,68 +288,6 @@ func (i *Icinga2) parsePerfdataResponse(resp *http.Response) (map[string]interfa
return fields, nil return fields, nil
} }
func (i *Icinga2) Gather(acc telegraf.Accumulator) error {
// Collect /v1/objects
for _, objectType := range i.Objects {
requestURL := "%s/v1/objects/%s?attrs=name&attrs=display_name&attrs=state&attrs=check_command"
// Note: attrs=host_name is only valid for 'services' requests, using check.Attrs.HostName for the host
// 'hosts' requests will need to use attrs=name only, using check.Attrs.Name for the host
if objectType == "services" {
requestURL += "&attrs=host_name"
}
address := fmt.Sprintf(requestURL, i.Server, objectType)
resp, err := i.icingaRequest(address)
if err != nil {
return err
}
result := ResultObject{}
err = i.parseObjectResponse(resp, &result)
if err != nil {
return fmt.Errorf("could not parse object response: %w", err)
}
i.gatherObjects(acc, result, objectType)
}
// Collect /v1/status
for _, statusType := range i.Status {
address := fmt.Sprintf("%s/v1/status/%s", i.Server, statusType)
resp, err := i.icingaRequest(address)
if err != nil {
return err
}
tags := map[string]string{
"component": statusType,
}
var fields map[string]interface{}
switch statusType {
case "ApiListener":
fields, err = i.parsePerfdataResponse(resp)
case "CIB":
fields, err = i.parseCIBResponse(resp)
case "IdoMysqlConnection":
fields, err = i.parsePerfdataResponse(resp)
case "IdoPgsqlConnection":
fields, err = i.parsePerfdataResponse(resp)
}
if err != nil {
return fmt.Errorf("could not parse %s response: %w", statusType, err)
}
acc.AddFields("icinga2_status", fields, tags)
}
return nil
}
func init() { func init() {
inputs.Add("icinga2", func() telegraf.Input { inputs.Add("icinga2", func() telegraf.Input {
return &Icinga2{ return &Icinga2{

View File

@ -7,9 +7,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
func TestIcinga2Default(t *testing.T) { func TestIcinga2Default(t *testing.T) {

View File

@ -11,8 +11,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Stores the configuration values for the infiniband plugin - as there are no
// config values, this is intentionally empty
type Infiniband struct { type Infiniband struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }

View File

@ -26,19 +26,6 @@ const (
maxErrorResponseBodyLength = 1024 maxErrorResponseBodyLength = 1024
) )
type APIError struct {
StatusCode int
Reason string
Description string `json:"error"`
}
func (e *APIError) Error() string {
if e.Description != "" {
return e.Reason + ": " + e.Description
}
return e.Reason
}
type InfluxDB struct { type InfluxDB struct {
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
Username string `toml:"username"` Username string `toml:"username"`
@ -49,6 +36,19 @@ type InfluxDB struct {
client *http.Client client *http.Client
} }
type apiError struct {
StatusCode int
Reason string
Description string `json:"error"`
}
func (e *apiError) Error() string {
if e.Description != "" {
return e.Reason + ": " + e.Description
}
return e.Reason
}
func (*InfluxDB) SampleConfig() string { func (*InfluxDB) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -284,7 +284,7 @@ func (i *InfluxDB) gatherURL(acc telegraf.Accumulator, url string) error {
} }
func readResponseError(resp *http.Response) error { func readResponseError(resp *http.Response) error {
apiError := &APIError{ apiErr := &apiError{
StatusCode: resp.StatusCode, StatusCode: resp.StatusCode,
Reason: resp.Status, Reason: resp.Status,
} }
@ -293,15 +293,15 @@ func readResponseError(resp *http.Response) error {
r := io.LimitReader(resp.Body, maxErrorResponseBodyLength) r := io.LimitReader(resp.Body, maxErrorResponseBodyLength)
_, err := buf.ReadFrom(r) _, err := buf.ReadFrom(r)
if err != nil { if err != nil {
return apiError return apiErr
} }
err = json.Unmarshal(buf.Bytes(), apiError) err = json.Unmarshal(buf.Bytes(), apiErr)
if err != nil { if err != nil {
return apiError return apiErr
} }
return apiError return apiErr
} }
func init() { func init() {

View File

@ -1,4 +1,4 @@
package influxdb_test package influxdb
import ( import (
"fmt" "fmt"
@ -10,7 +10,6 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/influxdb"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -26,7 +25,7 @@ func TestBasic(t *testing.T) {
})) }))
defer fakeServer.Close() defer fakeServer.Close()
plugin := &influxdb.InfluxDB{ plugin := &InfluxDB{
URLs: []string{fakeServer.URL + "/endpoint"}, URLs: []string{fakeServer.URL + "/endpoint"},
} }
@ -77,7 +76,7 @@ func TestInfluxDB(t *testing.T) {
})) }))
defer fakeInfluxServer.Close() defer fakeInfluxServer.Close()
plugin := &influxdb.InfluxDB{ plugin := &InfluxDB{
URLs: []string{fakeInfluxServer.URL + "/endpoint"}, URLs: []string{fakeInfluxServer.URL + "/endpoint"},
} }
@ -149,7 +148,7 @@ func TestInfluxDB2(t *testing.T) {
})) }))
defer fakeInfluxServer.Close() defer fakeInfluxServer.Close()
plugin := &influxdb.InfluxDB{ plugin := &InfluxDB{
URLs: []string{fakeInfluxServer.URL + "/endpoint"}, URLs: []string{fakeInfluxServer.URL + "/endpoint"},
} }
@ -190,7 +189,7 @@ func TestCloud1(t *testing.T) {
defer server.Close() defer server.Close()
// Setup the plugin // Setup the plugin
plugin := &influxdb.InfluxDB{ plugin := &InfluxDB{
URLs: []string{server.URL + "/endpoint"}, URLs: []string{server.URL + "/endpoint"},
} }
@ -224,7 +223,7 @@ func TestErrorHandling(t *testing.T) {
})) }))
defer badServer.Close() defer badServer.Close()
plugin := &influxdb.InfluxDB{ plugin := &InfluxDB{
URLs: []string{badServer.URL + "/endpoint"}, URLs: []string{badServer.URL + "/endpoint"},
} }
@ -243,7 +242,7 @@ func TestErrorHandling404(t *testing.T) {
})) }))
defer badServer.Close() defer badServer.Close()
plugin := &influxdb.InfluxDB{ plugin := &InfluxDB{
URLs: []string{badServer.URL}, URLs: []string{badServer.URL},
} }
@ -259,7 +258,7 @@ func TestErrorResponse(t *testing.T) {
})) }))
defer ts.Close() defer ts.Close()
plugin := &influxdb.InfluxDB{ plugin := &InfluxDB{
URLs: []string{ts.URL}, URLs: []string{ts.URL},
} }
@ -268,7 +267,7 @@ func TestErrorResponse(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
expected := []error{ expected := []error{
&influxdb.APIError{ &apiError{
StatusCode: http.StatusUnauthorized, StatusCode: http.StatusUnauthorized,
Reason: fmt.Sprintf("%d %s", http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)), Reason: fmt.Sprintf("%d %s", http.StatusUnauthorized, http.StatusText(http.StatusUnauthorized)),
Description: "unable to parse authentication credentials", Description: "unable to parse authentication credentials",

View File

@ -80,28 +80,6 @@ func (h *InfluxDBListener) Gather(_ telegraf.Accumulator) error {
return nil return nil
} }
func (h *InfluxDBListener) routes() {
var authHandler func(http.Handler) http.Handler
if h.TokenSharedSecret != "" {
authHandler = internal.JWTAuthHandler(h.TokenSharedSecret, h.TokenUsername,
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
} else {
authHandler = internal.BasicAuthHandler(h.BasicUsername, h.BasicPassword, "influxdb",
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
}
h.mux.Handle("/write", authHandler(h.handleWrite()))
h.mux.Handle("/query", authHandler(h.handleQuery()))
h.mux.Handle("/ping", h.handlePing())
h.mux.Handle("/", authHandler(h.handleDefault()))
}
func (h *InfluxDBListener) Init() error { func (h *InfluxDBListener) Init() error {
// Check the config setting // Check the config setting
if (h.BasicUsername != "" || h.BasicPassword != "") && (h.TokenSharedSecret != "" || h.TokenUsername != "") { if (h.BasicUsername != "" || h.BasicPassword != "") && (h.TokenSharedSecret != "" || h.TokenUsername != "") {
@ -139,7 +117,6 @@ func (h *InfluxDBListener) Init() error {
return nil return nil
} }
// Start starts the InfluxDB listener service.
func (h *InfluxDBListener) Start(acc telegraf.Accumulator) error { func (h *InfluxDBListener) Start(acc telegraf.Accumulator) error {
h.acc = acc h.acc = acc
@ -183,7 +160,6 @@ func (h *InfluxDBListener) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
// Stop cleans up all resources
func (h *InfluxDBListener) Stop() { func (h *InfluxDBListener) Stop() {
err := h.server.Shutdown(context.Background()) err := h.server.Shutdown(context.Background())
if err != nil { if err != nil {
@ -191,6 +167,28 @@ func (h *InfluxDBListener) Stop() {
} }
} }
func (h *InfluxDBListener) routes() {
var authHandler func(http.Handler) http.Handler
if h.TokenSharedSecret != "" {
authHandler = internal.JWTAuthHandler(h.TokenSharedSecret, h.TokenUsername,
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
} else {
authHandler = internal.BasicAuthHandler(h.BasicUsername, h.BasicPassword, "influxdb",
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
}
h.mux.Handle("/write", authHandler(h.handleWrite()))
h.mux.Handle("/query", authHandler(h.handleQuery()))
h.mux.Handle("/ping", h.handlePing())
h.mux.Handle("/", authHandler(h.handleDefault()))
}
func (h *InfluxDBListener) ServeHTTP(res http.ResponseWriter, req *http.Request) { func (h *InfluxDBListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.requestsRecv.Incr(1) h.requestsRecv.Incr(1)
h.mux.ServeHTTP(res, req) h.mux.ServeHTTP(res, req)

View File

@ -33,18 +33,11 @@ var sampleConfig string
const ( const (
// defaultMaxBodySize is the default maximum request body size, in bytes. // defaultMaxBodySize is the default maximum request body size, in bytes.
// if the request body is over this size, we will return an HTTP 413 error. // if the request body is over this size, we will return an HTTP 413 error.
defaultMaxBodySize = 32 * 1024 * 1024 defaultMaxBodySize = 32 * 1024 * 1024
defaultReadTimeout = 10 * time.Second defaultReadTimeout = 10 * time.Second
defaultWriteTimeout = 10 * time.Second defaultWriteTimeout = 10 * time.Second
) internalError BadRequestCode = "internal error"
invalid BadRequestCode = "invalid"
// The BadRequestCode constants keep standard error messages
// see: https://v2.docs.influxdata.com/v2.0/api/#operation/PostWrite
type BadRequestCode string
const (
InternalError BadRequestCode = "internal error"
Invalid BadRequestCode = "invalid"
) )
type InfluxDBV2Listener struct { type InfluxDBV2Listener struct {
@ -60,68 +53,44 @@ type InfluxDBV2Listener struct {
BucketTag string `toml:"bucket_tag"` BucketTag string `toml:"bucket_tag"`
ParserType string `toml:"parser_type"` ParserType string `toml:"parser_type"`
ctx context.Context Log telegraf.Logger `toml:"-"`
cancel context.CancelFunc
trackingMetricCount map[telegraf.TrackingID]int64 ctx context.Context
countLock sync.Mutex cancel context.CancelFunc
trackingMetricCount map[telegraf.TrackingID]int64
countLock sync.Mutex
totalUndeliveredMetrics atomic.Int64 totalUndeliveredMetrics atomic.Int64
timeFunc influx.TimeFunc timeFunc influx.TimeFunc
listener net.Listener listener net.Listener
server http.Server
acc telegraf.Accumulator server http.Server
trackingAcc telegraf.TrackingAccumulator acc telegraf.Accumulator
trackingAcc telegraf.TrackingAccumulator
bytesRecv selfstat.Stat bytesRecv selfstat.Stat
requestsServed selfstat.Stat requestsServed selfstat.Stat
writesServed selfstat.Stat writesServed selfstat.Stat
readysServed selfstat.Stat readysServed selfstat.Stat
requestsRecv selfstat.Stat requestsRecv selfstat.Stat
notFoundsServed selfstat.Stat notFoundsServed selfstat.Stat
authFailures selfstat.Stat
authFailures selfstat.Stat
startTime time.Time startTime time.Time
Log telegraf.Logger `toml:"-"`
mux http.ServeMux mux http.ServeMux
} }
// The BadRequestCode constants keep standard error messages
// see: https://v2.docs.influxdata.com/v2.0/api/#operation/PostWrite
type BadRequestCode string
func (*InfluxDBV2Listener) SampleConfig() string { func (*InfluxDBV2Listener) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (h *InfluxDBV2Listener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (h *InfluxDBV2Listener) routes() error {
credentials := ""
if !h.Token.Empty() {
secBuf, err := h.Token.Get()
if err != nil {
return err
}
credentials = "Token " + secBuf.String()
secBuf.Destroy()
}
authHandler := internal.GenericAuthHandler(credentials,
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
h.mux.Handle("/api/v2/write", authHandler(h.handleWrite()))
h.mux.Handle("/api/v2/ready", h.handleReady())
h.mux.Handle("/", authHandler(h.handleDefault()))
return nil
}
func (h *InfluxDBV2Listener) Init() error { func (h *InfluxDBV2Listener) Init() error {
tags := map[string]string{ tags := map[string]string{
"address": h.ServiceAddress, "address": h.ServiceAddress,
@ -151,7 +120,10 @@ func (h *InfluxDBV2Listener) Init() error {
return nil return nil
} }
// Start starts the InfluxDB listener service. func (h *InfluxDBV2Listener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error { func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {
h.acc = acc h.acc = acc
h.ctx, h.cancel = context.WithCancel(context.Background()) h.ctx, h.cancel = context.WithCancel(context.Background())
@ -217,7 +189,6 @@ func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
// Stop cleans up all resources
func (h *InfluxDBV2Listener) Stop() { func (h *InfluxDBV2Listener) Stop() {
h.cancel() h.cancel()
err := h.server.Shutdown(context.Background()) err := h.server.Shutdown(context.Background())
@ -232,6 +203,31 @@ func (h *InfluxDBV2Listener) ServeHTTP(res http.ResponseWriter, req *http.Reques
h.requestsServed.Incr(1) h.requestsServed.Incr(1)
} }
func (h *InfluxDBV2Listener) routes() error {
credentials := ""
if !h.Token.Empty() {
secBuf, err := h.Token.Get()
if err != nil {
return err
}
credentials = "Token " + secBuf.String()
secBuf.Destroy()
}
authHandler := internal.GenericAuthHandler(credentials,
func(_ http.ResponseWriter) {
h.authFailures.Incr(1)
},
)
h.mux.Handle("/api/v2/write", authHandler(h.handleWrite()))
h.mux.Handle("/api/v2/ready", h.handleReady())
h.mux.Handle("/", authHandler(h.handleDefault()))
return nil
}
func (h *InfluxDBV2Listener) handleReady() http.HandlerFunc { func (h *InfluxDBV2Listener) handleReady() http.HandlerFunc {
return func(res http.ResponseWriter, _ *http.Request) { return func(res http.ResponseWriter, _ *http.Request) {
defer h.readysServed.Incr(1) defer h.readysServed.Incr(1)
@ -281,7 +277,7 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
body, err = gzip.NewReader(body) body, err = gzip.NewReader(body)
if err != nil { if err != nil {
h.Log.Debugf("Error decompressing request body: %v", err.Error()) h.Log.Debugf("Error decompressing request body: %v", err.Error())
if err := badRequest(res, Invalid, err.Error()); err != nil { if err := badRequest(res, invalid, err.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err) h.Log.Debugf("error in bad-request: %v", err)
} }
return return
@ -294,7 +290,7 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
bytes, readErr = io.ReadAll(body) bytes, readErr = io.ReadAll(body)
if readErr != nil { if readErr != nil {
h.Log.Debugf("Error parsing the request body: %v", readErr.Error()) h.Log.Debugf("Error parsing the request body: %v", readErr.Error())
if err := badRequest(res, InternalError, readErr.Error()); err != nil { if err := badRequest(res, internalError, readErr.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err) h.Log.Debugf("error in bad-request: %v", err)
} }
return return
@ -341,7 +337,7 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
if !errors.Is(err, io.EOF) && err != nil { if !errors.Is(err, io.EOF) && err != nil {
h.Log.Debugf("Error parsing the request body: %v", err.Error()) h.Log.Debugf("Error parsing the request body: %v", err.Error())
if err := badRequest(res, Invalid, err.Error()); err != nil { if err := badRequest(res, invalid, err.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err) h.Log.Debugf("error in bad-request: %v", err)
} }
return return
@ -401,7 +397,7 @@ func tooLarge(res http.ResponseWriter, maxLength int64) error {
res.Header().Set("X-Influxdb-Error", "http: request body too large") res.Header().Set("X-Influxdb-Error", "http: request body too large")
res.WriteHeader(http.StatusRequestEntityTooLarge) res.WriteHeader(http.StatusRequestEntityTooLarge)
b, err := json.Marshal(map[string]string{ b, err := json.Marshal(map[string]string{
"code": fmt.Sprint(Invalid), "code": fmt.Sprint(invalid),
"message": "http: request body too large", "message": "http: request body too large",
"maxLength": strconv.FormatInt(maxLength, 10)}) "maxLength": strconv.FormatInt(maxLength, 10)})
if err != nil { if err != nil {

View File

@ -15,6 +15,9 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
//go:embed sample.conf
var sampleConfig string
const ( const (
// plugin name. Exposed with all metrics // plugin name. Exposed with all metrics
pluginName = "intel_baseband" pluginName = "intel_baseband"
@ -38,9 +41,6 @@ const (
defaultWaitForTelemetryTimeout = config.Duration(time.Second) defaultWaitForTelemetryTimeout = config.Duration(time.Second)
) )
//go:embed sample.conf
var sampleConfig string
type Baseband struct { type Baseband struct {
// required params // required params
SocketPath string `toml:"socket_path"` SocketPath string `toml:"socket_path"`
@ -60,7 +60,6 @@ func (b *Baseband) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init performs one time setup of the plugin
func (b *Baseband) Init() error { func (b *Baseband) Init() error {
if b.SocketAccessTimeout < 0 { if b.SocketAccessTimeout < 0 {
return errors.New("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)") return errors.New("socket_access_timeout should be positive number or equal to 0 (to disable timeouts)")

View File

@ -17,11 +17,12 @@ type Baseband struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*Baseband) SampleConfig() string { return sampleConfig }
func (b *Baseband) Init() error { func (b *Baseband) Init() error {
b.Log.Warn("current platform is not supported") b.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*Baseband) SampleConfig() string { return sampleConfig }
func (*Baseband) Gather(_ telegraf.Accumulator) error { return nil } func (*Baseband) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -63,7 +63,7 @@ func TestInit(t *testing.T) {
defer tempSocket.Close() defer tempSocket.Close()
logTempFile := newTempLogFile(t) logTempFile := newTempLogFile(t)
defer logTempFile.Close() defer logTempFile.close()
baseband.SocketPath = tempSocket.pathToSocket baseband.SocketPath = tempSocket.pathToSocket
baseband.FileLogPath = logTempFile.pathToFile baseband.FileLogPath = logTempFile.pathToFile
@ -148,7 +148,7 @@ type tempLogFile struct {
file *os.File file *os.File
} }
func (tlf *tempLogFile) Close() { func (tlf *tempLogFile) close() {
var err error var err error
if err = tlf.file.Close(); err != nil { if err = tlf.file.Close(); err != nil {
panic(err) panic(err)

View File

@ -68,7 +68,7 @@ func TestDumpTelemetryToLog(t *testing.T) {
tempSocket := newTempSocket(t) tempSocket := newTempSocket(t)
defer tempSocket.Close() defer tempSocket.Close()
tempLogFile := newTempLogFile(t) tempLogFile := newTempLogFile(t)
defer tempLogFile.Close() defer tempLogFile.close()
connector := newSocketConnector(tempSocket.pathToSocket, 5*time.Second) connector := newSocketConnector(tempSocket.pathToSocket, 5*time.Second)
err := connector.dumpTelemetryToLog() err := connector.dumpTelemetryToLog()

View File

@ -26,6 +26,17 @@ var sampleConfig string
var unreachableSocketBehaviors = []string{"error", "ignore"} var unreachableSocketBehaviors = []string{"error", "ignore"}
const (
defaultSocketPath = "/var/run/dpdk/rte/dpdk_telemetry.v2"
pluginName = "intel_dlb"
eventdevListCommand = "/eventdev/dev_list"
dlbDeviceIDLocation = "/sys/devices/*/*/device"
aerCorrectableFileName = "aer_dev_correctable"
aerFatalFileName = "aer_dev_fatal"
aerNonFatalFileName = "aer_dev_nonfatal"
defaultDLBDevice = "0x2710"
)
type IntelDLB struct { type IntelDLB struct {
SocketPath string `toml:"socket_path"` SocketPath string `toml:"socket_path"`
EventdevCommands []string `toml:"eventdev_commands"` EventdevCommands []string `toml:"eventdev_commands"`
@ -39,23 +50,10 @@ type IntelDLB struct {
maxInitMessageLength uint32 maxInitMessageLength uint32
} }
const (
defaultSocketPath = "/var/run/dpdk/rte/dpdk_telemetry.v2"
pluginName = "intel_dlb"
eventdevListCommand = "/eventdev/dev_list"
dlbDeviceIDLocation = "/sys/devices/*/*/device"
aerCorrectableFileName = "aer_dev_correctable"
aerFatalFileName = "aer_dev_fatal"
aerNonFatalFileName = "aer_dev_nonfatal"
defaultDLBDevice = "0x2710"
)
// SampleConfig returns sample config
func (d *IntelDLB) SampleConfig() string { func (d *IntelDLB) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init performs validation of all parameters from configuration.
func (d *IntelDLB) Init() error { func (d *IntelDLB) Init() error {
var err error var err error
@ -105,7 +103,6 @@ func (d *IntelDLB) Init() error {
return nil return nil
} }
// Gather all unique commands and process each command sequentially.
func (d *IntelDLB) Gather(acc telegraf.Accumulator) error { func (d *IntelDLB) Gather(acc telegraf.Accumulator) error {
err := d.gatherMetricsFromSocket(acc) err := d.gatherMetricsFromSocket(acc)
if err != nil { if err != nil {

View File

@ -17,11 +17,12 @@ type IntelDLB struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*IntelDLB) SampleConfig() string { return sampleConfig }
func (i *IntelDLB) Init() error { func (i *IntelDLB) Init() error {
i.Log.Warn("current platform is not supported") i.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*IntelDLB) SampleConfig() string { return sampleConfig }
func (*IntelDLB) Gather(_ telegraf.Accumulator) error { return nil } func (*IntelDLB) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -33,14 +33,6 @@ const (
pluginName = "intel_pmt" pluginName = "intel_pmt"
) )
type pmtFileInfo []fileInfo
type fileInfo struct {
path string
numaNode string
pciBdf string // PCI Bus:Device.Function (BDF)
}
type IntelPMT struct { type IntelPMT struct {
PmtSpec string `toml:"spec"` PmtSpec string `toml:"spec"`
DatatypeFilter []string `toml:"datatypes_enabled"` DatatypeFilter []string `toml:"datatypes_enabled"`
@ -56,12 +48,18 @@ type IntelPMT struct {
pmtTransformations map[string]map[string]transformation pmtTransformations map[string]map[string]transformation
} }
// SampleConfig returns a sample configuration (See sample.conf). type pmtFileInfo []fileInfo
type fileInfo struct {
path string
numaNode string
pciBdf string // PCI Bus:Device.Function (BDF)
}
func (p *IntelPMT) SampleConfig() string { func (p *IntelPMT) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init performs one time setup of the plugin
func (p *IntelPMT) Init() error { func (p *IntelPMT) Init() error {
err := p.checkPmtSpec() err := p.checkPmtSpec()
if err != nil { if err != nil {
@ -76,7 +74,6 @@ func (p *IntelPMT) Init() error {
return p.parseXMLs() return p.parseXMLs()
} }
// Gather collects the plugin's metrics.
func (p *IntelPMT) Gather(acc telegraf.Accumulator) error { func (p *IntelPMT) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup var wg sync.WaitGroup
var hasError atomic.Bool var hasError atomic.Bool

View File

@ -17,11 +17,13 @@ type IntelPMT struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*IntelPMT) SampleConfig() string { return sampleConfig }
func (p *IntelPMT) Init() error { func (p *IntelPMT) Init() error {
p.Log.Warn("Current platform is not supported") p.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*IntelPMT) SampleConfig() string { return sampleConfig }
func (*IntelPMT) Gather(_ telegraf.Accumulator) error { return nil } func (*IntelPMT) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -62,7 +62,7 @@ func (iaEventsActivator) activateMulti(a ia.MultiActivator, p []ia.PlacementProv
} }
type entitiesActivator interface { type entitiesActivator interface {
activateEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) error activateEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) error
} }
type iaEntitiesActivator struct { type iaEntitiesActivator struct {
@ -70,7 +70,7 @@ type iaEntitiesActivator struct {
perfActivator eventsActivator perfActivator eventsActivator
} }
func (ea *iaEntitiesActivator) activateEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) error { func (ea *iaEntitiesActivator) activateEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) error {
for _, coreEventsEntity := range coreEntities { for _, coreEventsEntity := range coreEntities {
err := ea.activateCoreEvents(coreEventsEntity) err := ea.activateCoreEvents(coreEventsEntity)
if err != nil { if err != nil {
@ -86,7 +86,7 @@ func (ea *iaEntitiesActivator) activateEntities(coreEntities []*CoreEventEntity,
return nil return nil
} }
func (ea *iaEntitiesActivator) activateCoreEvents(entity *CoreEventEntity) error { func (ea *iaEntitiesActivator) activateCoreEvents(entity *coreEventEntity) error {
if entity == nil { if entity == nil {
return errors.New("core events entity is nil") return errors.New("core events entity is nil")
} }
@ -117,7 +117,7 @@ func (ea *iaEntitiesActivator) activateCoreEvents(entity *CoreEventEntity) error
return nil return nil
} }
func (ea *iaEntitiesActivator) activateUncoreEvents(entity *UncoreEventEntity) error { func (ea *iaEntitiesActivator) activateUncoreEvents(entity *uncoreEventEntity) error {
if entity == nil { if entity == nil {
return errors.New("uncore events entity is nil") return errors.New("uncore events entity is nil")
} }
@ -150,7 +150,7 @@ func (ea *iaEntitiesActivator) activateUncoreEvents(entity *UncoreEventEntity) e
return nil return nil
} }
func (ea *iaEntitiesActivator) activateCoreEventsGroup(entity *CoreEventEntity) error { func (ea *iaEntitiesActivator) activateCoreEventsGroup(entity *coreEventEntity) error {
if ea.perfActivator == nil || ea.placementMaker == nil { if ea.perfActivator == nil || ea.placementMaker == nil {
return errors.New("missing perf activator or placement maker") return errors.New("missing perf activator or placement maker")
} }

View File

@ -34,7 +34,7 @@ func TestActivateEntities(t *testing.T) {
// more core test cases in TestActivateCoreEvents // more core test cases in TestActivateCoreEvents
t.Run("failed to activate core events", func(t *testing.T) { t.Run("failed to activate core events", func(t *testing.T) {
tag := "TAG" tag := "TAG"
mEntities := []*CoreEventEntity{{EventsTag: tag}} mEntities := []*coreEventEntity{{EventsTag: tag}}
err := mEntitiesActivator.activateEntities(mEntities, nil) err := mEntitiesActivator.activateEntities(mEntities, nil)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("failed to activate core events %q", tag)) require.Contains(t, err.Error(), fmt.Sprintf("failed to activate core events %q", tag))
@ -43,7 +43,7 @@ func TestActivateEntities(t *testing.T) {
// more uncore test cases in TestActivateUncoreEvents // more uncore test cases in TestActivateUncoreEvents
t.Run("failed to activate uncore events", func(t *testing.T) { t.Run("failed to activate uncore events", func(t *testing.T) {
tag := "TAG" tag := "TAG"
mEntities := []*UncoreEventEntity{{EventsTag: tag}} mEntities := []*uncoreEventEntity{{EventsTag: tag}}
err := mEntitiesActivator.activateEntities(nil, mEntities) err := mEntitiesActivator.activateEntities(nil, mEntities)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("failed to activate uncore events %q", tag)) require.Contains(t, err.Error(), fmt.Sprintf("failed to activate uncore events %q", tag))
@ -69,7 +69,7 @@ func TestActivateUncoreEvents(t *testing.T) {
t.Run("event is nil", func(t *testing.T) { t.Run("event is nil", func(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator} mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator}
mEntity := &UncoreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}} mEntity := &uncoreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}}
err := mEntitiesActivator.activateUncoreEvents(mEntity) err := mEntitiesActivator.activateUncoreEvents(mEntity)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "uncore parsed event is nil") require.Contains(t, err.Error(), "uncore parsed event is nil")
@ -78,7 +78,7 @@ func TestActivateUncoreEvents(t *testing.T) {
t.Run("perf event is nil", func(t *testing.T) { t.Run("perf event is nil", func(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator} mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator}
name := "event name" name := "event name"
mEntity := &UncoreEventEntity{parsedEvents: []*eventWithQuals{{name: name, custom: ia.CustomizableEvent{Event: nil}}}} mEntity := &uncoreEventEntity{parsedEvents: []*eventWithQuals{{name: name, custom: ia.CustomizableEvent{Event: nil}}}}
err := mEntitiesActivator.activateUncoreEvents(mEntity) err := mEntitiesActivator.activateUncoreEvents(mEntity)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("perf event of %q event is nil", name)) require.Contains(t, err.Error(), fmt.Sprintf("perf event of %q event is nil", name))
@ -86,7 +86,7 @@ func TestActivateUncoreEvents(t *testing.T) {
t.Run("placement maker and perf activator is nil", func(t *testing.T) { t.Run("placement maker and perf activator is nil", func(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: nil, perfActivator: nil} mEntitiesActivator := &iaEntitiesActivator{placementMaker: nil, perfActivator: nil}
err := mEntitiesActivator.activateUncoreEvents(&UncoreEventEntity{}) err := mEntitiesActivator.activateUncoreEvents(&uncoreEventEntity{})
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "events activator or placement maker is nil") require.Contains(t, err.Error(), "events activator or placement maker is nil")
}) })
@ -95,7 +95,7 @@ func TestActivateUncoreEvents(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator} mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator}
eventName := "mock event 1" eventName := "mock event 1"
parsedEvents := []*eventWithQuals{{name: eventName, custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: eventName}}}} parsedEvents := []*eventWithQuals{{name: eventName, custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: eventName}}}}
mEntity := &UncoreEventEntity{parsedEvents: parsedEvents, parsedSockets: []int{0, 1, 2}} mEntity := &uncoreEventEntity{parsedEvents: parsedEvents, parsedSockets: []int{0, 1, 2}}
mMaker.On("makeUncorePlacements", parsedEvents[0].custom.Event, mEntity.parsedSockets[0]).Return(nil, errMock).Once() mMaker.On("makeUncorePlacements", parsedEvents[0].custom.Event, mEntity.parsedSockets[0]).Return(nil, errMock).Once()
err := mEntitiesActivator.activateUncoreEvents(mEntity) err := mEntitiesActivator.activateUncoreEvents(mEntity)
@ -110,7 +110,7 @@ func TestActivateUncoreEvents(t *testing.T) {
eventName := "mock event 1" eventName := "mock event 1"
parsedEvents := []*eventWithQuals{{name: eventName, custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: eventName}}}} parsedEvents := []*eventWithQuals{{name: eventName, custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: eventName}}}}
placements := []ia.PlacementProvider{&ia.Placement{CPU: 0}, &ia.Placement{CPU: 1}} placements := []ia.PlacementProvider{&ia.Placement{CPU: 0}, &ia.Placement{CPU: 1}}
mEntity := &UncoreEventEntity{parsedEvents: parsedEvents, parsedSockets: []int{0, 1, 2}} mEntity := &uncoreEventEntity{parsedEvents: parsedEvents, parsedSockets: []int{0, 1, 2}}
mMaker.On("makeUncorePlacements", parsedEvents[0].custom.Event, mEntity.parsedSockets[0]).Return(placements, nil).Once() mMaker.On("makeUncorePlacements", parsedEvents[0].custom.Event, mEntity.parsedSockets[0]).Return(placements, nil).Once()
mActivator.On("activateMulti", parsedEvents[0].custom.Event, placements, parsedEvents[0].custom.Options).Return(nil, errMock).Once() mActivator.On("activateMulti", parsedEvents[0].custom.Event, placements, parsedEvents[0].custom.Options).Return(nil, errMock).Once()
@ -131,7 +131,7 @@ func TestActivateUncoreEvents(t *testing.T) {
{custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 3", Uncore: true}}}, {custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 3", Uncore: true}}},
{custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 4", Uncore: true}}}, {custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 4", Uncore: true}}},
} }
mEntity := &UncoreEventEntity{parsedEvents: parsedEvents, parsedSockets: []int{0, 1, 2}} mEntity := &uncoreEventEntity{parsedEvents: parsedEvents, parsedSockets: []int{0, 1, 2}}
placements := []ia.PlacementProvider{&ia.Placement{}, &ia.Placement{}, &ia.Placement{}} placements := []ia.PlacementProvider{&ia.Placement{}, &ia.Placement{}, &ia.Placement{}}
var expectedEvents []multiEvent var expectedEvents []multiEvent
@ -166,14 +166,14 @@ func TestActivateCoreEvents(t *testing.T) {
t.Run("placement maker is nil", func(t *testing.T) { t.Run("placement maker is nil", func(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: nil, perfActivator: mActivator} mEntitiesActivator := &iaEntitiesActivator{placementMaker: nil, perfActivator: mActivator}
err := mEntitiesActivator.activateCoreEvents(&CoreEventEntity{}) err := mEntitiesActivator.activateCoreEvents(&coreEventEntity{})
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "placement maker is nil") require.Contains(t, err.Error(), "placement maker is nil")
}) })
t.Run("event is nil", func(t *testing.T) { t.Run("event is nil", func(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator} mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator}
mEntity := &CoreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}} mEntity := &coreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}}
err := mEntitiesActivator.activateCoreEvents(mEntity) err := mEntitiesActivator.activateCoreEvents(mEntity)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "core parsed event is nil") require.Contains(t, err.Error(), "core parsed event is nil")
@ -182,7 +182,7 @@ func TestActivateCoreEvents(t *testing.T) {
t.Run("failed to create placements", func(t *testing.T) { t.Run("failed to create placements", func(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator} mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: mActivator}
parsedEvents := []*eventWithQuals{{name: "mock event 1", custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 1"}}}} parsedEvents := []*eventWithQuals{{name: "mock event 1", custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 1"}}}}
mEntity := &CoreEventEntity{PerfGroup: false, parsedEvents: parsedEvents, parsedCores: []int{0, 1, 2}} mEntity := &coreEventEntity{PerfGroup: false, parsedEvents: parsedEvents, parsedCores: []int{0, 1, 2}}
mMaker.On("makeCorePlacements", mEntity.parsedCores, parsedEvents[0].custom.Event).Return(nil, errMock).Once() mMaker.On("makeCorePlacements", mEntity.parsedCores, parsedEvents[0].custom.Event).Return(nil, errMock).Once()
err := mEntitiesActivator.activateCoreEvents(mEntity) err := mEntitiesActivator.activateCoreEvents(mEntity)
@ -197,7 +197,7 @@ func TestActivateCoreEvents(t *testing.T) {
parsedEvents := []*eventWithQuals{{name: "mock event 1", custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 1"}}}} parsedEvents := []*eventWithQuals{{name: "mock event 1", custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 1"}}}}
placements := []ia.PlacementProvider{&ia.Placement{CPU: 0}, &ia.Placement{CPU: 1}} placements := []ia.PlacementProvider{&ia.Placement{CPU: 0}, &ia.Placement{CPU: 1}}
mEntity := &CoreEventEntity{PerfGroup: false, parsedEvents: parsedEvents, parsedCores: []int{0, 1, 2}} mEntity := &coreEventEntity{PerfGroup: false, parsedEvents: parsedEvents, parsedCores: []int{0, 1, 2}}
event := parsedEvents[0] event := parsedEvents[0]
plc := placements[0] plc := placements[0]
@ -213,7 +213,7 @@ func TestActivateCoreEvents(t *testing.T) {
t.Run("failed to activate core events group", func(t *testing.T) { t.Run("failed to activate core events group", func(t *testing.T) {
mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: nil} mEntitiesActivator := &iaEntitiesActivator{placementMaker: mMaker, perfActivator: nil}
mEntity := &CoreEventEntity{PerfGroup: true, parsedEvents: nil} mEntity := &coreEventEntity{PerfGroup: true, parsedEvents: nil}
err := mEntitiesActivator.activateCoreEvents(mEntity) err := mEntitiesActivator.activateCoreEvents(mEntity)
require.Error(t, err) require.Error(t, err)
@ -230,7 +230,7 @@ func TestActivateCoreEvents(t *testing.T) {
{custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 4"}}}, {custom: ia.CustomizableEvent{Event: &ia.PerfEvent{Name: "mock event 4"}}},
} }
placements := []ia.PlacementProvider{&ia.Placement{CPU: 0}, &ia.Placement{CPU: 1}, &ia.Placement{CPU: 2}} placements := []ia.PlacementProvider{&ia.Placement{CPU: 0}, &ia.Placement{CPU: 1}, &ia.Placement{CPU: 2}}
mEntity := &CoreEventEntity{PerfGroup: false, parsedEvents: parsedEvents, parsedCores: []int{0, 1, 2}} mEntity := &coreEventEntity{PerfGroup: false, parsedEvents: parsedEvents, parsedCores: []int{0, 1, 2}}
var activeEvents []*ia.ActiveEvent var activeEvents []*ia.ActiveEvent
for _, event := range parsedEvents { for _, event := range parsedEvents {
@ -265,7 +265,7 @@ func TestActivateCoreEventsGroup(t *testing.T) {
// cannot populate this struct due to unexported events field // cannot populate this struct due to unexported events field
activeGroup := &ia.ActiveEventGroup{} activeGroup := &ia.ActiveEventGroup{}
mEntity := &CoreEventEntity{ mEntity := &coreEventEntity{
EventsTag: "mock group", EventsTag: "mock group",
PerfGroup: true, PerfGroup: true,
parsedEvents: parsedEvents, parsedEvents: parsedEvents,
@ -292,7 +292,7 @@ func TestActivateCoreEventsGroup(t *testing.T) {
}) })
t.Run("nil in parsed event", func(t *testing.T) { t.Run("nil in parsed event", func(t *testing.T) {
mEntity := &CoreEventEntity{EventsTag: "Nice tag", PerfGroup: true, parsedEvents: []*eventWithQuals{nil, nil}} mEntity := &coreEventEntity{EventsTag: "Nice tag", PerfGroup: true, parsedEvents: []*eventWithQuals{nil, nil}}
err := eActivator.activateCoreEventsGroup(mEntity) err := eActivator.activateCoreEventsGroup(mEntity)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "core event is nil") require.Contains(t, err.Error(), "core event is nil")

View File

@ -15,7 +15,7 @@ import (
const maxIDsSize = 1 << 13 const maxIDsSize = 1 << 13
type entitiesParser interface { type entitiesParser interface {
parseEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) (err error) parseEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) (err error)
} }
type configParser struct { type configParser struct {
@ -23,7 +23,7 @@ type configParser struct {
sys sysInfoProvider sys sysInfoProvider
} }
func (cp *configParser) parseEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) (err error) { func (cp *configParser) parseEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) (err error) {
if len(coreEntities) == 0 && len(uncoreEntities) == 0 { if len(coreEntities) == 0 && len(uncoreEntities) == 0 {
return errors.New("neither core nor uncore entities configured") return errors.New("neither core nor uncore entities configured")
} }

View File

@ -32,12 +32,12 @@ func TestConfigParser_parseEntities(t *testing.T) {
coreTests := []struct { coreTests := []struct {
name string name string
coreEntity *CoreEventEntity coreEntity *coreEventEntity
parsedCoreEvents []*eventWithQuals parsedCoreEvents []*eventWithQuals
parsedCores []int parsedCores []int
coreAll bool coreAll bool
uncoreEntity *UncoreEventEntity uncoreEntity *uncoreEventEntity
parsedUncoreEvents []*eventWithQuals parsedUncoreEvents []*eventWithQuals
parsedSockets []int parsedSockets []int
uncoreAll bool uncoreAll bool
@ -45,31 +45,31 @@ func TestConfigParser_parseEntities(t *testing.T) {
failMsg string failMsg string
}{ }{
{"no events provided", {"no events provided",
&CoreEventEntity{Events: nil, Cores: []string{"1"}}, nil, []int{1}, true, &coreEventEntity{Events: nil, Cores: []string{"1"}}, nil, []int{1}, true,
&UncoreEventEntity{Events: nil, Sockets: []string{"0"}}, nil, []int{0}, true, &uncoreEventEntity{Events: nil, Sockets: []string{"0"}}, nil, []int{0}, true,
""}, ""},
{"uncore entity is nil", {"uncore entity is nil",
&CoreEventEntity{Events: []string{"EVENT"}, Cores: []string{"1,2"}}, []*eventWithQuals{{"EVENT", nil, e}}, []int{1, 2}, false, &coreEventEntity{Events: []string{"EVENT"}, Cores: []string{"1,2"}}, []*eventWithQuals{{"EVENT", nil, e}}, []int{1, 2}, false,
nil, nil, nil, false, nil, nil, nil, false,
"uncore entity is nil"}, "uncore entity is nil"},
{"core entity is nil", {"core entity is nil",
nil, nil, nil, false, nil, nil, nil, false,
&UncoreEventEntity{Events: []string{"EVENT"}, Sockets: []string{"1,2"}}, []*eventWithQuals{{"EVENT", nil, e}}, []int{1, 2}, false, &uncoreEventEntity{Events: []string{"EVENT"}, Sockets: []string{"1,2"}}, []*eventWithQuals{{"EVENT", nil, e}}, []int{1, 2}, false,
"core entity is nil"}, "core entity is nil"},
{"error parsing sockets", {"error parsing sockets",
&CoreEventEntity{Events: nil, Cores: []string{"1,2"}}, nil, []int{1, 2}, true, &coreEventEntity{Events: nil, Cores: []string{"1,2"}}, nil, []int{1, 2}, true,
&UncoreEventEntity{Events: []string{"E"}, Sockets: []string{"wrong sockets"}}, []*eventWithQuals{{"E", nil, e}}, nil, false, &uncoreEventEntity{Events: []string{"E"}, Sockets: []string{"wrong sockets"}}, []*eventWithQuals{{"E", nil, e}}, nil, false,
"error during sockets parsing"}, "error during sockets parsing"},
{"error parsing cores", {"error parsing cores",
&CoreEventEntity{Events: nil, Cores: []string{"wrong cpus"}}, nil, nil, true, &coreEventEntity{Events: nil, Cores: []string{"wrong cpus"}}, nil, nil, true,
&UncoreEventEntity{Events: nil, Sockets: []string{"0,1"}}, nil, []int{0, 1}, true, &uncoreEventEntity{Events: nil, Sockets: []string{"0,1"}}, nil, []int{0, 1}, true,
"error during cores parsing"}, "error during cores parsing"},
{"valid settings", {"valid settings",
&CoreEventEntity{ &coreEventEntity{
Events: []string{"E1", "E2:config=123"}, Events: []string{"E1", "E2:config=123"},
Cores: []string{"1-5"}, Cores: []string{"1-5"},
}, []*eventWithQuals{{"E1", nil, e}, {"E2", []string{"config=123"}, e}}, []int{1, 2, 3, 4, 5}, false, }, []*eventWithQuals{{"E1", nil, e}, {"E2", []string{"config=123"}, e}}, []int{1, 2, 3, 4, 5}, false,
&UncoreEventEntity{ &uncoreEventEntity{
Events: []string{"E1", "E2", "E3"}, Events: []string{"E1", "E2", "E3"},
Sockets: []string{"0,2-6"}, Sockets: []string{"0,2-6"},
}, []*eventWithQuals{{"E1", nil, e}, {"E2", nil, e}, {"E3", nil, e}}, []int{0, 2, 3, 4, 5, 6}, false, }, []*eventWithQuals{{"E1", nil, e}, {"E2", nil, e}, {"E3", nil, e}}, []int{0, 2, 3, 4, 5, 6}, false,
@ -78,8 +78,8 @@ func TestConfigParser_parseEntities(t *testing.T) {
for _, test := range coreTests { for _, test := range coreTests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
coreEntities := []*CoreEventEntity{test.coreEntity} coreEntities := []*coreEventEntity{test.coreEntity}
uncoreEntities := []*UncoreEventEntity{test.uncoreEntity} uncoreEntities := []*uncoreEventEntity{test.uncoreEntity}
err := mConfigParser.parseEntities(coreEntities, uncoreEntities) err := mConfigParser.parseEntities(coreEntities, uncoreEntities)

View File

@ -26,6 +26,56 @@ var sampleConfig string
// Linux availability: https://www.kernel.org/doc/Documentation/sysctl/fs.txt // Linux availability: https://www.kernel.org/doc/Documentation/sysctl/fs.txt
const fileMaxPath = "/proc/sys/fs/file-max" const fileMaxPath = "/proc/sys/fs/file-max"
type IntelPMU struct {
EventListPaths []string `toml:"event_definitions"`
CoreEntities []*coreEventEntity `toml:"core_events"`
UncoreEntities []*uncoreEventEntity `toml:"uncore_events"`
Log telegraf.Logger `toml:"-"`
fileInfo fileInfoProvider
entitiesReader entitiesValuesReader
}
type coreEventEntity struct {
Events []string `toml:"events"`
Cores []string `toml:"cores"`
EventsTag string `toml:"events_tag"`
PerfGroup bool `toml:"perf_group"`
parsedEvents []*eventWithQuals
parsedCores []int
allEvents bool
activeEvents []*ia.ActiveEvent
}
type uncoreEventEntity struct {
Events []string `toml:"events"`
Sockets []string `toml:"sockets"`
Aggregate bool `toml:"aggregate_uncore_units"`
EventsTag string `toml:"events_tag"`
parsedEvents []*eventWithQuals
parsedSockets []int
allEvents bool
activeMultiEvents []multiEvent
}
type multiEvent struct {
activeEvents []*ia.ActiveEvent
perfEvent *ia.PerfEvent
socket int
}
type eventWithQuals struct {
name string
qualifiers []string
custom ia.CustomizableEvent
}
type fileInfoProvider interface { type fileInfoProvider interface {
readFile(string) ([]byte, error) readFile(string) ([]byte, error)
lstat(string) (os.FileInfo, error) lstat(string) (os.FileInfo, error)
@ -63,65 +113,6 @@ func (iaSysInfo) allSockets() ([]int, error) {
return ia.AllSockets() return ia.AllSockets()
} }
// IntelPMU is the plugin type.
type IntelPMU struct {
EventListPaths []string `toml:"event_definitions"`
CoreEntities []*CoreEventEntity `toml:"core_events"`
UncoreEntities []*UncoreEventEntity `toml:"uncore_events"`
Log telegraf.Logger `toml:"-"`
fileInfo fileInfoProvider
entitiesReader entitiesValuesReader
}
// CoreEventEntity represents config section for core events.
type CoreEventEntity struct {
Events []string `toml:"events"`
Cores []string `toml:"cores"`
EventsTag string `toml:"events_tag"`
PerfGroup bool `toml:"perf_group"`
parsedEvents []*eventWithQuals
parsedCores []int
allEvents bool
activeEvents []*ia.ActiveEvent
}
// UncoreEventEntity represents config section for uncore events.
type UncoreEventEntity struct {
Events []string `toml:"events"`
Sockets []string `toml:"sockets"`
Aggregate bool `toml:"aggregate_uncore_units"`
EventsTag string `toml:"events_tag"`
parsedEvents []*eventWithQuals
parsedSockets []int
allEvents bool
activeMultiEvents []multiEvent
}
type multiEvent struct {
activeEvents []*ia.ActiveEvent
perfEvent *ia.PerfEvent
socket int
}
type eventWithQuals struct {
name string
qualifiers []string
custom ia.CustomizableEvent
}
// Start is required for IntelPMU to implement the telegraf.ServiceInput interface.
// Necessary initialization and config checking are done in Init.
func (IntelPMU) Start(_ telegraf.Accumulator) error {
return nil
}
func (*IntelPMU) SampleConfig() string { func (*IntelPMU) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -146,65 +137,9 @@ func (i *IntelPMU) Init() error {
return i.initialization(parser, resolver, activator) return i.initialization(parser, resolver, activator)
} }
func (i *IntelPMU) initialization(parser entitiesParser, resolver entitiesResolver, activator entitiesActivator) error { // Start is required for IntelPMU to implement the telegraf.ServiceInput interface.
if parser == nil || resolver == nil || activator == nil { // Necessary initialization and config checking are done in Init.
return errors.New("entities parser and/or resolver and/or activator is nil") func (*IntelPMU) Start(_ telegraf.Accumulator) error {
}
err := parser.parseEntities(i.CoreEntities, i.UncoreEntities)
if err != nil {
return fmt.Errorf("error during parsing configuration sections: %w", err)
}
err = resolver.resolveEntities(i.CoreEntities, i.UncoreEntities)
if err != nil {
return fmt.Errorf("error during events resolving: %w", err)
}
err = i.checkFileDescriptors()
if err != nil {
return fmt.Errorf("error during file descriptors checking: %w", err)
}
err = activator.activateEntities(i.CoreEntities, i.UncoreEntities)
if err != nil {
return fmt.Errorf("error during events activation: %w", err)
}
return nil
}
func (i *IntelPMU) checkFileDescriptors() error {
coreFd, err := estimateCoresFd(i.CoreEntities)
if err != nil {
return fmt.Errorf("failed to estimate number of core events file descriptors: %w", err)
}
uncoreFd, err := estimateUncoreFd(i.UncoreEntities)
if err != nil {
return fmt.Errorf("failed to estimate number of uncore events file descriptors: %w", err)
}
if coreFd > math.MaxUint64-uncoreFd {
return errors.New("requested number of file descriptors exceeds uint64")
}
allFd := coreFd + uncoreFd
// maximum file descriptors enforced on a kernel level
maxFd, err := readMaxFD(i.fileInfo)
if err != nil {
i.Log.Warnf("Cannot obtain number of available file descriptors: %v", err)
} else if allFd > maxFd {
return fmt.Errorf("required file descriptors number `%d` exceeds maximum number of available file descriptors `%d`"+
": consider increasing the maximum number", allFd, maxFd)
}
// soft limit for current process
limit, err := i.fileInfo.fileLimit()
if err != nil {
i.Log.Warnf("Cannot obtain limit value of open files: %v", err)
} else if allFd > limit {
return fmt.Errorf("required file descriptors number `%d` exceeds soft limit of open files `%d`"+
": consider increasing the limit", allFd, limit)
}
return nil return nil
} }
@ -271,6 +206,68 @@ func (i *IntelPMU) Stop() {
} }
} }
func (i *IntelPMU) initialization(parser entitiesParser, resolver entitiesResolver, activator entitiesActivator) error {
if parser == nil || resolver == nil || activator == nil {
return errors.New("entities parser and/or resolver and/or activator is nil")
}
err := parser.parseEntities(i.CoreEntities, i.UncoreEntities)
if err != nil {
return fmt.Errorf("error during parsing configuration sections: %w", err)
}
err = resolver.resolveEntities(i.CoreEntities, i.UncoreEntities)
if err != nil {
return fmt.Errorf("error during events resolving: %w", err)
}
err = i.checkFileDescriptors()
if err != nil {
return fmt.Errorf("error during file descriptors checking: %w", err)
}
err = activator.activateEntities(i.CoreEntities, i.UncoreEntities)
if err != nil {
return fmt.Errorf("error during events activation: %w", err)
}
return nil
}
func (i *IntelPMU) checkFileDescriptors() error {
coreFd, err := estimateCoresFd(i.CoreEntities)
if err != nil {
return fmt.Errorf("failed to estimate number of core events file descriptors: %w", err)
}
uncoreFd, err := estimateUncoreFd(i.UncoreEntities)
if err != nil {
return fmt.Errorf("failed to estimate number of uncore events file descriptors: %w", err)
}
if coreFd > math.MaxUint64-uncoreFd {
return errors.New("requested number of file descriptors exceeds uint64")
}
allFd := coreFd + uncoreFd
// maximum file descriptors enforced on a kernel level
maxFd, err := readMaxFD(i.fileInfo)
if err != nil {
i.Log.Warnf("Cannot obtain number of available file descriptors: %v", err)
} else if allFd > maxFd {
return fmt.Errorf("required file descriptors number `%d` exceeds maximum number of available file descriptors `%d`"+
": consider increasing the maximum number", allFd, maxFd)
}
// soft limit for current process
limit, err := i.fileInfo.fileLimit()
if err != nil {
i.Log.Warnf("Cannot obtain limit value of open files: %v", err)
} else if allFd > limit {
return fmt.Errorf("required file descriptors number `%d` exceeds soft limit of open files `%d`"+
": consider increasing the limit", allFd, limit)
}
return nil
}
func newReader(log telegraf.Logger, files []string) (*ia.JSONFilesReader, error) { func newReader(log telegraf.Logger, files []string) (*ia.JSONFilesReader, error) {
reader := ia.NewFilesReader() reader := ia.NewFilesReader()
for _, file := range files { for _, file := range files {
@ -287,7 +284,7 @@ func newReader(log telegraf.Logger, files []string) (*ia.JSONFilesReader, error)
return reader, nil return reader, nil
} }
func estimateCoresFd(entities []*CoreEventEntity) (uint64, error) { func estimateCoresFd(entities []*coreEventEntity) (uint64, error) {
var err error var err error
number := uint64(0) number := uint64(0)
for _, entity := range entities { for _, entity := range entities {
@ -304,7 +301,7 @@ func estimateCoresFd(entities []*CoreEventEntity) (uint64, error) {
return number, nil return number, nil
} }
func estimateUncoreFd(entities []*UncoreEventEntity) (uint64, error) { func estimateUncoreFd(entities []*uncoreEventEntity) (uint64, error) {
var err error var err error
number := uint64(0) number := uint64(0)
for _, entity := range entities { for _, entity := range entities {

View File

@ -17,14 +17,18 @@ type IntelPMU struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*IntelPMU) SampleConfig() string { return sampleConfig }
func (i *IntelPMU) Init() error { func (i *IntelPMU) Init() error {
i.Log.Warn("current platform is not supported") i.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*IntelPMU) SampleConfig() string { return sampleConfig }
func (*IntelPMU) Start(_ telegraf.Accumulator) error { return nil }
func (*IntelPMU) Gather(_ telegraf.Accumulator) error { return nil } func (*IntelPMU) Gather(_ telegraf.Accumulator) error { return nil }
func (*IntelPMU) Start(_ telegraf.Accumulator) error { return nil }
func (*IntelPMU) Stop() {} func (*IntelPMU) Stop() {}
func init() { func init() {
inputs.Add("intel_pmu", func() telegraf.Input { inputs.Add("intel_pmu", func() telegraf.Input {

View File

@ -63,7 +63,7 @@ func TestInitialization(t *testing.T) {
t.Run("exceeded file descriptors", func(t *testing.T) { t.Run("exceeded file descriptors", func(t *testing.T) {
limit := []byte("10") limit := []byte("10")
uncoreEntities := []*UncoreEventEntity{{parsedEvents: makeEvents(10, 21), parsedSockets: makeIDs(5)}} uncoreEntities := []*uncoreEventEntity{{parsedEvents: makeEvents(10, 21), parsedSockets: makeIDs(5)}}
estimation := 1050 estimation := 1050
mIntelPMU := IntelPMU{EventListPaths: paths, Log: testutil.Logger{}, fileInfo: mFileInfo, UncoreEntities: uncoreEntities} mIntelPMU := IntelPMU{EventListPaths: paths, Log: testutil.Logger{}, fileInfo: mFileInfo, UncoreEntities: uncoreEntities}
@ -291,31 +291,31 @@ func TestGather(t *testing.T) {
func TestCheckFileDescriptors(t *testing.T) { func TestCheckFileDescriptors(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
uncores []*UncoreEventEntity uncores []*uncoreEventEntity
cores []*CoreEventEntity cores []*coreEventEntity
estimation uint64 estimation uint64
maxFD []byte maxFD []byte
fileLimit uint64 fileLimit uint64
errMsg string errMsg string
}{ }{
{"exceed maximum file descriptors number", []*UncoreEventEntity{ {"exceed maximum file descriptors number", []*uncoreEventEntity{
{parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)}, {parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)},
{parsedEvents: makeEvents(25, 3), parsedSockets: makeIDs(7)}, {parsedEvents: makeEvents(25, 3), parsedSockets: makeIDs(7)},
{parsedEvents: makeEvents(2, 7), parsedSockets: makeIDs(20)}}, {parsedEvents: makeEvents(2, 7), parsedSockets: makeIDs(20)}},
[]*CoreEventEntity{ []*coreEventEntity{
{parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)}, {parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)},
{parsedEvents: makeEvents(25, 1), parsedCores: makeIDs(7)}, {parsedEvents: makeEvents(25, 1), parsedCores: makeIDs(7)},
{parsedEvents: makeEvents(2, 1), parsedCores: makeIDs(20)}}, {parsedEvents: makeEvents(2, 1), parsedCores: makeIDs(20)}},
12020, []byte("11000"), 8000, fmt.Sprintf("required file descriptors number `%d` exceeds maximum number of available file descriptors `%d`"+ 12020, []byte("11000"), 8000, fmt.Sprintf("required file descriptors number `%d` exceeds maximum number of available file descriptors `%d`"+
": consider increasing the maximum number", 12020, 11000), ": consider increasing the maximum number", 12020, 11000),
}, },
{"exceed soft file limit", []*UncoreEventEntity{{parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)}}, []*CoreEventEntity{ {"exceed soft file limit", []*uncoreEventEntity{{parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)}}, []*coreEventEntity{
{parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)}}, {parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)}},
11000, []byte("2515357"), 800, fmt.Sprintf("required file descriptors number `%d` exceeds soft limit of open files `%d`"+ 11000, []byte("2515357"), 800, fmt.Sprintf("required file descriptors number `%d` exceeds soft limit of open files `%d`"+
": consider increasing the limit", 11000, 800), ": consider increasing the limit", 11000, 800),
}, },
{"no exceeds", []*UncoreEventEntity{{parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)}}, {"no exceeds", []*uncoreEventEntity{{parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)}},
[]*CoreEventEntity{{parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)}}, []*coreEventEntity{{parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)}},
11000, []byte("2515357"), 13000, "", 11000, []byte("2515357"), 13000, "",
}, },
} }
@ -347,14 +347,14 @@ func TestCheckFileDescriptors(t *testing.T) {
func TestEstimateUncoreFd(t *testing.T) { func TestEstimateUncoreFd(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
entities []*UncoreEventEntity entities []*uncoreEventEntity
result uint64 result uint64
}{ }{
{"nil entities", nil, 0}, {"nil entities", nil, 0},
{"nil perf event", []*UncoreEventEntity{{parsedEvents: []*eventWithQuals{{"", nil, ia.CustomizableEvent{}}}, parsedSockets: makeIDs(0)}}, 0}, {"nil perf event", []*uncoreEventEntity{{parsedEvents: []*eventWithQuals{{"", nil, ia.CustomizableEvent{}}}, parsedSockets: makeIDs(0)}}, 0},
{"one uncore entity", []*UncoreEventEntity{{parsedEvents: makeEvents(10, 10), parsedSockets: makeIDs(20)}}, 2000}, {"one uncore entity", []*uncoreEventEntity{{parsedEvents: makeEvents(10, 10), parsedSockets: makeIDs(20)}}, 2000},
{"nil entity", []*UncoreEventEntity{nil, {parsedEvents: makeEvents(1, 8), parsedSockets: makeIDs(1)}}, 8}, {"nil entity", []*uncoreEventEntity{nil, {parsedEvents: makeEvents(1, 8), parsedSockets: makeIDs(1)}}, 8},
{"many core entities", []*UncoreEventEntity{ {"many core entities", []*uncoreEventEntity{
{parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)}, {parsedEvents: makeEvents(100, 21), parsedSockets: makeIDs(5)},
{parsedEvents: makeEvents(25, 3), parsedSockets: makeIDs(7)}, {parsedEvents: makeEvents(25, 3), parsedSockets: makeIDs(7)},
{parsedEvents: makeEvents(2, 7), parsedSockets: makeIDs(20)}, {parsedEvents: makeEvents(2, 7), parsedSockets: makeIDs(20)},
@ -374,19 +374,19 @@ func TestEstimateUncoreFd(t *testing.T) {
func TestEstimateCoresFd(t *testing.T) { func TestEstimateCoresFd(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
entities []*CoreEventEntity entities []*coreEventEntity
result uint64 result uint64
}{ }{
{"nil entities", nil, 0}, {"nil entities", nil, 0},
{"one core entity", []*CoreEventEntity{{parsedEvents: makeEvents(10, 1), parsedCores: makeIDs(20)}}, 200}, {"one core entity", []*coreEventEntity{{parsedEvents: makeEvents(10, 1), parsedCores: makeIDs(20)}}, 200},
{"nil entity", []*CoreEventEntity{nil, {parsedEvents: makeEvents(10, 1), parsedCores: makeIDs(20)}}, 200}, {"nil entity", []*coreEventEntity{nil, {parsedEvents: makeEvents(10, 1), parsedCores: makeIDs(20)}}, 200},
{"many core entities", []*CoreEventEntity{ {"many core entities", []*coreEventEntity{
{parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)}, {parsedEvents: makeEvents(100, 1), parsedCores: makeIDs(5)},
{parsedEvents: makeEvents(25, 1), parsedCores: makeIDs(7)}, {parsedEvents: makeEvents(25, 1), parsedCores: makeIDs(7)},
{parsedEvents: makeEvents(2, 1), parsedCores: makeIDs(20)}, {parsedEvents: makeEvents(2, 1), parsedCores: makeIDs(20)},
}, 715}, }, 715},
{"1024 events", []*CoreEventEntity{{parsedEvents: makeEvents(1024, 1), parsedCores: makeIDs(12)}}, 12288}, {"1024 events", []*coreEventEntity{{parsedEvents: makeEvents(1024, 1), parsedCores: makeIDs(12)}}, 12288},
{"big number", []*CoreEventEntity{{parsedEvents: makeEvents(1024, 1), parsedCores: makeIDs(1048576)}}, 1073741824}, {"big number", []*coreEventEntity{{parsedEvents: makeEvents(1024, 1), parsedCores: makeIDs(1048576)}}, 1073741824},
} }
for _, test := range tests { for _, test := range tests {

View File

@ -41,11 +41,11 @@ type mockEntitiesValuesReader struct {
} }
// readEntities provides a mock function with given fields: _a0, _a1 // readEntities provides a mock function with given fields: _a0, _a1
func (_m *mockEntitiesValuesReader) readEntities(_a0 []*CoreEventEntity, _a1 []*UncoreEventEntity) ([]coreMetric, []uncoreMetric, error) { func (_m *mockEntitiesValuesReader) readEntities(_a0 []*coreEventEntity, _a1 []*uncoreEventEntity) ([]coreMetric, []uncoreMetric, error) {
ret := _m.Called(_a0, _a1) ret := _m.Called(_a0, _a1)
var r0 []coreMetric var r0 []coreMetric
if rf, ok := ret.Get(0).(func([]*CoreEventEntity, []*UncoreEventEntity) []coreMetric); ok { if rf, ok := ret.Get(0).(func([]*coreEventEntity, []*uncoreEventEntity) []coreMetric); ok {
r0 = rf(_a0, _a1) r0 = rf(_a0, _a1)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
@ -54,7 +54,7 @@ func (_m *mockEntitiesValuesReader) readEntities(_a0 []*CoreEventEntity, _a1 []*
} }
var r1 []uncoreMetric var r1 []uncoreMetric
if rf, ok := ret.Get(1).(func([]*CoreEventEntity, []*UncoreEventEntity) []uncoreMetric); ok { if rf, ok := ret.Get(1).(func([]*coreEventEntity, []*uncoreEventEntity) []uncoreMetric); ok {
r1 = rf(_a0, _a1) r1 = rf(_a0, _a1)
} else { } else {
if ret.Get(1) != nil { if ret.Get(1) != nil {
@ -63,7 +63,7 @@ func (_m *mockEntitiesValuesReader) readEntities(_a0 []*CoreEventEntity, _a1 []*
} }
var r2 error var r2 error
if rf, ok := ret.Get(2).(func([]*CoreEventEntity, []*UncoreEventEntity) error); ok { if rf, ok := ret.Get(2).(func([]*coreEventEntity, []*uncoreEventEntity) error); ok {
r2 = rf(_a0, _a1) r2 = rf(_a0, _a1)
} else { } else {
r2 = ret.Error(2) r2 = ret.Error(2)
@ -78,11 +78,11 @@ type mockEntitiesActivator struct {
} }
// activateEntities provides a mock function with given fields: coreEntities, uncoreEntities // activateEntities provides a mock function with given fields: coreEntities, uncoreEntities
func (_m *mockEntitiesActivator) activateEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) error { func (_m *mockEntitiesActivator) activateEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) error {
ret := _m.Called(coreEntities, uncoreEntities) ret := _m.Called(coreEntities, uncoreEntities)
var r0 error var r0 error
if rf, ok := ret.Get(0).(func([]*CoreEventEntity, []*UncoreEventEntity) error); ok { if rf, ok := ret.Get(0).(func([]*coreEventEntity, []*uncoreEventEntity) error); ok {
r0 = rf(coreEntities, uncoreEntities) r0 = rf(coreEntities, uncoreEntities)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
@ -97,11 +97,11 @@ type mockEntitiesParser struct {
} }
// parseEntities provides a mock function with given fields: coreEntities, uncoreEntities // parseEntities provides a mock function with given fields: coreEntities, uncoreEntities
func (_m *mockEntitiesParser) parseEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) error { func (_m *mockEntitiesParser) parseEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) error {
ret := _m.Called(coreEntities, uncoreEntities) ret := _m.Called(coreEntities, uncoreEntities)
var r0 error var r0 error
if rf, ok := ret.Get(0).(func([]*CoreEventEntity, []*UncoreEventEntity) error); ok { if rf, ok := ret.Get(0).(func([]*coreEventEntity, []*uncoreEventEntity) error); ok {
r0 = rf(coreEntities, uncoreEntities) r0 = rf(coreEntities, uncoreEntities)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)
@ -116,11 +116,11 @@ type mockEntitiesResolver struct {
} }
// resolveEntities provides a mock function with given fields: coreEntities, uncoreEntities // resolveEntities provides a mock function with given fields: coreEntities, uncoreEntities
func (_m *mockEntitiesResolver) resolveEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) error { func (_m *mockEntitiesResolver) resolveEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) error {
ret := _m.Called(coreEntities, uncoreEntities) ret := _m.Called(coreEntities, uncoreEntities)
var r0 error var r0 error
if rf, ok := ret.Get(0).(func([]*CoreEventEntity, []*UncoreEventEntity) error); ok { if rf, ok := ret.Get(0).(func([]*coreEventEntity, []*uncoreEventEntity) error); ok {
r0 = rf(coreEntities, uncoreEntities) r0 = rf(coreEntities, uncoreEntities)
} else { } else {
r0 = ret.Error(0) r0 = ret.Error(0)

View File

@ -48,7 +48,7 @@ func (iaValuesReader) readValue(event *ia.ActiveEvent) (ia.CounterValue, error)
} }
type entitiesValuesReader interface { type entitiesValuesReader interface {
readEntities([]*CoreEventEntity, []*UncoreEventEntity) ([]coreMetric, []uncoreMetric, error) readEntities([]*coreEventEntity, []*uncoreEventEntity) ([]coreMetric, []uncoreMetric, error)
} }
type iaEntitiesValuesReader struct { type iaEntitiesValuesReader struct {
@ -66,7 +66,7 @@ func (realClock) now() time.Time {
return time.Now() return time.Now()
} }
func (ie *iaEntitiesValuesReader) readEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) ([]coreMetric, []uncoreMetric, error) { func (ie *iaEntitiesValuesReader) readEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) ([]coreMetric, []uncoreMetric, error) {
var coreMetrics []coreMetric var coreMetrics []coreMetric
var uncoreMetrics []uncoreMetric var uncoreMetrics []uncoreMetric
@ -87,7 +87,7 @@ func (ie *iaEntitiesValuesReader) readEntities(coreEntities []*CoreEventEntity,
return coreMetrics, uncoreMetrics, nil return coreMetrics, uncoreMetrics, nil
} }
func (ie *iaEntitiesValuesReader) readCoreEvents(entity *CoreEventEntity) ([]coreMetric, error) { func (ie *iaEntitiesValuesReader) readCoreEvents(entity *coreEventEntity) ([]coreMetric, error) {
if ie.eventReader == nil || ie.timer == nil { if ie.eventReader == nil || ie.timer == nil {
return nil, errors.New("event values reader or timer is nil") return nil, errors.New("event values reader or timer is nil")
} }
@ -126,7 +126,7 @@ func (ie *iaEntitiesValuesReader) readCoreEvents(entity *CoreEventEntity) ([]cor
return metrics, nil return metrics, nil
} }
func (ie *iaEntitiesValuesReader) readUncoreEvents(entity *UncoreEventEntity) ([]uncoreMetric, error) { func (ie *iaEntitiesValuesReader) readUncoreEvents(entity *uncoreEventEntity) ([]uncoreMetric, error) {
if entity == nil { if entity == nil {
return nil, errors.New("entity is nil") return nil, errors.New("entity is nil")
} }

View File

@ -51,7 +51,7 @@ func TestReadCoreEvents(t *testing.T) {
}) })
t.Run("nil events", func(t *testing.T) { t.Run("nil events", func(t *testing.T) {
entity := &CoreEventEntity{} entity := &coreEventEntity{}
entity.activeEvents = append(entity.activeEvents, nil) entity.activeEvents = append(entity.activeEvents, nil)
metrics, err := mEntitiesReader.readCoreEvents(entity) metrics, err := mEntitiesReader.readCoreEvents(entity)
@ -65,7 +65,7 @@ func TestReadCoreEvents(t *testing.T) {
errMock := errors.New("mock error") errMock := errors.New("mock error")
event := &ia.ActiveEvent{PerfEvent: &ia.PerfEvent{Name: "event1"}} event := &ia.ActiveEvent{PerfEvent: &ia.PerfEvent{Name: "event1"}}
entity := &CoreEventEntity{} entity := &coreEventEntity{}
entity.activeEvents = append(entity.activeEvents, event) entity.activeEvents = append(entity.activeEvents, event)
mReader.On("readValue", event).Return(ia.CounterValue{}, errMock).Once() mReader.On("readValue", event).Return(ia.CounterValue{}, errMock).Once()
@ -79,7 +79,7 @@ func TestReadCoreEvents(t *testing.T) {
}) })
t.Run("read active events values", func(t *testing.T) { t.Run("read active events values", func(t *testing.T) {
entity := &CoreEventEntity{} entity := &coreEventEntity{}
var expected []coreMetric var expected []coreMetric
tEvents := []eventWithValues{ tEvents := []eventWithValues{
@ -346,7 +346,7 @@ func TestReadUncoreEvents(t *testing.T) {
time: mTimer.now(), time: mTimer.now(),
} }
expected := []uncoreMetric{newMetric, newMetric2} expected := []uncoreMetric{newMetric, newMetric2}
entityAgg := &UncoreEventEntity{Aggregate: true, activeMultiEvents: []multiEvent{multi, multi2}} entityAgg := &uncoreEventEntity{Aggregate: true, activeMultiEvents: []multiEvent{multi, multi2}}
metrics, err := mEntitiesReader.readUncoreEvents(entityAgg) metrics, err := mEntitiesReader.readUncoreEvents(entityAgg)
@ -360,7 +360,7 @@ func TestReadUncoreEvents(t *testing.T) {
mReader.On("readValue", event).Return(ia.CounterValue{}, errMock).Once() mReader.On("readValue", event).Return(ia.CounterValue{}, errMock).Once()
entityAgg := &UncoreEventEntity{Aggregate: true, activeMultiEvents: []multiEvent{multi}} entityAgg := &uncoreEventEntity{Aggregate: true, activeMultiEvents: []multiEvent{multi}}
metrics, err = mEntitiesReader.readUncoreEvents(entityAgg) metrics, err = mEntitiesReader.readUncoreEvents(entityAgg)
require.Error(t, err) require.Error(t, err)
@ -416,7 +416,7 @@ func TestReadUncoreEvents(t *testing.T) {
} }
expected = append(expected, newMetric) expected = append(expected, newMetric)
} }
entity := &UncoreEventEntity{activeMultiEvents: []multiEvent{multi, multi2}} entity := &uncoreEventEntity{activeMultiEvents: []multiEvent{multi, multi2}}
metrics, err := mEntitiesReader.readUncoreEvents(entity) metrics, err := mEntitiesReader.readUncoreEvents(entity)
@ -430,7 +430,7 @@ func TestReadUncoreEvents(t *testing.T) {
mReader.On("readValue", event).Return(ia.CounterValue{}, errMock).Once() mReader.On("readValue", event).Return(ia.CounterValue{}, errMock).Once()
entityAgg := &UncoreEventEntity{activeMultiEvents: []multiEvent{multi}} entityAgg := &uncoreEventEntity{activeMultiEvents: []multiEvent{multi}}
metrics, err = mEntitiesReader.readUncoreEvents(entityAgg) metrics, err = mEntitiesReader.readUncoreEvents(entityAgg)
require.Error(t, err) require.Error(t, err)
@ -477,9 +477,9 @@ func TestReadEntities(t *testing.T) {
time: mTimer.now(), time: mTimer.now(),
} }
coreEntities := []*CoreEventEntity{{activeEvents: activeCoreEvent}, {activeEvents: activeCoreEvent2}} coreEntities := []*coreEventEntity{{activeEvents: activeCoreEvent}, {activeEvents: activeCoreEvent2}}
uncoreEntities := []*UncoreEventEntity{ uncoreEntities := []*uncoreEventEntity{
{activeMultiEvents: []multiEvent{{activeEvents: activeUncoreEvent, perfEvent: uncorePerfEvent, socket: socket}}}, {activeMultiEvents: []multiEvent{{activeEvents: activeUncoreEvent, perfEvent: uncorePerfEvent, socket: socket}}},
{activeMultiEvents: []multiEvent{{activeEvents: activeUncoreEvent2, perfEvent: uncorePerfEvent2, socket: socket}}}, {activeMultiEvents: []multiEvent{{activeEvents: activeUncoreEvent2, perfEvent: uncorePerfEvent2, socket: socket}}},
} }
@ -501,7 +501,7 @@ func TestReadEntities(t *testing.T) {
}) })
t.Run("core entity reading failed", func(t *testing.T) { t.Run("core entity reading failed", func(t *testing.T) {
coreEntities := []*CoreEventEntity{nil} coreEntities := []*coreEventEntity{nil}
coreMetrics, uncoreMetrics, err := mEntitiesReader.readEntities(coreEntities, nil) coreMetrics, uncoreMetrics, err := mEntitiesReader.readEntities(coreEntities, nil)
require.Error(t, err) require.Error(t, err)
@ -511,7 +511,7 @@ func TestReadEntities(t *testing.T) {
}) })
t.Run("uncore entity reading failed", func(t *testing.T) { t.Run("uncore entity reading failed", func(t *testing.T) {
uncoreEntities := []*UncoreEventEntity{nil} uncoreEntities := []*uncoreEventEntity{nil}
coreMetrics, uncoreMetrics, err := mEntitiesReader.readEntities(nil, uncoreEntities) coreMetrics, uncoreMetrics, err := mEntitiesReader.readEntities(nil, uncoreEntities)
require.Error(t, err) require.Error(t, err)

View File

@ -13,7 +13,7 @@ import (
) )
type entitiesResolver interface { type entitiesResolver interface {
resolveEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) error resolveEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) error
} }
type iaEntitiesResolver struct { type iaEntitiesResolver struct {
@ -22,7 +22,7 @@ type iaEntitiesResolver struct {
log telegraf.Logger log telegraf.Logger
} }
func (e *iaEntitiesResolver) resolveEntities(coreEntities []*CoreEventEntity, uncoreEntities []*UncoreEventEntity) error { func (e *iaEntitiesResolver) resolveEntities(coreEntities []*coreEventEntity, uncoreEntities []*uncoreEventEntity) error {
for _, entity := range coreEntities { for _, entity := range coreEntities {
if entity == nil { if entity == nil {
return errors.New("core entity is nil") return errors.New("core entity is nil")

View File

@ -25,27 +25,27 @@ func TestResolveEntities(t *testing.T) {
} }
t.Run("nil entities", func(t *testing.T) { t.Run("nil entities", func(t *testing.T) {
err := mResolver.resolveEntities([]*CoreEventEntity{nil}, nil) err := mResolver.resolveEntities([]*coreEventEntity{nil}, nil)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "core entity is nil") require.Contains(t, err.Error(), "core entity is nil")
err = mResolver.resolveEntities(nil, []*UncoreEventEntity{nil}) err = mResolver.resolveEntities(nil, []*uncoreEventEntity{nil})
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "uncore entity is nil") require.Contains(t, err.Error(), "uncore entity is nil")
}) })
t.Run("nil parsed events", func(t *testing.T) { t.Run("nil parsed events", func(t *testing.T) {
mCoreEntity := &CoreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}} mCoreEntity := &coreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}}
mUncoreEntity := &UncoreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}} mUncoreEntity := &uncoreEventEntity{parsedEvents: []*eventWithQuals{nil, nil}}
err := mResolver.resolveEntities([]*CoreEventEntity{mCoreEntity}, nil) err := mResolver.resolveEntities([]*coreEventEntity{mCoreEntity}, nil)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "parsed core event is nil") require.Contains(t, err.Error(), "parsed core event is nil")
err = mResolver.resolveEntities(nil, []*UncoreEventEntity{mUncoreEntity}) err = mResolver.resolveEntities(nil, []*uncoreEventEntity{mUncoreEntity})
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "parsed uncore event is nil") require.Contains(t, err.Error(), "parsed uncore event is nil")
@ -53,11 +53,11 @@ func TestResolveEntities(t *testing.T) {
t.Run("fail to resolve core events", func(t *testing.T) { t.Run("fail to resolve core events", func(t *testing.T) {
name := "mock event 1" name := "mock event 1"
mCoreEntity := &CoreEventEntity{parsedEvents: []*eventWithQuals{{name: name}}, allEvents: false} mCoreEntity := &coreEventEntity{parsedEvents: []*eventWithQuals{{name: name}}, allEvents: false}
matcher := ia.NewNameMatcher(name) matcher := ia.NewNameMatcher(name)
mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock) mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock)
err := mResolver.resolveEntities([]*CoreEventEntity{mCoreEntity}, nil) err := mResolver.resolveEntities([]*coreEventEntity{mCoreEntity}, nil)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("failed to resolve core event %q", name)) require.Contains(t, err.Error(), fmt.Sprintf("failed to resolve core event %q", name))
@ -66,11 +66,11 @@ func TestResolveEntities(t *testing.T) {
t.Run("fail to resolve uncore events", func(t *testing.T) { t.Run("fail to resolve uncore events", func(t *testing.T) {
name := "mock event 1" name := "mock event 1"
mUncoreEntity := &UncoreEventEntity{parsedEvents: []*eventWithQuals{{name: name}}, allEvents: false} mUncoreEntity := &uncoreEventEntity{parsedEvents: []*eventWithQuals{{name: name}}, allEvents: false}
matcher := ia.NewNameMatcher(name) matcher := ia.NewNameMatcher(name)
mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock) mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock)
err := mResolver.resolveEntities(nil, []*UncoreEventEntity{mUncoreEntity}) err := mResolver.resolveEntities(nil, []*uncoreEventEntity{mUncoreEntity})
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("failed to resolve uncore event %q", name)) require.Contains(t, err.Error(), fmt.Sprintf("failed to resolve uncore event %q", name))
@ -78,8 +78,8 @@ func TestResolveEntities(t *testing.T) {
}) })
t.Run("resolve all core and uncore events", func(t *testing.T) { t.Run("resolve all core and uncore events", func(t *testing.T) {
mCoreEntity := &CoreEventEntity{allEvents: true} mCoreEntity := &coreEventEntity{allEvents: true}
mUncoreEntity := &UncoreEventEntity{allEvents: true} mUncoreEntity := &uncoreEventEntity{allEvents: true}
corePerfEvents := []*ia.PerfEvent{ corePerfEvents := []*ia.PerfEvent{
{Name: "core event1"}, {Name: "core event1"},
{Name: "core event2"}, {Name: "core event2"},
@ -94,7 +94,7 @@ func TestResolveEntities(t *testing.T) {
t.Run("fail to resolve all core events", func(t *testing.T) { t.Run("fail to resolve all core events", func(t *testing.T) {
mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock) mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock)
err := mResolver.resolveEntities([]*CoreEventEntity{mCoreEntity}, nil) err := mResolver.resolveEntities([]*coreEventEntity{mCoreEntity}, nil)
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "failed to resolve all events") require.Contains(t, err.Error(), "failed to resolve all events")
mTransformer.AssertExpectations(t) mTransformer.AssertExpectations(t)
@ -102,7 +102,7 @@ func TestResolveEntities(t *testing.T) {
t.Run("fail to resolve all uncore events", func(t *testing.T) { t.Run("fail to resolve all uncore events", func(t *testing.T) {
mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock) mTransformer.On("Transform", nil, matcher).Once().Return(nil, errMock)
err := mResolver.resolveEntities(nil, []*UncoreEventEntity{mUncoreEntity}) err := mResolver.resolveEntities(nil, []*uncoreEventEntity{mUncoreEntity})
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "failed to resolve all events") require.Contains(t, err.Error(), "failed to resolve all events")
mTransformer.AssertExpectations(t) mTransformer.AssertExpectations(t)
@ -114,7 +114,7 @@ func TestResolveEntities(t *testing.T) {
mTransformer.On("Transform", nil, matcher).Once().Return(corePerfEvents, transformErr).Once() mTransformer.On("Transform", nil, matcher).Once().Return(corePerfEvents, transformErr).Once()
mTransformer.On("Transform", nil, matcher).Once().Return(uncorePerfEvents, transformErr).Once() mTransformer.On("Transform", nil, matcher).Once().Return(uncorePerfEvents, transformErr).Once()
err := mResolver.resolveEntities([]*CoreEventEntity{mCoreEntity}, []*UncoreEventEntity{mUncoreEntity}) err := mResolver.resolveEntities([]*coreEventEntity{mCoreEntity}, []*uncoreEventEntity{mUncoreEntity})
require.NoError(t, err) require.NoError(t, err)
require.Len(t, mCoreEntity.parsedEvents, len(corePerfEvents)) require.Len(t, mCoreEntity.parsedEvents, len(corePerfEvents))
require.Len(t, mUncoreEntity.parsedEvents, len(uncorePerfEvents)) require.Len(t, mUncoreEntity.parsedEvents, len(uncorePerfEvents))
@ -130,7 +130,7 @@ func TestResolveEntities(t *testing.T) {
mTransformer.On("Transform", nil, matcher).Once().Return(corePerfEvents, nil).Once() mTransformer.On("Transform", nil, matcher).Once().Return(corePerfEvents, nil).Once()
mTransformer.On("Transform", nil, matcher).Once().Return(uncorePerfEvents, nil).Once() mTransformer.On("Transform", nil, matcher).Once().Return(uncorePerfEvents, nil).Once()
err := mResolver.resolveEntities([]*CoreEventEntity{mCoreEntity}, []*UncoreEventEntity{mUncoreEntity}) err := mResolver.resolveEntities([]*coreEventEntity{mCoreEntity}, []*uncoreEventEntity{mUncoreEntity})
require.NoError(t, err) require.NoError(t, err)
require.Len(t, mCoreEntity.parsedEvents, len(corePerfEvents)) require.Len(t, mCoreEntity.parsedEvents, len(corePerfEvents))
require.Len(t, mUncoreEntity.parsedEvents, len(uncorePerfEvents)) require.Len(t, mUncoreEntity.parsedEvents, len(uncorePerfEvents))
@ -155,8 +155,8 @@ func TestResolveEntities(t *testing.T) {
matcher := ia.NewNameMatcher(eventName) matcher := ia.NewNameMatcher(eventName)
mTransformer.On("Transform", nil, matcher).Return([]*ia.PerfEvent{testCase.perfEvent}, nil).Once() mTransformer.On("Transform", nil, matcher).Return([]*ia.PerfEvent{testCase.perfEvent}, nil).Once()
mCoreEntity := &CoreEventEntity{parsedEvents: []*eventWithQuals{testCase.event}, allEvents: false} mCoreEntity := &coreEventEntity{parsedEvents: []*eventWithQuals{testCase.event}, allEvents: false}
err := mResolver.resolveEntities([]*CoreEventEntity{mCoreEntity}, nil) err := mResolver.resolveEntities([]*coreEventEntity{mCoreEntity}, nil)
require.ErrorContains(t, err, fmt.Sprintf("uncore event %q found in core entity", eventName)) require.ErrorContains(t, err, fmt.Sprintf("uncore event %q found in core entity", eventName))
mTransformer.AssertExpectations(t) mTransformer.AssertExpectations(t)
}) })
@ -173,8 +173,8 @@ func TestResolveEntities(t *testing.T) {
matcher := ia.NewNameMatcher(eventName) matcher := ia.NewNameMatcher(eventName)
mTransformer.On("Transform", nil, matcher).Return([]*ia.PerfEvent{testCase.perfEvent}, nil).Once() mTransformer.On("Transform", nil, matcher).Return([]*ia.PerfEvent{testCase.perfEvent}, nil).Once()
mUncoreEntity := &UncoreEventEntity{parsedEvents: []*eventWithQuals{testCase.event}, allEvents: false} mUncoreEntity := &uncoreEventEntity{parsedEvents: []*eventWithQuals{testCase.event}, allEvents: false}
err := mResolver.resolveEntities(nil, []*UncoreEventEntity{mUncoreEntity}) err := mResolver.resolveEntities(nil, []*uncoreEventEntity{mUncoreEntity})
require.ErrorContains(t, err, fmt.Sprintf("core event %q found in uncore entity", eventName)) require.ErrorContains(t, err, fmt.Sprintf("core event %q found in uncore entity", eventName))
mTransformer.AssertExpectations(t) mTransformer.AssertExpectations(t)
@ -225,9 +225,9 @@ func TestResolveEntities(t *testing.T) {
nUncoreEvents = append(nUncoreEvents, test.event) nUncoreEvents = append(nUncoreEvents, test.event)
} }
mCoreEntity := &CoreEventEntity{parsedEvents: mCoreEvents, allEvents: false} mCoreEntity := &coreEventEntity{parsedEvents: mCoreEvents, allEvents: false}
mUncoreEntity := &UncoreEventEntity{parsedEvents: nUncoreEvents, allEvents: false} mUncoreEntity := &uncoreEventEntity{parsedEvents: nUncoreEvents, allEvents: false}
err = mResolver.resolveEntities([]*CoreEventEntity{mCoreEntity}, []*UncoreEventEntity{mUncoreEntity}) err = mResolver.resolveEntities([]*coreEventEntity{mCoreEntity}, []*uncoreEventEntity{mUncoreEntity})
require.NoError(t, err) require.NoError(t, err)
for _, test := range append(coreTestCases, uncoreTestCases...) { for _, test := range append(coreTestCases, uncoreTestCases...) {

View File

@ -56,12 +56,10 @@ type PowerStat struct {
logOnce map[string]struct{} logOnce map[string]struct{}
} }
// SampleConfig returns a sample configuration (See sample.conf).
func (*PowerStat) SampleConfig() string { func (*PowerStat) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init parses config file and sets up configuration of the plugin.
func (p *PowerStat) Init() error { func (p *PowerStat) Init() error {
if err := p.disableUnsupportedMetrics(); err != nil { if err := p.disableUnsupportedMetrics(); err != nil {
return err return err
@ -106,18 +104,6 @@ func (p *PowerStat) Start(_ telegraf.Accumulator) error {
return nil return nil
} }
// Stop deactivates perf events if one or more of the requested metrics rely on perf.
func (p *PowerStat) Stop() {
if !p.needsPerf {
return
}
if err := p.fetcher.DeactivatePerfEvents(); err != nil {
p.Log.Errorf("Failed to deactivate perf events: %v", err)
}
}
// Gather collects the plugin's metrics.
func (p *PowerStat) Gather(acc telegraf.Accumulator) error { func (p *PowerStat) Gather(acc telegraf.Accumulator) error {
// gather CPU metrics relying on coreFreq and msr which share CPU IDs. // gather CPU metrics relying on coreFreq and msr which share CPU IDs.
if p.needsCoreFreq || p.needsMsrCPU { if p.needsCoreFreq || p.needsMsrCPU {
@ -137,6 +123,17 @@ func (p *PowerStat) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Stop deactivates perf events if one or more of the requested metrics rely on perf.
func (p *PowerStat) Stop() {
if !p.needsPerf {
return
}
if err := p.fetcher.DeactivatePerfEvents(); err != nil {
p.Log.Errorf("Failed to deactivate perf events: %v", err)
}
}
// parseConfig is a helper method that parses configuration fields from the receiver such as included/excluded CPU IDs. // parseConfig is a helper method that parses configuration fields from the receiver such as included/excluded CPU IDs.
func (p *PowerStat) parseConfig() error { func (p *PowerStat) parseConfig() error {
if p.MsrReadTimeout < 0 { if p.MsrReadTimeout < 0 {

View File

@ -17,12 +17,13 @@ type IntelPowerstat struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*IntelPowerstat) SampleConfig() string { return sampleConfig }
func (i *IntelPowerstat) Init() error { func (i *IntelPowerstat) Init() error {
i.Log.Warn("current platform is not supported") i.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*IntelPowerstat) SampleConfig() string { return sampleConfig }
func (*IntelPowerstat) Gather(_ telegraf.Accumulator) error { return nil } func (*IntelPowerstat) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -29,14 +29,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
timestampFormat = "2006-01-02 15:04:05"
defaultSamplingInterval = 10
pqosInitOutputLinesNumber = 4
numberOfMetrics = 6
secondsDenominator = 10
)
var pqosMetricOrder = map[int]string{ var pqosMetricOrder = map[int]string{
0: "IPC", // Instructions Per Cycle 0: "IPC", // Instructions Per Cycle
1: "LLC_Misses", // Cache Misses 1: "LLC_Misses", // Cache Misses
@ -46,17 +38,25 @@ var pqosMetricOrder = map[int]string{
5: "MBT", // Total Memory Bandwidth 5: "MBT", // Total Memory Bandwidth
} }
type IntelRDT struct { const (
PqosPath string `toml:"pqos_path"` timestampFormat = "2006-01-02 15:04:05"
Cores []string `toml:"cores"` defaultSamplingInterval = 10
Processes []string `toml:"processes"` pqosInitOutputLinesNumber = 4
SamplingInterval int32 `toml:"sampling_interval"` numberOfMetrics = 6
ShortenedMetrics bool `toml:"shortened_metrics"` secondsDenominator = 10
UseSudo bool `toml:"use_sudo"` )
Log telegraf.Logger `toml:"-"` type IntelRDT struct {
Publisher Publisher `toml:"-"` PqosPath string `toml:"pqos_path"`
Processor ProcessesHandler `toml:"-"` Cores []string `toml:"cores"`
Processes []string `toml:"processes"`
SamplingInterval int32 `toml:"sampling_interval"`
ShortenedMetrics bool `toml:"shortened_metrics"`
UseSudo bool `toml:"use_sudo"`
Log telegraf.Logger `toml:"-"`
publisher publisher
processor processesHandler
stopPQOSChan chan bool stopPQOSChan chan bool
quitChan chan struct{} quitChan chan struct{}
errorChan chan error errorChan chan error
@ -81,31 +81,35 @@ func (*IntelRDT) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// All gathering is done in the Start function
func (r *IntelRDT) Gather(_ telegraf.Accumulator) error {
return nil
}
func (r *IntelRDT) Start(acc telegraf.Accumulator) error { func (r *IntelRDT) Start(acc telegraf.Accumulator) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel r.cancel = cancel
r.Processor = NewProcessor() r.processor = newProcessor()
r.Publisher = NewPublisher(acc, r.Log, r.ShortenedMetrics) r.publisher = newPublisher(acc, r.Log, r.ShortenedMetrics)
err := r.Initialize() err := r.initialize()
if err != nil { if err != nil {
return err return err
} }
r.Publisher.publish(ctx) r.publisher.publish(ctx)
go r.errorHandler(ctx) go r.errorHandler(ctx)
go r.scheduler(ctx) go r.scheduler(ctx)
return nil return nil
} }
func (r *IntelRDT) Initialize() error { func (r *IntelRDT) Gather(_ telegraf.Accumulator) error {
return nil
}
func (r *IntelRDT) Stop() {
r.cancel()
r.wg.Wait()
}
func (r *IntelRDT) initialize() error {
r.stopPQOSChan = make(chan bool) r.stopPQOSChan = make(chan bool)
r.quitChan = make(chan struct{}) r.quitChan = make(chan struct{})
r.errorChan = make(chan error) r.errorChan = make(chan error)
@ -179,11 +183,6 @@ func (r *IntelRDT) scheduler(ctx context.Context) {
} }
} }
func (r *IntelRDT) Stop() {
r.cancel()
r.wg.Wait()
}
func (r *IntelRDT) checkPIDsAssociation(ctx context.Context) error { func (r *IntelRDT) checkPIDsAssociation(ctx context.Context) error {
newProcessesPIDsMap, err := r.associateProcessesWithPIDs(r.Processes) newProcessesPIDsMap, err := r.associateProcessesWithPIDs(r.Processes)
if err != nil { if err != nil {
@ -202,7 +201,7 @@ func (r *IntelRDT) checkPIDsAssociation(ctx context.Context) error {
} }
func (r *IntelRDT) associateProcessesWithPIDs(providedProcesses []string) (map[string]string, error) { func (r *IntelRDT) associateProcessesWithPIDs(providedProcesses []string) (map[string]string, error) {
availableProcesses, err := r.Processor.getAllProcesses() availableProcesses, err := r.processor.getAllProcesses()
if err != nil { if err != nil {
return nil, errors.New("cannot gather information of all available processes") return nil, errors.New("cannot gather information of all available processes")
} }
@ -318,9 +317,9 @@ func (r *IntelRDT) processOutput(cmdReader io.ReadCloser, processesPIDsAssociati
newMetric.measurement = out newMetric.measurement = out
} }
} }
r.Publisher.BufferChanProcess <- newMetric r.publisher.bufferChanProcess <- newMetric
} else { } else {
r.Publisher.BufferChanCores <- out r.publisher.bufferChanCores <- out
} }
} }
} }

View File

@ -10,10 +10,10 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
type MockProc struct{} type mockProc struct{}
func (m *MockProc) getAllProcesses() ([]Process, error) { func (m *mockProc) getAllProcesses() ([]process, error) {
procs := []Process{ procs := []process{
{Name: "process", PID: 1000}, {Name: "process", PID: 1000},
{Name: "process2", PID: 1002}, {Name: "process2", PID: 1002},
{Name: "process2", PID: 1003}, {Name: "process2", PID: 1003},
@ -23,10 +23,10 @@ func (m *MockProc) getAllProcesses() ([]Process, error) {
func TestAssociateProcessesWithPIDs(t *testing.T) { func TestAssociateProcessesWithPIDs(t *testing.T) {
log := testutil.Logger{} log := testutil.Logger{}
proc := &MockProc{} proc := &mockProc{}
rdt := IntelRDT{ rdt := IntelRDT{
Log: log, Log: log,
Processor: proc, processor: proc,
} }
processes := []string{"process"} processes := []string{"process"}
expectedPID := "1000" expectedPID := "1000"

View File

@ -16,14 +16,18 @@ type IntelRDT struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*IntelRDT) SampleConfig() string { return sampleConfig }
func (i *IntelRDT) Init() error { func (i *IntelRDT) Init() error {
i.Log.Warn("current platform is not supported") i.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*IntelRDT) SampleConfig() string { return sampleConfig }
func (*IntelRDT) Start(_ telegraf.Accumulator) error { return nil }
func (*IntelRDT) Gather(_ telegraf.Accumulator) error { return nil } func (*IntelRDT) Gather(_ telegraf.Accumulator) error { return nil }
func (*IntelRDT) Start(_ telegraf.Accumulator) error { return nil }
func (*IntelRDT) Stop() {} func (*IntelRDT) Stop() {}
func init() { func init() {
inputs.Add("intel_rdt", func() telegraf.Input { inputs.Add("intel_rdt", func() telegraf.Input {

View File

@ -4,34 +4,34 @@ package intel_rdt
import "github.com/prometheus/procfs" import "github.com/prometheus/procfs"
type ProcessesHandler interface { type processesHandler interface {
getAllProcesses() ([]Process, error) getAllProcesses() ([]process, error)
} }
type Process struct { type process struct {
Name string Name string
PID int PID int
} }
type ProcessManager struct{} type processManager struct{}
func NewProcessor() ProcessesHandler { func newProcessor() processesHandler {
return &ProcessManager{} return &processManager{}
} }
func (p *ProcessManager) getAllProcesses() ([]Process, error) { func (p *processManager) getAllProcesses() ([]process, error) {
allProcesses, err := procfs.AllProcs() allProcesses, err := procfs.AllProcs()
if err != nil { if err != nil {
return nil, err return nil, err
} }
processes := make([]Process, 0, len(allProcesses)) processes := make([]process, 0, len(allProcesses))
for _, proc := range allProcesses { for _, proc := range allProcesses {
procComm, err := proc.Comm() procComm, err := proc.Comm()
if err != nil { if err != nil {
continue continue
} }
newProcess := Process{ newProcess := process{
PID: proc.PID, PID: proc.PID,
Name: procComm, Name: procComm,
} }

View File

@ -24,37 +24,37 @@ type parsedProcessMeasurement struct {
time time.Time time time.Time
} }
// Publisher for publish new RDT metrics to telegraf accumulator // publisher for publish new RDT metrics to telegraf accumulator
type Publisher struct { type publisher struct {
acc telegraf.Accumulator acc telegraf.Accumulator
Log telegraf.Logger log telegraf.Logger
shortenedMetrics bool shortenedMetrics bool
BufferChanProcess chan processMeasurement bufferChanProcess chan processMeasurement
BufferChanCores chan string bufferChanCores chan string
errChan chan error errChan chan error
} }
func NewPublisher(acc telegraf.Accumulator, log telegraf.Logger, shortenedMetrics bool) Publisher { func newPublisher(acc telegraf.Accumulator, log telegraf.Logger, shortenedMetrics bool) publisher {
return Publisher{ return publisher{
acc: acc, acc: acc,
Log: log, log: log,
shortenedMetrics: shortenedMetrics, shortenedMetrics: shortenedMetrics,
BufferChanProcess: make(chan processMeasurement), bufferChanProcess: make(chan processMeasurement),
BufferChanCores: make(chan string), bufferChanCores: make(chan string),
errChan: make(chan error), errChan: make(chan error),
} }
} }
func (p *Publisher) publish(ctx context.Context) { func (p *publisher) publish(ctx context.Context) {
go func() { go func() {
for { for {
select { select {
case newMeasurements := <-p.BufferChanCores: case newMeasurements := <-p.bufferChanCores:
p.publishCores(newMeasurements) p.publishCores(newMeasurements)
case newMeasurements := <-p.BufferChanProcess: case newMeasurements := <-p.bufferChanProcess:
p.publishProcess(newMeasurements) p.publishProcess(newMeasurements)
case err := <-p.errChan: case err := <-p.errChan:
p.Log.Error(err) p.log.Error(err)
case <-ctx.Done(): case <-ctx.Done():
return return
} }
@ -62,7 +62,7 @@ func (p *Publisher) publish(ctx context.Context) {
}() }()
} }
func (p *Publisher) publishCores(measurement string) { func (p *publisher) publishCores(measurement string) {
parsedCoresMeasurement, err := parseCoresMeasurement(measurement) parsedCoresMeasurement, err := parseCoresMeasurement(measurement)
if err != nil { if err != nil {
p.errChan <- err p.errChan <- err
@ -70,7 +70,7 @@ func (p *Publisher) publishCores(measurement string) {
p.addToAccumulatorCores(parsedCoresMeasurement) p.addToAccumulatorCores(parsedCoresMeasurement)
} }
func (p *Publisher) publishProcess(measurement processMeasurement) { func (p *publisher) publishProcess(measurement processMeasurement) {
parsedProcessMeasurement, err := parseProcessesMeasurement(measurement) parsedProcessMeasurement, err := parseProcessesMeasurement(measurement)
if err != nil { if err != nil {
p.errChan <- err p.errChan <- err
@ -103,7 +103,7 @@ func parseCoresMeasurement(measurements string) (parsedCoresMeasurement, error)
return parsedCoresMeasurement{coresString, values, timestamp}, nil return parsedCoresMeasurement{coresString, values, timestamp}, nil
} }
func (p *Publisher) addToAccumulatorCores(measurement parsedCoresMeasurement) { func (p *publisher) addToAccumulatorCores(measurement parsedCoresMeasurement) {
for i, value := range measurement.values { for i, value := range measurement.values {
if p.shortenedMetrics { if p.shortenedMetrics {
// 0: "IPC" // 0: "IPC"
@ -154,7 +154,7 @@ func parseProcessesMeasurement(measurement processMeasurement) (parsedProcessMea
return parsedProcessMeasurement{actualProcess, cores, values, timestamp}, nil return parsedProcessMeasurement{actualProcess, cores, values, timestamp}, nil
} }
func (p *Publisher) addToAccumulatorProcesses(measurement parsedProcessMeasurement) { func (p *publisher) addToAccumulatorProcesses(measurement parsedProcessMeasurement) {
for i, value := range measurement.values { for i, value := range measurement.values {
if p.shortenedMetrics { if p.shortenedMetrics {
// 0: "IPC" // 0: "IPC"

View File

@ -198,7 +198,7 @@ func TestParseProcessesMeasurement(t *testing.T) {
func TestAddToAccumulatorCores(t *testing.T) { func TestAddToAccumulatorCores(t *testing.T) {
t.Run("shortened false", func(t *testing.T) { t.Run("shortened false", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
publisher := Publisher{acc: &acc} publisher := publisher{acc: &acc}
cores := "1,2,3" cores := "1,2,3"
metricsValues := []float64{1, 2, 3, 4, 5, 6} metricsValues := []float64{1, 2, 3, 4, 5, 6}
@ -212,7 +212,7 @@ func TestAddToAccumulatorCores(t *testing.T) {
}) })
t.Run("shortened true", func(t *testing.T) { t.Run("shortened true", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
publisher := Publisher{acc: &acc, shortenedMetrics: true} publisher := publisher{acc: &acc, shortenedMetrics: true}
cores := "1,2,3" cores := "1,2,3"
metricsValues := []float64{1, 2, 3, 4, 5, 6} metricsValues := []float64{1, 2, 3, 4, 5, 6}
@ -229,7 +229,7 @@ func TestAddToAccumulatorCores(t *testing.T) {
func TestAddToAccumulatorProcesses(t *testing.T) { func TestAddToAccumulatorProcesses(t *testing.T) {
t.Run("shortened false", func(t *testing.T) { t.Run("shortened false", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
publisher := Publisher{acc: &acc} publisher := publisher{acc: &acc}
process := "process_name" process := "process_name"
cores := "1,2,3" cores := "1,2,3"
@ -244,7 +244,7 @@ func TestAddToAccumulatorProcesses(t *testing.T) {
}) })
t.Run("shortened true", func(t *testing.T) { t.Run("shortened true", func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
publisher := Publisher{acc: &acc, shortenedMetrics: true} publisher := publisher{acc: &acc, shortenedMetrics: true}
process := "process_name" process := "process_name"
cores := "1,2,3" cores := "1,2,3"

View File

@ -17,16 +17,16 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type Self struct { type Internal struct {
CollectMemstats bool `toml:"collect_memstats"` CollectMemstats bool `toml:"collect_memstats"`
CollectGostats bool `toml:"collect_gostats"` CollectGostats bool `toml:"collect_gostats"`
} }
func (*Self) SampleConfig() string { func (*Internal) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (s *Self) Gather(acc telegraf.Accumulator) error { func (s *Internal) Gather(acc telegraf.Accumulator) error {
for _, m := range selfstat.Metrics() { for _, m := range selfstat.Metrics() {
if m.Name() == "internal_agent" { if m.Name() == "internal_agent" {
m.AddTag("go_version", strings.TrimPrefix(runtime.Version(), "go")) m.AddTag("go_version", strings.TrimPrefix(runtime.Version(), "go"))
@ -135,7 +135,7 @@ func medianBucket(h *metrics.Float64Histogram) float64 {
func init() { func init() {
inputs.Add("internal", func() telegraf.Input { inputs.Add("internal", func() telegraf.Input {
return &Self{ return &Internal{
CollectMemstats: true, CollectMemstats: true,
} }
}) })

View File

@ -11,7 +11,7 @@ import (
) )
func TestSelfPlugin(t *testing.T) { func TestSelfPlugin(t *testing.T) {
s := Self{ s := Internal{
CollectMemstats: true, CollectMemstats: true,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -69,7 +69,7 @@ func TestSelfPlugin(t *testing.T) {
} }
func TestNoMemStat(t *testing.T) { func TestNoMemStat(t *testing.T) {
s := Self{ s := Internal{
CollectMemstats: false, CollectMemstats: false,
CollectGostats: false, CollectGostats: false,
} }
@ -81,7 +81,7 @@ func TestNoMemStat(t *testing.T) {
} }
func TestGostats(t *testing.T) { func TestGostats(t *testing.T) {
s := Self{ s := Internal{
CollectMemstats: false, CollectMemstats: false,
CollectGostats: true, CollectGostats: true,
} }

View File

@ -22,6 +22,12 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
const (
measurement = "internet_speed"
testModeSingle = "single"
testModeMulti = "multi"
)
// InternetSpeed is used to store configuration values. // InternetSpeed is used to store configuration values.
type InternetSpeed struct { type InternetSpeed struct {
ServerIDInclude []string `toml:"server_id_include"` ServerIDInclude []string `toml:"server_id_include"`
@ -39,12 +45,6 @@ type InternetSpeed struct {
serverFilter filter.Filter serverFilter filter.Filter
} }
const (
measurement = "internet_speed"
testModeSingle = "single"
testModeMulti = "multi"
)
func (*InternetSpeed) SampleConfig() string { func (*InternetSpeed) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -200,12 +200,12 @@ func (is *InternetSpeed) findClosestServer() error {
return errors.New("no server set: filter excluded all servers or no available server found") return errors.New("no server set: filter excluded all servers or no available server found")
} }
func timeDurationMillisecondToFloat64(d time.Duration) float64 {
return float64(d) / float64(time.Millisecond)
}
func init() { func init() {
inputs.Add("internet_speed", func() telegraf.Input { inputs.Add("internet_speed", func() telegraf.Input {
return &InternetSpeed{} return &InternetSpeed{}
}) })
} }
func timeDurationMillisecondToFloat64(d time.Duration) float64 {
return float64(d) / float64(time.Millisecond)
}

View File

@ -21,74 +21,12 @@ type Interrupts struct {
CPUAsTag bool `toml:"cpu_as_tag"` CPUAsTag bool `toml:"cpu_as_tag"`
} }
type IRQ struct { type irq struct {
ID string id string
Type string typ string
Device string device string
Total int64 total int64
Cpus []int64 cpus []int64
}
func NewIRQ(id string) *IRQ {
return &IRQ{ID: id, Cpus: []int64{}}
}
func parseInterrupts(r io.Reader) ([]IRQ, error) {
var irqs []IRQ
var cpucount int
scanner := bufio.NewScanner(r)
if scanner.Scan() {
cpus := strings.Fields(scanner.Text())
if cpus[0] != "CPU0" {
return nil, fmt.Errorf("expected first line to start with CPU0, but was %s", scanner.Text())
}
cpucount = len(cpus)
}
scan:
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if !strings.HasSuffix(fields[0], ":") {
continue
}
irqid := strings.TrimRight(fields[0], ":")
irq := NewIRQ(irqid)
irqvals := fields[1:]
for i := 0; i < cpucount; i++ {
if i < len(irqvals) {
irqval, err := strconv.ParseInt(irqvals[i], 10, 64)
if err != nil {
continue scan
}
irq.Cpus = append(irq.Cpus, irqval)
}
}
for _, irqval := range irq.Cpus {
irq.Total += irqval
}
_, err := strconv.ParseInt(irqid, 10, 64)
if err == nil && len(fields) >= cpucount+2 {
irq.Type = fields[cpucount+1]
irq.Device = strings.Join(fields[cpucount+2:], " ")
} else if len(fields) > cpucount {
irq.Type = strings.Join(fields[cpucount+1:], " ")
}
irqs = append(irqs, *irq)
}
if scanner.Err() != nil {
return nil, fmt.Errorf("error scanning file: %w", scanner.Err())
}
return irqs, nil
}
func gatherTagsFields(irq IRQ) (map[string]string, map[string]interface{}) {
tags := map[string]string{"irq": irq.ID, "type": irq.Type, "device": irq.Device}
fields := map[string]interface{}{"total": irq.Total}
for i := 0; i < len(irq.Cpus); i++ {
cpu := fmt.Sprintf("CPU%d", i)
fields[cpu] = irq.Cpus[i]
}
return tags, fields
} }
func (*Interrupts) SampleConfig() string { func (*Interrupts) SampleConfig() string {
@ -107,7 +45,65 @@ func (s *Interrupts) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func parseFile(file string) ([]IRQ, error) { func parseInterrupts(r io.Reader) ([]irq, error) {
var irqs []irq
var cpucount int
scanner := bufio.NewScanner(r)
if scanner.Scan() {
cpus := strings.Fields(scanner.Text())
if cpus[0] != "CPU0" {
return nil, fmt.Errorf("expected first line to start with CPU0, but was %s", scanner.Text())
}
cpucount = len(cpus)
}
scan:
for scanner.Scan() {
fields := strings.Fields(scanner.Text())
if !strings.HasSuffix(fields[0], ":") {
continue
}
irqid := strings.TrimRight(fields[0], ":")
irq := newIRQ(irqid)
irqvals := fields[1:]
for i := 0; i < cpucount; i++ {
if i < len(irqvals) {
irqval, err := strconv.ParseInt(irqvals[i], 10, 64)
if err != nil {
continue scan
}
irq.cpus = append(irq.cpus, irqval)
}
}
for _, irqval := range irq.cpus {
irq.total += irqval
}
_, err := strconv.ParseInt(irqid, 10, 64)
if err == nil && len(fields) >= cpucount+2 {
irq.typ = fields[cpucount+1]
irq.device = strings.Join(fields[cpucount+2:], " ")
} else if len(fields) > cpucount {
irq.typ = strings.Join(fields[cpucount+1:], " ")
}
irqs = append(irqs, *irq)
}
if scanner.Err() != nil {
return nil, fmt.Errorf("error scanning file: %w", scanner.Err())
}
return irqs, nil
}
func gatherTagsFields(irq irq) (map[string]string, map[string]interface{}) {
tags := map[string]string{"irq": irq.id, "type": irq.typ, "device": irq.device}
fields := map[string]interface{}{"total": irq.total}
for i := 0; i < len(irq.cpus); i++ {
cpu := fmt.Sprintf("CPU%d", i)
fields[cpu] = irq.cpus[i]
}
return tags, fields
}
func parseFile(file string) ([]irq, error) {
f, err := os.Open(file) f, err := os.Open(file)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not open file: %s", file) return nil, fmt.Errorf("could not open file: %s", file)
@ -121,11 +117,11 @@ func parseFile(file string) ([]IRQ, error) {
return irqs, nil return irqs, nil
} }
func reportMetrics(measurement string, irqs []IRQ, acc telegraf.Accumulator, cpusAsTags bool) { func reportMetrics(measurement string, irqs []irq, acc telegraf.Accumulator, cpusAsTags bool) {
for _, irq := range irqs { for _, irq := range irqs {
tags, fields := gatherTagsFields(irq) tags, fields := gatherTagsFields(irq)
if cpusAsTags { if cpusAsTags {
for cpu, count := range irq.Cpus { for cpu, count := range irq.cpus {
cpuTags := map[string]string{"cpu": fmt.Sprintf("cpu%d", cpu)} cpuTags := map[string]string{"cpu": fmt.Sprintf("cpu%d", cpu)}
for k, v := range tags { for k, v := range tags {
cpuTags[k] = v cpuTags[k] = v
@ -138,6 +134,10 @@ func reportMetrics(measurement string, irqs []IRQ, acc telegraf.Accumulator, cpu
} }
} }
func newIRQ(id string) *irq {
return &irq{id: id, cpus: []int64{}}
}
func init() { func init() {
inputs.Add("interrupts", func() telegraf.Input { inputs.Add("interrupts", func() telegraf.Input {
return &Interrupts{} return &Interrupts{}

View File

@ -14,28 +14,28 @@ import (
// Setup and helper functions // Setup and helper functions
// ===================================================================================== // =====================================================================================
func expectCPUAsTags(m *testutil.Accumulator, t *testing.T, measurement string, irq IRQ) { func expectCPUAsTags(m *testutil.Accumulator, t *testing.T, measurement string, irq irq) {
for idx, value := range irq.Cpus { for idx, value := range irq.cpus {
m.AssertContainsTaggedFields(t, measurement, m.AssertContainsTaggedFields(t, measurement,
map[string]interface{}{"count": value}, map[string]interface{}{"count": value},
map[string]string{"irq": irq.ID, "type": irq.Type, "device": irq.Device, "cpu": fmt.Sprintf("cpu%d", idx)}, map[string]string{"irq": irq.id, "type": irq.typ, "device": irq.device, "cpu": fmt.Sprintf("cpu%d", idx)},
) )
} }
} }
func expectCPUAsFields(m *testutil.Accumulator, t *testing.T, measurement string, irq IRQ) { func expectCPUAsFields(m *testutil.Accumulator, t *testing.T, measurement string, irq irq) {
fields := map[string]interface{}{} fields := map[string]interface{}{}
total := int64(0) total := int64(0)
for idx, count := range irq.Cpus { for idx, count := range irq.cpus {
fields[fmt.Sprintf("CPU%d", idx)] = count fields[fmt.Sprintf("CPU%d", idx)] = count
total += count total += count
} }
fields["total"] = total fields["total"] = total
m.AssertContainsTaggedFields(t, measurement, fields, map[string]string{"irq": irq.ID, "type": irq.Type, "device": irq.Device}) m.AssertContainsTaggedFields(t, measurement, fields, map[string]string{"irq": irq.id, "type": irq.typ, "device": irq.device})
} }
func setup(t *testing.T, irqString string, cpuAsTags bool) (*testutil.Accumulator, []IRQ) { func setup(t *testing.T, irqString string, cpuAsTags bool) (*testutil.Accumulator, []irq) {
f := bytes.NewBufferString(irqString) f := bytes.NewBufferString(irqString)
irqs, err := parseInterrupts(f) irqs, err := parseInterrupts(f)
require.NoError(t, err) require.NoError(t, err)
@ -60,13 +60,13 @@ const softIrqsString = ` CPU0 CPU1
NET_RX: 867028 225 NET_RX: 867028 225
TASKLET: 205 0` TASKLET: 205 0`
var softIrqsExpectedArgs = []IRQ{ var softIrqsExpectedArgs = []irq{
{ID: "0", Type: "IO-APIC-edge", Device: "timer", Cpus: []int64{134, 0}}, {id: "0", typ: "IO-APIC-edge", device: "timer", cpus: []int64{134, 0}},
{ID: "1", Type: "IO-APIC-edge", Device: "i8042", Cpus: []int64{7, 3}}, {id: "1", typ: "IO-APIC-edge", device: "i8042", cpus: []int64{7, 3}},
{ID: "NMI", Type: "Non-maskable interrupts", Cpus: []int64{0, 0}}, {id: "NMI", typ: "Non-maskable interrupts", cpus: []int64{0, 0}},
{ID: "MIS", Cpus: []int64{0}}, {id: "MIS", cpus: []int64{0}},
{ID: "NET_RX", Cpus: []int64{867028, 225}}, {id: "NET_RX", cpus: []int64{867028, 225}},
{ID: "TASKLET", Cpus: []int64{205, 0}}, {id: "TASKLET", cpus: []int64{205, 0}},
} }
func TestCpuAsTagsSoftIrqs(t *testing.T) { func TestCpuAsTagsSoftIrqs(t *testing.T) {
@ -116,29 +116,29 @@ const hwIrqsString = ` CPU0 CPU1 CPU2 CPU3
IPI5: 4348149 1843985 3819457 1822877 IRQ work interrupts IPI5: 4348149 1843985 3819457 1822877 IRQ work interrupts
IPI6: 0 0 0 0 completion interrupts` IPI6: 0 0 0 0 completion interrupts`
var hwIrqsExpectedArgs = []IRQ{ var hwIrqsExpectedArgs = []irq{
{ID: "16", Type: "bcm2836-timer", Device: "0 Edge arch_timer", Cpus: []int64{0, 0, 0, 0}}, {id: "16", typ: "bcm2836-timer", device: "0 Edge arch_timer", cpus: []int64{0, 0, 0, 0}},
{ID: "17", Type: "bcm2836-timer", Device: "1 Edge arch_timer", Cpus: []int64{127224250, 118424219, 127224437, 117885416}}, {id: "17", typ: "bcm2836-timer", device: "1 Edge arch_timer", cpus: []int64{127224250, 118424219, 127224437, 117885416}},
{ID: "21", Type: "bcm2836-pmu", Device: "9 Edge arm-pmu", Cpus: []int64{0, 0, 0, 0}}, {id: "21", typ: "bcm2836-pmu", device: "9 Edge arm-pmu", cpus: []int64{0, 0, 0, 0}},
{ID: "23", Type: "ARMCTRL-level", Device: "1 Edge 3f00b880.mailbox", Cpus: []int64{1549514, 0, 0, 0}}, {id: "23", typ: "ARMCTRL-level", device: "1 Edge 3f00b880.mailbox", cpus: []int64{1549514, 0, 0, 0}},
{ID: "24", Type: "ARMCTRL-level", Device: "2 Edge VCHIQ doorbell", Cpus: []int64{2, 0, 0, 0}}, {id: "24", typ: "ARMCTRL-level", device: "2 Edge VCHIQ doorbell", cpus: []int64{2, 0, 0, 0}},
{ID: "46", Type: "ARMCTRL-level", Device: "48 Edge bcm2708_fb dma", Cpus: []int64{0, 0, 0, 0}}, {id: "46", typ: "ARMCTRL-level", device: "48 Edge bcm2708_fb dma", cpus: []int64{0, 0, 0, 0}},
{ID: "48", Type: "ARMCTRL-level", Device: "50 Edge DMA IRQ", Cpus: []int64{0, 0, 0, 0}}, {id: "48", typ: "ARMCTRL-level", device: "50 Edge DMA IRQ", cpus: []int64{0, 0, 0, 0}},
{ID: "50", Type: "ARMCTRL-level", Device: "52 Edge DMA IRQ", Cpus: []int64{0, 0, 0, 0}}, {id: "50", typ: "ARMCTRL-level", device: "52 Edge DMA IRQ", cpus: []int64{0, 0, 0, 0}},
{ID: "51", Type: "ARMCTRL-level", Device: "53 Edge DMA IRQ", Cpus: []int64{208, 0, 0, 0}}, {id: "51", typ: "ARMCTRL-level", device: "53 Edge DMA IRQ", cpus: []int64{208, 0, 0, 0}},
{ID: "54", Type: "ARMCTRL-level", Device: "56 Edge DMA IRQ", Cpus: []int64{883002, 0, 0, 0}}, {id: "54", typ: "ARMCTRL-level", device: "56 Edge DMA IRQ", cpus: []int64{883002, 0, 0, 0}},
{ID: "59", Type: "ARMCTRL-level", Device: "61 Edge bcm2835-auxirq", Cpus: []int64{0, 0, 0, 0}}, {id: "59", typ: "ARMCTRL-level", device: "61 Edge bcm2835-auxirq", cpus: []int64{0, 0, 0, 0}},
{ID: "62", Type: "ARMCTRL-level", Device: "64 Edge dwc_otg, dwc_otg_pcd, dwc_otg_hcd:usb1", Cpus: []int64{521451447, 0, 0, 0}}, {id: "62", typ: "ARMCTRL-level", device: "64 Edge dwc_otg, dwc_otg_pcd, dwc_otg_hcd:usb1", cpus: []int64{521451447, 0, 0, 0}},
{ID: "86", Type: "ARMCTRL-level", Device: "88 Edge mmc0", Cpus: []int64{857597, 0, 0, 0}}, {id: "86", typ: "ARMCTRL-level", device: "88 Edge mmc0", cpus: []int64{857597, 0, 0, 0}},
{ID: "87", Type: "ARMCTRL-level", Device: "89 Edge uart-pl011", Cpus: []int64{4938, 0, 0, 0}}, {id: "87", typ: "ARMCTRL-level", device: "89 Edge uart-pl011", cpus: []int64{4938, 0, 0, 0}},
{ID: "92", Type: "ARMCTRL-level", Device: "94 Edge mmc1", Cpus: []int64{5669, 0, 0, 0}}, {id: "92", typ: "ARMCTRL-level", device: "94 Edge mmc1", cpus: []int64{5669, 0, 0, 0}},
{ID: "IPI0", Type: "CPU wakeup interrupts", Cpus: []int64{0, 0, 0, 0}}, {id: "IPI0", typ: "CPU wakeup interrupts", cpus: []int64{0, 0, 0, 0}},
{ID: "IPI1", Type: "Timer broadcast interrupts", Cpus: []int64{0, 0, 0, 0}}, {id: "IPI1", typ: "Timer broadcast interrupts", cpus: []int64{0, 0, 0, 0}},
{ID: "IPI2", Type: "Rescheduling interrupts", Cpus: []int64{23564958, 23464876, 23531165, 23040826}}, {id: "IPI2", typ: "Rescheduling interrupts", cpus: []int64{23564958, 23464876, 23531165, 23040826}},
{ID: "IPI3", Type: "Function call interrupts", Cpus: []int64{148438, 639704, 644266, 588150}}, {id: "IPI3", typ: "Function call interrupts", cpus: []int64{148438, 639704, 644266, 588150}},
{ID: "IPI4", Type: "CPU stop interrupts", Cpus: []int64{0, 0, 0, 0}}, {id: "IPI4", typ: "CPU stop interrupts", cpus: []int64{0, 0, 0, 0}},
{ID: "IPI5", Type: "IRQ work interrupts", Cpus: []int64{4348149, 1843985, 3819457, 1822877}}, {id: "IPI5", typ: "IRQ work interrupts", cpus: []int64{4348149, 1843985, 3819457, 1822877}},
{ID: "IPI6", Type: "completion interrupts", Cpus: []int64{0, 0, 0, 0}}, {id: "IPI6", typ: "completion interrupts", cpus: []int64{0, 0, 0, 0}},
} }
func TestCpuAsTagsHwIrqs(t *testing.T) { func TestCpuAsTagsHwIrqs(t *testing.T) {

View File

@ -1,27 +1,25 @@
package ipmi_sensor package ipmi_sensor
import ( import (
"fmt"
"net"
"strconv" "strconv"
"strings" "strings"
) )
// Connection properties for a Client // connection properties for a Client
type Connection struct { type connection struct {
Hostname string hostname string
Username string username string
Password string password string
Port int port int
Interface string intf string
Privilege string privilege string
HexKey string hexKey string
} }
func NewConnection(server, privilege, hexKey string) *Connection { func newConnection(server, privilege, hexKey string) *connection {
conn := &Connection{ conn := &connection{
Privilege: privilege, privilege: privilege,
HexKey: hexKey, hexKey: hexKey,
} }
inx1 := strings.LastIndex(server, "@") inx1 := strings.LastIndex(server, "@")
inx2 := strings.Index(server, "(") inx2 := strings.Index(server, "(")
@ -33,8 +31,8 @@ func NewConnection(server, privilege, hexKey string) *Connection {
connstr = server[inx1+1:] connstr = server[inx1+1:]
up := strings.SplitN(security, ":", 2) up := strings.SplitN(security, ":", 2)
if len(up) == 2 { if len(up) == 2 {
conn.Username = up[0] conn.username = up[0]
conn.Password = up[1] conn.password = up[1]
} }
} }
@ -42,59 +40,34 @@ func NewConnection(server, privilege, hexKey string) *Connection {
inx2 = strings.Index(connstr, "(") inx2 = strings.Index(connstr, "(")
inx3 := strings.Index(connstr, ")") inx3 := strings.Index(connstr, ")")
conn.Interface = connstr[0:inx2] conn.intf = connstr[0:inx2]
conn.Hostname = connstr[inx2+1 : inx3] conn.hostname = connstr[inx2+1 : inx3]
} }
return conn return conn
} }
func (c *Connection) options() []string { func (c *connection) options() []string {
intf := c.Interface intf := c.intf
if intf == "" { if intf == "" {
intf = "lan" intf = "lan"
} }
options := []string{ options := []string{
"-H", c.Hostname, "-H", c.hostname,
"-U", c.Username, "-U", c.username,
"-P", c.Password, "-P", c.password,
"-I", intf, "-I", intf,
} }
if c.HexKey != "" { if c.hexKey != "" {
options = append(options, "-y", c.HexKey) options = append(options, "-y", c.hexKey)
} }
if c.Port != 0 { if c.port != 0 {
options = append(options, "-p", strconv.Itoa(c.Port)) options = append(options, "-p", strconv.Itoa(c.port))
} }
if c.Privilege != "" { if c.privilege != "" {
options = append(options, "-L", c.Privilege) options = append(options, "-L", c.privilege)
} }
return options return options
} }
// RemoteIP returns the remote (bmc) IP address of the Connection
func (c *Connection) RemoteIP() string {
if net.ParseIP(c.Hostname) == nil {
addrs, err := net.LookupHost(c.Hostname)
if err != nil && len(addrs) > 0 {
return addrs[0]
}
}
return c.Hostname
}
// LocalIP returns the local (client) IP address of the Connection
func (c *Connection) LocalIP() string {
conn, err := net.Dial("udp", fmt.Sprintf("%s:%d", c.Hostname, c.Port))
if err != nil {
// don't bother returning an error, since this value will never
// make it to the bmc if we can't connect to it.
return c.Hostname
}
_ = conn.Close()
//nolint:errcheck // unable to propagate
host, _, _ := net.SplitHostPort(conn.LocalAddr().String())
return host
}

View File

@ -9,73 +9,73 @@ import (
func TestNewConnection(t *testing.T) { func TestNewConnection(t *testing.T) {
testData := []struct { testData := []struct {
addr string addr string
con *Connection con *connection
}{ }{
{ {
"USERID:PASSW0RD@lan(192.168.1.1)", "USERID:PASSW0RD@lan(192.168.1.1)",
&Connection{ &connection{
Hostname: "192.168.1.1", hostname: "192.168.1.1",
Username: "USERID", username: "USERID",
Password: "PASSW0RD", password: "PASSW0RD",
Interface: "lan", intf: "lan",
Privilege: "USER", privilege: "USER",
HexKey: "0001", hexKey: "0001",
}, },
}, },
{ {
"USERID:PASS:!@#$%^&*(234)_+W0RD@lan(192.168.1.1)", "USERID:PASS:!@#$%^&*(234)_+W0RD@lan(192.168.1.1)",
&Connection{ &connection{
Hostname: "192.168.1.1", hostname: "192.168.1.1",
Username: "USERID", username: "USERID",
Password: "PASS:!@#$%^&*(234)_+W0RD", password: "PASS:!@#$%^&*(234)_+W0RD",
Interface: "lan", intf: "lan",
Privilege: "USER", privilege: "USER",
HexKey: "0001", hexKey: "0001",
}, },
}, },
// test connection doesn't panic if incorrect symbol used // test connection doesn't panic if incorrect symbol used
{ {
"USERID@PASSW0RD@lan(192.168.1.1)", "USERID@PASSW0RD@lan(192.168.1.1)",
&Connection{ &connection{
Hostname: "192.168.1.1", hostname: "192.168.1.1",
Username: "", username: "",
Password: "", password: "",
Interface: "lan", intf: "lan",
Privilege: "USER", privilege: "USER",
HexKey: "0001", hexKey: "0001",
}, },
}, },
} }
for _, v := range testData { for _, v := range testData {
require.EqualValues(t, v.con, NewConnection(v.addr, "USER", "0001")) require.EqualValues(t, v.con, newConnection(v.addr, "USER", "0001"))
} }
} }
func TestGetCommandOptions(t *testing.T) { func TestGetCommandOptions(t *testing.T) {
testData := []struct { testData := []struct {
connection *Connection connection *connection
options []string options []string
}{ }{
{ {
&Connection{ &connection{
Hostname: "192.168.1.1", hostname: "192.168.1.1",
Username: "user", username: "user",
Password: "password", password: "password",
Interface: "lan", intf: "lan",
Privilege: "USER", privilege: "USER",
HexKey: "0001", hexKey: "0001",
}, },
[]string{"-H", "192.168.1.1", "-U", "user", "-P", "password", "-I", "lan", "-y", "0001", "-L", "USER"}, []string{"-H", "192.168.1.1", "-U", "user", "-P", "password", "-I", "lan", "-y", "0001", "-L", "USER"},
}, },
{ {
&Connection{ &connection{
Hostname: "192.168.1.1", hostname: "192.168.1.1",
Username: "user", username: "user",
Password: "password", password: "password",
Interface: "lan", intf: "lan",
Privilege: "USER", privilege: "USER",
HexKey: "", hexKey: "",
}, },
[]string{"-H", "192.168.1.1", "-U", "user", "-P", "password", "-I", "lan", "-L", "USER"}, []string{"-H", "192.168.1.1", "-U", "user", "-P", "password", "-I", "lan", "-L", "USER"},
}, },

View File

@ -35,7 +35,8 @@ var (
dcmiPowerReading = regexp.MustCompile(`^(?P<name>[^|]*)\:(?P<value>.* Watts)?`) dcmiPowerReading = regexp.MustCompile(`^(?P<name>[^|]*)\:(?P<value>.* Watts)?`)
) )
// Ipmi stores the configuration values for the ipmi_sensor input plugin const cmd = "ipmitool"
type Ipmi struct { type Ipmi struct {
Path string `toml:"path"` Path string `toml:"path"`
Privilege string `toml:"privilege"` Privilege string `toml:"privilege"`
@ -50,8 +51,6 @@ type Ipmi struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
const cmd = "ipmitool"
func (*Ipmi) SampleConfig() string { func (*Ipmi) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -83,7 +82,6 @@ func (m *Ipmi) Init() error {
return nil return nil
} }
// Gather is the main execution function for the plugin
func (m *Ipmi) Gather(acc telegraf.Accumulator) error { func (m *Ipmi) Gather(acc telegraf.Accumulator) error {
if len(m.Path) == 0 { if len(m.Path) == 0 {
return errors.New("ipmitool not found: verify that ipmitool is installed and that ipmitool is in your PATH") return errors.New("ipmitool not found: verify that ipmitool is installed and that ipmitool is in your PATH")
@ -129,8 +127,8 @@ func (m *Ipmi) parse(acc telegraf.Accumulator, server, sensor string) error {
opts := make([]string, 0) opts := make([]string, 0)
hostname := "" hostname := ""
if server != "" { if server != "" {
conn := NewConnection(server, m.Privilege, m.HexKey) conn := newConnection(server, m.Privilege, m.HexKey)
hostname = conn.Hostname hostname = conn.hostname
opts = conn.options() opts = conn.options()
} }

View File

@ -32,10 +32,10 @@ func TestGather(t *testing.T) {
require.NoError(t, acc.GatherError(i.Gather)) require.NoError(t, acc.GatherError(i.Gather))
require.EqualValues(t, 262, acc.NFields(), "non-numeric measurements should be ignored") require.EqualValues(t, 262, acc.NFields(), "non-numeric measurements should be ignored")
conn := NewConnection(i.Servers[0], i.Privilege, i.HexKey) conn := newConnection(i.Servers[0], i.Privilege, i.HexKey)
require.EqualValues(t, "USERID", conn.Username) require.EqualValues(t, "USERID", conn.username)
require.EqualValues(t, "lan", conn.Interface) require.EqualValues(t, "lan", conn.intf)
require.EqualValues(t, "1234567F", conn.HexKey) require.EqualValues(t, "1234567F", conn.hexKey)
var testsWithServer = []struct { var testsWithServer = []struct {
fields map[string]interface{} fields map[string]interface{}
@ -402,10 +402,10 @@ func TestGatherV2(t *testing.T) {
require.NoError(t, i.Init()) require.NoError(t, i.Init())
require.NoError(t, acc.GatherError(i.Gather)) require.NoError(t, acc.GatherError(i.Gather))
conn := NewConnection(i.Servers[0], i.Privilege, i.HexKey) conn := newConnection(i.Servers[0], i.Privilege, i.HexKey)
require.EqualValues(t, "USERID", conn.Username) require.EqualValues(t, "USERID", conn.username)
require.EqualValues(t, "lan", conn.Interface) require.EqualValues(t, "lan", conn.intf)
require.EqualValues(t, "0000000F", conn.HexKey) require.EqualValues(t, "0000000F", conn.hexKey)
var testsWithServer = []struct { var testsWithServer = []struct {
fields map[string]interface{} fields map[string]interface{}

View File

@ -20,19 +20,19 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Ipsets is a telegraf plugin to gather packets and bytes counters from ipset var defaultTimeout = config.Duration(time.Second)
type Ipset struct {
IncludeUnmatchedSets bool
UseSudo bool
Timeout config.Duration
lister setLister
}
type setLister func(Timeout config.Duration, UseSudo bool) (*bytes.Buffer, error)
const measurement = "ipset" const measurement = "ipset"
var defaultTimeout = config.Duration(time.Second) type Ipset struct {
IncludeUnmatchedSets bool `toml:"include_unmatched_sets"`
UseSudo bool `toml:"use_sudo"`
Timeout config.Duration `toml:"timeout"`
lister setLister
}
type setLister func(Timeout config.Duration, UseSudo bool) (*bytes.Buffer, error)
func (*Ipset) SampleConfig() string { func (*Ipset) SampleConfig() string {
return sampleConfig return sampleConfig

View File

@ -18,21 +18,31 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Iptables is a telegraf plugin to gather packets and bytes throughput from Linux's iptables packet filter. var (
errParse = errors.New("cannot parse iptables list information")
chainNameRe = regexp.MustCompile(`^Chain\s+(\S+)`)
fieldsHeaderRe = regexp.MustCompile(`^\s*pkts\s+bytes\s+target`)
valuesRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s+(\w+).*?/\*\s*(.+?)\s*\*/\s*`)
)
const measurement = "iptables"
type Iptables struct { type Iptables struct {
UseSudo bool UseSudo bool `toml:"use_sudo"`
UseLock bool UseLock bool `toml:"use_lock"`
Binary string Binary string `toml:"binary"`
Table string Table string `toml:"table"`
Chains []string Chains []string `toml:"chains"`
lister chainLister
lister chainLister
} }
type chainLister func(table, chain string) (string, error)
func (*Iptables) SampleConfig() string { func (*Iptables) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather gathers iptables packets and bytes throughput from the configured tables and chains.
func (ipt *Iptables) Gather(acc telegraf.Accumulator) error { func (ipt *Iptables) Gather(acc telegraf.Accumulator) error {
if ipt.Table == "" || len(ipt.Chains) == 0 { if ipt.Table == "" || len(ipt.Chains) == 0 {
return nil return nil
@ -80,15 +90,6 @@ func (ipt *Iptables) chainList(table, chain string) (string, error) {
return string(out), err return string(out), err
} }
const measurement = "iptables"
var (
errParse = errors.New("cannot parse iptables list information")
chainNameRe = regexp.MustCompile(`^Chain\s+(\S+)`)
fieldsHeaderRe = regexp.MustCompile(`^\s*pkts\s+bytes\s+target`)
valuesRe = regexp.MustCompile(`^\s*(\d+)\s+(\d+)\s+(\w+).*?/\*\s*(.+?)\s*\*/\s*`)
)
func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error { func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error {
lines := strings.Split(data, "\n") lines := strings.Split(data, "\n")
if len(lines) < 3 { if len(lines) < 3 {
@ -129,8 +130,6 @@ func (ipt *Iptables) parseAndGather(data string, acc telegraf.Accumulator) error
return nil return nil
} }
type chainLister func(table, chain string) (string, error)
func init() { func init() {
inputs.Add("iptables", func() telegraf.Input { inputs.Add("iptables", func() telegraf.Input {
ipt := &Iptables{} ipt := &Iptables{}

View File

@ -17,11 +17,13 @@ type Iptables struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*Iptables) SampleConfig() string { return sampleConfig }
func (i *Iptables) Init() error { func (i *Iptables) Init() error {
i.Log.Warn("current platform is not supported") i.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*Iptables) SampleConfig() string { return sampleConfig }
func (*Iptables) Gather(_ telegraf.Accumulator) error { return nil } func (*Iptables) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {

View File

@ -22,15 +22,14 @@ var sampleConfig string
// IPVS holds the state for this input plugin // IPVS holds the state for this input plugin
type IPVS struct { type IPVS struct {
Log telegraf.Logger `toml:"-"`
handle *ipvs.Handle handle *ipvs.Handle
Log telegraf.Logger
} }
func (*IPVS) SampleConfig() string { func (*IPVS) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather gathers the stats
func (i *IPVS) Gather(acc telegraf.Accumulator) error { func (i *IPVS) Gather(acc telegraf.Accumulator) error {
if i.handle == nil { if i.handle == nil {
h, err := ipvs.New("") // TODO: make the namespace configurable h, err := ipvs.New("") // TODO: make the namespace configurable

View File

@ -17,11 +17,13 @@ type Ipvs struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
func (*Ipvs) SampleConfig() string { return sampleConfig }
func (i *Ipvs) Init() error { func (i *Ipvs) Init() error {
i.Log.Warn("current platform is not supported") i.Log.Warn("Current platform is not supported")
return nil return nil
} }
func (*Ipvs) SampleConfig() string { return sampleConfig }
func (*Ipvs) Gather(_ telegraf.Accumulator) error { return nil } func (*Ipvs) Gather(_ telegraf.Accumulator) error { return nil }
func init() { func init() {