chore: Fix linter findings for `revive:exported` in `plugins/inputs/d*` (#16016)

This commit is contained in:
Paweł Żak 2024-10-16 12:38:11 +02:00 committed by GitHub
parent ce0f4b0dc8
commit e8c19d9987
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 569 additions and 575 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/docker/docker/api/types/container"
)
// CalculateCPUPercentUnix calculate CPU usage (for Unix, in percentages)
func CalculateCPUPercentUnix(previousCPU, previousSystem uint64, v *container.StatsResponse) float64 {
var (
cpuPercent = 0.0
@ -25,7 +26,8 @@ func CalculateCPUPercentUnix(previousCPU, previousSystem uint64, v *container.St
return cpuPercent
}
func calculateCPUPercentWindows(v *container.StatsResponse) float64 {
// CalculateCPUPercentWindows calculate CPU usage (for Windows, in percentages)
func CalculateCPUPercentWindows(v *container.StatsResponse) float64 {
// Max number of 100ns intervals between the previous time read and now
possIntervals := uint64(v.Read.Sub(v.PreRead).Nanoseconds()) // Start with number of ns intervals
possIntervals /= 100 // Convert to number of 100ns intervals
@ -66,6 +68,7 @@ func CalculateMemUsageUnixNoCache(mem container.MemoryStats) float64 {
return float64(mem.Usage)
}
// CalculateMemPercentUnixNoCache calculate memory usage of the container, in percentages.
func CalculateMemPercentUnixNoCache(limit, usedNoCache float64) float64 {
// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
// got any data from cgroup

View File

@ -18,23 +18,23 @@ const (
loginDuration = 65 * time.Minute
)
// Client is an interface for communicating with the DC/OS API.
type Client interface {
SetToken(token string)
// client is an interface for communicating with the DC/OS API.
type client interface {
setToken(token string)
Login(ctx context.Context, sa *ServiceAccount) (*authToken, error)
GetSummary(ctx context.Context) (*summary, error)
GetContainers(ctx context.Context, node string) ([]container, error)
GetNodeMetrics(ctx context.Context, node string) (*metrics, error)
GetContainerMetrics(ctx context.Context, node, container string) (*metrics, error)
GetAppMetrics(ctx context.Context, node, container string) (*metrics, error)
login(ctx context.Context, sa *serviceAccount) (*authToken, error)
getSummary(ctx context.Context) (*summary, error)
getContainers(ctx context.Context, node string) ([]container, error)
getNodeMetrics(ctx context.Context, node string) (*metrics, error)
getContainerMetrics(ctx context.Context, node, container string) (*metrics, error)
getAppMetrics(ctx context.Context, node, container string) (*metrics, error)
}
type apiError struct {
URL string
StatusCode int
Title string
Description string
url string
statusCode int
title string
description string
}
// login is request data for logging in.
@ -90,7 +90,7 @@ type authToken struct {
Expire time.Time
}
// clusterClient is a Client that uses the cluster URL.
// clusterClient is a client that uses the cluster URL.
type clusterClient struct {
clusterURL *url.URL
httpClient *http.Client
@ -104,13 +104,13 @@ type claims struct {
}
func (e apiError) Error() string {
if e.Description != "" {
return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description)
if e.description != "" {
return fmt.Sprintf("[%s] %s: %s", e.url, e.title, e.description)
}
return fmt.Sprintf("[%s] %s", e.URL, e.Title)
return fmt.Sprintf("[%s] %s", e.url, e.title)
}
func NewClusterClient(clusterURL *url.URL, timeout time.Duration, maxConns int, tlsConfig *tls.Config) *clusterClient {
func newClusterClient(clusterURL *url.URL, timeout time.Duration, maxConns int, tlsConfig *tls.Config) *clusterClient {
httpClient := &http.Client{
Transport: &http.Transport{
MaxIdleConns: maxConns,
@ -128,11 +128,11 @@ func NewClusterClient(clusterURL *url.URL, timeout time.Duration, maxConns int,
return c
}
func (c *clusterClient) SetToken(token string) {
func (c *clusterClient) setToken(token string) {
c.token = token
}
func (c *clusterClient) Login(ctx context.Context, sa *ServiceAccount) (*authToken, error) {
func (c *clusterClient) login(ctx context.Context, sa *serviceAccount) (*authToken, error) {
token, err := c.createLoginToken(sa)
if err != nil {
return nil, err
@ -141,7 +141,7 @@ func (c *clusterClient) Login(ctx context.Context, sa *ServiceAccount) (*authTok
exp := time.Now().Add(loginDuration)
body := &login{
UID: sa.AccountID,
UID: sa.accountID,
Exp: exp.Unix(),
Token: token,
}
@ -185,23 +185,23 @@ func (c *clusterClient) Login(ctx context.Context, sa *ServiceAccount) (*authTok
err = dec.Decode(loginError)
if err != nil {
err := &apiError{
URL: loc,
StatusCode: resp.StatusCode,
Title: resp.Status,
url: loc,
statusCode: resp.StatusCode,
title: resp.Status,
}
return nil, err
}
err = &apiError{
URL: loc,
StatusCode: resp.StatusCode,
Title: loginError.Title,
Description: loginError.Description,
url: loc,
statusCode: resp.StatusCode,
title: loginError.Title,
description: loginError.Description,
}
return nil, err
}
func (c *clusterClient) GetSummary(ctx context.Context) (*summary, error) {
func (c *clusterClient) getSummary(ctx context.Context) (*summary, error) {
summary := &summary{}
err := c.doGet(ctx, c.toURL("/mesos/master/state-summary"), summary)
if err != nil {
@ -211,7 +211,7 @@ func (c *clusterClient) GetSummary(ctx context.Context) (*summary, error) {
return summary, nil
}
func (c *clusterClient) GetContainers(ctx context.Context, node string) ([]container, error) {
func (c *clusterClient) getContainers(ctx context.Context, node string) ([]container, error) {
list := []string{}
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node)
@ -239,17 +239,17 @@ func (c *clusterClient) getMetrics(ctx context.Context, address string) (*metric
return metrics, nil
}
func (c *clusterClient) GetNodeMetrics(ctx context.Context, node string) (*metrics, error) {
func (c *clusterClient) getNodeMetrics(ctx context.Context, node string) (*metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node)
return c.getMetrics(ctx, c.toURL(path))
}
func (c *clusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*metrics, error) {
func (c *clusterClient) getContainerMetrics(ctx context.Context, node, container string) (*metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container)
return c.getMetrics(ctx, c.toURL(path))
}
func (c *clusterClient) GetAppMetrics(ctx context.Context, node, container string) (*metrics, error) {
func (c *clusterClient) getAppMetrics(ctx context.Context, node, container string) (*metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container)
return c.getMetrics(ctx, c.toURL(path))
}
@ -298,9 +298,9 @@ func (c *clusterClient) doGet(ctx context.Context, address string, v interface{}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return &apiError{
URL: address,
StatusCode: resp.StatusCode,
Title: resp.Status,
url: address,
statusCode: resp.StatusCode,
title: resp.Status,
}
}
@ -318,13 +318,13 @@ func (c *clusterClient) toURL(path string) string {
return clusterURL.String()
}
func (c *clusterClient) createLoginToken(sa *ServiceAccount) (string, error) {
func (c *clusterClient) createLoginToken(sa *serviceAccount) (string, error) {
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims{
UID: sa.AccountID,
UID: sa.accountID,
RegisteredClaims: jwt.RegisteredClaims{
// How long we have to login with this token
ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Minute * 5)),
},
})
return token.SignedString(sa.PrivateKey)
return token.SignedString(sa.privateKey)
}

View File

@ -39,10 +39,10 @@ func TestLogin(t *testing.T) {
responseCode: http.StatusUnauthorized,
responseBody: `{"title": "x", "description": "y"}`,
expectedError: &apiError{
URL: ts.URL + "/acs/api/v1/auth/login",
StatusCode: http.StatusUnauthorized,
Title: "x",
Description: "y",
url: ts.URL + "/acs/api/v1/auth/login",
statusCode: http.StatusUnauthorized,
title: "x",
description: "y",
},
expectedToken: "",
},
@ -62,12 +62,12 @@ func TestLogin(t *testing.T) {
require.NoError(t, err)
ctx := context.Background()
sa := &ServiceAccount{
AccountID: "telegraf",
PrivateKey: key,
sa := &serviceAccount{
accountID: "telegraf",
privateKey: key,
}
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
auth, err := client.Login(ctx, sa)
client := newClusterClient(u, defaultResponseTimeout, 1, nil)
auth, err := client.login(ctx, sa)
require.Equal(t, tt.expectedError, err)
@ -104,9 +104,9 @@ func TestGetSummary(t *testing.T) {
responseBody: `<html></html>`,
expectedValue: nil,
expectedError: &apiError{
URL: ts.URL + "/mesos/master/state-summary",
StatusCode: http.StatusUnauthorized,
Title: "401 Unauthorized",
url: ts.URL + "/mesos/master/state-summary",
statusCode: http.StatusUnauthorized,
title: "401 Unauthorized",
},
},
{
@ -136,8 +136,8 @@ func TestGetSummary(t *testing.T) {
require.NoError(t, err)
ctx := context.Background()
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
summary, err := client.GetSummary(ctx)
client := newClusterClient(u, defaultResponseTimeout, 1, nil)
summary, err := client.getSummary(ctx)
require.Equal(t, tt.expectedError, err)
require.Equal(t, tt.expectedValue, summary)
@ -177,8 +177,8 @@ func TestGetNodeMetrics(t *testing.T) {
require.NoError(t, err)
ctx := context.Background()
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
m, err := client.GetNodeMetrics(ctx, "foo")
client := newClusterClient(u, defaultResponseTimeout, 1, nil)
m, err := client.getNodeMetrics(ctx, "foo")
require.Equal(t, tt.expectedError, err)
require.Equal(t, tt.expectedValue, m)
@ -218,8 +218,8 @@ func TestGetContainerMetrics(t *testing.T) {
require.NoError(t, err)
ctx := context.Background()
client := NewClusterClient(u, defaultResponseTimeout, 1, nil)
m, err := client.GetContainerMetrics(ctx, "foo", "bar")
client := newClusterClient(u, defaultResponseTimeout, 1, nil)
m, err := client.getContainerMetrics(ctx, "foo", "bar")
require.Equal(t, tt.expectedError, err)
require.Equal(t, tt.expectedValue, m)

View File

@ -15,27 +15,27 @@ const (
relogDuration = 5 * time.Minute
)
type Credentials interface {
Token(ctx context.Context, client Client) (string, error)
IsExpired() bool
type credentials interface {
token(ctx context.Context, client client) (string, error)
isExpired() bool
}
type ServiceAccount struct {
AccountID string
PrivateKey *rsa.PrivateKey
type serviceAccount struct {
accountID string
privateKey *rsa.PrivateKey
auth *authToken
}
type TokenCreds struct {
type tokenCreds struct {
Path string
}
type NullCreds struct {
type nullCreds struct {
}
func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, error) {
auth, err := client.Login(ctx, c)
func (c *serviceAccount) token(ctx context.Context, client client) (string, error) {
auth, err := client.login(ctx, c)
if err != nil {
return "", err
}
@ -43,11 +43,11 @@ func (c *ServiceAccount) Token(ctx context.Context, client Client) (string, erro
return auth.Text, nil
}
func (c *ServiceAccount) IsExpired() bool {
func (c *serviceAccount) isExpired() bool {
return c.auth.Text != "" || c.auth.Expire.Add(relogDuration).After(time.Now())
}
func (c *TokenCreds) Token(_ context.Context, _ Client) (string, error) {
func (c *tokenCreds) token(_ context.Context, _ client) (string, error) {
octets, err := os.ReadFile(c.Path)
if err != nil {
return "", fmt.Errorf("error reading token file %q: %w", c.Path, err)
@ -59,14 +59,14 @@ func (c *TokenCreds) Token(_ context.Context, _ Client) (string, error) {
return token, nil
}
func (c *TokenCreds) IsExpired() bool {
func (c *tokenCreds) isExpired() bool {
return true
}
func (c *NullCreds) Token(_ context.Context, _ Client) (string, error) {
func (c *nullCreds) token(_ context.Context, _ client) (string, error) {
return "", nil
}
func (c *NullCreds) IsExpired() bool {
func (c *nullCreds) isExpired() bool {
return true
}

View File

@ -24,11 +24,6 @@ import (
//go:embed sample.conf
var sampleConfig string
const (
defaultMaxConnections = 10
defaultResponseTimeout = 20 * time.Second
)
var (
nodeDimensions = []string{
"hostname",
@ -47,27 +42,32 @@ var (
}
)
const (
defaultMaxConnections = 10
defaultResponseTimeout = 20 * time.Second
)
type DCOS struct {
ClusterURL string `toml:"cluster_url"`
ServiceAccountID string `toml:"service_account_id"`
ServiceAccountPrivateKey string
ServiceAccountPrivateKey string `toml:"service_account_private_key"`
TokenFile string
TokenFile string `toml:"token_file"`
NodeInclude []string
NodeExclude []string
ContainerInclude []string
ContainerExclude []string
AppInclude []string
AppExclude []string
NodeInclude []string `toml:"node_include"`
NodeExclude []string `toml:"node_exclude"`
ContainerInclude []string `toml:"container_include"`
ContainerExclude []string `toml:"container_exclude"`
AppInclude []string `toml:"app_include"`
AppExclude []string `toml:"app_exclude"`
MaxConnections int
ResponseTimeout config.Duration
MaxConnections int `toml:"max_connections"`
ResponseTimeout config.Duration `toml:"response_timeout"`
tls.ClientConfig
client Client
creds Credentials
client client
creds credentials
initialized bool
nodeFilter filter.Filter
@ -75,25 +75,31 @@ type DCOS struct {
appFilter filter.Filter
}
type point struct {
tags map[string]string
labels map[string]string
fields map[string]interface{}
}
func (*DCOS) SampleConfig() string {
return sampleConfig
}
func (d *DCOS) Gather(acc telegraf.Accumulator) error {
err := d.init()
err := d.initialize()
if err != nil {
return err
}
ctx := context.Background()
token, err := d.creds.Token(ctx, d.client)
token, err := d.creds.token(ctx, d.client)
if err != nil {
return err
}
d.client.SetToken(token)
d.client.setToken(token)
summary, err := d.client.GetSummary(ctx)
summary, err := d.client.getSummary(ctx)
if err != nil {
return err
}
@ -103,7 +109,7 @@ func (d *DCOS) Gather(acc telegraf.Accumulator) error {
wg.Add(1)
go func(node string) {
defer wg.Done()
d.GatherNode(ctx, acc, summary.Cluster, node)
d.gatherNode(ctx, acc, summary.Cluster, node)
}(node.ID)
}
wg.Wait()
@ -111,7 +117,7 @@ func (d *DCOS) Gather(acc telegraf.Accumulator) error {
return nil
}
func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster, node string) {
func (d *DCOS) gatherNode(ctx context.Context, acc telegraf.Accumulator, cluster, node string) {
if !d.nodeFilter.Match(node) {
return
}
@ -120,7 +126,7 @@ func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster
wg.Add(1)
go func() {
defer wg.Done()
m, err := d.client.GetNodeMetrics(ctx, node)
m, err := d.client.getNodeMetrics(ctx, node)
if err != nil {
acc.AddError(err)
return
@ -128,12 +134,12 @@ func (d *DCOS) GatherNode(ctx context.Context, acc telegraf.Accumulator, cluster
d.addNodeMetrics(acc, cluster, m)
}()
d.GatherContainers(ctx, acc, cluster, node)
d.gatherContainers(ctx, acc, cluster, node)
wg.Wait()
}
func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, cluster, node string) {
containers, err := d.client.GetContainers(ctx, node)
func (d *DCOS) gatherContainers(ctx context.Context, acc telegraf.Accumulator, cluster, node string) {
containers, err := d.client.getContainers(ctx, node)
if err != nil {
acc.AddError(err)
return
@ -145,10 +151,10 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c
wg.Add(1)
go func(container string) {
defer wg.Done()
m, err := d.client.GetContainerMetrics(ctx, node, container)
m, err := d.client.getContainerMetrics(ctx, node, container)
if err != nil {
var apiErr apiError
if errors.As(err, &apiErr) && apiErr.StatusCode == 404 {
if errors.As(err, &apiErr) && apiErr.statusCode == 404 {
return
}
acc.AddError(err)
@ -162,10 +168,10 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c
wg.Add(1)
go func(container string) {
defer wg.Done()
m, err := d.client.GetAppMetrics(ctx, node, container)
m, err := d.client.getAppMetrics(ctx, node, container)
if err != nil {
var apiErr apiError
if errors.As(err, &apiErr) && apiErr.StatusCode == 404 {
if errors.As(err, &apiErr) && apiErr.statusCode == 404 {
return
}
acc.AddError(err)
@ -178,12 +184,6 @@ func (d *DCOS) GatherContainers(ctx context.Context, acc telegraf.Accumulator, c
wg.Wait()
}
type point struct {
tags map[string]string
labels map[string]string
fields map[string]interface{}
}
func (d *DCOS) createPoints(m *metrics) []*point {
points := make(map[string]*point)
for _, dp := range m.Datapoints {
@ -278,7 +278,7 @@ func (d *DCOS) addAppMetrics(acc telegraf.Accumulator, cluster string, m *metric
d.addMetrics(acc, cluster, "dcos_app", m, appDimensions)
}
func (d *DCOS) init() error {
func (d *DCOS) initialize() error {
if !d.initialized {
err := d.createFilters()
if err != nil {
@ -306,7 +306,7 @@ func (d *DCOS) init() error {
return nil
}
func (d *DCOS) createClient() (Client, error) {
func (d *DCOS) createClient() (client, error) {
tlsCfg, err := d.ClientConfig.TLSConfig()
if err != nil {
return nil, err
@ -317,7 +317,7 @@ func (d *DCOS) createClient() (Client, error) {
return nil, err
}
client := NewClusterClient(
client := newClusterClient(
address,
time.Duration(d.ResponseTimeout),
d.MaxConnections,
@ -327,7 +327,7 @@ func (d *DCOS) createClient() (Client, error) {
return client, nil
}
func (d *DCOS) createCredentials() (Credentials, error) {
func (d *DCOS) createCredentials() (credentials, error) {
if d.ServiceAccountID != "" && d.ServiceAccountPrivateKey != "" {
bs, err := os.ReadFile(d.ServiceAccountPrivateKey)
if err != nil {
@ -339,19 +339,19 @@ func (d *DCOS) createCredentials() (Credentials, error) {
return nil, err
}
creds := &ServiceAccount{
AccountID: d.ServiceAccountID,
PrivateKey: privateKey,
creds := &serviceAccount{
accountID: d.ServiceAccountID,
privateKey: privateKey,
}
return creds, nil
} else if d.TokenFile != "" {
creds := &TokenCreds{
creds := &tokenCreds{
Path: d.TokenFile,
}
return creds, nil
}
return &NullCreds{}, nil
return &nullCreds{}, nil
}
func (d *DCOS) createFilters() error {

View File

@ -12,7 +12,7 @@ import (
type mockClient struct {
SetTokenF func()
LoginF func(ctx context.Context, sa *ServiceAccount) (*authToken, error)
LoginF func(ctx context.Context, sa *serviceAccount) (*authToken, error)
GetSummaryF func() (*summary, error)
GetContainersF func() ([]container, error)
GetNodeMetricsF func() (*metrics, error)
@ -20,31 +20,31 @@ type mockClient struct {
GetAppMetricsF func(ctx context.Context, node, container string) (*metrics, error)
}
func (c *mockClient) SetToken(string) {
func (c *mockClient) setToken(string) {
c.SetTokenF()
}
func (c *mockClient) Login(ctx context.Context, sa *ServiceAccount) (*authToken, error) {
func (c *mockClient) login(ctx context.Context, sa *serviceAccount) (*authToken, error) {
return c.LoginF(ctx, sa)
}
func (c *mockClient) GetSummary(context.Context) (*summary, error) {
func (c *mockClient) getSummary(context.Context) (*summary, error) {
return c.GetSummaryF()
}
func (c *mockClient) GetContainers(context.Context, string) ([]container, error) {
func (c *mockClient) getContainers(context.Context, string) ([]container, error) {
return c.GetContainersF()
}
func (c *mockClient) GetNodeMetrics(context.Context, string) (*metrics, error) {
func (c *mockClient) getNodeMetrics(context.Context, string) (*metrics, error) {
return c.GetNodeMetricsF()
}
func (c *mockClient) GetContainerMetrics(ctx context.Context, node, container string) (*metrics, error) {
func (c *mockClient) getContainerMetrics(ctx context.Context, node, container string) (*metrics, error) {
return c.GetContainerMetricsF(ctx, node, container)
}
func (c *mockClient) GetAppMetrics(ctx context.Context, node, container string) (*metrics, error) {
func (c *mockClient) getAppMetrics(ctx context.Context, node, container string) (*metrics, error) {
return c.GetAppMetricsF(ctx, node, container)
}
@ -356,7 +356,7 @@ func TestGatherFilterNode(t *testing.T) {
name string
nodeInclude []string
nodeExclude []string
client Client
client client
check func(*testutil.Accumulator) []bool
}{
{

View File

@ -33,9 +33,7 @@ var sampleConfig string
var once sync.Once
var (
defaultFilesToMonitor = []string{}
defaultFilesToIgnore = []string{}
const (
defaultMaxBufferedMetrics = 10000
defaultDirectoryDurationThreshold = config.Duration(0 * time.Millisecond)
defaultFileQueueSize = 100000
@ -78,6 +76,94 @@ func (*DirectoryMonitor) SampleConfig() string {
return sampleConfig
}
func (monitor *DirectoryMonitor) SetParserFunc(fn telegraf.ParserFunc) {
monitor.parserFunc = fn
}
func (monitor *DirectoryMonitor) Init() error {
if monitor.Directory == "" || monitor.FinishedDirectory == "" {
return errors.New("missing one of the following required config options: directory, finished_directory")
}
if monitor.FileQueueSize <= 0 {
return errors.New("file queue size needs to be more than 0")
}
// Finished directory can be created if not exists for convenience.
if _, err := os.Stat(monitor.FinishedDirectory); os.IsNotExist(err) {
err = os.Mkdir(monitor.FinishedDirectory, 0750)
if err != nil {
return err
}
}
tags := map[string]string{
"directory": monitor.Directory,
}
monitor.filesDropped = selfstat.Register("directory_monitor", "files_dropped", map[string]string{})
monitor.filesDroppedDir = selfstat.Register("directory_monitor", "files_dropped_per_dir", tags)
monitor.filesProcessed = selfstat.Register("directory_monitor", "files_processed", map[string]string{})
monitor.filesProcessedDir = selfstat.Register("directory_monitor", "files_processed_per_dir", tags)
monitor.filesQueuedDir = selfstat.Register("directory_monitor", "files_queue_per_dir", tags)
// If an error directory should be used but has not been configured yet, create one ourselves.
if monitor.ErrorDirectory != "" {
if _, err := os.Stat(monitor.ErrorDirectory); os.IsNotExist(err) {
err := os.Mkdir(monitor.ErrorDirectory, 0750)
if err != nil {
return err
}
}
}
monitor.waitGroup = &sync.WaitGroup{}
monitor.sem = semaphore.NewWeighted(int64(monitor.MaxBufferedMetrics))
monitor.context, monitor.cancel = context.WithCancel(context.Background())
monitor.filesToProcess = make(chan string, monitor.FileQueueSize)
// Establish file matching / exclusion regexes.
for _, matcher := range monitor.FilesToMonitor {
regex, err := regexp.Compile(matcher)
if err != nil {
return err
}
monitor.fileRegexesToMatch = append(monitor.fileRegexesToMatch, regex)
}
for _, matcher := range monitor.FilesToIgnore {
regex, err := regexp.Compile(matcher)
if err != nil {
return err
}
monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex)
}
if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "at-once"}); err != nil {
return fmt.Errorf("config option parse_method: %w", err)
}
return nil
}
func (monitor *DirectoryMonitor) Start(acc telegraf.Accumulator) error {
// Use tracking to determine when more metrics can be added without overflowing the outputs.
monitor.acc = acc.WithTracking(monitor.MaxBufferedMetrics)
go func() {
for range monitor.acc.Delivered() {
monitor.sem.Release(1)
}
}()
// Monitor the files channel and read what they receive.
monitor.waitGroup.Add(1)
go func() {
monitor.monitor()
monitor.waitGroup.Done()
}()
return nil
}
func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
processFile := func(path string) error {
// We've been cancelled via Stop().
@ -138,25 +224,6 @@ func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
return nil
}
func (monitor *DirectoryMonitor) Start(acc telegraf.Accumulator) error {
// Use tracking to determine when more metrics can be added without overflowing the outputs.
monitor.acc = acc.WithTracking(monitor.MaxBufferedMetrics)
go func() {
for range monitor.acc.Delivered() {
monitor.sem.Release(1)
}
}()
// Monitor the files channel and read what they receive.
monitor.waitGroup.Add(1)
go func() {
monitor.Monitor()
monitor.waitGroup.Done()
}()
return nil
}
func (monitor *DirectoryMonitor) Stop() {
// Before stopping, wrap up all file-reading routines.
monitor.cancel()
@ -165,7 +232,7 @@ func (monitor *DirectoryMonitor) Stop() {
monitor.waitGroup.Wait()
}
func (monitor *DirectoryMonitor) Monitor() {
func (monitor *DirectoryMonitor) monitor() {
for filePath := range monitor.filesToProcess {
if monitor.context.Err() != nil {
return
@ -400,80 +467,9 @@ func (monitor *DirectoryMonitor) isIgnoredFile(fileName string) bool {
return false
}
func (monitor *DirectoryMonitor) SetParserFunc(fn telegraf.ParserFunc) {
monitor.parserFunc = fn
}
func (monitor *DirectoryMonitor) Init() error {
if monitor.Directory == "" || monitor.FinishedDirectory == "" {
return errors.New("missing one of the following required config options: directory, finished_directory")
}
if monitor.FileQueueSize <= 0 {
return errors.New("file queue size needs to be more than 0")
}
// Finished directory can be created if not exists for convenience.
if _, err := os.Stat(monitor.FinishedDirectory); os.IsNotExist(err) {
err = os.Mkdir(monitor.FinishedDirectory, 0750)
if err != nil {
return err
}
}
tags := map[string]string{
"directory": monitor.Directory,
}
monitor.filesDropped = selfstat.Register("directory_monitor", "files_dropped", map[string]string{})
monitor.filesDroppedDir = selfstat.Register("directory_monitor", "files_dropped_per_dir", tags)
monitor.filesProcessed = selfstat.Register("directory_monitor", "files_processed", map[string]string{})
monitor.filesProcessedDir = selfstat.Register("directory_monitor", "files_processed_per_dir", tags)
monitor.filesQueuedDir = selfstat.Register("directory_monitor", "files_queue_per_dir", tags)
// If an error directory should be used but has not been configured yet, create one ourselves.
if monitor.ErrorDirectory != "" {
if _, err := os.Stat(monitor.ErrorDirectory); os.IsNotExist(err) {
err := os.Mkdir(monitor.ErrorDirectory, 0750)
if err != nil {
return err
}
}
}
monitor.waitGroup = &sync.WaitGroup{}
monitor.sem = semaphore.NewWeighted(int64(monitor.MaxBufferedMetrics))
monitor.context, monitor.cancel = context.WithCancel(context.Background())
monitor.filesToProcess = make(chan string, monitor.FileQueueSize)
// Establish file matching / exclusion regexes.
for _, matcher := range monitor.FilesToMonitor {
regex, err := regexp.Compile(matcher)
if err != nil {
return err
}
monitor.fileRegexesToMatch = append(monitor.fileRegexesToMatch, regex)
}
for _, matcher := range monitor.FilesToIgnore {
regex, err := regexp.Compile(matcher)
if err != nil {
return err
}
monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex)
}
if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "at-once"}); err != nil {
return fmt.Errorf("config option parse_method: %w", err)
}
return nil
}
func init() {
inputs.Add("directory_monitor", func() telegraf.Input {
return &DirectoryMonitor{
FilesToMonitor: defaultFilesToMonitor,
FilesToIgnore: defaultFilesToIgnore,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
FileQueueSize: defaultFileQueueSize,

View File

@ -22,8 +22,6 @@ func TestCreator(t *testing.T) {
require.True(t, found)
expected := &DirectoryMonitor{
FilesToMonitor: defaultFilesToMonitor,
FilesToIgnore: defaultFilesToIgnore,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
FileQueueSize: defaultFileQueueSize,

View File

@ -6,16 +6,17 @@ import (
"fmt"
"strings"
"github.com/shirou/gopsutil/v4/disk"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/system"
"github.com/shirou/gopsutil/v4/disk"
)
//go:embed sample.conf
var sampleConfig string
type DiskStats struct {
type Disk struct {
MountPoints []string `toml:"mount_points"`
IgnoreFS []string `toml:"ignore_fs"`
IgnoreMountOpts []string `toml:"ignore_mount_opts"`
@ -24,11 +25,11 @@ type DiskStats struct {
ps system.PS
}
func (*DiskStats) SampleConfig() string {
func (*Disk) SampleConfig() string {
return sampleConfig
}
func (ds *DiskStats) Init() error {
func (ds *Disk) Init() error {
ps := system.NewSystemPS()
ps.Log = ds.Log
ds.ps = ps
@ -36,7 +37,7 @@ func (ds *DiskStats) Init() error {
return nil
}
func (ds *DiskStats) Gather(acc telegraf.Accumulator) error {
func (ds *Disk) Gather(acc telegraf.Accumulator) error {
disks, partitions, err := ds.ps.DiskUsage(ds.MountPoints, ds.IgnoreMountOpts, ds.IgnoreFS)
if err != nil {
return fmt.Errorf("error getting disk usage info: %w", err)
@ -48,12 +49,12 @@ func (ds *DiskStats) Gather(acc telegraf.Accumulator) error {
}
device := partitions[i].Device
mountOpts := MountOptions(partitions[i].Opts)
mountOpts := mountOptions(partitions[i].Opts)
tags := map[string]string{
"path": du.Path,
"device": strings.ReplaceAll(device, "/dev/", ""),
"fstype": du.Fstype,
"mode": mountOpts.Mode(),
"mode": mountOpts.mode(),
}
label, err := disk.Label(strings.TrimPrefix(device, "/dev/"))
@ -89,9 +90,9 @@ func (ds *DiskStats) Gather(acc telegraf.Accumulator) error {
return nil
}
type MountOptions []string
type mountOptions []string
func (opts MountOptions) Mode() string {
func (opts mountOptions) mode() string {
if opts.exists("rw") {
return "rw"
} else if opts.exists("ro") {
@ -100,7 +101,7 @@ func (opts MountOptions) Mode() string {
return "unknown"
}
func (opts MountOptions) exists(opt string) bool {
func (opts mountOptions) exists(opt string) bool {
for _, o := range opts {
if o == opt {
return true
@ -111,6 +112,6 @@ func (opts MountOptions) exists(opt string) bool {
func init() {
inputs.Add("disk", func() telegraf.Input {
return &DiskStats{}
return &Disk{}
})
}

View File

@ -18,10 +18,6 @@ import (
"github.com/influxdata/telegraf/testutil"
)
type MockFileInfo struct {
os.FileInfo
}
func TestDiskUsage(t *testing.T) {
mck := &mock.Mock{}
mps := system.MockPSDisk{SystemPS: &system.SystemPS{PSDiskDeps: &system.MockDiskUsage{Mock: mck}}, Mock: mck}
@ -89,7 +85,7 @@ func TestDiskUsage(t *testing.T) {
mps.On("PSDiskUsage", "/home").Return(&duAll[1], nil)
mps.On("PSDiskUsage", "/var/rootbind").Return(&duAll[2], nil)
err = (&DiskStats{ps: mps}).Gather(&acc)
err = (&Disk{ps: mps}).Gather(&acc)
require.NoError(t, err)
numDiskMetrics := acc.NFields()
@ -151,18 +147,18 @@ func TestDiskUsage(t *testing.T) {
// We expect 7 more DiskMetrics to show up with an explicit match on "/"
// and /home not matching the /dev in MountPoints
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc)
err = (&Disk{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc)
require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+8, acc.NFields())
// We should see all the diskpoints as MountPoints includes both
// /, /home, and /var/rootbind
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc)
err = (&Disk{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc)
require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+8*4, acc.NFields())
// We should see all the mounts as MountPoints except the bind mound
err = (&DiskStats{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc)
err = (&Disk{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc)
require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+8*6, acc.NFields())
}
@ -296,7 +292,7 @@ func TestDiskUsageHostMountPrefix(t *testing.T) {
mps.On("OSGetenv", "HOST_MOUNT_PREFIX").Return(tt.hostMountPrefix)
err = (&DiskStats{ps: mps}).Gather(&acc)
err = (&Disk{ps: mps}).Gather(&acc)
require.NoError(t, err)
acc.AssertContainsTaggedFields(t, "disk", tt.expectedFields, tt.expectedTags)
@ -426,7 +422,7 @@ func TestDiskStats(t *testing.T) {
mps.On("DiskUsage", []string{"/", "/home", "/var/rootbind"}, []string(nil), []string(nil)).Return(duAll, psAll, nil)
mps.On("DiskUsage", []string(nil), []string{"bind"}, []string(nil)).Return(duOptFiltered, psOptFiltered, nil)
err = (&DiskStats{ps: &mps}).Gather(&acc)
err = (&Disk{ps: &mps}).Gather(&acc)
require.NoError(t, err)
numDiskMetrics := acc.NFields()
@ -471,18 +467,18 @@ func TestDiskStats(t *testing.T) {
// We expect 7 more DiskMetrics to show up with an explicit match on "/"
// and /home and /var/rootbind not matching the /dev in MountPoints
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc)
err = (&Disk{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc)
require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+8, acc.NFields())
// We should see all the diskpoints as MountPoints includes both
// /, /home, and /var/rootbind
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc)
err = (&Disk{ps: &mps, MountPoints: []string{"/", "/home", "/var/rootbind"}}).Gather(&acc)
require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+8*4, acc.NFields())
// We should see all the mounts as MountPoints except the bind mound
err = (&DiskStats{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc)
err = (&Disk{ps: &mps, IgnoreMountOpts: []string{"bind"}}).Gather(&acc)
require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+8*6, acc.NFields())
}
@ -654,7 +650,7 @@ func TestDiskUsageIssues(t *testing.T) {
// Setup the plugin and run the test
var acc testutil.Accumulator
plugin := &DiskStats{ps: &mps}
plugin := &Disk{ps: &mps}
require.NoError(t, plugin.Gather(&acc))
actual := acc.GetTelegrafMetrics()

View File

@ -23,11 +23,6 @@ var (
varRegex = regexp.MustCompile(`\$(?:\w+|\{\w+\})`)
)
// hasMeta reports whether s contains any special glob characters.
func hasMeta(s string) bool {
return strings.ContainsAny(s, "*?[")
}
type DiskIO struct {
Devices []string `toml:"devices"`
DeviceTags []string `toml:"device_tags"`
@ -151,6 +146,11 @@ func (d *DiskIO) Gather(acc telegraf.Accumulator) error {
return nil
}
// hasMeta reports whether s contains any of the glob metacharacters
// ('*', '?' or '[') and therefore needs glob matching rather than a
// plain string comparison.
func hasMeta(s string) bool {
	return strings.IndexAny(s, "*?[") >= 0
}
func (d *DiskIO) diskName(devName string) (string, []string) {
di, err := d.diskInfo(devName)
devLinks := strings.Split(di["DEVLINKS"], " ")

View File

@ -20,52 +20,53 @@ import (
//go:embed sample.conf
var sampleConfig string
var (
defaultTimeout = 5 * time.Second
tracking = map[string]string{
"uptime_in_seconds": "uptime",
"connected_clients": "clients",
"blocked_clients": "blocked_clients",
"used_memory": "used_memory",
"used_memory_rss": "used_memory_rss",
"used_memory_peak": "used_memory_peak",
"total_connections_received": "total_connections_received",
"total_commands_processed": "total_commands_processed",
"instantaneous_ops_per_sec": "instantaneous_ops_per_sec",
"latest_fork_usec": "latest_fork_usec",
"mem_fragmentation_ratio": "mem_fragmentation_ratio",
"used_cpu_sys": "used_cpu_sys",
"used_cpu_user": "used_cpu_user",
"used_cpu_sys_children": "used_cpu_sys_children",
"used_cpu_user_children": "used_cpu_user_children",
"registered_jobs": "registered_jobs",
"registered_queues": "registered_queues",
}
errProtocol = errors.New("disque protocol error")
)
const (
defaultPort = "7711"
)
type Disque struct {
Servers []string
Servers []string `toml:"servers"`
c net.Conn
}
var defaultTimeout = 5 * time.Second
var Tracking = map[string]string{
"uptime_in_seconds": "uptime",
"connected_clients": "clients",
"blocked_clients": "blocked_clients",
"used_memory": "used_memory",
"used_memory_rss": "used_memory_rss",
"used_memory_peak": "used_memory_peak",
"total_connections_received": "total_connections_received",
"total_commands_processed": "total_commands_processed",
"instantaneous_ops_per_sec": "instantaneous_ops_per_sec",
"latest_fork_usec": "latest_fork_usec",
"mem_fragmentation_ratio": "mem_fragmentation_ratio",
"used_cpu_sys": "used_cpu_sys",
"used_cpu_user": "used_cpu_user",
"used_cpu_sys_children": "used_cpu_sys_children",
"used_cpu_user_children": "used_cpu_user_children",
"registered_jobs": "registered_jobs",
"registered_queues": "registered_queues",
}
var ErrProtocolError = errors.New("disque protocol error")
// SampleConfig returns the embedded sample configuration for the plugin.
func (*Disque) SampleConfig() string {
	return sampleConfig
}
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (d *Disque) Gather(acc telegraf.Accumulator) error {
if len(d.Servers) == 0 {
address := &url.URL{
Host: ":7711",
Host: ":" + defaultPort,
}
return d.gatherServer(address, acc)
}
var wg sync.WaitGroup
for _, serv := range d.Servers {
u, err := url.Parse(serv)
if err != nil {
@ -89,8 +90,6 @@ func (d *Disque) Gather(acc telegraf.Accumulator) error {
return nil
}
const defaultPort = "7711"
func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
if d.c == nil {
_, _, err := net.SplitHostPort(addr.Host)
@ -142,7 +141,7 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
}
if line[0] != '$' {
return fmt.Errorf("bad line start: %w", ErrProtocolError)
return fmt.Errorf("bad line start: %w", errProtocol)
}
line = strings.TrimSpace(line)
@ -151,7 +150,7 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
sz, err := strconv.Atoi(szStr)
if err != nil {
return fmt.Errorf("bad size string <<%s>>: %w", szStr, ErrProtocolError)
return fmt.Errorf("bad size string <<%s>>: %w", szStr, errProtocol)
}
var read int
@ -174,7 +173,7 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
name := parts[0]
metric, ok := Tracking[name]
metric, ok := tracking[name]
if !ok {
continue
}

View File

@ -12,7 +12,8 @@ import (
var sampleConfig string
type DMCache struct {
PerDevice bool `toml:"per_device"`
PerDevice bool `toml:"per_device"`
getCurrentStatus func() ([]string, error)
}

View File

@ -25,12 +25,12 @@ var ignoredErrors = []string{
"NXDOMAIN",
}
type ResultType uint64
type resultType uint64
const (
Success ResultType = iota
Timeout
Error
successResult resultType = iota
timeoutResult
errorResult
)
type DNSQuery struct {
@ -117,7 +117,7 @@ func (d *DNSQuery) query(domain, server string) (map[string]interface{}, map[str
fields := map[string]interface{}{
"query_time_ms": float64(0),
"result_code": uint64(Error),
"result_code": uint64(errorResult),
}
c := dns.Client{
@ -140,7 +140,7 @@ func (d *DNSQuery) query(domain, server string) (map[string]interface{}, map[str
var opErr *net.OpError
if errors.As(err, &opErr) && opErr.Timeout() {
tags["result"] = "timeout"
fields["result_code"] = uint64(Timeout)
fields["result_code"] = uint64(timeoutResult)
return fields, tags, err
}
return fields, tags, err
@ -158,7 +158,7 @@ func (d *DNSQuery) query(domain, server string) (map[string]interface{}, map[str
// Success
tags["result"] = "success"
fields["result_code"] = uint64(Success)
fields["result_code"] = uint64(successResult)
// Fill out custom fields for specific record types
for _, record := range r.Answer {

View File

@ -13,8 +13,10 @@ import (
"github.com/influxdata/telegraf/testutil"
)
var servers = []string{"8.8.8.8"}
var domains = []string{"google.com"}
var (
servers = []string{"8.8.8.8"}
domains = []string{"google.com"}
)
func TestGathering(t *testing.T) {
if testing.Short() {

View File

@ -16,7 +16,7 @@ var (
defaultHeaders = map[string]string{"User-Agent": "engine-api-cli-1.0"}
)
type Client interface {
type dockerClient interface {
Info(ctx context.Context) (system.Info, error)
ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error)
ContainerStats(ctx context.Context, containerID string, stream bool) (container.StatsResponseReader, error)
@ -29,15 +29,15 @@ type Client interface {
Close() error
}
func NewEnvClient() (Client, error) {
func newEnvClient() (dockerClient, error) {
dockerClient, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return nil, err
}
return &SocketClient{dockerClient}, nil
return &socketClient{dockerClient}, nil
}
func NewClient(host string, tlsConfig *tls.Config) (Client, error) {
func newClient(host string, tlsConfig *tls.Config) (dockerClient, error) {
transport := &http.Transport{
TLSClientConfig: tlsConfig,
}
@ -52,42 +52,42 @@ func NewClient(host string, tlsConfig *tls.Config) (Client, error) {
return nil, err
}
return &SocketClient{dockerClient}, nil
return &socketClient{dockerClient}, nil
}
type SocketClient struct {
type socketClient struct {
client *client.Client
}
func (c *SocketClient) Info(ctx context.Context) (system.Info, error) {
func (c *socketClient) Info(ctx context.Context) (system.Info, error) {
return c.client.Info(ctx)
}
func (c *SocketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) {
func (c *socketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) {
return c.client.ContainerList(ctx, options)
}
func (c *SocketClient) ContainerStats(ctx context.Context, containerID string, stream bool) (container.StatsResponseReader, error) {
func (c *socketClient) ContainerStats(ctx context.Context, containerID string, stream bool) (container.StatsResponseReader, error) {
return c.client.ContainerStats(ctx, containerID, stream)
}
func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
func (c *socketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return c.client.ContainerInspect(ctx, containerID)
}
func (c *SocketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
func (c *socketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
return c.client.ServiceList(ctx, options)
}
func (c *SocketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
func (c *socketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
return c.client.TaskList(ctx, options)
}
func (c *SocketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
func (c *socketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
return c.client.NodeList(ctx, options)
}
func (c *SocketClient) DiskUsage(ctx context.Context, options types.DiskUsageOptions) (types.DiskUsage, error) {
func (c *socketClient) DiskUsage(ctx context.Context, options types.DiskUsageOptions) (types.DiskUsage, error) {
return c.client.DiskUsage(ctx, options)
}
func (c *SocketClient) ClientVersion() string {
func (c *socketClient) ClientVersion() string {
return c.client.ClientVersion()
}
func (c *SocketClient) Close() error {
func (c *socketClient) Close() error {
return c.client.Close()
}

View File

@ -26,6 +26,7 @@ import (
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/internal/docker"
docker_stats "github.com/influxdata/telegraf/plugins/common/docker"
common_tls "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
@ -33,48 +34,15 @@ import (
//go:embed sample.conf
var sampleConfig string
// Docker object
type Docker struct {
Endpoint string
ContainerNames []string `toml:"container_names" deprecated:"1.4.0;1.35.0;use 'container_name_include' instead"`
var (
sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`)
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
containerMetricClasses = []string{"cpu", "network", "blkio"}
now = time.Now
GatherServices bool `toml:"gather_services"`
Timeout config.Duration
PerDevice bool `toml:"perdevice" deprecated:"1.18.0;1.35.0;use 'perdevice_include' instead"`
PerDeviceInclude []string `toml:"perdevice_include"`
Total bool `toml:"total" deprecated:"1.18.0;1.35.0;use 'total_include' instead"`
TotalInclude []string `toml:"total_include"`
TagEnvironment []string `toml:"tag_env"`
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
ContainerInclude []string `toml:"container_name_include"`
ContainerExclude []string `toml:"container_name_exclude"`
ContainerStateInclude []string `toml:"container_state_include"`
ContainerStateExclude []string `toml:"container_state_exclude"`
StorageObjects []string `toml:"storage_objects"`
IncludeSourceTag bool `toml:"source_tag"`
Log telegraf.Logger
common_tls.ClientConfig
newEnvClient func() (Client, error)
newClient func(string, *tls.Config) (Client, error)
client Client
engineHost string
serverVersion string
filtersCreated bool
labelFilter filter.Filter
containerFilter filter.Filter
stateFilter filter.Filter
objectTypes []types.DiskUsageObject
}
minVersion = semver.MustParse("1.23")
minDiskUsageVersion = semver.MustParse("1.42")
)
// KB, MB, GB, TB, PB...human friendly
const (
@ -87,15 +55,48 @@ const (
defaultEndpoint = "unix:///var/run/docker.sock"
)
var (
sizeRegex = regexp.MustCompile(`^(\d+(\.\d+)*) ?([kKmMgGtTpP])?[bB]?$`)
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
containerMetricClasses = []string{"cpu", "network", "blkio"}
now = time.Now
// Docker object
type Docker struct {
Endpoint string `toml:"endpoint"`
ContainerNames []string `toml:"container_names" deprecated:"1.4.0;1.35.0;use 'container_name_include' instead"`
minVersion = semver.MustParse("1.23")
minDiskUsageVersion = semver.MustParse("1.42")
)
GatherServices bool `toml:"gather_services"`
Timeout config.Duration `toml:"timeout"`
PerDevice bool `toml:"perdevice" deprecated:"1.18.0;1.35.0;use 'perdevice_include' instead"`
PerDeviceInclude []string `toml:"perdevice_include"`
Total bool `toml:"total" deprecated:"1.18.0;1.35.0;use 'total_include' instead"`
TotalInclude []string `toml:"total_include"`
TagEnvironment []string `toml:"tag_env"`
LabelInclude []string `toml:"docker_label_include"`
LabelExclude []string `toml:"docker_label_exclude"`
ContainerInclude []string `toml:"container_name_include"`
ContainerExclude []string `toml:"container_name_exclude"`
ContainerStateInclude []string `toml:"container_state_include"`
ContainerStateExclude []string `toml:"container_state_exclude"`
StorageObjects []string `toml:"storage_objects"`
IncludeSourceTag bool `toml:"source_tag"`
Log telegraf.Logger `toml:"-"`
common_tls.ClientConfig
newEnvClient func() (dockerClient, error)
newClient func(string, *tls.Config) (dockerClient, error)
client dockerClient
engineHost string
serverVersion string
filtersCreated bool
labelFilter filter.Filter
containerFilter filter.Filter
stateFilter filter.Filter
objectTypes []types.DiskUsageObject
}
func (*Docker) SampleConfig() string {
return sampleConfig
@ -149,7 +150,6 @@ func (d *Docker) Init() error {
return nil
}
// Gather metrics from the docker server.
func (d *Docker) Gather(acc telegraf.Accumulator) error {
if d.client == nil {
c, err := d.getNewClient()
@ -664,10 +664,10 @@ func (d *Docker) parseContainerStats(
memfields["limit"] = stat.MemoryStats.Limit
memfields["max_usage"] = stat.MemoryStats.MaxUsage
mem := CalculateMemUsageUnixNoCache(stat.MemoryStats)
mem := docker_stats.CalculateMemUsageUnixNoCache(stat.MemoryStats)
memLimit := float64(stat.MemoryStats.Limit)
memfields["usage"] = uint64(mem)
memfields["usage_percent"] = CalculateMemPercentUnixNoCache(memLimit, mem)
memfields["usage_percent"] = docker_stats.CalculateMemPercentUnixNoCache(memLimit, mem)
} else {
memfields["commit_bytes"] = stat.MemoryStats.Commit
memfields["commit_peak_bytes"] = stat.MemoryStats.CommitPeak
@ -691,10 +691,10 @@ func (d *Docker) parseContainerStats(
if daemonOSType != "windows" {
previousCPU := stat.PreCPUStats.CPUUsage.TotalUsage
previousSystem := stat.PreCPUStats.SystemUsage
cpuPercent := CalculateCPUPercentUnix(previousCPU, previousSystem, stat)
cpuPercent := docker_stats.CalculateCPUPercentUnix(previousCPU, previousSystem, stat)
cpufields["usage_percent"] = cpuPercent
} else {
cpuPercent := calculateCPUPercentWindows(stat)
cpuPercent := docker_stats.CalculateCPUPercentWindows(stat)
cpufields["usage_percent"] = cpuPercent
}
@ -1046,7 +1046,7 @@ func (d *Docker) createContainerStateFilters() error {
return nil
}
func (d *Docker) getNewClient() (Client, error) {
func (d *Docker) getNewClient() (dockerClient, error) {
if d.Endpoint == "ENV" {
return d.newEnvClient()
}
@ -1067,8 +1067,8 @@ func init() {
TotalInclude: []string{"cpu", "blkio", "network"},
Timeout: config.Duration(time.Second * 5),
Endpoint: defaultEndpoint,
newEnvClient: NewEnvClient,
newClient: NewClient,
newEnvClient: newEnvClient,
newClient: newClient,
filtersCreated: false,
}
})

View File

@ -21,7 +21,7 @@ import (
"github.com/influxdata/telegraf/testutil"
)
type MockClient struct {
type mockClient struct {
InfoF func() (system.Info, error)
ContainerListF func(options container.ListOptions) ([]types.Container, error)
ContainerStatsF func(containerID string) (container.StatsResponseReader, error)
@ -34,47 +34,47 @@ type MockClient struct {
CloseF func() error
}
func (c *MockClient) Info(context.Context) (system.Info, error) {
func (c *mockClient) Info(context.Context) (system.Info, error) {
return c.InfoF()
}
func (c *MockClient) ContainerList(_ context.Context, options container.ListOptions) ([]types.Container, error) {
func (c *mockClient) ContainerList(_ context.Context, options container.ListOptions) ([]types.Container, error) {
return c.ContainerListF(options)
}
func (c *MockClient) ContainerStats(_ context.Context, containerID string, _ bool) (container.StatsResponseReader, error) {
func (c *mockClient) ContainerStats(_ context.Context, containerID string, _ bool) (container.StatsResponseReader, error) {
return c.ContainerStatsF(containerID)
}
func (c *MockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) {
func (c *mockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) {
return c.ContainerInspectF()
}
func (c *MockClient) ServiceList(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
func (c *mockClient) ServiceList(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
return c.ServiceListF()
}
func (c *MockClient) TaskList(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
func (c *mockClient) TaskList(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
return c.TaskListF()
}
func (c *MockClient) NodeList(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
func (c *mockClient) NodeList(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
return c.NodeListF()
}
func (c *MockClient) DiskUsage(context.Context, types.DiskUsageOptions) (types.DiskUsage, error) {
func (c *mockClient) DiskUsage(context.Context, types.DiskUsageOptions) (types.DiskUsage, error) {
return c.DiskUsageF()
}
func (c *MockClient) ClientVersion() string {
func (c *mockClient) ClientVersion() string {
return c.ClientVersionF()
}
func (c *MockClient) Close() error {
func (c *mockClient) Close() error {
return c.CloseF()
}
var baseClient = MockClient{
var baseClient = mockClient{
InfoF: func() (system.Info, error) {
return info, nil
},
@ -88,13 +88,13 @@ var baseClient = MockClient{
return containerInspect(), nil
},
ServiceListF: func() ([]swarm.Service, error) {
return ServiceList, nil
return serviceList, nil
},
TaskListF: func() ([]swarm.Task, error) {
return TaskList, nil
return taskList, nil
},
NodeListF: func() ([]swarm.Node, error) {
return NodeList, nil
return nodeList, nil
},
DiskUsageF: func() (types.DiskUsage, error) {
return diskUsage, nil
@ -421,8 +421,8 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) {
d := Docker{
Log: testutil.Logger{},
newClient: func(string, *tls.Config) (Client, error) {
return &MockClient{
newClient: func(string, *tls.Config) (dockerClient, error) {
return &mockClient{
InfoF: func() (system.Info, error) {
return info, nil
},
@ -436,13 +436,13 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) {
return containerInspect(), nil
},
ServiceListF: func() ([]swarm.Service, error) {
return ServiceList, nil
return serviceList, nil
},
TaskListF: func() ([]swarm.Task, error) {
return TaskList, nil
return taskList, nil
},
NodeListF: func() ([]swarm.Node, error) {
return NodeList, nil
return nodeList, nil
},
DiskUsageF: func() (types.DiskUsage, error) {
return diskUsage, nil
@ -559,7 +559,7 @@ func TestContainerLabels(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var acc testutil.Accumulator
newClientFunc := func(string, *tls.Config) (Client, error) {
newClientFunc := func(string, *tls.Config) (dockerClient, error) {
client := baseClient
client.ContainerListF = func(container.ListOptions) ([]types.Container, error) {
return []types.Container{tt.container}, nil
@ -679,7 +679,7 @@ func TestContainerNames(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var acc testutil.Accumulator
newClientFunc := func(string, *tls.Config) (Client, error) {
newClientFunc := func(string, *tls.Config) (dockerClient, error) {
client := baseClient
client.ContainerListF = func(container.ListOptions) ([]types.Container, error) {
return containerList, nil
@ -720,7 +720,7 @@ func TestContainerNames(t *testing.T) {
}
}
func FilterMetrics(metrics []telegraf.Metric, f func(telegraf.Metric) bool) []telegraf.Metric {
func filterMetrics(metrics []telegraf.Metric, f func(telegraf.Metric) bool) []telegraf.Metric {
results := []telegraf.Metric{}
for _, m := range metrics {
if f(m) {
@ -889,7 +889,7 @@ func TestContainerStatus(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var (
acc testutil.Accumulator
newClientFunc = func(string, *tls.Config) (Client, error) {
newClientFunc = func(string, *tls.Config) (dockerClient, error) {
client := baseClient
client.ContainerListF = func(container.ListOptions) ([]types.Container, error) {
return containerList[:1], nil
@ -918,7 +918,7 @@ func TestContainerStatus(t *testing.T) {
err := d.Gather(&acc)
require.NoError(t, err)
actual := FilterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool {
actual := filterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool {
return m.Name() == "docker_container_status"
})
testutil.RequireMetricsEqual(t, tt.expected, actual)
@ -930,7 +930,7 @@ func TestDockerGatherInfo(t *testing.T) {
var acc testutil.Accumulator
d := Docker{
Log: testutil.Logger{},
newClient: func(string, *tls.Config) (Client, error) { return &baseClient, nil },
newClient: func(string, *tls.Config) (dockerClient, error) { return &baseClient, nil },
TagEnvironment: []string{"ENVVAR1", "ENVVAR2", "ENVVAR3", "ENVVAR5",
"ENVVAR6", "ENVVAR7", "ENVVAR8", "ENVVAR9"},
PerDeviceInclude: []string{"cpu", "network", "blkio"},
@ -1083,7 +1083,7 @@ func TestDockerGatherSwarmInfo(t *testing.T) {
var acc testutil.Accumulator
d := Docker{
Log: testutil.Logger{},
newClient: func(string, *tls.Config) (Client, error) { return &baseClient, nil },
newClient: func(string, *tls.Config) (dockerClient, error) { return &baseClient, nil },
}
err := acc.GatherError(d.Gather)
@ -1174,7 +1174,7 @@ func TestContainerStateFilter(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var acc testutil.Accumulator
newClientFunc := func(string, *tls.Config) (Client, error) {
newClientFunc := func(string, *tls.Config) (dockerClient, error) {
client := baseClient
client.ContainerListF = func(options container.ListOptions) ([]types.Container, error) {
for k, v := range tt.expected {
@ -1205,12 +1205,12 @@ func TestContainerStateFilter(t *testing.T) {
func TestContainerName(t *testing.T) {
tests := []struct {
name string
clientFunc func(host string, tlsConfig *tls.Config) (Client, error)
clientFunc func(host string, tlsConfig *tls.Config) (dockerClient, error)
expected string
}{
{
name: "container stats name is preferred",
clientFunc: func(string, *tls.Config) (Client, error) {
clientFunc: func(string, *tls.Config) (dockerClient, error) {
client := baseClient
client.ContainerListF = func(container.ListOptions) ([]types.Container, error) {
var containers []types.Container
@ -1230,7 +1230,7 @@ func TestContainerName(t *testing.T) {
},
{
name: "container stats without name uses container list name",
clientFunc: func(string, *tls.Config) (Client, error) {
clientFunc: func(string, *tls.Config) (dockerClient, error) {
client := baseClient
client.ContainerListF = func(container.ListOptions) ([]types.Container, error) {
var containers []types.Container
@ -1444,7 +1444,7 @@ func Test_parseContainerStatsPerDeviceAndTotal(t *testing.T) {
}
d.parseContainerStats(tt.args.stat, &acc, tt.args.tags, tt.args.id, tt.args.daemonOSType)
actual := FilterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool {
actual := filterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool {
return choice.Contains(m.Name(),
[]string{"docker_container_cpu", "docker_container_net", "docker_container_blkio"})
})
@ -1547,7 +1547,7 @@ func TestDockerGatherDiskUsage(t *testing.T) {
var acc testutil.Accumulator
d := Docker{
Log: testutil.Logger{},
newClient: func(string, *tls.Config) (Client, error) { return &baseClient, nil },
newClient: func(string, *tls.Config) (dockerClient, error) { return &baseClient, nil },
}
require.NoError(t, acc.GatherError(d.Gather))

View File

@ -171,7 +171,7 @@ var containerList = []types.Container{
}
var two = uint64(2)
var ServiceList = []swarm.Service{
var serviceList = []swarm.Service{
{
ID: "qolkls9g5iasdiuihcyz9rnx2",
Spec: swarm.ServiceSpec{
@ -198,7 +198,7 @@ var ServiceList = []swarm.Service{
},
}
var TaskList = []swarm.Task{
var taskList = []swarm.Task{
{
ID: "kwh0lv7hwwbh",
ServiceID: "qolkls9g5iasdiuihcyz9rnx2",
@ -228,7 +228,7 @@ var TaskList = []swarm.Task{
},
}
var NodeList = []swarm.Node{
var nodeList = []swarm.Node{
{
ID: "0cl4jturcyd1ks3fwpd010kor",
Status: swarm.NodeStatus{

View File

@ -17,21 +17,21 @@ var (
defaultHeaders = map[string]string{"User-Agent": "engine-api-cli-1.0"}
)
type Client interface {
type dockerClient interface {
ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error)
ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
}
func NewEnvClient() (Client, error) {
func newEnvClient() (dockerClient, error) {
client, err := docker.NewClientWithOpts(docker.FromEnv)
if err != nil {
return nil, err
}
return &SocketClient{client}, nil
return &socketClient{client}, nil
}
func NewClient(host string, tlsConfig *tls.Config) (Client, error) {
func newClient(host string, tlsConfig *tls.Config) (dockerClient, error) {
transport := &http.Transport{
TLSClientConfig: tlsConfig,
}
@ -45,20 +45,20 @@ func NewClient(host string, tlsConfig *tls.Config) (Client, error) {
if err != nil {
return nil, err
}
return &SocketClient{client}, nil
return &socketClient{client}, nil
}
type SocketClient struct {
type socketClient struct {
client *docker.Client
}
func (c *SocketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) {
func (c *socketClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) {
return c.client.ContainerList(ctx, options)
}
func (c *SocketClient) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) {
func (c *socketClient) ContainerLogs(ctx context.Context, containerID string, options container.LogsOptions) (io.ReadCloser, error) {
return c.client.ContainerLogs(ctx, containerID, options)
}
func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
func (c *socketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return c.client.ContainerInspect(ctx, containerID)
}

View File

@ -31,14 +31,14 @@ import (
//go:embed sample.conf
var sampleConfig string
const (
defaultEndpoint = "unix:///var/run/docker.sock"
var (
// ensure *DockerLogs implements telegraf.ServiceInput
_ telegraf.ServiceInput = (*DockerLogs)(nil)
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
)
var (
containerStates = []string{"created", "restarting", "running", "removing", "paused", "exited", "dead"}
// ensure *DockerLogs implements telegraf.ServiceInput
_ telegraf.ServiceInput = (*DockerLogs)(nil)
const (
defaultEndpoint = "unix:///var/run/docker.sock"
)
type DockerLogs struct {
@ -55,10 +55,10 @@ type DockerLogs struct {
common_tls.ClientConfig
newEnvClient func() (Client, error)
newClient func(string, *tls.Config) (Client, error)
newEnvClient func() (dockerClient, error)
newClient func(string, *tls.Config) (dockerClient, error)
client Client
client dockerClient
labelFilter filter.Filter
containerFilter filter.Filter
stateFilter filter.Filter
@ -127,7 +127,11 @@ func (d *DockerLogs) Init() error {
return nil
}
// State persistence interfaces
// Start is a no-op required for *DockerLogs to satisfy the
// telegraf.ServiceInput interface; the log-tailing goroutines are
// started lazily from Gather instead.
func (d *DockerLogs) Start(telegraf.Accumulator) error {
	return nil
}
func (d *DockerLogs) GetState() interface{} {
d.lastRecordMtx.Lock()
recordOffsets := make(map[string]time.Time, len(d.lastRecord))
@ -153,6 +157,50 @@ func (d *DockerLogs) SetState(state interface{}) error {
return nil
}
// Gather lists the containers matching the configured filters and spawns
// one log-tailing goroutine per container that is not already being
// tailed. The goroutines outlive this call: they are tracked in d.wg and
// are cancelled via the per-container cancel functions stored by
// addToContainerList (see Stop).
func (d *DockerLogs) Gather(acc telegraf.Accumulator) error {
	ctx := context.Background()
	acc.SetPrecision(time.Nanosecond)
	// This timeout bounds only the container listing below, not the
	// tailing goroutines (which get their own cancellable context).
	ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout))
	defer cancel()
	containers, err := d.client.ContainerList(ctx, d.opts)
	if err != nil {
		return err
	}
	for _, cntnr := range containers {
		if d.containerInContainerList(cntnr.ID) {
			// Already tailing this container from a previous Gather.
			continue
		}
		containerName := d.matchedContainerName(cntnr.Names)
		if containerName == "" {
			// Excluded by the container name filter.
			continue
		}
		ctx, cancel := context.WithCancel(context.Background())
		d.addToContainerList(cntnr.ID, cancel)
		// Start a new goroutine for every new container that has logs to collect
		d.wg.Add(1)
		go func(container types.Container) {
			defer d.wg.Done()
			defer d.removeFromContainerList(container.ID)
			err = d.tailContainerLogs(ctx, acc, container, containerName)
			// context.Canceled is the normal shutdown path, not an error.
			if err != nil && !errors.Is(err, context.Canceled) {
				acc.AddError(err)
			}
		}(cntnr)
	}
	return nil
}
// Stop cancels every running log-tailing goroutine and then blocks until
// all of them have finished (the cancel must happen first so wg.Wait can
// return).
func (d *DockerLogs) Stop() {
	d.cancelTails()
	d.wg.Wait()
}
func (d *DockerLogs) addToContainerList(containerID string, cancel context.CancelFunc) {
d.mu.Lock()
defer d.mu.Unlock()
@ -195,45 +243,6 @@ func (d *DockerLogs) matchedContainerName(names []string) string {
return ""
}
func (d *DockerLogs) Gather(acc telegraf.Accumulator) error {
ctx := context.Background()
acc.SetPrecision(time.Nanosecond)
ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout))
defer cancel()
containers, err := d.client.ContainerList(ctx, d.opts)
if err != nil {
return err
}
for _, cntnr := range containers {
if d.containerInContainerList(cntnr.ID) {
continue
}
containerName := d.matchedContainerName(cntnr.Names)
if containerName == "" {
continue
}
ctx, cancel := context.WithCancel(context.Background())
d.addToContainerList(cntnr.ID, cancel)
// Start a new goroutine for every new container that has logs to collect
d.wg.Add(1)
go func(container types.Container) {
defer d.wg.Done()
defer d.removeFromContainerList(container.ID)
err = d.tailContainerLogs(ctx, acc, container, containerName)
if err != nil && !errors.Is(err, context.Canceled) {
acc.AddError(err)
}
}(cntnr)
}
return nil
}
func (d *DockerLogs) hasTTY(ctx context.Context, cntnr types.Container) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(d.Timeout))
defer cancel()
@ -439,17 +448,6 @@ func tailMultiplexed(
return tsStderr, nil
}
// Start is a noop which is required for a *DockerLogs to implement
// the telegraf.ServiceInput interface
func (d *DockerLogs) Start(telegraf.Accumulator) error {
return nil
}
func (d *DockerLogs) Stop() {
d.cancelTails()
d.wg.Wait()
}
// Following few functions have been inherited from telegraf docker input plugin
func (d *DockerLogs) createContainerFilters() error {
containerFilter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
@ -481,21 +479,21 @@ func (d *DockerLogs) createContainerStateFilters() error {
return nil
}
// init registers the docker_log input plugin with defaults: a 5 second
// timeout, the default Docker endpoint, the real client constructors,
// and an empty container-tracking map.
func init() {
	inputs.Add("docker_log", func() telegraf.Input {
		return &DockerLogs{
			Timeout:       config.Duration(time.Second * 5),
			Endpoint:      defaultEndpoint,
			newEnvClient:  NewEnvClient,
			newClient:     NewClient,
			containerList: make(map[string]context.CancelFunc),
		}
	})
}
// hostnameFromID derives a short hostname from a container ID by keeping
// at most its first twelve characters; shorter IDs are returned as-is.
func hostnameFromID(id string) string {
	const shortIDLen = 12
	if len(id) <= shortIDLen {
		return id
	}
	return id[:shortIDLen]
}
// init registers the docker_log input plugin under the "docker_log" name,
// supplying default timeout, endpoint, client factories, and an empty
// container-tracking map.
func init() {
	inputs.Add("docker_log", func() telegraf.Input {
		return &DockerLogs{
			Timeout:       config.Duration(time.Second * 5),
			Endpoint:      defaultEndpoint,
			newEnvClient:  newEnvClient,
			newClient:     newClient,
			containerList: make(map[string]context.CancelFunc),
		}
	})
}

View File

@ -18,33 +18,33 @@ import (
"github.com/influxdata/telegraf/testutil"
)
type MockClient struct {
type mockClient struct {
ContainerListF func() ([]types.Container, error)
ContainerInspectF func() (types.ContainerJSON, error)
ContainerLogsF func() (io.ReadCloser, error)
}
func (c *MockClient) ContainerList(context.Context, container.ListOptions) ([]types.Container, error) {
func (c *mockClient) ContainerList(context.Context, container.ListOptions) ([]types.Container, error) {
return c.ContainerListF()
}
func (c *MockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) {
func (c *mockClient) ContainerInspect(context.Context, string) (types.ContainerJSON, error) {
return c.ContainerInspectF()
}
func (c *MockClient) ContainerLogs(context.Context, string, container.LogsOptions) (io.ReadCloser, error) {
func (c *mockClient) ContainerLogs(context.Context, string, container.LogsOptions) (io.ReadCloser, error) {
return c.ContainerLogsF()
}
type Response struct {
type response struct {
io.Reader
}
func (r *Response) Close() error {
func (r *response) Close() error {
return nil
}
func MustParse(layout, value string) time.Time {
func mustParse(layout, value string) time.Time {
tm, err := time.Parse(layout, value)
if err != nil {
panic(err)
@ -55,12 +55,12 @@ func MustParse(layout, value string) time.Time {
func Test(t *testing.T) {
tests := []struct {
name string
client *MockClient
client *mockClient
expected []telegraf.Metric
}{
{
name: "no containers",
client: &MockClient{
client: &mockClient{
ContainerListF: func() ([]types.Container, error) {
return nil, nil
},
@ -68,7 +68,7 @@ func Test(t *testing.T) {
},
{
name: "one container tty",
client: &MockClient{
client: &mockClient{
ContainerListF: func() ([]types.Container, error) {
return []types.Container{
{
@ -86,7 +86,7 @@ func Test(t *testing.T) {
}, nil
},
ContainerLogsF: func() (io.ReadCloser, error) {
return &Response{Reader: bytes.NewBufferString("2020-04-28T18:43:16.432691200Z hello\n")}, nil
return &response{Reader: bytes.NewBufferString("2020-04-28T18:43:16.432691200Z hello\n")}, nil
},
},
expected: []telegraf.Metric{
@ -103,13 +103,13 @@ func Test(t *testing.T) {
"container_id": "deadbeef",
"message": "hello",
},
MustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"),
mustParse(time.RFC3339Nano, "2020-04-28T18:43:16.432691200Z"),
),
},
},
{
name: "one container multiplex",
client: &MockClient{
client: &mockClient{
ContainerListF: func() ([]types.Container, error) {
return []types.Container{
{
@ -130,7 +130,7 @@ func Test(t *testing.T) {
var buf bytes.Buffer
w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout)
_, err := w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout"))
return &Response{Reader: &buf}, err
return &response{Reader: &buf}, err
},
},
expected: []telegraf.Metric{
@ -147,7 +147,7 @@ func Test(t *testing.T) {
"container_id": "deadbeef",
"message": "hello from stdout",
},
MustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"),
mustParse(time.RFC3339Nano, "2020-04-28T18:42:16.432691200Z"),
),
},
},
@ -157,7 +157,7 @@ func Test(t *testing.T) {
var acc testutil.Accumulator
plugin := &DockerLogs{
Timeout: config.Duration(time.Second * 5),
newClient: func(string, *tls.Config) (Client, error) { return tt.client, nil },
newClient: func(string, *tls.Config) (dockerClient, error) { return tt.client, nil },
containerList: make(map[string]context.CancelFunc),
IncludeSourceTag: true,
}

View File

@ -20,23 +20,23 @@ import (
//go:embed sample.conf
var sampleConfig string
var (
defaultTimeout = time.Second * time.Duration(5)
validQuery = map[string]bool{
"user": true, "domain": true, "global": true, "ip": true,
}
)
type Dovecot struct {
Type string
Filters []string
Servers []string
}
var defaultTimeout = time.Second * time.Duration(5)
var validQuery = map[string]bool{
"user": true, "domain": true, "global": true, "ip": true,
Type string `toml:"type"`
Filters []string `toml:"filters"`
Servers []string `toml:"servers"`
}
// SampleConfig returns the embedded example configuration for the plugin.
func (*Dovecot) SampleConfig() string {
	return sampleConfig
}
// Reads stats from all configured servers.
func (d *Dovecot) Gather(acc telegraf.Accumulator) error {
if !validQuery[d.Type] {
return fmt.Errorf("error: %s is not a valid query type", d.Type)

View File

@ -40,7 +40,7 @@ const (
unreachableSocketBehaviorError = "error"
)
type dpdk struct {
type Dpdk struct {
SocketPath string `toml:"socket_path"`
AccessTimeout config.Duration `toml:"socket_access_timeout"`
DeviceTypes []string `toml:"device_types"`
@ -62,12 +62,12 @@ type ethdevConfig struct {
EthdevExcludeCommands []string `toml:"exclude_commands"`
}
func (*dpdk) SampleConfig() string {
func (*Dpdk) SampleConfig() string {
return sampleConfig
}
// Init performs validation of all parameters from configuration
func (dpdk *dpdk) Init() error {
func (dpdk *Dpdk) Init() error {
dpdk.setupDefaultValues()
err := dpdk.validateAdditionalCommands()
@ -101,22 +101,13 @@ func (dpdk *dpdk) Init() error {
}
// Start implements ServiceInput interface
func (dpdk *dpdk) Start(telegraf.Accumulator) error {
func (dpdk *Dpdk) Start(telegraf.Accumulator) error {
return dpdk.maintainConnections()
}
// Stop closes every open telemetry-socket connection, logging (but not
// failing on) close errors, and clears the connector list.
func (dpdk *dpdk) Stop() {
	for _, connector := range dpdk.connectors {
		if err := connector.tryClose(); err != nil {
			dpdk.Log.Warnf("Couldn't close connection for %q: %v", connector.pathToSocket, err)
		}
	}
	dpdk.connectors = nil
}
// Gather function gathers all unique commands and processes each command sequentially
// Parallel processing could be achieved by running several instances of this plugin with different settings
func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error {
func (dpdk *Dpdk) Gather(acc telegraf.Accumulator) error {
if err := dpdk.Start(acc); err != nil {
return err
}
@ -130,8 +121,17 @@ func (dpdk *dpdk) Gather(acc telegraf.Accumulator) error {
return nil
}
// Stop closes every open telemetry-socket connection, logging (but not
// failing on) close errors, and clears the connector list.
func (dpdk *Dpdk) Stop() {
	for _, connector := range dpdk.connectors {
		if err := connector.tryClose(); err != nil {
			dpdk.Log.Warnf("Couldn't close connection for %q: %v", connector.pathToSocket, err)
		}
	}
	dpdk.connectors = nil
}
// Setup default values for dpdk
func (dpdk *dpdk) setupDefaultValues() {
func (dpdk *Dpdk) setupDefaultValues() {
if dpdk.SocketPath == "" {
dpdk.SocketPath = defaultPathToSocket
}
@ -156,7 +156,7 @@ func (dpdk *dpdk) setupDefaultValues() {
dpdk.ethdevCommands = []string{"/ethdev/stats", "/ethdev/xstats", "/ethdev/info", ethdevLinkStatusCommand}
}
func (dpdk *dpdk) getDpdkInMemorySocketPaths() []string {
func (dpdk *Dpdk) getDpdkInMemorySocketPaths() []string {
filePaths := dpdk.socketGlobPath.Match()
var results []string
@ -175,7 +175,7 @@ func (dpdk *dpdk) getDpdkInMemorySocketPaths() []string {
}
// Checks that user-supplied commands are unique and match DPDK commands format
func (dpdk *dpdk) validateAdditionalCommands() error {
func (dpdk *Dpdk) validateAdditionalCommands() error {
dpdk.AdditionalCommands = uniqueValues(dpdk.AdditionalCommands)
for _, cmd := range dpdk.AdditionalCommands {
@ -200,7 +200,7 @@ func (dpdk *dpdk) validateAdditionalCommands() error {
}
// Establishes connections do DPDK telemetry sockets
func (dpdk *dpdk) maintainConnections() error {
func (dpdk *Dpdk) maintainConnections() error {
candidates := []string{dpdk.SocketPath}
if choice.Contains(dpdkPluginOptionInMemory, dpdk.PluginOptions) {
candidates = dpdk.getDpdkInMemorySocketPaths()
@ -259,7 +259,7 @@ func (dpdk *dpdk) maintainConnections() error {
}
// Gathers all unique commands
func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkConnector) []string {
func (dpdk *Dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkConnector) []string {
var commands []string
if choice.Contains("ethdev", dpdk.DeviceTypes) {
ethdevCommands := removeSubset(dpdk.ethdevCommands, dpdk.ethdevExcludedCommandsFilter)
@ -284,7 +284,7 @@ func (dpdk *dpdk) gatherCommands(acc telegraf.Accumulator, dpdkConnector *dpdkCo
func init() {
inputs.Add(pluginName, func() telegraf.Input {
dpdk := &dpdk{
dpdk := &Dpdk{
// Setting it here (rather than in `Init()`) to distinguish between "zero" value,
// default value and don't having value in config at all.
AccessTimeout: defaultAccessTimeout,

View File

@ -10,8 +10,8 @@ import (
type linkStatus int64
const (
DOWN linkStatus = iota
UP
down linkStatus = iota
up
)
const (
@ -22,8 +22,8 @@ const (
var (
linkStatusMap = map[string]linkStatus{
"down": DOWN,
"up": UP,
"down": down,
"up": up,
}
)

View File

@ -92,7 +92,7 @@ func Test_readMaxOutputLen(t *testing.T) {
func Test_connect(t *testing.T) {
t.Run("should pass if PathToSocket points to socket", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSocket,
connectors: []*dpdkConnector{newDpdkConnector(pathToSocket, 0)},
}

View File

@ -26,7 +26,7 @@ import (
func Test_Init(t *testing.T) {
t.Run("when SocketPath field isn't set then it should be set to default value", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
Log: testutil.Logger{},
SocketPath: "",
}
@ -39,7 +39,7 @@ func Test_Init(t *testing.T) {
})
t.Run("when Metadata Fields isn't set then it should be set to default value (dpdk_pid)", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
Log: testutil.Logger{},
}
require.Nil(t, dpdk.MetadataFields)
@ -49,7 +49,7 @@ func Test_Init(t *testing.T) {
})
t.Run("when PluginOptions field isn't set then it should be set to default value (in_memory)", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
Log: testutil.Logger{},
}
require.Nil(t, dpdk.PluginOptions)
@ -60,7 +60,7 @@ func Test_Init(t *testing.T) {
t.Run("when commands are in invalid format (doesn't start with '/') then error should be returned", func(t *testing.T) {
pathToSocket, _ := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
Log: testutil.Logger{},
SocketPath: pathToSocket,
AdditionalCommands: []string{"invalid"},
@ -73,7 +73,7 @@ func Test_Init(t *testing.T) {
})
t.Run("when AccessTime is < 0 then error should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
Log: testutil.Logger{},
AccessTimeout: -1,
}
@ -85,7 +85,7 @@ func Test_Init(t *testing.T) {
t.Run("when device_types and additional_commands are empty, then error should be returned", func(t *testing.T) {
pathToSocket, _ := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{},
AdditionalCommands: []string{},
@ -99,7 +99,7 @@ func Test_Init(t *testing.T) {
})
t.Run("when UnreachableSocketBehavior specified with unknown value - err should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
UnreachableSocketBehavior: "whatisthat",
@ -113,7 +113,7 @@ func Test_Init(t *testing.T) {
func Test_Start(t *testing.T) {
t.Run("when socket doesn't exist err should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
}
@ -126,7 +126,7 @@ func Test_Start(t *testing.T) {
})
t.Run("when socket doesn't exist, but UnreachableSocketBehavior is Ignore err shouldn't be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
UnreachableSocketBehavior: unreachableSocketBehaviorIgnore,
@ -140,7 +140,7 @@ func Test_Start(t *testing.T) {
t.Run("when all values are valid, then no error should be returned", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
@ -157,7 +157,7 @@ func Test_Start(t *testing.T) {
func TestMaintainConnections(t *testing.T) {
t.Run("maintainConnections should return the error if socket doesn't exist", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: "/tmp/justrandompath",
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
@ -173,7 +173,7 @@ func TestMaintainConnections(t *testing.T) {
})
t.Run("maintainConnections should return the error if socket not found with dpdkPluginOptionInMemory", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: defaultPathToSocket,
Log: testutil.Logger{},
PluginOptions: []string{dpdkPluginOptionInMemory},
@ -191,7 +191,7 @@ func TestMaintainConnections(t *testing.T) {
t.Run("maintainConnections shouldn't return error with 1 socket", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
@ -212,7 +212,7 @@ func TestMaintainConnections(t *testing.T) {
pathToSockets, sockets := createMultipleSocketsForTest(t, numSockets, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSockets[0],
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
@ -236,7 +236,7 @@ func TestMaintainConnections(t *testing.T) {
t.Run("Test maintainConnections without dpdkPluginOptionInMemory option", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
@ -256,7 +256,7 @@ func TestMaintainConnections(t *testing.T) {
t.Run("Test maintainConnections with dpdkPluginOptionInMemory option", func(t *testing.T) {
pathToSocket1, socket1 := createSocketForTest(t, "")
go simulateSocketResponse(socket1, t)
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSocket1,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
@ -297,7 +297,7 @@ func TestMaintainConnections(t *testing.T) {
func TestClose(t *testing.T) {
t.Run("Num of connections should be 0 after Stop func", func(t *testing.T) {
pathToSocket, socket := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: pathToSocket,
DeviceTypes: []string{"ethdev"},
Log: testutil.Logger{},
@ -317,7 +317,7 @@ func TestClose(t *testing.T) {
func Test_validateAdditionalCommands(t *testing.T) {
t.Run("when validating commands in correct format then no error should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
AdditionalCommands: []string{"/test", "/help"},
}
@ -327,7 +327,7 @@ func Test_validateAdditionalCommands(t *testing.T) {
})
t.Run("when validating command that doesn't begin with slash then error should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
AdditionalCommands: []string{
"/test", "commandWithoutSlash",
},
@ -340,7 +340,7 @@ func Test_validateAdditionalCommands(t *testing.T) {
})
t.Run("when validating long command (without parameters) then error should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
AdditionalCommands: []string{
"/test", "/" + strings.Repeat("a", maxCommandLength),
},
@ -353,7 +353,7 @@ func Test_validateAdditionalCommands(t *testing.T) {
})
t.Run("when validating long command (with params) then error should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
AdditionalCommands: []string{
"/test", "/," + strings.Repeat("a", maxCommandLengthWithParams),
},
@ -366,7 +366,7 @@ func Test_validateAdditionalCommands(t *testing.T) {
})
t.Run("when validating empty command then error should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
AdditionalCommands: []string{
"/test", "",
},
@ -379,7 +379,7 @@ func Test_validateAdditionalCommands(t *testing.T) {
})
t.Run("when validating commands with duplicates then duplicates should be removed and no error should be returned", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
AdditionalCommands: []string{
"/test", "/test",
},
@ -393,9 +393,9 @@ func Test_validateAdditionalCommands(t *testing.T) {
})
}
func prepareEnvironment() (*mocks.Conn, dpdk, *testutil.Accumulator) {
func prepareEnvironment() (*mocks.Conn, Dpdk, *testutil.Accumulator) {
mockConnection := &mocks.Conn{}
dpdk := dpdk{
dpdk := Dpdk{
connectors: []*dpdkConnector{{
connection: mockConnection,
initMessage: &initMessage{
@ -411,9 +411,9 @@ func prepareEnvironment() (*mocks.Conn, dpdk, *testutil.Accumulator) {
return mockConnection, dpdk, mockAcc
}
func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, dpdk, *testutil.Accumulator) {
func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, Dpdk, *testutil.Accumulator) {
mockConnections := []*mocks.Conn{{}, {}}
dpdk := dpdk{
dpdk := Dpdk{
connectors: []*dpdkConnector{
{
connection: mockConnections[0],
@ -440,9 +440,9 @@ func prepareEnvironmentWithMultiSockets() ([]*mocks.Conn, dpdk, *testutil.Accumu
return mockConnections, dpdk, mockAcc
}
func prepareEnvironmentWithInitializedMessage(initMsg *initMessage) (*mocks.Conn, dpdk, *testutil.Accumulator) {
func prepareEnvironmentWithInitializedMessage(initMsg *initMessage) (*mocks.Conn, Dpdk, *testutil.Accumulator) {
mockConnection := &mocks.Conn{}
dpdk := dpdk{
dpdk := Dpdk{
connectors: []*dpdkConnector{{
connection: mockConnection,
accessTimeout: 2 * time.Second,
@ -548,7 +548,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) {
var err error
t.Run("Should return nil if path doesn't exist", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: "/tmp/nothing-should-exist-here/test.socket",
Log: testutil.Logger{},
}
@ -560,7 +560,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) {
})
t.Run("Should return nil if can't read the dir", func(t *testing.T) {
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: "/root/no_access",
Log: testutil.Logger{},
}
@ -574,7 +574,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) {
t.Run("Should return one socket from socket path", func(t *testing.T) {
socketPath, _ := createSocketForTest(t, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: socketPath,
Log: testutil.Logger{},
}
@ -589,7 +589,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) {
t.Run("Should return 2 sockets from socket path", func(t *testing.T) {
socketPaths, _ := createMultipleSocketsForTest(t, 2, "")
dpdk := dpdk{
dpdk := Dpdk{
SocketPath: socketPaths[0],
Log: testutil.Logger{},
}
@ -605,7 +605,7 @@ func Test_getDpdkInMemorySocketPaths(t *testing.T) {
func Test_Gather(t *testing.T) {
t.Run("Gather should return error, because socket weren't created", func(t *testing.T) {
mockAcc := &testutil.Accumulator{}
dpdk := dpdk{
dpdk := Dpdk{
Log: testutil.Logger{},
PluginOptions: []string{},
}
@ -619,7 +619,7 @@ func Test_Gather(t *testing.T) {
t.Run("Gather shouldn't return error with UnreachableSocketBehavior: Ignore option, because socket weren't created", func(t *testing.T) {
mockAcc := &testutil.Accumulator{}
dpdk := dpdk{
dpdk := Dpdk{
Log: testutil.Logger{},
PluginOptions: []string{},
UnreachableSocketBehavior: unreachableSocketBehaviorIgnore,

View File

@ -8,7 +8,7 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs/docker"
"github.com/influxdata/telegraf/plugins/common/docker"
)
func parseContainerStats(c *Container, acc telegraf.Accumulator, tags map[string]string) {