fix: Linter fixes for plugins/outputs/[g-m]* (#10127)

Co-authored-by: Pawel Zak <Pawel Zak>

parent 2b1a79f327
commit 020b77b239
@@ -113,7 +113,7 @@ func (g *Graphite) Connect() error {
 func (g *Graphite) Close() error {
 // Closing all connections
 for _, conn := range g.conns {
-conn.Close()
+_ = conn.Close()
 }
 return nil
 }
@@ -133,11 +133,16 @@ func (g *Graphite) Description() string {
 // props to Tv via the authors of carbon-relay-ng` for this trick.
 func (g *Graphite) checkEOF(conn net.Conn) {
 b := make([]byte, 1024)
-conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
+if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
+g.Log.Errorf("Couldn't set read deadline for connection %s. closing conn explicitly", conn)
+_ = conn.Close()
+return
+}
 num, err := conn.Read(b)
 if err == io.EOF {
 g.Log.Errorf("Conn %s is closed. closing conn explicitly", conn)
-conn.Close()
+_ = conn.Close()
 return
 }
 // just in case i misunderstand something or the remote behaves badly
@@ -147,7 +152,7 @@ func (g *Graphite) checkEOF(conn net.Conn) {
 // Log non-timeout errors or close.
 if e, ok := err.(net.Error); !(ok && e.Timeout()) {
 g.Log.Errorf("conn %s checkEOF .conn.Read returned err != EOF, which is unexpected. closing conn. error: %s", conn, err)
-conn.Close()
+_ = conn.Close()
 }
 }
@@ -174,7 +179,7 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
 // try to reconnect and retry to send
 if err != nil {
 g.Log.Error("Graphite: Reconnecting and retrying...")
-g.Connect()
+_ = g.Connect()
 err = g.send(batch)
 }
@@ -189,14 +194,14 @@ func (g *Graphite) send(batch []byte) error {
 p := rand.Perm(len(g.conns))
 for _, n := range p {
 if g.Timeout > 0 {
-g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
+_ = g.conns[n].SetWriteDeadline(time.Now().Add(time.Duration(g.Timeout) * time.Second))
 }
 g.checkEOF(g.conns[n])
 if _, e := g.conns[n].Write(batch); e != nil {
 // Error
 g.Log.Errorf("Graphite Error: " + e.Error())
 // Close explicitly and let's try the next one
-g.conns[n].Close()
+_ = g.conns[n].Close()
 } else {
 // Success
 err = nil
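The graphite hunks above follow the two recurring errcheck remedies in this commit: handle the error where it matters (the read deadline), and explicitly discard it with `_ =` where the code intentionally ignores it (best-effort Close). A minimal, self-contained sketch of the same pattern; the `sender` type and its methods are illustrative and not part of Telegraf:

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// sender is a stand-in for the plugin's connection pool (illustrative only).
type sender struct {
	conns []net.Conn
}

// closeAll discards Close errors on purpose: the pool is being torn down and
// there is nothing useful to do with a failure, so `_ =` documents the intent
// and satisfies errcheck.
func (s *sender) closeAll() {
	for _, c := range s.conns {
		_ = c.Close()
	}
}

// probe handles the SetReadDeadline error instead of ignoring it, mirroring
// the checkEOF change: on failure the connection is closed and skipped.
func (s *sender) probe(c net.Conn) error {
	if err := c.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
		_ = c.Close()
		return fmt.Errorf("setting read deadline: %w", err)
	}
	return nil
}

func main() {
	s := &sender{}
	s.closeAll()
	fmt.Println("closed", len(s.conns), "connections")
}
```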
@@ -8,13 +8,11 @@ import (
 "testing"
 "time"
 
-"github.com/influxdata/telegraf/testutil"
+"github.com/stretchr/testify/require"
 
 "github.com/influxdata/telegraf"
 "github.com/influxdata/telegraf/metric"
-"github.com/stretchr/testify/assert"
-"github.com/stretchr/testify/require"
+"github.com/influxdata/telegraf/testutil"
 )
 
 func TestGraphiteError(t *testing.T) {
@@ -39,7 +37,7 @@ func TestGraphiteError(t *testing.T) {
 require.NoError(t, err1)
 err2 := g.Write(metrics)
 require.Error(t, err2)
-assert.Equal(t, "could not write to any Graphite server in cluster", err2.Error())
+require.Equal(t, "could not write to any Graphite server in cluster", err2.Error())
 }
 
 func TestGraphiteOK(t *testing.T) {
@@ -490,9 +488,9 @@ func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn)
 tp := textproto.NewReader(reader)
 data1, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
-conn.Close()
-tcpServer.Close()
+require.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
+require.NoError(t, conn.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
@@ -504,11 +502,11 @@ func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn2)
 tp := textproto.NewReader(reader)
 data2, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
+require.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
 data3, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
-conn2.Close()
-tcpServer.Close()
+require.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
+require.NoError(t, conn2.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
@@ -520,9 +518,9 @@ func TCPServer1WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn)
 tp := textproto.NewReader(reader)
 data1, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1)
-conn.Close()
-tcpServer.Close()
+require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1.myfield 3.14 1289430000", data1)
+require.NoError(t, conn.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
@@ -534,11 +532,11 @@ func TCPServer2WithMultipleTemplates(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn2)
 tp := textproto.NewReader(reader)
 data2, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2)
+require.Equal(t, "my.prefix.mymeasurement.valuetag.192_168_0_1 3.14 1289430000", data2)
 data3, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3)
-conn2.Close()
-tcpServer.Close()
+require.Equal(t, "my.prefix.192_168_0_1.my_measurement.valuetag 3.14 1289430000", data3)
+require.NoError(t, conn2.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
@@ -550,9 +548,9 @@ func TCPServer1WithTags(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn)
 tp := textproto.NewReader(reader)
 data1, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1)
-conn.Close()
-tcpServer.Close()
+require.Equal(t, "my.prefix.mymeasurement.myfield;host=192.168.0.1 3.14 1289430000", data1)
+require.NoError(t, conn.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
@@ -564,11 +562,11 @@ func TCPServer2WithTags(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn2)
 tp := textproto.NewReader(reader)
 data2, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
+require.Equal(t, "my.prefix.mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
 data3, _ := tp.ReadLine()
-assert.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3)
-conn2.Close()
-tcpServer.Close()
+require.Equal(t, "my.prefix.my_measurement;host=192.168.0.1 3.14 1289430000", data3)
+require.NoError(t, conn2.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
@@ -580,9 +578,9 @@ func TCPServer1WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn)
 tp := textproto.NewReader(reader)
 data1, _ := tp.ReadLine()
-assert.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1)
-conn.Close()
-tcpServer.Close()
+require.Equal(t, "my_prefix_mymeasurement_myfield;host=192.168.0.1 3.14 1289430000", data1)
+require.NoError(t, conn.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
@@ -594,10 +592,10 @@ func TCPServer2WithTagsSeparatorUnderscore(t *testing.T, wg *sync.WaitGroup) {
 reader := bufio.NewReader(conn2)
 tp := textproto.NewReader(reader)
 data2, _ := tp.ReadLine()
-assert.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
+require.Equal(t, "my_prefix_mymeasurement;host=192.168.0.1 3.14 1289430000", data2)
 data3, _ := tp.ReadLine()
-assert.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3)
-conn2.Close()
-tcpServer.Close()
+require.Equal(t, "my_prefix_my_measurement;host=192.168.0.1 3.14 1289430000", data3)
+require.NoError(t, conn2.Close())
+require.NoError(t, tcpServer.Close())
 }()
 }
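The test hunks above replace testify's assert with require and route Close errors through require.NoError. A hedged sketch of the resulting style; the listener and round-trip are made up for illustration and are not part of the graphite tests:

```go
package graphite_test

import (
	"net"
	"testing"

	"github.com/stretchr/testify/require"
)

// TestServerRoundTrip shows the require-based style used after this commit:
// require stops the test at the first failure, and error-returning calls such
// as Close are asserted instead of silently ignored.
func TestServerRoundTrip(t *testing.T) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	conn, err := net.Dial("tcp", ln.Addr().String())
	require.NoError(t, err)

	require.NoError(t, conn.Close())
	require.NoError(t, ln.Close())
}
```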
@@ -97,7 +97,10 @@ func newGelfWriter(cfg gelfConfig, dialer *net.Dialer, tlsConfig *tls.Config) ge
 }
 
 func (g *gelfUDP) Write(message []byte) (n int, err error) {
-compressed := g.compress(message)
+compressed, err := g.compress(message)
+if err != nil {
+return 0, err
+}
 
 chunksize := g.gelfConfig.MaxChunkSizeWan
 length := compressed.Len()
@@ -106,10 +109,17 @@ func (g *gelfUDP) Write(message []byte) (n int, err error) {
 chunkCountInt := int(math.Ceil(float64(length) / float64(chunksize)))
 
 id := make([]byte, 8)
-rand.Read(id)
+_, err = rand.Read(id)
+if err != nil {
+return 0, err
+}
 
 for i, index := 0, 0; i < length; i, index = i+chunksize, index+1 {
-packet := g.createChunkedMessage(index, chunkCountInt, id, &compressed)
+packet, err := g.createChunkedMessage(index, chunkCountInt, id, &compressed)
+if err != nil {
+return 0, err
+}
 
 err = g.send(packet.Bytes())
 if err != nil {
 return 0, err
@@ -136,21 +146,40 @@ func (g *gelfUDP) Close() (err error) {
 return err
 }
 
-func (g *gelfUDP) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer {
+func (g *gelfUDP) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) (bytes.Buffer, error) {
 var packet bytes.Buffer
 
 chunksize := g.getChunksize()
 
-packet.Write(g.intToBytes(30))
-packet.Write(g.intToBytes(15))
-packet.Write(id)
-packet.Write(g.intToBytes(index))
-packet.Write(g.intToBytes(chunkCountInt))
-packet.Write(compressed.Next(chunksize))
-return packet
+b, err := g.intToBytes(30)
+if err != nil {
+return packet, err
+}
+packet.Write(b) //nolint:revive // from buffer.go: "err is always nil"
+
+b, err = g.intToBytes(15)
+if err != nil {
+return packet, err
+}
+packet.Write(b) //nolint:revive // from buffer.go: "err is always nil"
+
+packet.Write(id) //nolint:revive // from buffer.go: "err is always nil"
+
+b, err = g.intToBytes(index)
+if err != nil {
+return packet, err
+}
+packet.Write(b) //nolint:revive // from buffer.go: "err is always nil"
+
+b, err = g.intToBytes(chunkCountInt)
+if err != nil {
+return packet, err
+}
+packet.Write(b) //nolint:revive // from buffer.go: "err is always nil"
+
+packet.Write(compressed.Next(chunksize)) //nolint:revive // from buffer.go: "err is always nil"
+
+return packet, nil
 }
 
 func (g *gelfUDP) getChunksize() int {
@@ -165,21 +194,30 @@ func (g *gelfUDP) getChunksize() int {
 return g.gelfConfig.MaxChunkSizeWan
 }
 
-func (g *gelfUDP) intToBytes(i int) []byte {
+func (g *gelfUDP) intToBytes(i int) ([]byte, error) {
 buf := new(bytes.Buffer)
 
-binary.Write(buf, binary.LittleEndian, int8(i))
-return buf.Bytes()
+err := binary.Write(buf, binary.LittleEndian, int8(i))
+if err != nil {
+return nil, err
+}
+
+return buf.Bytes(), err
 }
 
-func (g *gelfUDP) compress(b []byte) bytes.Buffer {
+func (g *gelfUDP) compress(b []byte) (bytes.Buffer, error) {
 var buf bytes.Buffer
 comp := zlib.NewWriter(&buf)
 
-comp.Write(b)
-comp.Close()
-
-return buf
+if _, err := comp.Write(b); err != nil {
+return bytes.Buffer{}, err
+}
+
+if err := comp.Close(); err != nil {
+return bytes.Buffer{}, err
+}
+
+return buf, nil
 }
 
 func (g *gelfUDP) Connect() error {
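The graylog hunks widen the internal helpers so that errors from binary.Write and the zlib writer are returned to the caller instead of dropped. A standalone sketch of that shape, assuming nothing else about the plugin:

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/binary"
	"fmt"
)

// intToBytes mirrors the new helper shape: the binary.Write error is
// returned rather than discarded.
func intToBytes(i int) ([]byte, error) {
	buf := new(bytes.Buffer)
	if err := binary.Write(buf, binary.LittleEndian, int8(i)); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// compress mirrors the new compress helper: both Write and Close errors
// from the zlib writer are propagated.
func compress(b []byte) (bytes.Buffer, error) {
	var buf bytes.Buffer
	comp := zlib.NewWriter(&buf)
	if _, err := comp.Write(b); err != nil {
		return bytes.Buffer{}, err
	}
	if err := comp.Close(); err != nil {
		return bytes.Buffer{}, err
	}
	return buf, nil
}

func main() {
	header, err := intToBytes(30)
	if err != nil {
		panic(err)
	}
	payload, err := compress([]byte("hello"))
	if err != nil {
		panic(err)
	}
	fmt.Println(len(header), payload.Len())
}
```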
@@ -11,11 +11,11 @@ import (
 "testing"
 "time"
 
+reuse "github.com/libp2p/go-reuseport"
+"github.com/stretchr/testify/require"
+
 tlsint "github.com/influxdata/telegraf/plugins/common/tls"
 "github.com/influxdata/telegraf/testutil"
-reuse "github.com/libp2p/go-reuseport"
-"github.com/stretchr/testify/assert"
-"github.com/stretchr/testify/require"
 )
 
 func TestWriteUDP(t *testing.T) {
@@ -183,14 +183,14 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, config *Gr
 var obj GelfObject
 _ = json.Unmarshal(bufW.Bytes(), &obj)
 require.NoError(t, err)
-assert.Equal(t, obj["short_message"], "telegraf")
+require.Equal(t, obj["short_message"], "telegraf")
 if config.NameFieldNoPrefix {
-assert.Equal(t, obj["name"], "test1")
+require.Equal(t, obj["name"], "test1")
 } else {
-assert.Equal(t, obj["_name"], "test1")
+require.Equal(t, obj["_name"], "test1")
 }
-assert.Equal(t, obj["_tag1"], "value1")
-assert.Equal(t, obj["_value"], float64(1))
+require.Equal(t, obj["_tag1"], "value1")
+require.Equal(t, obj["_value"], float64(1))
 }
 
 // in UDP scenario all 4 messages are received
@@ -238,10 +238,10 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.
 var obj GelfObject
 err = json.Unmarshal(bufW.Bytes(), &obj)
 require.NoError(t, err)
-assert.Equal(t, obj["short_message"], "telegraf")
-assert.Equal(t, obj["_name"], "test1")
-assert.Equal(t, obj["_tag1"], "value1")
-assert.Equal(t, obj["_value"], float64(1))
+require.Equal(t, obj["short_message"], "telegraf")
+require.Equal(t, obj["_name"], "test1")
+require.Equal(t, obj["_tag1"], "value1")
+require.Equal(t, obj["_value"], float64(1))
 }
 
 conn := accept()
@@ -208,9 +208,9 @@ func (h *Health) Close() error {
 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 defer cancel()
 
-h.server.Shutdown(ctx)
+err := h.server.Shutdown(ctx)
 h.wg.Wait()
-return nil
+return err
 }
 
 // Origin returns the URL of the HTTP server.
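The health output change above returns the http.Server.Shutdown error from Close instead of swallowing it. A minimal sketch of the same shape; the wrapper type is a placeholder, not the plugin itself:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

type service struct {
	server *http.Server
}

// Close mirrors the fixed shape: Shutdown's error is captured and returned
// after the remaining cleanup, instead of being ignored and returning nil.
func (s *service) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := s.server.Shutdown(ctx)
	// ... wait for handler goroutines here, as the plugin does with h.wg.Wait()
	return err
}

func main() {
	s := &service{server: &http.Server{Addr: "127.0.0.1:0"}}
	fmt.Println(s.Close())
}
```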
@@ -6,10 +6,11 @@ import (
 "testing"
 "time"
 
+"github.com/stretchr/testify/require"
+
 "github.com/influxdata/telegraf"
 "github.com/influxdata/telegraf/plugins/outputs/health"
 "github.com/influxdata/telegraf/testutil"
-"github.com/stretchr/testify/require"
 )
 
 var pki = testutil.NewPKI("../../../testutil/pki")
@@ -119,6 +120,7 @@ func TestHealth(t *testing.T) {
 
 resp, err := http.Get(output.Origin())
 require.NoError(t, err)
+defer resp.Body.Close()
 require.Equal(t, tt.expectedCode, resp.StatusCode)
 
 _, err = io.ReadAll(resp.Body)
@@ -150,7 +150,7 @@ func (h *HTTP) Write(metrics []telegraf.Metric) error {
 return err
 }
 
-return h.write(reqBody)
+return h.writeMetric(reqBody)
 }
 
 for _, metric := range metrics {
@@ -160,14 +160,14 @@ func (h *HTTP) Write(metrics []telegraf.Metric) error {
 return err
 }
 
-if err := h.write(reqBody); err != nil {
+if err := h.writeMetric(reqBody); err != nil {
 return err
 }
 }
 return nil
 }
 
-func (h *HTTP) write(reqBody []byte) error {
+func (h *HTTP) writeMetric(reqBody []byte) error {
 var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody)
 
 var err error
@@ -10,15 +10,16 @@ import (
 "testing"
 "time"
 
+"github.com/stretchr/testify/require"
+
 "github.com/influxdata/telegraf"
 "github.com/influxdata/telegraf/internal"
 "github.com/influxdata/telegraf/metric"
 httpconfig "github.com/influxdata/telegraf/plugins/common/http"
-oauth "github.com/influxdata/telegraf/plugins/common/oauth"
+"github.com/influxdata/telegraf/plugins/common/oauth"
 "github.com/influxdata/telegraf/plugins/serializers"
 "github.com/influxdata/telegraf/plugins/serializers/influx"
 "github.com/influxdata/telegraf/plugins/serializers/json"
-"github.com/stretchr/testify/require"
 )
 
 func getMetric() telegraf.Metric {
@@ -408,7 +409,8 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
 values.Add("access_token", token)
 values.Add("token_type", "bearer")
 values.Add("expires_in", "3600")
-w.Write([]byte(values.Encode()))
+_, err = w.Write([]byte(values.Encode()))
+require.NoError(t, err)
 },
 handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
 require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"])
@@ -456,10 +456,10 @@ func (c *httpClient) makeQueryRequest(query string) (*http.Request, error) {
 return req, nil
 }
 
-func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
+func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Request, error) {
 var err error
 
-req, err := http.NewRequest("POST", url, body)
+req, err := http.NewRequest("POST", address, body)
 if err != nil {
 return nil, fmt.Errorf("failed creating new request: %s", err.Error())
 }
 
@@ -106,7 +106,7 @@ func (c *udpClient) Write(ctx context.Context, metrics []telegraf.Metric) error
 _, err = c.conn.Write(scanner.Bytes())
 }
 if err != nil {
-c.conn.Close()
+_ = c.conn.Close()
 c.conn = nil
 return err
 }
@@ -11,11 +11,12 @@ import (
 "testing"
 "time"
 
+"github.com/stretchr/testify/require"
+
 "github.com/influxdata/telegraf"
 "github.com/influxdata/telegraf/metric"
 "github.com/influxdata/telegraf/plugins/outputs/influxdb"
 "github.com/influxdata/telegraf/testutil"
-"github.com/stretchr/testify/require"
 )
 
 var (
@@ -91,7 +92,7 @@ func TestUDP_Simple(t *testing.T) {
 DialContextF: func(network, address string) (influxdb.Conn, error) {
 conn := &MockConn{
 WriteF: func(b []byte) (n int, err error) {
-buffer.Write(b)
+buffer.Write(b) //nolint:revive // MockConn with always-success return
 return 0, nil
 },
 }
@@ -7,7 +7,6 @@ import (
 "errors"
 "fmt"
 "io"
-"log"
 "math"
 "net"
 "net/http"
@@ -55,6 +54,7 @@ type HTTPConfig struct {
 TLSConfig *tls.Config
 
 Serializer *influx.Serializer
+Log telegraf.Logger
 }
 
 type httpClient struct {
@@ -71,6 +71,7 @@ type httpClient struct {
 url *url.URL
 retryTime time.Time
 retryCount int
+log telegraf.Logger
 }
 
 func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
@@ -142,6 +143,7 @@ func NewHTTPClient(config *HTTPConfig) (*httpClient, error) {
 Bucket: config.Bucket,
 BucketTag: config.BucketTag,
 ExcludeBucketTag: config.ExcludeBucketTag,
+log: config.Log,
 }
 return client, nil
 }
@@ -296,7 +298,7 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 // Clients should *not* repeat the request and the metrics should be dropped.
 http.StatusUnprocessableEntity,
 http.StatusNotAcceptable:
-log.Printf("E! [outputs.influxdb_v2] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
+c.log.Errorf("Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
 return nil
 case http.StatusUnauthorized, http.StatusForbidden:
 return fmt.Errorf("failed to write metric (%s): %s", resp.Status, desc)
@@ -308,14 +310,14 @@ func (c *httpClient) writeBatch(ctx context.Context, bucket string, metrics []te
 c.retryCount++
 retryDuration := c.getRetryDuration(resp.Header)
 c.retryTime = time.Now().Add(retryDuration)
-log.Printf("W! [outputs.influxdb_v2] Failed to write; will retry in %s. (%s)\n", retryDuration, resp.Status)
+c.log.Warnf("Failed to write; will retry in %s. (%s)\n", retryDuration, resp.Status)
 return fmt.Errorf("waiting %s for server before sending metric again", retryDuration)
 }
 
 // if it's any other 4xx code, the client should not retry as it's the client's mistake.
 // retrying will not make the request magically work.
 if len(resp.Status) > 0 && resp.Status[0] == '4' {
-log.Printf("E! [outputs.influxdb_v2] Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
+c.log.Errorf("Failed to write metric (will be dropped: %s): %s\n", resp.Status, desc)
 return nil
 }
 
@@ -357,10 +359,10 @@ func (c *httpClient) getRetryDuration(headers http.Header) time.Duration {
 return time.Duration(retry*1000) * time.Millisecond
 }
 
-func (c *httpClient) makeWriteRequest(url string, body io.Reader) (*http.Request, error) {
+func (c *httpClient) makeWriteRequest(address string, body io.Reader) (*http.Request, error) {
 var err error
 
-req, err := http.NewRequest("POST", url, body)
+req, err := http.NewRequest("POST", address, body)
 if err != nil {
 return nil, err
 }
 
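The influxdb_v2 hunks replace global `log.Printf("E! [outputs.influxdb_v2] ...")` calls with a logger injected through the config (`Log telegraf.Logger`), which already carries the level and plugin prefix. A reduced sketch of that dependency-injection shape, using a local interface so the example stays self-contained; the names here are illustrative only:

```go
package main

import "fmt"

// logger is a stand-in for telegraf.Logger; only the methods used here are declared.
type logger interface {
	Errorf(format string, args ...interface{})
	Warnf(format string, args ...interface{})
}

type stdoutLogger struct{}

func (stdoutLogger) Errorf(format string, args ...interface{}) { fmt.Printf("E! "+format+"\n", args...) }
func (stdoutLogger) Warnf(format string, args ...interface{})  { fmt.Printf("W! "+format+"\n", args...) }

// client receives its logger from the plugin instead of calling log.Printf,
// mirroring the new httpClient.log field.
type client struct {
	log logger
}

func (c *client) handleStatus(status string) {
	c.log.Warnf("Failed to write; will retry. (%s)", status)
}

func main() {
	c := &client{log: stdoutLogger{}}
	c.handleStatus("429 Too Many Requests")
}
```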
@@ -11,8 +11,8 @@ import (
 )
 
 func genURL(u string) *url.URL {
-URL, _ := url.Parse(u)
-return URL
+address, _ := url.Parse(u)
+return address
 }
 
 func TestMakeWriteURL(t *testing.T) {
@@ -9,15 +9,16 @@ import (
 "testing"
 "time"
 
+"github.com/stretchr/testify/require"
+
 "github.com/influxdata/telegraf"
 influxdb "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2"
 "github.com/influxdata/telegraf/testutil"
-"github.com/stretchr/testify/require"
 )
 
 func genURL(u string) *url.URL {
-URL, _ := url.Parse(u)
-return URL
+address, _ := url.Parse(u)
+return address
 }
 func TestNewHTTPClient(t *testing.T) {
 tests := []struct {
@@ -60,7 +61,8 @@ func TestWriteBucketTagWorksOnRetry(t *testing.T) {
 http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 switch r.URL.Path {
 case "/api/v2/write":
-r.ParseForm()
+err := r.ParseForm()
+require.NoError(t, err)
 require.Equal(t, r.Form["bucket"], []string{"foo"})
 
 body, err := io.ReadAll(r.Body)
@@ -170,14 +170,14 @@ func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
 return err
 }
 
-func (i *InfluxDB) getHTTPClient(url *url.URL, proxy *url.URL) (Client, error) {
+func (i *InfluxDB) getHTTPClient(address *url.URL, proxy *url.URL) (Client, error) {
 tlsConfig, err := i.ClientConfig.TLSConfig()
 if err != nil {
 return nil, err
 }
 
-config := &HTTPConfig{
-URL: url,
+httpConfig := &HTTPConfig{
+URL: address,
 Token: i.Token,
 Organization: i.Organization,
 Bucket: i.Bucket,
@@ -190,11 +190,12 @@ func (i *InfluxDB) getHTTPClient(url *url.URL, proxy *url.URL) (Client, error) {
 ContentEncoding: i.ContentEncoding,
 TLSConfig: tlsConfig,
 Serializer: i.newSerializer(),
+Log: i.Log,
 }
 
-c, err := NewHTTPClient(config)
+c, err := NewHTTPClient(httpConfig)
 if err != nil {
-return nil, fmt.Errorf("error creating HTTP client [%s]: %v", url, err)
+return nil, fmt.Errorf("error creating HTTP client [%s]: %v", address, err)
 }
 
 return c, nil
@@ -75,9 +75,9 @@ func (i *Instrumental) Connect() error {
 }
 
 func (i *Instrumental) Close() error {
-i.conn.Close()
+err := i.conn.Close()
 i.conn = nil
-return nil
+return err
 }
 
 func (i *Instrumental) Write(metrics []telegraf.Metric) error {
@@ -138,23 +138,23 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
 splitStat := strings.SplitN(stat, " ", 3)
 name := splitStat[0]
 value := splitStat[1]
-time := splitStat[2]
+timestamp := splitStat[2]
 
 // replace invalid components of metric name with underscore
 cleanMetric := MetricNameReplacer.ReplaceAllString(name, "_")
 
 if !ValueIncludesBadChar.MatchString(value) {
-points = append(points, fmt.Sprintf("%s %s %s %s", metricType, cleanMetric, value, time))
+points = append(points, fmt.Sprintf("%s %s %s %s", metricType, cleanMetric, value, timestamp))
 }
 }
 }
 
 allPoints := strings.Join(points, "")
-_, err = fmt.Fprintf(i.conn, allPoints)
+_, err = fmt.Fprint(i.conn, allPoints)
 
 if err != nil {
 if err == io.EOF {
-i.Close()
+_ = i.Close()
 }
 
 return err
@@ -163,7 +163,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
 // force the connection closed after sending data
 // to deal with various disconnection scenarios and eschew holding
 // open idle connections en masse
-i.Close()
+_ = i.Close()
 
 return nil
 }
@@ -8,9 +8,10 @@ import (
 "testing"
 "time"
 
+"github.com/stretchr/testify/require"
+
 "github.com/influxdata/telegraf"
 "github.com/influxdata/telegraf/metric"
-"github.com/stretchr/testify/assert"
 )
 
 func TestWrite(t *testing.T) {
@@ -39,7 +40,8 @@ func TestWrite(t *testing.T) {
 )
 
 metrics := []telegraf.Metric{m1, m2}
-i.Write(metrics)
+err := i.Write(metrics)
+require.NoError(t, err)
 
 // Counter and Histogram are increments
 m3 := metric.New(
@@ -70,7 +72,8 @@ func TestWrite(t *testing.T) {
 )
 
 metrics = []telegraf.Metric{m3, m4, m5, m6}
-i.Write(metrics)
+err = i.Write(metrics)
+require.NoError(t, err)
 
 wg.Wait()
 }
@@ -80,44 +83,49 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup) {
 go func() {
 defer wg.Done()
 conn, _ := tcpServer.Accept()
-conn.SetDeadline(time.Now().Add(1 * time.Second))
+err := conn.SetDeadline(time.Now().Add(1 * time.Second))
+require.NoError(t, err)
 reader := bufio.NewReader(conn)
 tp := textproto.NewReader(reader)
 
 hello, _ := tp.ReadLine()
-assert.Equal(t, "hello version go/telegraf/1.1", hello)
+require.Equal(t, "hello version go/telegraf/1.1", hello)
 auth, _ := tp.ReadLine()
-assert.Equal(t, "authenticate abc123token", auth)
-conn.Write([]byte("ok\nok\n"))
+require.Equal(t, "authenticate abc123token", auth)
+_, err = conn.Write([]byte("ok\nok\n"))
+require.NoError(t, err)
 
 data1, _ := tp.ReadLine()
-assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
+require.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
 data2, _ := tp.ReadLine()
-assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
+require.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
 
 conn, _ = tcpServer.Accept()
-conn.SetDeadline(time.Now().Add(1 * time.Second))
+err = conn.SetDeadline(time.Now().Add(1 * time.Second))
+require.NoError(t, err)
 reader = bufio.NewReader(conn)
 tp = textproto.NewReader(reader)
 
 hello, _ = tp.ReadLine()
-assert.Equal(t, "hello version go/telegraf/1.1", hello)
+require.Equal(t, "hello version go/telegraf/1.1", hello)
 auth, _ = tp.ReadLine()
-assert.Equal(t, "authenticate abc123token", auth)
-conn.Write([]byte("ok\nok\n"))
+require.Equal(t, "authenticate abc123token", auth)
+_, err = conn.Write([]byte("ok\nok\n"))
+require.NoError(t, err)
 
 data3, _ := tp.ReadLine()
-assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
+require.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
 
 data4, _ := tp.ReadLine()
-assert.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4)
+require.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4)
 
 data5, _ := tp.ReadLine()
-assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5)
+require.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5)
 
 data6, _ := tp.ReadLine()
-assert.Equal(t, "", data6)
+require.Equal(t, "", data6)
 
-conn.Close()
+err = conn.Close()
+require.NoError(t, err)
 }()
 }
@@ -8,6 +8,7 @@ import (
 
 "github.com/Shopify/sarama"
 "github.com/gofrs/uuid"
+
 "github.com/influxdata/telegraf"
 "github.com/influxdata/telegraf/plugins/common/kafka"
 "github.com/influxdata/telegraf/plugins/outputs"
@@ -228,7 +229,7 @@ func ValidateTopicSuffixMethod(method string) error {
 return nil
 }
 }
-return fmt.Errorf("Unknown topic suffix method provided: %s", method)
+return fmt.Errorf("unknown topic suffix method provided: %s", method)
 }
 
 func (k *Kafka) GetTopicName(metric telegraf.Metric) (telegraf.Metric, string) {
@@ -379,7 +380,7 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
 k.Log.Error("The timestamp of the message is out of acceptable range, consider increasing broker `message.timestamp.difference.max.ms`; dropping batch")
 return nil
 }
-return prodErr
+return prodErr //nolint:staticcheck // Return first error encountered
 }
 }
 return err
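The kafka hunk above (and, further down, the kinesis test mock) lowercases the first word of error strings; Go's style checkers flag capitalized error messages because they are frequently wrapped and embedded mid-sentence. A tiny illustration of why the convention matters:

```go
package main

import (
	"errors"
	"fmt"
)

func validateMethod(method string) error {
	switch method {
	case "measurement", "tags":
		return nil
	}
	// error strings start lower-case and carry no trailing punctuation,
	// so they read naturally when wrapped by callers.
	return fmt.Errorf("unknown topic suffix method provided: %s", method)
}

func main() {
	if err := validateMethod("bogus"); err != nil {
		wrapped := fmt.Errorf("configuring output: %w", err)
		fmt.Println(wrapped, errors.Unwrap(wrapped) != nil)
	}
}
```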
@@ -5,11 +5,12 @@ import (
 "time"
 
 "github.com/Shopify/sarama"
+"github.com/stretchr/testify/require"
+
 "github.com/influxdata/telegraf"
 "github.com/influxdata/telegraf/metric"
 "github.com/influxdata/telegraf/plugins/serializers"
 "github.com/influxdata/telegraf/testutil"
-"github.com/stretchr/testify/require"
 )
 
 type topicSuffixTestpair struct {
@@ -50,10 +51,10 @@ func TestTopicSuffixesIntegration(t *testing.T) {
 
 topic := "Test"
 
-metric := testutil.TestMetric(1)
+m := testutil.TestMetric(1)
 metricTagName := "tag1"
-metricTagValue := metric.Tags()[metricTagName]
-metricName := metric.Name()
+metricTagValue := m.Tags()[metricTagName]
+metricName := m.Name()
 
 var testcases = []topicSuffixTestpair{
 // This ensures empty separator is okay
@@ -85,7 +86,7 @@ func TestTopicSuffixesIntegration(t *testing.T) {
 TopicSuffix: topicSuffix,
 }
 
-_, topic := k.GetTopicName(metric)
+_, topic := k.GetTopicName(m)
 require.Equal(t, expectedTopic, topic)
 }
 }
@ -9,12 +9,12 @@ import (
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
|
||||||
"github.com/gofrs/uuid"
|
"github.com/gofrs/uuid"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const testPartitionKey = "partitionKey"
|
const testPartitionKey = "partitionKey"
|
||||||
|
|
@ -24,7 +24,6 @@ const testStreamName = "streamName"
|
||||||
const zero int64 = 0
|
const zero int64 = 0
|
||||||
|
|
||||||
func TestPartitionKey(t *testing.T) {
|
func TestPartitionKey(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
testPoint := testutil.TestMetric(1)
|
testPoint := testutil.TestMetric(1)
|
||||||
|
|
||||||
k := KinesisOutput{
|
k := KinesisOutput{
|
||||||
|
|
@ -34,7 +33,7 @@ func TestPartitionKey(t *testing.T) {
|
||||||
Key: "-",
|
Key: "-",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")
|
require.Equal(t, "-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -43,7 +42,7 @@ func TestPartitionKey(t *testing.T) {
|
||||||
Key: "tag1",
|
Key: "tag1",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")
|
require.Equal(t, testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -53,7 +52,7 @@ func TestPartitionKey(t *testing.T) {
|
||||||
Default: "somedefault",
|
Default: "somedefault",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default")
|
require.Equal(t, "somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -62,7 +61,7 @@ func TestPartitionKey(t *testing.T) {
|
||||||
Key: "doesnotexist",
|
Key: "doesnotexist",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf")
|
require.Equal(t, "telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -70,7 +69,7 @@ func TestPartitionKey(t *testing.T) {
|
||||||
Method: "not supported",
|
Method: "not supported",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''")
|
require.Equal(t, "", k.getPartitionKey(testPoint), "PartitionKey should be value of ''")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -78,7 +77,7 @@ func TestPartitionKey(t *testing.T) {
|
||||||
Method: "measurement",
|
Method: "measurement",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name")
|
require.Equal(t, testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -88,14 +87,14 @@ func TestPartitionKey(t *testing.T) {
|
||||||
}
|
}
|
||||||
partitionKey := k.getPartitionKey(testPoint)
|
partitionKey := k.getPartitionKey(testPoint)
|
||||||
u, err := uuid.FromString(partitionKey)
|
u, err := uuid.FromString(partitionKey)
|
||||||
assert.Nil(err, "Issue parsing UUID")
|
require.NoError(t, err, "Issue parsing UUID")
|
||||||
assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4")
|
require.Equal(t, byte(4), u.Version(), "PartitionKey should be UUIDv4")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
PartitionKey: "-",
|
PartitionKey: "-",
|
||||||
}
|
}
|
||||||
assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")
|
require.Equal(t, "-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")
|
||||||
|
|
||||||
k = KinesisOutput{
|
k = KinesisOutput{
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
|
@ -103,13 +102,11 @@ func TestPartitionKey(t *testing.T) {
|
||||||
}
|
}
|
||||||
partitionKey = k.getPartitionKey(testPoint)
|
partitionKey = k.getPartitionKey(testPoint)
|
||||||
u, err = uuid.FromString(partitionKey)
|
u, err = uuid.FromString(partitionKey)
|
||||||
assert.Nil(err, "Issue parsing UUID")
|
require.NoError(t, err, "Issue parsing UUID")
|
||||||
assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4")
|
require.Equal(t, byte(4), u.Version(), "PartitionKey should be UUIDv4")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteKinesis_WhenSuccess(t *testing.T) {
|
func TestWriteKinesis_WhenSuccess(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
records := []types.PutRecordsRequestEntry{
|
records := []types.PutRecordsRequestEntry{
|
||||||
{
|
{
|
||||||
PartitionKey: aws.String(testPartitionKey),
|
PartitionKey: aws.String(testPartitionKey),
|
||||||
|
|
@ -135,7 +132,7 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
elapsed := k.writeKinesis(records)
|
elapsed := k.writeKinesis(records)
|
||||||
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
|
require.GreaterOrEqual(t, elapsed.Nanoseconds(), zero)
|
||||||
|
|
||||||
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
|
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
|
||||||
{
|
{
|
||||||
|
|
@ -146,8 +143,6 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
|
func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
records := []types.PutRecordsRequestEntry{
|
records := []types.PutRecordsRequestEntry{
|
||||||
{
|
{
|
||||||
PartitionKey: aws.String(testPartitionKey),
|
PartitionKey: aws.String(testPartitionKey),
|
||||||
|
|
@ -173,7 +168,7 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
elapsed := k.writeKinesis(records)
|
elapsed := k.writeKinesis(records)
|
||||||
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
|
require.GreaterOrEqual(t, elapsed.Nanoseconds(), zero)
|
||||||
|
|
||||||
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
|
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
|
||||||
{
|
{
|
||||||
|
|
@ -184,8 +179,6 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteKinesis_WhenServiceError(t *testing.T) {
|
func TestWriteKinesis_WhenServiceError(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
records := []types.PutRecordsRequestEntry{
|
records := []types.PutRecordsRequestEntry{
|
||||||
{
|
{
|
||||||
PartitionKey: aws.String(testPartitionKey),
|
PartitionKey: aws.String(testPartitionKey),
|
||||||
|
|
@ -205,7 +198,7 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
elapsed := k.writeKinesis(records)
|
elapsed := k.writeKinesis(records)
|
||||||
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
|
require.GreaterOrEqual(t, elapsed.Nanoseconds(), zero)
|
||||||
|
|
||||||
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
|
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
|
||||||
{
|
{
|
||||||
|
|
@ -216,7 +209,6 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite_NoMetrics(t *testing.T) {
|
func TestWrite_NoMetrics(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
serializer := influx.NewSerializer()
|
serializer := influx.NewSerializer()
|
||||||
svc := &mockKinesisPutRecords{}
|
svc := &mockKinesisPutRecords{}
|
||||||
|
|
||||||
|
|
@ -232,13 +224,12 @@ func TestWrite_NoMetrics(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := k.Write([]telegraf.Metric{})
|
err := k.Write([]telegraf.Metric{})
|
||||||
assert.Nil(err, "Should not return error")
|
require.NoError(t, err, "Should not return error")
|
||||||
|
|
||||||
svc.AssertRequests(t, []*kinesis.PutRecordsInput{})
|
svc.AssertRequests(t, []*kinesis.PutRecordsInput{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWrite_SingleMetric(t *testing.T) {
|
func TestWrite_SingleMetric(t *testing.T) {
|
||||||
-    assert := assert.New(t)
     serializer := influx.NewSerializer()
 
     svc := &mockKinesisPutRecords{}
 
@@ -257,7 +248,7 @@ func TestWrite_SingleMetric(t *testing.T) {
 
     metric, metricData := createTestMetric(t, "metric1", serializer)
     err := k.Write([]telegraf.Metric{metric})
-    assert.Nil(err, "Should not return error")
+    require.NoError(t, err, "Should not return error")
 
     svc.AssertRequests(t, []*kinesis.PutRecordsInput{
         {
@@ -273,7 +264,6 @@ func TestWrite_SingleMetric(t *testing.T) {
 }
 
 func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
-    assert := assert.New(t)
     serializer := influx.NewSerializer()
 
     svc := &mockKinesisPutRecords{}
@@ -292,7 +282,7 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
 
     metrics, metricsData := createTestMetrics(t, 3, serializer)
     err := k.Write(metrics)
-    assert.Nil(err, "Should not return error")
+    require.NoError(t, err, "Should not return error")
 
     svc.AssertRequests(t, []*kinesis.PutRecordsInput{
         {
@@ -305,7 +295,6 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
 }
 
 func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
-    assert := assert.New(t)
     serializer := influx.NewSerializer()
 
     svc := &mockKinesisPutRecords{}
@@ -324,7 +313,7 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
 
     metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest, serializer)
     err := k.Write(metrics)
-    assert.Nil(err, "Should not return error")
+    require.NoError(t, err, "Should not return error")
 
     svc.AssertRequests(t, []*kinesis.PutRecordsInput{
         {
@@ -337,7 +326,6 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
 }
 
 func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
-    assert := assert.New(t)
     serializer := influx.NewSerializer()
 
     svc := &mockKinesisPutRecords{}
@@ -357,7 +345,7 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
 
     metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest+1, serializer)
     err := k.Write(metrics)
-    assert.Nil(err, "Should not return error")
+    require.NoError(t, err, "Should not return error")
 
     svc.AssertRequests(t, []*kinesis.PutRecordsInput{
         {
@@ -376,7 +364,6 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
 }
 
 func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
-    assert := assert.New(t)
     serializer := influx.NewSerializer()
 
     svc := &mockKinesisPutRecords{}
@@ -396,7 +383,7 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
 
     metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest*2, serializer)
     err := k.Write(metrics)
-    assert.Nil(err, "Should not return error")
+    require.NoError(t, err, "Should not return error")
 
     svc.AssertRequests(t, []*kinesis.PutRecordsInput{
         {
@@ -415,7 +402,6 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
 }
 
 func TestWrite_SerializerError(t *testing.T) {
-    assert := assert.New(t)
     serializer := influx.NewSerializer()
 
     svc := &mockKinesisPutRecords{}
@@ -443,7 +429,7 @@ func TestWrite_SerializerError(t *testing.T) {
         invalidMetric,
         metric2,
     })
-    assert.Nil(err, "Should not return error")
+    require.NoError(t, err, "Should not return error")
 
     // remaining valid metrics should still get written
     svc.AssertRequests(t, []*kinesis.PutRecordsInput{
@@ -519,7 +505,7 @@ func (m *mockKinesisPutRecords) SetupErrorResponse(err error) {
 func (m *mockKinesisPutRecords) PutRecords(_ context.Context, input *kinesis.PutRecordsInput, _ ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error) {
     reqNum := len(m.requests)
     if reqNum > len(m.responses) {
-        return nil, fmt.Errorf("Response for request %+v not setup", reqNum)
+        return nil, fmt.Errorf("response for request %+v not setup", reqNum)
     }
 
     m.requests = append(m.requests, input)
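
Note (not part of the diff): the Kinesis test hunks drop the per-test `assert := assert.New(t)` object in favour of `require.NoError`, which aborts the test on the first failure so the later `svc.AssertRequests` checks never run against a request that was never sent. A minimal, self-contained sketch of the same pattern, with a hypothetical doWrite helper standing in for k.Write:

package example

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// doWrite is a stand-in for the plugin's Write call in the hunks above.
func doWrite() error { return nil }

func TestRequireInsteadOfAssert(t *testing.T) {
	err := doWrite()
	// assert.Nil(err, ...) would record the failure and keep going;
	// require.NoError stops the test here if err is non-nil.
	require.NoError(t, err, "Should not return error")
}
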
@@ -118,6 +118,16 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
     // make sur we send a batch of maximum 300
     sizeBatch := 300
     for start := 0; start < metricCounter; start += sizeBatch {
+        err := l.writeBatch(start, sizeBatch, metricCounter, tempGauges)
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+func (l *Librato) writeBatch(start int, sizeBatch int, metricCounter int, tempGauges []*Gauge) error {
     lmetrics := LMetrics{}
     end := start + sizeBatch
     if end > metricCounter {
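
Note on the hunk above: the per-batch body of Librato's Write moves into a writeBatch helper, presumably to satisfy a function-length/complexity check, and Write now only steps through the gauge list in chunks of 300 and bails out on the first error. A standalone sketch of that shape (the gauge type and names here are simplified, not the plugin's actual ones):

package example

import "fmt"

type gauge struct {
	Name  string
	Value float64
}

const sizeBatch = 300 // same 300-gauge cap as in the hunk above

// writeAll mirrors the new Write loop: slice, delegate, stop on error.
func writeAll(gauges []gauge) error {
	for start := 0; start < len(gauges); start += sizeBatch {
		end := start + sizeBatch
		if end > len(gauges) {
			end = len(gauges)
		}
		if err := writeBatch(gauges[start:end]); err != nil {
			return err
		}
	}
	return nil
}

// writeBatch stands in for building the request body and POSTing it to Librato.
func writeBatch(batch []gauge) error {
	fmt.Printf("would send %d gauges\n", len(batch))
	return nil
}
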
@@ -163,8 +173,6 @@ func (l *Librato) Write(metrics []telegraf.Metric) error {
         }
         l.Log.Debugf("Librato response: %v", string(htmlData))
     }
-    }
-
     return nil
 }
 
@@ -219,9 +227,10 @@ func verifyValue(v interface{}) bool {
     switch v.(type) {
     case string:
         return false
-    }
-    return true
+    default:
+        return true
+    }
 }
 
 func (g *Gauge) setValue(v interface{}) error {
     switch d := v.(type) {
@@ -230,7 +239,7 @@ func (g *Gauge) setValue(v interface{}) error {
     case uint64:
         g.Value = float64(d)
     case float64:
-        g.Value = float64(d)
+        g.Value = d
     case bool:
         if d {
             g.Value = float64(1.0)

@@ -11,13 +11,14 @@ import (
     "strings"
     "time"
 
+    "golang.org/x/oauth2"
+    "golang.org/x/oauth2/clientcredentials"
+
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/config"
     "github.com/influxdata/telegraf/internal"
     "github.com/influxdata/telegraf/plugins/common/tls"
     "github.com/influxdata/telegraf/plugins/outputs"
-    "golang.org/x/oauth2"
-    "golang.org/x/oauth2/clientcredentials"
 )
 
 const (
@@ -126,7 +127,7 @@ func (l *Loki) Connect() (err error) {
         return fmt.Errorf("http client fail: %w", err)
     }
 
-    return
+    return nil
 }
 
 func (l *Loki) Close() error {
@@ -155,10 +156,10 @@ func (l *Loki) Write(metrics []telegraf.Metric) error {
         s.insertLog(tags, Log{fmt.Sprintf("%d", m.Time().UnixNano()), line})
     }
 
-    return l.write(s)
+    return l.writeMetrics(s)
 }
 
-func (l *Loki) write(s Streams) error {
+func (l *Loki) writeMetrics(s Streams) error {
     bs, err := json.Marshal(s)
     if err != nil {
         return fmt.Errorf("json.Marshal: %w", err)
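
Two small things happen in the Loki hunks: the named-return Connect no longer ends with a bare `return` (which silently returns whatever the named `err` currently holds), and the unexported `write` method becomes `writeMetrics`, likely because the linter flags a method that differs from the exported Write only by case. A tiny sketch of the bare-return pitfall, with illustrative names:

package example

import "errors"

var errDial = errors.New("dial failed")

// connect uses a named error result, like Loki's Connect.
func connect(ok bool) (err error) {
	if !ok {
		err = errDial
		return // returns errDial via the named result: correct, but easy to misread
	}
	return nil // explicit success path, as in the hunk above
}
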
@@ -11,11 +11,11 @@ import (
     "testing"
     "time"
 
-    "github.com/influxdata/telegraf/testutil"
+    "github.com/stretchr/testify/require"
 
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/internal"
-    "github.com/stretchr/testify/require"
+    "github.com/influxdata/telegraf/testutil"
 )
 
 func getMetric() telegraf.Metric {
@@ -329,7 +329,8 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
             values.Add("access_token", token)
             values.Add("token_type", "bearer")
             values.Add("expires_in", "3600")
-            w.Write([]byte(values.Encode()))
+            _, err = w.Write([]byte(values.Encode()))
+            require.NoError(t, err)
         },
         handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
             require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"])
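
The loki_test hunk is the errcheck side of the same cleanup: the fake OAuth token endpoint now captures the error from w.Write instead of dropping it. A compressed sketch of that handler shape using net/http/httptest (the token value and field names are illustrative, not the test's real fixtures):

package example

import (
	"net/http"
	"net/http/httptest"
	"net/url"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestFakeTokenEndpoint(t *testing.T) {
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		values := url.Values{}
		values.Add("access_token", "abcdef")
		values.Add("token_type", "bearer")
		values.Add("expires_in", "3600")
		// Capture and check the write error instead of discarding it.
		_, err := w.Write([]byte(values.Encode()))
		require.NoError(t, err)
	}))
	defer ts.Close()

	resp, err := http.Get(ts.URL)
	require.NoError(t, err)
	require.NoError(t, resp.Body.Close())
}
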
@@ -2,12 +2,12 @@ package mqtt
 
 import (
     "fmt"
-    "log"
     "strings"
     "sync"
     "time"
 
     paho "github.com/eclipse/paho.mqtt.golang"
+
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/config"
     "github.com/influxdata/telegraf/internal"
@@ -85,6 +85,7 @@ type MQTT struct {
     BatchMessage bool `toml:"batch"`
     Retain bool `toml:"retain"`
     KeepAlive int64 `toml:"keep_alive"`
+    Log telegraf.Logger `toml:"-"`
 
     client paho.Client
     opts *paho.ClientOptions
@@ -164,13 +165,13 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
         } else {
             buf, err := m.serializer.Serialize(metric)
             if err != nil {
-                log.Printf("D! [outputs.mqtt] Could not serialize metric: %v", err)
+                m.Log.Debugf("Could not serialize metric: %v", err)
                 continue
             }
 
             err = m.publish(topic, buf)
             if err != nil {
-                return fmt.Errorf("Could not write to MQTT server, %s", err)
+                return fmt.Errorf("could not write to MQTT server, %s", err)
             }
         }
     }
@@ -183,7 +184,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
         }
         publisherr := m.publish(key, buf)
         if publisherr != nil {
-            return fmt.Errorf("Could not write to MQTT server, %s", publisherr)
+            return fmt.Errorf("could not write to MQTT server, %s", publisherr)
         }
     }
 
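
The MQTT hunks, and the NATS and NSQ ones right after, all make the same move: drop the global "log" package, add a Log telegraf.Logger field that Telegraf injects at startup, and emit debug lines through it so the plugin prefix no longer has to be written by hand. A cut-down sketch of the shape, not the real plugin struct:

package example

import "github.com/influxdata/telegraf"

// output is a stand-in for the MQTT/NATS/NSQ plugin structs in these hunks.
type output struct {
	Server string `toml:"server"`

	Log telegraf.Logger `toml:"-"` // set by Telegraf, hidden from the TOML config

	// client, serializer, etc. omitted
}

func (o *output) reportSerializeError(err error) {
	// Replaces log.Printf("D! [outputs.xxx] ..."): the injected logger already
	// carries the plugin name, so the handwritten tag goes away.
	o.Log.Debugf("Could not serialize metric: %v", err)
}
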
@@ -2,14 +2,14 @@ package nats
 
 import (
     "fmt"
-    "log"
     "strings"
 
+    "github.com/nats-io/nats.go"
+
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/plugins/common/tls"
     "github.com/influxdata/telegraf/plugins/outputs"
     "github.com/influxdata/telegraf/plugins/serializers"
-    "github.com/nats-io/nats.go"
 )
 
 type NATS struct {
@@ -23,6 +23,8 @@ type NATS struct {
 
     tls.ClientConfig
 
+    Log telegraf.Logger `toml:"-"`
+
     conn *nats.Conn
     serializer serializers.Serializer
 }
@@ -121,7 +123,7 @@ func (n *NATS) Write(metrics []telegraf.Metric) error {
     for _, metric := range metrics {
         buf, err := n.serializer.Serialize(metric)
         if err != nil {
-            log.Printf("D! [outputs.nats] Could not serialize metric: %v", err)
+            n.Log.Debugf("Could not serialize metric: %v", err)
             continue
         }
 
@@ -8,11 +8,12 @@ import (
     "net/url"
     "time"
 
+    "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative"
+    "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry"
+
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/config"
     "github.com/influxdata/telegraf/plugins/outputs"
-    "github.com/newrelic/newrelic-telemetry-sdk-go/cumulative"
-    "github.com/newrelic/newrelic-telemetry-sdk-go/telemetry"
 )
 
 // NewRelic nr structure
@@ -27,7 +28,7 @@ type NewRelic struct {
     dc *cumulative.DeltaCalculator
     savedErrors map[int]interface{}
     errorCount int
-    client http.Client `toml:"-"`
+    client http.Client
 }
 
 // Description returns a one-sentence description on the Output
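
On the NewRelic struct, the `toml:"-"` tag is dropped from the unexported client field: the config loader can only set exported fields through reflection, so a tag on an unexported field is dead weight, which is presumably what the linter flagged. Roughly (illustrative type, not the plugin's):

package example

import "net/http"

type newRelicLike struct {
	MetricURL string `toml:"metric_url"` // exported: parsed from the config file

	client http.Client // unexported: the TOML decoder can never touch it, so no tag
}
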
@@ -2,19 +2,20 @@ package nsq
 
 import (
     "fmt"
-    "log"
+    "github.com/nsqio/go-nsq"
 
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/plugins/outputs"
     "github.com/influxdata/telegraf/plugins/serializers"
-    "github.com/nsqio/go-nsq"
 )
 
 type NSQ struct {
     Server string
     Topic string
-    producer *nsq.Producer
+    Log telegraf.Logger `toml:"-"`
+
+    producer *nsq.Producer
     serializer serializers.Serializer
 }
 
@@ -68,13 +69,13 @@ func (n *NSQ) Write(metrics []telegraf.Metric) error {
     for _, metric := range metrics {
         buf, err := n.serializer.Serialize(metric)
         if err != nil {
-            log.Printf("D! [outputs.nsq] Could not serialize metric: %v", err)
+            n.Log.Debugf("Could not serialize metric: %v", err)
             continue
         }
 
         err = n.producer.Publish(n.Topic, buf)
         if err != nil {
-            return fmt.Errorf("FAILED to send NSQD message: %s", err)
+            return fmt.Errorf("failed to send NSQD message: %s", err)
         }
     }
     return nil