diff --git a/plugins/outputs/graphite/graphite.go b/plugins/outputs/graphite/graphite.go index bd35a4203..11a712c36 100644 --- a/plugins/outputs/graphite/graphite.go +++ b/plugins/outputs/graphite/graphite.go @@ -113,7 +113,7 @@ func (g *Graphite) Connect() error { func (g *Graphite) Close() error { // Closing all connections for _, conn := range g.conns { - conn.Close() + _ = conn.Close() } return nil } @@ -133,11 +133,16 @@ func (g *Graphite) Description() string { // props to Tv via the authors of carbon-relay-ng` for this trick. func (g *Graphite) checkEOF(conn net.Conn) { b := make([]byte, 1024) - conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)) + + if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil { + g.Log.Errorf("Couldn't set read deadline for connection %s. closing conn explicitly", conn) + _ = conn.Close() + return + } num, err := conn.Read(b) if err == io.EOF { g.Log.Errorf("Conn %s is closed. closing conn explicitly", conn) - conn.Close() + _ = conn.Close() return } // just in case i misunderstand something or the remote behaves badly @@ -147,7 +152,7 @@ func (g *Graphite) checkEOF(conn net.Conn) { // Log non-timeout errors or close. if e, ok := err.(net.Error); !(ok && e.Timeout()) { g.Log.Errorf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err) - conn.Close() + _ = conn.Close() } } @@ -174,7 +179,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error { // try to reconnect and retry to send if err != nil { g.Log.Error("Graphite: Reconnecting and retrying...") - g.Connect() + _ = g.Connect() err = g.send(batch) } @@ -189,14 +194,14 @@ func (g *Graphite) send(batch []byte) error { p := rand.Perm(len(g.conns)) for _, n := range p { if g.Timeout > 0 { - g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) + _ = g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second)) } g.checkEOF(g.conns[n]) if _, e := g.conns[n].Write(batch); e != nil { // Error g.Log.Errorf("Graphite Error: " + e.Error()) // Close explicitly and let's try the next one - g.conns[n].Close() + _ = g.conns[n].Close() } else { // Success err = nil diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go index 1cb58b194..7f96cb57c 100644 --- a/plugins/outputs/graphite/graphite_test.go +++ b/plugins/outputs/graphite/graphite_test.go @@ -8,13 +8,11 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/testutil" ) func TestGraphiteError(t *testing.T) { @@ -39,7 +37,7 @@ func TestGraphiteError(t *testing.T) { require.NoError(t, err1) err2 := g.Write(metrics) require.Error(t, err2) - assert.Equal(t, "could not write to any Graphite server in cluster", err2.Error()) + require.Equal(t, "could not write to any Graphite server in cluster", err2.Error()) } func TestGraphiteOK(t *testing.T) { @@ -490,9 +488,9 @@ func TCPServer1(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) data1, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) - conn.Close() - tcpServer.Close() + require.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) + require.NoError(t, conn.Close()) + require.NoError(t, tcpServer.Close()) }() } @@ -504,11 +502,11 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn2) tp := textproto.NewReader(reader) data2, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) + require.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) data3, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) - conn2.Close() - tcpServer.Close() + require.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3) + require.NoError(t, conn2.Close()) + require.NoError(t, tcpServer.Close()) }() } @@ -520,9 +518,9 @@ func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) data1, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1) - conn.Close() - tcpServer.Close() + require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1) + require.NoError(t, conn.Close()) + require.NoError(t, tcpServer.Close()) }() } @@ -534,11 +532,11 @@ func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn2) tp := textproto.NewReader(reader) data2, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2) + require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2) data3, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3) - conn2.Close() - tcpServer.Close() + require.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3) + require.NoError(t, conn2.Close()) + require.NoError(t, tcpServer.Close()) }() } @@ -550,9 +548,9 @@ func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) data1, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1) - conn.Close() - tcpServer.Close() + require.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1) + require.NoError(t, conn.Close()) + require.NoError(t, tcpServer.Close()) }() } @@ -564,11 +562,11 @@ func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn2) tp := textproto.NewReader(reader) data2, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2) + require.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2) data3, _ := tp.ReadLine() - assert.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3) - conn2.Close() - tcpServer.Close() + require.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3) + require.NoError(t, conn2.Close()) + require.NoError(t, tcpServer.Close()) }() } @@ -580,9 +578,9 @@ func TCPServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) data1, _ := tp.ReadLine() - assert.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1) - conn.Close() - tcpServer.Close() + require.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1) + require.NoError(t, conn.Close()) + require.NoError(t, tcpServer.Close()) }() } @@ -594,10 +592,10 @@ func TCPServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) { reader := bufio.NewReader(conn2) tp := textproto.NewReader(reader) data2, _ := tp.ReadLine() - assert.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2) + require.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2) data3, _ := tp.ReadLine() - assert.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3) - conn2.Close() - tcpServer.Close() + require.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3) + require.NoError(t, conn2.Close()) + require.NoError(t, tcpServer.Close()) }() } diff --git a/plugins/outputs/graylog/graylog.go b/plugins/outputs/graylog/graylog.go index 16b744f35..b408b6372 100644 --- a/plugins/outputs/graylog/graylog.go +++ b/plugins/outputs/graylog/graylog.go @@ -97,7 +97,10 @@ func newGelfWriter(cfg gelfConfig, dialer *net.Dialer, tlsConfig *tls.Config) ge } func (g *gelfUDP) Write(message []byte) (n int, err error) { - compressed := g.compress(message) + compressed, err := g.compress(message) + if err != nil { + return 0, err + } chunksize := g.gelfConfig.MaxChunkSizeWan length := compressed.Len() @@ -106,10 +109,17 @@ func (g *gelfUDP) Write(message []byte) (n int, err error) { chunkCountInt := int(math.Ceil(float64(length) / float64(chunksize))) id := make([]byte, 8) - rand.Read(id) + _, err = rand.Read(id) + if err != nil { + return 0, err + } for i, index := 0, 0; i < length; i, index = i+chunksize, index+1 { - packet := g.createChunkedMessage(index, chunkCountInt, id, &compressed) + packet, err := g.createChunkedMessage(index, chunkCountInt, id, &compressed) + if err != nil { + return 0, err + } + err = g.send(packet.Bytes()) if err != nil { return 0, err @@ -136,21 +146,40 @@ func (g *gelfUDP) Close() (err error) { return err } -func (g *gelfUDP) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer { +func (g *gelfUDP) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) (bytes.Buffer, error) { var packet bytes.Buffer chunksize := g.getChunksize() - packet.Write(g.intToBytes(30)) - packet.Write(g.intToBytes(15)) - packet.Write(id) + b, err := g.intToBytes(30) + if err != nil { + return packet, err + } + packet.Write(b) //nolint:revive // from buffer.go: "err is always nil" - packet.Write(g.intToBytes(index)) - packet.Write(g.intToBytes(chunkCountInt)) + b, err = g.intToBytes(15) + if err != nil { + return packet, err + } + packet.Write(b) //nolint:revive // from buffer.go: "err is always nil" - packet.Write(compressed.Next(chunksize)) + packet.Write(id) //nolint:revive // from buffer.go: "err is always nil" - return packet + b, err = g.intToBytes(index) + if err != nil { + return packet, err + } + packet.Write(b) //nolint:revive // from buffer.go: "err is always nil" + + b, err = g.intToBytes(chunkCountInt) + if err != nil { + return packet, err + } + packet.Write(b) //nolint:revive // from buffer.go: "err is always nil" + + packet.Write(compressed.Next(chunksize)) //nolint:revive // from buffer.go: "err is always nil" + + return packet, nil } func (g *gelfUDP) getChunksize() int { @@ -165,21 +194,30 @@ func (g *gelfUDP) getChunksize() int { return g.gelfConfig.MaxChunkSizeWan } -func (g *gelfUDP) intToBytes(i int) []byte { +func (g *gelfUDP) intToBytes(i int) ([]byte, error) { buf := new(bytes.Buffer) - binary.Write(buf, binary.LittleEndian, int8(i)) - return buf.Bytes() + err := binary.Write(buf, binary.LittleEndian, int8(i)) + if err != nil { + return nil, err + } + + return buf.Bytes(), err } -func (g *gelfUDP) compress(b []byte) bytes.Buffer { +func (g *gelfUDP) compress(b []byte) (bytes.Buffer, error) { var buf bytes.Buffer comp := zlib.NewWriter(&buf) - comp.Write(b) - comp.Close() + if _, err := comp.Write(b); err != nil { + return bytes.Buffer{}, err + } - return buf + if err := comp.Close(); err != nil { + return bytes.Buffer{}, err + } + + return buf, nil } func (g *gelfUDP) Connect() error { diff --git a/plugins/outputs/graylog/graylog_test.go b/plugins/outputs/graylog/graylog_test.go index a270f279b..e8577fb43 100644 --- a/plugins/outputs/graylog/graylog_test.go +++ b/plugins/outputs/graylog/graylog_test.go @@ -11,11 +11,11 @@ import ( "testing" "time" + reuse "github.com/libp2p/go-reuseport" + "github.com/stretchr/testify/require" + tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/testutil" - reuse "github.com/libp2p/go-reuseport" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestWriteUDP(t *testing.T) { @@ -183,14 +183,14 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, config *Gr var obj GelfObject _ = json.Unmarshal(bufW.Bytes(), &obj) require.NoError(t, err) - assert.Equal(t, obj["short_message"], "telegraf") + require.Equal(t, obj["short_message"], "telegraf") if config.NameFieldNoPrefix { - assert.Equal(t, obj["name"], "test1") + require.Equal(t, obj["name"], "test1") } else { - assert.Equal(t, obj["_name"], "test1") + require.Equal(t, obj["_name"], "test1") } - assert.Equal(t, obj["_tag1"], "value1") - assert.Equal(t, obj["_value"], float64(1)) + require.Equal(t, obj["_tag1"], "value1") + require.Equal(t, obj["_value"], float64(1)) } // in UDP scenario all 4 messages are received @@ -238,10 +238,10 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync. var obj GelfObject err = json.Unmarshal(bufW.Bytes(), &obj) require.NoError(t, err) - assert.Equal(t, obj["short_message"], "telegraf") - assert.Equal(t, obj["_name"], "test1") - assert.Equal(t, obj["_tag1"], "value1") - assert.Equal(t, obj["_value"], float64(1)) + require.Equal(t, obj["short_message"], "telegraf") + require.Equal(t, obj["_name"], "test1") + require.Equal(t, obj["_tag1"], "value1") + require.Equal(t, obj["_value"], float64(1)) } conn := accept() diff --git a/plugins/outputs/health/health.go b/plugins/outputs/health/health.go index 4541659ce..0782f7be2 100644 --- a/plugins/outputs/health/health.go +++ b/plugins/outputs/health/health.go @@ -208,9 +208,9 @@ func (h *Health) Close() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - h.server.Shutdown(ctx) + err := h.server.Shutdown(ctx) h.wg.Wait() - return nil + return err } // Origin returns the URL of the HTTP server. diff --git a/plugins/outputs/health/health_test.go b/plugins/outputs/health/health_test.go index 03a08fca2..e155a6cba 100644 --- a/plugins/outputs/health/health_test.go +++ b/plugins/outputs/health/health_test.go @@ -6,10 +6,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs/health" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) var pki = testutil.NewPKI("../../../testutil/pki") @@ -119,6 +120,7 @@ func TestHealth(t *testing.T) { resp, err := http.Get(output.Origin()) require.NoError(t, err) + defer resp.Body.Close() require.Equal(t, tt.expectedCode, resp.StatusCode) _, err = io.ReadAll(resp.Body) diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go index b866c6021..bd261d412 100644 --- a/plugins/outputs/http/http.go +++ b/plugins/outputs/http/http.go @@ -150,7 +150,7 @@ func (h *HTTP) Write(metrics []telegraf.Metric) error { return err } - return h.write(reqBody) + return h.writeMetric(reqBody) } for _, metric := range metrics { @@ -160,14 +160,14 @@ func (h *HTTP) Write(metrics []telegraf.Metric) error { return err } - if err := h.write(reqBody); err != nil { + if err := h.writeMetric(reqBody); err != nil { return err } } return nil } -func (h *HTTP) write(reqBody []byte) error { +func (h *HTTP) writeMetric(reqBody []byte) error { var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody) var err error diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index a5fc49b84..5c8488cce 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -10,15 +10,16 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" httpconfig "github.com/influxdata/telegraf/plugins/common/http" - oauth "github.com/influxdata/telegraf/plugins/common/oauth" + "github.com/influxdata/telegraf/plugins/common/oauth" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/plugins/serializers/json" - "github.com/stretchr/testify/require" ) func getMetric() telegraf.Metric { @@ -408,7 +409,8 @@ func TestOAuthClientCredentialsGrant(t *testing.T) { values.Add("access_token", token) values.Add("token_type", "bearer") values.Add("expires_in", "3600") - w.Write([]byte(values.Encode())) + _, err = w.Write([]byte(values.Encode())) + require.NoError(t, err) }, handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"]) diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go index ac85814db..992ecf796 100644 --- a/plugins/outputs/influxdb/http.go +++ b/plugins/outputs/influxdb/http.go @@ -456,10 +456,10 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) { return req, nil } -func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) { +func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Request, error) { var err error - req, err := http.NewRequest("POST", url, body) + req, err := http.NewRequest("POST", address, body) if err != nil { return nil, fmt.Errorf("failed creating new request: %s", err.Error()) } diff --git a/plugins/outputs/influxdb/udp.go b/plugins/outputs/influxdb/udp.go index 62848417b..fb629a40d 100644 --- a/plugins/outputs/influxdb/udp.go +++ b/plugins/outputs/influxdb/udp.go @@ -106,7 +106,7 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error _, err = c.conn.Write(scanner.Bytes()) } if err != nil { - c.conn.Close() + _ = c.conn.Close() c.conn = nil return err } diff --git a/plugins/outputs/influxdb/udp_test.go b/plugins/outputs/influxdb/udp_test.go index 25e03f721..dda1f9412 100644 --- a/plugins/outputs/influxdb/udp_test.go +++ b/plugins/outputs/influxdb/udp_test.go @@ -11,11 +11,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/outputs/influxdb" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) var ( @@ -91,7 +92,7 @@ func TestUDP_Simple(t *testing.T) { DialContextF: func(network, address string) (influxdb.Conn, error) { conn := &MockConn{ WriteF: func(b []byte) (n int, err error) { - buffer.Write(b) + buffer.Write(b) //nolint:revive // MockConn with always-success return return 0, nil }, } diff --git a/plugins/outputs/influxdb_v2/http.go b/plugins/outputs/influxdb_v2/http.go index 098ebd9dd..ee2938288 100644 --- a/plugins/outputs/influxdb_v2/http.go +++ b/plugins/outputs/influxdb_v2/http.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "log" "math" "net" "net/http" @@ -55,6 +54,7 @@ type HTTPConfig struct { TLSConfig *tls.Config Serializer *influx.Serializer + Log telegraf.Logger } type httpClient struct { @@ -71,6 +71,7 @@ type httpClient struct { url *url.URL retryTime time.Time retryCount int + log telegraf.Logger } func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { @@ -142,6 +143,7 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) { Bucket: config.Bucket, BucketTag: config.BucketTag, ExcludeBucketTag: config.ExcludeBucketTag, + log: config.Log, } return client, nil } @@ -296,7 +298,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te // Clients should *not* repeat the request and the metrics should be dropped. http.StatusUnprocessableEntity, http.StatusNotAcceptable: - log.Printf("E! [outputs.influxdb_v2] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc) + c.log.Errorf("Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc) return nil case http.StatusUnauthorized, http.StatusForbidden: return fmt.Errorf("failed to write metric (%s): %s", resp.Status, desc) @@ -308,14 +310,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te c.retryCount++ retryDuration := c.getRetryDuration(resp.Header) c.retryTime = time.Now().Add(retryDuration) - log.Printf("W! [outputs.influxdb_v2] Failed to write; will retry in %s. (%s)\n", retryDuration, resp.Status) + c.log.Warnf("Failed to write; will retry in %s. (%s)\n", retryDuration, resp.Status) return fmt.Errorf("waiting %s for server before sending metric again", retryDuration) } // if it's any other 4xx code, the client should not retry as it's the client's mistake. // retrying will not make the request magically work. if len(resp.Status) > 0 && resp.Status[0] == '4' { - log.Printf("E! [outputs.influxdb_v2] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc) + c.log.Errorf("Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc) return nil } @@ -357,10 +359,10 @@ func (c *httpClient) getRetryDuration(headers http.Header) time.Duration { return time.Duration(retry*1000) * time.Millisecond } -func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) { +func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Request, error) { var err error - req, err := http.NewRequest("POST", url, body) + req, err := http.NewRequest("POST", address, body) if err != nil { return nil, err } diff --git a/plugins/outputs/influxdb_v2/http_internal_test.go b/plugins/outputs/influxdb_v2/http_internal_test.go index 10e2a4e13..96a11324c 100644 --- a/plugins/outputs/influxdb_v2/http_internal_test.go +++ b/plugins/outputs/influxdb_v2/http_internal_test.go @@ -11,8 +11,8 @@ import ( ) func genURL(u string) *url.URL { - URL, _ := url.Parse(u) - return URL + address, _ := url.Parse(u) + return address } func TestMakeWriteURL(t *testing.T) { diff --git a/plugins/outputs/influxdb_v2/http_test.go b/plugins/outputs/influxdb_v2/http_test.go index e44729eec..bce1dfe3d 100644 --- a/plugins/outputs/influxdb_v2/http_test.go +++ b/plugins/outputs/influxdb_v2/http_test.go @@ -9,15 +9,16 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) func genURL(u string) *url.URL { - URL, _ := url.Parse(u) - return URL + address, _ := url.Parse(u) + return address } func TestNewHTTPClient(t *testing.T) { tests := []struct { @@ -60,7 +61,8 @@ func TestWriteBucketTagWorksOnRetry(t *testing.T) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/api/v2/write": - r.ParseForm() + err := r.ParseForm() + require.NoError(t, err) require.Equal(t, r.Form["bucket"], []string{"foo"}) body, err := io.ReadAll(r.Body) diff --git a/plugins/outputs/influxdb_v2/influxdb.go b/plugins/outputs/influxdb_v2/influxdb.go index e188ddbae..cdaefc41d 100644 --- a/plugins/outputs/influxdb_v2/influxdb.go +++ b/plugins/outputs/influxdb_v2/influxdb.go @@ -170,14 +170,14 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error { return err } -func (i *InfluxDB) getHTTPClient(url *url.URL, proxy *url.URL) (Client, error) { +func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, error) { tlsConfig, err := i.ClientConfig.TLSConfig() if err != nil { return nil, err } - config := &HTTPConfig{ - URL: url, + httpConfig := &HTTPConfig{ + URL: address, Token: i.Token, Organization: i.Organization, Bucket: i.Bucket, @@ -190,11 +190,12 @@ func (i *InfluxDB) getHTTPClient(url *url.URL, proxy *url.URL) (Client, error) { ContentEncoding: i.ContentEncoding, TLSConfig: tlsConfig, Serializer: i.newSerializer(), + Log: i.Log, } - c, err := NewHTTPClient(config) + c, err := NewHTTPClient(httpConfig) if err != nil { - return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err) + return nil, fmt.Errorf("error creating HTTP client [%s]: %v", address, err) } return c, nil diff --git a/plugins/outputs/instrumental/instrumental.go b/plugins/outputs/instrumental/instrumental.go index f7158f16f..b0b52a921 100644 --- a/plugins/outputs/instrumental/instrumental.go +++ b/plugins/outputs/instrumental/instrumental.go @@ -75,9 +75,9 @@ func (i *Instrumental) Connect() error { } func (i *Instrumental) Close() error { - i.conn.Close() + err := i.conn.Close() i.conn = nil - return nil + return err } func (i *Instrumental) Write(metrics []telegraf.Metric) error { @@ -138,23 +138,23 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { splitStat := strings.SplitN(stat, " ", 3) name := splitStat[0] value := splitStat[1] - time := splitStat[2] + timestamp := splitStat[2] // replace invalid components of metric name with underscore cleanMetric := MetricNameReplacer.ReplaceAllString(name, "_") if !ValueIncludesBadChar.MatchString(value) { - points = append(points, fmt.Sprintf("%s %s %s %s", metricType, cleanMetric, value, time)) + points = append(points, fmt.Sprintf("%s %s %s %s", metricType, cleanMetric, value, timestamp)) } } } allPoints := strings.Join(points, "") - _, err = fmt.Fprintf(i.conn, allPoints) + _, err = fmt.Fprint(i.conn, allPoints) if err != nil { if err == io.EOF { - i.Close() + _ = i.Close() } return err @@ -163,7 +163,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error { // force the connection closed after sending data // to deal with various disconnection scenarios and eschew holding // open idle connections en masse - i.Close() + _ = i.Close() return nil } diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go index f72b9e90f..b55c6b33d 100644 --- a/plugins/outputs/instrumental/instrumental_test.go +++ b/plugins/outputs/instrumental/instrumental_test.go @@ -8,9 +8,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" - "github.com/stretchr/testify/assert" ) func TestWrite(t *testing.T) { @@ -39,7 +40,8 @@ func TestWrite(t *testing.T) { ) metrics := []telegraf.Metric{m1, m2} - i.Write(metrics) + err := i.Write(metrics) + require.NoError(t, err) // Counter and Histogram are increments m3 := metric.New( @@ -70,7 +72,8 @@ func TestWrite(t *testing.T) { ) metrics = []telegraf.Metric{m3, m4, m5, m6} - i.Write(metrics) + err = i.Write(metrics) + require.NoError(t, err) wg.Wait() } @@ -80,44 +83,49 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) { go func() { defer wg.Done() conn, _ := tcpServer.Accept() - conn.SetDeadline(time.Now().Add(1 * time.Second)) + err := conn.SetDeadline(time.Now().Add(1 * time.Second)) + require.NoError(t, err) reader := bufio.NewReader(conn) tp := textproto.NewReader(reader) hello, _ := tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.1", hello) + require.Equal(t, "hello version go/telegraf/1.1", hello) auth, _ := tp.ReadLine() - assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) + require.Equal(t, "authenticate abc123token", auth) + _, err = conn.Write([]byte("ok\nok\n")) + require.NoError(t, err) data1, _ := tp.ReadLine() - assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) + require.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1) data2, _ := tp.ReadLine() - assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) + require.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2) conn, _ = tcpServer.Accept() - conn.SetDeadline(time.Now().Add(1 * time.Second)) + err = conn.SetDeadline(time.Now().Add(1 * time.Second)) + require.NoError(t, err) reader = bufio.NewReader(conn) tp = textproto.NewReader(reader) hello, _ = tp.ReadLine() - assert.Equal(t, "hello version go/telegraf/1.1", hello) + require.Equal(t, "hello version go/telegraf/1.1", hello) auth, _ = tp.ReadLine() - assert.Equal(t, "authenticate abc123token", auth) - conn.Write([]byte("ok\nok\n")) + require.Equal(t, "authenticate abc123token", auth) + _, err = conn.Write([]byte("ok\nok\n")) + require.NoError(t, err) data3, _ := tp.ReadLine() - assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3) + require.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3) data4, _ := tp.ReadLine() - assert.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4) + require.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4) data5, _ := tp.ReadLine() - assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5) + require.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5) data6, _ := tp.ReadLine() - assert.Equal(t, "", data6) + require.Equal(t, "", data6) - conn.Close() + err = conn.Close() + require.NoError(t, err) }() } diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 297242700..90fd7259e 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -8,6 +8,7 @@ import ( "github.com/Shopify/sarama" "github.com/gofrs/uuid" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/kafka" "github.com/influxdata/telegraf/plugins/outputs" @@ -228,7 +229,7 @@ func ValidateTopicSuffixMethod(method string) error { return nil } } - return fmt.Errorf("Unknown topic suffix method provided: %s", method) + return fmt.Errorf("unknown topic suffix method provided: %s", method) } func (k *Kafka) GetTopicName(metric telegraf.Metric) (telegraf.Metric, string) { @@ -379,7 +380,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error { k.Log.Error("The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch") return nil } - return prodErr + return prodErr //nolint:staticcheck // Return first error encountered } } return err diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 0edaed31f..c7fcc19e6 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -5,11 +5,12 @@ import ( "time" "github.com/Shopify/sarama" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/require" ) type topicSuffixTestpair struct { @@ -50,10 +51,10 @@ func TestTopicSuffixesIntegration(t *testing.T) { topic := "Test" - metric := testutil.TestMetric(1) + m := testutil.TestMetric(1) metricTagName := "tag1" - metricTagValue := metric.Tags()[metricTagName] - metricName := metric.Name() + metricTagValue := m.Tags()[metricTagName] + metricName := m.Name() var testcases = []topicSuffixTestpair{ // This ensures empty separator is okay @@ -85,7 +86,7 @@ func TestTopicSuffixesIntegration(t *testing.T) { TopicSuffix: topicSuffix, } - _, topic := k.GetTopicName(metric) + _, topic := k.GetTopicName(m) require.Equal(t, expectedTopic, topic) } } diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 89724ef18..ef2481b60 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -9,12 +9,12 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/gofrs/uuid" + "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) const testPartitionKey = "partitionKey" @@ -24,7 +24,6 @@ const testStreamName = "streamName" const zero int64 = 0 func TestPartitionKey(t *testing.T) { - assert := assert.New(t) testPoint := testutil.TestMetric(1) k := KinesisOutput{ @@ -34,7 +33,7 @@ func TestPartitionKey(t *testing.T) { Key: "-", }, } - assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") + require.Equal(t, "-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") k = KinesisOutput{ Log: testutil.Logger{}, @@ -43,7 +42,7 @@ func TestPartitionKey(t *testing.T) { Key: "tag1", }, } - assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'") + require.Equal(t, testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'") k = KinesisOutput{ Log: testutil.Logger{}, @@ -53,7 +52,7 @@ func TestPartitionKey(t *testing.T) { Default: "somedefault", }, } - assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default") + require.Equal(t, "somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default") k = KinesisOutput{ Log: testutil.Logger{}, @@ -62,7 +61,7 @@ func TestPartitionKey(t *testing.T) { Key: "doesnotexist", }, } - assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf") + require.Equal(t, "telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf") k = KinesisOutput{ Log: testutil.Logger{}, @@ -70,7 +69,7 @@ func TestPartitionKey(t *testing.T) { Method: "not supported", }, } - assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''") + require.Equal(t, "", k.getPartitionKey(testPoint), "PartitionKey should be value of ''") k = KinesisOutput{ Log: testutil.Logger{}, @@ -78,7 +77,7 @@ func TestPartitionKey(t *testing.T) { Method: "measurement", }, } - assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name") + require.Equal(t, testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name") k = KinesisOutput{ Log: testutil.Logger{}, @@ -88,14 +87,14 @@ func TestPartitionKey(t *testing.T) { } partitionKey := k.getPartitionKey(testPoint) u, err := uuid.FromString(partitionKey) - assert.Nil(err, "Issue parsing UUID") - assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4") + require.NoError(t, err, "Issue parsing UUID") + require.Equal(t, byte(4), u.Version(), "PartitionKey should be UUIDv4") k = KinesisOutput{ Log: testutil.Logger{}, PartitionKey: "-", } - assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") + require.Equal(t, "-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") k = KinesisOutput{ Log: testutil.Logger{}, @@ -103,13 +102,11 @@ func TestPartitionKey(t *testing.T) { } partitionKey = k.getPartitionKey(testPoint) u, err = uuid.FromString(partitionKey) - assert.Nil(err, "Issue parsing UUID") - assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4") + require.NoError(t, err, "Issue parsing UUID") + require.Equal(t, byte(4), u.Version(), "PartitionKey should be UUIDv4") } func TestWriteKinesis_WhenSuccess(t *testing.T) { - assert := assert.New(t) - records := []types.PutRecordsRequestEntry{ { PartitionKey: aws.String(testPartitionKey), @@ -135,7 +132,7 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) { } elapsed := k.writeKinesis(records) - assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) + require.GreaterOrEqual(t, elapsed.Nanoseconds(), zero) svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -146,8 +143,6 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) { } func TestWriteKinesis_WhenRecordErrors(t *testing.T) { - assert := assert.New(t) - records := []types.PutRecordsRequestEntry{ { PartitionKey: aws.String(testPartitionKey), @@ -173,7 +168,7 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) { } elapsed := k.writeKinesis(records) - assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) + require.GreaterOrEqual(t, elapsed.Nanoseconds(), zero) svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -184,8 +179,6 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) { } func TestWriteKinesis_WhenServiceError(t *testing.T) { - assert := assert.New(t) - records := []types.PutRecordsRequestEntry{ { PartitionKey: aws.String(testPartitionKey), @@ -205,7 +198,7 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) { } elapsed := k.writeKinesis(records) - assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) + require.GreaterOrEqual(t, elapsed.Nanoseconds(), zero) svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -216,7 +209,6 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) { } func TestWrite_NoMetrics(t *testing.T) { - assert := assert.New(t) serializer := influx.NewSerializer() svc := &mockKinesisPutRecords{} @@ -232,13 +224,12 @@ func TestWrite_NoMetrics(t *testing.T) { } err := k.Write([]telegraf.Metric{}) - assert.Nil(err, "Should not return error") + require.NoError(t, err, "Should not return error") svc.AssertRequests(t, []*kinesis.PutRecordsInput{}) } func TestWrite_SingleMetric(t *testing.T) { - assert := assert.New(t) serializer := influx.NewSerializer() svc := &mockKinesisPutRecords{} @@ -257,7 +248,7 @@ func TestWrite_SingleMetric(t *testing.T) { metric, metricData := createTestMetric(t, "metric1", serializer) err := k.Write([]telegraf.Metric{metric}) - assert.Nil(err, "Should not return error") + require.NoError(t, err, "Should not return error") svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -273,7 +264,6 @@ func TestWrite_SingleMetric(t *testing.T) { } func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { - assert := assert.New(t) serializer := influx.NewSerializer() svc := &mockKinesisPutRecords{} @@ -292,7 +282,7 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { metrics, metricsData := createTestMetrics(t, 3, serializer) err := k.Write(metrics) - assert.Nil(err, "Should not return error") + require.NoError(t, err, "Should not return error") svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -305,7 +295,6 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { } func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { - assert := assert.New(t) serializer := influx.NewSerializer() svc := &mockKinesisPutRecords{} @@ -324,7 +313,7 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest, serializer) err := k.Write(metrics) - assert.Nil(err, "Should not return error") + require.NoError(t, err, "Should not return error") svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -337,7 +326,6 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { } func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { - assert := assert.New(t) serializer := influx.NewSerializer() svc := &mockKinesisPutRecords{} @@ -357,7 +345,7 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest+1, serializer) err := k.Write(metrics) - assert.Nil(err, "Should not return error") + require.NoError(t, err, "Should not return error") svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -376,7 +364,6 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { } func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { - assert := assert.New(t) serializer := influx.NewSerializer() svc := &mockKinesisPutRecords{} @@ -396,7 +383,7 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest*2, serializer) err := k.Write(metrics) - assert.Nil(err, "Should not return error") + require.NoError(t, err, "Should not return error") svc.AssertRequests(t, []*kinesis.PutRecordsInput{ { @@ -415,7 +402,6 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { } func TestWrite_SerializerError(t *testing.T) { - assert := assert.New(t) serializer := influx.NewSerializer() svc := &mockKinesisPutRecords{} @@ -443,7 +429,7 @@ func TestWrite_SerializerError(t *testing.T) { invalidMetric, metric2, }) - assert.Nil(err, "Should not return error") + require.NoError(t, err, "Should not return error") // remaining valid metrics should still get written svc.AssertRequests(t, []*kinesis.PutRecordsInput{ @@ -519,7 +505,7 @@ func (m *mockKinesisPutRecords) SetupErrorResponse(err error) { func (m *mockKinesisPutRecords) PutRecords(_ context.Context, input *kinesis.PutRecordsInput, _ ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error) { reqNum := len(m.requests) if reqNum > len(m.responses) { - return nil, fmt.Errorf("Response for request %+v not setup", reqNum) + return nil, fmt.Errorf("response for request %+v not setup", reqNum) } m.requests = append(m.requests, input) diff --git a/plugins/outputs/librato/librato.go b/plugins/outputs/librato/librato.go index dc1e9b6fa..ff3e59901 100644 --- a/plugins/outputs/librato/librato.go +++ b/plugins/outputs/librato/librato.go @@ -118,56 +118,64 @@ func (l *Librato) Write(metrics []telegraf.Metric) error { // make sur we send a batch of maximum 300 sizeBatch := 300 for start := 0; start < metricCounter; start += sizeBatch { - lmetrics := LMetrics{} - end := start + sizeBatch - if end > metricCounter { - end = metricCounter - sizeBatch = end - start - } - lmetrics.Gauges = make([]*Gauge, sizeBatch) - copy(lmetrics.Gauges, tempGauges[start:end]) - metricsBytes, err := json.Marshal(lmetrics) + err := l.writeBatch(start, sizeBatch, metricCounter, tempGauges) if err != nil { - return fmt.Errorf("unable to marshal Metrics, %s", err.Error()) - } - - l.Log.Debugf("Librato request: %v", string(metricsBytes)) - - req, err := http.NewRequest( - "POST", - l.APIUrl, - bytes.NewBuffer(metricsBytes)) - if err != nil { - return fmt.Errorf("unable to create http.Request, %s", err.Error()) - } - req.Header.Add("Content-Type", "application/json") - req.SetBasicAuth(l.APIUser, l.APIToken) - - resp, err := l.client.Do(req) - if err != nil { - l.Log.Debugf("Error POSTing metrics: %v", err.Error()) - return fmt.Errorf("error POSTing metrics, %s", err.Error()) - } - defer resp.Body.Close() - - if resp.StatusCode != 200 || l.Debug { - htmlData, err := io.ReadAll(resp.Body) - if err != nil { - l.Log.Debugf("Couldn't get response! (%v)", err) - } - if resp.StatusCode != 200 { - return fmt.Errorf( - "received bad status code, %d\n %s", - resp.StatusCode, - string(htmlData)) - } - l.Log.Debugf("Librato response: %v", string(htmlData)) + return err } } return nil } +func (l *Librato) writeBatch(start int, sizeBatch int, metricCounter int, tempGauges []*Gauge) error { + lmetrics := LMetrics{} + end := start + sizeBatch + if end > metricCounter { + end = metricCounter + sizeBatch = end - start + } + lmetrics.Gauges = make([]*Gauge, sizeBatch) + copy(lmetrics.Gauges, tempGauges[start:end]) + metricsBytes, err := json.Marshal(lmetrics) + if err != nil { + return fmt.Errorf("unable to marshal Metrics, %s", err.Error()) + } + + l.Log.Debugf("Librato request: %v", string(metricsBytes)) + + req, err := http.NewRequest( + "POST", + l.APIUrl, + bytes.NewBuffer(metricsBytes)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + req.SetBasicAuth(l.APIUser, l.APIToken) + + resp, err := l.client.Do(req) + if err != nil { + l.Log.Debugf("Error POSTing metrics: %v", err.Error()) + return fmt.Errorf("error POSTing metrics, %s", err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 || l.Debug { + htmlData, err := io.ReadAll(resp.Body) + if err != nil { + l.Log.Debugf("Couldn't get response! (%v)", err) + } + if resp.StatusCode != 200 { + return fmt.Errorf( + "received bad status code, %d\n %s", + resp.StatusCode, + string(htmlData)) + } + l.Log.Debugf("Librato response: %v", string(htmlData)) + } + return nil +} + // SampleConfig is function who return the default configuration for this // output func (l *Librato) SampleConfig() string { @@ -219,8 +227,9 @@ func verifyValue(v interface{}) bool { switch v.(type) { case string: return false + default: + return true } - return true } func (g *Gauge) setValue(v interface{}) error { @@ -230,7 +239,7 @@ func (g *Gauge) setValue(v interface{}) error { case uint64: g.Value = float64(d) case float64: - g.Value = float64(d) + g.Value = d case bool: if d { g.Value = float64(1.0) diff --git a/plugins/outputs/loki/loki.go b/plugins/outputs/loki/loki.go index fcf96e55f..c3787e952 100644 --- a/plugins/outputs/loki/loki.go +++ b/plugins/outputs/loki/loki.go @@ -11,13 +11,14 @@ import ( "strings" "time" + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" - "golang.org/x/oauth2" - "golang.org/x/oauth2/clientcredentials" ) const ( @@ -126,7 +127,7 @@ func (l *Loki) Connect() (err error) { return fmt.Errorf("http client fail: %w", err) } - return + return nil } func (l *Loki) Close() error { @@ -155,10 +156,10 @@ func (l *Loki) Write(metrics []telegraf.Metric) error { s.insertLog(tags, Log{fmt.Sprintf("%d", m.Time().UnixNano()), line}) } - return l.write(s) + return l.writeMetrics(s) } -func (l *Loki) write(s Streams) error { +func (l *Loki) writeMetrics(s Streams) error { bs, err := json.Marshal(s) if err != nil { return fmt.Errorf("json.Marshal: %w", err) diff --git a/plugins/outputs/loki/loki_test.go b/plugins/outputs/loki/loki_test.go index 6f0678e8d..3050f7acb 100644 --- a/plugins/outputs/loki/loki_test.go +++ b/plugins/outputs/loki/loki_test.go @@ -11,11 +11,11 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" - "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/testutil" ) func getMetric() telegraf.Metric { @@ -329,7 +329,8 @@ func TestOAuthClientCredentialsGrant(t *testing.T) { values.Add("access_token", token) values.Add("token_type", "bearer") values.Add("expires_in", "3600") - w.Write([]byte(values.Encode())) + _, err = w.Write([]byte(values.Encode())) + require.NoError(t, err) }, handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"]) diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 54203ee0d..20c4885fa 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -2,12 +2,12 @@ package mqtt import ( "fmt" - "log" "strings" "sync" "time" paho "github.com/eclipse/paho.mqtt.golang" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" @@ -82,9 +82,10 @@ type MQTT struct { QoS int `toml:"qos"` ClientID string `toml:"client_id"` tls.ClientConfig - BatchMessage bool `toml:"batch"` - Retain bool `toml:"retain"` - KeepAlive int64 `toml:"keep_alive"` + BatchMessage bool `toml:"batch"` + Retain bool `toml:"retain"` + KeepAlive int64 `toml:"keep_alive"` + Log telegraf.Logger `toml:"-"` client paho.Client opts *paho.ClientOptions @@ -164,13 +165,13 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { } else { buf, err := m.serializer.Serialize(metric) if err != nil { - log.Printf("D! [outputs.mqtt] Could not serialize metric: %v", err) + m.Log.Debugf("Could not serialize metric: %v", err) continue } err = m.publish(topic, buf) if err != nil { - return fmt.Errorf("Could not write to MQTT server, %s", err) + return fmt.Errorf("could not write to MQTT server, %s", err) } } } @@ -183,7 +184,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { } publisherr := m.publish(key, buf) if publisherr != nil { - return fmt.Errorf("Could not write to MQTT server, %s", publisherr) + return fmt.Errorf("could not write to MQTT server, %s", publisherr) } } diff --git a/plugins/outputs/nats/nats.go b/plugins/outputs/nats/nats.go index f4cf35b16..9f7780eea 100644 --- a/plugins/outputs/nats/nats.go +++ b/plugins/outputs/nats/nats.go @@ -2,14 +2,14 @@ package nats import ( "fmt" - "log" "strings" + "github.com/nats-io/nats.go" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" - "github.com/nats-io/nats.go" ) type NATS struct { @@ -23,6 +23,8 @@ type NATS struct { tls.ClientConfig + Log telegraf.Logger `toml:"-"` + conn *nats.Conn serializer serializers.Serializer } @@ -121,7 +123,7 @@ func (n *NATS) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { buf, err := n.serializer.Serialize(metric) if err != nil { - log.Printf("D! [outputs.nats] Could not serialize metric: %v", err) + n.Log.Debugf("Could not serialize metric: %v", err) continue } diff --git a/plugins/outputs/newrelic/newrelic.go b/plugins/outputs/newrelic/newrelic.go index 02b2b9c3f..5290d4e6a 100644 --- a/plugins/outputs/newrelic/newrelic.go +++ b/plugins/outputs/newrelic/newrelic.go @@ -8,11 +8,12 @@ import ( "net/url" "time" + "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative" + "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative" - "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry" ) // NewRelic nr structure @@ -27,7 +28,7 @@ type NewRelic struct { dc *cumulative.DeltaCalculator savedErrors map[int]interface{} errorCount int - client http.Client `toml:"-"` + client http.Client } // Description returns a one-sentence description on the Output diff --git a/plugins/outputs/nsq/nsq.go b/plugins/outputs/nsq/nsq.go index a9e2d94ac..6d719d0a0 100644 --- a/plugins/outputs/nsq/nsq.go +++ b/plugins/outputs/nsq/nsq.go @@ -2,19 +2,20 @@ package nsq import ( "fmt" - "log" + + "github.com/nsqio/go-nsq" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/serializers" - "github.com/nsqio/go-nsq" ) type NSQ struct { - Server string - Topic string - producer *nsq.Producer + Server string + Topic string + Log telegraf.Logger `toml:"-"` + producer *nsq.Producer serializer serializers.Serializer } @@ -68,13 +69,13 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { buf, err := n.serializer.Serialize(metric) if err != nil { - log.Printf("D! [outputs.nsq] Could not serialize metric: %v", err) + n.Log.Debugf("Could not serialize metric: %v", err) continue } err = n.producer.Publish(n.Topic, buf) if err != nil { - return fmt.Errorf("FAILED to send NSQD message: %s", err) + return fmt.Errorf("failed to send NSQD message: %s", err) } } return nil