Linter fixes for plugins/inputs/[de]* (#9379)

Paweł Żak 2021-06-21 17:07:52 +02:00 committed by GitHub
parent 1ba865f0f0
commit 9a794919e3
23 changed files with 187 additions and 171 deletions


@ -50,7 +50,7 @@ linters-settings:
- name: error-return - name: error-return
- name: error-strings - name: error-strings
- name: errorf - name: errorf
- name: flag-parameter # - name: flag-parameter #disable for now
- name: function-result-limit - name: function-result-limit
arguments: [ 3 ] arguments: [ 3 ]
- name: identical-branches - name: identical-branches
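
revive's flag-parameter rule warns about boolean arguments that switch a function between behaviours; the hunk above disables the rule for now rather than refactoring every offender. A minimal sketch of the pattern the rule objects to, and the usual refactor, with hypothetical names:

package main

import "fmt"

// What flag-parameter reports: a boolean argument that steers the function
// between two unrelated behaviours ("control coupling").
func listUsers(includeInactive bool) []string {
    if includeInactive {
        return []string{"alice", "bob", "carol"}
    }
    return []string{"alice", "bob"}
}

// The refactor the rule nudges towards: one function per behaviour, so the
// call site reads as listAllUsers() instead of listUsers(true).
func listActiveUsers() []string { return []string{"alice", "bob"} }
func listAllUsers() []string    { return append(listActiveUsers(), "carol") }

func main() {
    fmt.Println(listUsers(true))
    fmt.Println(listAllUsers())
}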


@ -156,7 +156,7 @@ func (c *ClusterClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthTok
return nil, err return nil, err
} }
loc := c.url("/acs/api/v1/auth/login") loc := c.toURL("/acs/api/v1/auth/login")
req, err := http.NewRequest("POST", loc, bytes.NewBuffer(octets)) req, err := http.NewRequest("POST", loc, bytes.NewBuffer(octets))
if err != nil { if err != nil {
return nil, err return nil, err
@ -208,7 +208,7 @@ func (c *ClusterClient) Login(ctx context.Context, sa *ServiceAccount) (*AuthTok
func (c *ClusterClient) GetSummary(ctx context.Context) (*Summary, error) { func (c *ClusterClient) GetSummary(ctx context.Context) (*Summary, error) {
summary := &Summary{} summary := &Summary{}
err := c.doGet(ctx, c.url("/mesos/master/state-summary"), summary) err := c.doGet(ctx, c.toURL("/mesos/master/state-summary"), summary)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -220,7 +220,7 @@ func (c *ClusterClient) GetContainers(ctx context.Context, node string) ([]Conta
list := []string{} list := []string{}
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node) path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers", node)
err := c.doGet(ctx, c.url(path), &list) err := c.doGet(ctx, c.toURL(path), &list)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -233,10 +233,10 @@ func (c *ClusterClient) GetContainers(ctx context.Context, node string) ([]Conta
return containers, nil return containers, nil
} }
func (c *ClusterClient) getMetrics(ctx context.Context, url string) (*Metrics, error) { func (c *ClusterClient) getMetrics(ctx context.Context, address string) (*Metrics, error) {
metrics := &Metrics{} metrics := &Metrics{}
err := c.doGet(ctx, url, metrics) err := c.doGet(ctx, address, metrics)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -246,21 +246,21 @@ func (c *ClusterClient) getMetrics(ctx context.Context, url string) (*Metrics, e
func (c *ClusterClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) { func (c *ClusterClient) GetNodeMetrics(ctx context.Context, node string) (*Metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node) path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/node", node)
return c.getMetrics(ctx, c.url(path)) return c.getMetrics(ctx, c.toURL(path))
} }
func (c *ClusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) { func (c *ClusterClient) GetContainerMetrics(ctx context.Context, node, container string) (*Metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container) path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s", node, container)
return c.getMetrics(ctx, c.url(path)) return c.getMetrics(ctx, c.toURL(path))
} }
func (c *ClusterClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) { func (c *ClusterClient) GetAppMetrics(ctx context.Context, node, container string) (*Metrics, error) {
path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container) path := fmt.Sprintf("/system/v1/agent/%s/metrics/v0/containers/%s/app", node, container)
return c.getMetrics(ctx, c.url(path)) return c.getMetrics(ctx, c.toURL(path))
} }
func createGetRequest(url string, token string) (*http.Request, error) { func createGetRequest(address string, token string) (*http.Request, error) {
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", address, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -273,8 +273,8 @@ func createGetRequest(url string, token string) (*http.Request, error) {
return req, nil return req, nil
} }
func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) error { func (c *ClusterClient) doGet(ctx context.Context, address string, v interface{}) error {
req, err := createGetRequest(url, c.token) req, err := createGetRequest(address, c.token)
if err != nil { if err != nil {
return err return err
} }
@ -304,7 +304,7 @@ func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) er
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return &APIError{ return &APIError{
URL: url, URL: address,
StatusCode: resp.StatusCode, StatusCode: resp.StatusCode,
Title: resp.Status, Title: resp.Status,
} }
@ -318,10 +318,10 @@ func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) er
return err return err
} }
func (c *ClusterClient) url(path string) string { func (c *ClusterClient) toURL(path string) string {
url := *c.clusterURL clusterURL := *c.clusterURL
url.Path = path clusterURL.Path = path
return url.String() return clusterURL.String()
} }
func (c *ClusterClient) createLoginToken(sa *ServiceAccount) (string, error) { func (c *ClusterClient) createLoginToken(sa *ServiceAccount) (string, error) {
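
The renames in this file (the url method becomes toURL, the url parameters and locals become address and clusterURL) remove shadowing of the imported net/url package, which is what the linter flags. A standalone sketch of the same pattern; the buildEndpoint helper and the example host are hypothetical:

package main

import (
    "fmt"
    "net/url"
)

// A local named "url" would shadow the net/url package for the rest of the
// function; a neutral name such as "address" (the choice made above) keeps
// url.Parse and friends reachable.
func buildEndpoint(base *url.URL, path string) string {
    address := *base    // copy, so the caller's URL is left untouched
    address.Path = path // only the path differs per endpoint
    return address.String()
}

func main() {
    base, err := url.Parse("https://dcos.example.com")
    if err != nil {
        panic(err)
    }
    fmt.Println(buildEndpoint(base, "/mesos/master/state-summary"))
}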


@ -10,6 +10,7 @@ import (
"time" "time"
jwt "github.com/dgrijalva/jwt-go/v4" jwt "github.com/dgrijalva/jwt-go/v4"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
@ -352,13 +353,13 @@ func (d *DCOS) createClient() (Client, error) {
return nil, err return nil, err
} }
url, err := url.Parse(d.ClusterURL) address, err := url.Parse(d.ClusterURL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
client := NewClusterClient( client := NewClusterClient(
url, address,
time.Duration(d.ResponseTimeout), time.Duration(d.ResponseTimeout),
d.MaxConnections, d.MaxConnections,
tlsCfg, tlsCfg,


@ -2,27 +2,27 @@ package directory_monitor
import ( import (
"bufio" "bufio"
"compress/gzip"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp" "regexp"
"sync"
"time" "time"
"golang.org/x/sync/semaphore"
"gopkg.in/djherbis/times.v1"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
"golang.org/x/sync/semaphore"
"gopkg.in/djherbis/times.v1"
"compress/gzip"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
) )
const sampleConfig = ` const sampleConfig = `
@ -263,9 +263,7 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read
if err != nil { if err != nil {
return err return err
} }
if firstLine { firstLine = false
firstLine = false
}
if err := monitor.sendMetrics(metrics); err != nil { if err := monitor.sendMetrics(metrics); err != nil {
return err return err
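
Two changes here: the import block is regrouped into standard library, external, and Telegraf-internal sections (the ordering goimports-style checks expect), and the guard in "if firstLine { firstLine = false }" is dropped because unconditionally assigning false is equivalent. A small self-contained example of the grouping convention, using golang.org/x/sync/semaphore only because it already appears in this file:

package main

// Import grouping the linter expects: standard library first, then external
// modules, then this repository's own packages, one blank line between groups.
import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
)

func main() {
    // Minimal usage so the example compiles: a weighted semaphore with one slot.
    sem := semaphore.NewWeighted(1)
    if err := sem.Acquire(context.Background(), 1); err != nil {
        fmt.Println("acquire failed:", err)
        return
    }
    defer sem.Release(1)
    fmt.Println("holding the only slot")
}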


@ -8,9 +8,10 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
func TestCSVGZImport(t *testing.T) { func TestCSVGZImport(t *testing.T) {
@ -77,8 +78,9 @@ func TestCSVGZImport(t *testing.T) {
// File should have gone back to the test directory, as we configured. // File should have gone back to the test directory, as we configured.
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile)) _, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile))
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvGzFile)) require.NoError(t, err)
_, err = os.Stat(filepath.Join(finishedDirectory, testCsvGzFile))
require.NoError(t, err) require.NoError(t, err)
} }
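
The fix above checks the error from the first os.Stat before the second call reuses err; previously that first result was silently overwritten and never verified. A sketch of the same discipline outside a test, with hypothetical paths:

package main

import (
    "fmt"
    "os"
)

// Reusing one err variable is fine as long as every assignment is checked
// before the next call overwrites it, which is the omission the test fix closes.
func bothExist(pathA, pathB string) error {
    if _, err := os.Stat(pathA); err != nil {
        return fmt.Errorf("first file: %w", err)
    }
    if _, err := os.Stat(pathB); err != nil {
        return fmt.Errorf("second file: %w", err)
    }
    return nil
}

func main() {
    fmt.Println(bothExist(".", "/definitely/not/there"))
}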


@ -13,7 +13,7 @@ type DiskStats struct {
ps system.PS ps system.PS
// Legacy support // Legacy support
Mountpoints []string `toml:"mountpoints"` LegacyMountPoints []string `toml:"mountpoints"`
MountPoints []string `toml:"mount_points"` MountPoints []string `toml:"mount_points"`
IgnoreFS []string `toml:"ignore_fs"` IgnoreFS []string `toml:"ignore_fs"`
@ -38,8 +38,8 @@ func (ds *DiskStats) SampleConfig() string {
func (ds *DiskStats) Gather(acc telegraf.Accumulator) error { func (ds *DiskStats) Gather(acc telegraf.Accumulator) error {
// Legacy support: // Legacy support:
if len(ds.Mountpoints) != 0 { if len(ds.LegacyMountPoints) != 0 {
ds.MountPoints = ds.Mountpoints ds.MountPoints = ds.LegacyMountPoints
} }
disks, partitions, err := ds.ps.DiskUsage(ds.MountPoints, ds.IgnoreFS) disks, partitions, err := ds.ps.DiskUsage(ds.MountPoints, ds.IgnoreFS)
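
Renaming the Go field to LegacyMountPoints while keeping its old toml:"mountpoints" tag preserves existing configs; the two fields previously differed only in capitalization (Mountpoints vs MountPoints), which is likely what revive's confusing-naming check reported. A standalone sketch of the pattern; the BurntSushi/toml decoder is only an assumption for the example, since Telegraf's own config loader handles these tags in the real plugin:

package main

import (
    "fmt"

    "github.com/BurntSushi/toml"
)

// Hypothetical struct mirroring the hunk above: only the Go field name
// changes, the deprecated TOML key keeps working through its tag.
type DiskConfig struct {
    LegacyMountPoints []string `toml:"mountpoints"`  // deprecated key
    MountPoints       []string `toml:"mount_points"` // current key
}

// Legacy support, same shape as the Gather hunk above.
func (c *DiskConfig) normalize() {
    if len(c.LegacyMountPoints) != 0 {
        c.MountPoints = c.LegacyMountPoints
    }
}

func main() {
    var cfg DiskConfig
    if _, err := toml.Decode(`mountpoints = ["/", "/home"]`, &cfg); err != nil {
        panic(err)
    }
    cfg.normalize()
    fmt.Println(cfg.MountPoints) // [/ /home]
}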


@ -5,12 +5,12 @@ import (
"os" "os"
"testing" "testing"
"github.com/influxdata/telegraf/plugins/inputs/system" diskUtil "github.com/shirou/gopsutil/disk"
"github.com/influxdata/telegraf/testutil"
"github.com/shirou/gopsutil/disk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs/system"
"github.com/influxdata/telegraf/testutil"
) )
type MockFileInfo struct { type MockFileInfo struct {
@ -25,7 +25,7 @@ func TestDiskUsage(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
var err error var err error
psAll := []disk.PartitionStat{ psAll := []diskUtil.PartitionStat{
{ {
Device: "/dev/sda", Device: "/dev/sda",
Mountpoint: "/", Mountpoint: "/",
@ -39,7 +39,7 @@ func TestDiskUsage(t *testing.T) {
Opts: "rw,noatime,nodiratime,errors=remount-ro", Opts: "rw,noatime,nodiratime,errors=remount-ro",
}, },
} }
duAll := []disk.UsageStat{ duAll := []diskUtil.UsageStat{
{ {
Path: "/", Path: "/",
Fstype: "ext4", Fstype: "ext4",
@ -72,7 +72,7 @@ func TestDiskUsage(t *testing.T) {
numDiskMetrics := acc.NFields() numDiskMetrics := acc.NFields()
expectedAllDiskMetrics := 14 expectedAllDiskMetrics := 14
assert.Equal(t, expectedAllDiskMetrics, numDiskMetrics) require.Equal(t, expectedAllDiskMetrics, numDiskMetrics)
tags1 := map[string]string{ tags1 := map[string]string{
"path": string(os.PathSeparator), "path": string(os.PathSeparator),
@ -111,26 +111,28 @@ func TestDiskUsage(t *testing.T) {
// We expect 6 more DiskMetrics to show up with an explicit match on "/" // We expect 6 more DiskMetrics to show up with an explicit match on "/"
// and /home not matching the /dev in MountPoints // and /home not matching the /dev in MountPoints
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc) err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc)
assert.Equal(t, expectedAllDiskMetrics+7, acc.NFields()) require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+7, acc.NFields())
// We should see all the diskpoints as MountPoints includes both // We should see all the diskpoints as MountPoints includes both
// / and /home // / and /home
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home"}}).Gather(&acc) err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home"}}).Gather(&acc)
assert.Equal(t, 2*expectedAllDiskMetrics+7, acc.NFields()) require.NoError(t, err)
require.Equal(t, 2*expectedAllDiskMetrics+7, acc.NFields())
} }
func TestDiskUsageHostMountPrefix(t *testing.T) { func TestDiskUsageHostMountPrefix(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
partitionStats []disk.PartitionStat partitionStats []diskUtil.PartitionStat
usageStats []*disk.UsageStat usageStats []*diskUtil.UsageStat
hostMountPrefix string hostMountPrefix string
expectedTags map[string]string expectedTags map[string]string
expectedFields map[string]interface{} expectedFields map[string]interface{}
}{ }{
{ {
name: "no host mount prefix", name: "no host mount prefix",
partitionStats: []disk.PartitionStat{ partitionStats: []diskUtil.PartitionStat{
{ {
Device: "/dev/sda", Device: "/dev/sda",
Mountpoint: "/", Mountpoint: "/",
@ -138,7 +140,7 @@ func TestDiskUsageHostMountPrefix(t *testing.T) {
Opts: "ro", Opts: "ro",
}, },
}, },
usageStats: []*disk.UsageStat{ usageStats: []*diskUtil.UsageStat{
{ {
Path: "/", Path: "/",
Total: 42, Total: 42,
@ -162,7 +164,7 @@ func TestDiskUsageHostMountPrefix(t *testing.T) {
}, },
{ {
name: "host mount prefix", name: "host mount prefix",
partitionStats: []disk.PartitionStat{ partitionStats: []diskUtil.PartitionStat{
{ {
Device: "/dev/sda", Device: "/dev/sda",
Mountpoint: "/hostfs/var", Mountpoint: "/hostfs/var",
@ -170,7 +172,7 @@ func TestDiskUsageHostMountPrefix(t *testing.T) {
Opts: "ro", Opts: "ro",
}, },
}, },
usageStats: []*disk.UsageStat{ usageStats: []*diskUtil.UsageStat{
{ {
Path: "/hostfs/var", Path: "/hostfs/var",
Total: 42, Total: 42,
@ -195,7 +197,7 @@ func TestDiskUsageHostMountPrefix(t *testing.T) {
}, },
{ {
name: "host mount prefix exact match", name: "host mount prefix exact match",
partitionStats: []disk.PartitionStat{ partitionStats: []diskUtil.PartitionStat{
{ {
Device: "/dev/sda", Device: "/dev/sda",
Mountpoint: "/hostfs", Mountpoint: "/hostfs",
@ -203,7 +205,7 @@ func TestDiskUsageHostMountPrefix(t *testing.T) {
Opts: "ro", Opts: "ro",
}, },
}, },
usageStats: []*disk.UsageStat{ usageStats: []*diskUtil.UsageStat{
{ {
Path: "/hostfs", Path: "/hostfs",
Total: 42, Total: 42,
@ -259,7 +261,7 @@ func TestDiskStats(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
var err error var err error
duAll := []*disk.UsageStat{ duAll := []*diskUtil.UsageStat{
{ {
Path: "/", Path: "/",
Fstype: "ext4", Fstype: "ext4",
@ -281,7 +283,7 @@ func TestDiskStats(t *testing.T) {
InodesUsed: 2000, InodesUsed: 2000,
}, },
} }
duFiltered := []*disk.UsageStat{ duFiltered := []*diskUtil.UsageStat{
{ {
Path: "/", Path: "/",
Fstype: "ext4", Fstype: "ext4",
@ -294,7 +296,7 @@ func TestDiskStats(t *testing.T) {
}, },
} }
psAll := []*disk.PartitionStat{ psAll := []*diskUtil.PartitionStat{
{ {
Device: "/dev/sda", Device: "/dev/sda",
Mountpoint: "/", Mountpoint: "/",
@ -309,7 +311,7 @@ func TestDiskStats(t *testing.T) {
}, },
} }
psFiltered := []*disk.PartitionStat{ psFiltered := []*diskUtil.PartitionStat{
{ {
Device: "/dev/sda", Device: "/dev/sda",
Mountpoint: "/", Mountpoint: "/",
@ -327,7 +329,7 @@ func TestDiskStats(t *testing.T) {
numDiskMetrics := acc.NFields() numDiskMetrics := acc.NFields()
expectedAllDiskMetrics := 14 expectedAllDiskMetrics := 14
assert.Equal(t, expectedAllDiskMetrics, numDiskMetrics) require.Equal(t, expectedAllDiskMetrics, numDiskMetrics)
tags1 := map[string]string{ tags1 := map[string]string{
"path": "/", "path": "/",
@ -366,10 +368,12 @@ func TestDiskStats(t *testing.T) {
// We expect 6 more DiskMetrics to show up with an explicit match on "/" // We expect 6 more DiskMetrics to show up with an explicit match on "/"
// and /home not matching the /dev in MountPoints // and /home not matching the /dev in MountPoints
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc) err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/dev"}}).Gather(&acc)
assert.Equal(t, expectedAllDiskMetrics+7, acc.NFields()) require.NoError(t, err)
require.Equal(t, expectedAllDiskMetrics+7, acc.NFields())
// We should see all the diskpoints as MountPoints includes both // We should see all the diskpoints as MountPoints includes both
// / and /home // / and /home
err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home"}}).Gather(&acc) err = (&DiskStats{ps: &mps, MountPoints: []string{"/", "/home"}}).Gather(&acc)
assert.Equal(t, 2*expectedAllDiskMetrics+7, acc.NFields()) require.NoError(t, err)
require.Equal(t, 2*expectedAllDiskMetrics+7, acc.NFields())
} }
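
The switch from assert to require changes failure behaviour: assert records the failure and lets the test keep running, while require stops the test immediately, so the later field-count checks never execute against an accumulator whose Gather already failed (and those Gather errors are now checked as well). A tiny test-file sketch of the difference:

package example

import (
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestFailFast(t *testing.T) {
    fields := map[string]int{"used": 7, "free": 3}

    // assert records a failure and continues executing the test body.
    assert.Contains(t, fields, "used")

    // require aborts this test on failure, so later checks never run
    // against state that is already known to be broken.
    require.NotEmpty(t, fields)
    require.Equal(t, 10, fields["used"]+fields["free"])
}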


@ -74,11 +74,11 @@ func hasMeta(s string) bool {
func (d *DiskIO) init() error { func (d *DiskIO) init() error {
for _, device := range d.Devices { for _, device := range d.Devices {
if hasMeta(device) { if hasMeta(device) {
filter, err := filter.Compile(d.Devices) deviceFilter, err := filter.Compile(d.Devices)
if err != nil { if err != nil {
return fmt.Errorf("error compiling device pattern: %s", err.Error()) return fmt.Errorf("error compiling device pattern: %s", err.Error())
} }
d.deviceFilter = filter d.deviceFilter = deviceFilter
} }
} }
d.initialized = true d.initialized = true


@ -92,12 +92,14 @@ func TestDiskIOStats_diskName(t *testing.T) {
} }
for _, tc := range tests { for _, tc := range tests {
s := DiskIO{ func() {
NameTemplates: tc.templates, s := DiskIO{
} NameTemplates: tc.templates,
defer setupNullDisk(t, &s, "null")() }
name, _ := s.diskName("null") defer setupNullDisk(t, &s, "null")() //nolint:revive // done on purpose, cleaning will be executed properly
require.Equal(t, tc.expected, name, "Templates: %#v", tc.templates) name, _ := s.diskName("null")
require.Equal(t, tc.expected, name, "Templates: %#v", tc.templates)
}()
} }
} }
@ -107,7 +109,7 @@ func TestDiskIOStats_diskTags(t *testing.T) {
s := &DiskIO{ s := &DiskIO{
DeviceTags: []string{"MY_PARAM_2"}, DeviceTags: []string{"MY_PARAM_2"},
} }
defer setupNullDisk(t, s, "null")() defer setupNullDisk(t, s, "null")() //nolint:revive // done on purpose, cleaning will be executed properly
dt := s.diskTags("null") dt := s.diskTags("null")
require.Equal(t, map[string]string{"MY_PARAM_2": "myval2"}, dt) require.Equal(t, map[string]string{"MY_PARAM_2": "myval2"}, dt)
} }
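
Wrapping each table case in an anonymous function makes the deferred setupNullDisk cleanup run once per iteration instead of piling up until the whole test returns, which is what the //nolint comment is defending. The same shape with stand-in names:

package main

import "fmt"

// Stand-in for setupNullDisk: performs setup and returns the cleanup to defer.
func setup(name string) func() {
    fmt.Println("setup", name)
    return func() { fmt.Println("teardown", name) }
}

func main() {
    for _, name := range []string{"a", "b"} {
        // defer fires at function exit, so in a bare loop every teardown would
        // wait until main returns; the anonymous function scopes it to one
        // iteration, exactly as the test above does.
        func() {
            defer setup(name)()
            fmt.Println("running case", name)
        }()
    }
}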


@ -65,10 +65,10 @@ var ErrProtocolError = errors.New("disque protocol error")
// Returns one of the errors encountered while gather stats (if any). // Returns one of the errors encountered while gather stats (if any).
func (d *Disque) Gather(acc telegraf.Accumulator) error { func (d *Disque) Gather(acc telegraf.Accumulator) error {
if len(d.Servers) == 0 { if len(d.Servers) == 0 {
url := &url.URL{ address := &url.URL{
Host: ":7711", Host: ":7711",
} }
return d.gatherServer(url, acc) return d.gatherServer(address, acc)
} }
var wg sync.WaitGroup var wg sync.WaitGroup


@ -16,9 +16,9 @@ import (
type ResultType uint64 type ResultType uint64
const ( const (
Success ResultType = 0 Success ResultType = iota
Timeout = 1 Timeout
Error = 2 Error
) )
type DNSQuery struct { type DNSQuery struct {
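
With the old spelling only Success actually had type ResultType; Timeout and Error were untyped constants because their declarations carried an expression but no type. Using iota gives all three the declared type while keeping the values 0, 1, 2:

package main

import "fmt"

type ResultType uint64

const (
    Success ResultType = iota // 0
    Timeout                   // 1
    Error                     // 2
)

func main() {
    fmt.Printf("%d %d %d\n", Success, Timeout, Error) // 0 1 2
    var r ResultType = Timeout                        // every constant carries the type
    fmt.Println(r == 1)                               // true
}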


@ -7,7 +7,7 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
docker "github.com/docker/docker/client" dockerClient "github.com/docker/docker/client"
) )
var ( var (
@ -27,7 +27,7 @@ type Client interface {
} }
func NewEnvClient() (Client, error) { func NewEnvClient() (Client, error) {
client, err := docker.NewClientWithOpts(docker.FromEnv) client, err := dockerClient.NewClientWithOpts(dockerClient.FromEnv)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -40,11 +40,11 @@ func NewClient(host string, tlsConfig *tls.Config) (Client, error) {
} }
httpClient := &http.Client{Transport: transport} httpClient := &http.Client{Transport: transport}
client, err := docker.NewClientWithOpts( client, err := dockerClient.NewClientWithOpts(
docker.WithHTTPHeaders(defaultHeaders), dockerClient.WithHTTPHeaders(defaultHeaders),
docker.WithHTTPClient(httpClient), dockerClient.WithHTTPClient(httpClient),
docker.WithVersion(version), dockerClient.WithVersion(version),
docker.WithHost(host)) dockerClient.WithHost(host))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -53,7 +53,7 @@ func NewClient(host string, tlsConfig *tls.Config) (Client, error) {
} }
type SocketClient struct { type SocketClient struct {
client *docker.Client client *dockerClient.Client
} }
func (c *SocketClient) Info(ctx context.Context) (types.Info, error) { func (c *SocketClient) Info(ctx context.Context) (types.Info, error) {
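
Importing the engine client as dockerClient frees the bare docker identifier, presumably to avoid clashing with Telegraf's internal/docker package that the plugin also uses (note the dockerint alias in the next file). A compilable sketch of the aliased usage; the error branch is illustrative only:

package main

import (
    "fmt"

    dockerClient "github.com/docker/docker/client"
)

// Aliasing the import keeps call sites unambiguous when several docker-named
// packages or identifiers coexist in one file.
func newEnvClient() (*dockerClient.Client, error) {
    return dockerClient.NewClientWithOpts(dockerClient.FromEnv)
}

func main() {
    c, err := newEnvClient()
    if err != nil {
        fmt.Println("could not construct client:", err)
        return
    }
    fmt.Println("client API version:", c.ClientVersion())
}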


@ -15,11 +15,12 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/internal/docker" dockerint "github.com/influxdata/telegraf/internal/docker"
tlsint "github.com/influxdata/telegraf/plugins/common/tls" tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -513,7 +514,7 @@ func (d *Docker) gatherContainer(
return nil return nil
} }
imageName, imageVersion := docker.ParseImage(container.Image) imageName, imageVersion := dockerint.ParseImage(container.Image)
tags := map[string]string{ tags := map[string]string{
"engine_host": d.engineHost, "engine_host": d.engineHost,
@ -628,18 +629,16 @@ func (d *Docker) gatherContainerInspect(
} }
} }
parseContainerStats(v, acc, tags, container.ID, d.PerDeviceInclude, d.TotalInclude, daemonOSType) d.parseContainerStats(v, acc, tags, container.ID, daemonOSType)
return nil return nil
} }
func parseContainerStats( func (d *Docker) parseContainerStats(
stat *types.StatsJSON, stat *types.StatsJSON,
acc telegraf.Accumulator, acc telegraf.Accumulator,
tags map[string]string, tags map[string]string,
id string, id string,
perDeviceInclude []string,
totalInclude []string,
daemonOSType string, daemonOSType string,
) { ) {
tm := stat.Read tm := stat.Read
@ -708,7 +707,7 @@ func parseContainerStats(
acc.AddFields("docker_container_mem", memfields, tags, tm) acc.AddFields("docker_container_mem", memfields, tags, tm)
if choice.Contains("cpu", totalInclude) { if choice.Contains("cpu", d.TotalInclude) {
cpufields := map[string]interface{}{ cpufields := map[string]interface{}{
"usage_total": stat.CPUStats.CPUUsage.TotalUsage, "usage_total": stat.CPUStats.CPUUsage.TotalUsage,
"usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode, "usage_in_usermode": stat.CPUStats.CPUUsage.UsageInUsermode,
@ -735,7 +734,7 @@ func parseContainerStats(
acc.AddFields("docker_container_cpu", cpufields, cputags, tm) acc.AddFields("docker_container_cpu", cpufields, cputags, tm)
} }
if choice.Contains("cpu", perDeviceInclude) && len(stat.CPUStats.CPUUsage.PercpuUsage) > 0 { if choice.Contains("cpu", d.PerDeviceInclude) && len(stat.CPUStats.CPUUsage.PercpuUsage) > 0 {
// If we have OnlineCPUs field, then use it to restrict stats gathering to only Online CPUs // If we have OnlineCPUs field, then use it to restrict stats gathering to only Online CPUs
// (https://github.com/moby/moby/commit/115f91d7575d6de6c7781a96a082f144fd17e400) // (https://github.com/moby/moby/commit/115f91d7575d6de6c7781a96a082f144fd17e400)
var percpuusage []uint64 var percpuusage []uint64
@ -770,12 +769,12 @@ func parseContainerStats(
"container_id": id, "container_id": id,
} }
// Create a new network tag dictionary for the "network" tag // Create a new network tag dictionary for the "network" tag
if choice.Contains("network", perDeviceInclude) { if choice.Contains("network", d.PerDeviceInclude) {
nettags := copyTags(tags) nettags := copyTags(tags)
nettags["network"] = network nettags["network"] = network
acc.AddFields("docker_container_net", netfields, nettags, tm) acc.AddFields("docker_container_net", netfields, nettags, tm)
} }
if choice.Contains("network", totalInclude) { if choice.Contains("network", d.TotalInclude) {
for field, value := range netfields { for field, value := range netfields {
if field == "container_id" { if field == "container_id" {
continue continue
@ -802,17 +801,14 @@ func parseContainerStats(
} }
// totalNetworkStatMap could be empty if container is running with --net=host. // totalNetworkStatMap could be empty if container is running with --net=host.
if choice.Contains("network", totalInclude) && len(totalNetworkStatMap) != 0 { if choice.Contains("network", d.TotalInclude) && len(totalNetworkStatMap) != 0 {
nettags := copyTags(tags) nettags := copyTags(tags)
nettags["network"] = "total" nettags["network"] = "total"
totalNetworkStatMap["container_id"] = id totalNetworkStatMap["container_id"] = id
acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, tm) acc.AddFields("docker_container_net", totalNetworkStatMap, nettags, tm)
} }
perDeviceBlkio := choice.Contains("blkio", perDeviceInclude) d.gatherBlockIOMetrics(acc, stat, tags, tm, id)
totalBlkio := choice.Contains("blkio", totalInclude)
gatherBlockIOMetrics(stat, acc, tags, tm, id, perDeviceBlkio, totalBlkio)
} }
// Make a map of devices to their block io stats // Make a map of devices to their block io stats
@ -877,27 +873,27 @@ func getDeviceStatMap(blkioStats types.BlkioStats) map[string]map[string]interfa
return deviceStatMap return deviceStatMap
} }
func gatherBlockIOMetrics( func (d *Docker) gatherBlockIOMetrics(
stat *types.StatsJSON,
acc telegraf.Accumulator, acc telegraf.Accumulator,
stat *types.StatsJSON,
tags map[string]string, tags map[string]string,
tm time.Time, tm time.Time,
id string, id string,
perDevice bool,
total bool,
) { ) {
perDeviceBlkio := choice.Contains("blkio", d.PerDeviceInclude)
totalBlkio := choice.Contains("blkio", d.TotalInclude)
blkioStats := stat.BlkioStats blkioStats := stat.BlkioStats
deviceStatMap := getDeviceStatMap(blkioStats) deviceStatMap := getDeviceStatMap(blkioStats)
totalStatMap := make(map[string]interface{}) totalStatMap := make(map[string]interface{})
for device, fields := range deviceStatMap { for device, fields := range deviceStatMap {
fields["container_id"] = id fields["container_id"] = id
if perDevice { if perDeviceBlkio {
iotags := copyTags(tags) iotags := copyTags(tags)
iotags["device"] = device iotags["device"] = device
acc.AddFields("docker_container_blkio", fields, iotags, tm) acc.AddFields("docker_container_blkio", fields, iotags, tm)
} }
if total { if totalBlkio {
for field, value := range fields { for field, value := range fields {
if field == "container_id" { if field == "container_id" {
continue continue
@ -922,7 +918,7 @@ func gatherBlockIOMetrics(
} }
} }
} }
if total { if totalBlkio {
totalStatMap["container_id"] = id totalStatMap["container_id"] = id
iotags := copyTags(tags) iotags := copyTags(tags)
iotags["device"] = "total" iotags["device"] = "total"
@ -965,20 +961,20 @@ func (d *Docker) createContainerFilters() error {
d.ContainerInclude = append(d.ContainerInclude, d.ContainerNames...) d.ContainerInclude = append(d.ContainerInclude, d.ContainerNames...)
} }
filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude) containerFilter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
if err != nil { if err != nil {
return err return err
} }
d.containerFilter = filter d.containerFilter = containerFilter
return nil return nil
} }
func (d *Docker) createLabelFilters() error { func (d *Docker) createLabelFilters() error {
filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude) labelFilter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude)
if err != nil { if err != nil {
return err return err
} }
d.labelFilter = filter d.labelFilter = labelFilter
return nil return nil
} }
@ -986,11 +982,11 @@ func (d *Docker) createContainerStateFilters() error {
if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 { if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 {
d.ContainerStateInclude = []string{"running"} d.ContainerStateInclude = []string{"running"}
} }
filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude) stateFilter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude)
if err != nil { if err != nil {
return err return err
} }
d.stateFilter = filter d.stateFilter = stateFilter
return nil return nil
} }
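
parseContainerStats and gatherBlockIOMetrics become methods on *Docker and read PerDeviceInclude and TotalInclude from the receiver instead of taking them as parameters, which trims the argument lists the linter complains about and keeps call sites short. A reduced sketch of the same refactor with hypothetical Reporter names:

package main

import "fmt"

// Before: a free function with the configuration threaded through as parameters.
func renderReportOld(lines []string, showTotals bool, perItem bool) {
    for _, l := range lines {
        if perItem {
            fmt.Println("item:", l)
        }
    }
    if showTotals {
        fmt.Println("total:", len(lines))
    }
}

// After: the configuration lives on the receiver, so helpers only take the data
// they operate on, the same shape as d.parseContainerStats in the hunk above.
type Reporter struct {
    ShowTotals bool
    PerItem    bool
}

func (r *Reporter) renderReport(lines []string) {
    for _, l := range lines {
        if r.PerItem {
            fmt.Println("item:", l)
        }
    }
    if r.ShowTotals {
        fmt.Println("total:", len(lines))
    }
}

func main() {
    renderReportOld([]string{"a", "b"}, true, false)
    (&Reporter{ShowTotals: true, PerItem: true}).renderReport([]string{"a", "b"})
}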


@ -12,10 +12,11 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm" "github.com/docker/docker/api/types/swarm"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
type MockClient struct { type MockClient struct {
@ -120,7 +121,12 @@ func TestDockerGatherContainerStats(t *testing.T) {
"container_image": "redis/image", "container_image": "redis/image",
} }
parseContainerStats(stats, &acc, tags, "123456789", containerMetricClasses, containerMetricClasses, "linux") d := &Docker{
Log: testutil.Logger{},
PerDeviceInclude: containerMetricClasses,
TotalInclude: containerMetricClasses,
}
d.parseContainerStats(stats, &acc, tags, "123456789", "linux")
// test docker_container_net measurement // test docker_container_net measurement
netfields := map[string]interface{}{ netfields := map[string]interface{}{
@ -1270,8 +1276,12 @@ func Test_parseContainerStatsPerDeviceAndTotal(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
parseContainerStats(tt.args.stat, &acc, tt.args.tags, tt.args.id, tt.args.perDeviceInclude, d := &Docker{
tt.args.totalInclude, tt.args.daemonOSType) Log: testutil.Logger{},
PerDeviceInclude: tt.args.perDeviceInclude,
TotalInclude: tt.args.totalInclude,
}
d.parseContainerStats(tt.args.stat, &acc, tt.args.tags, tt.args.id, tt.args.daemonOSType)
actual := FilterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool { actual := FilterMetrics(acc.GetTelegrafMetrics(), func(m telegraf.Metric) bool {
return choice.Contains(m.Name(), return choice.Contains(m.Name(),


@ -15,6 +15,7 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/pkg/stdcopy"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
@ -307,8 +308,7 @@ func (d *DockerLogs) tailContainerLogs(
func parseLine(line []byte) (time.Time, string, error) { func parseLine(line []byte) (time.Time, string, error) {
parts := bytes.SplitN(line, []byte(" "), 2) parts := bytes.SplitN(line, []byte(" "), 2)
switch len(parts) { if len(parts) == 1 {
case 1:
parts = append(parts, []byte("")) parts = append(parts, []byte(""))
} }
@ -421,20 +421,20 @@ func (d *DockerLogs) Stop() {
// Following few functions have been inherited from telegraf docker input plugin // Following few functions have been inherited from telegraf docker input plugin
func (d *DockerLogs) createContainerFilters() error { func (d *DockerLogs) createContainerFilters() error {
filter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude) containerFilter, err := filter.NewIncludeExcludeFilter(d.ContainerInclude, d.ContainerExclude)
if err != nil { if err != nil {
return err return err
} }
d.containerFilter = filter d.containerFilter = containerFilter
return nil return nil
} }
func (d *DockerLogs) createLabelFilters() error { func (d *DockerLogs) createLabelFilters() error {
filter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude) labelFilter, err := filter.NewIncludeExcludeFilter(d.LabelInclude, d.LabelExclude)
if err != nil { if err != nil {
return err return err
} }
d.labelFilter = filter d.labelFilter = labelFilter
return nil return nil
} }
@ -442,11 +442,11 @@ func (d *DockerLogs) createContainerStateFilters() error {
if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 { if len(d.ContainerStateInclude) == 0 && len(d.ContainerStateExclude) == 0 {
d.ContainerStateInclude = []string{"running"} d.ContainerStateInclude = []string{"running"}
} }
filter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude) stateFilter, err := filter.NewIncludeExcludeFilter(d.ContainerStateInclude, d.ContainerStateExclude)
if err != nil { if err != nil {
return err return err
} }
d.stateFilter = filter d.stateFilter = stateFilter
return nil return nil
} }
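
A switch with a single case, as in the parseLine change above, reads more directly as an if (likely gocritic's singleCaseSwitch or a similar check), and the filter locals are renamed so they no longer shadow the imported filter package. A self-contained version of the normalisation parseLine performs, under a hypothetical splitLogLine name:

package main

import (
    "bytes"
    "fmt"
)

// Splits "<timestamp> <message>" and pads a missing message field;
// a single-case switch would say no more than this plain if.
func splitLogLine(line []byte) (ts, msg []byte) {
    parts := bytes.SplitN(line, []byte(" "), 2)
    if len(parts) == 1 {
        parts = append(parts, []byte(""))
    }
    return parts[0], parts[1]
}

func main() {
    ts, msg := splitLogLine([]byte("2021-06-21T15:04:05Z hello"))
    fmt.Printf("ts=%s msg=%s\n", ts, msg)
    ts, msg = splitLogLine([]byte("2021-06-21T15:04:05Z"))
    fmt.Printf("ts=%s msg=%q\n", ts, msg)
}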


@ -220,20 +220,20 @@ func mergeTags(a map[string]string, b map[string]string) map[string]string {
} }
func (ecs *Ecs) createContainerNameFilters() error { func (ecs *Ecs) createContainerNameFilters() error {
filter, err := filter.NewIncludeExcludeFilter(ecs.ContainerNameInclude, ecs.ContainerNameExclude) containerNameFilter, err := filter.NewIncludeExcludeFilter(ecs.ContainerNameInclude, ecs.ContainerNameExclude)
if err != nil { if err != nil {
return err return err
} }
ecs.containerNameFilter = filter ecs.containerNameFilter = containerNameFilter
return nil return nil
} }
func (ecs *Ecs) createLabelFilters() error { func (ecs *Ecs) createLabelFilters() error {
filter, err := filter.NewIncludeExcludeFilter(ecs.LabelInclude, ecs.LabelExclude) labelFilter, err := filter.NewIncludeExcludeFilter(ecs.LabelInclude, ecs.LabelExclude)
if err != nil { if err != nil {
return err return err
} }
ecs.labelFilter = filter ecs.labelFilter = labelFilter
return nil return nil
} }
@ -250,11 +250,11 @@ func (ecs *Ecs) createContainerStatusFilters() error {
ecs.ContainerStatusExclude[i] = strings.ToUpper(exclude) ecs.ContainerStatusExclude[i] = strings.ToUpper(exclude)
} }
filter, err := filter.NewIncludeExcludeFilter(ecs.ContainerStatusInclude, ecs.ContainerStatusExclude) statusFilter, err := filter.NewIncludeExcludeFilter(ecs.ContainerStatusInclude, ecs.ContainerStatusExclude)
if err != nil { if err != nil {
return err return err
} }
ecs.statusFilter = filter ecs.statusFilter = statusFilter
return nil return nil
} }


@ -644,7 +644,8 @@ func (e *Elasticsearch) gatherSingleIndexStats(name string, index indexStat, now
// determine shard tag and primary/replica designation // determine shard tag and primary/replica designation
shardType := "replica" shardType := "replica"
if flattened.Fields["routing_primary"] == true { routingPrimary, _ := flattened.Fields["routing_primary"].(bool)
if routingPrimary {
shardType = "primary" shardType = "primary"
} }
delete(flattened.Fields, "routing_primary") delete(flattened.Fields, "routing_primary")
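
Comparing an interface{} map value with == true only behaves as intended when the stored value really is a bool; the comma-ok assertion above makes that explicit and falls back to false (replica) for anything else. The same logic as a standalone function:

package main

import "fmt"

func shardType(fields map[string]interface{}) string {
    // Comma-ok assertion: routingPrimary stays false if the field is missing
    // or not a bool, instead of relying on interface comparison with `== true`.
    routingPrimary, _ := fields["routing_primary"].(bool)
    if routingPrimary {
        return "primary"
    }
    return "replica"
}

func main() {
    fmt.Println(shardType(map[string]interface{}{"routing_primary": true}))  // primary
    fmt.Println(shardType(map[string]interface{}{"routing_primary": "yes"})) // replica
    fmt.Println(shardType(map[string]interface{}{}))                         // replica
}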


@ -6,9 +6,9 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
) )
func defaultTags() map[string]string { func defaultTags() map[string]string {
@ -206,8 +206,8 @@ func TestGatherClusterStatsMaster(t *testing.T) {
info.masterID = masterID info.masterID = masterID
es.serverInfo["http://example.com:9200"] = info es.serverInfo["http://example.com:9200"] = info
IsMasterResultTokens := strings.Split(string(IsMasterResult), " ") isMasterResultTokens := strings.Split(IsMasterResult, " ")
require.Equal(t, masterID, IsMasterResultTokens[0], "catmaster is incorrect") require.Equal(t, masterID, isMasterResultTokens[0], "catmaster is incorrect")
// now get node status, which determines whether we're master // now get node status, which determines whether we're master
var acc testutil.Accumulator var acc testutil.Accumulator
@ -244,8 +244,8 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
masterID, err := es.getCatMaster("junk") masterID, err := es.getCatMaster("junk")
require.NoError(t, err) require.NoError(t, err)
IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ") isNotMasterResultTokens := strings.Split(IsNotMasterResult, " ")
require.Equal(t, masterID, IsNotMasterResultTokens[0], "catmaster is incorrect") require.Equal(t, masterID, isNotMasterResultTokens[0], "catmaster is incorrect")
// now get node status, which determines whether we're master // now get node status, which determines whether we're master
var acc testutil.Accumulator var acc testutil.Accumulator


@ -6,15 +6,16 @@ import (
"net" "net"
"sync" "sync"
"github.com/pkg/errors"
ethtoolLib "github.com/safchain/ethtool"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/pkg/errors"
"github.com/safchain/ethtool"
) )
type CommandEthtool struct { type CommandEthtool struct {
ethtool *ethtool.Ethtool ethtool *ethtoolLib.Ethtool
} }
func (e *Ethtool) Gather(acc telegraf.Accumulator) error { func (e *Ethtool) Gather(acc telegraf.Accumulator) error {
@ -98,7 +99,7 @@ func (c *CommandEthtool) Init() error {
return nil return nil
} }
e, err := ethtool.NewEthtool() e, err := ethtoolLib.NewEthtool()
if err == nil { if err == nil {
c.ethtool = e c.ethtool = e
} }


@ -6,9 +6,10 @@ import (
"net" "net"
"testing" "testing"
"github.com/influxdata/telegraf/testutil"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/influxdata/telegraf/testutil"
) )
var command *Ethtool var command *Ethtool
@ -31,13 +32,12 @@ func (c *CommandEthtoolMock) Init() error {
return nil return nil
} }
func (c *CommandEthtoolMock) DriverName(intf string) (driverName string, err error) { func (c *CommandEthtoolMock) DriverName(intf string) (string, error) {
i := c.InterfaceMap[intf] i := c.InterfaceMap[intf]
if i != nil { if i != nil {
driverName = i.DriverName return i.DriverName, nil
return
} }
return driverName, errors.New("interface not found") return "", errors.New("interface not found")
} }
func (c *CommandEthtoolMock) Interfaces() ([]net.Interface, error) { func (c *CommandEthtoolMock) Interfaces() ([]net.Interface, error) {
@ -66,13 +66,12 @@ func (c *CommandEthtoolMock) Interfaces() ([]net.Interface, error) {
return interfaceNames, nil return interfaceNames, nil
} }
func (c *CommandEthtoolMock) Stats(intf string) (stat map[string]uint64, err error) { func (c *CommandEthtoolMock) Stats(intf string) (map[string]uint64, error) {
i := c.InterfaceMap[intf] i := c.InterfaceMap[intf]
if i != nil { if i != nil {
stat = i.Stat return i.Stat, nil
return
} }
return stat, errors.New("interface not found") return nil, errors.New("interface not found")
} }
func setup() { func setup() {
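
DriverName and Stats drop their named result parameters and bare returns in favour of explicit return values, the style the naked-return checks push towards (nakedret, or revive's bare-return rule; the exact rule involved is an assumption here). A compact sketch of the rewritten shape with hypothetical data:

package main

import (
    "errors"
    "fmt"
)

var stats = map[string]map[string]uint64{
    "eth0": {"rx_packets": 42},
}

// Explicit returns instead of named results and naked `return`s,
// mirroring the mock rewrite above.
func interfaceStats(name string) (map[string]uint64, error) {
    if s, ok := stats[name]; ok {
        return s, nil
    }
    return nil, errors.New("interface not found")
}

func main() {
    fmt.Println(interfaceStats("eth0"))
    fmt.Println(interfaceStats("wlan0"))
}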


@ -6,8 +6,9 @@ import (
"sync" "sync"
"time" "time"
eventhub "github.com/Azure/azure-event-hubs-go/v3" eventhubClient "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/persist" "github.com/Azure/azure-event-hubs-go/v3/persist"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -54,7 +55,7 @@ type EventHub struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
// Azure // Azure
hub *eventhub.Hub hub *eventhubClient.Hub
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
@ -172,7 +173,7 @@ func (e *EventHub) Init() (err error) {
} }
// Set hub options // Set hub options
hubOpts := []eventhub.HubOption{} hubOpts := []eventhubClient.HubOption{}
if e.PersistenceDir != "" { if e.PersistenceDir != "" {
persister, err := persist.NewFilePersister(e.PersistenceDir) persister, err := persist.NewFilePersister(e.PersistenceDir)
@ -180,20 +181,20 @@ func (e *EventHub) Init() (err error) {
return err return err
} }
hubOpts = append(hubOpts, eventhub.HubWithOffsetPersistence(persister)) hubOpts = append(hubOpts, eventhubClient.HubWithOffsetPersistence(persister))
} }
if e.UserAgent != "" { if e.UserAgent != "" {
hubOpts = append(hubOpts, eventhub.HubWithUserAgent(e.UserAgent)) hubOpts = append(hubOpts, eventhubClient.HubWithUserAgent(e.UserAgent))
} else { } else {
hubOpts = append(hubOpts, eventhub.HubWithUserAgent(internal.ProductToken())) hubOpts = append(hubOpts, eventhubClient.HubWithUserAgent(internal.ProductToken()))
} }
// Create event hub connection // Create event hub connection
if e.ConnectionString != "" { if e.ConnectionString != "" {
e.hub, err = eventhub.NewHubFromConnectionString(e.ConnectionString, hubOpts...) e.hub, err = eventhubClient.NewHubFromConnectionString(e.ConnectionString, hubOpts...)
} else { } else {
e.hub, err = eventhub.NewHubFromEnvironment(hubOpts...) e.hub, err = eventhubClient.NewHubFromEnvironment(hubOpts...)
} }
return err return err
@ -236,25 +237,25 @@ func (e *EventHub) Start(acc telegraf.Accumulator) error {
return nil return nil
} }
func (e *EventHub) configureReceiver() []eventhub.ReceiveOption { func (e *EventHub) configureReceiver() []eventhubClient.ReceiveOption {
receiveOpts := []eventhub.ReceiveOption{} receiveOpts := []eventhubClient.ReceiveOption{}
if e.ConsumerGroup != "" { if e.ConsumerGroup != "" {
receiveOpts = append(receiveOpts, eventhub.ReceiveWithConsumerGroup(e.ConsumerGroup)) receiveOpts = append(receiveOpts, eventhubClient.ReceiveWithConsumerGroup(e.ConsumerGroup))
} }
if !e.FromTimestamp.IsZero() { if !e.FromTimestamp.IsZero() {
receiveOpts = append(receiveOpts, eventhub.ReceiveFromTimestamp(e.FromTimestamp)) receiveOpts = append(receiveOpts, eventhubClient.ReceiveFromTimestamp(e.FromTimestamp))
} else if e.Latest { } else if e.Latest {
receiveOpts = append(receiveOpts, eventhub.ReceiveWithLatestOffset()) receiveOpts = append(receiveOpts, eventhubClient.ReceiveWithLatestOffset())
} }
if e.PrefetchCount != 0 { if e.PrefetchCount != 0 {
receiveOpts = append(receiveOpts, eventhub.ReceiveWithPrefetchCount(e.PrefetchCount)) receiveOpts = append(receiveOpts, eventhubClient.ReceiveWithPrefetchCount(e.PrefetchCount))
} }
if e.Epoch != 0 { if e.Epoch != 0 {
receiveOpts = append(receiveOpts, eventhub.ReceiveWithEpoch(e.Epoch)) receiveOpts = append(receiveOpts, eventhubClient.ReceiveWithEpoch(e.Epoch))
} }
return receiveOpts return receiveOpts
@ -263,7 +264,7 @@ func (e *EventHub) configureReceiver() []eventhub.ReceiveOption {
// OnMessage handles an Event. When this function returns without error the // OnMessage handles an Event. When this function returns without error the
// Event is immediately accepted and the offset is updated. If an error is // Event is immediately accepted and the offset is updated. If an error is
// returned the Event is marked for redelivery. // returned the Event is marked for redelivery.
func (e *EventHub) onMessage(ctx context.Context, event *eventhub.Event) error { func (e *EventHub) onMessage(ctx context.Context, event *eventhubClient.Event) error {
metrics, err := e.createMetrics(event) metrics, err := e.createMetrics(event)
if err != nil { if err != nil {
return err return err
@ -345,7 +346,7 @@ func deepCopyMetrics(in []telegraf.Metric) []telegraf.Metric {
} }
// CreateMetrics returns the Metrics from the Event. // CreateMetrics returns the Metrics from the Event.
func (e *EventHub) createMetrics(event *eventhub.Event) ([]telegraf.Metric, error) { func (e *EventHub) createMetrics(event *eventhubClient.Event) ([]telegraf.Metric, error) {
metrics, err := e.parser.Parse(event.Data) metrics, err := e.parser.Parse(event.Data)
if err != nil { if err != nil {
return nil, err return nil, err


@ -4,20 +4,21 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"os/exec" osExec "os/exec"
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/kballard/go-shellquote"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios" "github.com/influxdata/telegraf/plugins/parsers/nagios"
"github.com/kballard/go-shellquote"
) )
const sampleConfig = ` const sampleConfig = `
@ -76,7 +77,7 @@ func (c CommandRunner) Run(
return nil, nil, fmt.Errorf("exec: unable to parse command, %s", err) return nil, nil, fmt.Errorf("exec: unable to parse command, %s", err)
} }
cmd := exec.Command(splitCmd[0], splitCmd[1:]...) cmd := osExec.Command(splitCmd[0], splitCmd[1:]...)
var ( var (
out bytes.Buffer out bytes.Buffer


@ -139,8 +139,8 @@ func (tm *TestMetricMaker) LogName() string {
return tm.Name() return tm.Name()
} }
func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { func (tm *TestMetricMaker) MakeMetric(aMetric telegraf.Metric) telegraf.Metric {
return metric return aMetric
} }
func (tm *TestMetricMaker) Log() telegraf.Logger { func (tm *TestMetricMaker) Log() telegraf.Logger {