chore: fix linter findings for unparam and revive.unused-parameter (#12150)

This commit is contained in:
Paweł Żak 2022-11-08 20:04:12 +01:00 committed by GitHub
parent 7807847356
commit 6816aefcd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
100 changed files with 478 additions and 755 deletions

View File

@ -23,6 +23,7 @@ linters:
- tparallel
- typecheck
- unconvert
- unparam
- unused
linters-settings:
@ -115,7 +116,7 @@ linters-settings:
arguments: [ "outputBuffer.Write", "fmt.Printf", "fmt.Println", "fmt.Print", "fmt.Fprintf", "fmt.Fprint", "fmt.Fprintln" ]
- name: unnecessary-stmt
- name: unreachable-code
# - name: unused-parameter
- name: unused-parameter
- name: var-declaration
- name: var-naming
- name: waitgroup-by-value

View File

@ -24,11 +24,11 @@ type Agent struct {
}
// NewAgent returns an Agent for the given Config.
func NewAgent(cfg *config.Config) (*Agent, error) {
func NewAgent(cfg *config.Config) *Agent {
a := &Agent{
Config: cfg,
}
return a, nil
return a
}
// inputUnit is a group of input plugins and the shared channel they write to.

View File

@ -4,108 +4,107 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/config"
_ "github.com/influxdata/telegraf/plugins/inputs/all"
_ "github.com/influxdata/telegraf/plugins/outputs/all"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAgent_OmitHostname(t *testing.T) {
c := config.NewConfig()
c.Agent.OmitHostname = true
_, err := NewAgent(c)
assert.NoError(t, err)
assert.NotContains(t, c.Tags, "host")
_ = NewAgent(c)
require.NotContains(t, c.Tags, "host")
}
func TestAgent_LoadPlugin(t *testing.T) {
c := config.NewConfig()
c.InputFilters = []string{"mysql"}
err := c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ := NewAgent(c)
assert.Equal(t, 1, len(a.Config.Inputs))
require.NoError(t, err)
a := NewAgent(c)
require.Equal(t, 1, len(a.Config.Inputs))
c = config.NewConfig()
c.InputFilters = []string{"foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 0, len(a.Config.Inputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 0, len(a.Config.Inputs))
c = config.NewConfig()
c.InputFilters = []string{"mysql", "foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 1, len(a.Config.Inputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 1, len(a.Config.Inputs))
c = config.NewConfig()
c.InputFilters = []string{"mysql", "redis"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Inputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 2, len(a.Config.Inputs))
c = config.NewConfig()
c.InputFilters = []string{"mysql", "foo", "redis", "bar"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Inputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 2, len(a.Config.Inputs))
}
func TestAgent_LoadOutput(t *testing.T) {
c := config.NewConfig()
c.OutputFilters = []string{"influxdb"}
err := c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ := NewAgent(c)
assert.Equal(t, 2, len(a.Config.Outputs))
require.NoError(t, err)
a := NewAgent(c)
require.Equal(t, 2, len(a.Config.Outputs))
c = config.NewConfig()
c.OutputFilters = []string{"kafka"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 1, len(a.Config.Outputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 1, len(a.Config.Outputs))
c = config.NewConfig()
c.OutputFilters = []string{}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 3, len(a.Config.Outputs))
c = config.NewConfig()
c.OutputFilters = []string{"foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 0, len(a.Config.Outputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 0, len(a.Config.Outputs))
c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 2, len(a.Config.Outputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 2, len(a.Config.Outputs))
c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "kafka"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
assert.Equal(t, 3, len(c.Outputs))
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
require.NoError(t, err)
require.Equal(t, 3, len(c.Outputs))
a = NewAgent(c)
require.Equal(t, 3, len(a.Config.Outputs))
c = config.NewConfig()
c.OutputFilters = []string{"influxdb", "foo", "kafka", "bar"}
err = c.LoadConfig("../config/testdata/telegraf-agent.toml")
assert.NoError(t, err)
a, _ = NewAgent(c)
assert.Equal(t, 3, len(a.Config.Outputs))
require.NoError(t, err)
a = NewAgent(c)
require.Equal(t, 3, len(a.Config.Outputs))
}
func TestWindow(t *testing.T) {

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/benbjohnson/clock"
"github.com/influxdata/telegraf/internal"
)
@ -137,7 +138,7 @@ func NewUnalignedTicker(interval, jitter, offset time.Duration) *UnalignedTicker
return t
}
func (t *UnalignedTicker) start(clk clock.Clock) *UnalignedTicker {
func (t *UnalignedTicker) start(clk clock.Clock) {
t.ch = make(chan time.Time, 1)
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
@ -153,8 +154,6 @@ func (t *UnalignedTicker) start(clk clock.Clock) *UnalignedTicker {
defer t.wg.Done()
t.run(ctx, ticker, clk)
}()
return t
}
func sleep(ctx context.Context, duration time.Duration, clk clock.Clock) error {
@ -234,7 +233,7 @@ func NewRollingTicker(interval, jitter time.Duration) *RollingTicker {
return t
}
func (t *RollingTicker) start(clk clock.Clock) *RollingTicker {
func (t *RollingTicker) start(clk clock.Clock) {
t.ch = make(chan time.Time, 1)
ctx, cancel := context.WithCancel(context.Background())
@ -248,8 +247,6 @@ func (t *RollingTicker) start(clk clock.Clock) *RollingTicker {
defer t.wg.Done()
t.run(ctx, timer)
}()
return t
}
func (t *RollingTicker) next() time.Duration {

View File

@ -27,7 +27,7 @@ func NewMockTelegraf() *MockTelegraf {
return &MockTelegraf{}
}
func (m *MockTelegraf) Init(serverErr <-chan error, f Filters, g GlobalFlags, w WindowFlags) {
func (m *MockTelegraf) Init(_ <-chan error, _ Filters, g GlobalFlags, w WindowFlags) {
m.GlobalFlags = g
m.WindowFlags = w
}
@ -47,7 +47,7 @@ func NewMockConfig(buffer io.Writer) *MockConfig {
}
}
func (m *MockConfig) CollectDeprecationInfos(inFilter, outFilter, aggFilter, procFilter []string) map[string][]config.PluginDeprecationInfo {
func (m *MockConfig) CollectDeprecationInfos(_, _, _, _ []string) map[string][]config.PluginDeprecationInfo {
return m.ExpectedDeprecatedPlugins
}
@ -65,7 +65,7 @@ func NewMockServer() *MockServer {
return &MockServer{}
}
func (m *MockServer) Start(address string) {
func (m *MockServer) Start(_ string) {
m.Address = "localhost:6060"
}

View File

@ -252,10 +252,7 @@ func (t *Telegraf) runAgent(ctx context.Context) error {
log.Printf("W! Deprecated outputs: %d and %d options", count[0], count[1])
}
ag, err := agent.NewAgent(c)
if err != nil {
return err
}
ag := agent.NewAgent(c)
// Notify systemd that telegraf is ready
// SdNotify() only tries to notify if the NOTIFY_SOCKET environment is set, so it's safe to call when systemd isn't present.

View File

@ -736,17 +736,13 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
}
parser := creator(parentname)
conf, err := c.buildParser(parentname, table)
if err != nil {
return nil, err
}
conf := c.buildParser(parentname, table)
if err := c.toml.UnmarshalTable(table, parser); err != nil {
return nil, err
}
running := models.NewRunningParser(parser, conf)
err = running.Init()
err := running.Init()
return running, err
}
@ -1051,16 +1047,16 @@ func (c *Config) buildAggregator(name string, tbl *ast.Table) (*models.Aggregato
// buildParser parses Parser specific items from the ast.Table,
// builds the filter and returns a
// models.ParserConfig to be inserted into models.RunningParser
func (c *Config) buildParser(name string, tbl *ast.Table) (*models.ParserConfig, error) {
var dataformat string
c.getFieldString(tbl, "data_format", &dataformat)
func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig {
var dataFormat string
c.getFieldString(tbl, "data_format", &dataFormat)
conf := &models.ParserConfig{
Parent: name,
DataFormat: dataformat,
DataFormat: dataFormat,
}
return conf, nil
return conf
}
// buildProcessor parses Processor specific items from the ast.Table,

View File

@ -72,9 +72,9 @@ func (r *GzipReader) Read(b []byte) (int, error) {
func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()
return NewGzipEncoder(), nil
case "zlib":
return NewZlibEncoder()
return NewZlibEncoder(), nil
case "identity", "":
return NewIdentityEncoder(), nil
default:
@ -99,26 +99,25 @@ func (a *AutoDecoder) Decode(data []byte) ([]byte, error) {
return a.identity.Decode(data)
}
func NewAutoContentDecoder() (*AutoDecoder, error) {
func NewAutoContentDecoder() *AutoDecoder {
var a AutoDecoder
var err error
a.identity = NewIdentityDecoder()
a.gzip, err = NewGzipDecoder()
return &a, err
a.gzip = NewGzipDecoder()
return &a
}
// NewContentDecoder returns a ContentDecoder for the encoding type.
func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
case "gzip":
return NewGzipDecoder()
return NewGzipDecoder(), nil
case "zlib":
return NewZlibDecoder()
return NewZlibDecoder(), nil
case "identity", "":
return NewIdentityDecoder(), nil
case "auto":
return NewAutoContentDecoder()
return NewAutoContentDecoder(), nil
default:
return nil, errors.New("invalid value for content_encoding")
}
@ -135,12 +134,12 @@ type GzipEncoder struct {
buf *bytes.Buffer
}
func NewGzipEncoder() (*GzipEncoder, error) {
func NewGzipEncoder() *GzipEncoder {
var buf bytes.Buffer
return &GzipEncoder{
writer: gzip.NewWriter(&buf),
buf: &buf,
}, nil
}
}
func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
@ -163,12 +162,12 @@ type ZlibEncoder struct {
buf *bytes.Buffer
}
func NewZlibEncoder() (*ZlibEncoder, error) {
func NewZlibEncoder() *ZlibEncoder {
var buf bytes.Buffer
return &ZlibEncoder{
writer: zlib.NewWriter(&buf),
buf: &buf,
}, nil
}
}
func (e *ZlibEncoder) Encode(data []byte) ([]byte, error) {
@ -209,11 +208,11 @@ type GzipDecoder struct {
buf *bytes.Buffer
}
func NewGzipDecoder() (*GzipDecoder, error) {
func NewGzipDecoder() *GzipDecoder {
return &GzipDecoder{
reader: new(gzip.Reader),
buf: new(bytes.Buffer),
}, nil
}
}
func (*GzipDecoder) SetEncoding(string) {}
@ -240,10 +239,10 @@ type ZlibDecoder struct {
buf *bytes.Buffer
}
func NewZlibDecoder() (*ZlibDecoder, error) {
func NewZlibDecoder() *ZlibDecoder {
return &ZlibDecoder{
buf: new(bytes.Buffer),
}, nil
}
}
func (*ZlibDecoder) SetEncoding(string) {}

View File

@ -9,10 +9,8 @@ import (
)
func TestGzipEncodeDecode(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)
enc := NewGzipEncoder()
dec := NewGzipDecoder()
payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
@ -24,10 +22,8 @@ func TestGzipEncodeDecode(t *testing.T) {
}
func TestGzipReuse(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
dec, err := NewGzipDecoder()
require.NoError(t, err)
enc := NewGzipEncoder()
dec := NewGzipDecoder()
payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
@ -47,10 +43,8 @@ func TestGzipReuse(t *testing.T) {
}
func TestZlibEncodeDecode(t *testing.T) {
enc, err := NewZlibEncoder()
require.NoError(t, err)
dec, err := NewZlibDecoder()
require.NoError(t, err)
enc := NewZlibEncoder()
dec := NewZlibDecoder()
payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
@ -90,8 +84,7 @@ func TestStreamIdentityDecode(t *testing.T) {
}
func TestStreamGzipDecode(t *testing.T) {
enc, err := NewGzipEncoder()
require.NoError(t, err)
enc := NewGzipEncoder()
written, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)

View File

@ -176,7 +176,7 @@ func TrapLookup(oid string) (e MibEntry, err error) {
// The following is for snmp
func GetIndex(mibPrefix string, node gosmi.SmiNode) (col []string, tagOids map[string]struct{}, err error) {
func GetIndex(mibPrefix string, node gosmi.SmiNode) (col []string, tagOids map[string]struct{}) {
// first attempt to get the table's tags
tagOids = map[string]struct{}{}
@ -190,7 +190,7 @@ func GetIndex(mibPrefix string, node gosmi.SmiNode) (col []string, tagOids map[s
// mimics grabbing everything returned from snmptable -Ch -Cl -c public 127.0.0.1 oidFullName
_, col = node.GetColumns()
return col, tagOids, nil
return col, tagOids
}
//nolint:revive //Too many return variables but necessary

View File

@ -42,7 +42,7 @@ func (g *SeriesGrouper) Add(
tm time.Time,
field string,
fieldValue interface{},
) error {
) {
taglist := make([]*telegraf.Tag, 0, len(tags))
for k, v := range tags {
taglist = append(taglist,
@ -59,7 +59,6 @@ func (g *SeriesGrouper) Add(
} else {
m.AddField(field, fieldValue)
}
return nil
}
// AddMetric adds a metric to the series, merging with any previous matching metrics.

View File

@ -60,10 +60,7 @@ func LoadConfig(filePath *string) (loaded loadedConfig, err error) {
data = expandEnvVars(b)
} else {
conf, err = DefaultImportedPlugins()
if err != nil {
return loadedConfig{}, err
}
conf = DefaultImportedPlugins()
}
md, err := toml.Decode(data, &conf)
@ -148,7 +145,7 @@ func createPluginsWithTomlConfig(md toml.MetaData, conf config) (loadedConfig, e
// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and
// have registered themselves with the registry. This makes loading plugins
// without having to define a config dead easy.
func DefaultImportedPlugins() (config, error) {
func DefaultImportedPlugins() config {
conf := config{
Inputs: map[string][]toml.Primitive{},
Processors: map[string][]toml.Primitive{},
@ -157,19 +154,19 @@ func DefaultImportedPlugins() (config, error) {
for name := range inputs.Inputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Inputs[name] = []toml.Primitive{}
return conf, nil
return conf
}
for name := range processors.Processors {
log.Println("No config found. Loading default config for plugin", name)
conf.Processors[name] = []toml.Primitive{}
return conf, nil
return conf
}
for name := range outputs.Outputs {
log.Println("No config found. Loading default config for plugin", name)
conf.Outputs[name] = []toml.Primitive{}
return conf, nil
return conf
}
return conf, nil
return conf
}
type unwrappable interface {

View File

@ -6,11 +6,12 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestOutputShim(t *testing.T) {
@ -32,7 +33,7 @@ func TestOutputShim(t *testing.T) {
wg.Done()
}()
serializer, _ := serializers.NewInfluxSerializer()
serializer := serializers.NewInfluxSerializer()
m := metric.New("thing",
map[string]string{

View File

@ -52,7 +52,7 @@ func testSendAndReceive(t *testing.T, fieldKey string, fieldValue string) {
wg.Done()
}()
serializer, _ := serializers.NewInfluxSerializer()
serializer := serializers.NewInfluxSerializer()
parser := influx.Parser{}
require.NoError(t, parser.Init())

View File

@ -202,7 +202,7 @@ func loadCertificate(config *tls.Config, certFile, keyFile string) error {
return nil
}
func (c *ServerConfig) verifyPeerCertificate(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
func (c *ServerConfig) verifyPeerCertificate(rawCerts [][]byte, _ [][]*x509.Certificate) error {
// The certificate chain is client + intermediate + root.
// Let's review the client certificate.
cert, err := x509.ParseCertificate(rawCerts[0])

View File

@ -32,9 +32,7 @@ func TestAerospikeStatisticsIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
a := &Aerospike{
Servers: []string{fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])},
@ -61,9 +59,7 @@ func TestAerospikeStatisticsPartialErrIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
a := &Aerospike{
Servers: []string{
@ -90,9 +86,7 @@ func TestSelectNamespacesIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
// Select nonexistent namespace
a := &Aerospike{
@ -129,9 +123,7 @@ func TestDisableQueryNamespacesIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
a := &Aerospike{
Servers: []string{
@ -161,9 +153,7 @@ func TestQuerySetsIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
portInt, err := strconv.Atoi(container.Ports[servicePort])
require.NoError(t, err)
@ -218,9 +208,7 @@ func TestSelectQuerySetsIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
portInt, err := strconv.Atoi(container.Ports[servicePort])
require.NoError(t, err)
@ -276,9 +264,7 @@ func TestDisableTTLHistogramIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
a := &Aerospike{
Servers: []string{
@ -303,9 +289,7 @@ func TestDisableObjectSizeLinearHistogramIntegration(t *testing.T) {
}
container := launchTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
a := &Aerospike{
Servers: []string{

View File

@ -37,12 +37,8 @@ func (rsmi *ROCmSMI) Gather(acc telegraf.Accumulator) error {
return fmt.Errorf("rocm-smi binary not found in path %s, cannot query GPUs statistics", rsmi.BinPath)
}
data, err := rsmi.pollROCmSMI()
if err != nil {
return err
}
err = gatherROCmSMI(data, acc)
data := rsmi.pollROCmSMI()
err := gatherROCmSMI(data, acc)
if err != nil {
return err
}
@ -59,7 +55,7 @@ func init() {
})
}
func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) {
func (rsmi *ROCmSMI) pollROCmSMI() []byte {
// Construct and execute metrics query, there currently exist (ROCm v4.3.x) a "-a" option
// that does not provide all the information, so each needed parameter is set manually
cmd := exec.Command(rsmi.BinPath,
@ -102,9 +98,8 @@ func (rsmi *ROCmSMI) pollROCmSMI() ([]byte, error) {
"--showtoponuma",
"--json")
ret, _ := internal.StdOutputTimeout(cmd,
time.Duration(rsmi.Timeout))
return ret, nil
ret, _ := internal.StdOutputTimeout(cmd, time.Duration(rsmi.Timeout))
return ret
}
func gatherROCmSMI(ret []byte, acc telegraf.Accumulator) error {

View File

@ -3,17 +3,17 @@ package amqp_consumer
import (
"testing"
"github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
)
func TestAutoEncoding(t *testing.T) {
enc, err := internal.NewGzipEncoder()
require.NoError(t, err)
enc := internal.NewGzipEncoder()
payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`))
require.NoError(t, err)

View File

@ -58,9 +58,7 @@ func addJSONCounter(acc telegraf.Accumulator, commonTags map[string]string, stat
tags[k] = v
}
if err := grouper.Add("bind_counter", tags, ts, name, value); err != nil {
acc.AddError(fmt.Errorf("adding field %q to group failed: %v", name, err))
}
grouper.Add("bind_counter", tags, ts, name, value)
}
//Add grouped metrics
@ -135,9 +133,7 @@ func (b *Bind) addStatsJSON(stats jsonStats, acc telegraf.Accumulator, urlTag st
"type": cntrType,
}
if err := grouper.Add("bind_counter", tags, ts, cntrName, value); err != nil {
acc.AddError(fmt.Errorf("adding tags %q to group failed: %v", tags, err))
}
grouper.Add("bind_counter", tags, ts, cntrName, value)
}
}
}

View File

@ -75,9 +75,7 @@ func addXMLv2Counter(acc telegraf.Accumulator, commonTags map[string]string, sta
tags[k] = v
}
if err := grouper.Add("bind_counter", tags, ts, c.Name, c.Value); err != nil {
acc.AddError(fmt.Errorf("adding field %q to group failed: %v", c.Name, err))
}
grouper.Add("bind_counter", tags, ts, c.Name, c.Value)
}
//Add grouped metrics

View File

@ -81,9 +81,7 @@ func (b *Bind) addStatsXMLv3(stats v3Stats, acc telegraf.Accumulator, hostPort s
tags := map[string]string{"url": hostPort, "source": host, "port": port, "type": cg.Type}
if err := grouper.Add("bind_counter", tags, ts, c.Name, c.Value); err != nil {
acc.AddError(fmt.Errorf("adding tags %q to group failed: %v", tags, err))
}
grouper.Add("bind_counter", tags, ts, c.Name, c.Value)
}
}
@ -120,9 +118,7 @@ func (b *Bind) addStatsXMLv3(stats v3Stats, acc telegraf.Accumulator, hostPort s
"type": cg.Type,
}
if err := grouper.Add("bind_counter", tags, ts, c.Name, c.Value); err != nil {
acc.AddError(fmt.Errorf("adding tags %q to group failed: %v", tags, err))
}
grouper.Add("bind_counter", tags, ts, c.Name, c.Value)
}
}
}

View File

@ -503,9 +503,7 @@ func (c *CiscoTelemetryMDT) parseRib(grouper *metric.SeriesGrouper, field *telem
tags[subfield.Name] = decodeTag(subfield)
}
if value := decodeValue(subfield); value != nil {
if err := grouper.Add(measurement, tags, timestamp, subfield.Name, value); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", subfield.Name, err)
}
grouper.Add(measurement, tags, timestamp, subfield.Name, value)
}
if subfield.Name != "nextHop" {
continue
@ -520,9 +518,7 @@ func (c *CiscoTelemetryMDT) parseRib(grouper *metric.SeriesGrouper, field *telem
}
if value := decodeValue(ff); value != nil {
name := "nextHop/" + ff.Name
if err := grouper.Add(measurement, tags, timestamp, name, value); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", name, err)
}
grouper.Add(measurement, tags, timestamp, name, value)
}
}
}
@ -587,13 +583,9 @@ func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, fie
}
if val := c.nxosValueXform(field, value, encodingPath); val != nil {
if err := grouper.Add(measurement, tags, timestamp, name, val); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", name, err)
}
grouper.Add(measurement, tags, timestamp, name, val)
} else {
if err := grouper.Add(measurement, tags, timestamp, name, value); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", name, err)
}
grouper.Add(measurement, tags, timestamp, name, value)
}
return
}

View File

@ -258,10 +258,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
}
}
} else {
allMetrics, err := c.fetchNamespaceMetrics()
if err != nil {
return nil, err
}
allMetrics := c.fetchNamespaceMetrics()
for _, name := range m.MetricNames {
for _, metric := range allMetrics {
if isSelected(name, metric, m.Dimensions) {
@ -294,11 +291,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
})
}
} else {
metrics, err := c.fetchNamespaceMetrics()
if err != nil {
return nil, err
}
metrics := c.fetchNamespaceMetrics()
fMetrics = []filteredMetric{
{
metrics: metrics,
@ -317,7 +310,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
}
// fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace.
func (c *CloudWatch) fetchNamespaceMetrics() ([]types.Metric, error) {
func (c *CloudWatch) fetchNamespaceMetrics() []types.Metric {
metrics := []types.Metric{}
for _, namespace := range c.Namespaces {
@ -344,7 +337,7 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]types.Metric, error) {
params.NextToken = resp.NextToken
}
}
return metrics, nil
return metrics
}
func (c *CloudWatch) updateWindow(relativeTo time.Time) {
@ -497,9 +490,7 @@ func (c *CloudWatch) aggregateMetrics(
tags["region"] = c.Region
for i := range result.Values {
if err := grouper.Add(namespace, tags, result.Timestamps[i], *result.Label, result.Values[i]); err != nil {
acc.AddError(err)
}
grouper.Add(namespace, tags, result.Timestamps[i], *result.Label, result.Values[i])
}
}
}

View File

@ -165,7 +165,7 @@ type mockSelectMetricsCloudWatchClient struct{}
func (m *mockSelectMetricsCloudWatchClient) ListMetrics(
_ context.Context,
params *cwClient.ListMetricsInput,
_ *cwClient.ListMetricsInput,
_ ...func(*cwClient.Options),
) (*cwClient.ListMetricsOutput, error) {
metrics := []types.Metric{}
@ -216,7 +216,7 @@ func (m *mockSelectMetricsCloudWatchClient) ListMetrics(
func (m *mockSelectMetricsCloudWatchClient) GetMetricData(
_ context.Context,
params *cwClient.GetMetricDataInput,
_ *cwClient.GetMetricDataInput,
_ ...func(*cwClient.Options),
) (*cwClient.GetMetricDataOutput, error) {
return nil, nil

View File

@ -69,12 +69,12 @@ func getHTTPSClient() *http.Client {
}
}
func createURL(metricStream *CloudWatchMetricStreams, scheme string, path string, rawquery string) string {
func createURL(scheme string, path string) string {
u := url.URL{
Scheme: scheme,
Host: "localhost:8080",
Path: path,
RawQuery: rawquery,
RawQuery: "",
}
return u.String()
}
@ -118,7 +118,7 @@ func TestWriteHTTPSNoClientAuth(t *testing.T) {
// post single message to the metric stream listener
record := readJSON(t, "testdata/record.json")
resp, err := noClientAuthClient.Post(createURL(metricStream, "https", "/write", ""), "", bytes.NewBuffer(record))
resp, err := noClientAuthClient.Post(createURL("https", "/write"), "", bytes.NewBuffer(record))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 200, resp.StatusCode)
@ -134,7 +134,7 @@ func TestWriteHTTPSWithClientAuth(t *testing.T) {
// post single message to the metric stream listener
record := readJSON(t, "testdata/record.json")
resp, err := getHTTPSClient().Post(createURL(metricStream, "https", "/write", ""), "", bytes.NewBuffer(record))
resp, err := getHTTPSClient().Post(createURL("https", "/write"), "", bytes.NewBuffer(record))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 200, resp.StatusCode)
@ -151,7 +151,7 @@ func TestWriteHTTPSuccessfulAuth(t *testing.T) {
client := &http.Client{}
record := readJSON(t, "testdata/record.json")
req, err := http.NewRequest("POST", createURL(metricStream, "http", "/write", ""), bytes.NewBuffer(record))
req, err := http.NewRequest("POST", createURL("http", "/write"), bytes.NewBuffer(record))
require.NoError(t, err)
req.Header.Set("X-Amz-Firehose-Access-Key", accessKey)
@ -173,7 +173,7 @@ func TestWriteHTTPFailedAuth(t *testing.T) {
client := &http.Client{}
record := readJSON(t, "testdata/record.json")
req, err := http.NewRequest("POST", createURL(metricStream, "http", "/write", ""), bytes.NewBuffer(record))
req, err := http.NewRequest("POST", createURL("http", "/write"), bytes.NewBuffer(record))
require.NoError(t, err)
req.Header.Set("X-Amz-Firehose-Access-Key", badAccessKey)
@ -194,7 +194,7 @@ func TestWriteHTTP(t *testing.T) {
// post single message to the metric stream listener
record := readJSON(t, "testdata/record.json")
resp, err := http.Post(createURL(metricStream, "http", "/write", ""), "", bytes.NewBuffer(record))
resp, err := http.Post(createURL("http", "/write"), "", bytes.NewBuffer(record))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 200, resp.StatusCode)
@ -210,7 +210,7 @@ func TestWriteHTTPMultipleRecords(t *testing.T) {
// post multiple records to the metric stream listener
records := readJSON(t, "testdata/records.json")
resp, err := http.Post(createURL(metricStream, "http", "/write", ""), "", bytes.NewBuffer(records))
resp, err := http.Post(createURL("http", "/write"), "", bytes.NewBuffer(records))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 200, resp.StatusCode)
@ -227,7 +227,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
defer metricStream.Stop()
// post single message to the metric stream listener
resp, err := http.Post(createURL(metricStream, "http", "/write", ""), "", bytes.NewBuffer(record))
resp, err := http.Post(createURL("http", "/write"), "", bytes.NewBuffer(record))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 200, resp.StatusCode)
@ -244,7 +244,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
// post single message to the metric stream listener
record := readJSON(t, "testdata/record.json")
resp, err := http.Post(createURL(metricStream, "http", "/write", ""), "", bytes.NewBuffer(record))
resp, err := http.Post(createURL("http", "/write"), "", bytes.NewBuffer(record))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 413, resp.StatusCode)
@ -260,7 +260,7 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
// post single message to the metric stream listener
record := readJSON(t, "testdata/record.json")
resp, err := http.Post(createURL(metricStream, "http", "/foobar", ""), "", bytes.NewBuffer(record))
resp, err := http.Post(createURL("http", "/foobar"), "", bytes.NewBuffer(record))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 404, resp.StatusCode)
@ -275,7 +275,7 @@ func TestWriteHTTPInvalid(t *testing.T) {
defer metricStream.Stop()
// post a badly formatted message to the metric stream listener
resp, err := http.Post(createURL(metricStream, "http", "/write", ""), "", bytes.NewBuffer([]byte(badMsg)))
resp, err := http.Post(createURL("http", "/write"), "", bytes.NewBuffer([]byte(badMsg)))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 400, resp.StatusCode)
@ -290,7 +290,7 @@ func TestWriteHTTPEmpty(t *testing.T) {
defer metricStream.Stop()
// post empty message to the metric stream listener
resp, err := http.Post(createURL(metricStream, "http", "/write", ""), "", bytes.NewBuffer([]byte(emptyMsg)))
resp, err := http.Post(createURL("http", "/write"), "", bytes.NewBuffer([]byte(emptyMsg)))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 400, resp.StatusCode)
@ -370,7 +370,7 @@ func TestWriteHTTPGzippedData(t *testing.T) {
data, err := os.ReadFile("./testdata/records.gz")
require.NoError(t, err)
req, err := http.NewRequest("POST", createURL(metricStream, "http", "/write", ""), bytes.NewBuffer(data))
req, err := http.NewRequest("POST", createURL("http", "/write"), bytes.NewBuffer(data))
require.NoError(t, err)
req.Header.Set("Content-Encoding", "gzip")

View File

@ -200,9 +200,7 @@ func TestDovecotContainerIntegration(t *testing.T) {
wait.ForListeningPort(nat.Port(servicePort)),
),
}
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
err = container.Start()
require.NoError(t, err, "failed to start container")

View File

@ -13,12 +13,13 @@ import (
"github.com/docker/go-connections/nat"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
elastic5 "gopkg.in/olivere/elastic.v5"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
)
const (
@ -604,9 +605,7 @@ func TestElasticsearchQueryIntegration(t *testing.T) {
container, err := setupIntegrationTest(t)
require.NoError(t, err)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var acc testutil.Accumulator
e := &ElasticsearchQuery{
@ -665,9 +664,7 @@ func TestElasticsearchQueryIntegration_getMetricFields(t *testing.T) {
container, err := setupIntegrationTest(t)
require.NoError(t, err)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
type args struct {
ctx context.Context

View File

@ -207,11 +207,7 @@ func TestMain(m *testing.M) {
func runCounterProgram() error {
envMetricName := os.Getenv("METRIC_NAME")
i := 0
serializer, err := serializers.NewInfluxSerializer()
if err != nil {
fmt.Fprintln(os.Stderr, "ERR InfluxSerializer failed to load")
return err
}
serializer := serializers.NewInfluxSerializer()
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {

View File

@ -403,9 +403,7 @@ func (c *GNMI) handleSubscribeResponseUpdate(worker *Worker, response *gnmiLib.S
}
}
if err := grouper.Add(name, tags, timestamp, key, v); err != nil {
c.Log.Errorf("cannot add to grouper: %v", err)
}
grouper.Add(name, tags, timestamp, key, v)
}
lastAliasPath = aliasPath

View File

@ -488,7 +488,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
t.Logf("rt: starting network")
ctx := context.Background()
networkName := "telegraf-test-kafka-consumer-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
network, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
@ -497,7 +497,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
require.NoError(t, network.Remove(ctx), "terminating network failed")
}()
t.Logf("rt: starting zookeeper")
@ -510,9 +510,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
Name: zookeeperName,
}
require.NoError(t, zookeeper.Start(), "failed to start container")
defer func() {
require.NoError(t, zookeeper.Terminate(), "terminating container failed")
}()
defer zookeeper.Terminate()
t.Logf("rt: starting broker")
topic := "Test"
@ -530,9 +528,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
}
require.NoError(t, container.Start(), "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
@ -544,7 +540,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
output, ok := creator().(*kafkaOutput.Kafka)
require.True(t, ok)
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
output.SetSerializer(s)
output.Brokers = brokers
output.Topic = topic

View File

@ -8,11 +8,11 @@ type KNXDummyInterface struct {
inbound chan knx.GroupEvent
}
func NewDummyInterface() (di KNXDummyInterface, err error) {
di, err = KNXDummyInterface{}, nil
func NewDummyInterface() KNXDummyInterface {
di := KNXDummyInterface{}
di.inbound = make(chan knx.GroupEvent)
return di, err
return di
}
func (di *KNXDummyInterface) Send(event knx.GroupEvent) {

View File

@ -106,10 +106,7 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
}
kl.client = &c
case "dummy":
c, err := NewDummyInterface()
if err != nil {
return err
}
c := NewDummyInterface()
kl.client = &c
default:
return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress)

View File

@ -61,9 +61,7 @@ func TestMcrouterGeneratesMetricsIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
m := &Mcrouter{
Servers: []string{

View File

@ -26,9 +26,7 @@ func TestMemcachedGeneratesMetricsIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
m := &Memcached{
Servers: []string{fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])},

View File

@ -441,10 +441,7 @@ func (m *Modbus) collectFields(acc telegraf.Accumulator, timestamp time.Time, ta
}
// Group the data by series
if err := grouper.Add(measurement, rtags, timestamp, field.name, field.value); err != nil {
acc.AddError(fmt.Errorf("cannot add field %q for measurement %q: %v", field.name, measurement, err))
continue
}
grouper.Add(measurement, rtags, timestamp, field.name, field.value)
}
}

View File

@ -35,9 +35,7 @@ func TestGetDefaultTagsIntegration(t *testing.T) {
}
container := createTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
m := &MongoDB{
Log: testutil.Logger{},
@ -73,9 +71,7 @@ func TestAddDefaultStatsIntegration(t *testing.T) {
}
container := createTestServer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
m := &MongoDB{
Log: testutil.Logger{},

View File

@ -33,9 +33,7 @@ func TestMysqlDefaultsToLocalIntegration(t *testing.T) {
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
m := &Mysql{
Servers: []string{fmt.Sprintf("root@tcp(%s:%s)/", container.Address, container.Ports[servicePort])},
@ -70,9 +68,7 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) {
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
testServer := fmt.Sprintf("root@tcp(%s:%s)/?tls=false", container.Address, container.Ports[servicePort])
m := &Mysql{

View File

@ -6,12 +6,13 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)
const servicePort = "4840"
@ -47,9 +48,7 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var testopctags = []OPCTags{
{"ProductName", "1", "i", "2261", "open62541 OPC UA Server"},
@ -115,9 +114,7 @@ func TestReadClientIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var testopctags = []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},

View File

@ -3,7 +3,9 @@ package opcua
import (
"context"
"fmt"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
@ -141,10 +143,3 @@ func (o *ReadClient) read() error {
}
return nil
}
// StartStreamValues does nothing for the read client, as it has to actively fetch values. The channel is closed immediately.
func (o *ReadClient) StartStreamValues(_ context.Context) (<-chan telegraf.Metric, error) {
c := make(chan telegraf.Metric)
defer close(c)
return c, nil
}

View File

@ -7,12 +7,13 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)
const servicePort = "4840"
@ -48,9 +49,7 @@ func TestSubscribeClientIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var testopctags = []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},

View File

@ -77,9 +77,7 @@ func TestOpenldapGeneratesMetricsIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
port, err := strconv.Atoi(container.Ports[servicePort])
require.NoError(t, err)
@ -134,9 +132,7 @@ func TestOpenldapStartTLSIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
port, err := strconv.Atoi(container.Ports[servicePort])
require.NoError(t, err)
@ -197,9 +193,7 @@ func TestOpenldapLDAPSIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
port, err := strconv.Atoi(container.Ports[servicePortSecure])
require.NoError(t, err)
@ -255,9 +249,7 @@ func TestOpenldapInvalidSSLIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
port, err := strconv.Atoi(container.Ports[servicePortSecure])
require.NoError(t, err)
@ -295,9 +287,7 @@ func TestOpenldapBindIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
port, err := strconv.Atoi(container.Ports[servicePort])
require.NoError(t, err)
@ -347,9 +337,7 @@ func TestOpenldapReverseMetricsIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
port, err := strconv.Atoi(container.Ports[servicePort])
require.NoError(t, err)

View File

@ -30,9 +30,7 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
}
err := backend.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, backend.Terminate(), "terminating container failed")
}()
defer backend.Terminate()
container := testutil.Container{
Image: "z9pascal/pgbouncer-container:1.17.0-latest",
@ -48,9 +46,7 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &PgBouncer{
Service: postgresql.Service{

View File

@ -40,9 +40,7 @@ func TestPostgresqlGeneratesMetricsIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &Postgresql{
Service: Service{
@ -131,9 +129,7 @@ func TestPostgresqlTagsMetricsWithDatabaseNameIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &Postgresql{
Service: Service{
@ -163,9 +159,7 @@ func TestPostgresqlDefaultsToAllDatabasesIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &Postgresql{
Service: Service{
@ -202,9 +196,7 @@ func TestPostgresqlIgnoresUnwantedColumnsIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &Postgresql{
Service: Service{
@ -231,9 +223,7 @@ func TestPostgresqlDatabaseWhitelistTestIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &Postgresql{
Service: Service{
@ -277,9 +267,7 @@ func TestPostgresqlDatabaseBlacklistTestIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &Postgresql{
Service: Service{

View File

@ -30,9 +30,7 @@ func queryRunner(t *testing.T, q query) *testutil.Accumulator {
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
p := &Postgresql{
Log: testutil.Logger{},

View File

@ -103,11 +103,7 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
return err
}
err = p.updateProcesses(pids, tags, p.procs, newProcs)
if err != nil {
acc.AddError(fmt.Errorf("procstat getting process, exe: [%s] pidfile: [%s] pattern: [%s] user: [%s] %s",
p.Exe, p.PidFile, p.Pattern, p.User, err.Error()))
}
p.updateProcesses(pids, tags, p.procs, newProcs)
}
p.procs = newProcs
@ -298,7 +294,7 @@ func (p *Procstat) addMetric(proc Process, acc telegraf.Accumulator, t time.Time
}
// Update monitored Processes
func (p *Procstat) updateProcesses(pids []PID, tags map[string]string, prevInfo map[PID]Process, procs map[PID]Process) error {
func (p *Procstat) updateProcesses(pids []PID, tags map[string]string, prevInfo map[PID]Process, procs map[PID]Process) {
for _, pid := range pids {
info, ok := prevInfo[pid]
if ok {
@ -333,7 +329,6 @@ func (p *Procstat) updateProcesses(pids []PID, tags map[string]string, prevInfo
}
}
}
return nil
}
// Create and return PIDGatherer lazily

View File

@ -9,9 +9,10 @@ import (
"github.com/docker/go-connections/nat"
"github.com/go-redis/redis/v8"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/testutil"
)
type testClient struct {
@ -46,9 +47,7 @@ func TestRedisConnectIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
addr := fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])

View File

@ -9,11 +9,11 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
)
const masterName = "mymaster"
@ -31,9 +31,7 @@ func TestRedisSentinelConnectIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
addr := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])

View File

@ -6,9 +6,10 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestIPv4SW(t *testing.T) {
@ -62,8 +63,7 @@ func TestIPv4SW(t *testing.T) {
actual := []telegraf.Metric{}
dc := NewDecoder()
dc.OnPacket(func(p *V5Format) {
metrics, err := makeMetrics(p)
require.NoError(t, err)
metrics := makeMetrics(p)
actual = append(actual, metrics...)
})
buf := bytes.NewReader(packet)
@ -166,8 +166,7 @@ func TestExpandFlow(t *testing.T) {
dc := NewDecoder()
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
require.NoError(t, err)
actual, err := makeMetrics(p)
require.NoError(t, err)
actual := makeMetrics(p)
expected := []telegraf.Metric{
testutil.MustMetric(
@ -284,8 +283,7 @@ func TestIPv4SWRT(t *testing.T) {
dc := NewDecoder()
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
require.NoError(t, err)
actual, err := makeMetrics(p)
require.NoError(t, err)
actual := makeMetrics(p)
expected := []telegraf.Metric{
testutil.MustMetric(
@ -507,8 +505,7 @@ func TestIPv6SW(t *testing.T) {
dc := NewDecoder()
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
require.NoError(t, err)
actual, err := makeMetrics(p)
require.NoError(t, err)
actual := makeMetrics(p)
expected := []telegraf.Metric{
@ -554,8 +551,7 @@ func TestExpandFlowCounter(t *testing.T) {
dc := NewDecoder()
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
require.NoError(t, err)
actual, err := makeMetrics(p)
require.NoError(t, err)
actual := makeMetrics(p)
expected := []telegraf.Metric{
testutil.MustMetric(
@ -749,8 +745,7 @@ func TestFlowExpandCounter(t *testing.T) {
dc := NewDecoder()
p, err := dc.DecodeOnePacket(bytes.NewBuffer(packet))
require.NoError(t, err)
actual, err := makeMetrics(p)
require.NoError(t, err)
actual := makeMetrics(p)
// we don't do anything with samples yet
expected := []telegraf.Metric{}

View File

@ -8,7 +8,7 @@ import (
"github.com/influxdata/telegraf/metric"
)
func makeMetrics(p *V5Format) ([]telegraf.Metric, error) {
func makeMetrics(p *V5Format) []telegraf.Metric {
now := time.Now()
metrics := []telegraf.Metric{}
tags := map[string]string{
@ -39,5 +39,5 @@ func makeMetrics(p *V5Format) ([]telegraf.Metric, error) {
}
}
}
return metrics, nil
return metrics
}

View File

@ -48,11 +48,7 @@ func (s *SFlow) Init() error {
// Start starts this sFlow listener listening on the configured network for sFlow packets
func (s *SFlow) Start(acc telegraf.Accumulator) error {
s.decoder.OnPacket(func(p *V5Format) {
metrics, err := makeMetrics(p)
if err != nil {
s.Log.Errorf("Failed to make metric from packet: %s", err)
return
}
metrics := makeMetrics(p)
for _, m := range metrics {
acc.AddMetric(m)
}

View File

@ -114,12 +114,11 @@ func (g *gosmiTranslator) SnmpTableCall(oid string) (mibName string, oidNum stri
mibPrefix := mibName + "::"
col, tagOids, err := snmp.GetIndex(mibPrefix, node)
col, tagOids := snmp.GetIndex(mibPrefix, node)
for _, c := range col {
_, isTag := tagOids[mibPrefix+c]
fields = append(fields, Field{Name: c, Oid: mibPrefix + c, IsTag: isTag})
}
return mibName, oidNum, oidText, fields, err
return mibName, oidNum, oidText, fields, nil
}

View File

@ -525,10 +525,7 @@ func (h *Host) SNMPGet(acc telegraf.Accumulator, initNode Node) error {
return err3
}
// Handle response
_, err = h.HandleResponse(oidsList, result, acc, initNode)
if err != nil {
return err
}
h.HandleResponse(oidsList, result, acc, initNode)
}
return nil
}
@ -568,10 +565,7 @@ func (h *Host) SNMPBulk(acc telegraf.Accumulator, initNode Node) error {
return err3
}
// Handle response
lastOid, err := h.HandleResponse(oidsList, result, acc, initNode)
if err != nil {
return err
}
lastOid := h.HandleResponse(oidsList, result, acc, initNode)
// Determine if we need more requests
if strings.HasPrefix(lastOid, oidAsked) {
needMoreRequests = true
@ -628,7 +622,7 @@ func (h *Host) HandleResponse(
result *gosnmp.SnmpPacket,
acc telegraf.Accumulator,
initNode Node,
) (string, error) {
) string {
var lastOid string
for _, variable := range result.Variables {
lastOid = variable.Name
@ -708,7 +702,7 @@ func (h *Host) HandleResponse(
}
}
}
return lastOid, nil
return lastOid
}
func init() {

View File

@ -2,11 +2,10 @@ package sql
import (
"fmt"
"testing"
"time"
"math/rand"
"path/filepath"
"testing"
"time"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
@ -61,9 +60,7 @@ func TestMariaDBIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
// Define the testset
var testset = []struct {
@ -168,9 +165,7 @@ func TestPostgreSQLIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
// Define the testset
var testset = []struct {
@ -271,9 +266,7 @@ func TestClickHouseIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
// Define the testset
var testset = []struct {

View File

@ -122,10 +122,10 @@ func (g *lockedSeriesGrouper) Add(
tm time.Time,
field string,
fieldValue interface{},
) error {
) {
g.Lock()
defer g.Unlock()
return g.SeriesGrouper.Add(measurement, tags, tm, field, fieldValue)
g.SeriesGrouper.Add(measurement, tags, tm, field, fieldValue)
}
// ListMetricDescriptors implements metricClient interface
@ -554,9 +554,7 @@ func (s *Stackdriver) gatherTimeSeries(
value = p.Value.GetStringValue()
}
if err := grouper.Add(tsConf.measurement, tags, ts, tsConf.fieldKey, value); err != nil {
return err
}
grouper.Add(tsConf.measurement, tags, ts, tsConf.fieldKey, value)
}
}
}
@ -638,23 +636,13 @@ func (s *Stackdriver) addDistribution(dist *distributionpb.Distribution, tags ma
field := tsConf.fieldKey
name := tsConf.measurement
if err := grouper.Add(name, tags, ts, field+"_count", dist.Count); err != nil {
return err
}
if err := grouper.Add(name, tags, ts, field+"_mean", dist.Mean); err != nil {
return err
}
if err := grouper.Add(name, tags, ts, field+"_sum_of_squared_deviation", dist.SumOfSquaredDeviation); err != nil {
return err
}
grouper.Add(name, tags, ts, field+"_count", dist.Count)
grouper.Add(name, tags, ts, field+"_mean", dist.Mean)
grouper.Add(name, tags, ts, field+"_sum_of_squared_deviation", dist.SumOfSquaredDeviation)
if dist.Range != nil {
if err := grouper.Add(name, tags, ts, field+"_range_min", dist.Range.Min); err != nil {
return err
}
if err := grouper.Add(name, tags, ts, field+"_range_max", dist.Range.Max); err != nil {
return err
}
grouper.Add(name, tags, ts, field+"_range_min", dist.Range.Min)
grouper.Add(name, tags, ts, field+"_range_max", dist.Range.Max)
}
bucket, err := NewBucket(dist)
@ -680,9 +668,7 @@ func (s *Stackdriver) addDistribution(dist *distributionpb.Distribution, tags ma
if i < int32(len(dist.BucketCounts)) {
count += dist.BucketCounts[i]
}
if err := grouper.Add(name, tags, ts, field+"_bucket", count); err != nil {
return err
}
grouper.Add(name, tags, ts, field+"_bucket", count)
}
return nil

View File

@ -5,9 +5,10 @@ import (
"testing"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/testutil"
)
func TestShort_SampleData(t *testing.T) {
@ -149,9 +150,7 @@ func TestIntegration_BasicGathering(t *testing.T) {
}
err = ctr.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, ctr.Terminate(), "terminating container failed")
}()
defer ctr.Terminate()
s := &Supervisor{
Server: "http://login:pass@" + testutil.GetLocalHost() + ":" + ctr.Ports[supervisorPort] + "/RPC2",
MetricsInc: []string{},

View File

@ -30,9 +30,7 @@ func TestZookeeperGeneratesMetricsIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var testset = []struct {
name string

View File

@ -348,14 +348,14 @@ type mockIngestor struct {
records []string
}
func (m *mockIngestor) FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) {
func (m *mockIngestor) FromReader(_ context.Context, reader io.Reader, _ ...ingest.FileOption) (*ingest.Result, error) {
bufbytes, _ := io.ReadAll(reader)
metricjson := string(bufbytes)
m.SetRecords(strings.Split(metricjson, "\n"))
return &ingest.Result{}, nil
}
func (m *mockIngestor) FromFile(ctx context.Context, fPath string, options ...ingest.FileOption) (*ingest.Result, error) {
func (m *mockIngestor) FromFile(_ context.Context, _ string, _ ...ingest.FileOption) (*ingest.Result, error) {
return &ingest.Result{}, nil
}

View File

@ -2,6 +2,7 @@ package cloud_pubsub
import (
"context"
"encoding/base64"
"errors"
"fmt"
"runtime"
@ -9,16 +10,15 @@ import (
"testing"
"time"
"encoding/base64"
"cloud.google.com/go/pubsub"
"github.com/stretchr/testify/require"
"google.golang.org/api/support/bundler"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/stretchr/testify/require"
"google.golang.org/api/support/bundler"
)
const (
@ -62,7 +62,7 @@ type (
)
func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []testMetric) (*PubSub, *stubTopic, []telegraf.Metric) {
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
metrics := make([]telegraf.Metric, len(testM))
t := &stubTopic{

View File

@ -8,12 +8,13 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)
const servicePort = "5432"
@ -43,9 +44,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
container := createTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort])
fmt.Println(url)
@ -155,9 +154,7 @@ func Test_escapeValueIntegration(t *testing.T) {
}
container := createTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
url := fmt.Sprintf("postgres://crate@%s:%s/test", container.Address, container.Ports[servicePort])
db, err := sql.Open("pgx", url)

View File

@ -14,11 +14,12 @@ import (
"github.com/dynatrace-oss/dynatrace-metric-utils-go/metric/apiconstants"
"github.com/dynatrace-oss/dynatrace-metric-utils-go/metric/dimensions"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestNilMetrics(t *testing.T) {
@ -516,7 +517,7 @@ type loggerStub struct {
testutil.Logger
}
func (l loggerStub) Warnf(format string, args ...interface{}) {
func (l loggerStub) Warnf(_ string, _ ...interface{}) {
warnfCalledTimes++
}

View File

@ -11,11 +11,12 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)
const servicePort = "9200"
@ -44,9 +45,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
@ -80,9 +79,7 @@ func TestConnectAndWriteMetricWithNaNValueEmptyIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
@ -123,9 +120,7 @@ func TestConnectAndWriteMetricWithNaNValueNoneIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
@ -167,9 +162,7 @@ func TestConnectAndWriteMetricWithNaNValueDropIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
@ -233,9 +226,7 @@ func TestConnectAndWriteMetricWithNaNValueReplacementIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
@ -283,9 +274,7 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
@ -314,9 +303,7 @@ func TestTemplateManagementIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
@ -349,9 +336,7 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),

View File

@ -57,7 +57,7 @@ func TestExec(t *testing.T) {
runner: &CommandRunner{},
}
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
e.SetSerializer(s)
require.NoError(t, e.Connect())

View File

@ -24,8 +24,7 @@ import (
var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC)
func TestExternalOutputWorks(t *testing.T) {
influxSerializer, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
influxSerializer := serializers.NewInfluxSerializer()
exe, err := os.Executable()
require.NoError(t, err)
@ -71,8 +70,7 @@ func TestExternalOutputWorks(t *testing.T) {
}
func TestPartiallyUnserializableThrowError(t *testing.T) {
influxSerializer, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
influxSerializer := serializers.NewInfluxSerializer()
exe, err := os.Executable()
require.NoError(t, err)
@ -108,8 +106,7 @@ func TestPartiallyUnserializableThrowError(t *testing.T) {
}
func TestPartiallyUnserializableCanBeSkipped(t *testing.T) {
influxSerializer, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
influxSerializer := serializers.NewInfluxSerializer()
exe, err := os.Executable()
require.NoError(t, err)

View File

@ -21,7 +21,7 @@ const (
func TestFileExistingFile(t *testing.T) {
fh := createFile(t)
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
f := File{
Files: []string{fh.Name()},
serializer: s,
@ -40,7 +40,7 @@ func TestFileExistingFile(t *testing.T) {
}
func TestFileNewFile(t *testing.T) {
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
fh := tmpFile(t)
f := File{
Files: []string{fh},
@ -64,7 +64,7 @@ func TestFileExistingFiles(t *testing.T) {
fh2 := createFile(t)
fh3 := createFile(t)
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
f := File{
Files: []string{fh1.Name(), fh2.Name(), fh3.Name()},
serializer: s,
@ -85,7 +85,7 @@ func TestFileExistingFiles(t *testing.T) {
}
func TestFileNewFiles(t *testing.T) {
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
fh1 := tmpFile(t)
fh2 := tmpFile(t)
fh3 := tmpFile(t)
@ -112,7 +112,7 @@ func TestFileBoth(t *testing.T) {
fh1 := createFile(t)
fh2 := tmpFile(t)
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
f := File{
Files: []string{fh1.Name(), fh2},
serializer: s,
@ -137,7 +137,7 @@ func TestFileStdout(t *testing.T) {
r, w, _ := os.Pipe()
os.Stdout = w
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
f := File{
Files: []string{"stdout"},
serializer: s,

View File

@ -121,11 +121,7 @@ func (g *Groundwork) Write(metrics []telegraf.Metric) error {
groupMap := make(map[string][]transit.ResourceRef)
resourceToServicesMap := make(map[string][]transit.MonitoredService)
for _, metric := range metrics {
meta, service, err := g.parseMetric(metric)
if err != nil {
g.Log.Errorf("%v", err)
continue
}
meta, service := g.parseMetric(metric)
resource := meta.resource
resourceToServicesMap[resource] = append(resourceToServicesMap[resource], *service)
@ -210,7 +206,7 @@ func init() {
})
}
func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.MonitoredService, error) {
func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.MonitoredService) {
group, _ := metric.GetTag(g.GroupTag)
resource := g.DefaultHost
@ -376,7 +372,7 @@ func (g *Groundwork) parseMetric(metric telegraf.Metric) (metricMeta, *transit.M
serviceObject.Status = status
}()
return metricMeta{resource: resource, group: group}, &serviceObject, nil
return metricMeta{resource: resource, group: group}, &serviceObject
}
func validStatus(status string) bool {

View File

@ -504,9 +504,7 @@ func TestIntegrationInserts(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start IoTDB container")
defer func() {
require.NoError(t, container.Terminate(), "terminating IoTDB container failed")
}()
defer container.Terminate()
t.Logf("Container Address:%q, ExposedPorts:[%v:%v]", container.Address, container.Ports[iotdbPort], iotdbPort)
// create a client and tests two groups of insertion

View File

@ -50,9 +50,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
err = zookeeper.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, zookeeper.Terminate(), "terminating container failed")
}()
defer zookeeper.Terminate()
container := testutil.Container{
Image: "wurstmeister/kafka",
@ -68,15 +66,13 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
k := &Kafka{
Brokers: brokers,
Topic: "Test",
@ -334,11 +330,10 @@ func TestTopicTag(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
tt.plugin.Log = testutil.Logger{}
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
s := serializers.NewInfluxSerializer()
tt.plugin.SetSerializer(s)
err = tt.plugin.Connect()
err := tt.plugin.Connect()
require.NoError(t, err)
producer := &MockProducer{}

View File

@ -7,11 +7,12 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)
func TestConnectAndWriteIntegrationNoAuth(t *testing.T) {
@ -30,9 +31,7 @@ func TestConnectAndWriteIntegrationNoAuth(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
// Run test
plugin := &MongoDB{
@ -74,9 +73,7 @@ func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
tests := []struct {
name string
@ -179,9 +176,7 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
tests := []struct {
name string

View File

@ -5,11 +5,11 @@ import (
"path/filepath"
"testing"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
const servicePort = "1883"
@ -38,12 +38,9 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
s := serializers.NewInfluxSerializer()
m := &MQTT{
Servers: []string{url},
serializer: s,
@ -52,7 +49,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
// Verify that we can connect to the MQTT broker
err = m.Connect()
err := m.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the mqtt broker
@ -66,13 +63,10 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
s := serializers.NewInfluxSerializer()
m := &MQTT{
Servers: []string{url},
Protocol: "3.1.1",
@ -82,7 +76,7 @@ func TestConnectAndWriteIntegrationMQTTv3(t *testing.T) {
}
// Verify that we can connect to the MQTT broker
err = m.Connect()
err := m.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the mqtt broker
@ -96,13 +90,10 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
}
container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewInfluxSerializer()
require.NoError(t, err)
s := serializers.NewInfluxSerializer()
m := &MQTT{
Servers: []string{url},
Protocol: "5",
@ -112,7 +103,7 @@ func TestConnectAndWriteIntegrationMQTTv5(t *testing.T) {
}
// Verify that we can connect to the MQTT broker
err = m.Connect()
err := m.Connect()
require.NoError(t, err)
// Verify that we can successfully write data to the mqtt broker

View File

@ -4,10 +4,11 @@ import (
"fmt"
"testing"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
func TestConnectAndWriteIntegration(t *testing.T) {
@ -23,12 +24,10 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
server := []string{fmt.Sprintf("nats://%s:%s", container.Address, container.Ports[servicePort])}
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
n := &NATS{
Servers: server,
Name: "telegraf",

View File

@ -5,10 +5,11 @@ import (
"testing"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
func TestConnectAndWriteIntegration(t *testing.T) {
@ -25,12 +26,10 @@ func TestConnectAndWriteIntegration(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
server := []string{fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])}
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
n := &NSQ{
Server: server[0],
Topic: "telegraf",

View File

@ -218,7 +218,7 @@ func (p *Postgresql) Write(metrics []telegraf.Metric) error {
var err error
if p.db.Stat().MaxConns() > 1 {
err = p.writeConcurrent(tableSources)
p.writeConcurrent(tableSources)
} else {
err = p.writeSequential(tableSources)
}
@ -276,15 +276,14 @@ func (p *Postgresql) writeSequential(tableSources map[string]*TableSource) error
return nil
}
func (p *Postgresql) writeConcurrent(tableSources map[string]*TableSource) error {
func (p *Postgresql) writeConcurrent(tableSources map[string]*TableSource) {
for _, tableSource := range tableSources {
select {
case p.writeChan <- tableSource:
case <-p.dbContext.Done():
return nil
return
}
}
return nil
}
func (p *Postgresql) writeWorker(ctx context.Context) {

View File

@ -11,17 +11,14 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/testutil"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
"github.com/influxdata/telegraf/testutil"
)
type Log struct {
@ -227,9 +224,7 @@ func newPostgresqlTest(tb testing.TB) *PostgresqlTest {
wait.ForListeningPort(nat.Port(servicePort)),
),
}
tb.Cleanup(func() {
require.NoError(tb, container.Terminate(), "terminating container failed")
})
tb.Cleanup(container.Terminate)
err := container.Start()
require.NoError(tb, err, "failed to start container")
@ -261,13 +256,13 @@ func TestPostgresqlConnectIntegration(t *testing.T) {
p := newPostgresqlTest(t)
require.NoError(t, p.Connect())
assert.EqualValues(t, 1, p.db.Stat().MaxConns())
require.EqualValues(t, 1, p.db.Stat().MaxConns())
p = newPostgresqlTest(t)
p.Connection += " pool_max_conns=2"
_ = p.Init()
require.NoError(t, p.Connect())
assert.EqualValues(t, 2, p.db.Stat().MaxConns())
require.EqualValues(t, 2, p.db.Stat().MaxConns())
}
func newMetric(
@ -319,13 +314,12 @@ func TestWriteIntegration_sequential(t *testing.T) {
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
if assert.Len(t, dumpA, 2) {
assert.EqualValues(t, 1, dumpA[0]["v"])
assert.EqualValues(t, 3, dumpA[1]["v"])
}
if assert.Len(t, dumpB, 1) {
assert.EqualValues(t, 2, dumpB[0]["v"])
}
require.Len(t, dumpA, 2)
require.EqualValues(t, 1, dumpA[0]["v"])
require.EqualValues(t, 3, dumpA[1]["v"])
require.Len(t, dumpB, 1)
require.EqualValues(t, 2, dumpB[0]["v"])
p.Logger.Clear()
require.NoError(t, p.Write(metrics))
@ -336,7 +330,7 @@ func TestWriteIntegration_sequential(t *testing.T) {
stmtCount++
}
}
assert.Equal(t, 6, stmtCount) // BEGIN, SAVEPOINT, COPY table _a, SAVEPOINT, COPY table _b, COMMIT
require.Equal(t, 6, stmtCount) // BEGIN, SAVEPOINT, COPY table _a, SAVEPOINT, COPY table _b, COMMIT
}
func TestWriteIntegration_concurrent(t *testing.T) {
@ -386,16 +380,15 @@ func TestWriteIntegration_concurrent(t *testing.T) {
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
if assert.Len(t, dumpA, 2) {
assert.EqualValues(t, 1, dumpA[0]["v"])
assert.EqualValues(t, 2, dumpA[1]["v"])
}
if assert.Len(t, dumpB, 1) {
assert.EqualValues(t, 3, dumpB[0]["v"])
}
require.Len(t, dumpA, 2)
require.EqualValues(t, 1, dumpA[0]["v"])
require.EqualValues(t, 2, dumpA[1]["v"])
require.Len(t, dumpB, 1)
require.EqualValues(t, 3, dumpB[0]["v"])
// We should have had 3 connections. One for the lock, and one for each table.
assert.EqualValues(t, 3, p.db.Stat().TotalConns())
require.EqualValues(t, 3, p.db.Stat().TotalConns())
}
// Test that the bad metric is dropped, and the rest of the batch succeeds.
@ -421,8 +414,8 @@ func TestWriteIntegration_sequentialPermError(t *testing.T) {
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
assert.Len(t, dumpA, 1)
assert.Len(t, dumpB, 2)
require.Len(t, dumpA, 1)
require.Len(t, dumpB, 2)
haveError := false
for _, l := range p.Logger.Logs() {
@ -431,7 +424,7 @@ func TestWriteIntegration_sequentialPermError(t *testing.T) {
break
}
}
assert.True(t, haveError, "write error not found in log")
require.True(t, haveError, "write error not found in log")
}
// Test that in a bach with only 1 sub-batch, that we don't return an error.
@ -482,8 +475,8 @@ func TestWriteIntegration_concurrentPermError(t *testing.T) {
dumpA := dbTableDump(t, p.db, "_a")
dumpB := dbTableDump(t, p.db, "_b")
assert.Len(t, dumpA, 1)
assert.Len(t, dumpB, 1)
require.Len(t, dumpA, 1)
require.Len(t, dumpB, 1)
}
// Verify that in sequential mode, errors are returned allowing telegraf agent to handle & retry
@ -516,11 +509,11 @@ func TestWriteIntegration_sequentialTempError(t *testing.T) {
conf := p.db.Config().ConnConfig
conf.Logger = nil
c, err := pgx.ConnectConfig(context.Background(), conf)
if !assert.NoError(t, err) {
if err != nil {
return true
}
_, err = c.Exec(context.Background(), "SELECT pg_terminate_backend($1)", pid)
assert.NoError(t, err)
require.NoError(t, err)
return true
}, false)
}()
@ -565,11 +558,11 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) {
conf := p.db.Config().ConnConfig
conf.Logger = nil
c, err := pgx.ConnectConfig(context.Background(), conf)
if !assert.NoError(t, err) {
if err != nil {
return true
}
_, err = c.Exec(context.Background(), "SELECT pg_terminate_backend($1)", pid)
assert.NoError(t, err)
require.NoError(t, err)
return true
}, false)
}()
@ -583,7 +576,7 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) {
p.Logger.WaitForCopy(t.Name()+"_a", false)
dumpA := dbTableDump(t, p.db, "_a")
assert.Len(t, dumpA, 1)
require.Len(t, dumpA, 1)
haveError := false
for _, l := range p.Logger.Logs() {
@ -592,7 +585,7 @@ func TestWriteIntegration_concurrentTempError(t *testing.T) {
break
}
}
assert.True(t, haveError, "write error not found in log")
require.True(t, haveError, "write error not found in log")
}
func TestWriteTagTableIntegration(t *testing.T) {
@ -611,12 +604,12 @@ func TestWriteTagTableIntegration(t *testing.T) {
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 1)
assert.EqualValues(t, 1, dump[0]["v"])
require.EqualValues(t, 1, dump[0]["v"])
dumpTags := dbTableDump(t, p.db, p.TagTableSuffix)
require.Len(t, dumpTags, 1)
assert.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
assert.EqualValues(t, "foo", dumpTags[0]["tag"])
require.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
require.EqualValues(t, "foo", dumpTags[0]["tag"])
p.Logger.Clear()
require.NoError(t, p.Write(metrics))
@ -627,7 +620,7 @@ func TestWriteTagTableIntegration(t *testing.T) {
stmtCount++
}
}
assert.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT
require.Equal(t, 3, stmtCount) // BEGIN, COPY metrics table, COMMIT
}
// Verify that when using TagsAsForeignKeys and a tag can't be written, that we still add the metrics.
@ -656,8 +649,8 @@ func TestWriteIntegration_tagError(t *testing.T) {
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 2)
assert.EqualValues(t, 1, dump[0]["v"])
assert.EqualValues(t, 2, dump[1]["v"])
require.EqualValues(t, 1, dump[0]["v"])
require.EqualValues(t, 2, dump[1]["v"])
}
// Verify that when using TagsAsForeignKeys and ForeignTagConstraing and a tag can't be written, that we drop the metrics.
@ -683,7 +676,7 @@ func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) {
metrics = []telegraf.Metric{
newMetric(t, "", MSS{"tag": "bar"}, MSI{"v": 2}),
}
assert.NoError(t, p.Write(metrics))
require.NoError(t, p.Write(metrics))
haveError := false
for _, l := range p.Logger.Logs() {
if strings.Contains(l.String(), "write error") {
@ -691,11 +684,11 @@ func TestWriteIntegration_tagError_foreignConstraint(t *testing.T) {
break
}
}
assert.True(t, haveError, "write error not found in log")
require.True(t, haveError, "write error not found in log")
dump := dbTableDump(t, p.db, "")
require.Len(t, dump, 1)
assert.EqualValues(t, 1, dump[0]["v"])
require.EqualValues(t, 1, dump[0]["v"])
}
func TestWriteIntegration_utf8(t *testing.T) {
@ -713,16 +706,16 @@ func TestWriteIntegration_utf8(t *testing.T) {
MSI{"АḂⲤ𝗗": "𝘢ƀ𝖼ḋếᵮℊ𝙝Ꭵ𝕛кιṃդⱺ𝓅𝘲𝕣𝖘ŧ𝑢ṽẉ𝘅ყž𝜡"},
),
}
assert.NoError(t, p.Write(metrics))
require.NoError(t, p.Write(metrics))
dump := dbTableDump(t, p.db, "Ѧ𝙱Ƈᗞ")
require.Len(t, dump, 1)
assert.EqualValues(t, "𝘢ƀ𝖼ḋếᵮℊ𝙝Ꭵ𝕛кιṃդⱺ𝓅𝘲𝕣𝖘ŧ𝑢ṽẉ𝘅ყž𝜡", dump[0]["АḂⲤ𝗗"])
require.EqualValues(t, "𝘢ƀ𝖼ḋếᵮℊ𝙝Ꭵ𝕛кιṃդⱺ𝓅𝘲𝕣𝖘ŧ𝑢ṽẉ𝘅ყž𝜡", dump[0]["АḂⲤ𝗗"])
dumpTags := dbTableDump(t, p.db, "Ѧ𝙱Ƈᗞ"+p.TagTableSuffix)
require.Len(t, dumpTags, 1)
assert.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
assert.EqualValues(t, "𝘈Ḇ𝖢𝕯٤ḞԍНǏ𝙅ƘԸⲘ𝙉০Ρ𝗤Ɍ𝓢ȚЦ𝒱Ѡ𝓧ƳȤ", dumpTags[0]["ăѣ𝔠ծ"])
require.EqualValues(t, dump[0]["tag_id"], dumpTags[0]["tag_id"])
require.EqualValues(t, "𝘈Ḇ𝖢𝕯٤ḞԍНǏ𝙅ƘԸⲘ𝙉০Ρ𝗤Ɍ𝓢ȚЦ𝒱Ѡ𝓧ƳȤ", dumpTags[0]["ăѣ𝔠ծ"])
}
func TestWriteIntegration_UnsignedIntegers(t *testing.T) {
@ -748,9 +741,8 @@ func TestWriteIntegration_UnsignedIntegers(t *testing.T) {
dump := dbTableDump(t, p.db, "")
if assert.Len(t, dump, 1) {
assert.EqualValues(t, uint64(math.MaxUint64), dump[0]["v"])
}
require.Len(t, dump, 1)
require.EqualValues(t, uint64(math.MaxUint64), dump[0]["v"])
}
// Last ditch effort to find any concurrency issues.
@ -790,9 +782,9 @@ func TestStressConcurrencyIntegration(t *testing.T) {
wgStart.Wait()
err := p.Write(mShuf)
assert.NoError(t, err)
assert.NoError(t, p.Close())
assert.False(t, p.Logger.HasLevel(pgx.LogLevelWarn))
require.NoError(t, err)
require.NoError(t, p.Close())
require.False(t, p.Logger.HasLevel(pgx.LogLevelWarn))
wgDone.Done()
}()
}

View File

@ -19,13 +19,12 @@ func newColumnList() *columnList {
}
}
func (cl *columnList) Add(column utils.Column) bool {
func (cl *columnList) Add(column utils.Column) {
if _, ok := cl.indices[column.Name]; ok {
return false
return
}
cl.columns = append(cl.columns, column)
cl.indices[column.Name] = len(cl.columns) - 1
return true
}
func (cl *columnList) Remove(name string) bool {

View File

@ -7,14 +7,13 @@ import (
"github.com/coocood/freecache"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)
func TestTableSource(t *testing.T) {
func TestTableSource(_ *testing.T) {
}
type source interface {
@ -53,11 +52,11 @@ func TestTableSourceIntegration_tagJSONB(t *testing.T) {
row := nextSrcRow(tsrc)
require.NoError(t, tsrc.Err())
assert.IsType(t, time.Time{}, row["time"])
require.IsType(t, time.Time{}, row["time"])
var tags MSI
require.NoError(t, json.Unmarshal(row["tags"].([]byte), &tags))
assert.EqualValues(t, MSI{"a": "one", "b": "two"}, tags)
assert.EqualValues(t, 1, row["v"])
require.EqualValues(t, MSI{"a": "one", "b": "two"}, tags)
require.EqualValues(t, 1, row["v"])
}
func TestTableSourceIntegration_tagTable(t *testing.T) {
@ -76,11 +75,11 @@ func TestTableSourceIntegration_tagTable(t *testing.T) {
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
ttsrc := NewTagTableSource(tsrc)
ttrow := nextSrcRow(ttsrc)
assert.EqualValues(t, "one", ttrow["a"])
assert.EqualValues(t, "two", ttrow["b"])
require.EqualValues(t, "one", ttrow["a"])
require.EqualValues(t, "two", ttrow["b"])
row := nextSrcRow(tsrc)
assert.Equal(t, row["tag_id"], ttrow["tag_id"])
require.Equal(t, row["tag_id"], ttrow["tag_id"])
}
func TestTableSourceIntegration_tagTableJSONB(t *testing.T) {
@ -102,7 +101,7 @@ func TestTableSourceIntegration_tagTableJSONB(t *testing.T) {
ttrow := nextSrcRow(ttsrc)
var tags MSI
require.NoError(t, json.Unmarshal(ttrow["tags"].([]byte), &tags))
assert.EqualValues(t, MSI{"a": "one", "b": "two"}, tags)
require.EqualValues(t, MSI{"a": "one", "b": "two"}, tags)
}
func TestTableSourceIntegration_fieldsJSONB(t *testing.T) {
@ -122,7 +121,7 @@ func TestTableSourceIntegration_fieldsJSONB(t *testing.T) {
var fields MSI
require.NoError(t, json.Unmarshal(row["fields"].([]byte), &fields))
// json unmarshals numbers as floats
assert.EqualValues(t, MSI{"a": 1.0, "b": 2.0}, fields)
require.EqualValues(t, MSI{"a": 1.0, "b": 2.0}, fields)
}
// TagsAsForeignKeys=false
@ -151,9 +150,9 @@ func TestTableSourceIntegration_DropColumn_tag(t *testing.T) {
_ = tsrc.DropColumn(col)
row := nextSrcRow(tsrc)
assert.EqualValues(t, "one", row["a"])
assert.EqualValues(t, 2, row["v"])
assert.False(t, tsrc.Next())
require.EqualValues(t, "one", row["a"])
require.EqualValues(t, 2, row["v"])
require.False(t, tsrc.Next())
}
// TagsAsForeignKeys=true, ForeignTagConstraint=true
@ -186,12 +185,12 @@ func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcTrue(t *testing.T) {
ttsrc := NewTagTableSource(tsrc)
row := nextSrcRow(ttsrc)
assert.EqualValues(t, "one", row["a"])
assert.False(t, ttsrc.Next())
require.EqualValues(t, "one", row["a"])
require.False(t, ttsrc.Next())
row = nextSrcRow(tsrc)
assert.EqualValues(t, 2, row["v"])
assert.False(t, tsrc.Next())
require.EqualValues(t, 2, row["v"])
require.False(t, tsrc.Next())
}
// TagsAsForeignKeys=true, ForeignTagConstraint=false
@ -224,13 +223,13 @@ func TestTableSourceIntegration_DropColumn_tag_fkTrue_fcFalse(t *testing.T) {
ttsrc := NewTagTableSource(tsrc)
row := nextSrcRow(ttsrc)
assert.EqualValues(t, "one", row["a"])
assert.False(t, ttsrc.Next())
require.EqualValues(t, "one", row["a"])
require.False(t, ttsrc.Next())
row = nextSrcRow(tsrc)
assert.EqualValues(t, 1, row["v"])
require.EqualValues(t, 1, row["v"])
row = nextSrcRow(tsrc)
assert.EqualValues(t, 2, row["v"])
require.EqualValues(t, 2, row["v"])
}
// Test that when a field is dropped, only the field is dropped, and all rows remain, unless it was the only field.
@ -258,9 +257,9 @@ func TestTableSourceIntegration_DropColumn_field(t *testing.T) {
_ = tsrc.DropColumn(col)
row := nextSrcRow(tsrc)
assert.EqualValues(t, "foo", row["tag"])
assert.EqualValues(t, 3, row["b"])
assert.False(t, tsrc.Next())
require.EqualValues(t, "foo", row["tag"])
require.EqualValues(t, 3, row["b"])
require.False(t, tsrc.Next())
}
func TestTableSourceIntegration_InconsistentTags(t *testing.T) {
@ -277,12 +276,12 @@ func TestTableSourceIntegration_InconsistentTags(t *testing.T) {
tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
trow := nextSrcRow(tsrc)
assert.EqualValues(t, "1", trow["a"])
assert.EqualValues(t, nil, trow["c"])
require.EqualValues(t, "1", trow["a"])
require.EqualValues(t, nil, trow["c"])
trow = nextSrcRow(tsrc)
assert.EqualValues(t, nil, trow["a"])
assert.EqualValues(t, "3", trow["c"])
require.EqualValues(t, nil, trow["a"])
require.EqualValues(t, "3", trow["c"])
}
func TestTagTableSourceIntegration_InconsistentTags(t *testing.T) {
@ -313,5 +312,5 @@ func TestTagTableSourceIntegration_InconsistentTags(t *testing.T) {
actual = append(actual, row)
}
assert.ElementsMatch(t, expected, actual)
require.ElementsMatch(t, expected, actual)
}

View File

@ -5,9 +5,10 @@ import (
"testing"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/testutil"
)
func TestConnectAndWrite(t *testing.T) {
@ -40,9 +41,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
}
require.NoError(t, container.Start(), "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
redis := &RedisTimeSeries{
Address: fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort]),
}

View File

@ -5,9 +5,10 @@ import (
"testing"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/testutil"
)
func TestConnectAndWrite(t *testing.T) {
@ -26,9 +27,7 @@ func TestConnectAndWrite(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
url := fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])

View File

@ -143,7 +143,7 @@ func (sw *SocketWriter) Close() error {
}
func newSocketWriter() *SocketWriter {
s, _ := serializers.NewInfluxSerializer()
s := serializers.NewInfluxSerializer()
return &SocketWriter{
Serializer: s,
}

View File

@ -191,9 +191,7 @@ func TestMysqlIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
//use the plugin to write to the database
address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v",
@ -273,9 +271,7 @@ func TestPostgresIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
//use the plugin to write to the database
// host, port, username, password, dbname
@ -362,9 +358,7 @@ func TestClickHouseIntegration(t *testing.T) {
}
err = container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
//use the plugin to write to the database
// host, port, username, password, dbname

View File

@ -6,12 +6,11 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
)
func TestConnectAndWrite(t *testing.T) {
@ -27,9 +26,7 @@ func TestConnectAndWrite(t *testing.T) {
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
defer container.Terminate()
var url = fmt.Sprintf("%s:%s", container.Address, container.Ports[servicePort])
s, err := serializers.NewJSONSerializer(10*time.Second, "yyy-dd-mmThh:mm:ss", "")
require.NoError(t, err)

View File

@ -38,11 +38,11 @@ func (h *MetricHandler) SetTimeFunc(f TimeFunc) {
h.timeFunc = f
}
func (h *MetricHandler) Metric() (telegraf.Metric, error) {
func (h *MetricHandler) Metric() telegraf.Metric {
if h.metric.Time().IsZero() {
h.metric.SetTime(h.timeFunc().Truncate(h.timePrecision))
}
return h.metric, nil
return h.metric
}
func (h *MetricHandler) SetMeasurement(name []byte) error {

View File

@ -100,11 +100,7 @@ func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
}
}
metric, err := p.handler.Metric()
if err != nil {
return nil, err
}
metric := p.handler.Metric()
if metric == nil {
continue
}
@ -226,12 +222,7 @@ func (sp *StreamParser) Next() (telegraf.Metric, error) {
}
}
metric, err := sp.handler.Metric()
if err != nil {
return nil, err
}
return metric, nil
return sp.handler.Metric(), nil
}
// Position returns the current byte offset into the data.

View File

@ -573,7 +573,7 @@ func (p *Parser) isExcluded(key string) bool {
return false
}
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
func (p *Parser) ParseLine(_ string) (telegraf.Metric, error) {
return nil, fmt.Errorf("ParseLine is designed for parsing influx line protocol, therefore not implemented for parsing JSON")
}

View File

@ -5,12 +5,12 @@ import (
"math"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
type Parser struct {
@ -84,7 +84,7 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
func (p *Parser) InitFromConfig(config *parsers.Config) error {
func (p *Parser) InitFromConfig(_ *parsers.Config) error {
return nil
}

View File

@ -151,7 +151,7 @@ func TestMain(m *testing.M) {
func runCountMultiplierProgram() {
fieldName := os.Getenv("FIELD_NAME")
parser := influx.NewStreamParser(os.Stdin)
serializer, _ := serializers.NewInfluxSerializer()
serializer := serializers.NewInfluxSerializer()
for {
m, err := parser.Next()

View File

@ -197,7 +197,7 @@ func TestSerializeMetricString(t *testing.T) {
}
func TestSerializeMetricBool(t *testing.T) {
requireMetric := func(t *testing.T, tim time.Time, value bool) telegraf.Metric {
requireMetric := func(tim time.Time, value bool) telegraf.Metric {
tags := map[string]string{
"tag_name": "tag_value",
}
@ -218,22 +218,22 @@ func TestSerializeMetricBool(t *testing.T) {
expected string
}{
{
metric: requireMetric(t, now, false),
metric: requireMetric(now, false),
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu field=java_lang_GarbageCollector_Valid tag_name=tag_value 0 %d\n", now.Unix()),
},
{
metric: requireMetric(t, now, false),
metric: requireMetric(now, false),
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_java_lang_GarbageCollector_Valid tag_name=tag_value 0 %d\n", now.Unix()),
},
{
metric: requireMetric(t, now, true),
metric: requireMetric(now, true),
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu field=java_lang_GarbageCollector_Valid tag_name=tag_value 1 %d\n", now.Unix()),
},
{
metric: requireMetric(t, now, true),
metric: requireMetric(now, true),
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_java_lang_GarbageCollector_Valid tag_name=tag_value 1 %d\n", now.Unix()),
},

View File

@ -112,7 +112,7 @@ func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
s.buf.Reset()
for _, m := range metrics {
_, err := s.Write(&s.buf, m)
err := s.Write(&s.buf, m)
if err != nil {
if _, ok := err.(*MetricError); ok {
continue
@ -124,9 +124,8 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
copy(out, s.buf.Bytes())
return out, nil
}
func (s *Serializer) Write(w io.Writer, m telegraf.Metric) (int, error) {
err := s.writeMetric(w, m)
return s.bytesWritten, err
func (s *Serializer) Write(w io.Writer, m telegraf.Metric) error {
return s.writeMetric(w, m)
}
func (s *Serializer) writeString(w io.Writer, str string) error {

View File

@ -49,7 +49,7 @@ func (r *reader) Read(p []byte) (int, error) {
}
for _, metric := range r.metrics[r.offset:] {
_, err := r.serializer.Write(r.buf, metric)
err := r.serializer.Write(r.buf, metric)
r.offset++
if err != nil {
r.buf.Reset()

View File

@ -4,8 +4,9 @@ import (
"bytes"
"time"
"github.com/influxdata/telegraf"
"github.com/prometheus/common/expfmt"
"github.com/influxdata/telegraf"
)
// TimestampExport controls if the output contains timestamps.
@ -46,9 +47,8 @@ type Serializer struct {
config FormatConfig
}
func NewSerializer(config FormatConfig) (*Serializer, error) {
s := &Serializer{config: config}
return s, nil
func NewSerializer(config FormatConfig) *Serializer {
return &Serializer{config: config}
}
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {

View File

@ -5,9 +5,10 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestSerialize(t *testing.T) {
@ -181,13 +182,13 @@ cpu_time_idle{host="example.org"} 42
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := NewSerializer(FormatConfig{
s := NewSerializer(FormatConfig{
MetricSortOrder: SortMetrics,
TimestampExport: tt.config.TimestampExport,
StringHandling: tt.config.StringHandling,
CompactEncoding: tt.config.CompactEncoding,
})
require.NoError(t, err)
actual, err := s.Serialize(tt.metric)
require.NoError(t, err)
@ -697,12 +698,11 @@ rpc_duration_seconds_count 2693
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := NewSerializer(FormatConfig{
s := NewSerializer(FormatConfig{
MetricSortOrder: SortMetrics,
TimestampExport: tt.config.TimestampExport,
StringHandling: tt.config.StringHandling,
})
require.NoError(t, err)
actual, err := s.SerializeBatch(tt.metrics)
require.NoError(t, err)

View File

@ -43,9 +43,8 @@ type Serializer struct {
config FormatConfig
}
func NewSerializer(config FormatConfig) (*Serializer, error) {
s := &Serializer{config: config}
return s, nil
func NewSerializer(config FormatConfig) *Serializer {
return &Serializer{config: config}
}
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {

View File

@ -10,10 +10,10 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestRemoteWriteSerialize(t *testing.T) {
@ -134,11 +134,10 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := NewSerializer(FormatConfig{
s := NewSerializer(FormatConfig{
MetricSortOrder: SortMetrics,
StringHandling: tt.config.StringHandling,
})
require.NoError(t, err)
data, err := s.Serialize(tt.metric)
require.NoError(t, err)
actual, err := prompbToText(data)
@ -639,11 +638,10 @@ rpc_duration_seconds_sum 17560473
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := NewSerializer(FormatConfig{
s := NewSerializer(FormatConfig{
MetricSortOrder: SortMetrics,
StringHandling: tt.config.StringHandling,
})
require.NoError(t, err)
data, err := s.SerializeBatch(tt.metrics)
require.NoError(t, err)
actual, err := prompbToText(data)

View File

@ -146,25 +146,25 @@ func NewSerializer(config *Config) (Serializer, error) {
case "csv":
serializer, err = NewCSVSerializer(config)
case "influx":
serializer, err = NewInfluxSerializerConfig(config)
serializer, err = NewInfluxSerializerConfig(config), nil
case "graphite":
serializer, err = NewGraphiteSerializer(config.Prefix, config.Template, config.GraphiteTagSupport, config.GraphiteTagSanitizeMode, config.GraphiteSeparator, config.Templates)
case "json":
serializer, err = NewJSONSerializer(config.TimestampUnits, config.TimestampFormat, config.Transformation)
case "splunkmetric":
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag)
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag), nil
case "nowmetric":
serializer, err = NewNowSerializer()
case "carbon2":
serializer, err = NewCarbon2Serializer(config.Carbon2Format, config.Carbon2SanitizeReplaceChar)
case "wavefront":
serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride, config.WavefrontDisablePrefixConversion)
serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride, config.WavefrontDisablePrefixConversion), nil
case "prometheus":
serializer, err = NewPrometheusSerializer(config)
serializer, err = NewPrometheusSerializer(config), nil
case "prometheusremotewrite":
serializer, err = NewPrometheusRemoteWriteSerializer(config)
serializer, err = NewPrometheusRemoteWriteSerializer(config), nil
case "msgpack":
serializer, err = NewMsgpackSerializer()
serializer, err = NewMsgpackSerializer(), nil
default:
err = fmt.Errorf("invalid data format: %s", config.DataFormat)
}
@ -175,7 +175,7 @@ func NewCSVSerializer(config *Config) (Serializer, error) {
return csv.NewSerializer(config.TimestampFormat, config.CSVSeparator, config.CSVHeader, config.CSVPrefix)
}
func NewPrometheusRemoteWriteSerializer(config *Config) (Serializer, error) {
func NewPrometheusRemoteWriteSerializer(config *Config) Serializer {
sortMetrics := prometheusremotewrite.NoSortMetrics
if config.PrometheusExportTimestamp {
sortMetrics = prometheusremotewrite.SortMetrics
@ -192,7 +192,7 @@ func NewPrometheusRemoteWriteSerializer(config *Config) (Serializer, error) {
})
}
func NewPrometheusSerializer(config *Config) (Serializer, error) {
func NewPrometheusSerializer(config *Config) Serializer {
exportTimestamp := prometheus.NoExportTimestamp
if config.PrometheusExportTimestamp {
exportTimestamp = prometheus.ExportTimestamp
@ -216,7 +216,7 @@ func NewPrometheusSerializer(config *Config) (Serializer, error) {
})
}
func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversions bool) (Serializer, error) {
func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversions bool) Serializer {
return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions)
}
@ -228,7 +228,7 @@ func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar strin
return carbon2.NewSerializer(carbon2format, carbon2SanitizeReplaceChar)
}
func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) (Serializer, error) {
func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) Serializer {
return splunkmetric.NewSerializer(splunkmetricHecRouting, splunkmetricMultimetric, splunkmetricOmitEventTag)
}
@ -236,7 +236,7 @@ func NewNowSerializer() (Serializer, error) {
return nowmetric.NewSerializer()
}
func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
func NewInfluxSerializerConfig(config *Config) Serializer {
var sort influx.FieldSortOrder
if config.InfluxSortFields {
sort = influx.SortFields
@ -251,11 +251,11 @@ func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
s.SetMaxLineBytes(config.InfluxMaxLineBytes)
s.SetFieldSortOrder(sort)
s.SetFieldTypeSupport(typeSupport)
return s, nil
return s
}
func NewInfluxSerializer() (Serializer, error) {
return influx.NewSerializer(), nil
func NewInfluxSerializer() Serializer {
return influx.NewSerializer()
}
func NewGraphiteSerializer(prefix, template string, tagSupport bool, tagSanitizeMode string, separator string, templates []string) (Serializer, error) {
@ -287,6 +287,6 @@ func NewGraphiteSerializer(prefix, template string, tagSupport bool, tagSanitize
}, nil
}
func NewMsgpackSerializer() (Serializer, error) {
return msgpack.NewSerializer(), nil
func NewMsgpackSerializer() Serializer {
return msgpack.NewSerializer()
}

View File

@ -31,14 +31,14 @@ type HECTimeSeries struct {
}
// NewSerializer Setup our new serializer
func NewSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) (*serializer, error) {
func NewSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) *serializer {
/* Define output params */
s := &serializer{
HecRouting: splunkmetricHecRouting,
SplunkmetricMultiMetric: splunkmetricMultimetric,
OmitEventTag: splunkmetricOmitEventTag,
}
return s, nil
return s
}
func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) {

View File

@ -21,7 +21,7 @@ func TestSerializeMetricFloat(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, _ := NewSerializer(false, false, false)
s := NewSerializer(false, false, false)
var buf []byte
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -40,7 +40,7 @@ func TestSerializeMetricFloatHec(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, _ := NewSerializer(true, false, false)
s := NewSerializer(true, false, false)
var buf []byte
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -58,7 +58,7 @@ func TestSerializeMetricInt(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, _ := NewSerializer(false, false, false)
s := NewSerializer(false, false, false)
var buf []byte
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -77,7 +77,7 @@ func TestSerializeMetricIntHec(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, _ := NewSerializer(true, false, false)
s := NewSerializer(true, false, false)
var buf []byte
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -96,7 +96,7 @@ func TestSerializeMetricBool(t *testing.T) {
}
m := metric.New("docker", tags, fields, now)
s, _ := NewSerializer(false, false, false)
s := NewSerializer(false, false, false)
var buf []byte
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -115,7 +115,7 @@ func TestSerializeMetricBoolHec(t *testing.T) {
}
m := metric.New("docker", tags, fields, now)
s, _ := NewSerializer(true, false, false)
s := NewSerializer(true, false, false)
var buf []byte
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -135,7 +135,7 @@ func TestSerializeMetricString(t *testing.T) {
}
m := metric.New("cpu", tags, fields, now)
s, _ := NewSerializer(false, false, false)
s := NewSerializer(false, false, false)
var buf []byte
buf, err := s.Serialize(m)
require.NoError(t, err)
@ -165,7 +165,7 @@ func TestSerializeBatch(t *testing.T) {
)
metrics := []telegraf.Metric{m, n}
s, _ := NewSerializer(false, false, false)
s := NewSerializer(false, false, false)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@ -185,7 +185,7 @@ func TestSerializeMulti(t *testing.T) {
)
metrics := []telegraf.Metric{m}
s, _ := NewSerializer(false, true, false)
s := NewSerializer(false, true, false)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@ -211,7 +211,7 @@ func TestSerializeBatchHec(t *testing.T) {
time.Unix(0, 0),
)
metrics := []telegraf.Metric{m, n}
s, _ := NewSerializer(true, false, false)
s := NewSerializer(true, false, false)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@ -231,7 +231,7 @@ func TestSerializeMultiHec(t *testing.T) {
)
metrics := []telegraf.Metric{m}
s, _ := NewSerializer(true, true, false)
s := NewSerializer(true, true, false)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
@ -251,7 +251,7 @@ func TestSerializeOmitEvent(t *testing.T) {
)
metrics := []telegraf.Metric{m}
s, _ := NewSerializer(true, true, true)
s := NewSerializer(true, true, true)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)

View File

@ -48,14 +48,14 @@ type MetricPoint struct {
Tags map[string]string
}
func NewSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversion bool) (*WavefrontSerializer, error) {
func NewSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversion bool) *WavefrontSerializer {
s := &WavefrontSerializer{
Prefix: prefix,
UseStrict: useStrict,
SourceOverride: sourceOverride,
DisablePrefixConversions: disablePrefixConversion,
}
return s, nil
return s
}
func (s *WavefrontSerializer) serializeMetric(m telegraf.Metric) {

View File

@ -81,7 +81,7 @@ func (c *Container) Start() error {
err = c.LookupMappedPorts()
if err != nil {
_ = c.Terminate()
c.Terminate()
return fmt.Errorf("port lookup failed: %s", err)
}
@ -132,7 +132,7 @@ func (c *Container) PrintLogs() {
fmt.Println("--- Container Logs End ---")
}
func (c *Container) Terminate() error {
func (c *Container) Terminate() {
err := c.container.StopLogProducer()
if err != nil {
fmt.Println(err)
@ -144,6 +144,4 @@ func (c *Container) Terminate() error {
}
c.PrintLogs()
return nil
}

View File

@ -18,8 +18,7 @@ func TestEmptyContainerIntegration(t *testing.T) {
err := container.Start()
require.NoError(t, err)
err = container.Terminate()
require.NoError(t, err)
container.Terminate()
}
func TestMappedPortLookupIntegration(t *testing.T) {
@ -54,8 +53,7 @@ func TestMappedPortLookupIntegration(t *testing.T) {
require.Equal(t, tc.expected, container.Ports["80"])
}
err = container.Terminate()
require.NoError(t, err)
container.Terminate()
}
}

View File

@ -7,9 +7,10 @@ import (
"os"
"path/filepath"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/toml"
"github.com/influxdata/toml/ast"
"github.com/influxdata/telegraf/config"
)
type pluginState map[string]bool
@ -51,7 +52,7 @@ func ImportConfigurations(files, dirs []string) (*selection, int, error) {
return &sel, len(filenames), err
}
func (s *selection) Filter(p packageCollection) (*packageCollection, error) {
func (s *selection) Filter(p packageCollection) *packageCollection {
enabled := packageCollection{
packages: map[string][]packageInfo{},
}
@ -87,7 +88,7 @@ func (s *selection) Filter(p packageCollection) (*packageCollection, error) {
enabled.packages["parsers"] = parsers
}
return &enabled, nil
return &enabled
}
func (s *selection) importFiles(configurations []string) error {

View File

@ -113,10 +113,7 @@ func main() {
// Process the plugin list with the given config. This will
// only keep the plugins that adhere to the filtering criteria.
enabled, err := cfg.Filter(packages)
if err != nil {
log.Fatalf("Filtering plugins failed: %v", err)
}
enabled := cfg.Filter(packages)
if !quiet {
enabled.Print()
}