diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 9c54c2dad..795163b4f 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -10,6 +10,10 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" @@ -17,8 +21,6 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v1" "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v2" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) var ( @@ -121,9 +123,15 @@ func (p *PrometheusClient) Init() error { for collector := range defaultCollectors { switch collector { case "gocollector": - registry.Register(prometheus.NewGoCollector()) + err := registry.Register(collectors.NewGoCollector()) + if err != nil { + return err + } case "process": - registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + err := registry.Register(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + if err != nil { + return err + } default: return fmt.Errorf("unrecognized collector %s", collector) } @@ -160,7 +168,10 @@ func (p *PrometheusClient) Init() error { rangeHandler := internal.IPRangeHandler(ipRange, onError) promHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}) landingPageHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("Telegraf Output Plugin: Prometheus Client ")) + _, err := w.Write([]byte("Telegraf Output Plugin: Prometheus Client ")) + if err != nil { + p.Log.Errorf("Error occurred when writing HTTP reply: %v", err) + } }) mux := http.NewServeMux() @@ -229,7 +240,7 @@ func onError(rw http.ResponseWriter, code int) { http.Error(rw, http.StatusText(code), code) } -// Address returns the address the plugin is listening on. If not listening +// URL returns the address the plugin is listening on. If not listening // an empty string is returned. func (p *PrometheusClient) URL() string { if p.url != nil { diff --git a/plugins/outputs/prometheus_client/prometheus_client_v1_test.go b/plugins/outputs/prometheus_client/prometheus_client_v1_test.go index 95fa97fb6..7a93a9bc5 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_v1_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_v1_test.go @@ -10,14 +10,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func TestMetricVersion1(t *testing.T) { - Logger := testutil.Logger{Name: "outputs.prometheus_client"} + logger := testutil.Logger{Name: "outputs.prometheus_client"} tests := []struct { name string output *PrometheusClient @@ -31,7 +32,7 @@ func TestMetricVersion1(t *testing.T) { MetricVersion: 1, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -58,7 +59,7 @@ cpu_time_idle{host="example.org"} 42 MetricVersion: 1, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -85,7 +86,7 @@ cpu_time_idle{host="example.org"} 42 MetricVersion: 1, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -114,7 +115,7 @@ cpu_time_idle{host="example.org"} 42 CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", StringAsLabel: true, - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -141,7 +142,7 @@ cpu_time_idle{host_name="example.org"} 42 MetricVersion: 1, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -169,7 +170,7 @@ cpu_time_idle{host="example.org"} 42 MetricVersion: 1, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -209,7 +210,7 @@ http_request_duration_seconds_count 144320 MetricVersion: 1, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -272,7 +273,7 @@ rpc_duration_seconds_count 2693 } func TestRoundTripMetricVersion1(t *testing.T) { - Logger := testutil.Logger{Name: "outputs.prometheus_client"} + logger := testutil.Logger{Name: "outputs.prometheus_client"} tests := []struct { name string data []byte @@ -348,17 +349,18 @@ rpc_duration_seconds_count 2693 ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() - url := fmt.Sprintf("http://%s", ts.Listener.Addr()) + address := fmt.Sprintf("http://%s", ts.Listener.Addr()) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - w.Write(tt.data) + _, err := w.Write(tt.data) + require.NoError(t, err) }) input := &inputs.Prometheus{ - URLs: []string{url}, + URLs: []string{address}, URLTag: "", MetricVersion: 1, } @@ -375,7 +377,7 @@ rpc_duration_seconds_count 2693 Listen: "127.0.0.1:0", Path: defaultPath, MetricVersion: 1, - Log: Logger, + Log: logger, CollectorsExclude: []string{"gocollector", "process"}, } err = output.Init() @@ -391,6 +393,7 @@ rpc_duration_seconds_count 2693 resp, err := http.Get(output.URL()) require.NoError(t, err) + defer resp.Body.Close() actual, err := io.ReadAll(resp.Body) require.NoError(t, err) @@ -403,12 +406,12 @@ rpc_duration_seconds_count 2693 } func TestLandingPage(t *testing.T) { - Logger := testutil.Logger{Name: "outputs.prometheus_client"} + logger := testutil.Logger{Name: "outputs.prometheus_client"} output := PrometheusClient{ Listen: ":0", CollectorsExclude: []string{"process"}, MetricVersion: 1, - Log: Logger, + Log: logger, } expected := "Telegraf Output Plugin: Prometheus Client" @@ -419,8 +422,11 @@ func TestLandingPage(t *testing.T) { require.NoError(t, err) u, err := url.Parse(fmt.Sprintf("http://%s/", output.url.Host)) + require.NoError(t, err) + resp, err := http.Get(u.String()) require.NoError(t, err) + defer resp.Body.Close() actual, err := io.ReadAll(resp.Body) require.NoError(t, err) diff --git a/plugins/outputs/prometheus_client/prometheus_client_v2_test.go b/plugins/outputs/prometheus_client/prometheus_client_v2_test.go index c5ff76d40..2096caf6d 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_v2_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_v2_test.go @@ -9,14 +9,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func TestMetricVersion2(t *testing.T) { - Logger := testutil.Logger{Name: "outputs.prometheus_client"} + logger := testutil.Logger{Name: "outputs.prometheus_client"} tests := []struct { name string output *PrometheusClient @@ -30,7 +31,7 @@ func TestMetricVersion2(t *testing.T) { MetricVersion: 2, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -57,7 +58,7 @@ cpu_time_idle{host="example.org"} 42 MetricVersion: 2, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -86,7 +87,7 @@ rpc_duration_seconds_count 2693 CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", ExportTimestamp: true, - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -114,7 +115,7 @@ cpu_time_idle{host="example.org"} 42 0 CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", StringAsLabel: true, - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -141,7 +142,7 @@ cpu_time_idle{host="example.org"} 42 CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", StringAsLabel: false, - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -167,7 +168,7 @@ cpu_time_idle 42 MetricVersion: 2, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -194,7 +195,7 @@ cpu_time_idle{host="example.org"} 42 MetricVersion: 2, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -276,7 +277,7 @@ cpu_usage_idle_count{cpu="cpu1"} 20 MetricVersion: 2, CollectorsExclude: []string{"gocollector", "process"}, Path: "/metrics", - Log: Logger, + Log: logger, }, metrics: []telegraf.Metric{ testutil.MustMetric( @@ -332,7 +333,7 @@ cpu_usage_idle_count{cpu="cpu1"} 20 } func TestRoundTripMetricVersion2(t *testing.T) { - Logger := testutil.Logger{Name: "outputs.prometheus_client"} + logger := testutil.Logger{Name: "outputs.prometheus_client"} tests := []struct { name string data []byte @@ -414,7 +415,8 @@ rpc_duration_seconds_count 2693 t.Run(tt.name, func(t *testing.T) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - w.Write(tt.data) + _, err := w.Write(tt.data) + require.NoError(t, err) }) input := &inputs.Prometheus{ @@ -435,7 +437,7 @@ rpc_duration_seconds_count 2693 Listen: "127.0.0.1:0", Path: defaultPath, MetricVersion: 2, - Log: Logger, + Log: logger, CollectorsExclude: []string{"gocollector", "process"}, } err = output.Init() @@ -451,6 +453,7 @@ rpc_duration_seconds_count 2693 resp, err := http.Get(output.URL()) require.NoError(t, err) + defer resp.Body.Close() actual, err := io.ReadAll(resp.Body) require.NoError(t, err) diff --git a/plugins/outputs/riemann/riemann.go b/plugins/outputs/riemann/riemann.go index bad1e44a0..bfcc1e337 100644 --- a/plugins/outputs/riemann/riemann.go +++ b/plugins/outputs/riemann/riemann.go @@ -9,6 +9,7 @@ import ( "time" "github.com/amir/raidman" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" @@ -78,12 +79,12 @@ func (r *Riemann) Connect() error { return nil } -func (r *Riemann) Close() error { +func (r *Riemann) Close() (err error) { if r.client != nil { - r.client.Close() + err = r.client.Close() r.client = nil } - return nil + return err } func (r *Riemann) SampleConfig() string { @@ -113,7 +114,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error { } if err := r.client.SendMulti(events); err != nil { - r.Close() + r.Close() //nolint:revive // There is another error which will be returned here return fmt.Errorf("failed to send riemann message: %s", err) } return nil diff --git a/plugins/outputs/riemann_legacy/riemann.go b/plugins/outputs/riemann_legacy/riemann.go index 7fe80297d..0bd0f6b87 100644 --- a/plugins/outputs/riemann_legacy/riemann.go +++ b/plugins/outputs/riemann_legacy/riemann.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/amir/raidman" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" ) @@ -48,9 +49,9 @@ func (r *Riemann) Close() error { if r.client == nil { return nil } - r.client.Close() + err := r.client.Close() r.client = nil - return nil + return err } func (r *Riemann) SampleConfig() string { @@ -82,7 +83,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error { var senderr = r.client.SendMulti(events) if senderr != nil { - r.Close() // always returns nil + r.Close() //nolint:revive // There is another error which will be returned here return fmt.Errorf("failed to send riemann message (will try to reconnect), error: %s", senderr) } diff --git a/plugins/outputs/sensu/sensu.go b/plugins/outputs/sensu/sensu.go index 3cd8b2274..b1a937209 100644 --- a/plugins/outputs/sensu/sensu.go +++ b/plugins/outputs/sensu/sensu.go @@ -296,10 +296,10 @@ func (s *Sensu) Write(metrics []telegraf.Metric) error { return err } - return s.write(reqBody) + return s.writeMetrics(reqBody) } -func (s *Sensu) write(reqBody []byte) error { +func (s *Sensu) writeMetrics(reqBody []byte) error { var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody) method := http.MethodPost diff --git a/plugins/outputs/signalfx/signalfx.go b/plugins/outputs/signalfx/signalfx.go index d8452d7b7..b7550ae5b 100644 --- a/plugins/outputs/signalfx/signalfx.go +++ b/plugins/outputs/signalfx/signalfx.go @@ -6,12 +6,13 @@ import ( "fmt" "strings" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/outputs" "github.com/signalfx/golib/v3/datapoint" "github.com/signalfx/golib/v3/datapoint/dpsink" "github.com/signalfx/golib/v3/event" "github.com/signalfx/golib/v3/sfxclient" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" ) //init initializes the plugin context @@ -106,7 +107,7 @@ func (s *SignalFx) Connect() error { if s.IngestURL != "" { client.DatapointEndpoint = datapointEndpointForIngestURL(s.IngestURL) client.EventEndpoint = eventEndpointForIngestURL(s.IngestURL) - } else if s.SignalFxRealm != "" { + } else if s.SignalFxRealm != "" { //nolint: revive // "Simplifying" if c {...} else {... return } would not simplify anything at all in this case client.DatapointEndpoint = datapointEndpointForRealm(s.SignalFxRealm) client.EventEndpoint = eventEndpointForRealm(s.SignalFxRealm) } else { @@ -144,7 +145,7 @@ func (s *SignalFx) ConvertToSignalFx(metrics []telegraf.Metric) ([]*datapoint.Da if metricValue, err := datapoint.CastMetricValueWithBool(val); err == nil { var dp = datapoint.New(metricName, metricDims, - metricValue.(datapoint.Value), + metricValue, metricType, timestamp) diff --git a/plugins/outputs/signalfx/signalfx_test.go b/plugins/outputs/signalfx/signalfx_test.go index d21cff82f..fbb49d077 100644 --- a/plugins/outputs/signalfx/signalfx_test.go +++ b/plugins/outputs/signalfx/signalfx_test.go @@ -7,13 +7,14 @@ import ( "testing" "time" + "github.com/signalfx/golib/v3/datapoint" + "github.com/signalfx/golib/v3/event" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/testutil" - "github.com/signalfx/golib/v3/datapoint" - "github.com/signalfx/golib/v3/event" - "github.com/stretchr/testify/require" ) type sink struct { @@ -436,7 +437,9 @@ func TestSignalFx_SignalFx(t *testing.T) { measurements = append(measurements, m) } - s.Write(measurements) + err := s.Write(measurements) + require.NoError(t, err) + require.Eventually(t, func() bool { return len(s.client.(*sink).dps) == len(tt.want.datapoints) }, 5*time.Second, 100*time.Millisecond) require.Eventually(t, func() bool { return len(s.client.(*sink).evs) == len(tt.want.events) }, 5*time.Second, 100*time.Millisecond) @@ -596,7 +599,8 @@ func TestSignalFx_Errors(t *testing.T) { measurement.name, measurement.tags, measurement.fields, measurement.time, measurement.tp, ) - s.Write([]telegraf.Metric{m}) + err := s.Write([]telegraf.Metric{m}) + require.Error(t, err) } for !(len(s.client.(*errorsink).dps) == len(tt.want.datapoints) && len(s.client.(*errorsink).evs) == len(tt.want.events)) { time.Sleep(1 * time.Second) diff --git a/plugins/outputs/socket_writer/socket_writer.go b/plugins/outputs/socket_writer/socket_writer.go index 2546faa67..130a0f738 100644 --- a/plugins/outputs/socket_writer/socket_writer.go +++ b/plugins/outputs/socket_writer/socket_writer.go @@ -3,7 +3,6 @@ package socket_writer import ( "crypto/tls" "fmt" - "log" "net" "strings" "time" @@ -21,6 +20,7 @@ type SocketWriter struct { Address string KeepAlivePeriod *config.Duration tlsint.ClientConfig + Log telegraf.Logger `toml:"-"` serializers.Serializer @@ -99,7 +99,7 @@ func (sw *SocketWriter) Connect() error { } if err := sw.setKeepAlive(c); err != nil { - log.Printf("unable to configure keep alive (%s): %s", sw.Address, err) + sw.Log.Debugf("Unable to configure keep alive (%s): %s", sw.Address, err) } //set encoder sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding) @@ -142,13 +142,13 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error { for _, m := range metrics { bs, err := sw.Serialize(m) if err != nil { - log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err) + sw.Log.Debugf("Could not serialize metric: %v", err) continue } bs, err = sw.encoder.Encode(bs) if err != nil { - log.Printf("D! [outputs.socket_writer] Could not encode metric: %v", err) + sw.Log.Debugf("Could not encode metric: %v", err) continue } @@ -156,7 +156,7 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error { //TODO log & keep going with remaining strings if err, ok := err.(net.Error); !ok || !err.Temporary() { // permanent error. close the connection - sw.Close() + sw.Close() //nolint:revive // There is another error which will be returned here sw.Conn = nil return fmt.Errorf("closing connection: %v", err) } diff --git a/plugins/outputs/socket_writer/socket_writer_test.go b/plugins/outputs/socket_writer/socket_writer_test.go index 0decb644c..d1283a411 100644 --- a/plugins/outputs/socket_writer/socket_writer_test.go +++ b/plugins/outputs/socket_writer/socket_writer_test.go @@ -9,10 +9,10 @@ import ( "sync" "testing" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestSocketWriter_tcp(t *testing.T) { @@ -105,8 +105,8 @@ func testSocketWriterStream(t *testing.T, sw *SocketWriter, lconn net.Conn) { require.True(t, scnr.Scan()) mstr2in := scnr.Text() + "\n" - assert.Equal(t, string(mbs1out), mstr1in) - assert.Equal(t, string(mbs2out), mstr2in) + require.Equal(t, string(mbs1out), mstr1in) + require.Equal(t, string(mbs2out), mstr2in) } func testSocketWriterPacket(t *testing.T, sw *SocketWriter, lconn net.PacketConn) { @@ -132,8 +132,8 @@ func testSocketWriterPacket(t *testing.T, sw *SocketWriter, lconn net.PacketConn } require.Len(t, mstrins, 2) - assert.Equal(t, mbs1str, mstrins[0]) - assert.Equal(t, mbs2str, mstrins[1]) + require.Equal(t, mbs1str, mstrins[0]) + require.Equal(t, mbs2str, mstrins[1]) } func TestSocketWriter_Write_err(t *testing.T) { @@ -145,20 +145,26 @@ func TestSocketWriter_Write_err(t *testing.T) { err = sw.Connect() require.NoError(t, err) - sw.Conn.(*net.TCPConn).SetReadBuffer(256) + err = sw.Conn.(*net.TCPConn).SetReadBuffer(256) + require.NoError(t, err) lconn, err := listener.Accept() require.NoError(t, err) - lconn.(*net.TCPConn).SetWriteBuffer(256) + err = lconn.(*net.TCPConn).SetWriteBuffer(256) + require.NoError(t, err) metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")} // close the socket to generate an error - lconn.Close() - sw.Conn.Close() + err = lconn.Close() + require.NoError(t, err) + + err = sw.Conn.Close() + require.NoError(t, err) + err = sw.Write(metrics) require.Error(t, err) - assert.Nil(t, sw.Conn) + require.Nil(t, sw.Conn) } func TestSocketWriter_Write_reconnect(t *testing.T) { @@ -170,12 +176,16 @@ func TestSocketWriter_Write_reconnect(t *testing.T) { err = sw.Connect() require.NoError(t, err) - sw.Conn.(*net.TCPConn).SetReadBuffer(256) + err = sw.Conn.(*net.TCPConn).SetReadBuffer(256) + require.NoError(t, err) lconn, err := listener.Accept() require.NoError(t, err) - lconn.(*net.TCPConn).SetWriteBuffer(256) - lconn.Close() + err = lconn.(*net.TCPConn).SetWriteBuffer(256) + require.NoError(t, err) + + err = lconn.Close() + require.NoError(t, err) sw.Conn = nil wg := sync.WaitGroup{} @@ -191,13 +201,13 @@ func TestSocketWriter_Write_reconnect(t *testing.T) { require.NoError(t, err) wg.Wait() - assert.NoError(t, lerr) + require.NoError(t, lerr) mbsout, _ := sw.Serialize(metrics[0]) buf := make([]byte, 256) n, err := lconn.Read(buf) require.NoError(t, err) - assert.Equal(t, string(mbsout), string(buf[:n])) + require.Equal(t, string(mbsout), string(buf[:n])) } func TestSocketWriter_udp_gzip(t *testing.T) { diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go index d6b24ff78..e1fb49d2e 100644 --- a/plugins/outputs/stackdriver/stackdriver.go +++ b/plugins/outputs/stackdriver/stackdriver.go @@ -4,20 +4,20 @@ import ( "context" "fmt" "hash/fnv" - "log" "path" "sort" "strings" monitoring "cloud.google.com/go/monitoring/apiv3/v2" // Imports the Stackdriver Monitoring client package. - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/outputs" "google.golang.org/api/option" metricpb "google.golang.org/genproto/googleapis/api/metric" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" ) // Stackdriver is the Google Stackdriver config info. @@ -26,6 +26,7 @@ type Stackdriver struct { Namespace string ResourceType string `toml:"resource_type"` ResourceLabels map[string]string `toml:"resource_labels"` + Log telegraf.Logger `toml:"-"` client *monitoring.MetricClient } @@ -46,9 +47,9 @@ const ( // MaxInt is the max int64 value. MaxInt = int(^uint(0) >> 1) - errStringPointsOutOfOrder = "One or more of the points specified had an older end time than the most recent point" - errStringPointsTooOld = "Data points cannot be written more than 24h in the past" - errStringPointsTooFrequent = "One or more points were written more frequently than the maximum sampling period configured for the metric" + errStringPointsOutOfOrder = "one or more of the points specified had an older end time than the most recent point" + errStringPointsTooOld = "data points cannot be written more than 24h in the past" + errStringPointsTooFrequent = "one or more points were written more frequently than the maximum sampling period configured for the metric" ) var sampleConfig = ` @@ -118,15 +119,15 @@ type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) { h := fnv.New64a() - h.Write([]byte(m.Name())) - h.Write([]byte{'\n'}) - h.Write([]byte(f.Key)) - h.Write([]byte{'\n'}) + h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte(f.Key)) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error" for key, value := range m.Tags() { - h.Write([]byte(key)) - h.Write([]byte{'\n'}) - h.Write([]byte(value)) - h.Write([]byte{'\n'}) + h.Write([]byte(key)) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte(value)) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error" } k := h.Sum64() @@ -145,7 +146,7 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { for _, f := range m.FieldList() { value, err := getStackdriverTypedValue(f.Value) if err != nil { - log.Printf("E! [outputs.stackdriver] get type failed: %s", err) + s.Log.Errorf("Get type failed: %s", err) continue } @@ -155,13 +156,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { metricKind, err := getStackdriverMetricKind(m.Type()) if err != nil { - log.Printf("E! [outputs.stackdriver] get metric failed: %s", err) + s.Log.Errorf("Get metric failed: %s", err) continue } timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix()) if err != nil { - log.Printf("E! [outputs.stackdriver] get time interval failed: %s", err) + s.Log.Errorf("Get time interval failed: %s", err) continue } @@ -175,7 +176,7 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { timeSeries := &monitoringpb.TimeSeries{ Metric: &metricpb.Metric{ Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key), - Labels: getStackdriverLabels(m.TagList()), + Labels: s.getStackdriverLabels(m.TagList()), }, MetricKind: metricKind, Resource: &monitoredrespb.MonitoredResource{ @@ -228,10 +229,10 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { if strings.Contains(err.Error(), errStringPointsOutOfOrder) || strings.Contains(err.Error(), errStringPointsTooOld) || strings.Contains(err.Error(), errStringPointsTooFrequent) { - log.Printf("D! [outputs.stackdriver] unable to write to Stackdriver: %s", err) + s.Log.Debugf("Unable to write to Stackdriver: %s", err) return nil } - log.Printf("E! [outputs.stackdriver] unable to write to Stackdriver: %s", err) + s.Log.Errorf("Unable to write to Stackdriver: %s", err) return err } } @@ -306,7 +307,7 @@ func getStackdriverTypedValue(value interface{}) (*monitoringpb.TypedValue, erro case float64: return &monitoringpb.TypedValue{ Value: &monitoringpb.TypedValue_DoubleValue{ - DoubleValue: float64(v), + DoubleValue: v, }, }, nil case bool: @@ -323,39 +324,26 @@ func getStackdriverTypedValue(value interface{}) (*monitoringpb.TypedValue, erro } } -func getStackdriverLabels(tags []*telegraf.Tag) map[string]string { +func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]string { labels := make(map[string]string) for _, t := range tags { labels[t.Key] = t.Value } for k, v := range labels { if len(k) > QuotaStringLengthForLabelKey { - log.Printf( - "W! [outputs.stackdriver] removing tag [%s] key exceeds string length for label key [%d]", - k, - QuotaStringLengthForLabelKey, - ) + s.Log.Warnf("Removing tag [%s] key exceeds string length for label key [%d]", k, QuotaStringLengthForLabelKey) delete(labels, k) continue } if len(v) > QuotaStringLengthForLabelValue { - log.Printf( - "W! [outputs.stackdriver] removing tag [%s] value exceeds string length for label value [%d]", - k, - QuotaStringLengthForLabelValue, - ) + s.Log.Warnf("Removing tag [%s] value exceeds string length for label value [%d]", k, QuotaStringLengthForLabelValue) delete(labels, k) continue } } if len(labels) > QuotaLabelsPerMetricDescriptor { excess := len(labels) - QuotaLabelsPerMetricDescriptor - log.Printf( - "W! [outputs.stackdriver] tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags", - len(labels), - QuotaLabelsPerMetricDescriptor, - excess, - ) + s.Log.Warnf("Tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags", len(labels), QuotaLabelsPerMetricDescriptor, excess) for k := range labels { if excess == 0 { break diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go index bb2a620e9..741e08e65 100644 --- a/plugins/outputs/stackdriver/stackdriver_test.go +++ b/plugins/outputs/stackdriver/stackdriver_test.go @@ -12,8 +12,6 @@ import ( "time" monitoring "cloud.google.com/go/monitoring/apiv3/v2" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "google.golang.org/api/option" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" @@ -22,6 +20,9 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" ) // clientOpt is the option tests should use to connect to the test server. @@ -65,6 +66,9 @@ func TestMain(m *testing.M) { if err != nil { log.Fatal(err) } + + // Ignore the returned error as the tests will fail anyway + //nolint:errcheck,revive go serv.Serve(lis) conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) @@ -90,6 +94,7 @@ func TestWrite(t *testing.T) { s := &Stackdriver{ Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Namespace: "test", + Log: testutil.Logger{}, client: c, } @@ -121,6 +126,7 @@ func TestWriteResourceTypeAndLabels(t *testing.T) { ResourceLabels: map[string]string{ "mylabel": "myvalue", }, + Log: testutil.Logger{}, client: c, } @@ -149,6 +155,7 @@ func TestWriteAscendingTime(t *testing.T) { s := &Stackdriver{ Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Namespace: "test", + Log: testutil.Logger{}, client: c, } @@ -221,6 +228,7 @@ func TestWriteBatchable(t *testing.T) { s := &Stackdriver{ Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Namespace: "test", + Log: testutil.Logger{}, client: c, } @@ -398,6 +406,7 @@ func TestWriteIgnoredErrors(t *testing.T) { s := &Stackdriver{ Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Namespace: "test", + Log: testutil.Logger{}, client: c, } @@ -431,6 +440,10 @@ func TestGetStackdriverLabels(t *testing.T) { {Key: "valuequota", Value: "icym5wcpejnhljcvy2vwk15svmhrtueoppwlvix61vlbaeedufn1g6u4jgwjoekwew9s2dboxtgrkiyuircnl8h1lbzntt9gzcf60qunhxurhiz0g2bynzy1v6eyn4ravndeiiugobsrsj2bfaguahg4gxn7nx4irwfknunhkk6jdlldevawj8levebjajcrcbeugewd14fa8o34ycfwx2ymalyeqxhfqrsksxnii2deqq6cghrzi6qzwmittkzdtye3imoygqmjjshiskvnzz1e4ipd9c6wfor5jsygn1kvcg6jm4clnsl1fnxotbei9xp4swrkjpgursmfmkyvxcgq9hoy435nwnolo3ipnvdlhk6pmlzpdjn6gqi3v9gv7jn5ro2p1t5ufxzfsvqq1fyrgoi7gvmttil1banh3cftkph1dcoaqfhl7y0wkvhwwvrmslmmxp1wedyn8bacd7akmjgfwdvcmrymbzvmrzfvq1gs1xnmmg8rsfxci2h6r1ralo3splf4f3bdg4c7cy0yy9qbxzxhcmdpwekwc7tdjs8uj6wmofm2aor4hum8nwyfwwlxy3yvsnbjy32oucsrmhcnu6l2i8laujkrhvsr9fcix5jflygznlydbqw5uhw1rg1g5wiihqumwmqgggemzoaivm3ut41vjaff4uqtqyuhuwblmuiphfkd7si49vgeeswzg7tpuw0oxmkesgibkcjtev2h9ouxzjs3eb71jffhdacyiuyhuxwvm5bnrjewbm4x2kmhgbirz3eoj7ijgplggdkx5vixufg65ont8zi1jabsuxx0vsqgprunwkugqkxg2r7iy6fmgs4lob4dlseinowkst6gp6x1ejreauyzjz7atzm3hbmr5rbynuqp4lxrnhhcbuoun69mavvaaki0bdz5ybmbbbz5qdv0odtpjo2aezat5uosjuhzbvic05jlyclikynjgfhencdkz3qcqzbzhnsynj1zdke0sk4zfpvfyryzsxv9pu0qm"}, } - labels := getStackdriverLabels(tags) + s := &Stackdriver{ + Log: testutil.Logger{}, + } + + labels := s.getStackdriverLabels(tags) require.Equal(t, QuotaLabelsPerMetricDescriptor, len(labels)) } diff --git a/plugins/outputs/sumologic/sumologic.go b/plugins/outputs/sumologic/sumologic.go index 088210b9d..889a28bc2 100644 --- a/plugins/outputs/sumologic/sumologic.go +++ b/plugins/outputs/sumologic/sumologic.go @@ -3,7 +3,6 @@ package sumologic import ( "bytes" "compress/gzip" - "log" "net/http" "time" @@ -198,19 +197,19 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error { return s.writeRequestChunks(chunks) } - return s.write(reqBody) + return s.writeRequestChunk(reqBody) } func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error { for _, reqChunk := range chunks { - if err := s.write(reqChunk); err != nil { + if err := s.writeRequestChunk(reqChunk); err != nil { s.Log.Errorf("Error sending chunk: %v", err) } } return nil } -func (s *SumoLogic) write(reqBody []byte) error { +func (s *SumoLogic) writeRequestChunk(reqBody []byte) error { var ( err error buff bytes.Buffer @@ -284,31 +283,31 @@ func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) if la+len(chunkBody) > int(s.MaxRequstBodySize) { // ... and it's just the right size, without currently processed chunk. break - } else { - // ... we can try appending more. - i++ - toAppend = append(toAppend, chunkBody...) - continue } - } else { // la == 0 + // ... we can try appending more. i++ - toAppend = chunkBody - - if len(chunkBody) > int(s.MaxRequstBodySize) { - log.Printf( - "W! [SumoLogic] max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split", - s.MaxRequstBodySize, len(chunkBody), - ) - - // The serialized metric is too big but we have no choice - // but to send it. - // max_request_body_size was set so small that it wouldn't - // even accomodate a single metric. - break - } - + toAppend = append(toAppend, chunkBody...) continue } + + // la == 0 + i++ + toAppend = chunkBody + + if len(chunkBody) > int(s.MaxRequstBodySize) { + s.Log.Warnf( + "max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split", + s.MaxRequstBodySize, len(chunkBody), + ) + + // The serialized metric is too big, but we have no choice + // but to send it. + // max_request_body_size was set so small that it wouldn't + // even accommodate a single metric. + break + } + + continue } if toAppend == nil { diff --git a/plugins/outputs/sumologic/sumologic_test.go b/plugins/outputs/sumologic/sumologic_test.go index 5629defa4..a3e202f49 100644 --- a/plugins/outputs/sumologic/sumologic_test.go +++ b/plugins/outputs/sumologic/sumologic_test.go @@ -5,6 +5,7 @@ import ( "bytes" "compress/gzip" "fmt" + "github.com/influxdata/telegraf/testutil" "io" "net/http" "net/http/httptest" @@ -13,7 +14,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -25,7 +25,7 @@ import ( "github.com/influxdata/telegraf/plugins/serializers/prometheus" ) -func getMetric(t *testing.T) telegraf.Metric { +func getMetric() telegraf.Metric { m := metric.New( "cpu", map[string]string{}, @@ -37,7 +37,7 @@ func getMetric(t *testing.T) telegraf.Metric { return m } -func getMetrics(t *testing.T) []telegraf.Metric { +func getMetrics() []telegraf.Metric { const count = 100 var metrics = make([]telegraf.Metric, count) @@ -105,7 +105,7 @@ func TestMethod(t *testing.T) { } require.NoError(t, err) - err = plugin.Write([]telegraf.Metric{getMetric(t)}) + err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) }) } @@ -177,7 +177,7 @@ func TestStatusCode(t *testing.T) { err = tt.plugin.Connect() require.NoError(t, err) - err = tt.plugin.Write([]telegraf.Metric{getMetric(t)}) + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) tt.errFunc(t, err) }) } @@ -247,7 +247,8 @@ func TestContentType(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { gz, err := gzip.NewReader(r.Body) require.NoError(t, err) - io.Copy(&body, gz) + _, err = io.Copy(&body, gz) + require.NoError(t, err) w.WriteHeader(http.StatusOK) })) defer ts.Close() @@ -260,7 +261,7 @@ func TestContentType(t *testing.T) { require.NoError(t, plugin.Connect()) - err = plugin.Write([]telegraf.Metric{getMetric(t)}) + err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) if tt.expectedBody != nil { @@ -302,7 +303,7 @@ func TestContentEncodingGzip(t *testing.T) { payload, err := io.ReadAll(body) require.NoError(t, err) - assert.Equal(t, string(payload), "metric=cpu field=value 42 0\n") + require.Equal(t, string(payload), "metric=cpu field=value 42 0\n") w.WriteHeader(http.StatusNoContent) }) @@ -316,14 +317,12 @@ func TestContentEncodingGzip(t *testing.T) { err = plugin.Connect() require.NoError(t, err) - err = plugin.Write([]telegraf.Metric{getMetric(t)}) + err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) }) } } -type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) - func TestDefaultUserAgent(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) defer ts.Close() @@ -349,7 +348,7 @@ func TestDefaultUserAgent(t *testing.T) { err = plugin.Connect() require.NoError(t, err) - err = plugin.Write([]telegraf.Metric{getMetric(t)}) + err = plugin.Write([]telegraf.Metric{getMetric()}) require.NoError(t, err) }) } @@ -463,7 +462,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.URL = u.String() return s }, - metrics: []telegraf.Metric{getMetric(t)}, + metrics: []telegraf.Metric{getMetric()}, expectedError: false, expectedRequestCount: 1, expectedMetricLinesCount: 1, @@ -475,7 +474,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.URL = u.String() return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 1, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -490,7 +489,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 43_749 return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 2, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -503,7 +502,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 10_000 return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 5, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -516,7 +515,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 5_000 return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 10, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -529,7 +528,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 2_500 return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 20, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -542,7 +541,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 1_000 return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 50, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -555,7 +554,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 500 return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 100, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -568,7 +567,7 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 300 return s }, - metrics: getMetrics(t), + metrics: getMetrics(), expectedError: false, expectedRequestCount: 100, expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 @@ -596,6 +595,7 @@ func TestMaxRequestBodySize(t *testing.T) { plugin := tt.plugin() plugin.SetSerializer(serializer) + plugin.Log = testutil.Logger{} err = plugin.Connect() require.NoError(t, err) diff --git a/plugins/outputs/syslog/syslog.go b/plugins/outputs/syslog/syslog.go index 570ed15a7..d5925d4df 100644 --- a/plugins/outputs/syslog/syslog.go +++ b/plugins/outputs/syslog/syslog.go @@ -3,7 +3,6 @@ package syslog import ( "crypto/tls" "fmt" - "log" "net" "strconv" "strings" @@ -11,6 +10,7 @@ import ( "github.com/influxdata/go-syslog/v3/nontransparent" "github.com/influxdata/go-syslog/v3/rfc5424" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" framing "github.com/influxdata/telegraf/internal/syslog" @@ -29,6 +29,7 @@ type Syslog struct { Separator string `toml:"sdparam_separator"` Framing framing.Framing Trailer nontransparent.TrailerType + Log telegraf.Logger `toml:"-"` net.Conn tlsint.ClientConfig mapper *SyslogMapper @@ -135,7 +136,7 @@ func (s *Syslog) Connect() error { } if err := s.setKeepAlive(c); err != nil { - log.Printf("unable to configure keep alive (%s): %s", s.Address, err) + s.Log.Warnf("unable to configure keep alive (%s): %s", s.Address, err) } s.Conn = c @@ -186,17 +187,17 @@ func (s *Syslog) Write(metrics []telegraf.Metric) (err error) { for _, metric := range metrics { var msg *rfc5424.SyslogMessage if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil { - log.Printf("E! [outputs.syslog] Failed to create syslog message: %v", err) + s.Log.Errorf("Failed to create syslog message: %v", err) continue } var msgBytesWithFraming []byte if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil { - log.Printf("E! [outputs.syslog] Failed to convert syslog message with framing: %v", err) + s.Log.Errorf("Failed to convert syslog message with framing: %v", err) continue } if _, err = s.Conn.Write(msgBytesWithFraming); err != nil { if netErr, ok := err.(net.Error); !ok || !netErr.Temporary() { - s.Close() + s.Close() //nolint:revive // There is another error which will be returned here s.Conn = nil return fmt.Errorf("closing connection: %v", netErr) } diff --git a/plugins/outputs/syslog/syslog_mapper.go b/plugins/outputs/syslog/syslog_mapper.go index 28c74f3f9..7d3d6d0a3 100644 --- a/plugins/outputs/syslog/syslog_mapper.go +++ b/plugins/outputs/syslog/syslog_mapper.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/go-syslog/v3/rfc5424" + "github.com/influxdata/telegraf" ) @@ -90,8 +91,7 @@ func mapMsgID(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { func mapVersion(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { if value, ok := metric.GetField("version"); ok { - switch v := value.(type) { - case uint64: + if v, ok := value.(uint64); ok { msg.SetVersion(uint16(v)) return } @@ -142,9 +142,9 @@ func mapHostname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { func mapTimestamp(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { timestamp := metric.Time() + //nolint: revive // Need switch with only one case to handle `.(type)` if value, ok := metric.GetField("timestamp"); ok { - switch v := value.(type) { - case int64: + if v, ok := value.(int64); ok { timestamp = time.Unix(0, v).UTC() } } diff --git a/plugins/outputs/syslog/syslog_mapper_test.go b/plugins/outputs/syslog/syslog_mapper_test.go index d4bbc1d6f..90cec95e4 100644 --- a/plugins/outputs/syslog/syslog_mapper_test.go +++ b/plugins/outputs/syslog/syslog_mapper_test.go @@ -5,9 +5,9 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/metric" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf/metric" ) func TestSyslogMapperWithDefaults(t *testing.T) { @@ -22,11 +22,11 @@ func TestSyslogMapperWithDefaults(t *testing.T) { time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), ) hostname, err := os.Hostname() - assert.NoError(t, err) + require.NoError(t, err) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) require.NoError(t, err) str, _ := syslogMessage.String() - assert.Equal(t, "<13>1 2010-11-10T23:00:00Z "+hostname+" Telegraf - testmetric -", str, "Wrong syslog message") + require.Equal(t, "<13>1 2010-11-10T23:00:00Z "+hostname+" Telegraf - testmetric -", str, "Wrong syslog message") } func TestSyslogMapperWithHostname(t *testing.T) { @@ -47,7 +47,7 @@ func TestSyslogMapperWithHostname(t *testing.T) { syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) require.NoError(t, err) str, _ := syslogMessage.String() - assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", str, "Wrong syslog message") + require.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", str, "Wrong syslog message") } func TestSyslogMapperWithHostnameSourceFallback(t *testing.T) { s := newSyslog() @@ -66,7 +66,7 @@ func TestSyslogMapperWithHostnameSourceFallback(t *testing.T) { syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) require.NoError(t, err) str, _ := syslogMessage.String() - assert.Equal(t, "<13>1 2010-11-10T23:00:00Z sourcevalue Telegraf - testmetric -", str, "Wrong syslog message") + require.Equal(t, "<13>1 2010-11-10T23:00:00Z sourcevalue Telegraf - testmetric -", str, "Wrong syslog message") } func TestSyslogMapperWithHostnameHostFallback(t *testing.T) { @@ -85,7 +85,7 @@ func TestSyslogMapperWithHostnameHostFallback(t *testing.T) { syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) require.NoError(t, err) str, _ := syslogMessage.String() - assert.Equal(t, "<13>1 2010-11-10T23:00:00Z hostvalue Telegraf - testmetric -", str, "Wrong syslog message") + require.Equal(t, "<13>1 2010-11-10T23:00:00Z hostvalue Telegraf - testmetric -", str, "Wrong syslog message") } func TestSyslogMapperWithDefaultSdid(t *testing.T) { @@ -120,7 +120,7 @@ func TestSyslogMapperWithDefaultSdid(t *testing.T) { syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) require.NoError(t, err) str, _ := syslogMessage.String() - assert.Equal(t, "<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message", str, "Wrong syslog message") + require.Equal(t, "<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message", str, "Wrong syslog message") } func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) { @@ -158,7 +158,7 @@ func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) { syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) require.NoError(t, err) str, _ := syslogMessage.String() - assert.Equal(t, "<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message", str, "Wrong syslog message") + require.Equal(t, "<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message", str, "Wrong syslog message") } func TestSyslogMapperWithNoSdids(t *testing.T) { @@ -196,5 +196,5 @@ func TestSyslogMapperWithNoSdids(t *testing.T) { syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) require.NoError(t, err) str, _ := syslogMessage.String() - assert.Equal(t, "<26>2 2010-11-10T23:30:00Z testhost testapp 25 555 - Test message", str, "Wrong syslog message") + require.Equal(t, "<26>2 2010-11-10T23:30:00Z testhost testapp 25 555 - Test message", str, "Wrong syslog message") } diff --git a/plugins/outputs/syslog/syslog_test.go b/plugins/outputs/syslog/syslog_test.go index d9e082e5f..f245bcc84 100644 --- a/plugins/outputs/syslog/syslog_test.go +++ b/plugins/outputs/syslog/syslog_test.go @@ -6,12 +6,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" framing "github.com/influxdata/telegraf/internal/syslog" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) { @@ -34,7 +34,7 @@ func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) { messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) require.NoError(t, err) - assert.Equal(t, "59 <13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", string(messageBytesWithFraming), "Incorrect Octect counting framing") + require.Equal(t, "59 <13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", string(messageBytesWithFraming), "Incorrect Octect counting framing") } func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) { @@ -58,7 +58,7 @@ func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) { messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) require.NoError(t, err) - assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -\x00", string(messageBytesWithFraming), "Incorrect Octect counting framing") + require.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -\x00", string(messageBytesWithFraming), "Incorrect Octect counting framing") } func TestSyslogWriteWithTcp(t *testing.T) { @@ -110,7 +110,7 @@ func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) { buf := make([]byte, 256) n, err := lconn.Read(buf) require.NoError(t, err) - assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) + require.Equal(t, string(messageBytesWithFraming), string(buf[:n])) } func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) { @@ -134,7 +134,7 @@ func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) { buf := make([]byte, 256) n, _, err := lconn.ReadFrom(buf) require.NoError(t, err) - assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) + require.Equal(t, string(messageBytesWithFraming), string(buf[:n])) } func TestSyslogWriteErr(t *testing.T) { @@ -146,20 +146,26 @@ func TestSyslogWriteErr(t *testing.T) { err = s.Connect() require.NoError(t, err) - s.Conn.(*net.TCPConn).SetReadBuffer(256) + err = s.Conn.(*net.TCPConn).SetReadBuffer(256) + require.NoError(t, err) lconn, err := listener.Accept() require.NoError(t, err) - lconn.(*net.TCPConn).SetWriteBuffer(256) + err = lconn.(*net.TCPConn).SetWriteBuffer(256) + require.NoError(t, err) metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")} // close the socket to generate an error - lconn.Close() - s.Conn.Close() + err = lconn.Close() + require.NoError(t, err) + + err = s.Conn.Close() + require.NoError(t, err) + err = s.Write(metrics) require.Error(t, err) - assert.Nil(t, s.Conn) + require.Nil(t, s.Conn) } func TestSyslogWriteReconnect(t *testing.T) { @@ -171,12 +177,15 @@ func TestSyslogWriteReconnect(t *testing.T) { err = s.Connect() require.NoError(t, err) - s.Conn.(*net.TCPConn).SetReadBuffer(256) + err = s.Conn.(*net.TCPConn).SetReadBuffer(256) + require.NoError(t, err) lconn, err := listener.Accept() require.NoError(t, err) - lconn.(*net.TCPConn).SetWriteBuffer(256) - lconn.Close() + err = lconn.(*net.TCPConn).SetWriteBuffer(256) + require.NoError(t, err) + err = lconn.Close() + require.NoError(t, err) s.Conn = nil wg := sync.WaitGroup{} @@ -192,7 +201,7 @@ func TestSyslogWriteReconnect(t *testing.T) { require.NoError(t, err) wg.Wait() - assert.NoError(t, lerr) + require.NoError(t, lerr) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0]) require.NoError(t, err) @@ -201,5 +210,5 @@ func TestSyslogWriteReconnect(t *testing.T) { buf := make([]byte, 256) n, err := lconn.Read(buf) require.NoError(t, err) - assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) + require.Equal(t, string(messageBytesWithFraming), string(buf[:n])) } diff --git a/plugins/outputs/timestream/timestream.go b/plugins/outputs/timestream/timestream.go index 6478563b6..91d73de38 100644 --- a/plugins/outputs/timestream/timestream.go +++ b/plugins/outputs/timestream/timestream.go @@ -10,14 +10,14 @@ import ( "strconv" "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/outputs" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" "github.com/aws/smithy-go" + + "github.com/influxdata/telegraf" internalaws "github.com/influxdata/telegraf/config/aws" + "github.com/influxdata/telegraf/plugins/outputs" ) type ( @@ -332,12 +332,12 @@ func (t *Timestream) logWriteToTimestreamError(err error, tableName *string) { func (t *Timestream) createTableAndRetry(writeRecordsInput *timestreamwrite.WriteRecordsInput) error { if t.CreateTableIfNotExists { t.Log.Infof("Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'.", *writeRecordsInput.TableName, t.DatabaseName) - if err := t.createTable(writeRecordsInput.TableName); err != nil { - t.Log.Errorf("Failed to create table '%s' in database '%s': %s. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName, err) - } else { + err := t.createTable(writeRecordsInput.TableName) + if err == nil { t.Log.Infof("Table '%s' in database '%s' created. Retrying writing.", *writeRecordsInput.TableName, t.DatabaseName) return t.writeToTimestream(writeRecordsInput, false) } + t.Log.Errorf("Failed to create table '%s' in database '%s': %s. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName, err) } else { t.Log.Errorf("Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName) } @@ -434,22 +434,22 @@ func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwr func hashFromMetricTimeNameTagKeys(m telegraf.Metric) uint64 { h := fnv.New64a() - h.Write([]byte(m.Name())) - h.Write([]byte("\n")) + h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" for _, tag := range m.TagList() { if tag.Key == "" { continue } - h.Write([]byte(tag.Key)) - h.Write([]byte("\n")) - h.Write([]byte(tag.Value)) - h.Write([]byte("\n")) + h.Write([]byte(tag.Key)) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte(tag.Value)) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" } b := make([]byte, binary.MaxVarintLen64) n := binary.PutUvarint(b, uint64(m.Time().UnixNano())) - h.Write(b[:n]) - h.Write([]byte("\n")) + h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error" + h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" return h.Sum64() } @@ -537,7 +537,7 @@ func getTimestreamTime(t time.Time) (timeUnit types.TimeUnit, timeValue string) timeUnit = types.TimeUnitNanoseconds timeValue = strconv.FormatInt(nanosTime, 10) } - return + return timeUnit, timeValue } // convertValue converts single Field value from Telegraf Metric and produces @@ -595,7 +595,7 @@ func convertValue(v interface{}) (value string, valueType types.MeasureValueType default: // Skip unsupported type. ok = false - return + return value, valueType, ok } - return + return value, valueType, ok } diff --git a/plugins/outputs/timestream/timestream_internal_test.go b/plugins/outputs/timestream/timestream_internal_test.go index d151c10d4..a86bc432d 100644 --- a/plugins/outputs/timestream/timestream_internal_test.go +++ b/plugins/outputs/timestream/timestream_internal_test.go @@ -4,40 +4,36 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" - - "github.com/stretchr/testify/assert" ) func TestGetTimestreamTime(t *testing.T) { - assertions := assert.New(t) - tWithNanos := time.Date(2020, time.November, 10, 23, 44, 20, 123, time.UTC) tWithMicros := time.Date(2020, time.November, 10, 23, 44, 20, 123000, time.UTC) tWithMillis := time.Date(2020, time.November, 10, 23, 44, 20, 123000000, time.UTC) tOnlySeconds := time.Date(2020, time.November, 10, 23, 44, 20, 0, time.UTC) tUnitNanos, tValueNanos := getTimestreamTime(tWithNanos) - assertions.Equal(types.TimeUnitNanoseconds, tUnitNanos) - assertions.Equal("1605051860000000123", tValueNanos) + require.Equal(t, types.TimeUnitNanoseconds, tUnitNanos) + require.Equal(t, "1605051860000000123", tValueNanos) tUnitMicros, tValueMicros := getTimestreamTime(tWithMicros) - assertions.Equal(types.TimeUnitMicroseconds, tUnitMicros) - assertions.Equal("1605051860000123", tValueMicros) + require.Equal(t, types.TimeUnitMicroseconds, tUnitMicros) + require.Equal(t, "1605051860000123", tValueMicros) tUnitMillis, tValueMillis := getTimestreamTime(tWithMillis) - assertions.Equal(types.TimeUnitMilliseconds, tUnitMillis) - assertions.Equal("1605051860123", tValueMillis) + require.Equal(t, types.TimeUnitMilliseconds, tUnitMillis) + require.Equal(t, "1605051860123", tValueMillis) tUnitSeconds, tValueSeconds := getTimestreamTime(tOnlySeconds) - assertions.Equal(types.TimeUnitSeconds, tUnitSeconds) - assertions.Equal("1605051860", tValueSeconds) + require.Equal(t, types.TimeUnitSeconds, tUnitSeconds) + require.Equal(t, "1605051860", tValueSeconds) } func TestPartitionRecords(t *testing.T) { - assertions := assert.New(t) - testDatum := types.Record{ MeasureName: aws.String("Foo"), MeasureValueType: types.MeasureValueTypeDouble, @@ -49,11 +45,11 @@ func TestPartitionRecords(t *testing.T) { twoDatum := []types.Record{testDatum, testDatum} threeDatum := []types.Record{testDatum, testDatum, testDatum} - assertions.Equal([][]types.Record{}, partitionRecords(2, zeroDatum)) - assertions.Equal([][]types.Record{oneDatum}, partitionRecords(2, oneDatum)) - assertions.Equal([][]types.Record{oneDatum}, partitionRecords(2, oneDatum)) - assertions.Equal([][]types.Record{twoDatum}, partitionRecords(2, twoDatum)) - assertions.Equal([][]types.Record{twoDatum, oneDatum}, partitionRecords(2, threeDatum)) + require.Equal(t, [][]types.Record{}, partitionRecords(2, zeroDatum)) + require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum)) + require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum)) + require.Equal(t, [][]types.Record{twoDatum}, partitionRecords(2, twoDatum)) + require.Equal(t, [][]types.Record{twoDatum, oneDatum}, partitionRecords(2, threeDatum)) } func TestConvertValueSupported(t *testing.T) { @@ -74,18 +70,16 @@ func TestConvertValueSupported(t *testing.T) { } func TestConvertValueUnsupported(t *testing.T) { - assertions := assert.New(t) _, _, ok := convertValue(time.Date(2020, time.November, 10, 23, 44, 20, 0, time.UTC)) - assertions.False(ok, "Expected unsuccessful conversion") + require.False(t, ok, "Expected unsuccessful conversion") } func testConvertValueSupportedCases(t *testing.T, inputValues []interface{}, outputValues []string, outputValueTypes []types.MeasureValueType) { - assertions := assert.New(t) for i, inputValue := range inputValues { v, vt, ok := convertValue(inputValue) - assertions.Equal(true, ok, "Expected successful conversion") - assertions.Equal(outputValues[i], v, "Expected different string representation of converted value") - assertions.Equal(outputValueTypes[i], vt, "Expected different value type of converted value") + require.Equal(t, true, ok, "Expected successful conversion") + require.Equal(t, outputValues[i], v, "Expected different string representation of converted value") + require.Equal(t, outputValueTypes[i], vt, "Expected different value type of converted value") } } diff --git a/plugins/outputs/timestream/timestream_test.go b/plugins/outputs/timestream/timestream_test.go index be61a06a1..7be25c255 100644 --- a/plugins/outputs/timestream/timestream_test.go +++ b/plugins/outputs/timestream/timestream_test.go @@ -13,11 +13,11 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" internalaws "github.com/influxdata/telegraf/config/aws" "github.com/influxdata/telegraf/testutil" - - "github.com/stretchr/testify/assert" ) const tsDbName = "testDb" @@ -49,26 +49,25 @@ func (m *mockTimestreamClient) DescribeDatabase(context.Context, *timestreamwrit } func TestConnectValidatesConfigParameters(t *testing.T) { - assertions := assert.New(t) WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { return &mockTimestreamClient{}, nil } // checking base arguments noDatabaseName := Timestream{Log: testutil.Logger{}} - assertions.Contains(noDatabaseName.Connect().Error(), "DatabaseName") + require.Contains(t, noDatabaseName.Connect().Error(), "DatabaseName") noMappingMode := Timestream{ DatabaseName: tsDbName, Log: testutil.Logger{}, } - assertions.Contains(noMappingMode.Connect().Error(), "MappingMode") + require.Contains(t, noMappingMode.Connect().Error(), "MappingMode") incorrectMappingMode := Timestream{ DatabaseName: tsDbName, MappingMode: "foo", Log: testutil.Logger{}, } - assertions.Contains(incorrectMappingMode.Connect().Error(), "single-table") + require.Contains(t, incorrectMappingMode.Connect().Error(), "single-table") // multi-table arguments validMappingModeMultiTable := Timestream{ @@ -76,7 +75,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { MappingMode: MappingModeMultiTable, Log: testutil.Logger{}, } - assertions.Nil(validMappingModeMultiTable.Connect()) + require.Nil(t, validMappingModeMultiTable.Connect()) singleTableNameWithMultiTable := Timestream{ DatabaseName: tsDbName, @@ -84,7 +83,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { SingleTableName: testSingleTableName, Log: testutil.Logger{}, } - assertions.Contains(singleTableNameWithMultiTable.Connect().Error(), "SingleTableName") + require.Contains(t, singleTableNameWithMultiTable.Connect().Error(), "SingleTableName") singleTableDimensionWithMultiTable := Timestream{ DatabaseName: tsDbName, @@ -92,7 +91,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim, Log: testutil.Logger{}, } - assertions.Contains(singleTableDimensionWithMultiTable.Connect().Error(), + require.Contains(t, singleTableDimensionWithMultiTable.Connect().Error(), "SingleTableDimensionNameForTelegrafMeasurementName") // single-table arguments @@ -101,7 +100,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { MappingMode: MappingModeSingleTable, Log: testutil.Logger{}, } - assertions.Contains(noTableNameMappingModeSingleTable.Connect().Error(), "SingleTableName") + require.Contains(t, noTableNameMappingModeSingleTable.Connect().Error(), "SingleTableName") noDimensionNameMappingModeSingleTable := Timestream{ DatabaseName: tsDbName, @@ -109,7 +108,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { SingleTableName: testSingleTableName, Log: testutil.Logger{}, } - assertions.Contains(noDimensionNameMappingModeSingleTable.Connect().Error(), + require.Contains(t, noDimensionNameMappingModeSingleTable.Connect().Error(), "SingleTableDimensionNameForTelegrafMeasurementName") validConfigurationMappingModeSingleTable := Timestream{ @@ -119,7 +118,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim, Log: testutil.Logger{}, } - assertions.Nil(validConfigurationMappingModeSingleTable.Connect()) + require.Nil(t, validConfigurationMappingModeSingleTable.Connect()) // create table arguments createTableNoMagneticRetention := Timestream{ @@ -128,7 +127,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { CreateTableIfNotExists: true, Log: testutil.Logger{}, } - assertions.Contains(createTableNoMagneticRetention.Connect().Error(), + require.Contains(t, createTableNoMagneticRetention.Connect().Error(), "CreateTableMagneticStoreRetentionPeriodInDays") createTableNoMemoryRetention := Timestream{ @@ -138,7 +137,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { CreateTableMagneticStoreRetentionPeriodInDays: 3, Log: testutil.Logger{}, } - assertions.Contains(createTableNoMemoryRetention.Connect().Error(), + require.Contains(t, createTableNoMemoryRetention.Connect().Error(), "CreateTableMemoryStoreRetentionPeriodInHours") createTableValid := Timestream{ @@ -149,7 +148,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { CreateTableMemoryStoreRetentionPeriodInHours: 3, Log: testutil.Logger{}, } - assertions.Nil(createTableValid.Connect()) + require.Nil(t, createTableValid.Connect()) // describe table on start arguments describeTableInvoked := Timestream{ @@ -158,7 +157,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) { DescribeDatabaseOnStart: true, Log: testutil.Logger{}, } - assertions.Contains(describeTableInvoked.Connect().Error(), "hello from DescribeDatabase") + require.Contains(t, describeTableInvoked.Connect().Error(), "hello from DescribeDatabase") } type mockTimestreamErrorClient struct { @@ -176,7 +175,6 @@ func (m *mockTimestreamErrorClient) DescribeDatabase(context.Context, *timestrea } func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) { - assertions := assert.New(t) WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { return &mockTimestreamErrorClient{ ErrorToReturnOnWriteRecords: &types.ThrottlingException{Message: aws.String("Throttling Test")}, @@ -188,7 +186,7 @@ func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) { DatabaseName: tsDbName, Log: testutil.Logger{}, } - assertions.NoError(plugin.Connect()) + require.NoError(t, plugin.Connect()) input := testutil.MustMetric( metricName1, map[string]string{"tag1": "value1"}, @@ -198,12 +196,11 @@ func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) { err := plugin.Write([]telegraf.Metric{input}) - assertions.NotNil(err, "Expected an error to be returned to Telegraf, "+ + require.NotNil(t, err, "Expected an error to be returned to Telegraf, "+ "so that the write will be retried by Telegraf later.") } func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) { - assertions := assert.New(t) WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { return &mockTimestreamErrorClient{ ErrorToReturnOnWriteRecords: &types.RejectedRecordsException{Message: aws.String("RejectedRecords Test")}, @@ -215,7 +212,7 @@ func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) { DatabaseName: tsDbName, Log: testutil.Logger{}, } - assertions.NoError(plugin.Connect()) + require.NoError(t, plugin.Connect()) input := testutil.MustMetric( metricName1, map[string]string{"tag1": "value1"}, @@ -225,7 +222,7 @@ func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) { err := plugin.Write([]telegraf.Metric{input}) - assertions.Nil(err, "Expected to silently swallow the RejectedRecordsException, "+ + require.Nil(t, err, "Expected to silently swallow the RejectedRecordsException, "+ "as retrying this error doesn't make sense.") } @@ -649,13 +646,11 @@ func comparisonTest(t *testing.T, Log: testutil.Logger{}, } } - assertions := assert.New(t) - result := plugin.TransformMetrics(telegrafMetrics) - assertions.Equal(len(timestreamRecords), len(result), "The number of transformed records was expected to be different") + require.Equal(t, len(timestreamRecords), len(result), "The number of transformed records was expected to be different") for _, tsRecord := range timestreamRecords { - assertions.True(arrayContains(result, tsRecord), "Expected that the list of requests to Timestream: \n%s\n\n "+ + require.True(t, arrayContains(result, tsRecord), "Expected that the list of requests to Timestream: \n%s\n\n "+ "will contain request: \n%s\n\nUsed MappingMode: %s", result, tsRecord, mappingMode) } } diff --git a/plugins/outputs/warp10/warp10.go b/plugins/outputs/warp10/warp10.go index 4d3027b1b..740bb0198 100644 --- a/plugins/outputs/warp10/warp10.go +++ b/plugins/outputs/warp10/warp10.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "io" - "log" "math" "net/http" "net/url" @@ -33,6 +32,7 @@ type Warp10 struct { MaxStringErrorSize int `toml:"max_string_error_size"` client *http.Client tls.ClientConfig + Log telegraf.Logger `toml:"-"` } var sampleConfig = ` @@ -114,7 +114,7 @@ func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric) string { metricValue, err := buildValue(field.Value) if err != nil { - log.Printf("E! [outputs.warp10] Could not encode value: %v", err) + w.Log.Errorf("Could not encode value: %v", err) continue } metric.Value = metricValue @@ -199,7 +199,7 @@ func buildValue(v interface{}) (string, error) { retv = strconv.FormatInt(math.MaxInt64, 10) } case float64: - retv = floatToString(float64(p)) + retv = floatToString(p) default: return "", fmt.Errorf("unsupported type: %T", v) } diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go index 3ad4e803b..7049ded52 100644 --- a/plugins/outputs/wavefront/wavefront.go +++ b/plugins/outputs/wavefront/wavefront.go @@ -5,9 +5,10 @@ import ( "regexp" "strings" + wavefront "github.com/wavefronthq/wavefront-sdk-go/senders" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" - wavefront "github.com/wavefronthq/wavefront-sdk-go/senders" ) const maxTagLength = 254 @@ -51,7 +52,7 @@ var strictSanitizedChars = strings.NewReplacer( ) // instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer -var sanitizedRegex = regexp.MustCompile("[^a-zA-Z\\d_.-]") +var sanitizedRegex = regexp.MustCompile(`[^a-zA-Z\d_.-]`) var tagValueReplacer = strings.NewReplacer("*", "-")