diff --git a/plugins/outputs/amon/amon.go b/plugins/outputs/amon/amon.go index 5bbbba981..952d3b023 100644 --- a/plugins/outputs/amon/amon.go +++ b/plugins/outputs/amon/amon.go @@ -142,7 +142,7 @@ func (p *Point) setValue(v interface{}) error { case float32: p[1] = float64(d) case float64: - p[1] = float64(d) + p[1] = d default: return fmt.Errorf("undeterminable type") } diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 95da1f99b..5224928f7 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -5,13 +5,14 @@ import ( "strings" "time" + "github.com/streadway/amqp" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" - "github.com/streadway/amqp" ) const ( @@ -180,11 +181,11 @@ func (q *AMQP) SetSerializer(serializer serializers.Serializer) { func (q *AMQP) Connect() error { if q.config == nil { - config, err := q.makeClientConfig() + clientConfig, err := q.makeClientConfig() if err != nil { return err } - q.config = config + q.config = clientConfig } var err error @@ -251,8 +252,8 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { if err != nil { // If this is the first attempt to publish and the connection is // closed, try to reconnect and retry once. + //nolint: revive // Simplifying if-else with early return will reduce clarity if aerr, ok := err.(*amqp.Error); first && ok && aerr == amqp.ErrClosed { - first = false q.client = nil err := q.publish(key, body) if err != nil { @@ -268,7 +269,9 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { if q.sentMessages >= q.MaxMessages && q.MaxMessages > 0 { q.Log.Debug("Sent MaxMessages; closing connection") - q.client.Close() + if err := q.client.Close(); err != nil { + q.Log.Errorf("Closing connection failed: %v", err) + } q.client = nil } @@ -315,52 +318,53 @@ func (q *AMQP) serialize(metrics []telegraf.Metric) ([]byte, error) { } func (q *AMQP) makeClientConfig() (*ClientConfig, error) { - config := &ClientConfig{ + clientConfig := &ClientConfig{ exchange: q.Exchange, exchangeType: q.ExchangeType, exchangePassive: q.ExchangePassive, encoding: q.ContentEncoding, timeout: time.Duration(q.Timeout), + log: q.Log, } switch q.ExchangeDurability { case "transient": - config.exchangeDurable = false + clientConfig.exchangeDurable = false default: - config.exchangeDurable = true + clientConfig.exchangeDurable = true } - config.brokers = q.Brokers - if len(config.brokers) == 0 { - config.brokers = []string{q.URL} + clientConfig.brokers = q.Brokers + if len(clientConfig.brokers) == 0 { + clientConfig.brokers = []string{q.URL} } switch q.DeliveryMode { case "transient": - config.deliveryMode = amqp.Transient + clientConfig.deliveryMode = amqp.Transient case "persistent": - config.deliveryMode = amqp.Persistent + clientConfig.deliveryMode = amqp.Persistent default: - config.deliveryMode = amqp.Transient + clientConfig.deliveryMode = amqp.Transient } if len(q.Headers) > 0 { - config.headers = make(amqp.Table, len(q.Headers)) + clientConfig.headers = make(amqp.Table, len(q.Headers)) for k, v := range q.Headers { - config.headers[k] = v + clientConfig.headers[k] = v } } else { // Copy deprecated fields into message header - config.headers = amqp.Table{ + clientConfig.headers = amqp.Table{ "database": q.Database, "retention_policy": q.RetentionPolicy, } } if len(q.ExchangeArguments) > 0 { - config.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments)) + clientConfig.exchangeArguments = make(amqp.Table, len(q.ExchangeArguments)) for k, v := range q.ExchangeArguments { - config.exchangeArguments[k] = v + clientConfig.exchangeArguments[k] = v } } @@ -368,7 +372,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) { if err != nil { return nil, err } - config.tlsConfig = tlsConfig + clientConfig.tlsConfig = tlsConfig var auth []amqp.Authentication if strings.ToUpper(q.AuthMethod) == "EXTERNAL" { @@ -381,13 +385,13 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) { }, } } - config.auth = auth + clientConfig.auth = auth - return config, nil + return clientConfig, nil } -func connect(config *ClientConfig) (Client, error) { - return Connect(config) +func connect(clientConfig *ClientConfig) (Client, error) { + return Connect(clientConfig) } func init() { diff --git a/plugins/outputs/amqp/client.go b/plugins/outputs/amqp/client.go index 8c230b706..af0ef5470 100644 --- a/plugins/outputs/amqp/client.go +++ b/plugins/outputs/amqp/client.go @@ -4,12 +4,13 @@ import ( "crypto/tls" "errors" "fmt" - "log" "math/rand" "net" "time" "github.com/streadway/amqp" + + "github.com/influxdata/telegraf" ) type ClientConfig struct { @@ -25,6 +26,7 @@ type ClientConfig struct { tlsConfig *tls.Config timeout time.Duration auth []amqp.Authentication + log telegraf.Logger } type client struct { @@ -42,7 +44,7 @@ func Connect(config *ClientConfig) (*client, error) { p := rand.Perm(len(config.brokers)) for _, n := range p { broker := config.brokers[n] - log.Printf("D! Output [amqp] connecting to %q", broker) + config.log.Debugf("Connecting to %q", broker) conn, err := amqp.DialConfig( broker, amqp.Config{ TLSClientConfig: config.tlsConfig, @@ -53,10 +55,10 @@ func Connect(config *ClientConfig) (*client, error) { }) if err == nil { client.conn = conn - log.Printf("D! Output [amqp] connected to %q", broker) + config.log.Debugf("Connected to %q", broker) break } - log.Printf("D! Output [amqp] error connecting to %q - %s", broker, err.Error()) + config.log.Debugf("Error connecting to %q - %v", broker, err.Error()) } if client.conn == nil { diff --git a/plugins/outputs/application_insights/application_insights_test.go b/plugins/outputs/application_insights/application_insights_test.go index b685f6c31..fd0759343 100644 --- a/plugins/outputs/application_insights/application_insights_test.go +++ b/plugins/outputs/application_insights/application_insights_test.go @@ -5,21 +5,18 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" - "github.com/microsoft/ApplicationInsights-Go/appinsights" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/application_insights/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" + "github.com/influxdata/telegraf/testutil" ) func TestConnectFailsIfNoIkey(t *testing.T) { - assert := assert.New(t) - transmitter := new(mocks.Transmitter) transmitter.On("Close").Return(closed) @@ -31,12 +28,10 @@ func TestConnectFailsIfNoIkey(t *testing.T) { } err := ai.Connect() - assert.Error(err) + require.Error(t, err) } func TestOutputCloseTimesOut(t *testing.T) { - assert := assert.New(t) - transmitter := new(mocks.Transmitter) transmitter.On("Close").Return(unfinished) @@ -47,13 +42,11 @@ func TestOutputCloseTimesOut(t *testing.T) { } err := ai.Close() - assert.NoError(err) + require.NoError(t, err) transmitter.AssertCalled(t, "Close") } func TestCloseRemovesDiagMsgListener(t *testing.T) { - assert := assert.New(t) - transmitter := new(mocks.Transmitter) transmitter.On("Close").Return(closed) @@ -75,11 +68,11 @@ func TestCloseRemovesDiagMsgListener(t *testing.T) { } err := ai.Connect() - assert.NoError(err) + require.NoError(t, err) diagMsgSubscriber.AssertCalled(t, "Subscribe", mock.AnythingOfType("appinsights.DiagnosticsMessageHandler")) err = ai.Close() - assert.NoError(err) + require.NoError(t, err) transmitter.AssertCalled(t, "Close") diagMsgListener.AssertCalled(t, "Remove") } @@ -137,7 +130,6 @@ func TestAggregateMetricCreated(t *testing.T) { for _, tt := range tests { tf := func(t *testing.T) { - assert := assert.New(t) now := time.Now().UTC() transmitter := new(mocks.Transmitter) @@ -158,17 +150,18 @@ func TestAggregateMetricCreated(t *testing.T) { } err := ai.Connect() - assert.NoError(err) + require.NoError(t, err) mSet := []telegraf.Metric{m} - ai.Write(mSet) + err = ai.Write(mSet) + require.NoError(t, err) transmitter.AssertNumberOfCalls(t, "Track", 1+len(tt.additionalMetricValueFields)) var pAggregateTelemetry *appinsights.AggregateMetricTelemetry - assert.IsType(pAggregateTelemetry, transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0), "Expected last telemetry to be AggregateMetricTelemetry") + require.IsType(t, pAggregateTelemetry, transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0), "Expected last telemetry to be AggregateMetricTelemetry") aggregateTelemetry := transmitter.Calls[len(transmitter.Calls)-1].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry) - verifyAggregateTelemetry(assert, m, tt.valueField, tt.countField, aggregateTelemetry) + verifyAggregateTelemetry(t, m, tt.valueField, tt.countField, aggregateTelemetry) - verifyAdditionalTelemetry(assert, m, transmitter, tt.additionalMetricValueFields, metricName) + verifyAdditionalTelemetry(t, m, transmitter, tt.additionalMetricValueFields, metricName) } t.Run(tt.name, tf) @@ -195,7 +188,6 @@ func TestSimpleMetricCreated(t *testing.T) { for _, tt := range tests { tf := func(t *testing.T) { - assert := assert.New(t) now := time.Now().UTC() transmitter := new(mocks.Transmitter) @@ -216,10 +208,11 @@ func TestSimpleMetricCreated(t *testing.T) { } err := ai.Connect() - assert.NoError(err) + require.NoError(t, err) mSet := []telegraf.Metric{m} - ai.Write(mSet) + err = ai.Write(mSet) + require.NoError(t, err) expectedNumberOfCalls := len(tt.additionalMetricValueFields) if tt.primaryMetricValueField != "" { @@ -229,7 +222,7 @@ func TestSimpleMetricCreated(t *testing.T) { transmitter.AssertNumberOfCalls(t, "Track", expectedNumberOfCalls) if tt.primaryMetricValueField != "" { var pMetricTelemetry *appinsights.MetricTelemetry - assert.IsType(pMetricTelemetry, transmitter.Calls[0].Arguments.Get(0), "First created telemetry should be simple MetricTelemetry") + require.IsType(t, pMetricTelemetry, transmitter.Calls[0].Arguments.Get(0), "First created telemetry should be simple MetricTelemetry") metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry) var expectedTelemetryName string @@ -238,10 +231,10 @@ func TestSimpleMetricCreated(t *testing.T) { } else { expectedTelemetryName = m.Name() + "_" + tt.primaryMetricValueField } - verifySimpleTelemetry(assert, m, tt.primaryMetricValueField, expectedTelemetryName, metricTelemetry) + verifySimpleTelemetry(t, m, tt.primaryMetricValueField, expectedTelemetryName, metricTelemetry) } - verifyAdditionalTelemetry(assert, m, transmitter, tt.additionalMetricValueFields, metricName) + verifyAdditionalTelemetry(t, m, transmitter, tt.additionalMetricValueFields, metricName) } t.Run(tt.name, tf) @@ -265,7 +258,6 @@ func TestTagsAppliedToTelemetry(t *testing.T) { for _, tt := range tests { tf := func(t *testing.T) { - assert := assert.New(t) now := time.Now().UTC() transmitter := new(mocks.Transmitter) @@ -286,15 +278,16 @@ func TestTagsAppliedToTelemetry(t *testing.T) { } err := ai.Connect() - assert.NoError(err) + require.NoError(t, err) mSet := []telegraf.Metric{m} - ai.Write(mSet) + err = ai.Write(mSet) + require.NoError(t, err) transmitter.AssertNumberOfCalls(t, "Track", len(tt.metricValueFields)) transmitter.AssertCalled(t, "Track", mock.AnythingOfType("*appinsights.MetricTelemetry")) // Will verify that all original tags are present in telemetry.Properties map - verifyAdditionalTelemetry(assert, m, transmitter, tt.metricValueFields, metricName) + verifyAdditionalTelemetry(t, m, transmitter, tt.metricValueFields, metricName) } t.Run(tt.name, tf) @@ -302,7 +295,6 @@ func TestTagsAppliedToTelemetry(t *testing.T) { } func TestContextTagsSetOnSimpleTelemetry(t *testing.T) { - assert := assert.New(t) now := time.Now().UTC() transmitter := new(mocks.Transmitter) @@ -327,19 +319,19 @@ func TestContextTagsSetOnSimpleTelemetry(t *testing.T) { } err := ai.Connect() - assert.NoError(err) + require.NoError(t, err) mSet := []telegraf.Metric{m} - ai.Write(mSet) + err = ai.Write(mSet) + require.NoError(t, err) transmitter.AssertNumberOfCalls(t, "Track", 1) metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.MetricTelemetry) cloudTags := metricTelemetry.Tags.Cloud() - assert.Equal("atcsvc", cloudTags.GetRole()) - assert.Equal("bunkie17554", cloudTags.GetRoleInstance()) + require.Equal(t, "atcsvc", cloudTags.GetRole()) + require.Equal(t, "bunkie17554", cloudTags.GetRoleInstance()) } func TestContextTagsSetOnAggregateTelemetry(t *testing.T) { - assert := assert.New(t) now := time.Now().UTC() transmitter := new(mocks.Transmitter) @@ -364,15 +356,16 @@ func TestContextTagsSetOnAggregateTelemetry(t *testing.T) { } err := ai.Connect() - assert.NoError(err) + require.NoError(t, err) mSet := []telegraf.Metric{m} - ai.Write(mSet) + err = ai.Write(mSet) + require.NoError(t, err) transmitter.AssertNumberOfCalls(t, "Track", 1) metricTelemetry := transmitter.Calls[0].Arguments.Get(0).(*appinsights.AggregateMetricTelemetry) cloudTags := metricTelemetry.Tags.Cloud() - assert.Equal("atcsvc", cloudTags.GetRole()) - assert.Equal("bunkie17554", cloudTags.GetRoleInstance()) + require.Equal(t, "atcsvc", cloudTags.GetRole()) + require.Equal(t, "bunkie17554", cloudTags.GetRoleInstance()) } func closed() <-chan struct{} { @@ -387,49 +380,49 @@ func unfinished() <-chan struct{} { } func verifyAggregateTelemetry( - assert *assert.Assertions, - metric telegraf.Metric, + t *testing.T, + m telegraf.Metric, valueField string, countField string, telemetry *appinsights.AggregateMetricTelemetry, ) { verifyAggregateField := func(fieldName string, telemetryValue float64) { - metricRawFieldValue, found := metric.Fields()[fieldName] + metricRawFieldValue, found := m.Fields()[fieldName] if !found { return } if _, err := toFloat64(metricRawFieldValue); err == nil { - assert.EqualValues(metricRawFieldValue, telemetryValue, "Telemetry property %s does not match the metric field", fieldName) + require.EqualValues(t, metricRawFieldValue, telemetryValue, "Telemetry property %s does not match the metric field", fieldName) } } - assert.Equal(metric.Name(), telemetry.Name, "Telemetry name should be the same as metric name") - assert.EqualValues(metric.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field") - assert.EqualValues(metric.Fields()[countField], telemetry.Count, "Telemetry sample count does not mach metric sample count field") + require.Equal(t, m.Name(), telemetry.Name, "Telemetry name should be the same as metric name") + require.EqualValues(t, m.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field") + require.EqualValues(t, m.Fields()[countField], telemetry.Count, "Telemetry sample count does not mach metric sample count field") verifyAggregateField("min", telemetry.Min) verifyAggregateField("max", telemetry.Max) verifyAggregateField("stdev", telemetry.StdDev) verifyAggregateField("variance", telemetry.Variance) - assert.Equal(metric.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match") - assertMapContains(assert, metric.Tags(), telemetry.Properties) + require.Equal(t, m.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match") + assertMapContains(t, m.Tags(), telemetry.Properties) } func verifySimpleTelemetry( - assert *assert.Assertions, - metric telegraf.Metric, + t *testing.T, + m telegraf.Metric, valueField string, expectedTelemetryName string, telemetry *appinsights.MetricTelemetry, ) { - assert.Equal(expectedTelemetryName, telemetry.Name, "Telemetry name is not what was expected") - assert.EqualValues(metric.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field") - assert.Equal(metric.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match") - assertMapContains(assert, metric.Tags(), telemetry.Properties) + require.Equal(t, expectedTelemetryName, telemetry.Name, "Telemetry name is not what was expected") + require.EqualValues(t, m.Fields()[valueField], telemetry.Value, "Telemetry value does not match metric value field") + require.Equal(t, m.Time(), telemetry.Timestamp, "Telemetry and metric timestamps do not match") + assertMapContains(t, m.Tags(), telemetry.Properties) } func verifyAdditionalTelemetry( - assert *assert.Assertions, - metric telegraf.Metric, + t *testing.T, + m telegraf.Metric, transmitter *mocks.Transmitter, additionalMetricValueFields []string, telemetryNamePrefix string, @@ -437,9 +430,9 @@ func verifyAdditionalTelemetry( for _, fieldName := range additionalMetricValueFields { expectedTelemetryName := telemetryNamePrefix + "_" + fieldName telemetry := findTransmittedTelemetry(transmitter, expectedTelemetryName) - assert.NotNil(telemetry, "Expected telemetry named %s to be created, but could not find it", expectedTelemetryName) + require.NotNil(t, telemetry, "Expected telemetry named %s to be created, but could not find it", expectedTelemetryName) if telemetry != nil { - verifySimpleTelemetry(assert, metric, fieldName, expectedTelemetryName, telemetry) + verifySimpleTelemetry(t, m, fieldName, expectedTelemetryName, telemetry) } } } @@ -455,17 +448,17 @@ func findTransmittedTelemetry(transmitter *mocks.Transmitter, telemetryName stri return nil } -func assertMapContains(assert *assert.Assertions, expected, actual map[string]string) { +func assertMapContains(t *testing.T, expected, actual map[string]string) { if expected == nil && actual == nil { return } - assert.NotNil(expected, "Maps not equal: expected is nil but actual is not") - assert.NotNil(actual, "Maps not equal: actual is nil but expected is not") + require.NotNil(t, expected, "Maps not equal: expected is nil but actual is not") + require.NotNil(t, actual, "Maps not equal: actual is nil but expected is not") for k, v := range expected { av, ok := actual[k] - assert.True(ok, "Actual map does not contain a value for key '%s'", k) - assert.Equal(v, av, "The expected value for key '%s' is '%s' but the actual value is '%s", k, v, av) + require.True(t, ok, "Actual map does not contain a value for key '%s'", k) + require.Equal(t, v, av, "The expected value for key '%s' is '%s' but the actual value is '%s", k, v, av) } } diff --git a/plugins/outputs/azure_monitor/azure_monitor.go b/plugins/outputs/azure_monitor/azure_monitor.go index ca511a521..398be55cd 100644 --- a/plugins/outputs/azure_monitor/azure_monitor.go +++ b/plugins/outputs/azure_monitor/azure_monitor.go @@ -15,6 +15,7 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest/azure/auth" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" @@ -208,7 +209,7 @@ func (a *AzureMonitor) Connect() error { } // vmMetadata retrieves metadata about the current Azure VM -func vmInstanceMetadata(c *http.Client) (string, string, error) { +func vmInstanceMetadata(c *http.Client) (region string, resourceID string, err error) { req, err := http.NewRequest("GET", vmInstanceMetadataURL, nil) if err != nil { return "", "", fmt.Errorf("error creating request: %v", err) @@ -235,8 +236,8 @@ func vmInstanceMetadata(c *http.Client) (string, string, error) { return "", "", err } - region := metadata.Compute.Location - resourceID := metadata.ResourceID() + region = metadata.Compute.Location + resourceID = metadata.ResourceID() return region, resourceID, nil } @@ -366,20 +367,20 @@ func (a *AzureMonitor) send(body []byte) error { func hashIDWithTagKeysOnly(m telegraf.Metric) uint64 { h := fnv.New64a() - h.Write([]byte(m.Name())) - h.Write([]byte("\n")) + h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" for _, tag := range m.TagList() { if tag.Key == "" || tag.Value == "" { continue } - h.Write([]byte(tag.Key)) - h.Write([]byte("\n")) + h.Write([]byte(tag.Key)) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" } b := make([]byte, binary.MaxVarintLen64) n := binary.PutUvarint(b, uint64(m.Time().UnixNano())) - h.Write(b[:n]) - h.Write([]byte("\n")) + h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" return h.Sum64() } @@ -573,10 +574,10 @@ func hashIDWithField(id uint64, fk string) uint64 { h := fnv.New64a() b := make([]byte, binary.MaxVarintLen64) n := binary.PutUvarint(b, id) - h.Write(b[:n]) - h.Write([]byte("\n")) - h.Write([]byte(fk)) - h.Write([]byte("\n")) + h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte(fk)) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" return h.Sum64() } diff --git a/plugins/outputs/cloud_pubsub/pubsub_test.go b/plugins/outputs/cloud_pubsub/pubsub_test.go index 967a33d74..e342acac4 100644 --- a/plugins/outputs/cloud_pubsub/pubsub_test.go +++ b/plugins/outputs/cloud_pubsub/pubsub_test.go @@ -1,16 +1,15 @@ package cloud_pubsub import ( + "encoding/base64" "testing" - "encoding/base64" - "cloud.google.com/go/pubsub" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" ) func TestPubSub_WriteSingle(t *testing.T) { @@ -51,8 +50,8 @@ func TestPubSub_WriteWithAttribute(t *testing.T) { for _, testM := range testMetrics { msg := verifyRawMetricPublished(t, testM.m, topic.published) - assert.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1") - assert.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2") + require.Equalf(t, "bar1", msg.Attributes["foo1"], "expected attribute foo1=bar1") + require.Equalf(t, "bar2", msg.Attributes["foo2"], "expected attribute foo2=bar2") } } @@ -74,7 +73,7 @@ func TestPubSub_WriteMultiple(t *testing.T) { for _, testM := range testMetrics { verifyRawMetricPublished(t, testM.m, topic.published) } - assert.Equalf(t, 1, topic.getBundleCount(), "unexpected bundle count") + require.Equalf(t, 1, topic.getBundleCount(), "unexpected bundle count") } func TestPubSub_WriteOverCountThreshold(t *testing.T) { @@ -98,7 +97,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) { for _, testM := range testMetrics { verifyRawMetricPublished(t, testM.m, topic.published) } - assert.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count") + require.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count") } func TestPubSub_WriteOverByteThreshold(t *testing.T) { @@ -121,7 +120,7 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) { for _, testM := range testMetrics { verifyRawMetricPublished(t, testM.m, topic.published) } - assert.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count") + require.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count") } func TestPubSub_WriteBase64Single(t *testing.T) { @@ -198,7 +197,7 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string if !ok { t.Fatalf("expected published metric to have a value") } - assert.Equal(t, v, publishedV, "incorrect published value") + require.Equal(t, v, publishedV, "incorrect published value") return psMsg } diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index 129f014bf..a48a3ee54 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -247,7 +247,7 @@ func (c *CloudWatch) WriteToCloudWatch(datums []types.MetricDatum) error { return err } -// Partition the MetricDatums into smaller slices of a max size so that are under the limit +// PartitionDatums partitions the MetricDatums into smaller slices of a max size so that are under the limit // for the AWS API calls. func PartitionDatums(size int, datums []types.MetricDatum) [][]types.MetricDatum { numberOfPartitions := len(datums) / size @@ -270,7 +270,7 @@ func PartitionDatums(size int, datums []types.MetricDatum) [][]types.MetricDatum return partitions } -// Make a MetricDatum from telegraf.Metric. It would check if all required fields of +// BuildMetricDatum makes a MetricDatum from telegraf.Metric. It would check if all required fields of // cloudwatch.StatisticSet are available. If so, it would build MetricDatum from statistic values. // Otherwise, fields would still been built independently. func BuildMetricDatum(buildStatistic bool, highResolutionMetrics bool, point telegraf.Metric) []types.MetricDatum { @@ -332,14 +332,14 @@ func BuildMetricDatum(buildStatistic bool, highResolutionMetrics bool, point tel return datums } -// Make a list of Dimensions by using a Point's tags. CloudWatch supports up to -// 10 dimensions per metric so we only keep up to the first 10 alphabetically. +// BuildDimensions makes a list of Dimensions by using a Point's tags. CloudWatch supports up to +// 10 dimensions per metric, so we only keep up to the first 10 alphabetically. // This always includes the "host" tag if it exists. func BuildDimensions(mTags map[string]string) []types.Dimension { - const MaxDimensions = 10 - dimensions := make([]types.Dimension, 0, MaxDimensions) + const maxDimensions = 10 + dimensions := make([]types.Dimension, 0, maxDimensions) - // This is pretty ugly but we always want to include the "host" tag if it exists. + // This is pretty ugly, but we always want to include the "host" tag if it exists. if host, ok := mTags["host"]; ok { dimensions = append(dimensions, types.Dimension{ Name: aws.String("host"), @@ -356,7 +356,7 @@ func BuildDimensions(mTags map[string]string) []types.Dimension { sort.Strings(keys) for _, k := range keys { - if len(dimensions) >= MaxDimensions { + if len(dimensions) >= maxDimensions { break } @@ -392,7 +392,8 @@ func getStatisticType(name string) (sType statisticType, fieldName string) { sType = statisticTypeNone fieldName = name } - return + + return sType, fieldName } func convert(v interface{}) (value float64, ok bool) { @@ -420,7 +421,7 @@ func convert(v interface{}) (value float64, ok bool) { default: // Skip unsupported type. ok = false - return + return value, ok } // Do CloudWatch boundary checking @@ -436,7 +437,7 @@ func convert(v interface{}) (value float64, ok bool) { return 0, false } - return + return value, ok } func init() { diff --git a/plugins/outputs/cloudwatch/cloudwatch_test.go b/plugins/outputs/cloudwatch/cloudwatch_test.go index df98381cf..b0f277c44 100644 --- a/plugins/outputs/cloudwatch/cloudwatch_test.go +++ b/plugins/outputs/cloudwatch/cloudwatch_test.go @@ -2,26 +2,23 @@ package cloudwatch import ( "fmt" - "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" "math" "sort" "testing" "time" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // Test that each tag becomes one dimension func TestBuildDimensions(t *testing.T) { - const MaxDimensions = 10 - - assert := assert.New(t) + const maxDimensions = 10 testPoint := testutil.TestMetric(1) dimensions := BuildDimensions(testPoint.Tags()) @@ -35,26 +32,24 @@ func TestBuildDimensions(t *testing.T) { sort.Strings(tagKeys) - if len(testPoint.Tags()) >= MaxDimensions { - assert.Equal(MaxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions") + if len(testPoint.Tags()) >= maxDimensions { + require.Equal(t, maxDimensions, len(dimensions), "Number of dimensions should be less than MaxDimensions") } else { - assert.Equal(len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags") + require.Equal(t, len(testPoint.Tags()), len(dimensions), "Number of dimensions should be equal to number of tags") } for i, key := range tagKeys { if i >= 10 { break } - assert.Equal(key, *dimensions[i].Name, "Key should be equal") - assert.Equal(testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal") + require.Equal(t, key, *dimensions[i].Name, "Key should be equal") + require.Equal(t, testPoint.Tags()[key], *dimensions[i].Value, "Value should be equal") } } // Test that metrics with valid values have a MetricDatum created where as non valid do not. // Skips "time.Time" type as something is converting the value to string. func TestBuildMetricDatums(t *testing.T) { - assert := assert.New(t) - zero := 0.0 validMetrics := []telegraf.Metric{ testutil.TestMetric(1), @@ -75,11 +70,11 @@ func TestBuildMetricDatums(t *testing.T) { } for _, point := range validMetrics { datums := BuildMetricDatum(false, false, point) - assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point)) + require.Equal(t, 1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", point)) } for _, point := range invalidMetrics { datums := BuildMetricDatum(false, false, point) - assert.Equal(0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point)) + require.Equal(t, 0, len(datums), fmt.Sprintf("Valid point should not create a Datum {value: %v}", point)) } statisticMetric := metric.New( @@ -89,7 +84,7 @@ func TestBuildMetricDatums(t *testing.T) { time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), ) datums := BuildMetricDatum(true, false, statisticMetric) - assert.Equal(1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric)) + require.Equal(t, 1, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", statisticMetric)) multiFieldsMetric := metric.New( "test1", @@ -98,7 +93,7 @@ func TestBuildMetricDatums(t *testing.T) { time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), ) datums = BuildMetricDatum(true, false, multiFieldsMetric) - assert.Equal(4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multiFieldsMetric)) + require.Equal(t, 4, len(datums), fmt.Sprintf("Each field should create a Datum {value: %v}", multiFieldsMetric)) multiStatisticMetric := metric.New( "test1", @@ -112,24 +107,22 @@ func TestBuildMetricDatums(t *testing.T) { time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), ) datums = BuildMetricDatum(true, false, multiStatisticMetric) - assert.Equal(7, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", multiStatisticMetric)) + require.Equal(t, 7, len(datums), fmt.Sprintf("Valid point should create a Datum {value: %v}", multiStatisticMetric)) } func TestMetricDatumResolution(t *testing.T) { const expectedStandardResolutionValue = int32(60) const expectedHighResolutionValue = int32(1) - assert := assert.New(t) + m := testutil.TestMetric(1) - metric := testutil.TestMetric(1) - - standardResolutionDatum := BuildMetricDatum(false, false, metric) + standardResolutionDatum := BuildMetricDatum(false, false, m) actualStandardResolutionValue := *standardResolutionDatum[0].StorageResolution - assert.Equal(expectedStandardResolutionValue, actualStandardResolutionValue) + require.Equal(t, expectedStandardResolutionValue, actualStandardResolutionValue) - highResolutionDatum := BuildMetricDatum(false, true, metric) + highResolutionDatum := BuildMetricDatum(false, true, m) actualHighResolutionValue := *highResolutionDatum[0].StorageResolution - assert.Equal(expectedHighResolutionValue, actualHighResolutionValue) + require.Equal(t, expectedHighResolutionValue, actualHighResolutionValue) } func TestBuildMetricDatums_SkipEmptyTags(t *testing.T) { @@ -150,8 +143,6 @@ func TestBuildMetricDatums_SkipEmptyTags(t *testing.T) { } func TestPartitionDatums(t *testing.T) { - assert := assert.New(t) - testDatum := types.MetricDatum{ MetricName: aws.String("Foo"), Value: aws.Float64(1), @@ -162,9 +153,9 @@ func TestPartitionDatums(t *testing.T) { twoDatum := []types.MetricDatum{testDatum, testDatum} threeDatum := []types.MetricDatum{testDatum, testDatum, testDatum} - assert.Equal([][]types.MetricDatum{}, PartitionDatums(2, zeroDatum)) - assert.Equal([][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum)) - assert.Equal([][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum)) - assert.Equal([][]types.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum)) - assert.Equal([][]types.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum)) + require.Equal(t, [][]types.MetricDatum{}, PartitionDatums(2, zeroDatum)) + require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum)) + require.Equal(t, [][]types.MetricDatum{oneDatum}, PartitionDatums(2, oneDatum)) + require.Equal(t, [][]types.MetricDatum{twoDatum}, PartitionDatums(2, twoDatum)) + require.Equal(t, [][]types.MetricDatum{twoDatum, oneDatum}, PartitionDatums(2, threeDatum)) } diff --git a/plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go b/plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go index e103eb53d..1263d665c 100644 --- a/plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go +++ b/plugins/outputs/cloudwatch_logs/cloudwatch_logs_test.go @@ -11,10 +11,11 @@ import ( cloudwatchlogsV2 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" internalaws "github.com/influxdata/telegraf/config/aws" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) type mockCloudWatchLogs struct { @@ -57,9 +58,7 @@ func (c *mockCloudWatchLogs) PutLogEvents(_ context.Context, input *cloudwatchlo sequenceToken := "arbitraryToken" output := &cloudwatchlogsV2.PutLogEventsOutput{NextSequenceToken: &sequenceToken} //Saving messages - for _, event := range input.LogEvents { - c.pushedLogEvents = append(c.pushedLogEvents, event) - } + c.pushedLogEvents = append(c.pushedLogEvents, input.LogEvents...) return output, nil } diff --git a/plugins/outputs/cratedb/cratedb.go b/plugins/outputs/cratedb/cratedb.go index b56787114..40c8c2728 100644 --- a/plugins/outputs/cratedb/cratedb.go +++ b/plugins/outputs/cratedb/cratedb.go @@ -11,10 +11,11 @@ import ( "strings" "time" + _ "github.com/jackc/pgx/v4/stdlib" //to register stdlib from PostgreSQL Driver and Toolkit + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" - _ "github.com/jackc/pgx/v4/stdlib" //to register stdlib from PostgreSQL Driver and Toolkit ) const MaxInt64 = int64(^uint64(0) >> 1) @@ -47,7 +48,7 @@ func (c *CrateDB) Connect() error { if err != nil { return err } else if c.TableCreate { - sql := ` + query := ` CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( "hash_id" LONG INDEX OFF, "timestamp" TIMESTAMP, @@ -60,7 +61,7 @@ CREATE TABLE IF NOT EXISTS ` + c.Table + ` ( ` ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Timeout)) defer cancel() - if _, err := db.ExecContext(ctx, sql); err != nil { + if _, err := db.ExecContext(ctx, query); err != nil { return err } } @@ -106,10 +107,10 @@ func insertSQL(table string, keyReplacement string, metrics []telegraf.Metric) ( } rows[i] = `(` + strings.Join(escapedCols, ", ") + `)` } - sql := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields") + query := `INSERT INTO ` + table + ` ("hash_id", "timestamp", "name", "tags", "fields") VALUES ` + strings.Join(rows, " ,\n") + `;` - return sql, nil + return query, nil } // escapeValue returns a string version of val that is suitable for being used @@ -206,7 +207,7 @@ func escapeString(s string, quote string) string { // [1] https://github.com/influxdata/telegraf/pull/3210#discussion_r148411201 func hashID(m telegraf.Metric) int64 { h := sha512.New() - h.Write([]byte(m.Name())) + h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error" tags := m.Tags() tmp := make([]string, len(tags)) i := 0 @@ -217,7 +218,7 @@ func hashID(m telegraf.Metric) int64 { sort.Strings(tmp) for _, s := range tmp { - h.Write([]byte(s)) + h.Write([]byte(s)) //nolint:revive // from hash.go: "It never returns an error" } sum := h.Sum(nil) diff --git a/plugins/outputs/datadog/datadog.go b/plugins/outputs/datadog/datadog.go index 47d8a4e91..6c89ab1e3 100644 --- a/plugins/outputs/datadog/datadog.go +++ b/plugins/outputs/datadog/datadog.go @@ -200,7 +200,7 @@ func (p *Point) setValue(v interface{}) error { case uint64: p[1] = float64(d) case float64: - p[1] = float64(d) + p[1] = d case bool: p[1] = float64(0) if d { diff --git a/plugins/outputs/datadog/datadog_test.go b/plugins/outputs/datadog/datadog_test.go index c893833b4..4c149bf60 100644 --- a/plugins/outputs/datadog/datadog_test.go +++ b/plugins/outputs/datadog/datadog_test.go @@ -10,10 +10,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) var ( @@ -36,6 +36,7 @@ func fakeDatadog() *Datadog { func TestUriOverride(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) + //nolint:errcheck,revive // Ignore the returned error as the test will fail anyway json.NewEncoder(w).Encode(`{"status":"ok"}`) })) defer ts.Close() @@ -51,6 +52,7 @@ func TestUriOverride(t *testing.T) { func TestBadStatusCode(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) + //nolint:errcheck,revive // Ignore the returned error as the test will fail anyway json.NewEncoder(w).Encode(`{ 'errors': [ 'Something bad happened to the server.', 'Your query made the server very sad.' @@ -75,7 +77,7 @@ func TestAuthenticatedUrl(t *testing.T) { d := fakeDatadog() authURL := d.authenticatedURL() - assert.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeURL, fakeAPIKey), authURL) + require.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeURL, fakeAPIKey), authURL) } func TestBuildTags(t *testing.T) { @@ -173,7 +175,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestMetric(bool(true), "test7"), + testutil.TestMetric(true, "test7"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0, @@ -181,7 +183,7 @@ func TestBuildPoint(t *testing.T) { nil, }, { - testutil.TestMetric(bool(false), "test8"), + testutil.TestMetric(false, "test8"), Point{ float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0, diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go index 8f57f4e12..235a8ee08 100644 --- a/plugins/outputs/elasticsearch/elasticsearch.go +++ b/plugins/outputs/elasticsearch/elasticsearch.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "log" "net/http" "strconv" "strings" @@ -12,12 +11,12 @@ import ( "time" "crypto/sha256" + "gopkg.in/olivere/elastic.v5" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" - "gopkg.in/olivere/elastic.v5" ) type Elasticsearch struct { @@ -36,6 +35,7 @@ type Elasticsearch struct { OverwriteTemplate bool ForceDocumentID bool `toml:"force_document_id"` MajorReleaseNumber int + Log telegraf.Logger `toml:"-"` tls.ClientConfig Client *elastic.Client @@ -174,7 +174,7 @@ type templatePart struct { func (a *Elasticsearch) Connect() error { if a.URLs == nil || a.IndexName == "" { - return fmt.Errorf("Elasticsearch urls or index_name is not defined") + return fmt.Errorf("elasticsearch urls or index_name is not defined") } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(a.Timeout)) @@ -213,7 +213,7 @@ func (a *Elasticsearch) Connect() error { clientOptions = append(clientOptions, elastic.SetHealthcheck(false), ) - log.Printf("D! Elasticsearch output: disabling health check") + a.Log.Debugf("Disabling health check") } client, err := elastic.NewClient(clientOptions...) @@ -226,16 +226,16 @@ func (a *Elasticsearch) Connect() error { esVersion, err := client.ElasticsearchVersion(a.URLs[0]) if err != nil { - return fmt.Errorf("Elasticsearch version check failed: %s", err) + return fmt.Errorf("elasticsearch version check failed: %s", err) } // quit if ES version is not supported majorReleaseNumber, err := strconv.Atoi(strings.Split(esVersion, ".")[0]) if err != nil || majorReleaseNumber < 5 { - return fmt.Errorf("Elasticsearch version not supported: %s", esVersion) + return fmt.Errorf("elasticsearch version not supported: %s", esVersion) } - log.Println("I! Elasticsearch version: " + esVersion) + a.Log.Infof("Elasticsearch version: %q", esVersion) a.Client = client a.MajorReleaseNumber = majorReleaseNumber @@ -257,9 +257,9 @@ func GetPointID(m telegraf.Metric) string { var buffer bytes.Buffer //Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID - buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10)) - buffer.WriteString(m.Name()) - buffer.WriteString(strconv.FormatUint(m.HashID(), 10)) + buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10)) //nolint:revive // from buffer.go: "err is always nil" + buffer.WriteString(m.Name()) //nolint:revive // from buffer.go: "err is always nil" + buffer.WriteString(strconv.FormatUint(m.HashID(), 10)) //nolint:revive // from buffer.go: "err is always nil" return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes())) } @@ -305,15 +305,15 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { res, err := bulkRequest.Do(ctx) if err != nil { - return fmt.Errorf("Error sending bulk request to Elasticsearch: %s", err) + return fmt.Errorf("error sending bulk request to Elasticsearch: %s", err) } if res.Errors { for id, err := range res.Failed() { - log.Printf("E! Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"]) + a.Log.Errorf("Elasticsearch indexing failure, id: %d, error: %s, caused by: %s, %s", id, err.Error.Reason, err.Error.CausedBy["reason"], err.Error.CausedBy["type"]) break } - return fmt.Errorf("W! Elasticsearch failed to index %d metrics", len(res.Failed())) + return fmt.Errorf("elasticsearch failed to index %d metrics", len(res.Failed())) } return nil @@ -321,13 +321,13 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { func (a *Elasticsearch) manageTemplate(ctx context.Context) error { if a.TemplateName == "" { - return fmt.Errorf("Elasticsearch template_name configuration not defined") + return fmt.Errorf("elasticsearch template_name configuration not defined") } templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx) if errExists != nil { - return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists) + return fmt.Errorf("elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists) } templatePattern := a.IndexName @@ -341,7 +341,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error { } if templatePattern == "" { - return fmt.Errorf("Template cannot be created for dynamic index names without an index prefix") + return fmt.Errorf("template cannot be created for dynamic index names without an index prefix") } if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") { @@ -353,16 +353,18 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error { t := template.Must(template.New("template").Parse(telegrafTemplate)) var tmpl bytes.Buffer - t.Execute(&tmpl, tp) + if err := t.Execute(&tmpl, tp); err != nil { + return err + } _, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl.String()).Do(ctx) if errCreateTemplate != nil { - return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate) + return fmt.Errorf("elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate) } - log.Printf("D! Elasticsearch template %s created or updated\n", a.TemplateName) + a.Log.Debugf("Template %s created or updated\n", a.TemplateName) } else { - log.Println("D! Found existing Elasticsearch template. Skipping template management") + a.Log.Debug("Found existing Elasticsearch template. Skipping template management") } return nil } @@ -384,7 +386,7 @@ func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) { ) indexName = tagReplacer.Replace(indexName) - tagKeys = append(tagKeys, (strings.TrimSpace(tagName))) + tagKeys = append(tagKeys, strings.TrimSpace(tagName)) startTag = strings.Index(indexName, "{{") } @@ -413,7 +415,7 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagK if value, ok := metricTags[key]; ok { tagValues = append(tagValues, value) } else { - log.Printf("D! Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue) + a.Log.Debugf("Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue) tagValues = append(tagValues, a.DefaultTagValue) } } diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go index 7ad1e632c..ecfe03f2e 100644 --- a/plugins/outputs/elasticsearch/elasticsearch_test.go +++ b/plugins/outputs/elasticsearch/elasticsearch_test.go @@ -29,6 +29,7 @@ func TestConnectAndWriteIntegration(t *testing.T) { TemplateName: "telegraf", OverwriteTemplate: false, HealthCheckInterval: config.Duration(time.Second * 10), + Log: testutil.Logger{}, } // Verify that we can connect to Elasticsearch @@ -57,6 +58,7 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) { ManageTemplate: true, TemplateName: "", OverwriteTemplate: true, + Log: testutil.Logger{}, } err := e.manageTemplate(ctx) @@ -78,6 +80,7 @@ func TestTemplateManagementIntegration(t *testing.T) { ManageTemplate: true, TemplateName: "telegraf", OverwriteTemplate: true, + Log: testutil.Logger{}, } ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout)) @@ -105,6 +108,7 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) { ManageTemplate: true, TemplateName: "telegraf", OverwriteTemplate: true, + Log: testutil.Logger{}, } err := e.Connect() @@ -114,6 +118,7 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) { func TestGetTagKeys(t *testing.T) { e := &Elasticsearch{ DefaultTagValue: "none", + Log: testutil.Logger{}, } var tests = []struct { @@ -173,6 +178,7 @@ func TestGetTagKeys(t *testing.T) { func TestGetIndexName(t *testing.T) { e := &Elasticsearch{ DefaultTagValue: "none", + Log: testutil.Logger{}, } var tests = []struct { @@ -286,6 +292,7 @@ func TestRequestHeaderWhenGzipIsEnabled(t *testing.T) { Timeout: config.Duration(time.Second * 5), EnableGzip: true, ManageTemplate: false, + Log: testutil.Logger{}, } err := e.Connect() @@ -319,6 +326,7 @@ func TestRequestHeaderWhenGzipIsDisabled(t *testing.T) { Timeout: config.Duration(time.Second * 5), EnableGzip: false, ManageTemplate: false, + Log: testutil.Logger{}, } err := e.Connect() diff --git a/plugins/outputs/exec/exec.go b/plugins/outputs/exec/exec.go index b0313a382..68c61e1ca 100644 --- a/plugins/outputs/exec/exec.go +++ b/plugins/outputs/exec/exec.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "io" - "log" "os/exec" "runtime" "time" @@ -22,6 +21,7 @@ const maxStderrBytes = 512 type Exec struct { Command []string `toml:"command"` Timeout config.Duration `toml:"timeout"` + Log telegraf.Logger `toml:"-"` runner Runner serializer serializers.Serializer @@ -42,6 +42,8 @@ var sampleConfig = ` ` func (e *Exec) Init() error { + e.runner = &CommandRunner{log: e.Log} + return nil } @@ -77,7 +79,7 @@ func (e *Exec) Write(metrics []telegraf.Metric) error { if err != nil { return err } - buffer.Write(serializedMetrics) + buffer.Write(serializedMetrics) //nolint:revive // from buffer.go: "err is always nil" if buffer.Len() <= 0 { return nil @@ -94,6 +96,7 @@ type Runner interface { // CommandRunner runs a command with the ability to kill the process before the timeout. type CommandRunner struct { cmd *exec.Cmd + log telegraf.Logger } // Run runs the command. @@ -114,9 +117,9 @@ func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.R s = removeWindowsCarriageReturns(s) if s.Len() > 0 { if !telegraf.Debug { - log.Printf("E! [outputs.exec] Command error: %q", c.truncate(s)) + c.log.Errorf("Command error: %q", c.truncate(s)) } else { - log.Printf("D! [outputs.exec] Command error: %q", s) + c.log.Debugf("Command error: %q", s) } } @@ -147,7 +150,7 @@ func (c *CommandRunner) truncate(buf bytes.Buffer) string { buf.Truncate(i) } if didTruncate { - buf.WriteString("...") + buf.WriteString("...") //nolint:revive // from buffer.go: "err is always nil" } return buf.String() } @@ -155,7 +158,6 @@ func (c *CommandRunner) truncate(buf bytes.Buffer) string { func init() { outputs.Add("exec", func() telegraf.Output { return &Exec{ - runner: &CommandRunner{}, Timeout: config.Duration(time.Second * 5), } }) diff --git a/plugins/outputs/exec/exec_test.go b/plugins/outputs/exec/exec_test.go index e75e1829d..40fac6327 100644 --- a/plugins/outputs/exec/exec_test.go +++ b/plugins/outputs/exec/exec_test.go @@ -6,11 +6,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func TestExec(t *testing.T) { @@ -59,8 +60,7 @@ func TestExec(t *testing.T) { s, _ := serializers.NewInfluxSerializer() e.SetSerializer(s) - e.Connect() - + require.NoError(t, e.Connect()) require.Equal(t, tt.err, e.Write(tt.metrics) != nil) }) } diff --git a/plugins/outputs/execd/execd_test.go b/plugins/outputs/execd/execd_test.go index c14339d31..66bc28561 100644 --- a/plugins/outputs/execd/execd_test.go +++ b/plugins/outputs/execd/execd_test.go @@ -11,13 +11,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC) @@ -85,16 +86,20 @@ func runOutputConsumerProgram() { parser := influx.NewStreamParser(os.Stdin) for { - metric, err := parser.Next() + m, err := parser.Next() if err != nil { if err == influx.EOF { return // stream ended } if parseErr, isParseError := err.(*influx.ParseError); isParseError { + //nolint:errcheck,revive // Test will fail anyway fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + //nolint:revive // error code is important for this "test" os.Exit(1) } + //nolint:errcheck,revive // Test will fail anyway fmt.Fprintf(os.Stderr, "ERR %v\n", err) + //nolint:revive // error code is important for this "test" os.Exit(1) } @@ -104,8 +109,10 @@ func runOutputConsumerProgram() { now, ) - if !testutil.MetricEqual(expected, metric) { + if !testutil.MetricEqual(expected, m) { + //nolint:errcheck,revive // Test will fail anyway fmt.Fprintf(os.Stderr, "metric doesn't match expected\n") + //nolint:revive // error code is important for this "test" os.Exit(1) } } diff --git a/plugins/outputs/file/file_test.go b/plugins/outputs/file/file_test.go index 5fcdc5119..36706c4dc 100644 --- a/plugins/outputs/file/file_test.go +++ b/plugins/outputs/file/file_test.go @@ -6,7 +6,7 @@ import ( "os" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/serializers" @@ -20,7 +20,7 @@ const ( ) func TestFileExistingFile(t *testing.T) { - fh := createFile() + fh := createFile(t) defer os.Remove(fh.Name()) s, _ := serializers.NewInfluxSerializer() f := File{ @@ -29,20 +29,20 @@ func TestFileExistingFile(t *testing.T) { } err := f.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = f.Write(testutil.MockMetrics()) - assert.NoError(t, err) + require.NoError(t, err) - validateFile(fh.Name(), expExistFile, t) + validateFile(t, fh.Name(), expExistFile) err = f.Close() - assert.NoError(t, err) + require.NoError(t, err) } func TestFileNewFile(t *testing.T) { s, _ := serializers.NewInfluxSerializer() - fh := tmpFile() + fh := tmpFile(t) defer os.Remove(fh) f := File{ Files: []string{fh}, @@ -50,23 +50,23 @@ func TestFileNewFile(t *testing.T) { } err := f.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = f.Write(testutil.MockMetrics()) - assert.NoError(t, err) + require.NoError(t, err) - validateFile(fh, expNewFile, t) + validateFile(t, fh, expNewFile) err = f.Close() - assert.NoError(t, err) + require.NoError(t, err) } func TestFileExistingFiles(t *testing.T) { - fh1 := createFile() + fh1 := createFile(t) defer os.Remove(fh1.Name()) - fh2 := createFile() + fh2 := createFile(t) defer os.Remove(fh2.Name()) - fh3 := createFile() + fh3 := createFile(t) defer os.Remove(fh3.Name()) s, _ := serializers.NewInfluxSerializer() @@ -76,26 +76,26 @@ func TestFileExistingFiles(t *testing.T) { } err := f.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = f.Write(testutil.MockMetrics()) - assert.NoError(t, err) + require.NoError(t, err) - validateFile(fh1.Name(), expExistFile, t) - validateFile(fh2.Name(), expExistFile, t) - validateFile(fh3.Name(), expExistFile, t) + validateFile(t, fh1.Name(), expExistFile) + validateFile(t, fh2.Name(), expExistFile) + validateFile(t, fh3.Name(), expExistFile) err = f.Close() - assert.NoError(t, err) + require.NoError(t, err) } func TestFileNewFiles(t *testing.T) { s, _ := serializers.NewInfluxSerializer() - fh1 := tmpFile() + fh1 := tmpFile(t) defer os.Remove(fh1) - fh2 := tmpFile() + fh2 := tmpFile(t) defer os.Remove(fh2) - fh3 := tmpFile() + fh3 := tmpFile(t) defer os.Remove(fh3) f := File{ Files: []string{fh1, fh2, fh3}, @@ -103,23 +103,23 @@ func TestFileNewFiles(t *testing.T) { } err := f.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = f.Write(testutil.MockMetrics()) - assert.NoError(t, err) + require.NoError(t, err) - validateFile(fh1, expNewFile, t) - validateFile(fh2, expNewFile, t) - validateFile(fh3, expNewFile, t) + validateFile(t, fh1, expNewFile) + validateFile(t, fh2, expNewFile) + validateFile(t, fh3, expNewFile) err = f.Close() - assert.NoError(t, err) + require.NoError(t, err) } func TestFileBoth(t *testing.T) { - fh1 := createFile() + fh1 := createFile(t) defer os.Remove(fh1.Name()) - fh2 := tmpFile() + fh2 := tmpFile(t) defer os.Remove(fh2) s, _ := serializers.NewInfluxSerializer() @@ -129,16 +129,16 @@ func TestFileBoth(t *testing.T) { } err := f.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = f.Write(testutil.MockMetrics()) - assert.NoError(t, err) + require.NoError(t, err) - validateFile(fh1.Name(), expExistFile, t) - validateFile(fh2, expNewFile, t) + validateFile(t, fh1.Name(), expExistFile) + validateFile(t, fh2, expNewFile) err = f.Close() - assert.NoError(t, err) + require.NoError(t, err) } func TestFileStdout(t *testing.T) { @@ -154,52 +154,52 @@ func TestFileStdout(t *testing.T) { } err := f.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = f.Write(testutil.MockMetrics()) - assert.NoError(t, err) + require.NoError(t, err) err = f.Close() - assert.NoError(t, err) + require.NoError(t, err) outC := make(chan string) // copy the output in a separate goroutine so printing can't block indefinitely go func() { var buf bytes.Buffer - io.Copy(&buf, r) + _, err := io.Copy(&buf, r) + require.NoError(t, err) outC <- buf.String() }() // back to normal state - w.Close() + err = w.Close() + require.NoError(t, err) + // restoring the real stdout os.Stdout = old out := <-outC - assert.Equal(t, expNewFile, out) + require.Equal(t, expNewFile, out) } -func createFile() *os.File { +func createFile(t *testing.T) *os.File { f, err := os.CreateTemp("", "") - if err != nil { - panic(err) - } - f.WriteString("cpu,cpu=cpu0 value=100 1455312810012459582\n") + require.NoError(t, err) + + _, err = f.WriteString("cpu,cpu=cpu0 value=100 1455312810012459582\n") + require.NoError(t, err) return f } -func tmpFile() string { +func tmpFile(t *testing.T) string { d, err := os.MkdirTemp("", "") - if err != nil { - panic(err) - } + require.NoError(t, err) + return d + internal.RandomString(10) } -func validateFile(fname, expS string, t *testing.T) { - buf, err := os.ReadFile(fname) - if err != nil { - panic(err) - } - assert.Equal(t, expS, string(buf)) +func validateFile(t *testing.T, fileName, expS string) { + buf, err := os.ReadFile(fileName) + require.NoError(t, err) + require.Equal(t, expS, string(buf)) }