diff --git a/plugins/outputs/amqp/amqp.go b/plugins/outputs/amqp/amqp.go index 0e486695f..06dcc6fb9 100644 --- a/plugins/outputs/amqp/amqp.go +++ b/plugins/outputs/amqp/amqp.go @@ -300,7 +300,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) { } func connect(clientConfig *ClientConfig) (Client, error) { - return Connect(clientConfig) + return newClient(clientConfig) } func init() { diff --git a/plugins/outputs/amqp/client.go b/plugins/outputs/amqp/client.go index 4e6067fb2..6bcbff6c6 100644 --- a/plugins/outputs/amqp/client.go +++ b/plugins/outputs/amqp/client.go @@ -35,8 +35,8 @@ type client struct { config *ClientConfig } -// Connect opens a connection to one of the brokers at random -func Connect(config *ClientConfig) (*client, error) { +// newClient opens a connection to one of the brokers at random +func newClient(config *ClientConfig) (*client, error) { client := &client{ config: config, } diff --git a/plugins/outputs/cloudwatch/cloudwatch.go b/plugins/outputs/cloudwatch/cloudwatch.go index 24ac44d4a..8b06526bd 100644 --- a/plugins/outputs/cloudwatch/cloudwatch.go +++ b/plugins/outputs/cloudwatch/cloudwatch.go @@ -160,7 +160,6 @@ func (*CloudWatch) SampleConfig() string { } func (c *CloudWatch) Connect() error { - cfg, err := c.CredentialConfig.Credentials() if err != nil { diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 036bd271f..6ca5755b4 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -4,7 +4,6 @@ package kafka import ( _ "embed" "fmt" - "log" "strings" "time" @@ -68,22 +67,21 @@ type TopicSuffix struct { // DebugLogger logs messages from sarama at the debug level. type DebugLogger struct { + Log telegraf.Logger } -func (*DebugLogger) Print(v ...interface{}) { +func (l *DebugLogger) Print(v ...interface{}) { args := make([]interface{}, 0, len(v)+1) - args = append(append(args, "D! [sarama] "), v...) - log.Print(args...) + args = append(append(args, "[sarama] "), v...) + l.Log.Debug(args...) } -func (*DebugLogger) Printf(format string, v ...interface{}) { - log.Printf("D! [sarama] "+format, v...) +func (l *DebugLogger) Printf(format string, v ...interface{}) { + l.Log.Debugf("[sarama] "+format, v...) } -func (*DebugLogger) Println(v ...interface{}) { - args := make([]interface{}, 0, len(v)+1) - args = append(append(args, "D! [sarama] "), v...) - log.Println(args...) +func (l *DebugLogger) Println(v ...interface{}) { + l.Print(v) } func ValidateTopicSuffixMethod(method string) error { @@ -140,6 +138,8 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) { } func (k *Kafka) Init() error { + sarama.Logger = &DebugLogger{Log: k.Log} + err := ValidateTopicSuffixMethod(k.TopicSuffix.Method) if err != nil { return err @@ -259,7 +259,6 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { } func init() { - sarama.Logger = &DebugLogger{} outputs.Add("kafka", func() telegraf.Output { return &Kafka{ WriteConfig: kafka.WriteConfig{ diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index f6f10a558..ca30f34c9 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -80,6 +80,7 @@ func TestConnectAndWriteIntegration(t *testing.T) { k := &Kafka{ Brokers: brokers, Topic: "Test", + Log: testutil.Logger{}, serializer: s, producerFunc: sarama.NewSyncProducer, } @@ -133,6 +134,7 @@ func TestTopicSuffixes(t *testing.T) { k := &Kafka{ Topic: topic, TopicSuffix: topicSuffix, + Log: testutil.Logger{}, } _, topic := k.GetTopicName(m) @@ -200,6 +202,7 @@ func TestRoutingKey(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + tt.kafka.Log = testutil.Logger{} key, err := tt.kafka.routingKey(tt.metric) require.NoError(t, err) tt.check(t, key) @@ -328,6 +331,8 @@ func TestTopicTag(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + tt.plugin.Log = testutil.Logger{} + s, err := serializers.NewInfluxSerializer() require.NoError(t, err) tt.plugin.SetSerializer(s) diff --git a/plugins/outputs/logzio/logzio.go b/plugins/outputs/logzio/logzio.go index 285cd5541..a8bc23908 100644 --- a/plugins/outputs/logzio/logzio.go +++ b/plugins/outputs/logzio/logzio.go @@ -22,9 +22,7 @@ var sampleConfig string const ( defaultLogzioURL = "https://listener.logz.io:8071" - - logzioDescription = "Send aggregate metrics to Logz.io" - logzioType = "telegraf" + logzioType = "telegraf" ) type Logzio struct { diff --git a/plugins/outputs/opentelemetry/opentelemetry.go b/plugins/outputs/opentelemetry/opentelemetry.go index 7a811b148..f509f85ce 100644 --- a/plugins/outputs/opentelemetry/opentelemetry.go +++ b/plugins/outputs/opentelemetry/opentelemetry.go @@ -12,6 +12,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + + // Blank import to allow gzip encoding _ "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" diff --git a/plugins/outputs/opentsdb/opentsdb.go b/plugins/outputs/opentsdb/opentsdb.go index 35dc925e5..56a26dd66 100644 --- a/plugins/outputs/opentsdb/opentsdb.go +++ b/plugins/outputs/opentsdb/opentsdb.go @@ -114,6 +114,7 @@ func (o *OpenTSDB) WriteHTTP(metrics []telegraf.Metric, u *url.URL) error { BatchSize: o.HTTPBatchSize, Path: o.HTTPPath, Debug: o.Debug, + log: o.Log, } for _, m := range metrics { @@ -218,7 +219,7 @@ func buildValue(v interface{}) (string, error) { case uint64: retv = UIntToString(p) case float64: - retv = FloatToString(float64(p)) + retv = FloatToString(p) default: return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v) } diff --git a/plugins/outputs/opentsdb/opentsdb_http.go b/plugins/outputs/opentsdb/opentsdb_http.go index 582a9bb85..adc4e8fb7 100644 --- a/plugins/outputs/opentsdb/opentsdb_http.go +++ b/plugins/outputs/opentsdb/opentsdb_http.go @@ -6,10 +6,11 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "net/http/httputil" "net/url" + + "github.com/influxdata/telegraf" ) type HTTPMetric struct { @@ -28,6 +29,8 @@ type openTSDBHttp struct { Path string Debug bool + log telegraf.Logger + metricCounter int body requestBody } @@ -62,18 +65,20 @@ func (r *requestBody) reset(debug bool) { r.enc = json.NewEncoder(r.w) - io.WriteString(r.w, "[") + _, _ = io.WriteString(r.w, "[") r.empty = true } func (r *requestBody) addMetric(metric *HTTPMetric) error { if !r.empty { - io.WriteString(r.w, ",") + if _, err := io.WriteString(r.w, ","); err != nil { + return err + } } if err := r.enc.Encode(metric); err != nil { - return fmt.Errorf("Metric serialization error %s", err.Error()) + return fmt.Errorf("metric serialization error %w", err) } r.empty = false @@ -82,10 +87,12 @@ func (r *requestBody) addMetric(metric *HTTPMetric) error { } func (r *requestBody) close() error { - io.WriteString(r.w, "]") + if _, err := io.WriteString(r.w, "]"); err != nil { + return err + } if err := r.g.Close(); err != nil { - return fmt.Errorf("Error when closing gzip writer: %s", err.Error()) + return fmt.Errorf("error when closing gzip writer: %w", err) } return nil @@ -117,7 +124,9 @@ func (o *openTSDBHttp) flush() error { return nil } - o.body.close() + if err := o.body.close(); err != nil { + return err + } u := url.URL{ Scheme: o.Scheme, @@ -132,7 +141,7 @@ func (o *openTSDBHttp) flush() error { req, err := http.NewRequest("POST", u.String(), &o.body.b) if err != nil { - return fmt.Errorf("Error when building request: %s", err.Error()) + return fmt.Errorf("error when building request: %w", err) } req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Encoding", "gzip") @@ -140,7 +149,7 @@ func (o *openTSDBHttp) flush() error { if o.Debug { dump, err := httputil.DumpRequestOut(req, false) if err != nil { - return fmt.Errorf("Error when dumping request: %s", err.Error()) + return fmt.Errorf("error when dumping request: %w", err) } fmt.Printf("Sending metrics:\n%s", dump) @@ -149,14 +158,14 @@ func (o *openTSDBHttp) flush() error { resp, err := http.DefaultClient.Do(req) if err != nil { - return fmt.Errorf("Error when sending metrics: %s", err.Error()) + return fmt.Errorf("error when sending metrics: %w", err) } defer resp.Body.Close() if o.Debug { dump, err := httputil.DumpResponse(resp, true) if err != nil { - return fmt.Errorf("Error when dumping response: %s", err.Error()) + return fmt.Errorf("error when dumping response: %w", err) } fmt.Printf("Received response\n%s\n\n", dump) @@ -165,14 +174,11 @@ func (o *openTSDBHttp) flush() error { _, _ = io.Copy(io.Discard, resp.Body) } - if resp.StatusCode/100 != 2 { - if resp.StatusCode/100 == 4 { - log.Printf("E! Received %d status code. Dropping metrics to avoid overflowing buffer.", - resp.StatusCode) - } else { - return fmt.Errorf("Error when sending metrics. Received status %d", - resp.StatusCode) + if resp.StatusCode < 200 || resp.StatusCode > 299 { + if resp.StatusCode < 400 || resp.StatusCode > 499 { + return fmt.Errorf("error sending metrics (status %d)", resp.StatusCode) } + o.log.Errorf("Received %d status code. Dropping metrics to avoid overflowing buffer.", resp.StatusCode) } return nil diff --git a/plugins/outputs/opentsdb/opentsdb_test.go b/plugins/outputs/opentsdb/opentsdb_test.go index 16a3c347a..f75711ed3 100644 --- a/plugins/outputs/opentsdb/opentsdb_test.go +++ b/plugins/outputs/opentsdb/opentsdb_test.go @@ -126,16 +126,16 @@ func TestSanitize(t *testing.T) { } func BenchmarkHttpSend(b *testing.B) { - const BatchSize = 50 - const MetricsCount = 4 * BatchSize - metrics := make([]telegraf.Metric, MetricsCount) - for i := 0; i < MetricsCount; i++ { + const batchSize = 50 + const metricsCount = 4 * batchSize + metrics := make([]telegraf.Metric, metricsCount) + for i := 0; i < metricsCount; i++ { metrics[i] = testutil.TestMetric(1.0) } ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - fmt.Fprintln(w, "{}") + _, _ = fmt.Fprintln(w, "{}") })) defer ts.Close() @@ -155,13 +155,13 @@ func BenchmarkHttpSend(b *testing.B) { Host: ts.URL, Port: port, Prefix: "", - HTTPBatchSize: BatchSize, + HTTPBatchSize: batchSize, HTTPPath: "/api/put", } b.ResetTimer() for i := 0; i < b.N; i++ { - o.Write(metrics) + _ = o.Write(metrics) } } func TestWriteIntegration(t *testing.T) { diff --git a/plugins/outputs/prometheus_client/v1/collector.go b/plugins/outputs/prometheus_client/v1/collector.go index a77f94d9f..95ce62504 100644 --- a/plugins/outputs/prometheus_client/v1/collector.go +++ b/plugins/outputs/prometheus_client/v1/collector.go @@ -212,14 +212,16 @@ func (c *Collector) Add(metrics []telegraf.Metric) error { // fields to labels if enabled. if c.StringAsLabel { for fn, fv := range point.Fields() { - switch fv := fv.(type) { - case string: - name, ok := serializer.SanitizeLabelName(fn) - if !ok { - continue - } - labels[name] = fv + sfv, ok := fv.(string) + if !ok { + continue } + + name, ok := serializer.SanitizeLabelName(fn) + if !ok { + continue + } + labels[name] = sfv } } diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go index b963a3482..f01776929 100644 --- a/plugins/outputs/stackdriver/stackdriver_test.go +++ b/plugins/outputs/stackdriver/stackdriver_test.go @@ -17,6 +17,7 @@ import ( metricpb "google.golang.org/genproto/googleapis/api/metric" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" @@ -72,7 +73,8 @@ func TestMain(m *testing.M) { //nolint:errcheck,revive go serv.Serve(lis) - conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) + opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + conn, err := grpc.Dial(lis.Addr().String(), opt) if err != nil { log.Fatal(err) } diff --git a/plugins/outputs/timestream/timestream.go b/plugins/outputs/timestream/timestream.go index 805f9bb1b..e8f6f6096 100644 --- a/plugins/outputs/timestream/timestream.go +++ b/plugins/outputs/timestream/timestream.go @@ -70,7 +70,6 @@ const MaxWriteRoutinesDefault = 1 // WriteFactory function provides a way to mock the client instantiation for testing purposes. var WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { - awsCreds, awsErr := credentialConfig.Credentials() if awsErr != nil { panic("Unable to load credentials config " + awsErr.Error()) diff --git a/plugins/outputs/timestream/timestream_test.go b/plugins/outputs/timestream/timestream_test.go index 36d820e03..e070eb3d7 100644 --- a/plugins/outputs/timestream/timestream_test.go +++ b/plugins/outputs/timestream/timestream_test.go @@ -361,7 +361,6 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) { } func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) { - input1 := testutil.MustMetric( metricName1, map[string]string{"tag1": "value1"},