chore: Fix linter findings for `revive:exported` in `plugins/inputs/e*` (#16044)

This commit is contained in:
Paweł Żak 2024-10-24 11:03:42 +02:00 committed by GitHub
parent d9254c210f
commit 7dc397a830
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 591 additions and 612 deletions

View File

@ -21,18 +21,18 @@ var (
ecsMetaStatsPath = "/task/stats" ecsMetaStatsPath = "/task/stats"
) )
// Client is the ECS client contract // client is the ECS client contract
type Client interface { type client interface {
Task() (*Task, error) task() (*ecsTask, error)
ContainerStats() (map[string]*container.StatsResponse, error) containerStats() (map[string]*container.StatsResponse, error)
} }
type httpClient interface { type httpClient interface {
Do(req *http.Request) (*http.Response, error) Do(req *http.Request) (*http.Response, error)
} }
// NewClient constructs an ECS client with the passed configuration params // newClient constructs an ECS client with the passed configuration params
func NewClient(timeout time.Duration, endpoint string, version int) (*EcsClient, error) { func newClient(timeout time.Duration, endpoint string, version int) (*ecsClient, error) {
if version < 2 || version > 4 { if version < 2 || version > 4 {
const msg = "expected metadata version 2, 3 or 4, got %d" const msg = "expected metadata version 2, 3 or 4, got %d"
return nil, fmt.Errorf(msg, version) return nil, fmt.Errorf(msg, version)
@ -47,7 +47,7 @@ func NewClient(timeout time.Duration, endpoint string, version int) (*EcsClient,
Timeout: timeout, Timeout: timeout,
} }
return &EcsClient{ return &ecsClient{
client: c, client: c,
baseURL: baseURL, baseURL: baseURL,
taskURL: resolveTaskURL(baseURL, version), taskURL: resolveTaskURL(baseURL, version),
@ -96,8 +96,8 @@ func resolveURL(base *url.URL, path string) string {
return base.String() + path return base.String() + path
} }
// EcsClient contains ECS connection config // ecsClient contains ECS connection config
type EcsClient struct { type ecsClient struct {
client httpClient client httpClient
version int version int
baseURL *url.URL baseURL *url.URL
@ -105,8 +105,8 @@ type EcsClient struct {
statsURL string statsURL string
} }
// Task calls the ECS metadata endpoint and returns a populated Task // task calls the ECS metadata endpoint and returns a populated task
func (c *EcsClient) Task() (*Task, error) { func (c *ecsClient) task() (*ecsTask, error) {
req, err := http.NewRequest("GET", c.taskURL, nil) req, err := http.NewRequest("GET", c.taskURL, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -131,8 +131,8 @@ func (c *EcsClient) Task() (*Task, error) {
return task, nil return task, nil
} }
// ContainerStats calls the ECS stats endpoint and returns a populated container stats map // containerStats calls the ECS stats endpoint and returns a populated container stats map
func (c *EcsClient) ContainerStats() (map[string]*container.StatsResponse, error) { func (c *ecsClient) containerStats() (map[string]*container.StatsResponse, error) {
req, err := http.NewRequest("GET", c.statsURL, nil) req, err := http.NewRequest("GET", c.statsURL, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -153,18 +153,19 @@ func (c *EcsClient) ContainerStats() (map[string]*container.StatsResponse, error
return unmarshalStats(resp.Body) return unmarshalStats(resp.Body)
} }
// PollSync executes Task and ContainerStats in parallel. If both succeed, both structs are returned. // pollSync executes task and containerStats in parallel.
// If both succeed, both structs are returned.
// If either errors, a single error is returned. // If either errors, a single error is returned.
func PollSync(c Client) (*Task, map[string]*container.StatsResponse, error) { func pollSync(c client) (*ecsTask, map[string]*container.StatsResponse, error) {
var task *Task var task *ecsTask
var stats map[string]*container.StatsResponse var stats map[string]*container.StatsResponse
var err error var err error
if stats, err = c.ContainerStats(); err != nil { if stats, err = c.containerStats(); err != nil {
return nil, nil, err return nil, nil, err
} }
if task, err = c.Task(); err != nil { if task, err = c.task(); err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -14,33 +14,33 @@ import (
) )
type pollMock struct { type pollMock struct {
task func() (*Task, error) getTask func() (*ecsTask, error)
stats func() (map[string]*container.StatsResponse, error) getStats func() (map[string]*container.StatsResponse, error)
} }
func (p *pollMock) Task() (*Task, error) { func (p *pollMock) task() (*ecsTask, error) {
return p.task() return p.getTask()
} }
func (p *pollMock) ContainerStats() (map[string]*container.StatsResponse, error) { func (p *pollMock) containerStats() (map[string]*container.StatsResponse, error) {
return p.stats() return p.getStats()
} }
func TestEcsClient_PollSync(t *testing.T) { func TestEcsClient_PollSync(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
mock *pollMock mock *pollMock
want *Task want *ecsTask
want1 map[string]*container.StatsResponse want1 map[string]*container.StatsResponse
wantErr bool wantErr bool
}{ }{
{ {
name: "success", name: "success",
mock: &pollMock{ mock: &pollMock{
task: func() (*Task, error) { getTask: func() (*ecsTask, error) {
return &validMeta, nil return &validMeta, nil
}, },
stats: func() (map[string]*container.StatsResponse, error) { getStats: func() (map[string]*container.StatsResponse, error) {
return validStats, nil return validStats, nil
}, },
}, },
@ -50,10 +50,10 @@ func TestEcsClient_PollSync(t *testing.T) {
{ {
name: "task err", name: "task err",
mock: &pollMock{ mock: &pollMock{
task: func() (*Task, error) { getTask: func() (*ecsTask, error) {
return nil, errors.New("err") return nil, errors.New("err")
}, },
stats: func() (map[string]*container.StatsResponse, error) { getStats: func() (map[string]*container.StatsResponse, error) {
return validStats, nil return validStats, nil
}, },
}, },
@ -62,10 +62,10 @@ func TestEcsClient_PollSync(t *testing.T) {
{ {
name: "stats err", name: "stats err",
mock: &pollMock{ mock: &pollMock{
task: func() (*Task, error) { getTask: func() (*ecsTask, error) {
return &validMeta, nil return &validMeta, nil
}, },
stats: func() (map[string]*container.StatsResponse, error) { getStats: func() (map[string]*container.StatsResponse, error) {
return nil, errors.New("err") return nil, errors.New("err")
}, },
}, },
@ -74,14 +74,14 @@ func TestEcsClient_PollSync(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, got1, err := PollSync(tt.mock) got, got1, err := pollSync(tt.mock)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("EcsClient.PollSync() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("ecsClient.pollSync() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
require.Equal(t, tt.want, got, "EcsClient.PollSync() got = %v, want %v", got, tt.want) require.Equal(t, tt.want, got, "ecsClient.pollSync() got = %v, want %v", got, tt.want)
require.Equal(t, tt.want1, got1, "EcsClient.PollSync() got1 = %v, want %v", got1, tt.want1) require.Equal(t, tt.want1, got1, "ecsClient.pollSync() got1 = %v, want %v", got1, tt.want1)
}) })
} }
} }
@ -98,7 +98,7 @@ func TestEcsClient_Task(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
client httpClient client httpClient
want *Task want *ecsTask
wantErr bool wantErr bool
}{ }{
{ {
@ -154,16 +154,16 @@ func TestEcsClient_Task(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
c := &EcsClient{ c := &ecsClient{
client: tt.client, client: tt.client,
taskURL: "abc", taskURL: "abc",
} }
got, err := c.Task() got, err := c.task()
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("EcsClient.Task() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("ecsClient.task() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
require.Equal(t, tt.want, got, "EcsClient.Task() = %v, want %v", got, tt.want) require.Equal(t, tt.want, got, "ecsClient.task() = %v, want %v", got, tt.want)
}) })
} }
} }
@ -231,16 +231,16 @@ func TestEcsClient_ContainerStats(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
c := &EcsClient{ c := &ecsClient{
client: tt.client, client: tt.client,
statsURL: "abc", statsURL: "abc",
} }
got, err := c.ContainerStats() got, err := c.containerStats()
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("EcsClient.ContainerStats() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("ecsClient.containerStats() error = %v, wantErr %v", err, tt.wantErr)
return return
} }
require.Equal(t, tt.want, got, "EcsClient.ContainerStats() = %v, want %v", got, tt.want) require.Equal(t, tt.want, got, "ecsClient.containerStats() = %v, want %v", got, tt.want)
}) })
} }
} }

View File

@ -16,10 +16,13 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// Ecs config object const (
v2Endpoint = "http://169.254.170.2"
)
type Ecs struct { type Ecs struct {
EndpointURL string `toml:"endpoint_url"` EndpointURL string `toml:"endpoint_url"`
Timeout config.Duration Timeout config.Duration `toml:"timeout"`
ContainerNameInclude []string `toml:"container_name_include"` ContainerNameInclude []string `toml:"container_name_include"`
ContainerNameExclude []string `toml:"container_name_exclude"` ContainerNameExclude []string `toml:"container_name_exclude"`
@ -30,9 +33,9 @@ type Ecs struct {
LabelInclude []string `toml:"ecs_label_include"` LabelInclude []string `toml:"ecs_label_include"`
LabelExclude []string `toml:"ecs_label_exclude"` LabelExclude []string `toml:"ecs_label_exclude"`
newClient func(timeout time.Duration, endpoint string, version int) (*EcsClient, error) newClient func(timeout time.Duration, endpoint string, version int) (*ecsClient, error)
client Client client client
filtersCreated bool filtersCreated bool
labelFilter filter.Filter labelFilter filter.Filter
containerNameFilter filter.Filter containerNameFilter filter.Filter
@ -40,28 +43,17 @@ type Ecs struct {
metadataVersion int metadataVersion int
} }
const (
KB = 1000
MB = 1000 * KB
GB = 1000 * MB
TB = 1000 * GB
PB = 1000 * TB
v2Endpoint = "http://169.254.170.2"
)
func (*Ecs) SampleConfig() string { func (*Ecs) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Gather is the entrypoint for telegraf metrics collection
func (ecs *Ecs) Gather(acc telegraf.Accumulator) error { func (ecs *Ecs) Gather(acc telegraf.Accumulator) error {
err := initSetup(ecs) err := initSetup(ecs)
if err != nil { if err != nil {
return err return err
} }
task, stats, err := PollSync(ecs.client) task, stats, err := pollSync(ecs.client)
if err != nil { if err != nil {
return err return err
} }
@ -145,7 +137,7 @@ func resolveEndpoint(ecs *Ecs) {
ecs.metadataVersion = 2 ecs.metadataVersion = 2
} }
func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) { func (ecs *Ecs) accTask(task *ecsTask, tags map[string]string, acc telegraf.Accumulator) {
taskFields := map[string]interface{}{ taskFields := map[string]interface{}{
"desired_status": task.DesiredStatus, "desired_status": task.DesiredStatus,
"known_status": task.KnownStatus, "known_status": task.KnownStatus,
@ -156,7 +148,7 @@ func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumul
acc.AddFields("ecs_task", taskFields, tags) acc.AddFields("ecs_task", taskFields, tags)
} }
func (ecs *Ecs) accContainers(task *Task, taskTags map[string]string, acc telegraf.Accumulator) { func (ecs *Ecs) accContainers(task *ecsTask, taskTags map[string]string, acc telegraf.Accumulator) {
for i := range task.Containers { for i := range task.Containers {
c := &task.Containers[i] c := &task.Containers[i]
if !ecs.containerNameFilter.Match(c.Name) { if !ecs.containerNameFilter.Match(c.Name) {
@ -245,7 +237,7 @@ func init() {
return &Ecs{ return &Ecs{
EndpointURL: "", EndpointURL: "",
Timeout: config.Duration(5 * time.Second), Timeout: config.Duration(5 * time.Second),
newClient: NewClient, newClient: newClient,
filtersCreated: false, filtersCreated: false,
} }
}) })

View File

@ -697,14 +697,14 @@ var metaStarted = mustParseNano("2018-11-19T15:31:27.975996351Z")
var metaPullStart = mustParseNano("2018-11-19T15:31:27.197327103Z") var metaPullStart = mustParseNano("2018-11-19T15:31:27.197327103Z")
var metaPullStop = mustParseNano("2018-11-19T15:31:27.609089471Z") var metaPullStop = mustParseNano("2018-11-19T15:31:27.609089471Z")
var validMeta = Task{ var validMeta = ecsTask{
Cluster: "test", Cluster: "test",
TaskARN: "arn:aws:ecs:aws-region-1:012345678901:task/a1234abc-a0a0-0a01-ab01-0abc012a0a0a", TaskARN: "arn:aws:ecs:aws-region-1:012345678901:task/a1234abc-a0a0-0a01-ab01-0abc012a0a0a",
Family: "nginx", Family: "nginx",
Revision: "2", Revision: "2",
DesiredStatus: "RUNNING", DesiredStatus: "RUNNING",
KnownStatus: "RUNNING", KnownStatus: "RUNNING",
Containers: []Container{ Containers: []ecsContainer{
{ {
ID: pauseStatsKey, ID: pauseStatsKey,
Name: "~internal~ecs~pause", Name: "~internal~ecs~pause",
@ -727,7 +727,7 @@ var validMeta = Task{
CreatedAt: metaPauseCreated, CreatedAt: metaPauseCreated,
StartedAt: metaPauseStarted, StartedAt: metaPauseStarted,
Type: "CNI_PAUSE", Type: "CNI_PAUSE",
Networks: []Network{ Networks: []network{
{ {
NetworkMode: "awsvpc", NetworkMode: "awsvpc",
IPv4Addresses: []string{ IPv4Addresses: []string{
@ -758,7 +758,7 @@ var validMeta = Task{
CreatedAt: metaCreated, CreatedAt: metaCreated,
StartedAt: metaStarted, StartedAt: metaStarted,
Type: "NORMAL", Type: "NORMAL",
Networks: []Network{ Networks: []network{
{ {
NetworkMode: "awsvpc", NetworkMode: "awsvpc",
IPv4Addresses: []string{ IPv4Addresses: []string{

View File

@ -11,7 +11,7 @@ import (
"github.com/influxdata/telegraf/plugins/common/docker" "github.com/influxdata/telegraf/plugins/common/docker"
) )
func parseContainerStats(c *Container, acc telegraf.Accumulator, tags map[string]string) { func parseContainerStats(c *ecsContainer, acc telegraf.Accumulator, tags map[string]string) {
id := c.ID id := c.ID
stats := &c.Stats stats := &c.Stats
tm := stats.Read tm := stats.Read
@ -27,7 +27,7 @@ func parseContainerStats(c *Container, acc telegraf.Accumulator, tags map[string
blkstats(id, stats, acc, tags, tm) blkstats(id, stats, acc, tags, tm)
} }
func metastats(id string, c *Container, acc telegraf.Accumulator, tags map[string]string, tm time.Time) { func metastats(id string, c *ecsContainer, acc telegraf.Accumulator, tags map[string]string, tm time.Time) {
metafields := map[string]interface{}{ metafields := map[string]interface{}{
"container_id": id, "container_id": id,
"docker_name": c.DockerName, "docker_name": c.DockerName,

View File

@ -9,22 +9,22 @@ import (
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
) )
// Task is the ECS task representation // ecsTask is the ECS task representation
type Task struct { type ecsTask struct {
Cluster string Cluster string
TaskARN string TaskARN string
Family string Family string
Revision string Revision string
DesiredStatus string DesiredStatus string
KnownStatus string KnownStatus string
Containers []Container Containers []ecsContainer
Limits map[string]float64 Limits map[string]float64
PullStartedAt time.Time PullStartedAt time.Time
PullStoppedAt time.Time PullStoppedAt time.Time
} }
// Container is the ECS metadata container representation // ecsContainer is the ECS metadata container representation
type Container struct { type ecsContainer struct {
ID string `json:"DockerId"` ID string `json:"DockerId"`
Name string Name string
DockerName string DockerName string
@ -38,17 +38,17 @@ type Container struct {
StartedAt time.Time StartedAt time.Time
Stats container.StatsResponse Stats container.StatsResponse
Type string Type string
Networks []Network Networks []network
} }
// Network is a docker network configuration // network is a docker network configuration
type Network struct { type network struct {
NetworkMode string NetworkMode string
IPv4Addresses []string IPv4Addresses []string
} }
func unmarshalTask(r io.Reader) (*Task, error) { func unmarshalTask(r io.Reader) (*ecsTask, error) {
task := &Task{} task := &ecsTask{}
err := json.NewDecoder(r).Decode(task) err := json.NewDecoder(r).Decode(task)
return task, err return task, err
} }
@ -62,8 +62,8 @@ func unmarshalStats(r io.Reader) (map[string]*container.StatsResponse, error) {
return statsMap, nil return statsMap, nil
} }
// interleaves Stats in to the Container objects in the Task // interleaves Stats in to the Container objects in the ecsTask
func mergeTaskStats(task *Task, stats map[string]*container.StatsResponse) { func mergeTaskStats(task *ecsTask, stats map[string]*container.StatsResponse) {
for i := range task.Containers { for i := range task.Containers {
c := &task.Containers[i] c := &task.Containers[i]
if strings.Trim(c.ID, " ") == "" { if strings.Trim(c.ID, " ") == "" {

View File

@ -29,9 +29,38 @@ var sampleConfig string
// mask for masking username/password from error messages // mask for masking username/password from error messages
var mask = regexp.MustCompile(`https?:\/\/\S+:\S+@`) var mask = regexp.MustCompile(`https?:\/\/\S+:\S+@`)
const (
// Node stats are always generated, so simply define a constant for these endpoints // Node stats are always generated, so simply define a constant for these endpoints
const statsPath = "/_nodes/stats" statsPath = "/_nodes/stats"
const statsPathLocal = "/_nodes/_local/stats" statsPathLocal = "/_nodes/_local/stats"
)
type Elasticsearch struct {
Local bool `toml:"local"`
Servers []string `toml:"servers"`
HTTPHeaders map[string]string `toml:"headers"`
HTTPTimeout config.Duration `toml:"http_timeout" deprecated:"1.29.0;1.35.0;use 'timeout' instead"`
ClusterHealth bool `toml:"cluster_health"`
ClusterHealthLevel string `toml:"cluster_health_level"`
ClusterStats bool `toml:"cluster_stats"`
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
EnrichStats bool `toml:"enrich_stats"`
IndicesInclude []string `toml:"indices_include"`
IndicesLevel string `toml:"indices_level"`
NodeStats []string `toml:"node_stats"`
Username string `toml:"username"`
Password string `toml:"password"`
NumMostRecentIndices int `toml:"num_most_recent_indices"`
Log telegraf.Logger `toml:"-"`
client *http.Client
common_http.HTTPClientConfig
serverInfo map[string]serverInfo
serverInfoMutex sync.Mutex
indexMatchers map[string]filter.Filter
}
type nodeStat struct { type nodeStat struct {
Host string `json:"host"` Host string `json:"host"`
@ -109,89 +138,15 @@ type indexStat struct {
Total interface{} `json:"total"` Total interface{} `json:"total"`
Shards map[string][]interface{} `json:"shards"` Shards map[string][]interface{} `json:"shards"`
} }
// Elasticsearch is a plugin to read stats from one or many Elasticsearch
// servers.
type Elasticsearch struct {
Local bool `toml:"local"`
Servers []string `toml:"servers"`
HTTPHeaders map[string]string `toml:"headers"`
HTTPTimeout config.Duration `toml:"http_timeout" deprecated:"1.29.0;1.35.0;use 'timeout' instead"`
ClusterHealth bool `toml:"cluster_health"`
ClusterHealthLevel string `toml:"cluster_health_level"`
ClusterStats bool `toml:"cluster_stats"`
ClusterStatsOnlyFromMaster bool `toml:"cluster_stats_only_from_master"`
EnrichStats bool `toml:"enrich_stats"`
IndicesInclude []string `toml:"indices_include"`
IndicesLevel string `toml:"indices_level"`
NodeStats []string `toml:"node_stats"`
Username string `toml:"username"`
Password string `toml:"password"`
NumMostRecentIndices int `toml:"num_most_recent_indices"`
Log telegraf.Logger `toml:"-"`
client *http.Client
common_http.HTTPClientConfig
serverInfo map[string]serverInfo
serverInfoMutex sync.Mutex
indexMatchers map[string]filter.Filter
}
type serverInfo struct { type serverInfo struct {
nodeID string nodeID string
masterID string masterID string
} }
func (i serverInfo) isMaster() bool {
return i.nodeID == i.masterID
}
// NewElasticsearch return a new instance of Elasticsearch
func NewElasticsearch() *Elasticsearch {
return &Elasticsearch{
ClusterStatsOnlyFromMaster: true,
ClusterHealthLevel: "indices",
HTTPClientConfig: common_http.HTTPClientConfig{
ResponseHeaderTimeout: config.Duration(5 * time.Second),
Timeout: config.Duration(5 * time.Second),
},
}
}
// perform status mapping
func mapHealthStatusToCode(s string) int {
switch strings.ToLower(s) {
case "green":
return 1
case "yellow":
return 2
case "red":
return 3
}
return 0
}
// perform shard status mapping
func mapShardStatusToCode(s string) int {
switch strings.ToUpper(s) {
case "UNASSIGNED":
return 1
case "INITIALIZING":
return 2
case "STARTED":
return 3
case "RELOCATING":
return 4
}
return 0
}
func (*Elasticsearch) SampleConfig() string { func (*Elasticsearch) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init the plugin.
func (e *Elasticsearch) Init() error { func (e *Elasticsearch) Init() error {
// Compile the configured indexes to match for sorting. // Compile the configured indexes to match for sorting.
indexMatchers, err := e.compileIndexMatchers() indexMatchers, err := e.compileIndexMatchers()
@ -208,8 +163,6 @@ func (e *Elasticsearch) Start(_ telegraf.Accumulator) error {
return nil return nil
} }
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error { func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
if e.client == nil { if e.client == nil {
client, err := e.createHTTPClient() client, err := e.createHTTPClient()
@ -784,8 +737,51 @@ func (e *Elasticsearch) compileIndexMatchers() (map[string]filter.Filter, error)
return indexMatchers, nil return indexMatchers, nil
} }
func (i serverInfo) isMaster() bool {
return i.nodeID == i.masterID
}
// perform status mapping
func mapHealthStatusToCode(s string) int {
switch strings.ToLower(s) {
case "green":
return 1
case "yellow":
return 2
case "red":
return 3
}
return 0
}
// perform shard status mapping
func mapShardStatusToCode(s string) int {
switch strings.ToUpper(s) {
case "UNASSIGNED":
return 1
case "INITIALIZING":
return 2
case "STARTED":
return 3
case "RELOCATING":
return 4
}
return 0
}
func newElasticsearch() *Elasticsearch {
return &Elasticsearch{
ClusterStatsOnlyFromMaster: true,
ClusterHealthLevel: "indices",
HTTPClientConfig: common_http.HTTPClientConfig{
ResponseHeaderTimeout: config.Duration(5 * time.Second),
Timeout: config.Duration(5 * time.Second),
},
}
}
func init() { func init() {
inputs.Add("elasticsearch", func() telegraf.Input { inputs.Add("elasticsearch", func() telegraf.Input {
return NewElasticsearch() return newElasticsearch()
}) })
} }

View File

@ -48,8 +48,6 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
return res, nil return res, nil
} }
func (t *transportMock) CancelRequest(_ *http.Request) {}
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) { func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
tags := defaultTags() tags := defaultTags()
acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags) acc.AssertContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
@ -369,7 +367,7 @@ func TestGatherClusterIndiceShardsStats(t *testing.T) {
} }
func newElasticsearchWithClient() *Elasticsearch { func newElasticsearchWithClient() *Elasticsearch {
es := NewElasticsearch() es := newElasticsearch()
es.client = &http.Client{} es.client = &http.Client{}
return es return es
} }

View File

@ -23,7 +23,6 @@ import (
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
// ElasticsearchQuery struct
type ElasticsearchQuery struct { type ElasticsearchQuery struct {
URLs []string `toml:"urls"` URLs []string `toml:"urls"`
Username string `toml:"username"` Username string `toml:"username"`
@ -40,7 +39,6 @@ type ElasticsearchQuery struct {
esClient *elastic5.Client esClient *elastic5.Client
} }
// esAggregation struct
type esAggregation struct { type esAggregation struct {
Index string `toml:"index"` Index string `toml:"index"`
MeasurementName string `toml:"measurement_name"` MeasurementName string `toml:"measurement_name"`
@ -61,7 +59,6 @@ func (*ElasticsearchQuery) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// Init the plugin.
func (e *ElasticsearchQuery) Init() error { func (e *ElasticsearchQuery) Init() error {
if e.URLs == nil { if e.URLs == nil {
return errors.New("elasticsearch urls is not defined") return errors.New("elasticsearch urls is not defined")
@ -69,7 +66,7 @@ func (e *ElasticsearchQuery) Init() error {
err := e.connectToES() err := e.connectToES()
if err != nil { if err != nil {
e.Log.Errorf("E! error connecting to elasticsearch: %s", err) e.Log.Errorf("error connecting to elasticsearch: %s", err)
return nil return nil
} }
@ -92,6 +89,40 @@ func (e *ElasticsearchQuery) Init() error {
return nil return nil
} }
func (e *ElasticsearchQuery) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather writes the results of the queries from Elasticsearch to the Accumulator.
func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
err := e.connectToES()
if err != nil {
return err
}
for i, agg := range e.Aggregations {
wg.Add(1)
go func(agg esAggregation, i int) {
defer wg.Done()
err := e.esAggregationQuery(acc, agg, i)
if err != nil {
acc.AddError(fmt.Errorf("elasticsearch query aggregation %s: %w", agg.MeasurementName, err))
}
}(agg, i)
}
wg.Wait()
return nil
}
func (e *ElasticsearchQuery) Stop() {
if e.httpclient != nil {
e.httpclient.CloseIdleConnections()
}
}
func (e *ElasticsearchQuery) initAggregation(ctx context.Context, agg esAggregation, i int) (err error) { func (e *ElasticsearchQuery) initAggregation(ctx context.Context, agg esAggregation, i int) (err error) {
// retrieve field mapping and build queries only once // retrieve field mapping and build queries only once
agg.mapMetricFields, err = e.getMetricFields(ctx, agg) agg.mapMetricFields, err = e.getMetricFields(ctx, agg)
@ -173,40 +204,6 @@ func (e *ElasticsearchQuery) connectToES() error {
return nil return nil
} }
func (e *ElasticsearchQuery) Start(_ telegraf.Accumulator) error {
return nil
}
// Gather writes the results of the queries from Elasticsearch to the Accumulator.
func (e *ElasticsearchQuery) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
err := e.connectToES()
if err != nil {
return err
}
for i, agg := range e.Aggregations {
wg.Add(1)
go func(agg esAggregation, i int) {
defer wg.Done()
err := e.esAggregationQuery(acc, agg, i)
if err != nil {
acc.AddError(fmt.Errorf("elasticsearch query aggregation %s: %w", agg.MeasurementName, err))
}
}(agg, i)
}
wg.Wait()
return nil
}
func (e *ElasticsearchQuery) Stop() {
if e.httpclient != nil {
e.httpclient.CloseIdleConnections()
}
}
func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) { func (e *ElasticsearchQuery) createHTTPClient() (*http.Client, error) {
ctx := context.Background() ctx := context.Background()
return e.HTTPClientConfig.CreateClient(ctx, e.Log) return e.HTTPClientConfig.CreateClient(ctx, e.Log)

View File

@ -5,18 +5,10 @@ import (
_ "embed" _ "embed"
) )
const pluginName = "ethtool"
//go:embed sample.conf //go:embed sample.conf
var sampleConfig string var sampleConfig string
type Command interface { const pluginName = "ethtool"
Init() error
DriverName(intf NamespacedInterface) (string, error)
Interfaces(includeNamespaces bool) ([]NamespacedInterface, error)
Stats(intf NamespacedInterface) (map[string]uint64, error)
Get(intf NamespacedInterface) (map[string]uint64, error)
}
func (*Ethtool) SampleConfig() string { func (*Ethtool) SampleConfig() string {
return sampleConfig return sampleConfig

View File

@ -19,6 +19,8 @@ import (
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
var downInterfacesBehaviors = []string{"expose", "skip"}
const ( const (
tagInterface = "interface" tagInterface = "interface"
tagNamespace = "namespace" tagNamespace = "namespace"
@ -26,8 +28,6 @@ const (
fieldInterfaceUp = "interface_up" fieldInterfaceUp = "interface_up"
) )
var downInterfacesBehaviors = []string{"expose", "skip"}
type Ethtool struct { type Ethtool struct {
// This is the list of interface names to include // This is the list of interface names to include
InterfaceInclude []string `toml:"interface_include"` InterfaceInclude []string `toml:"interface_include"`
@ -54,12 +54,20 @@ type Ethtool struct {
includeNamespaces bool includeNamespaces bool
// the ethtool command // the ethtool command
command Command command command
} }
type CommandEthtool struct { type command interface {
Log telegraf.Logger init() error
namespaceGoroutines map[string]*NamespaceGoroutine driverName(intf namespacedInterface) (string, error)
interfaces(includeNamespaces bool) ([]namespacedInterface, error)
stats(intf namespacedInterface) (map[string]uint64, error)
get(intf namespacedInterface) (map[string]uint64, error)
}
type commandEthtool struct {
log telegraf.Logger
namespaceGoroutines map[string]*namespaceGoroutine
} }
func (e *Ethtool) Init() error { func (e *Ethtool) Init() error {
@ -90,16 +98,16 @@ func (e *Ethtool) Init() error {
return err return err
} }
if command, ok := e.command.(*CommandEthtool); ok { if command, ok := e.command.(*commandEthtool); ok {
command.Log = e.Log command.log = e.Log
} }
return e.command.Init() return e.command.init()
} }
func (e *Ethtool) Gather(acc telegraf.Accumulator) error { func (e *Ethtool) Gather(acc telegraf.Accumulator) error {
// Get the list of interfaces // Get the list of interfaces
interfaces, err := e.command.Interfaces(e.includeNamespaces) interfaces, err := e.command.interfaces(e.includeNamespaces)
if err != nil { if err != nil {
acc.AddError(err) acc.AddError(err)
return nil return nil
@ -113,7 +121,7 @@ func (e *Ethtool) Gather(acc telegraf.Accumulator) error {
if e.interfaceEligibleForGather(iface) { if e.interfaceEligibleForGather(iface) {
wg.Add(1) wg.Add(1)
go func(i NamespacedInterface) { go func(i namespacedInterface) {
e.gatherEthtoolStats(i, acc) e.gatherEthtoolStats(i, acc)
wg.Done() wg.Done()
}(iface) }(iface)
@ -125,14 +133,14 @@ func (e *Ethtool) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (e *Ethtool) interfaceEligibleForGather(iface NamespacedInterface) bool { func (e *Ethtool) interfaceEligibleForGather(iface namespacedInterface) bool {
// Don't gather if it is a loop back, or it isn't matched by the filter // Don't gather if it is a loop back, or it isn't matched by the filter
if isLoopback(iface) || !e.interfaceFilter.Match(iface.Name) { if isLoopback(iface) || !e.interfaceFilter.Match(iface.Name) {
return false return false
} }
// Don't gather if it's not in a namespace matched by the filter // Don't gather if it's not in a namespace matched by the filter
if !e.namespaceFilter.Match(iface.Namespace.Name()) { if !e.namespaceFilter.Match(iface.namespace.name()) {
return false return false
} }
@ -145,12 +153,12 @@ func (e *Ethtool) interfaceEligibleForGather(iface NamespacedInterface) bool {
} }
// Gather the stats for the interface. // Gather the stats for the interface.
func (e *Ethtool) gatherEthtoolStats(iface NamespacedInterface, acc telegraf.Accumulator) { func (e *Ethtool) gatherEthtoolStats(iface namespacedInterface, acc telegraf.Accumulator) {
tags := make(map[string]string) tags := make(map[string]string)
tags[tagInterface] = iface.Name tags[tagInterface] = iface.Name
tags[tagNamespace] = iface.Namespace.Name() tags[tagNamespace] = iface.namespace.name()
driverName, err := e.command.DriverName(iface) driverName, err := e.command.driverName(iface)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("%q driver: %w", iface.Name, err)) acc.AddError(fmt.Errorf("%q driver: %w", iface.Name, err))
return return
@ -159,7 +167,7 @@ func (e *Ethtool) gatherEthtoolStats(iface NamespacedInterface, acc telegraf.Acc
tags[tagDriverName] = driverName tags[tagDriverName] = driverName
fields := make(map[string]interface{}) fields := make(map[string]interface{})
stats, err := e.command.Stats(iface) stats, err := e.command.stats(iface)
if err != nil { if err != nil {
acc.AddError(fmt.Errorf("%q stats: %w", iface.Name, err)) acc.AddError(fmt.Errorf("%q stats: %w", iface.Name, err))
return return
@ -170,7 +178,7 @@ func (e *Ethtool) gatherEthtoolStats(iface NamespacedInterface, acc telegraf.Acc
fields[e.normalizeKey(k)] = v fields[e.normalizeKey(k)] = v
} }
cmdget, err := e.command.Get(iface) cmdget, err := e.command.get(iface)
// error text is directly from running ethtool and syscalls // error text is directly from running ethtool and syscalls
if err != nil && err.Error() != "operation not supported" { if err != nil && err.Error() != "operation not supported" {
acc.AddError(fmt.Errorf("%q get: %w", iface.Name, err)) acc.AddError(fmt.Errorf("%q get: %w", iface.Name, err))
@ -228,57 +236,57 @@ func inStringSlice(slice []string, value string) bool {
return false return false
} }
func isLoopback(iface NamespacedInterface) bool { func isLoopback(iface namespacedInterface) bool {
return (iface.Flags & net.FlagLoopback) != 0 return (iface.Flags & net.FlagLoopback) != 0
} }
func interfaceUp(iface NamespacedInterface) bool { func interfaceUp(iface namespacedInterface) bool {
return (iface.Flags & net.FlagUp) != 0 return (iface.Flags & net.FlagUp) != 0
} }
func NewCommandEthtool() *CommandEthtool { func newCommandEthtool() *commandEthtool {
return &CommandEthtool{} return &commandEthtool{}
} }
func (c *CommandEthtool) Init() error { func (c *commandEthtool) init() error {
// Create the goroutine for the initial namespace // Create the goroutine for the initial namespace
initialNamespace, err := netns.Get() initialNamespace, err := netns.Get()
if err != nil { if err != nil {
return err return err
} }
namespaceGoroutine := &NamespaceGoroutine{ nspaceGoroutine := &namespaceGoroutine{
name: "", namespaceName: "",
handle: initialNamespace, handle: initialNamespace,
Log: c.Log, log: c.log,
} }
if err := namespaceGoroutine.Start(); err != nil { if err := nspaceGoroutine.start(); err != nil {
c.Log.Errorf(`Failed to start goroutine for the initial namespace: %s`, err) c.log.Errorf(`Failed to start goroutine for the initial namespace: %s`, err)
return err return err
} }
c.namespaceGoroutines = map[string]*NamespaceGoroutine{ c.namespaceGoroutines = map[string]*namespaceGoroutine{
"": namespaceGoroutine, "": nspaceGoroutine,
} }
return nil return nil
} }
func (c *CommandEthtool) DriverName(intf NamespacedInterface) (driver string, err error) { func (c *commandEthtool) driverName(intf namespacedInterface) (driver string, err error) {
return intf.Namespace.DriverName(intf) return intf.namespace.driverName(intf)
} }
func (c *CommandEthtool) Stats(intf NamespacedInterface) (stats map[string]uint64, err error) { func (c *commandEthtool) stats(intf namespacedInterface) (stats map[string]uint64, err error) {
return intf.Namespace.Stats(intf) return intf.namespace.stats(intf)
} }
func (c *CommandEthtool) Get(intf NamespacedInterface) (stats map[string]uint64, err error) { func (c *commandEthtool) get(intf namespacedInterface) (stats map[string]uint64, err error) {
return intf.Namespace.Get(intf) return intf.namespace.get(intf)
} }
func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterface, error) { func (c *commandEthtool) interfaces(includeNamespaces bool) ([]namespacedInterface, error) {
const namespaceDirectory = "/var/run/netns" const namespaceDirectory = "/var/run/netns"
initialNamespace, err := netns.Get() initialNamespace, err := netns.Get()
if err != nil { if err != nil {
c.Log.Errorf("Could not get initial namespace: %s", err) c.log.Errorf("Could not get initial namespace: %s", err)
return nil, err return nil, err
} }
defer initialNamespace.Close() defer initialNamespace.Close()
@ -294,7 +302,7 @@ func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterfa
if includeNamespaces { if includeNamespaces {
namespaces, err := os.ReadDir(namespaceDirectory) namespaces, err := os.ReadDir(namespaceDirectory)
if err != nil { if err != nil {
c.Log.Warnf("Could not find namespace directory: %s", err) c.log.Warnf("Could not find namespace directory: %s", err)
} }
// We'll always have at least the initial namespace, so add one to ensure // We'll always have at least the initial namespace, so add one to ensure
@ -306,7 +314,7 @@ func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterfa
handle, err := netns.GetFromPath(filepath.Join(namespaceDirectory, name)) handle, err := netns.GetFromPath(filepath.Join(namespaceDirectory, name))
if err != nil { if err != nil {
c.Log.Warnf("Could not get handle for namespace %q: %s", name, err.Error()) c.log.Warnf("Could not get handle for namespace %q: %s", name, err.Error())
continue continue
} }
handles[name] = handle handles[name] = handle
@ -323,24 +331,24 @@ func (c *CommandEthtool) Interfaces(includeNamespaces bool) ([]NamespacedInterfa
namespaceNames = append(namespaceNames, "") namespaceNames = append(namespaceNames, "")
} }
allInterfaces := make([]NamespacedInterface, 0) allInterfaces := make([]namespacedInterface, 0)
for _, namespace := range namespaceNames { for _, namespace := range namespaceNames {
if _, ok := c.namespaceGoroutines[namespace]; !ok { if _, ok := c.namespaceGoroutines[namespace]; !ok {
c.namespaceGoroutines[namespace] = &NamespaceGoroutine{ c.namespaceGoroutines[namespace] = &namespaceGoroutine{
name: namespace, namespaceName: namespace,
handle: handles[namespace], handle: handles[namespace],
Log: c.Log, log: c.log,
} }
if err := c.namespaceGoroutines[namespace].Start(); err != nil { if err := c.namespaceGoroutines[namespace].start(); err != nil {
c.Log.Errorf("Failed to start goroutine for namespace %q: %s", namespace, err.Error()) c.log.Errorf("Failed to start goroutine for namespace %q: %s", namespace, err.Error())
delete(c.namespaceGoroutines, namespace) delete(c.namespaceGoroutines, namespace)
continue continue
} }
} }
interfaces, err := c.namespaceGoroutines[namespace].Interfaces() interfaces, err := c.namespaceGoroutines[namespace].interfaces()
if err != nil { if err != nil {
c.Log.Warnf("Could not get interfaces from namespace %q: %s", namespace, err.Error()) c.log.Warnf("Could not get interfaces from namespace %q: %s", namespace, err.Error())
continue continue
} }
allInterfaces = append(allInterfaces, interfaces...) allInterfaces = append(allInterfaces, interfaces...)
@ -356,7 +364,7 @@ func init() {
InterfaceExclude: []string{}, InterfaceExclude: []string{},
NamespaceInclude: []string{}, NamespaceInclude: []string{},
NamespaceExclude: []string{}, NamespaceExclude: []string{},
command: NewCommandEthtool(), command: newCommandEthtool(),
} }
}) })
} }

View File

@ -13,77 +13,77 @@ import (
) )
var ( var (
command *Ethtool eth *Ethtool
interfaceMap map[string]*InterfaceMock interfaceMap map[string]*interfaceMock
) )
type InterfaceMock struct { type interfaceMock struct {
Name string
DriverName string
NamespaceName string
Stat map[string]uint64
LoopBack bool
InterfaceUp bool
CmdGet map[string]uint64
}
type NamespaceMock struct {
name string name string
driverName string
namespaceName string
stat map[string]uint64
loopBack bool
interfaceUp bool
cmdGet map[string]uint64
} }
func (n *NamespaceMock) Name() string { type namespaceMock struct {
return n.name namespaceName string
} }
func (n *NamespaceMock) Interfaces() ([]NamespacedInterface, error) { func (n *namespaceMock) name() string {
return n.namespaceName
}
func (n *namespaceMock) interfaces() ([]namespacedInterface, error) {
return nil, errors.New("it is a test bug to invoke this function") return nil, errors.New("it is a test bug to invoke this function")
} }
func (n *NamespaceMock) DriverName(_ NamespacedInterface) (string, error) { func (n *namespaceMock) driverName(_ namespacedInterface) (string, error) {
return "", errors.New("it is a test bug to invoke this function") return "", errors.New("it is a test bug to invoke this function")
} }
func (n *NamespaceMock) Stats(_ NamespacedInterface) (map[string]uint64, error) { func (n *namespaceMock) stats(_ namespacedInterface) (map[string]uint64, error) {
return nil, errors.New("it is a test bug to invoke this function") return nil, errors.New("it is a test bug to invoke this function")
} }
func (n *NamespaceMock) Get(_ NamespacedInterface) (map[string]uint64, error) { func (n *namespaceMock) get(_ namespacedInterface) (map[string]uint64, error) {
return nil, errors.New("it is a test bug to invoke this function") return nil, errors.New("it is a test bug to invoke this function")
} }
type CommandEthtoolMock struct { type commandEthtoolMock struct {
InterfaceMap map[string]*InterfaceMock interfaceMap map[string]*interfaceMock
} }
func (c *CommandEthtoolMock) Init() error { func (c *commandEthtoolMock) init() error {
// Not required for test mock // Not required for test mock
return nil return nil
} }
func (c *CommandEthtoolMock) DriverName(intf NamespacedInterface) (string, error) { func (c *commandEthtoolMock) driverName(intf namespacedInterface) (string, error) {
i := c.InterfaceMap[intf.Name] i := c.interfaceMap[intf.Name]
if i != nil { if i != nil {
return i.DriverName, nil return i.driverName, nil
} }
return "", errors.New("interface not found") return "", errors.New("interface not found")
} }
func (c *CommandEthtoolMock) Interfaces(includeNamespaces bool) ([]NamespacedInterface, error) { func (c *commandEthtoolMock) interfaces(includeNamespaces bool) ([]namespacedInterface, error) {
namespaces := map[string]*NamespaceMock{"": {name: ""}} namespaces := map[string]*namespaceMock{"": {namespaceName: ""}}
interfaces := make([]NamespacedInterface, 0) interfaces := make([]namespacedInterface, 0)
for k, v := range c.InterfaceMap { for k, v := range c.interfaceMap {
if v.NamespaceName != "" && !includeNamespaces { if v.namespaceName != "" && !includeNamespaces {
continue continue
} }
var flag net.Flags var flag net.Flags
// When interface is up // When interface is up
if v.InterfaceUp { if v.interfaceUp {
flag |= net.FlagUp flag |= net.FlagUp
} }
// For loopback interface // For loopback interface
if v.LoopBack { if v.loopBack {
flag |= net.FlagLoopback flag |= net.FlagLoopback
} }
@ -97,41 +97,41 @@ func (c *CommandEthtoolMock) Interfaces(includeNamespaces bool) ([]NamespacedInt
} }
// Ensure there is a namespace if necessary // Ensure there is a namespace if necessary
if _, ok := namespaces[v.NamespaceName]; !ok { if _, ok := namespaces[v.namespaceName]; !ok {
namespaces[v.NamespaceName] = &NamespaceMock{ namespaces[v.namespaceName] = &namespaceMock{
name: v.NamespaceName, namespaceName: v.namespaceName,
} }
} }
interfaces = append( interfaces = append(
interfaces, interfaces,
NamespacedInterface{ namespacedInterface{
Interface: iface, Interface: iface,
Namespace: namespaces[v.NamespaceName], namespace: namespaces[v.namespaceName],
}, },
) )
} }
return interfaces, nil return interfaces, nil
} }
func (c *CommandEthtoolMock) Stats(intf NamespacedInterface) (map[string]uint64, error) { func (c *commandEthtoolMock) stats(intf namespacedInterface) (map[string]uint64, error) {
i := c.InterfaceMap[intf.Name] i := c.interfaceMap[intf.Name]
if i != nil { if i != nil {
return i.Stat, nil return i.stat, nil
} }
return nil, errors.New("interface not found") return nil, errors.New("interface not found")
} }
func (c *CommandEthtoolMock) Get(intf NamespacedInterface) (map[string]uint64, error) { func (c *commandEthtoolMock) get(intf namespacedInterface) (map[string]uint64, error) {
i := c.InterfaceMap[intf.Name] i := c.interfaceMap[intf.Name]
if i != nil { if i != nil {
return i.CmdGet, nil return i.cmdGet, nil
} }
return nil, errors.New("interface not found") return nil, errors.New("interface not found")
} }
func setup() { func setup() {
interfaceMap = make(map[string]*InterfaceMock) interfaceMap = make(map[string]*interfaceMock)
eth1Stat := map[string]uint64{ eth1Stat := map[string]uint64{
"interface_up": 1, "interface_up": 1,
@ -238,8 +238,8 @@ func setup() {
"link": 1, "link": 1,
"speed": 1000, "speed": 1000,
} }
eth1 := &InterfaceMock{"eth1", "driver1", "", eth1Stat, false, true, eth1Get} eth1 := &interfaceMock{"eth1", "driver1", "", eth1Stat, false, true, eth1Get}
interfaceMap[eth1.Name] = eth1 interfaceMap[eth1.name] = eth1
eth2Stat := map[string]uint64{ eth2Stat := map[string]uint64{
"interface_up": 0, "interface_up": 0,
@ -346,8 +346,8 @@ func setup() {
"link": 0, "link": 0,
"speed": 9223372036854775807, "speed": 9223372036854775807,
} }
eth2 := &InterfaceMock{"eth2", "driver1", "", eth2Stat, false, false, eth2Get} eth2 := &interfaceMock{"eth2", "driver1", "", eth2Stat, false, false, eth2Get}
interfaceMap[eth2.Name] = eth2 interfaceMap[eth2.name] = eth2
eth3Stat := map[string]uint64{ eth3Stat := map[string]uint64{
"interface_up": 1, "interface_up": 1,
@ -454,8 +454,8 @@ func setup() {
"link": 1, "link": 1,
"speed": 1000, "speed": 1000,
} }
eth3 := &InterfaceMock{"eth3", "driver1", "namespace1", eth3Stat, false, true, eth3Get} eth3 := &interfaceMock{"eth3", "driver1", "namespace1", eth3Stat, false, true, eth3Get}
interfaceMap[eth3.Name] = eth3 interfaceMap[eth3.name] = eth3
eth4Stat := map[string]uint64{ eth4Stat := map[string]uint64{
"interface_up": 1, "interface_up": 1,
@ -562,8 +562,8 @@ func setup() {
"link": 1, "link": 1,
"speed": 100, "speed": 100,
} }
eth4 := &InterfaceMock{"eth4", "driver1", "namespace2", eth4Stat, false, true, eth4Get} eth4 := &interfaceMock{"eth4", "driver1", "namespace2", eth4Stat, false, true, eth4Get}
interfaceMap[eth4.Name] = eth4 interfaceMap[eth4.name] = eth4
// dummy loopback including dummy stat to ensure that the ignore feature is working // dummy loopback including dummy stat to ensure that the ignore feature is working
lo0Stat := map[string]uint64{ lo0Stat := map[string]uint64{
@ -575,11 +575,11 @@ func setup() {
"link": 1, "link": 1,
"speed": 1000, "speed": 1000,
} }
lo0 := &InterfaceMock{"lo0", "", "", lo0Stat, true, true, lo0Get} lo0 := &interfaceMock{"lo0", "", "", lo0Stat, true, true, lo0Get}
interfaceMap[lo0.Name] = lo0 interfaceMap[lo0.name] = lo0
c := &CommandEthtoolMock{interfaceMap} c := &commandEthtoolMock{interfaceMap}
command = &Ethtool{ eth = &Ethtool{
InterfaceInclude: []string{}, InterfaceInclude: []string{},
InterfaceExclude: []string{}, InterfaceExclude: []string{},
DownInterfaces: "expose", DownInterfaces: "expose",
@ -607,16 +607,16 @@ func toStringMapUint(in map[string]interface{}) map[string]uint64 {
func TestGather(t *testing.T) { func TestGather(t *testing.T) {
setup() setup()
err := command.Init() err := eth.Init()
require.NoError(t, err) require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = command.Gather(&acc) err = eth.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, acc.Metrics, 2) require.Len(t, acc.Metrics, 2)
expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat)
for k, v := range interfaceMap["eth1"].CmdGet { for k, v := range interfaceMap["eth1"].cmdGet {
expectedFieldsEth1[k] = v expectedFieldsEth1[k] = v
} }
expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"]
@ -629,8 +629,8 @@ func TestGather(t *testing.T) {
} }
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1) acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1)
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat)
for k, v := range interfaceMap["eth2"].CmdGet { for k, v := range interfaceMap["eth2"].cmdGet {
expectedFieldsEth2[k] = v expectedFieldsEth2[k] = v
} }
expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"]
@ -646,19 +646,19 @@ func TestGather(t *testing.T) {
func TestGatherIncludeInterfaces(t *testing.T) { func TestGatherIncludeInterfaces(t *testing.T) {
setup() setup()
command.InterfaceInclude = append(command.InterfaceInclude, "eth1") eth.InterfaceInclude = append(eth.InterfaceInclude, "eth1")
err := command.Init() err := eth.Init()
require.NoError(t, err) require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = command.Gather(&acc) err = eth.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, acc.Metrics, 1) require.Len(t, acc.Metrics, 1)
// Should contain eth1 // Should contain eth1
expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat)
for k, v := range interfaceMap["eth1"].CmdGet { for k, v := range interfaceMap["eth1"].cmdGet {
expectedFieldsEth1[k] = v expectedFieldsEth1[k] = v
} }
expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"]
@ -671,8 +671,8 @@ func TestGatherIncludeInterfaces(t *testing.T) {
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1) acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1)
// Should not contain eth2 // Should not contain eth2
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat)
for k, v := range interfaceMap["eth2"].CmdGet { for k, v := range interfaceMap["eth2"].cmdGet {
expectedFieldsEth2[k] = v expectedFieldsEth2[k] = v
} }
expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"]
@ -688,19 +688,19 @@ func TestGatherIncludeInterfaces(t *testing.T) {
func TestGatherIgnoreInterfaces(t *testing.T) { func TestGatherIgnoreInterfaces(t *testing.T) {
setup() setup()
command.InterfaceExclude = append(command.InterfaceExclude, "eth1") eth.InterfaceExclude = append(eth.InterfaceExclude, "eth1")
err := command.Init() err := eth.Init()
require.NoError(t, err) require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = command.Gather(&acc) err = eth.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, acc.Metrics, 1) require.Len(t, acc.Metrics, 1)
// Should not contain eth1 // Should not contain eth1
expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat)
for k, v := range interfaceMap["eth1"].CmdGet { for k, v := range interfaceMap["eth1"].cmdGet {
expectedFieldsEth1[k] = v expectedFieldsEth1[k] = v
} }
expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"]
@ -713,8 +713,8 @@ func TestGatherIgnoreInterfaces(t *testing.T) {
acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1) acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth1, expectedTagsEth1)
// Should contain eth2 // Should contain eth2
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat)
for k, v := range interfaceMap["eth2"].CmdGet { for k, v := range interfaceMap["eth2"].cmdGet {
expectedFieldsEth2[k] = v expectedFieldsEth2[k] = v
} }
expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"]
@ -730,18 +730,18 @@ func TestGatherIgnoreInterfaces(t *testing.T) {
func TestSkipMetricsForInterfaceDown(t *testing.T) { func TestSkipMetricsForInterfaceDown(t *testing.T) {
setup() setup()
command.DownInterfaces = "skip" eth.DownInterfaces = "skip"
err := command.Init() err := eth.Init()
require.NoError(t, err) require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = command.Gather(&acc) err = eth.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, acc.Metrics, 1) require.Len(t, acc.Metrics, 1)
expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].Stat) expectedFieldsEth1 := toStringMapInterface(interfaceMap["eth1"].stat)
for k, v := range interfaceMap["eth1"].CmdGet { for k, v := range interfaceMap["eth1"].cmdGet {
expectedFieldsEth1[k] = v expectedFieldsEth1[k] = v
} }
expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"] expectedFieldsEth1["interface_up_counter"] = expectedFieldsEth1["interface_up"]
@ -759,18 +759,18 @@ func TestGatherIncludeNamespaces(t *testing.T) {
setup() setup()
var acc testutil.Accumulator var acc testutil.Accumulator
command.NamespaceInclude = append(command.NamespaceInclude, "namespace1") eth.NamespaceInclude = append(eth.NamespaceInclude, "namespace1")
err := command.Init() err := eth.Init()
require.NoError(t, err) require.NoError(t, err)
err = command.Gather(&acc) err = eth.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, acc.Metrics, 1) require.Len(t, acc.Metrics, 1)
// Should contain eth3 // Should contain eth3
expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].Stat) expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].stat)
for k, v := range interfaceMap["eth3"].CmdGet { for k, v := range interfaceMap["eth3"].cmdGet {
expectedFieldsEth3[k] = v expectedFieldsEth3[k] = v
} }
expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"] expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"]
@ -783,8 +783,8 @@ func TestGatherIncludeNamespaces(t *testing.T) {
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3) acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3)
// Should not contain eth2 // Should not contain eth2
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat)
for k, v := range interfaceMap["eth2"].CmdGet { for k, v := range interfaceMap["eth2"].cmdGet {
expectedFieldsEth2[k] = v expectedFieldsEth2[k] = v
} }
expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"]
@ -801,18 +801,18 @@ func TestGatherIgnoreNamespaces(t *testing.T) {
setup() setup()
var acc testutil.Accumulator var acc testutil.Accumulator
command.NamespaceExclude = append(command.NamespaceExclude, "namespace2") eth.NamespaceExclude = append(eth.NamespaceExclude, "namespace2")
err := command.Init() err := eth.Init()
require.NoError(t, err) require.NoError(t, err)
err = command.Gather(&acc) err = eth.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, acc.Metrics, 3) require.Len(t, acc.Metrics, 3)
// Should not contain eth4 // Should not contain eth4
expectedFieldsEth4 := toStringMapInterface(interfaceMap["eth4"].Stat) expectedFieldsEth4 := toStringMapInterface(interfaceMap["eth4"].stat)
for k, v := range interfaceMap["eth4"].CmdGet { for k, v := range interfaceMap["eth4"].cmdGet {
expectedFieldsEth4[k] = v expectedFieldsEth4[k] = v
} }
expectedFieldsEth4["interface_up_counter"] = expectedFieldsEth4["interface_up"] expectedFieldsEth4["interface_up_counter"] = expectedFieldsEth4["interface_up"]
@ -825,8 +825,8 @@ func TestGatherIgnoreNamespaces(t *testing.T) {
acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth4, expectedTagsEth4) acc.AssertDoesNotContainsTaggedFields(t, pluginName, expectedFieldsEth4, expectedTagsEth4)
// Should contain eth2 // Should contain eth2
expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].Stat) expectedFieldsEth2 := toStringMapInterface(interfaceMap["eth2"].stat)
for k, v := range interfaceMap["eth2"].CmdGet { for k, v := range interfaceMap["eth2"].cmdGet {
expectedFieldsEth2[k] = v expectedFieldsEth2[k] = v
} }
expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"] expectedFieldsEth2["interface_up_counter"] = expectedFieldsEth2["interface_up"]
@ -839,8 +839,8 @@ func TestGatherIgnoreNamespaces(t *testing.T) {
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2) acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth2, expectedTagsEth2)
// Should contain eth3 // Should contain eth3
expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].Stat) expectedFieldsEth3 := toStringMapInterface(interfaceMap["eth3"].stat)
for k, v := range interfaceMap["eth3"].CmdGet { for k, v := range interfaceMap["eth3"].cmdGet {
expectedFieldsEth3[k] = v expectedFieldsEth3[k] = v
} }
expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"] expectedFieldsEth3["interface_up_counter"] = expectedFieldsEth3["interface_up"]
@ -853,14 +853,14 @@ func TestGatherIgnoreNamespaces(t *testing.T) {
acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3) acc.AssertContainsTaggedFields(t, pluginName, expectedFieldsEth3, expectedTagsEth3)
} }
type TestCase struct { type testCase struct {
normalization []string normalization []string
stats map[string]interface{} stats map[string]interface{}
expectedFields map[string]interface{} expectedFields map[string]interface{}
} }
func TestNormalizedKeys(t *testing.T) { func TestNormalizedKeys(t *testing.T) {
cases := []TestCase{ cases := []testCase{
{ {
normalization: []string{"underscore"}, normalization: []string{"underscore"},
stats: map[string]interface{}{ stats: map[string]interface{}{
@ -960,29 +960,29 @@ func TestNormalizedKeys(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
eth0 := &InterfaceMock{"eth0", "e1000e", "", toStringMapUint(c.stats), false, true, map[string]uint64{}} eth0 := &interfaceMock{"eth0", "e1000e", "", toStringMapUint(c.stats), false, true, map[string]uint64{}}
expectedTags := map[string]string{ expectedTags := map[string]string{
"interface": eth0.Name, "interface": eth0.name,
"driver": eth0.DriverName, "driver": eth0.driverName,
"namespace": "", "namespace": "",
} }
interfaceMap = make(map[string]*InterfaceMock) interfaceMap = make(map[string]*interfaceMock)
interfaceMap[eth0.Name] = eth0 interfaceMap[eth0.name] = eth0
cmd := &CommandEthtoolMock{interfaceMap} cmd := &commandEthtoolMock{interfaceMap}
command = &Ethtool{ eth = &Ethtool{
InterfaceInclude: []string{}, InterfaceInclude: []string{},
InterfaceExclude: []string{}, InterfaceExclude: []string{},
NormalizeKeys: c.normalization, NormalizeKeys: c.normalization,
command: cmd, command: cmd,
} }
err := command.Init() err := eth.Init()
require.NoError(t, err) require.NoError(t, err)
var acc testutil.Accumulator var acc testutil.Accumulator
err = command.Gather(&acc) err = eth.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, acc.Metrics, 1) require.Len(t, acc.Metrics, 1)

View File

@ -1,16 +0,0 @@
package ethtool
import "net"
type Namespace interface {
Name() string
Interfaces() ([]NamespacedInterface, error)
DriverName(intf NamespacedInterface) (string, error)
Stats(intf NamespacedInterface) (map[string]uint64, error)
Get(intf NamespacedInterface) (map[string]uint64, error)
}
type NamespacedInterface struct {
net.Interface
Namespace Namespace
}

View File

@ -1,3 +1,5 @@
//go:build linux
package ethtool package ethtool
import ( import (
@ -11,66 +13,79 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type NamespacedAction struct { type namespace interface {
result chan<- NamespacedResult name() string
f func(*NamespaceGoroutine) (interface{}, error) interfaces() ([]namespacedInterface, error)
driverName(intf namespacedInterface) (string, error)
stats(intf namespacedInterface) (map[string]uint64, error)
get(intf namespacedInterface) (map[string]uint64, error)
} }
type NamespacedResult struct { type namespacedInterface struct {
Result interface{} net.Interface
Error error namespace namespace
} }
type NamespaceGoroutine struct { type namespacedAction struct {
name string result chan<- namespacedResult
f func(*namespaceGoroutine) (interface{}, error)
}
type namespacedResult struct {
result interface{}
err error
}
type namespaceGoroutine struct {
namespaceName string
handle netns.NsHandle handle netns.NsHandle
ethtoolClient *ethtool.Ethtool ethtoolClient *ethtool.Ethtool
c chan NamespacedAction c chan namespacedAction
Log telegraf.Logger log telegraf.Logger
} }
func (n *NamespaceGoroutine) Name() string { func (n *namespaceGoroutine) name() string {
return n.name return n.namespaceName
} }
func (n *NamespaceGoroutine) Interfaces() ([]NamespacedInterface, error) { func (n *namespaceGoroutine) interfaces() ([]namespacedInterface, error) {
interfaces, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { interfaces, err := n.do(func(n *namespaceGoroutine) (interface{}, error) {
interfaces, err := net.Interfaces() interfaces, err := net.Interfaces()
if err != nil { if err != nil {
return nil, err return nil, err
} }
namespacedInterfaces := make([]NamespacedInterface, 0, len(interfaces)) namespacedInterfaces := make([]namespacedInterface, 0, len(interfaces))
for _, iface := range interfaces { for _, iface := range interfaces {
namespacedInterfaces = append( namespacedInterfaces = append(
namespacedInterfaces, namespacedInterfaces,
NamespacedInterface{ namespacedInterface{
Interface: iface, Interface: iface,
Namespace: n, namespace: n,
}, },
) )
} }
return namespacedInterfaces, nil return namespacedInterfaces, nil
}) })
return interfaces.([]NamespacedInterface), err return interfaces.([]namespacedInterface), err
} }
func (n *NamespaceGoroutine) DriverName(intf NamespacedInterface) (string, error) { func (n *namespaceGoroutine) driverName(intf namespacedInterface) (string, error) {
driver, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { driver, err := n.do(func(n *namespaceGoroutine) (interface{}, error) {
return n.ethtoolClient.DriverName(intf.Name) return n.ethtoolClient.DriverName(intf.Name)
}) })
return driver.(string), err return driver.(string), err
} }
func (n *NamespaceGoroutine) Stats(intf NamespacedInterface) (map[string]uint64, error) { func (n *namespaceGoroutine) stats(intf namespacedInterface) (map[string]uint64, error) {
driver, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { driver, err := n.do(func(n *namespaceGoroutine) (interface{}, error) {
return n.ethtoolClient.Stats(intf.Name) return n.ethtoolClient.Stats(intf.Name)
}) })
return driver.(map[string]uint64), err return driver.(map[string]uint64), err
} }
func (n *NamespaceGoroutine) Get(intf NamespacedInterface) (map[string]uint64, error) { func (n *namespaceGoroutine) get(intf namespacedInterface) (map[string]uint64, error) {
result, err := n.Do(func(n *NamespaceGoroutine) (interface{}, error) { result, err := n.do(func(n *namespaceGoroutine) (interface{}, error) {
ecmd := ethtool.EthtoolCmd{} ecmd := ethtool.EthtoolCmd{}
speed32, err := n.ethtoolClient.CmdGet(&ecmd, intf.Name) speed32, err := n.ethtoolClient.CmdGet(&ecmd, intf.Name)
if err != nil { if err != nil {
@ -102,10 +117,10 @@ func (n *NamespaceGoroutine) Get(intf NamespacedInterface) (map[string]uint64, e
return nil, err return nil, err
} }
// Start locks a goroutine to an OS thread and ties it to the namespace, then // start locks a goroutine to an OS thread and ties it to the namespace, then
// loops for actions to run in the namespace. // loops for actions to run in the namespace.
func (n *NamespaceGoroutine) Start() error { func (n *namespaceGoroutine) start() error {
n.c = make(chan NamespacedAction) n.c = make(chan namespacedAction)
started := make(chan error) started := make(chan error)
go func() { go func() {
// We're going to hold this thread locked permanently. We're going to // We're going to hold this thread locked permanently. We're going to
@ -121,13 +136,13 @@ func (n *NamespaceGoroutine) Start() error {
// current one. // current one.
initialNamespace, err := netns.Get() initialNamespace, err := netns.Get()
if err != nil { if err != nil {
n.Log.Errorf("Could not get initial namespace: %s", err) n.log.Errorf("Could not get initial namespace: %s", err)
started <- err started <- err
return return
} }
if !initialNamespace.Equal(n.handle) { if !initialNamespace.Equal(n.handle) {
if err := netns.Set(n.handle); err != nil { if err := netns.Set(n.handle); err != nil {
n.Log.Errorf("Could not switch to namespace %q: %s", n.name, err.Error()) n.log.Errorf("Could not switch to namespace %q: %s", n.namespaceName, err.Error())
started <- err started <- err
return return
} }
@ -136,7 +151,7 @@ func (n *NamespaceGoroutine) Start() error {
// Every namespace needs its own connection to ethtool // Every namespace needs its own connection to ethtool
e, err := ethtool.NewEthtool() e, err := ethtool.NewEthtool()
if err != nil { if err != nil {
n.Log.Errorf("Could not create ethtool client for namespace %q: %s", n.name, err.Error()) n.log.Errorf("Could not create ethtool client for namespace %q: %s", n.namespaceName, err.Error())
started <- err started <- err
return return
} }
@ -144,9 +159,9 @@ func (n *NamespaceGoroutine) Start() error {
started <- nil started <- nil
for command := range n.c { for command := range n.c {
result, err := command.f(n) result, err := command.f(n)
command.result <- NamespacedResult{ command.result <- namespacedResult{
Result: result, result: result,
Error: err, err: err,
} }
close(command.result) close(command.result)
} }
@ -154,13 +169,13 @@ func (n *NamespaceGoroutine) Start() error {
return <-started return <-started
} }
// Do runs a function inside the OS thread tied to the namespace. // do runs a function inside the OS thread tied to the namespace.
func (n *NamespaceGoroutine) Do(f func(*NamespaceGoroutine) (interface{}, error)) (interface{}, error) { func (n *namespaceGoroutine) do(f func(*namespaceGoroutine) (interface{}, error)) (interface{}, error) {
result := make(chan NamespacedResult) result := make(chan namespacedResult)
n.c <- NamespacedAction{ n.c <- namespacedAction{
result: result, result: result,
f: f, f: f,
} }
r := <-result r := <-result
return r.Result, r.Error return r.result, r.err
} }

View File

@ -26,10 +26,6 @@ const (
defaultMaxUndeliveredMessages = 1000 defaultMaxUndeliveredMessages = 1000
) )
type empty struct{}
type semaphore chan empty
// EventHub is the top level struct for this plugin
type EventHub struct { type EventHub struct {
// Configuration // Configuration
ConnectionString string `toml:"connection_string"` ConnectionString string `toml:"connection_string"`
@ -70,21 +66,15 @@ type EventHub struct {
in chan []telegraf.Metric in chan []telegraf.Metric
} }
type (
empty struct{}
semaphore chan empty
)
func (*EventHub) SampleConfig() string { func (*EventHub) SampleConfig() string {
return sampleConfig return sampleConfig
} }
// SetParser sets the parser
func (e *EventHub) SetParser(parser telegraf.Parser) {
e.parser = parser
}
// Gather function is unused
func (*EventHub) Gather(telegraf.Accumulator) error {
return nil
}
// Init the EventHub ServiceInput
func (e *EventHub) Init() (err error) { func (e *EventHub) Init() (err error) {
if e.MaxUndeliveredMessages == 0 { if e.MaxUndeliveredMessages == 0 {
e.MaxUndeliveredMessages = defaultMaxUndeliveredMessages e.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
@ -118,7 +108,10 @@ func (e *EventHub) Init() (err error) {
return err return err
} }
// Start the EventHub ServiceInput func (e *EventHub) SetParser(parser telegraf.Parser) {
e.parser = parser
}
func (e *EventHub) Start(acc telegraf.Accumulator) error { func (e *EventHub) Start(acc telegraf.Accumulator) error {
e.in = make(chan []telegraf.Metric) e.in = make(chan []telegraf.Metric)
@ -155,6 +148,19 @@ func (e *EventHub) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (*EventHub) Gather(telegraf.Accumulator) error {
return nil
}
func (e *EventHub) Stop() {
err := e.hub.Close(context.Background())
if err != nil {
e.Log.Errorf("Error closing Event Hub connection: %v", err)
}
e.cancel()
e.wg.Wait()
}
func (e *EventHub) configureReceiver() []eventhub.ReceiveOption { func (e *EventHub) configureReceiver() []eventhub.ReceiveOption {
receiveOpts := []eventhub.ReceiveOption{} receiveOpts := []eventhub.ReceiveOption{}
@ -333,16 +339,6 @@ func (e *EventHub) createMetrics(event *eventhub.Event) ([]telegraf.Metric, erro
return metrics, nil return metrics, nil
} }
// Stop the EventHub ServiceInput
func (e *EventHub) Stop() {
err := e.hub.Close(context.Background())
if err != nil {
e.Log.Errorf("Error closing Event Hub connection: %v", err)
}
e.cancel()
e.wg.Wait()
}
func init() { func init() {
inputs.Add("eventhub_consumer", func() telegraf.Input { inputs.Add("eventhub_consumer", func() telegraf.Input {
return &EventHub{} return &EventHub{}

View File

@ -26,9 +26,7 @@ var sampleConfig string
var once sync.Once var once sync.Once
const MaxStderrBytes int = 512 const maxStderrBytes int = 512
type exitcodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric
type Exec struct { type Exec struct {
Commands []string `toml:"commands"` Commands []string `toml:"commands"`
@ -40,100 +38,29 @@ type Exec struct {
parser telegraf.Parser parser telegraf.Parser
runner Runner runner runner
// Allow post processing of command exit codes // Allow post-processing of command exit codes
exitcodeHandler exitcodeHandlerFunc exitCodeHandler exitCodeHandlerFunc
parseDespiteError bool parseDespiteError bool
} }
func NewExec() *Exec { type exitCodeHandlerFunc func([]telegraf.Metric, error, []byte) []telegraf.Metric
return &Exec{
runner: CommandRunner{}, type runner interface {
Timeout: config.Duration(time.Second * 5), run(string, []string, time.Duration) ([]byte, []byte, error)
}
} }
type Runner interface { type commandRunner struct {
Run(string, []string, time.Duration) ([]byte, []byte, error)
}
type CommandRunner struct {
debug bool debug bool
} }
func (c CommandRunner) truncate(buf bytes.Buffer) bytes.Buffer {
// Limit the number of bytes.
didTruncate := false
if buf.Len() > MaxStderrBytes {
buf.Truncate(MaxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < buf.Len()-1 {
didTruncate = true
}
buf.Truncate(i)
}
if didTruncate {
buf.WriteString("...")
}
return buf
}
// removeWindowsCarriageReturns removes all carriage returns from the input if the
// OS is Windows. It does not return any errors.
func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer {
if runtime.GOOS == "windows" {
var buf bytes.Buffer
for {
byt, err := b.ReadBytes(0x0D)
byt = bytes.TrimRight(byt, "\x0d")
if len(byt) > 0 {
buf.Write(byt)
}
if errors.Is(err, io.EOF) {
return buf
}
}
}
return b
}
func (*Exec) SampleConfig() string { func (*Exec) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (e *Exec) ProcessCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) { func (e *Exec) Init() error {
defer wg.Done() return nil
out, errBuf, runErr := e.runner.Run(command, e.Environment, time.Duration(e.Timeout))
if !e.IgnoreError && !e.parseDespiteError && runErr != nil {
err := fmt.Errorf("exec: %w for command %q: %s", runErr, command, string(errBuf))
acc.AddError(err)
return
}
metrics, err := e.parser.Parse(out)
if err != nil {
acc.AddError(err)
return
}
if len(metrics) == 0 {
once.Do(func() {
e.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if e.exitcodeHandler != nil {
metrics = e.exitcodeHandler(metrics, runErr, errBuf)
}
for _, m := range metrics {
acc.AddMetric(m)
}
} }
func (e *Exec) SetParser(parser telegraf.Parser) { func (e *Exec) SetParser(parser telegraf.Parser) {
@ -141,7 +68,7 @@ func (e *Exec) SetParser(parser telegraf.Parser) {
unwrapped, ok := parser.(*models.RunningParser) unwrapped, ok := parser.(*models.RunningParser)
if ok { if ok {
if _, ok := unwrapped.Parser.(*nagios.Parser); ok { if _, ok := unwrapped.Parser.(*nagios.Parser); ok {
e.exitcodeHandler = nagiosHandler e.exitCodeHandler = nagiosHandler
e.parseDespiteError = true e.parseDespiteError = true
} }
} }
@ -188,22 +115,95 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
wg.Add(len(commands)) wg.Add(len(commands))
for _, command := range commands { for _, command := range commands {
go e.ProcessCommand(command, acc, &wg) go e.processCommand(command, acc, &wg)
} }
wg.Wait() wg.Wait()
return nil return nil
} }
func (e *Exec) Init() error { func (c commandRunner) truncate(buf bytes.Buffer) bytes.Buffer {
return nil // Limit the number of bytes.
didTruncate := false
if buf.Len() > maxStderrBytes {
buf.Truncate(maxStderrBytes)
didTruncate = true
}
if i := bytes.IndexByte(buf.Bytes(), '\n'); i > 0 {
// Only show truncation if the newline wasn't the last character.
if i < buf.Len()-1 {
didTruncate = true
}
buf.Truncate(i)
}
if didTruncate {
buf.WriteString("...")
}
return buf
}
// removeWindowsCarriageReturns removes all carriage returns from the input if the
// OS is Windows. It does not return any errors.
func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer {
if runtime.GOOS == "windows" {
var buf bytes.Buffer
for {
byt, err := b.ReadBytes(0x0D)
byt = bytes.TrimRight(byt, "\x0d")
if len(byt) > 0 {
buf.Write(byt)
}
if errors.Is(err, io.EOF) {
return buf
}
}
}
return b
}
func (e *Exec) processCommand(command string, acc telegraf.Accumulator, wg *sync.WaitGroup) {
defer wg.Done()
out, errBuf, runErr := e.runner.run(command, e.Environment, time.Duration(e.Timeout))
if !e.IgnoreError && !e.parseDespiteError && runErr != nil {
err := fmt.Errorf("exec: %w for command %q: %s", runErr, command, string(errBuf))
acc.AddError(err)
return
}
metrics, err := e.parser.Parse(out)
if err != nil {
acc.AddError(err)
return
}
if len(metrics) == 0 {
once.Do(func() {
e.Log.Debug(internal.NoMetricsCreatedMsg)
})
}
if e.exitCodeHandler != nil {
metrics = e.exitCodeHandler(metrics, runErr, errBuf)
}
for _, m := range metrics {
acc.AddMetric(m)
}
} }
func nagiosHandler(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric { func nagiosHandler(metrics []telegraf.Metric, err error, msg []byte) []telegraf.Metric {
return nagios.AddState(err, msg, metrics) return nagios.AddState(err, msg, metrics)
} }
func newExec() *Exec {
return &Exec{
runner: commandRunner{},
Timeout: config.Duration(time.Second * 5),
}
}
func init() { func init() {
inputs.Add("exec", func() telegraf.Input { inputs.Add("exec", func() telegraf.Input {
return NewExec() return newExec()
}) })
} }

View File

@ -44,12 +44,12 @@ const malformedJSON = `
"status": "green", "status": "green",
` `
type CarriageReturnTest struct { type carriageReturnTest struct {
input []byte input []byte
output []byte output []byte
} }
var crTests = []CarriageReturnTest{ var crTests = []carriageReturnTest{
{[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0d, 0x0a, 0x4c, 0x69, {[]byte{0x4c, 0x69, 0x6e, 0x65, 0x20, 0x31, 0x0d, 0x0a, 0x4c, 0x69,
0x6e, 0x65, 0x20, 0x32, 0x0d, 0x0a, 0x4c, 0x69, 0x6e, 0x65, 0x6e, 0x65, 0x20, 0x32, 0x0d, 0x0a, 0x4c, 0x69, 0x6e, 0x65,
0x20, 0x33}, 0x20, 0x33},
@ -73,7 +73,7 @@ type runnerMock struct {
err error err error
} }
func newRunnerMock(out, errout []byte, err error) Runner { func newRunnerMock(out, errout []byte, err error) runner {
return &runnerMock{ return &runnerMock{
out: out, out: out,
errout: errout, errout: errout,
@ -81,7 +81,7 @@ func newRunnerMock(out, errout []byte, err error) Runner {
} }
} }
func (r runnerMock) Run(_ string, _ []string, _ time.Duration) ([]byte, []byte, error) { func (r runnerMock) run(_ string, _ []string, _ time.Duration) ([]byte, []byte, error) {
return r.out, r.errout, r.err return r.out, r.errout, r.err
} }
@ -178,7 +178,7 @@ func TestExecCommandWithGlob(t *testing.T) {
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := NewExec() e := newExec()
e.Commands = []string{"/bin/ech* metric_value"} e.Commands = []string{"/bin/ech* metric_value"}
e.SetParser(&parser) e.SetParser(&parser)
@ -198,7 +198,7 @@ func TestExecCommandWithoutGlob(t *testing.T) {
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := NewExec() e := newExec()
e.Commands = []string{"/bin/echo metric_value"} e.Commands = []string{"/bin/echo metric_value"}
e.SetParser(&parser) e.SetParser(&parser)
@ -217,7 +217,7 @@ func TestExecCommandWithoutGlobAndPath(t *testing.T) {
DataType: "string", DataType: "string",
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := NewExec() e := newExec()
e.Commands = []string{"echo metric_value"} e.Commands = []string{"echo metric_value"}
e.SetParser(&parser) e.SetParser(&parser)
@ -236,7 +236,7 @@ func TestExecCommandWithEnv(t *testing.T) {
DataType: "string", DataType: "string",
} }
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
e := NewExec() e := newExec()
e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"} e.Commands = []string{"/bin/sh -c 'echo ${METRIC_NAME}'"}
e.Environment = []string{"METRIC_NAME=metric_value"} e.Environment = []string{"METRIC_NAME=metric_value"}
e.SetParser(&parser) e.SetParser(&parser)
@ -283,17 +283,17 @@ func TestTruncate(t *testing.T) {
}, },
}, },
{ {
name: "should truncate to the MaxStderrBytes", name: "should truncate to the maxStderrBytes",
bufF: func() *bytes.Buffer { bufF: func() *bytes.Buffer {
var b bytes.Buffer var b bytes.Buffer
for i := 0; i < 2*MaxStderrBytes; i++ { for i := 0; i < 2*maxStderrBytes; i++ {
b.WriteByte('b') b.WriteByte('b')
} }
return &b return &b
}, },
expF: func() *bytes.Buffer { expF: func() *bytes.Buffer {
var b bytes.Buffer var b bytes.Buffer
for i := 0; i < MaxStderrBytes; i++ { for i := 0; i < maxStderrBytes; i++ {
b.WriteByte('b') b.WriteByte('b')
} }
b.WriteString("...") b.WriteString("...")
@ -302,7 +302,7 @@ func TestTruncate(t *testing.T) {
}, },
} }
c := CommandRunner{} c := commandRunner{}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
res := c.truncate(*tt.bufF()) res := c.truncate(*tt.bufF())
@ -340,7 +340,7 @@ func TestCSVBehavior(t *testing.T) {
require.NoError(t, parser.Init()) require.NoError(t, parser.Init())
// Setup the plugin // Setup the plugin
plugin := NewExec() plugin := newExec()
plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""} plugin.Commands = []string{"echo \"a,b\n1,2\n3,4\""}
plugin.Log = testutil.Logger{} plugin.Log = testutil.Logger{}
plugin.SetParser(parser) plugin.SetParser(parser)
@ -408,7 +408,7 @@ func TestCSVBehavior(t *testing.T) {
func TestCases(t *testing.T) { func TestCases(t *testing.T) {
// Register the plugin // Register the plugin
inputs.Add("exec", func() telegraf.Input { inputs.Add("exec", func() telegraf.Input {
return NewExec() return newExec()
}) })
// Setup the plugin // Setup the plugin

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
) )
func (c CommandRunner) Run( func (c commandRunner) run(
command string, command string,
environments []string, environments []string,
timeout time.Duration, timeout time.Duration,

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
) )
func (c CommandRunner) Run( func (c commandRunner) run(
command string, command string,
environments []string, environments []string,
timeout time.Duration, timeout time.Duration,

View File

@ -45,6 +45,13 @@ func (*Execd) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (e *Execd) Init() error {
if len(e.Command) == 0 {
return errors.New("no command specified")
}
return nil
}
func (e *Execd) SetParser(parser telegraf.Parser) { func (e *Execd) SetParser(parser telegraf.Parser) {
e.parser = parser e.parser = parser
e.outputReader = e.cmdReadOut e.outputReader = e.cmdReadOut
@ -168,13 +175,6 @@ func (e *Execd) cmdReadErr(out io.Reader) {
} }
} }
func (e *Execd) Init() error {
if len(e.Command) == 0 {
return errors.New("no command specified")
}
return nil
}
func init() { func init() {
inputs.Add("execd", func() telegraf.Input { inputs.Add("execd", func() telegraf.Input {
return &Execd{ return &Execd{

View File

@ -23,13 +23,13 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/influx"
) )
type empty struct{}
var ( var (
envVarEscaper = strings.NewReplacer( envVarEscaper = strings.NewReplacer(
`"`, `\"`, `"`, `\"`,
`\`, `\\`, `\`, `\\`,
) )
oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim"
newpkg = "github.com/influxdata/telegraf/plugins/common/shim"
) )
const ( const (
@ -50,10 +50,7 @@ type Shim struct {
stderr io.Writer stderr io.Writer
} }
var ( type empty struct{}
oldpkg = "github.com/influxdata/telegraf/plugins/inputs/execd/shim"
newpkg = "github.com/influxdata/telegraf/plugins/common/shim"
)
// New creates a new shim interface // New creates a new shim interface
func New() *Shim { func New() *Shim {
@ -166,6 +163,50 @@ loop:
return nil return nil
} }
// LoadConfig loads and adds the inputs to the shim
func (s *Shim) LoadConfig(filePath *string) error {
loadedInputs, err := LoadConfig(filePath)
if err != nil {
return err
}
return s.AddInputs(loadedInputs)
}
// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and
// have registered themselves with the registry. This makes loading plugins
// without having to define a config dead easy.
func DefaultImportedPlugins() (i []telegraf.Input, e error) {
for _, inputCreatorFunc := range inputs.Inputs {
i = append(i, inputCreatorFunc())
}
return i, nil
}
// LoadConfig loads the config and returns inputs that later need to be loaded.
func LoadConfig(filePath *string) ([]telegraf.Input, error) {
if filePath == nil || *filePath == "" {
return DefaultImportedPlugins()
}
b, err := os.ReadFile(*filePath)
if err != nil {
return nil, err
}
s := expandEnvVars(b)
conf := struct {
Inputs map[string][]toml.Primitive
}{}
md, err := toml.Decode(s, &conf)
if err != nil {
return nil, err
}
return loadConfigIntoInputs(md, conf.Inputs)
}
func hasQuit(ctx context.Context) bool { func hasQuit(ctx context.Context) bool {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -252,50 +293,6 @@ func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc tel
} }
} }
// LoadConfig loads and adds the inputs to the shim
func (s *Shim) LoadConfig(filePath *string) error {
loadedInputs, err := LoadConfig(filePath)
if err != nil {
return err
}
return s.AddInputs(loadedInputs)
}
// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and
// have registered themselves with the registry. This makes loading plugins
// without having to define a config dead easy.
func DefaultImportedPlugins() (i []telegraf.Input, e error) {
for _, inputCreatorFunc := range inputs.Inputs {
i = append(i, inputCreatorFunc())
}
return i, nil
}
// LoadConfig loads the config and returns inputs that later need to be loaded.
func LoadConfig(filePath *string) ([]telegraf.Input, error) {
if filePath == nil || *filePath == "" {
return DefaultImportedPlugins()
}
b, err := os.ReadFile(*filePath)
if err != nil {
return nil, err
}
s := expandEnvVars(b)
conf := struct {
Inputs map[string][]toml.Primitive
}{}
md, err := toml.Decode(s, &conf)
if err != nil {
return nil, err
}
return loadConfigIntoInputs(md, conf.Inputs)
}
func expandEnvVars(contents []byte) string { func expandEnvVars(contents []byte) string {
return os.Expand(string(contents), getEnv) return os.Expand(string(contents), getEnv)
} }

View File

@ -7,14 +7,17 @@ type inputShim struct {
Input telegraf.Input Input telegraf.Input
} }
// LogName satisfies the MetricMaker interface
func (i inputShim) LogName() string { func (i inputShim) LogName() string {
return "" return ""
} }
// MakeMetric satisfies the MetricMaker interface
func (i inputShim) MakeMetric(m telegraf.Metric) telegraf.Metric { func (i inputShim) MakeMetric(m telegraf.Metric) telegraf.Metric {
return m // don't need to do anything to it. return m // don't need to do anything to it.
} }
// Log satisfies the MetricMaker interface
func (i inputShim) Log() telegraf.Logger { func (i inputShim) Log() telegraf.Logger {
return nil return nil
} }