From e8c19d9987273580c35f073204594a17216695f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20=C5=BBak?= Date: Wed, 16 Oct 2024 12:38:11 +0200 Subject: [PATCH] chore: Fix linter findings for `revive:exported` in `plugins/inputs/d*` (#16016) --- .../docker/stats_helpers.go | 5 +- plugins/inputs/dcos/client.go | 78 ++++---- plugins/inputs/dcos/client_test.go | 36 ++-- plugins/inputs/dcos/creds.go | 30 +-- plugins/inputs/dcos/dcos.go | 92 ++++----- plugins/inputs/dcos/dcos_test.go | 18 +- .../directory_monitor/directory_monitor.go | 184 +++++++++--------- .../directory_monitor_test.go | 2 - plugins/inputs/disk/disk.go | 23 +-- plugins/inputs/disk/disk_test.go | 24 +-- plugins/inputs/diskio/diskio.go | 10 +- plugins/inputs/disque/disque.go | 67 ++++--- plugins/inputs/dmcache/dmcache.go | 3 +- plugins/inputs/dns_query/dns_query.go | 14 +- plugins/inputs/dns_query/dns_query_test.go | 6 +- plugins/inputs/docker/client.go | 32 +-- plugins/inputs/docker/docker.go | 114 +++++------ plugins/inputs/docker/docker_test.go | 66 +++---- plugins/inputs/docker/docker_testdata.go | 6 +- plugins/inputs/docker_log/client.go | 18 +- plugins/inputs/docker_log/docker_log.go | 142 +++++++------- plugins/inputs/docker_log/docker_log_test.go | 32 +-- plugins/inputs/dovecot/dovecot.go | 20 +- plugins/inputs/dpdk/dpdk.go | 40 ++-- plugins/inputs/dpdk/dpdk_cmds.go | 8 +- plugins/inputs/dpdk/dpdk_connector_test.go | 2 +- plugins/inputs/dpdk/dpdk_test.go | 70 +++---- plugins/inputs/ecs/stats.go | 2 +- 28 files changed, 569 insertions(+), 575 deletions(-) rename plugins/{inputs => common}/docker/stats_helpers.go (90%) diff --git a/plugins/inputs/docker/stats_helpers.go b/plugins/common/docker/stats_helpers.go similarity index 90% rename from plugins/inputs/docker/stats_helpers.go rename to plugins/common/docker/stats_helpers.go index 97d8fc06e..554936319 100644 --- a/plugins/inputs/docker/stats_helpers.go +++ b/plugins/common/docker/stats_helpers.go @@ -6,6 +6,7 @@ import ( "github.com/docker/docker/api/types/container" ) +// CalculateCPUPercentUnix calculate CPU usage (for Unix, in percentages) func CalculateCPUPercentUnix(previousCPU, previousSystem uint64, v *container.StatsResponse) float64 { var ( cpuPercent = 0.0 @@ -25,7 +26,8 @@ func CalculateCPUPercentUnix(previousCPU, previousSystem uint64, v *container.St return cpuPercent } -func calculateCPUPercentWindows(v *container.StatsResponse) float64 { +// CalculateCPUPercentWindows calculate CPU usage (for Windows, in percentages) +func CalculateCPUPercentWindows(v *container.StatsResponse) float64 { // Max number of 100ns intervals between the previous time read and now possIntervals := uint64(v.Read.Sub(v.PreRead).Nanoseconds()) // Start with number of ns intervals possIntervals /= 100 // Convert to number of 100ns intervals @@ -66,6 +68,7 @@ func CalculateMemUsageUnixNoCache(mem container.MemoryStats) float64 { return float64(mem.Usage) } +// CalculateMemPercentUnixNoCache calculate memory usage of the container, in percentages. func CalculateMemPercentUnixNoCache(limit, usedNoCache float64) float64 { // MemoryStats.Limit will never be 0 unless the container is not running and we haven't // got any data from cgroup diff --git a/plugins/inputs/dcos/client.go b/plugins/inputs/dcos/client.go index 01b8af20f..608abd62e 100644 --- a/plugins/inputs/dcos/client.go +++ b/plugins/inputs/dcos/client.go @@ -18,23 +18,23 @@ const ( loginDuration = 65 * time.Minute ) -// Client is an interface for communicating with the DC/OS API. -type Client interface { - SetToken(token string) +// client is an interface for communicating with the DC/OS API. +type client interface { + setToken(token string) - Login(ctx context.Context, sa *ServiceAccount) (*authToken, error) - GetSummary(ctx context.Context) (*summary, error) - GetContainers(ctx context.Context, node string) ([]container, error) - GetNodeMetrics(ctx context.Context, node string) (*metrics, error) - GetContainerMetrics(ctx context.Context, node, container string) (*metrics, error) - GetAppMetrics(ctx context.Context, node, container string) (*metrics, error) + login(ctx context.Context, sa *serviceAccount) (*authToken, error) + getSummary(ctx context.Context) (*summary, error) + getContainers(ctx context.Context, node string) ([]container, error) + getNodeMetrics(ctx context.Context, node string) (*metrics, error) + getContainerMetrics(ctx context.Context, node, container string) (*metrics, error) + getAppMetrics(ctx context.Context, node, container string) (*metrics, error) } type apiError struct { - URL string - StatusCode int - Title string - Description string + url string + statusCode int + title string + description string } // login is request data for logging in. @@ -90,7 +90,7 @@ type authToken struct { Expire time.Time } -// clusterClient is a Client that uses the cluster URL. +// clusterClient is a client that uses the cluster URL. type clusterClient struct { clusterURL *url.URL httpClient *http.Client @@ -104,13 +104,13 @@ type claims struct { } func (e apiError) Error() string { - if e.Description != "" { - return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description) + if e.description != "" { + return fmt.Sprintf("[%s] %s: %s", e.url, e.title, e.description) } - return fmt.Sprintf("[%s] %s", e.URL, e.Title) + return fmt.Sprintf("[%s] %s", e.url, e.title) } -func NewClusterClient(clusterURL *url.URL, timeout time.Duration, maxConns int, tlsConfig *tls.Config) *clusterClient { +func newClusterClient(clusterURL *url.URL, timeout time.Duration, maxConns int, tlsConfig *tls.Config) *clusterClient { httpClient := &http.Client{ Transport: &http.Transport{ MaxIdleConns: maxConns, @@ -128,11 +128,11 @@ func NewClusterClient(clusterURL *url.URL, timeout time.Duration, maxConns int, return c } -func (c *clusterClient) SetToken(token string) { +func (c *clusterClient) setToken(token string) { c.token = token } -func (c *clusterClient) Login(ctx context.Context, sa *ServiceAccount) (*authToken, error) { +func (c *clusterClient) login(ctx context.Context, sa *serviceAccount) (*authToken, error) { token, err := c.createLoginToken(sa) if err != nil { return nil, err @@ -141,7 +141,7 @@ func (c *clusterClient) Login(ctx context.Context, sa *ServiceAccount) (*authTok exp := time.Now().Add(loginDuration) body := &login{ - UID: sa.AccountID, + UID: sa.accountID, Exp: exp.Unix(), Token: token, } @@ -185,23 +185,23 @@ func (c *clusterClient) Login(ctx context.Context, sa *ServiceAccount) (*authTok err = dec.Decode(loginError) if err != nil { err := &apiError{ - URL: loc, - StatusCode: resp.StatusCode, - Title: resp.Status, + url: loc, + statusCode: resp.StatusCode, + title: resp.Status, } return nil, err } err = &apiError{ - URL: loc, - StatusCode: resp.StatusCode, - Title: loginError.Title, - Description: loginError.Description, + url: loc, + statusCode: resp.StatusCode, + title: loginError.Title, + description: loginError.Description, } return nil, err } -func (c *clusterClient) GetSummary(ctx context.Context) (*summary, error) { +func (c *clusterClient) getSummary(ctx context.Context) (*summary, error) { summary := &summary{} err := c.doGet(ctx, c.toURL("/mesos/master/state-summary"), summary) if err != nil { @@ -211,7 +211,7 @@ func (c *clusterClient) GetSummary(ctx context.Context) (*summary, error) { return summary, nil } -func (c *clusterClient) GetContainers(ctx context.Context, node string) ([]container, error) { +func (c *clusterClient) getContainers(ctx context.Context, node string) ([]container, error) { list := []string{} path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node) @@ -239,17 +239,17 @@ func (c *clusterClient) getMetrics(ctx context.Context, address string) (*metric return metrics, nil } -func (c *clusterClient) GetNodeMetrics(ctx context.Context, node string) (*metrics, error) { +func (c *clusterClient) getNodeMetrics(ctx context.Context, node string) (*metrics, error) { path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node) return c.getMetrics(ctx, c.toURL(path)) } -func (c *clusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*metrics, error) { +func (c *clusterClient) getContainerMetrics(ctx context.Context, node, container string) (*metrics, error) { path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container) return c.getMetrics(ctx, c.toURL(path)) } -func (c *clusterClient) GetAppMetrics(ctx context.Context, node, container string) (*metrics, error) { +func (c *clusterClient) getAppMetrics(ctx context.Context, node, container string) (*metrics, error) { path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container) return c.getMetrics(ctx, c.toURL(path)) } @@ -298,9 +298,9 @@ func (c *clusterClient) doGet(ctx context.Context, address string, v interface{} if resp.StatusCode < 200 || resp.StatusCode >= 300 { return &apiError{ - URL: address, - StatusCode: resp.StatusCode, - Title: resp.Status, + url: address, + statusCode: resp.StatusCode, + title: resp.Status, } } @@ -318,13 +318,13 @@ func (c *clusterClient) toURL(path string) string { return clusterURL.String() } -func (c *clusterClient) createLoginToken(sa *ServiceAccount) (string, error) { +func (c *clusterClient) createLoginToken(sa *serviceAccount) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{ - UID: sa.AccountID, + UID: sa.accountID, RegisteredClaims: jwt.RegisteredClaims{ // How long we have to login with this token ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Minute * 5)), }, }) - return token.SignedString(sa.PrivateKey) + return token.SignedString(sa.privateKey) } diff --git a/plugins/inputs/dcos/client_test.go b/plugins/inputs/dcos/client_test.go index cccb20b20..f6574fca3 100644 --- a/plugins/inputs/dcos/client_test.go +++ b/plugins/inputs/dcos/client_test.go @@ -39,10 +39,10 @@ func TestLogin(t *testing.T) { responseCode: http.StatusUnauthorized, responseBody: `{"title": "x", "description": "y"}`, expectedError: &apiError{ - URL: ts.URL + "/acs/api/v1/auth/login", - StatusCode: http.StatusUnauthorized, - Title: "x", - Description: "y", + url: ts.URL + "/acs/api/v1/auth/login", + statusCode: http.StatusUnauthorized, + title: "x", + description: "y", }, expectedToken: "", }, @@ -62,12 +62,12 @@ func TestLogin(t *testing.T) { require.NoError(t, err) ctx := context.Background() - sa := &ServiceAccount{ - AccountID: "telegraf", - PrivateKey: key, + sa := &serviceAccount{ + accountID: "telegraf", + privateKey: key, } - client := NewClusterClient(u, defaultResponseTimeout, 1, nil) - auth, err := client.Login(ctx, sa) + client := newClusterClient(u, defaultResponseTimeout, 1, nil) + auth, err := client.login(ctx, sa) require.Equal(t, tt.expectedError, err) @@ -104,9 +104,9 @@ func TestGetSummary(t *testing.T) { responseBody: ``, expectedValue: nil, expectedError: &apiError{ - URL: ts.URL + "/mesos/master/state-summary", - StatusCode: http.StatusUnauthorized, - Title: "401 Unauthorized", + url: ts.URL + "/mesos/master/state-summary", + statusCode: http.StatusUnauthorized, + title: "401 Unauthorized", }, }, { @@ -136,8 +136,8 @@ func TestGetSummary(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClusterClient(u, defaultResponseTimeout, 1, nil) - summary, err := client.GetSummary(ctx) + client := newClusterClient(u, defaultResponseTimeout, 1, nil) + summary, err := client.getSummary(ctx) require.Equal(t, tt.expectedError, err) require.Equal(t, tt.expectedValue, summary) @@ -177,8 +177,8 @@ func TestGetNodeMetrics(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClusterClient(u, defaultResponseTimeout, 1, nil) - m, err := client.GetNodeMetrics(ctx, "foo") + client := newClusterClient(u, defaultResponseTimeout, 1, nil) + m, err := client.getNodeMetrics(ctx, "foo") require.Equal(t, tt.expectedError, err) require.Equal(t, tt.expectedValue, m) @@ -218,8 +218,8 @@ func TestGetContainerMetrics(t *testing.T) { require.NoError(t, err) ctx := context.Background() - client := NewClusterClient(u, defaultResponseTimeout, 1, nil) - m, err := client.GetContainerMetrics(ctx, "foo", "bar") + client := newClusterClient(u, defaultResponseTimeout, 1, nil) + m, err := client.getContainerMetrics(ctx, "foo", "bar") require.Equal(t, tt.expectedError, err) require.Equal(t, tt.expectedValue, m) diff --git a/plugins/inputs/dcos/creds.go b/plugins/inputs/dcos/creds.go index 521344086..411c3c732 100644 --- a/plugins/inputs/dcos/creds.go +++ b/plugins/inputs/dcos/creds.go @@ -15,27 +15,27 @@ const ( relogDuration = 5 * time.Minute ) -type Credentials interface { - Token(ctx context.Context, client Client) (string, error) - IsExpired() bool +type credentials interface { + token(ctx context.Context, client client) (string, error) + isExpired() bool } -type ServiceAccount struct { - AccountID string - PrivateKey *rsa.PrivateKey +type serviceAccount struct { + accountID string + privateKey *rsa.PrivateKey auth *authToken } -type TokenCreds struct { +type tokenCreds struct { Path string } -type NullCreds struct { +type nullCreds struct { } -func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, error) { - auth, err := client.Login(ctx, c) +func (c *serviceAccount) token(ctx context.Context, client client) (string, error) { + auth, err := client.login(ctx, c) if err != nil { return "", err } @@ -43,11 +43,11 @@ func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, erro return auth.Text, nil } -func (c *ServiceAccount) IsExpired() bool { +func (c *serviceAccount) isExpired() bool { return c.auth.Text != "" || c.auth.Expire.Add(relogDuration).After(time.Now()) } -func (c *TokenCreds) Token(_ context.Context, _ Client) (string, error) { +func (c *tokenCreds) token(_ context.Context, _ client) (string, error) { octets, err := os.ReadFile(c.Path) if err != nil { return "", fmt.Errorf("error reading token file %q: %w", c.Path, err) @@ -59,14 +59,14 @@ func (c *TokenCreds) Token(_ context.Context, _ Client) (string, error) { return token, nil } -func (c *TokenCreds) IsExpired() bool { +func (c *tokenCreds) isExpired() bool { return true } -func (c *NullCreds) Token(_ context.Context, _ Client) (string, error) { +func (c *nullCreds) token(_ context.Context, _ client) (string, error) { return "", nil } -func (c *NullCreds) IsExpired() bool { +func (c *nullCreds) isExpired() bool { return true } diff --git a/plugins/inputs/dcos/dcos.go b/plugins/inputs/dcos/dcos.go index e5b2f74f6..1b099fb3b 100644 --- a/plugins/inputs/dcos/dcos.go +++ b/plugins/inputs/dcos/dcos.go @@ -24,11 +24,6 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - defaultMaxConnections = 10 - defaultResponseTimeout = 20 * time.Second -) - var ( nodeDimensions = []string{ "hostname", @@ -47,27 +42,32 @@ var ( } ) +const ( + defaultMaxConnections = 10 + defaultResponseTimeout = 20 * time.Second +) + type DCOS struct { ClusterURL string `toml:"cluster_url"` ServiceAccountID string `toml:"service_account_id"` - ServiceAccountPrivateKey string + ServiceAccountPrivateKey string `toml:"service_account_private_key"` - TokenFile string + TokenFile string `toml:"token_file"` - NodeInclude []string - NodeExclude []string - ContainerInclude []string - ContainerExclude []string - AppInclude []string - AppExclude []string + NodeInclude []string `toml:"node_include"` + NodeExclude []string `toml:"node_exclude"` + ContainerInclude []string `toml:"container_include"` + ContainerExclude []string `toml:"container_exclude"` + AppInclude []string `toml:"app_include"` + AppExclude []string `toml:"app_exclude"` - MaxConnections int - ResponseTimeout config.Duration + MaxConnections int `toml:"max_connections"` + ResponseTimeout config.Duration `toml:"response_timeout"` tls.ClientConfig - client Client - creds Credentials + client client + creds credentials initialized bool nodeFilter filter.Filter @@ -75,25 +75,31 @@ type DCOS struct { appFilter filter.Filter } +type point struct { + tags map[string]string + labels map[string]string + fields map[string]interface{} +} + func (*DCOS) SampleConfig() string { return sampleConfig } func (d *DCOS) Gather(acc telegraf.Accumulator) error { - err := d.init() + err := d.initialize() if err != nil { return err } ctx := context.Background() - token, err := d.creds.Token(ctx, d.client) + token, err := d.creds.token(ctx, d.client) if err != nil { return err } - d.client.SetToken(token) + d.client.setToken(token) - summary, err := d.client.GetSummary(ctx) + summary, err := d.client.getSummary(ctx) if err != nil { return err } @@ -103,7 +109,7 @@ func (d *DCOS) Gather(acc telegraf.Accumulator) error { wg.Add(1) go func(node string) { defer wg.Done() - d.GatherNode(ctx, acc, summary.Cluster, node) + d.gatherNode(ctx, acc, summary.Cluster, node) }(node.ID) } wg.Wait() @@ -111,7 +117,7 @@ func (d *DCOS) Gather(acc telegraf.Accumulator) error { return nil } -func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { +func (d *DCOS) gatherNode(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { if !d.nodeFilter.Match(node) { return } @@ -120,7 +126,7 @@ func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster wg.Add(1) go func() { defer wg.Done() - m, err := d.client.GetNodeMetrics(ctx, node) + m, err := d.client.getNodeMetrics(ctx, node) if err != nil { acc.AddError(err) return @@ -128,12 +134,12 @@ func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster d.addNodeMetrics(acc, cluster, m) }() - d.GatherContainers(ctx, acc, cluster, node) + d.gatherContainers(ctx, acc, cluster, node) wg.Wait() } -func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { - containers, err := d.client.GetContainers(ctx, node) +func (d *DCOS) gatherContainers(ctx context.Context, acc telegraf.Accumulator, cluster, node string) { + containers, err := d.client.getContainers(ctx, node) if err != nil { acc.AddError(err) return @@ -145,10 +151,10 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c wg.Add(1) go func(container string) { defer wg.Done() - m, err := d.client.GetContainerMetrics(ctx, node, container) + m, err := d.client.getContainerMetrics(ctx, node, container) if err != nil { var apiErr apiError - if errors.As(err, &apiErr) && apiErr.StatusCode == 404 { + if errors.As(err, &apiErr) && apiErr.statusCode == 404 { return } acc.AddError(err) @@ -162,10 +168,10 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c wg.Add(1) go func(container string) { defer wg.Done() - m, err := d.client.GetAppMetrics(ctx, node, container) + m, err := d.client.getAppMetrics(ctx, node, container) if err != nil { var apiErr apiError - if errors.As(err, &apiErr) && apiErr.StatusCode == 404 { + if errors.As(err, &apiErr) && apiErr.statusCode == 404 { return } acc.AddError(err) @@ -178,12 +184,6 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c wg.Wait() } -type point struct { - tags map[string]string - labels map[string]string - fields map[string]interface{} -} - func (d *DCOS) createPoints(m *metrics) []*point { points := make(map[string]*point) for _, dp := range m.Datapoints { @@ -278,7 +278,7 @@ func (d *DCOS) addAppMetrics(acc telegraf.Accumulator, cluster string, m *metric d.addMetrics(acc, cluster, "dcos_app", m, appDimensions) } -func (d *DCOS) init() error { +func (d *DCOS) initialize() error { if !d.initialized { err := d.createFilters() if err != nil { @@ -306,7 +306,7 @@ func (d *DCOS) init() error { return nil } -func (d *DCOS) createClient() (Client, error) { +func (d *DCOS) createClient() (client, error) { tlsCfg, err := d.ClientConfig.TLSConfig() if err != nil { return nil, err @@ -317,7 +317,7 @@ func (d *DCOS) createClient() (Client, error) { return nil, err } - client := NewClusterClient( + client := newClusterClient( address, time.Duration(d.ResponseTimeout), d.MaxConnections, @@ -327,7 +327,7 @@ func (d *DCOS) createClient() (Client, error) { return client, nil } -func (d *DCOS) createCredentials() (Credentials, error) { +func (d *DCOS) createCredentials() (credentials, error) { if d.ServiceAccountID != "" && d.ServiceAccountPrivateKey != "" { bs, err := os.ReadFile(d.ServiceAccountPrivateKey) if err != nil { @@ -339,19 +339,19 @@ func (d *DCOS) createCredentials() (Credentials, error) { return nil, err } - creds := &ServiceAccount{ - AccountID: d.ServiceAccountID, - PrivateKey: privateKey, + creds := &serviceAccount{ + accountID: d.ServiceAccountID, + privateKey: privateKey, } return creds, nil } else if d.TokenFile != "" { - creds := &TokenCreds{ + creds := &tokenCreds{ Path: d.TokenFile, } return creds, nil } - return &NullCreds{}, nil + return &nullCreds{}, nil } func (d *DCOS) createFilters() error { diff --git a/plugins/inputs/dcos/dcos_test.go b/plugins/inputs/dcos/dcos_test.go index 266d60669..63683b9ac 100644 --- a/plugins/inputs/dcos/dcos_test.go +++ b/plugins/inputs/dcos/dcos_test.go @@ -12,7 +12,7 @@ import ( type mockClient struct { SetTokenF func() - LoginF func(ctx context.Context, sa *ServiceAccount) (*authToken, error) + LoginF func(ctx context.Context, sa *serviceAccount) (*authToken, error) GetSummaryF func() (*summary, error) GetContainersF func() ([]container, error) GetNodeMetricsF func() (*metrics, error) @@ -20,31 +20,31 @@ type mockClient struct { GetAppMetricsF func(ctx context.Context, node, container string) (*metrics, error) } -func (c *mockClient) SetToken(string) { +func (c *mockClient) setToken(string) { c.SetTokenF() } -func (c *mockClient) Login(ctx context.Context, sa *ServiceAccount) (*authToken, error) { +func (c *mockClient) login(ctx context.Context, sa *serviceAccount) (*authToken, error) { return c.LoginF(ctx, sa) } -func (c *mockClient) GetSummary(context.Context) (*summary, error) { +func (c *mockClient) getSummary(context.Context) (*summary, error) { return c.GetSummaryF() } -func (c *mockClient) GetContainers(context.Context, string) ([]container, error) { +func (c *mockClient) getContainers(context.Context, string) ([]container, error) { return c.GetContainersF() } -func (c *mockClient) GetNodeMetrics(context.Context, string) (*metrics, error) { +func (c *mockClient) getNodeMetrics(context.Context, string) (*metrics, error) { return c.GetNodeMetricsF() } -func (c *mockClient) GetContainerMetrics(ctx context.Context, node, container string) (*metrics, error) { +func (c *mockClient) getContainerMetrics(ctx context.Context, node, container string) (*metrics, error) { return c.GetContainerMetricsF(ctx, node, container) } -func (c *mockClient) GetAppMetrics(ctx context.Context, node, container string) (*metrics, error) { +func (c *mockClient) getAppMetrics(ctx context.Context, node, container string) (*metrics, error) { return c.GetAppMetricsF(ctx, node, container) } @@ -356,7 +356,7 @@ func TestGatherFilterNode(t *testing.T) { name string nodeInclude []string nodeExclude []string - client Client + client client check func(*testutil.Accumulator) []bool }{ { diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index b0b5a81a3..9e88ce49a 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -33,9 +33,7 @@ var sampleConfig string var once sync.Once -var ( - defaultFilesToMonitor = []string{} - defaultFilesToIgnore = []string{} +const ( defaultMaxBufferedMetrics = 10000 defaultDirectoryDurationThreshold = config.Duration(0 * time.Millisecond) defaultFileQueueSize = 100000 @@ -78,6 +76,94 @@ func (*DirectoryMonitor) SampleConfig() string { return sampleConfig } +func (monitor *DirectoryMonitor) SetParserFunc(fn telegraf.ParserFunc) { + monitor.parserFunc = fn +} + +func (monitor *DirectoryMonitor) Init() error { + if monitor.Directory == "" || monitor.FinishedDirectory == "" { + return errors.New("missing one of the following required config options: directory, finished_directory") + } + + if monitor.FileQueueSize <= 0 { + return errors.New("file queue size needs to be more than 0") + } + + // Finished directory can be created if not exists for convenience. + if _, err := os.Stat(monitor.FinishedDirectory); os.IsNotExist(err) { + err = os.Mkdir(monitor.FinishedDirectory, 0750) + if err != nil { + return err + } + } + + tags := map[string]string{ + "directory": monitor.Directory, + } + monitor.filesDropped = selfstat.Register("directory_monitor", "files_dropped", map[string]string{}) + monitor.filesDroppedDir = selfstat.Register("directory_monitor", "files_dropped_per_dir", tags) + monitor.filesProcessed = selfstat.Register("directory_monitor", "files_processed", map[string]string{}) + monitor.filesProcessedDir = selfstat.Register("directory_monitor", "files_processed_per_dir", tags) + monitor.filesQueuedDir = selfstat.Register("directory_monitor", "files_queue_per_dir", tags) + + // If an error directory should be used but has not been configured yet, create one ourselves. + if monitor.ErrorDirectory != "" { + if _, err := os.Stat(monitor.ErrorDirectory); os.IsNotExist(err) { + err := os.Mkdir(monitor.ErrorDirectory, 0750) + if err != nil { + return err + } + } + } + + monitor.waitGroup = &sync.WaitGroup{} + monitor.sem = semaphore.NewWeighted(int64(monitor.MaxBufferedMetrics)) + monitor.context, monitor.cancel = context.WithCancel(context.Background()) + monitor.filesToProcess = make(chan string, monitor.FileQueueSize) + + // Establish file matching / exclusion regexes. + for _, matcher := range monitor.FilesToMonitor { + regex, err := regexp.Compile(matcher) + if err != nil { + return err + } + monitor.fileRegexesToMatch = append(monitor.fileRegexesToMatch, regex) + } + + for _, matcher := range monitor.FilesToIgnore { + regex, err := regexp.Compile(matcher) + if err != nil { + return err + } + monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex) + } + + if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "at-once"}); err != nil { + return fmt.Errorf("config option parse_method: %w", err) + } + + return nil +} + +func (monitor *DirectoryMonitor) Start(acc telegraf.Accumulator) error { + // Use tracking to determine when more metrics can be added without overflowing the outputs. + monitor.acc = acc.WithTracking(monitor.MaxBufferedMetrics) + go func() { + for range monitor.acc.Delivered() { + monitor.sem.Release(1) + } + }() + + // Monitor the files channel and read what they receive. + monitor.waitGroup.Add(1) + go func() { + monitor.monitor() + monitor.waitGroup.Done() + }() + + return nil +} + func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error { processFile := func(path string) error { // We've been cancelled via Stop(). @@ -138,25 +224,6 @@ func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error { return nil } -func (monitor *DirectoryMonitor) Start(acc telegraf.Accumulator) error { - // Use tracking to determine when more metrics can be added without overflowing the outputs. - monitor.acc = acc.WithTracking(monitor.MaxBufferedMetrics) - go func() { - for range monitor.acc.Delivered() { - monitor.sem.Release(1) - } - }() - - // Monitor the files channel and read what they receive. - monitor.waitGroup.Add(1) - go func() { - monitor.Monitor() - monitor.waitGroup.Done() - }() - - return nil -} - func (monitor *DirectoryMonitor) Stop() { // Before stopping, wrap up all file-reading routines. monitor.cancel() @@ -165,7 +232,7 @@ func (monitor *DirectoryMonitor) Stop() { monitor.waitGroup.Wait() } -func (monitor *DirectoryMonitor) Monitor() { +func (monitor *DirectoryMonitor) monitor() { for filePath := range monitor.filesToProcess { if monitor.context.Err() != nil { return @@ -400,80 +467,9 @@ func (monitor *DirectoryMonitor) isIgnoredFile(fileName string) bool { return false } -func (monitor *DirectoryMonitor) SetParserFunc(fn telegraf.ParserFunc) { - monitor.parserFunc = fn -} - -func (monitor *DirectoryMonitor) Init() error { - if monitor.Directory == "" || monitor.FinishedDirectory == "" { - return errors.New("missing one of the following required config options: directory, finished_directory") - } - - if monitor.FileQueueSize <= 0 { - return errors.New("file queue size needs to be more than 0") - } - - // Finished directory can be created if not exists for convenience. - if _, err := os.Stat(monitor.FinishedDirectory); os.IsNotExist(err) { - err = os.Mkdir(monitor.FinishedDirectory, 0750) - if err != nil { - return err - } - } - - tags := map[string]string{ - "directory": monitor.Directory, - } - monitor.filesDropped = selfstat.Register("directory_monitor", "files_dropped", map[string]string{}) - monitor.filesDroppedDir = selfstat.Register("directory_monitor", "files_dropped_per_dir", tags) - monitor.filesProcessed = selfstat.Register("directory_monitor", "files_processed", map[string]string{}) - monitor.filesProcessedDir = selfstat.Register("directory_monitor", "files_processed_per_dir", tags) - monitor.filesQueuedDir = selfstat.Register("directory_monitor", "files_queue_per_dir", tags) - - // If an error directory should be used but has not been configured yet, create one ourselves. - if monitor.ErrorDirectory != "" { - if _, err := os.Stat(monitor.ErrorDirectory); os.IsNotExist(err) { - err := os.Mkdir(monitor.ErrorDirectory, 0750) - if err != nil { - return err - } - } - } - - monitor.waitGroup = &sync.WaitGroup{} - monitor.sem = semaphore.NewWeighted(int64(monitor.MaxBufferedMetrics)) - monitor.context, monitor.cancel = context.WithCancel(context.Background()) - monitor.filesToProcess = make(chan string, monitor.FileQueueSize) - - // Establish file matching / exclusion regexes. - for _, matcher := range monitor.FilesToMonitor { - regex, err := regexp.Compile(matcher) - if err != nil { - return err - } - monitor.fileRegexesToMatch = append(monitor.fileRegexesToMatch, regex) - } - - for _, matcher := range monitor.FilesToIgnore { - regex, err := regexp.Compile(matcher) - if err != nil { - return err - } - monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex) - } - - if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "at-once"}); err != nil { - return fmt.Errorf("config option parse_method: %w", err) - } - - return nil -} - func init() { inputs.Add("directory_monitor", func() telegraf.Input { return &DirectoryMonitor{ - FilesToMonitor: defaultFilesToMonitor, - FilesToIgnore: defaultFilesToIgnore, MaxBufferedMetrics: defaultMaxBufferedMetrics, DirectoryDurationThreshold: defaultDirectoryDurationThreshold, FileQueueSize: defaultFileQueueSize, diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index d6004b868..35b8f39cc 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -22,8 +22,6 @@ func TestCreator(t *testing.T) { require.True(t, found) expected := &DirectoryMonitor{ - FilesToMonitor: defaultFilesToMonitor, - FilesToIgnore: defaultFilesToIgnore, MaxBufferedMetrics: defaultMaxBufferedMetrics, DirectoryDurationThreshold: defaultDirectoryDurationThreshold, FileQueueSize: defaultFileQueueSize, diff --git a/plugins/inputs/disk/disk.go b/plugins/inputs/disk/disk.go index 96c7a0785..a6cc71933 100644 --- a/plugins/inputs/disk/disk.go +++ b/plugins/inputs/disk/disk.go @@ -6,16 +6,17 @@ import ( "fmt" "strings" + "github.com/shirou/gopsutil/v4/disk" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/system" - "github.com/shirou/gopsutil/v4/disk" ) //go:embed sample.conf var sampleConfig string -type DiskStats struct { +type Disk struct { MountPoints []string `toml:"mount_points"` IgnoreFS []string `toml:"ignore_fs"` IgnoreMountOpts []string `toml:"ignore_mount_opts"` @@ -24,11 +25,11 @@ type DiskStats struct { ps system.PS } -func (*DiskStats) SampleConfig() string { +func (*Disk) SampleConfig() string { return sampleConfig } -func (ds *DiskStats) Init() error { +func (ds *Disk) Init() error { ps := system.NewSystemPS() ps.Log = ds.Log ds.ps = ps @@ -36,7 +37,7 @@ func (ds *DiskStats) Init() error { return nil } -func (ds *DiskStats) Gather(acc telegraf.Accumulator) error { +func (ds *Disk) Gather(acc telegraf.Accumulator) error { disks, partitions, err := ds.ps.DiskUsage(ds.MountPoints, ds.IgnoreMountOpts, ds.IgnoreFS) if err != nil { return fmt.Errorf("error getting disk usage info: %w", err) @@ -48,12 +49,12 @@ func (ds *DiskStats) Gather(acc telegraf.Accumulator) error { } device := partitions[i].Device - mountOpts := MountOptions(partitions[i].Opts) + mountOpts := mountOptions(partitions[i].Opts) tags := map[string]string{ "path": du.Path, "device": strings.ReplaceAll(device, "/dev/", ""), "fstype": du.Fstype, - "mode": mountOpts.Mode(), + "mode": mountOpts.mode(), } label, err := disk.Label(strings.TrimPrefix(device, "/dev/")) @@ -89,9 +90,9 @@ func (ds *DiskStats) Gather(acc telegraf.Accumulator) error { return nil } -type MountOptions []string +type mountOptions []string -func (opts MountOptions) Mode() string { +func (opts mountOptions) mode() string { if opts.exists("rw") { return "rw" } else if opts.exists("ro") { @@ -100,7 +101,7 @@ func (opts MountOptions) Mode() string { return "unknown" } -func (opts MountOptions) exists(opt string) bool { +func (opts mountOptions) exists(opt string) bool { for _, o := range opts { if o == opt { return true @@ -111,6 +112,6 @@ func (opts MountOptions) exists(opt string) bool { func init() { inputs.Add("disk", func() telegraf.Input { - return &DiskStats{} + return &Disk{} }) } diff --git a/plugins/inputs/disk/disk_test.go b/plugins/inputs/disk/disk_test.go index cb469a688..f94cb6804 100644 --- a/plugins/inputs/disk/disk_test.go +++ b/plugins/inputs/disk/disk_test.go @@ -18,10 +18,6 @@ import ( "github.com/influxdata/telegraf/testutil" ) -type MockFileInfo struct { - os.FileInfo -} - func TestDiskUsage(t *testing.T) { mck := &mock.Mock{} mps := system.MockPSDisk{SystemPS: &system.SystemPS{PSDiskDeps: &system.MockDiskUsage{Mock: mck}}, Mock: mck} @@ -89,7 +85,7 @@ func TestDiskUsage(t *testing.T) { mps.On("PSDiskUsage", "/home").Return(&duAll[1], nil) mps.On("PSDiskUsage", "/var/rootbind").Return(&duAll[2], nil) - err = (&DiskStats{ps: mps}).Gather(&acc) + err = (&Disk{ps: mps}).Gather(&acc) require.NoError(t, err) numDiskMetrics := acc.NFields() @@ -151,18 +147,18 @@ func TestDiskUsage(t *testing.T) { // We expect 7 more DiskMetrics to show up with an explicit match on "/" // and /home not matching the /dev in MountPoints - err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc) + err = (&Disk{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc) require.NoError(t, err) require.Equal(t, expectedAllDiskMetrics+8, acc.NFields()) // We should see all the diskpoints as MountPoints includes both // /, /home, and /var/rootbind - err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc) + err = (&Disk{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc) require.NoError(t, err) require.Equal(t, expectedAllDiskMetrics+8*4, acc.NFields()) // We should see all the mounts as MountPoints except the bind mound - err = (&DiskStats{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc) + err = (&Disk{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc) require.NoError(t, err) require.Equal(t, expectedAllDiskMetrics+8*6, acc.NFields()) } @@ -296,7 +292,7 @@ func TestDiskUsageHostMountPrefix(t *testing.T) { mps.On("OSGetenv", "HOST_MOUNT_PREFIX").Return(tt.hostMountPrefix) - err = (&DiskStats{ps: mps}).Gather(&acc) + err = (&Disk{ps: mps}).Gather(&acc) require.NoError(t, err) acc.AssertContainsTaggedFields(t, "disk", tt.expectedFields, tt.expectedTags) @@ -426,7 +422,7 @@ func TestDiskStats(t *testing.T) { mps.On("DiskUsage", []string{"/", "/home", "/var/rootbind"}, []string(nil), []string(nil)).Return(duAll, psAll, nil) mps.On("DiskUsage", []string(nil), []string{"bind"}, []string(nil)).Return(duOptFiltered, psOptFiltered, nil) - err = (&DiskStats{ps: &mps}).Gather(&acc) + err = (&Disk{ps: &mps}).Gather(&acc) require.NoError(t, err) numDiskMetrics := acc.NFields() @@ -471,18 +467,18 @@ func TestDiskStats(t *testing.T) { // We expect 7 more DiskMetrics to show up with an explicit match on "/" // and /home and /var/rootbind not matching the /dev in MountPoints - err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc) + err = (&Disk{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc) require.NoError(t, err) require.Equal(t, expectedAllDiskMetrics+8, acc.NFields()) // We should see all the diskpoints as MountPoints includes both // /, /home, and /var/rootbind - err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc) + err = (&Disk{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc) require.NoError(t, err) require.Equal(t, expectedAllDiskMetrics+8*4, acc.NFields()) // We should see all the mounts as MountPoints except the bind mound - err = (&DiskStats{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc) + err = (&Disk{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc) require.NoError(t, err) require.Equal(t, expectedAllDiskMetrics+8*6, acc.NFields()) } @@ -654,7 +650,7 @@ func TestDiskUsageIssues(t *testing.T) { // Setup the plugin and run the test var acc testutil.Accumulator - plugin := &DiskStats{ps: &mps} + plugin := &Disk{ps: &mps} require.NoError(t, plugin.Gather(&acc)) actual := acc.GetTelegrafMetrics() diff --git a/plugins/inputs/diskio/diskio.go b/plugins/inputs/diskio/diskio.go index e5fee0dca..da5252de7 100644 --- a/plugins/inputs/diskio/diskio.go +++ b/plugins/inputs/diskio/diskio.go @@ -23,11 +23,6 @@ var ( varRegex = regexp.MustCompile(`\$(?:\w+|\{\w+\})`) ) -// hasMeta reports whether s contains any special glob characters. -func hasMeta(s string) bool { - return strings.ContainsAny(s, "*?[") -} - type DiskIO struct { Devices []string `toml:"devices"` DeviceTags []string `toml:"device_tags"` @@ -151,6 +146,11 @@ func (d *DiskIO) Gather(acc telegraf.Accumulator) error { return nil } +// hasMeta reports whether s contains any special glob characters. +func hasMeta(s string) bool { + return strings.ContainsAny(s, "*?[") +} + func (d *DiskIO) diskName(devName string) (string, []string) { di, err := d.diskInfo(devName) devLinks := strings.Split(di["DEVLINKS"], " ") diff --git a/plugins/inputs/disque/disque.go b/plugins/inputs/disque/disque.go index 07309fecd..5800b1415 100644 --- a/plugins/inputs/disque/disque.go +++ b/plugins/inputs/disque/disque.go @@ -20,52 +20,53 @@ import ( //go:embed sample.conf var sampleConfig string +var ( + defaultTimeout = 5 * time.Second + tracking = map[string]string{ + "uptime_in_seconds": "uptime", + "connected_clients": "clients", + "blocked_clients": "blocked_clients", + "used_memory": "used_memory", + "used_memory_rss": "used_memory_rss", + "used_memory_peak": "used_memory_peak", + "total_connections_received": "total_connections_received", + "total_commands_processed": "total_commands_processed", + "instantaneous_ops_per_sec": "instantaneous_ops_per_sec", + "latest_fork_usec": "latest_fork_usec", + "mem_fragmentation_ratio": "mem_fragmentation_ratio", + "used_cpu_sys": "used_cpu_sys", + "used_cpu_user": "used_cpu_user", + "used_cpu_sys_children": "used_cpu_sys_children", + "used_cpu_user_children": "used_cpu_user_children", + "registered_jobs": "registered_jobs", + "registered_queues": "registered_queues", + } + errProtocol = errors.New("disque protocol error") +) + +const ( + defaultPort = "7711" +) + type Disque struct { - Servers []string + Servers []string `toml:"servers"` c net.Conn } -var defaultTimeout = 5 * time.Second - -var Tracking = map[string]string{ - "uptime_in_seconds": "uptime", - "connected_clients": "clients", - "blocked_clients": "blocked_clients", - "used_memory": "used_memory", - "used_memory_rss": "used_memory_rss", - "used_memory_peak": "used_memory_peak", - "total_connections_received": "total_connections_received", - "total_commands_processed": "total_commands_processed", - "instantaneous_ops_per_sec": "instantaneous_ops_per_sec", - "latest_fork_usec": "latest_fork_usec", - "mem_fragmentation_ratio": "mem_fragmentation_ratio", - "used_cpu_sys": "used_cpu_sys", - "used_cpu_user": "used_cpu_user", - "used_cpu_sys_children": "used_cpu_sys_children", - "used_cpu_user_children": "used_cpu_user_children", - "registered_jobs": "registered_jobs", - "registered_queues": "registered_queues", -} - -var ErrProtocolError = errors.New("disque protocol error") - func (*Disque) SampleConfig() string { return sampleConfig } -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). func (d *Disque) Gather(acc telegraf.Accumulator) error { if len(d.Servers) == 0 { address := &url.URL{ - Host: ":7711", + Host: ":" + defaultPort, } return d.gatherServer(address, acc) } var wg sync.WaitGroup - for _, serv := range d.Servers { u, err := url.Parse(serv) if err != nil { @@ -89,8 +90,6 @@ func (d *Disque) Gather(acc telegraf.Accumulator) error { return nil } -const defaultPort = "7711" - func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { if d.c == nil { _, _, err := net.SplitHostPort(addr.Host) @@ -142,7 +141,7 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { } if line[0] != '$' { - return fmt.Errorf("bad line start: %w", ErrProtocolError) + return fmt.Errorf("bad line start: %w", errProtocol) } line = strings.TrimSpace(line) @@ -151,7 +150,7 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { sz, err := strconv.Atoi(szStr) if err != nil { - return fmt.Errorf("bad size string <<%s>>: %w", szStr, ErrProtocolError) + return fmt.Errorf("bad size string <<%s>>: %w", szStr, errProtocol) } var read int @@ -174,7 +173,7 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error { name := parts[0] - metric, ok := Tracking[name] + metric, ok := tracking[name] if !ok { continue } diff --git a/plugins/inputs/dmcache/dmcache.go b/plugins/inputs/dmcache/dmcache.go index cb23d9eab..4b52d5491 100644 --- a/plugins/inputs/dmcache/dmcache.go +++ b/plugins/inputs/dmcache/dmcache.go @@ -12,7 +12,8 @@ import ( var sampleConfig string type DMCache struct { - PerDevice bool `toml:"per_device"` + PerDevice bool `toml:"per_device"` + getCurrentStatus func() ([]string, error) } diff --git a/plugins/inputs/dns_query/dns_query.go b/plugins/inputs/dns_query/dns_query.go index cfef4adfb..400a2c203 100644 --- a/plugins/inputs/dns_query/dns_query.go +++ b/plugins/inputs/dns_query/dns_query.go @@ -25,12 +25,12 @@ var ignoredErrors = []string{ "NXDOMAIN", } -type ResultType uint64 +type resultType uint64 const ( - Success ResultType = iota - Timeout - Error + successResult resultType = iota + timeoutResult + errorResult ) type DNSQuery struct { @@ -117,7 +117,7 @@ func (d *DNSQuery) query(domain, server string) (map[string]interface{}, map[str fields := map[string]interface{}{ "query_time_ms": float64(0), - "result_code": uint64(Error), + "result_code": uint64(errorResult), } c := dns.Client{ @@ -140,7 +140,7 @@ func (d *DNSQuery) query(domain, server string) (map[string]interface{}, map[str var opErr *net.OpError if errors.As(err, &opErr) && opErr.Timeout() { tags["result"] = "timeout" - fields["result_code"] = uint64(Timeout) + fields["result_code"] = uint64(timeoutResult) return fields, tags, err } return fields, tags, err @@ -158,7 +158,7 @@ func (d *DNSQuery) query(domain, server string) (map[string]interface{}, map[str // Success tags["result"] = "success" - fields["result_code"] = uint64(Success) + fields["result_code"] = uint64(successResult) // Fill out custom fields for specific record types for _, record := range r.Answer { diff --git a/plugins/inputs/dns_query/dns_query_test.go b/plugins/inputs/dns_query/dns_query_test.go index e21550399..a7b7aa081 100644 --- a/plugins/inputs/dns_query/dns_query_test.go +++ b/plugins/inputs/dns_query/dns_query_test.go @@ -13,8 +13,10 @@ import ( "github.com/influxdata/telegraf/testutil" ) -var servers = []string{"8.8.8.8"} -var domains = []string{"google.com"} +var ( + servers = []string{"8.8.8.8"} + domains = []string{"google.com"} +) func TestGathering(t *testing.T) { if testing.Short() { diff --git a/plugins/inputs/docker/client.go b/plugins/inputs/docker/client.go index 6ef50aeb6..3d1f79adc 100644 --- a/plugins/inputs/docker/client.go +++ b/plugins/inputs/docker/client.go @@ -16,7 +16,7 @@ var ( defaultHeaders = map[string]string{"User-Agent": "engine-api-cli-1.0"} ) -type Client interface { +type dockerClient interface { Info(ctx context.Context) (system.Info, error) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) ContainerStats(ctx context.Context, containerID string, stream bool) (container.StatsResponseReader, error) @@ -29,15 +29,15 @@ type Client interface { Close() error } -func NewEnvClient() (Client, error) { +func newEnvClient() (dockerClient, error) { dockerClient, err := client.NewClientWithOpts(client.FromEnv) if err != nil { return nil, err } - return &SocketClient{dockerClient}, nil + return &socketClient{dockerClient}, nil } -func NewClient(host string, tlsConfig *tls.Config) (Client, error) { +func newClient(host string, tlsConfig *tls.Config) (dockerClient, error) { transport := &http.Transport{ TLSClientConfig: tlsConfig, } @@ -52,42 +52,42 @@ func NewClient(host string, tlsConfig *tls.Config) (Client, error) { return nil, err } - return &SocketClient{dockerClient}, nil + return &socketClient{dockerClient}, nil } -type SocketClient struct { +type socketClient struct { client *client.Client } -func (c *SocketClient) Info(ctx context.Context) (system.Info, error) { +func (c *socketClient) Info(ctx context.Context) (system.Info, error) { return c.client.Info(ctx) } -func (c *SocketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) { +func (c *socketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) { return c.client.ContainerList(ctx, options) } -func (c *SocketClient) ContainerStats(ctx context.Context, containerID string, stream bool) (container.StatsResponseReader, error) { +func (c *socketClient) ContainerStats(ctx context.Context, containerID string, stream bool) (container.StatsResponseReader, error) { return c.client.ContainerStats(ctx, containerID, stream) } -func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { +func (c *socketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { return c.client.ContainerInspect(ctx, containerID) } -func (c *SocketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) { +func (c *socketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) { return c.client.ServiceList(ctx, options) } -func (c *SocketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) { +func (c *socketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) { return c.client.TaskList(ctx, options) } -func (c *SocketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) { +func (c *socketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) { return c.client.NodeList(ctx, options) } -func (c *SocketClient) DiskUsage(ctx context.Context, options types.DiskUsageOptions) (types.DiskUsage, error) { +func (c *socketClient) DiskUsage(ctx context.Context, options types.DiskUsageOptions) (types.DiskUsage, error) { return c.client.DiskUsage(ctx, options) } -func (c *SocketClient) ClientVersion() string { +func (c *socketClient) ClientVersion() string { return c.client.ClientVersion() } -func (c *SocketClient) Close() error { +func (c *socketClient) Close() error { return c.client.Close() } diff --git a/plugins/inputs/docker/docker.go b/plugins/inputs/docker/docker.go index a8d8a71c0..a703da0cf 100644 --- a/plugins/inputs/docker/docker.go +++ b/plugins/inputs/docker/docker.go @@ -26,6 +26,7 @@ import ( "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/internal/docker" + docker_stats "github.com/influxdata/telegraf/plugins/common/docker" common_tls "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -33,48 +34,15 @@ import ( //go:embed sample.conf var sampleConfig string -// Docker object -type Docker struct { - Endpoint string - ContainerNames []string `toml:"container_names" deprecated:"1.4.0;1.35.0;use 'container_name_include' instead"` +var ( + sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`) + containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} + containerMetricClasses = []string{"cpu", "network", "blkio"} + now = time.Now - GatherServices bool `toml:"gather_services"` - - Timeout config.Duration - PerDevice bool `toml:"perdevice" deprecated:"1.18.0;1.35.0;use 'perdevice_include' instead"` - PerDeviceInclude []string `toml:"perdevice_include"` - Total bool `toml:"total" deprecated:"1.18.0;1.35.0;use 'total_include' instead"` - TotalInclude []string `toml:"total_include"` - TagEnvironment []string `toml:"tag_env"` - LabelInclude []string `toml:"docker_label_include"` - LabelExclude []string `toml:"docker_label_exclude"` - - ContainerInclude []string `toml:"container_name_include"` - ContainerExclude []string `toml:"container_name_exclude"` - - ContainerStateInclude []string `toml:"container_state_include"` - ContainerStateExclude []string `toml:"container_state_exclude"` - - StorageObjects []string `toml:"storage_objects"` - - IncludeSourceTag bool `toml:"source_tag"` - - Log telegraf.Logger - - common_tls.ClientConfig - - newEnvClient func() (Client, error) - newClient func(string, *tls.Config) (Client, error) - - client Client - engineHost string - serverVersion string - filtersCreated bool - labelFilter filter.Filter - containerFilter filter.Filter - stateFilter filter.Filter - objectTypes []types.DiskUsageObject -} + minVersion = semver.MustParse("1.23") + minDiskUsageVersion = semver.MustParse("1.42") +) // KB, MB, GB, TB, PB...human friendly const ( @@ -87,15 +55,48 @@ const ( defaultEndpoint = "unix:///var/run/docker.sock" ) -var ( - sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`) - containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} - containerMetricClasses = []string{"cpu", "network", "blkio"} - now = time.Now +// Docker object +type Docker struct { + Endpoint string `toml:"endpoint"` + ContainerNames []string `toml:"container_names" deprecated:"1.4.0;1.35.0;use 'container_name_include' instead"` - minVersion = semver.MustParse("1.23") - minDiskUsageVersion = semver.MustParse("1.42") -) + GatherServices bool `toml:"gather_services"` + + Timeout config.Duration `toml:"timeout"` + PerDevice bool `toml:"perdevice" deprecated:"1.18.0;1.35.0;use 'perdevice_include' instead"` + PerDeviceInclude []string `toml:"perdevice_include"` + Total bool `toml:"total" deprecated:"1.18.0;1.35.0;use 'total_include' instead"` + TotalInclude []string `toml:"total_include"` + TagEnvironment []string `toml:"tag_env"` + LabelInclude []string `toml:"docker_label_include"` + LabelExclude []string `toml:"docker_label_exclude"` + + ContainerInclude []string `toml:"container_name_include"` + ContainerExclude []string `toml:"container_name_exclude"` + + ContainerStateInclude []string `toml:"container_state_include"` + ContainerStateExclude []string `toml:"container_state_exclude"` + + StorageObjects []string `toml:"storage_objects"` + + IncludeSourceTag bool `toml:"source_tag"` + + Log telegraf.Logger `toml:"-"` + + common_tls.ClientConfig + + newEnvClient func() (dockerClient, error) + newClient func(string, *tls.Config) (dockerClient, error) + + client dockerClient + engineHost string + serverVersion string + filtersCreated bool + labelFilter filter.Filter + containerFilter filter.Filter + stateFilter filter.Filter + objectTypes []types.DiskUsageObject +} func (*Docker) SampleConfig() string { return sampleConfig @@ -149,7 +150,6 @@ func (d *Docker) Init() error { return nil } -// Gather metrics from the docker server. func (d *Docker) Gather(acc telegraf.Accumulator) error { if d.client == nil { c, err := d.getNewClient() @@ -664,10 +664,10 @@ func (d *Docker) parseContainerStats( memfields["limit"] = stat.MemoryStats.Limit memfields["max_usage"] = stat.MemoryStats.MaxUsage - mem := CalculateMemUsageUnixNoCache(stat.MemoryStats) + mem := docker_stats.CalculateMemUsageUnixNoCache(stat.MemoryStats) memLimit := float64(stat.MemoryStats.Limit) memfields["usage"] = uint64(mem) - memfields["usage_percent"] = CalculateMemPercentUnixNoCache(memLimit, mem) + memfields["usage_percent"] = docker_stats.CalculateMemPercentUnixNoCache(memLimit, mem) } else { memfields["commit_bytes"] = stat.MemoryStats.Commit memfields["commit_peak_bytes"] = stat.MemoryStats.CommitPeak @@ -691,10 +691,10 @@ func (d *Docker) parseContainerStats( if daemonOSType != "windows" { previousCPU := stat.PreCPUStats.CPUUsage.TotalUsage previousSystem := stat.PreCPUStats.SystemUsage - cpuPercent := CalculateCPUPercentUnix(previousCPU, previousSystem, stat) + cpuPercent := docker_stats.CalculateCPUPercentUnix(previousCPU, previousSystem, stat) cpufields["usage_percent"] = cpuPercent } else { - cpuPercent := calculateCPUPercentWindows(stat) + cpuPercent := docker_stats.CalculateCPUPercentWindows(stat) cpufields["usage_percent"] = cpuPercent } @@ -1046,7 +1046,7 @@ func (d *Docker) createContainerStateFilters() error { return nil } -func (d *Docker) getNewClient() (Client, error) { +func (d *Docker) getNewClient() (dockerClient, error) { if d.Endpoint == "ENV" { return d.newEnvClient() } @@ -1067,8 +1067,8 @@ func init() { TotalInclude: []string{"cpu", "blkio", "network"}, Timeout: config.Duration(time.Second * 5), Endpoint: defaultEndpoint, - newEnvClient: NewEnvClient, - newClient: NewClient, + newEnvClient: newEnvClient, + newClient: newClient, filtersCreated: false, } }) diff --git a/plugins/inputs/docker/docker_test.go b/plugins/inputs/docker/docker_test.go index 0172a1a55..14b826a9f 100644 --- a/plugins/inputs/docker/docker_test.go +++ b/plugins/inputs/docker/docker_test.go @@ -21,7 +21,7 @@ import ( "github.com/influxdata/telegraf/testutil" ) -type MockClient struct { +type mockClient struct { InfoF func() (system.Info, error) ContainerListF func(options container.ListOptions) ([]types.Container, error) ContainerStatsF func(containerID string) (container.StatsResponseReader, error) @@ -34,47 +34,47 @@ type MockClient struct { CloseF func() error } -func (c *MockClient) Info(context.Context) (system.Info, error) { +func (c *mockClient) Info(context.Context) (system.Info, error) { return c.InfoF() } -func (c *MockClient) ContainerList(_ context.Context, options container.ListOptions) ([]types.Container, error) { +func (c *mockClient) ContainerList(_ context.Context, options container.ListOptions) ([]types.Container, error) { return c.ContainerListF(options) } -func (c *MockClient) ContainerStats(_ context.Context, containerID string, _ bool) (container.StatsResponseReader, error) { +func (c *mockClient) ContainerStats(_ context.Context, containerID string, _ bool) (container.StatsResponseReader, error) { return c.ContainerStatsF(containerID) } -func (c *MockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) { +func (c *mockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) { return c.ContainerInspectF() } -func (c *MockClient) ServiceList(context.Context, types.ServiceListOptions) ([]swarm.Service, error) { +func (c *mockClient) ServiceList(context.Context, types.ServiceListOptions) ([]swarm.Service, error) { return c.ServiceListF() } -func (c *MockClient) TaskList(context.Context, types.TaskListOptions) ([]swarm.Task, error) { +func (c *mockClient) TaskList(context.Context, types.TaskListOptions) ([]swarm.Task, error) { return c.TaskListF() } -func (c *MockClient) NodeList(context.Context, types.NodeListOptions) ([]swarm.Node, error) { +func (c *mockClient) NodeList(context.Context, types.NodeListOptions) ([]swarm.Node, error) { return c.NodeListF() } -func (c *MockClient) DiskUsage(context.Context, types.DiskUsageOptions) (types.DiskUsage, error) { +func (c *mockClient) DiskUsage(context.Context, types.DiskUsageOptions) (types.DiskUsage, error) { return c.DiskUsageF() } -func (c *MockClient) ClientVersion() string { +func (c *mockClient) ClientVersion() string { return c.ClientVersionF() } -func (c *MockClient) Close() error { +func (c *mockClient) Close() error { return c.CloseF() } -var baseClient = MockClient{ +var baseClient = mockClient{ InfoF: func() (system.Info, error) { return info, nil }, @@ -88,13 +88,13 @@ var baseClient = MockClient{ return containerInspect(), nil }, ServiceListF: func() ([]swarm.Service, error) { - return ServiceList, nil + return serviceList, nil }, TaskListF: func() ([]swarm.Task, error) { - return TaskList, nil + return taskList, nil }, NodeListF: func() ([]swarm.Node, error) { - return NodeList, nil + return nodeList, nil }, DiskUsageF: func() (types.DiskUsage, error) { return diskUsage, nil @@ -421,8 +421,8 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) { d := Docker{ Log: testutil.Logger{}, - newClient: func(string, *tls.Config) (Client, error) { - return &MockClient{ + newClient: func(string, *tls.Config) (dockerClient, error) { + return &mockClient{ InfoF: func() (system.Info, error) { return info, nil }, @@ -436,13 +436,13 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) { return containerInspect(), nil }, ServiceListF: func() ([]swarm.Service, error) { - return ServiceList, nil + return serviceList, nil }, TaskListF: func() ([]swarm.Task, error) { - return TaskList, nil + return taskList, nil }, NodeListF: func() ([]swarm.Node, error) { - return NodeList, nil + return nodeList, nil }, DiskUsageF: func() (types.DiskUsage, error) { return diskUsage, nil @@ -559,7 +559,7 @@ func TestContainerLabels(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var acc testutil.Accumulator - newClientFunc := func(string, *tls.Config) (Client, error) { + newClientFunc := func(string, *tls.Config) (dockerClient, error) { client := baseClient client.ContainerListF = func(container.ListOptions) ([]types.Container, error) { return []types.Container{tt.container}, nil @@ -679,7 +679,7 @@ func TestContainerNames(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var acc testutil.Accumulator - newClientFunc := func(string, *tls.Config) (Client, error) { + newClientFunc := func(string, *tls.Config) (dockerClient, error) { client := baseClient client.ContainerListF = func(container.ListOptions) ([]types.Container, error) { return containerList, nil @@ -720,7 +720,7 @@ func TestContainerNames(t *testing.T) { } } -func FilterMetrics(metrics []telegraf.Metric, f func(telegraf.Metric) bool) []telegraf.Metric { +func filterMetrics(metrics []telegraf.Metric, f func(telegraf.Metric) bool) []telegraf.Metric { results := []telegraf.Metric{} for _, m := range metrics { if f(m) { @@ -889,7 +889,7 @@ func TestContainerStatus(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var ( acc testutil.Accumulator - newClientFunc = func(string, *tls.Config) (Client, error) { + newClientFunc = func(string, *tls.Config) (dockerClient, error) { client := baseClient client.ContainerListF = func(container.ListOptions) ([]types.Container, error) { return containerList[:1], nil @@ -918,7 +918,7 @@ func TestContainerStatus(t *testing.T) { err := d.Gather(&acc) require.NoError(t, err) - actual := FilterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool { + actual := filterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool { return m.Name() == "docker_container_status" }) testutil.RequireMetricsEqual(t, tt.expected, actual) @@ -930,7 +930,7 @@ func TestDockerGatherInfo(t *testing.T) { var acc testutil.Accumulator d := Docker{ Log: testutil.Logger{}, - newClient: func(string, *tls.Config) (Client, error) { return &baseClient, nil }, + newClient: func(string, *tls.Config) (dockerClient, error) { return &baseClient, nil }, TagEnvironment: []string{"ENVVAR1", "ENVVAR2", "ENVVAR3", "ENVVAR5", "ENVVAR6", "ENVVAR7", "ENVVAR8", "ENVVAR9"}, PerDeviceInclude: []string{"cpu", "network", "blkio"}, @@ -1083,7 +1083,7 @@ func TestDockerGatherSwarmInfo(t *testing.T) { var acc testutil.Accumulator d := Docker{ Log: testutil.Logger{}, - newClient: func(string, *tls.Config) (Client, error) { return &baseClient, nil }, + newClient: func(string, *tls.Config) (dockerClient, error) { return &baseClient, nil }, } err := acc.GatherError(d.Gather) @@ -1174,7 +1174,7 @@ func TestContainerStateFilter(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var acc testutil.Accumulator - newClientFunc := func(string, *tls.Config) (Client, error) { + newClientFunc := func(string, *tls.Config) (dockerClient, error) { client := baseClient client.ContainerListF = func(options container.ListOptions) ([]types.Container, error) { for k, v := range tt.expected { @@ -1205,12 +1205,12 @@ func TestContainerStateFilter(t *testing.T) { func TestContainerName(t *testing.T) { tests := []struct { name string - clientFunc func(host string, tlsConfig *tls.Config) (Client, error) + clientFunc func(host string, tlsConfig *tls.Config) (dockerClient, error) expected string }{ { name: "container stats name is preferred", - clientFunc: func(string, *tls.Config) (Client, error) { + clientFunc: func(string, *tls.Config) (dockerClient, error) { client := baseClient client.ContainerListF = func(container.ListOptions) ([]types.Container, error) { var containers []types.Container @@ -1230,7 +1230,7 @@ func TestContainerName(t *testing.T) { }, { name: "container stats without name uses container list name", - clientFunc: func(string, *tls.Config) (Client, error) { + clientFunc: func(string, *tls.Config) (dockerClient, error) { client := baseClient client.ContainerListF = func(container.ListOptions) ([]types.Container, error) { var containers []types.Container @@ -1444,7 +1444,7 @@ func Test_parseContainerStatsPerDeviceAndTotal(t *testing.T) { } d.parseContainerStats(tt.args.stat, &acc, tt.args.tags, tt.args.id, tt.args.daemonOSType) - actual := FilterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool { + actual := filterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool { return choice.Contains(m.Name(), []string{"docker_container_cpu", "docker_container_net", "docker_container_blkio"}) }) @@ -1547,7 +1547,7 @@ func TestDockerGatherDiskUsage(t *testing.T) { var acc testutil.Accumulator d := Docker{ Log: testutil.Logger{}, - newClient: func(string, *tls.Config) (Client, error) { return &baseClient, nil }, + newClient: func(string, *tls.Config) (dockerClient, error) { return &baseClient, nil }, } require.NoError(t, acc.GatherError(d.Gather)) diff --git a/plugins/inputs/docker/docker_testdata.go b/plugins/inputs/docker/docker_testdata.go index 08510224a..11027cf00 100644 --- a/plugins/inputs/docker/docker_testdata.go +++ b/plugins/inputs/docker/docker_testdata.go @@ -171,7 +171,7 @@ var containerList = []types.Container{ } var two = uint64(2) -var ServiceList = []swarm.Service{ +var serviceList = []swarm.Service{ { ID: "qolkls9g5iasdiuihcyz9rnx2", Spec: swarm.ServiceSpec{ @@ -198,7 +198,7 @@ var ServiceList = []swarm.Service{ }, } -var TaskList = []swarm.Task{ +var taskList = []swarm.Task{ { ID: "kwh0lv7hwwbh", ServiceID: "qolkls9g5iasdiuihcyz9rnx2", @@ -228,7 +228,7 @@ var TaskList = []swarm.Task{ }, } -var NodeList = []swarm.Node{ +var nodeList = []swarm.Node{ { ID: "0cl4jturcyd1ks3fwpd010kor", Status: swarm.NodeStatus{ diff --git a/plugins/inputs/docker_log/client.go b/plugins/inputs/docker_log/client.go index 8d92f2d59..effc51d47 100644 --- a/plugins/inputs/docker_log/client.go +++ b/plugins/inputs/docker_log/client.go @@ -17,21 +17,21 @@ var ( defaultHeaders = map[string]string{"User-Agent": "engine-api-cli-1.0"} ) -type Client interface { +type dockerClient interface { ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) } -func NewEnvClient() (Client, error) { +func newEnvClient() (dockerClient, error) { client, err := docker.NewClientWithOpts(docker.FromEnv) if err != nil { return nil, err } - return &SocketClient{client}, nil + return &socketClient{client}, nil } -func NewClient(host string, tlsConfig *tls.Config) (Client, error) { +func newClient(host string, tlsConfig *tls.Config) (dockerClient, error) { transport := &http.Transport{ TLSClientConfig: tlsConfig, } @@ -45,20 +45,20 @@ func NewClient(host string, tlsConfig *tls.Config) (Client, error) { if err != nil { return nil, err } - return &SocketClient{client}, nil + return &socketClient{client}, nil } -type SocketClient struct { +type socketClient struct { client *docker.Client } -func (c *SocketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) { +func (c *socketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) { return c.client.ContainerList(ctx, options) } -func (c *SocketClient) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) { +func (c *socketClient) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) { return c.client.ContainerLogs(ctx, containerID, options) } -func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { +func (c *socketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) { return c.client.ContainerInspect(ctx, containerID) } diff --git a/plugins/inputs/docker_log/docker_log.go b/plugins/inputs/docker_log/docker_log.go index dff33dd94..abc0e489e 100644 --- a/plugins/inputs/docker_log/docker_log.go +++ b/plugins/inputs/docker_log/docker_log.go @@ -31,14 +31,14 @@ import ( //go:embed sample.conf var sampleConfig string -const ( - defaultEndpoint = "unix:///var/run/docker.sock" +var ( + // ensure *DockerLogs implements telegraf.ServiceInput + _ telegraf.ServiceInput = (*DockerLogs)(nil) + containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} ) -var ( - containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"} - // ensure *DockerLogs implements telegraf.ServiceInput - _ telegraf.ServiceInput = (*DockerLogs)(nil) +const ( + defaultEndpoint = "unix:///var/run/docker.sock" ) type DockerLogs struct { @@ -55,10 +55,10 @@ type DockerLogs struct { common_tls.ClientConfig - newEnvClient func() (Client, error) - newClient func(string, *tls.Config) (Client, error) + newEnvClient func() (dockerClient, error) + newClient func(string, *tls.Config) (dockerClient, error) - client Client + client dockerClient labelFilter filter.Filter containerFilter filter.Filter stateFilter filter.Filter @@ -127,7 +127,11 @@ func (d *DockerLogs) Init() error { return nil } -// State persistence interfaces +// Start is a noop which is required for a *DockerLogs to implement the telegraf.ServiceInput interface +func (d *DockerLogs) Start(telegraf.Accumulator) error { + return nil +} + func (d *DockerLogs) GetState() interface{} { d.lastRecordMtx.Lock() recordOffsets := make(map[string]time.Time, len(d.lastRecord)) @@ -153,6 +157,50 @@ func (d *DockerLogs) SetState(state interface{}) error { return nil } +func (d *DockerLogs) Gather(acc telegraf.Accumulator) error { + ctx := context.Background() + acc.SetPrecision(time.Nanosecond) + + ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout)) + defer cancel() + containers, err := d.client.ContainerList(ctx, d.opts) + if err != nil { + return err + } + + for _, cntnr := range containers { + if d.containerInContainerList(cntnr.ID) { + continue + } + + containerName := d.matchedContainerName(cntnr.Names) + if containerName == "" { + continue + } + + ctx, cancel := context.WithCancel(context.Background()) + d.addToContainerList(cntnr.ID, cancel) + + // Start a new goroutine for every new container that has logs to collect + d.wg.Add(1) + go func(container types.Container) { + defer d.wg.Done() + defer d.removeFromContainerList(container.ID) + + err = d.tailContainerLogs(ctx, acc, container, containerName) + if err != nil && !errors.Is(err, context.Canceled) { + acc.AddError(err) + } + }(cntnr) + } + return nil +} + +func (d *DockerLogs) Stop() { + d.cancelTails() + d.wg.Wait() +} + func (d *DockerLogs) addToContainerList(containerID string, cancel context.CancelFunc) { d.mu.Lock() defer d.mu.Unlock() @@ -195,45 +243,6 @@ func (d *DockerLogs) matchedContainerName(names []string) string { return "" } -func (d *DockerLogs) Gather(acc telegraf.Accumulator) error { - ctx := context.Background() - acc.SetPrecision(time.Nanosecond) - - ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout)) - defer cancel() - containers, err := d.client.ContainerList(ctx, d.opts) - if err != nil { - return err - } - - for _, cntnr := range containers { - if d.containerInContainerList(cntnr.ID) { - continue - } - - containerName := d.matchedContainerName(cntnr.Names) - if containerName == "" { - continue - } - - ctx, cancel := context.WithCancel(context.Background()) - d.addToContainerList(cntnr.ID, cancel) - - // Start a new goroutine for every new container that has logs to collect - d.wg.Add(1) - go func(container types.Container) { - defer d.wg.Done() - defer d.removeFromContainerList(container.ID) - - err = d.tailContainerLogs(ctx, acc, container, containerName) - if err != nil && !errors.Is(err, context.Canceled) { - acc.AddError(err) - } - }(cntnr) - } - return nil -} - func (d *DockerLogs) hasTTY(ctx context.Context, cntnr types.Container) (bool, error) { ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout)) defer cancel() @@ -439,17 +448,6 @@ func tailMultiplexed( return tsStderr, nil } -// Start is a noop which is required for a *DockerLogs to implement -// the telegraf.ServiceInput interface -func (d *DockerLogs) Start(telegraf.Accumulator) error { - return nil -} - -func (d *DockerLogs) Stop() { - d.cancelTails() - d.wg.Wait() -} - // Following few functions have been inherited from telegraf docker input plugin func (d *DockerLogs) createContainerFilters() error { containerFilter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude) @@ -481,21 +479,21 @@ func (d *DockerLogs) createContainerStateFilters() error { return nil } -func init() { - inputs.Add("docker_log", func() telegraf.Input { - return &DockerLogs{ - Timeout: config.Duration(time.Second * 5), - Endpoint: defaultEndpoint, - newEnvClient: NewEnvClient, - newClient: NewClient, - containerList: make(map[string]context.CancelFunc), - } - }) -} - func hostnameFromID(id string) string { if len(id) > 12 { return id[0:12] } return id } + +func init() { + inputs.Add("docker_log", func() telegraf.Input { + return &DockerLogs{ + Timeout: config.Duration(time.Second * 5), + Endpoint: defaultEndpoint, + newEnvClient: newEnvClient, + newClient: newClient, + containerList: make(map[string]context.CancelFunc), + } + }) +} diff --git a/plugins/inputs/docker_log/docker_log_test.go b/plugins/inputs/docker_log/docker_log_test.go index 5d4c026af..95dedd43b 100644 --- a/plugins/inputs/docker_log/docker_log_test.go +++ b/plugins/inputs/docker_log/docker_log_test.go @@ -18,33 +18,33 @@ import ( "github.com/influxdata/telegraf/testutil" ) -type MockClient struct { +type mockClient struct { ContainerListF func() ([]types.Container, error) ContainerInspectF func() (types.ContainerJSON, error) ContainerLogsF func() (io.ReadCloser, error) } -func (c *MockClient) ContainerList(context.Context, container.ListOptions) ([]types.Container, error) { +func (c *mockClient) ContainerList(context.Context, container.ListOptions) ([]types.Container, error) { return c.ContainerListF() } -func (c *MockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) { +func (c *mockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) { return c.ContainerInspectF() } -func (c *MockClient) ContainerLogs(context.Context, string, container.LogsOptions) (io.ReadCloser, error) { +func (c *mockClient) ContainerLogs(context.Context, string, container.LogsOptions) (io.ReadCloser, error) { return c.ContainerLogsF() } -type Response struct { +type response struct { io.Reader } -func (r *Response) Close() error { +func (r *response) Close() error { return nil } -func MustParse(layout, value string) time.Time { +func mustParse(layout, value string) time.Time { tm, err := time.Parse(layout, value) if err != nil { panic(err) @@ -55,12 +55,12 @@ func MustParse(layout, value string) time.Time { func Test(t *testing.T) { tests := []struct { name string - client *MockClient + client *mockClient expected []telegraf.Metric }{ { name: "no containers", - client: &MockClient{ + client: &mockClient{ ContainerListF: func() ([]types.Container, error) { return nil, nil }, @@ -68,7 +68,7 @@ func Test(t *testing.T) { }, { name: "one container tty", - client: &MockClient{ + client: &mockClient{ ContainerListF: func() ([]types.Container, error) { return []types.Container{ { @@ -86,7 +86,7 @@ func Test(t *testing.T) { }, nil }, ContainerLogsF: func() (io.ReadCloser, error) { - return &Response{Reader: bytes.NewBufferString("2020-04-28T18:43:16.432691200Z hello\n")}, nil + return &response{Reader: bytes.NewBufferString("2020-04-28T18:43:16.432691200Z hello\n")}, nil }, }, expected: []telegraf.Metric{ @@ -103,13 +103,13 @@ func Test(t *testing.T) { "container_id": "deadbeef", "message": "hello", }, - MustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"), + mustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"), ), }, }, { name: "one container multiplex", - client: &MockClient{ + client: &mockClient{ ContainerListF: func() ([]types.Container, error) { return []types.Container{ { @@ -130,7 +130,7 @@ func Test(t *testing.T) { var buf bytes.Buffer w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout) _, err := w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout")) - return &Response{Reader: &buf}, err + return &response{Reader: &buf}, err }, }, expected: []telegraf.Metric{ @@ -147,7 +147,7 @@ func Test(t *testing.T) { "container_id": "deadbeef", "message": "hello from stdout", }, - MustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"), + mustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"), ), }, }, @@ -157,7 +157,7 @@ func Test(t *testing.T) { var acc testutil.Accumulator plugin := &DockerLogs{ Timeout: config.Duration(time.Second * 5), - newClient: func(string, *tls.Config) (Client, error) { return tt.client, nil }, + newClient: func(string, *tls.Config) (dockerClient, error) { return tt.client, nil }, containerList: make(map[string]context.CancelFunc), IncludeSourceTag: true, } diff --git a/plugins/inputs/dovecot/dovecot.go b/plugins/inputs/dovecot/dovecot.go index ac5ec22de..f15643275 100644 --- a/plugins/inputs/dovecot/dovecot.go +++ b/plugins/inputs/dovecot/dovecot.go @@ -20,23 +20,23 @@ import ( //go:embed sample.conf var sampleConfig string +var ( + defaultTimeout = time.Second * time.Duration(5) + validQuery = map[string]bool{ + "user": true, "domain": true, "global": true, "ip": true, + } +) + type Dovecot struct { - Type string - Filters []string - Servers []string -} - -var defaultTimeout = time.Second * time.Duration(5) - -var validQuery = map[string]bool{ - "user": true, "domain": true, "global": true, "ip": true, + Type string `toml:"type"` + Filters []string `toml:"filters"` + Servers []string `toml:"servers"` } func (*Dovecot) SampleConfig() string { return sampleConfig } -// Reads stats from all configured servers. func (d *Dovecot) Gather(acc telegraf.Accumulator) error { if !validQuery[d.Type] { return fmt.Errorf("error: %s is not a valid query type", d.Type) diff --git a/plugins/inputs/dpdk/dpdk.go b/plugins/inputs/dpdk/dpdk.go index 2007b512f..079640714 100644 --- a/plugins/inputs/dpdk/dpdk.go +++ b/plugins/inputs/dpdk/dpdk.go @@ -40,7 +40,7 @@ const ( unreachableSocketBehaviorError = "error" ) -type dpdk struct { +type Dpdk struct { SocketPath string `toml:"socket_path"` AccessTimeout config.Duration `toml:"socket_access_timeout"` DeviceTypes []string `toml:"device_types"` @@ -62,12 +62,12 @@ type ethdevConfig struct { EthdevExcludeCommands []string `toml:"exclude_commands"` } -func (*dpdk) SampleConfig() string { +func (*Dpdk) SampleConfig() string { return sampleConfig } // Init performs validation of all parameters from configuration -func (dpdk *dpdk) Init() error { +func (dpdk *Dpdk) Init() error { dpdk.setupDefaultValues() err := dpdk.validateAdditionalCommands() @@ -101,22 +101,13 @@ func (dpdk *dpdk) Init() error { } // Start implements ServiceInput interface -func (dpdk *dpdk) Start(telegraf.Accumulator) error { +func (dpdk *Dpdk) Start(telegraf.Accumulator) error { return dpdk.maintainConnections() } -func (dpdk *dpdk) Stop() { - for _, connector := range dpdk.connectors { - if err := connector.tryClose(); err != nil { - dpdk.Log.Warnf("Couldn't close connection for %q: %v", connector.pathToSocket, err) - } - } - dpdk.connectors = nil -} - // Gather function gathers all unique commands and processes each command sequentially // Parallel processing could be achieved by running several instances of this plugin with different settings -func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error { +func (dpdk *Dpdk) Gather(acc telegraf.Accumulator) error { if err := dpdk.Start(acc); err != nil { return err } @@ -130,8 +121,17 @@ func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error { return nil } +func (dpdk *Dpdk) Stop() { + for _, connector := range dpdk.connectors { + if err := connector.tryClose(); err != nil { + dpdk.Log.Warnf("Couldn't close connection for %q: %v", connector.pathToSocket, err) + } + } + dpdk.connectors = nil +} + // Setup default values for dpdk -func (dpdk *dpdk) setupDefaultValues() { +func (dpdk *Dpdk) setupDefaultValues() { if dpdk.SocketPath == "" { dpdk.SocketPath = defaultPathToSocket } @@ -156,7 +156,7 @@ func (dpdk *dpdk) setupDefaultValues() { dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/info", ethdevLinkStatusCommand} } -func (dpdk *dpdk) getDpdkInMemorySocketPaths() []string { +func (dpdk *Dpdk) getDpdkInMemorySocketPaths() []string { filePaths := dpdk.socketGlobPath.Match() var results []string @@ -175,7 +175,7 @@ func (dpdk *dpdk) getDpdkInMemorySocketPaths() []string { } // Checks that user-supplied commands are unique and match DPDK commands format -func (dpdk *dpdk) validateAdditionalCommands() error { +func (dpdk *Dpdk) validateAdditionalCommands() error { dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands) for _, cmd := range dpdk.AdditionalCommands { @@ -200,7 +200,7 @@ func (dpdk *dpdk) validateAdditionalCommands() error { } // Establishes connections do DPDK telemetry sockets -func (dpdk *dpdk) maintainConnections() error { +func (dpdk *Dpdk) maintainConnections() error { candidates := []string{dpdk.SocketPath} if choice.Contains(dpdkPluginOptionInMemory, dpdk.PluginOptions) { candidates = dpdk.getDpdkInMemorySocketPaths() @@ -259,7 +259,7 @@ func (dpdk *dpdk) maintainConnections() error { } // Gathers all unique commands -func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkConnector) []string { +func (dpdk *Dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkConnector) []string { var commands []string if choice.Contains("ethdev", dpdk.DeviceTypes) { ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter) @@ -284,7 +284,7 @@ func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkCo func init() { inputs.Add(pluginName, func() telegraf.Input { - dpdk := &dpdk{ + dpdk := &Dpdk{ // Setting it here (rather than in `Init()`) to distinguish between "zero" value, // default value and don't having value in config at all. AccessTimeout: defaultAccessTimeout, diff --git a/plugins/inputs/dpdk/dpdk_cmds.go b/plugins/inputs/dpdk/dpdk_cmds.go index 857db89d8..386e2a0db 100644 --- a/plugins/inputs/dpdk/dpdk_cmds.go +++ b/plugins/inputs/dpdk/dpdk_cmds.go @@ -10,8 +10,8 @@ import ( type linkStatus int64 const ( - DOWN linkStatus = iota - UP + down linkStatus = iota + up ) const ( @@ -22,8 +22,8 @@ const ( var ( linkStatusMap = map[string]linkStatus{ - "down": DOWN, - "up": UP, + "down": down, + "up": up, } ) diff --git a/plugins/inputs/dpdk/dpdk_connector_test.go b/plugins/inputs/dpdk/dpdk_connector_test.go index 3782a9014..3bc570cea 100644 --- a/plugins/inputs/dpdk/dpdk_connector_test.go +++ b/plugins/inputs/dpdk/dpdk_connector_test.go @@ -92,7 +92,7 @@ func Test_readMaxOutputLen(t *testing.T) { func Test_connect(t *testing.T) { t.Run("should pass if PathToSocket points to socket", func(t *testing.T) { pathToSocket, socket := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSocket, connectors: []*dpdkConnector{newDpdkConnector(pathToSocket, 0)}, } diff --git a/plugins/inputs/dpdk/dpdk_test.go b/plugins/inputs/dpdk/dpdk_test.go index 60d65bd7d..0781cc862 100644 --- a/plugins/inputs/dpdk/dpdk_test.go +++ b/plugins/inputs/dpdk/dpdk_test.go @@ -26,7 +26,7 @@ import ( func Test_Init(t *testing.T) { t.Run("when SocketPath field isn't set then it should be set to default value", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ Log: testutil.Logger{}, SocketPath: "", } @@ -39,7 +39,7 @@ func Test_Init(t *testing.T) { }) t.Run("when Metadata Fields isn't set then it should be set to default value (dpdk_pid)", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ Log: testutil.Logger{}, } require.Nil(t, dpdk.MetadataFields) @@ -49,7 +49,7 @@ func Test_Init(t *testing.T) { }) t.Run("when PluginOptions field isn't set then it should be set to default value (in_memory)", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ Log: testutil.Logger{}, } require.Nil(t, dpdk.PluginOptions) @@ -60,7 +60,7 @@ func Test_Init(t *testing.T) { t.Run("when commands are in invalid format (doesn't start with '/') then error should be returned", func(t *testing.T) { pathToSocket, _ := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ Log: testutil.Logger{}, SocketPath: pathToSocket, AdditionalCommands: []string{"invalid"}, @@ -73,7 +73,7 @@ func Test_Init(t *testing.T) { }) t.Run("when AccessTime is < 0 then error should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ Log: testutil.Logger{}, AccessTimeout: -1, } @@ -85,7 +85,7 @@ func Test_Init(t *testing.T) { t.Run("when device_types and additional_commands are empty, then error should be returned", func(t *testing.T) { pathToSocket, _ := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSocket, DeviceTypes: []string{}, AdditionalCommands: []string{}, @@ -99,7 +99,7 @@ func Test_Init(t *testing.T) { }) t.Run("when UnreachableSocketBehavior specified with unknown value - err should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, UnreachableSocketBehavior: "whatisthat", @@ -113,7 +113,7 @@ func Test_Init(t *testing.T) { func Test_Start(t *testing.T) { t.Run("when socket doesn't exist err should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, } @@ -126,7 +126,7 @@ func Test_Start(t *testing.T) { }) t.Run("when socket doesn't exist, but UnreachableSocketBehavior is Ignore err shouldn't be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, UnreachableSocketBehavior: unreachableSocketBehaviorIgnore, @@ -140,7 +140,7 @@ func Test_Start(t *testing.T) { t.Run("when all values are valid, then no error should be returned", func(t *testing.T) { pathToSocket, socket := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSocket, DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, @@ -157,7 +157,7 @@ func Test_Start(t *testing.T) { func TestMaintainConnections(t *testing.T) { t.Run("maintainConnections should return the error if socket doesn't exist", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: "/tmp/justrandompath", DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, @@ -173,7 +173,7 @@ func TestMaintainConnections(t *testing.T) { }) t.Run("maintainConnections should return the error if socket not found with dpdkPluginOptionInMemory", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: defaultPathToSocket, Log: testutil.Logger{}, PluginOptions: []string{dpdkPluginOptionInMemory}, @@ -191,7 +191,7 @@ func TestMaintainConnections(t *testing.T) { t.Run("maintainConnections shouldn't return error with 1 socket", func(t *testing.T) { pathToSocket, socket := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSocket, DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, @@ -212,7 +212,7 @@ func TestMaintainConnections(t *testing.T) { pathToSockets, sockets := createMultipleSocketsForTest(t, numSockets, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSockets[0], DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, @@ -236,7 +236,7 @@ func TestMaintainConnections(t *testing.T) { t.Run("Test maintainConnections without dpdkPluginOptionInMemory option", func(t *testing.T) { pathToSocket, socket := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSocket, DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, @@ -256,7 +256,7 @@ func TestMaintainConnections(t *testing.T) { t.Run("Test maintainConnections with dpdkPluginOptionInMemory option", func(t *testing.T) { pathToSocket1, socket1 := createSocketForTest(t, "") go simulateSocketResponse(socket1, t) - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSocket1, DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, @@ -297,7 +297,7 @@ func TestMaintainConnections(t *testing.T) { func TestClose(t *testing.T) { t.Run("Num of connections should be 0 after Stop func", func(t *testing.T) { pathToSocket, socket := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: pathToSocket, DeviceTypes: []string{"ethdev"}, Log: testutil.Logger{}, @@ -317,7 +317,7 @@ func TestClose(t *testing.T) { func Test_validateAdditionalCommands(t *testing.T) { t.Run("when validating commands in correct format then no error should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ AdditionalCommands: []string{"/test", "/help"}, } @@ -327,7 +327,7 @@ func Test_validateAdditionalCommands(t *testing.T) { }) t.Run("when validating command that doesn't begin with slash then error should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ AdditionalCommands: []string{ "/test", "commandWithoutSlash", }, @@ -340,7 +340,7 @@ func Test_validateAdditionalCommands(t *testing.T) { }) t.Run("when validating long command (without parameters) then error should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ AdditionalCommands: []string{ "/test", "/" + strings.Repeat("a", maxCommandLength), }, @@ -353,7 +353,7 @@ func Test_validateAdditionalCommands(t *testing.T) { }) t.Run("when validating long command (with params) then error should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ AdditionalCommands: []string{ "/test", "/," + strings.Repeat("a", maxCommandLengthWithParams), }, @@ -366,7 +366,7 @@ func Test_validateAdditionalCommands(t *testing.T) { }) t.Run("when validating empty command then error should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ AdditionalCommands: []string{ "/test", "", }, @@ -379,7 +379,7 @@ func Test_validateAdditionalCommands(t *testing.T) { }) t.Run("when validating commands with duplicates then duplicates should be removed and no error should be returned", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ AdditionalCommands: []string{ "/test", "/test", }, @@ -393,9 +393,9 @@ func Test_validateAdditionalCommands(t *testing.T) { }) } -func prepareEnvironment() (*mocks.Conn, dpdk, *testutil.Accumulator) { +func prepareEnvironment() (*mocks.Conn, Dpdk, *testutil.Accumulator) { mockConnection := &mocks.Conn{} - dpdk := dpdk{ + dpdk := Dpdk{ connectors: []*dpdkConnector{{ connection: mockConnection, initMessage: &initMessage{ @@ -411,9 +411,9 @@ func prepareEnvironment() (*mocks.Conn, dpdk, *testutil.Accumulator) { return mockConnection, dpdk, mockAcc } -func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, dpdk, *testutil.Accumulator) { +func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, Dpdk, *testutil.Accumulator) { mockConnections := []*mocks.Conn{{}, {}} - dpdk := dpdk{ + dpdk := Dpdk{ connectors: []*dpdkConnector{ { connection: mockConnections[0], @@ -440,9 +440,9 @@ func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, dpdk, *testutil.Accumu return mockConnections, dpdk, mockAcc } -func prepareEnvironmentWithInitializedMessage(initMsg *initMessage) (*mocks.Conn, dpdk, *testutil.Accumulator) { +func prepareEnvironmentWithInitializedMessage(initMsg *initMessage) (*mocks.Conn, Dpdk, *testutil.Accumulator) { mockConnection := &mocks.Conn{} - dpdk := dpdk{ + dpdk := Dpdk{ connectors: []*dpdkConnector{{ connection: mockConnection, accessTimeout: 2 * time.Second, @@ -548,7 +548,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) { var err error t.Run("Should return nil if path doesn't exist", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: "/tmp/nothing-should-exist-here/test.socket", Log: testutil.Logger{}, } @@ -560,7 +560,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) { }) t.Run("Should return nil if can't read the dir", func(t *testing.T) { - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: "/root/no_access", Log: testutil.Logger{}, } @@ -574,7 +574,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) { t.Run("Should return one socket from socket path", func(t *testing.T) { socketPath, _ := createSocketForTest(t, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: socketPath, Log: testutil.Logger{}, } @@ -589,7 +589,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) { t.Run("Should return 2 sockets from socket path", func(t *testing.T) { socketPaths, _ := createMultipleSocketsForTest(t, 2, "") - dpdk := dpdk{ + dpdk := Dpdk{ SocketPath: socketPaths[0], Log: testutil.Logger{}, } @@ -605,7 +605,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) { func Test_Gather(t *testing.T) { t.Run("Gather should return error, because socket weren't created", func(t *testing.T) { mockAcc := &testutil.Accumulator{} - dpdk := dpdk{ + dpdk := Dpdk{ Log: testutil.Logger{}, PluginOptions: []string{}, } @@ -619,7 +619,7 @@ func Test_Gather(t *testing.T) { t.Run("Gather shouldn't return error with UnreachableSocketBehavior: Ignore option, because socket weren't created", func(t *testing.T) { mockAcc := &testutil.Accumulator{} - dpdk := dpdk{ + dpdk := Dpdk{ Log: testutil.Logger{}, PluginOptions: []string{}, UnreachableSocketBehavior: unreachableSocketBehaviorIgnore, diff --git a/plugins/inputs/ecs/stats.go b/plugins/inputs/ecs/stats.go index fbc56f03f..042d6a262 100644 --- a/plugins/inputs/ecs/stats.go +++ b/plugins/inputs/ecs/stats.go @@ -8,7 +8,7 @@ import ( "github.com/docker/docker/api/types/container" "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/inputs/docker" + "github.com/influxdata/telegraf/plugins/common/docker" ) func parseContainerStats(c *Container, acc telegraf.Accumulator, tags map[string]string) {