fix: Linter fixes for plugins/inputs/[p-z]* (leftovers) (#10193)

Co-authored-by: Pawel Zak <Pawel Zak>
This commit is contained in:
Paweł Żak 2021-11-30 20:31:10 +01:00 committed by GitHub
parent 9bd0c6121e
commit 7aa6b533bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 330 additions and 318 deletions

1
go.mod
View File

@ -311,7 +311,6 @@ require (
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
gotest.tools v2.2.0+incompatible
k8s.io/api v0.22.2 k8s.io/api v0.22.2
k8s.io/apimachinery v0.22.2 k8s.io/apimachinery v0.22.2
k8s.io/client-go v0.22.2 k8s.io/client-go v0.22.2

View File

@ -116,7 +116,6 @@ func (r *Raindrops) gatherURL(addr *url.URL, acc telegraf.Accumulator) error {
} }
activeLineStr, activeErr = buf.ReadString('\n') activeLineStr, activeErr = buf.ReadString('\n')
if activeErr != nil { if activeErr != nil {
iterate = false
break break
} }
if strings.Compare(activeLineStr, "\n") == 0 { if strings.Compare(activeLineStr, "\n") == 0 {
@ -154,7 +153,7 @@ func (r *Raindrops) gatherURL(addr *url.URL, acc telegraf.Accumulator) error {
} }
acc.AddFields("raindrops_listen", lis, tags) acc.AddFields("raindrops_listen", lis, tags)
} }
return nil return nil //nolint:nilerr // nil returned on purpose
} }
// Get tag(s) for the raindrops calling/writing plugin // Get tag(s) for the raindrops calling/writing plugin

View File

@ -7,11 +7,11 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"testing" "testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"time"
) )
const sampleResponse = ` const sampleResponse = `
@ -41,7 +41,7 @@ func TestRaindropsTags(t *testing.T) {
for _, url1 := range urls { for _, url1 := range urls {
addr, _ = url.Parse(url1) addr, _ = url.Parse(url1)
tagMap := r.getTags(addr) tagMap := r.getTags(addr)
assert.Contains(t, tagMap["server"], "localhost") require.Contains(t, tagMap["server"], "localhost")
} }
} }

View File

@ -23,11 +23,11 @@ type Ras struct {
DBPath string `toml:"db_path"` DBPath string `toml:"db_path"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
db *sql.DB `toml:"-"`
latestTimestamp time.Time `toml:"-"` db *sql.DB
cpuSocketCounters map[int]metricCounters `toml:"-"` latestTimestamp time.Time
serverCounters metricCounters `toml:"-"` cpuSocketCounters map[int]metricCounters
serverCounters metricCounters
} }
type machineCheckError struct { type machineCheckError struct {

View File

@ -8,9 +8,9 @@ import (
"fmt" "fmt"
"testing" "testing"
"github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert" "github.com/influxdata/telegraf/testutil"
) )
func TestUpdateCounters(t *testing.T) { func TestUpdateCounters(t *testing.T) {
@ -19,20 +19,20 @@ func TestUpdateCounters(t *testing.T) {
ras.updateCounters(&mce) ras.updateCounters(&mce)
} }
assert.Equal(t, 1, len(ras.cpuSocketCounters), "Should contain counters only for single socket") require.Equal(t, 1, len(ras.cpuSocketCounters), "Should contain counters only for single socket")
for metric, value := range ras.cpuSocketCounters[0] { for metric, value := range ras.cpuSocketCounters[0] {
if metric == processorBase { if metric == processorBase {
// processor_base_errors is sum of other seven errors: internal_timer_errors, smm_handler_code_access_violation_errors, // processor_base_errors is sum of other seven errors: internal_timer_errors, smm_handler_code_access_violation_errors,
// internal_parity_errors, frc_errors, external_mce_errors, microcode_rom_parity_errors and unclassified_mce_errors // internal_parity_errors, frc_errors, external_mce_errors, microcode_rom_parity_errors and unclassified_mce_errors
assert.Equal(t, int64(7), value, fmt.Sprintf("%s should have value of 7", processorBase)) require.Equal(t, int64(7), value, fmt.Sprintf("%s should have value of 7", processorBase))
} else { } else {
assert.Equal(t, int64(1), value, fmt.Sprintf("%s should have value of 1", metric)) require.Equal(t, int64(1), value, fmt.Sprintf("%s should have value of 1", metric))
} }
} }
for metric, value := range ras.serverCounters { for metric, value := range ras.serverCounters {
assert.Equal(t, int64(1), value, fmt.Sprintf("%s should have value of 1", metric)) require.Equal(t, int64(1), value, fmt.Sprintf("%s should have value of 1", metric))
} }
} }
@ -61,9 +61,9 @@ func TestUpdateLatestTimestamp(t *testing.T) {
}...) }...)
for _, mce := range testData { for _, mce := range testData {
err := ras.updateLatestTimestamp(mce.Timestamp) err := ras.updateLatestTimestamp(mce.Timestamp)
assert.NoError(t, err) require.NoError(t, err)
} }
assert.Equal(t, ts, ras.latestTimestamp.Format(dateLayout)) require.Equal(t, ts, ras.latestTimestamp.Format(dateLayout))
} }
func TestMultipleSockets(t *testing.T) { func TestMultipleSockets(t *testing.T) {
@ -99,14 +99,14 @@ func TestMultipleSockets(t *testing.T) {
for _, mce := range testData { for _, mce := range testData {
ras.updateCounters(&mce) ras.updateCounters(&mce)
} }
assert.Equal(t, 4, len(ras.cpuSocketCounters), "Should contain counters for four sockets") require.Equal(t, 4, len(ras.cpuSocketCounters), "Should contain counters for four sockets")
for _, metricData := range ras.cpuSocketCounters { for _, metricData := range ras.cpuSocketCounters {
for metric, value := range metricData { for metric, value := range metricData {
if metric == levelTwoCache { if metric == levelTwoCache {
assert.Equal(t, int64(1), value, fmt.Sprintf("%s should have value of 1", levelTwoCache)) require.Equal(t, int64(1), value, fmt.Sprintf("%s should have value of 1", levelTwoCache))
} else { } else {
assert.Equal(t, int64(0), value, fmt.Sprintf("%s should have value of 0", metric)) require.Equal(t, int64(0), value, fmt.Sprintf("%s should have value of 0", metric))
} }
} }
} }
@ -117,21 +117,21 @@ func TestMissingDatabase(t *testing.T) {
ras := newRas() ras := newRas()
ras.DBPath = "/nonexistent/ras.db" ras.DBPath = "/nonexistent/ras.db"
err := ras.Start(&acc) err := ras.Start(&acc)
assert.Error(t, err) require.Error(t, err)
} }
func TestEmptyDatabase(t *testing.T) { func TestEmptyDatabase(t *testing.T) {
ras := newRas() ras := newRas()
assert.Equal(t, 1, len(ras.cpuSocketCounters), "Should contain default counters for one socket") require.Equal(t, 1, len(ras.cpuSocketCounters), "Should contain default counters for one socket")
assert.Equal(t, 2, len(ras.serverCounters), "Should contain default counters for server") require.Equal(t, 2, len(ras.serverCounters), "Should contain default counters for server")
for metric, value := range ras.cpuSocketCounters[0] { for metric, value := range ras.cpuSocketCounters[0] {
assert.Equal(t, int64(0), value, fmt.Sprintf("%s should have value of 0", metric)) require.Equal(t, int64(0), value, fmt.Sprintf("%s should have value of 0", metric))
} }
for metric, value := range ras.serverCounters { for metric, value := range ras.serverCounters {
assert.Equal(t, int64(0), value, fmt.Sprintf("%s should have value of 0", metric)) require.Equal(t, int64(0), value, fmt.Sprintf("%s should have value of 0", metric))
} }
} }

View File

@ -176,8 +176,8 @@ func (r *Redfish) Init() error {
return nil return nil
} }
func (r *Redfish) getData(url string, payload interface{}) error { func (r *Redfish) getData(address string, payload interface{}) error {
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", address, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -761,40 +761,42 @@ func TestInvalidDellJSON(t *testing.T) {
}, },
} }
for _, tt := range tests { for _, tt := range tests {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { t.Run(tt.name, func(t *testing.T) {
if !checkAuth(r, "test", "test") { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Unauthorized.", 401) if !checkAuth(r, "test", "test") {
return http.Error(w, "Unauthorized.", 401)
return
}
switch r.URL.Path {
case "/redfish/v1/Chassis/System.Embedded.1/Thermal":
http.ServeFile(w, r, tt.thermalfilename)
case "/redfish/v1/Chassis/System.Embedded.1/Power":
http.ServeFile(w, r, tt.powerfilename)
case "/redfish/v1/Chassis/System.Embedded.1":
http.ServeFile(w, r, tt.chassisfilename)
case "/redfish/v1/Systems/System.Embedded.1":
http.ServeFile(w, r, tt.hostnamefilename)
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer ts.Close()
plugin := &Redfish{
Address: ts.URL,
Username: "test",
Password: "test",
ComputerSystemID: "System.Embedded.1",
} }
switch r.URL.Path { require.NoError(t, plugin.Init())
case "/redfish/v1/Chassis/System.Embedded.1/Thermal":
http.ServeFile(w, r, tt.thermalfilename)
case "/redfish/v1/Chassis/System.Embedded.1/Power":
http.ServeFile(w, r, tt.powerfilename)
case "/redfish/v1/Chassis/System.Embedded.1":
http.ServeFile(w, r, tt.chassisfilename)
case "/redfish/v1/Systems/System.Embedded.1":
http.ServeFile(w, r, tt.hostnamefilename)
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer ts.Close()
plugin := &Redfish{ var acc testutil.Accumulator
Address: ts.URL, err := plugin.Gather(&acc)
Username: "test", require.Error(t, err)
Password: "test", require.Contains(t, err.Error(), "error parsing input:")
ComputerSystemID: "System.Embedded.1", })
}
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
err := plugin.Gather(&acc)
require.Error(t, err)
require.Contains(t, err.Error(), "error parsing input:")
} }
} }

View File

@ -8,9 +8,9 @@ import (
"time" "time"
"github.com/go-redis/redis" "github.com/go-redis/redis"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
) )
type testClient struct { type testClient struct {
@ -165,7 +165,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
"total_writes_processed": int64(17), "total_writes_processed": int64(17),
"lazyfree_pending_objects": int64(0), "lazyfree_pending_objects": int64(0),
"maxmemory": int64(0), "maxmemory": int64(0),
"maxmemory_policy": string("noeviction"), "maxmemory_policy": "noeviction",
"mem_aof_buffer": int64(0), "mem_aof_buffer": int64(0),
"mem_clients_normal": int64(17440), "mem_clients_normal": int64(17440),
"mem_clients_slaves": int64(0), "mem_clients_slaves": int64(0),
@ -202,7 +202,7 @@ func TestRedis_ParseMetrics(t *testing.T) {
} }
} }
} }
assert.InDelta(t, require.InDelta(t,
time.Now().Unix()-fields["rdb_last_save_time"].(int64), time.Now().Unix()-fields["rdb_last_save_time"].(int64),
fields["rdb_last_save_time_elapsed"].(int64), fields["rdb_last_save_time_elapsed"].(int64),
2) // allow for 2 seconds worth of offset 2) // allow for 2 seconds worth of offset

View File

@ -3,8 +3,9 @@ package rethinkdb
import ( import (
"testing" "testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
) )
var tags = make(map[string]string) var tags = make(map[string]string)
@ -36,7 +37,7 @@ func TestAddEngineStats(t *testing.T) {
engine.AddEngineStats(keys, &acc, tags) engine.AddEngineStats(keys, &acc, tags)
for _, metric := range keys { for _, metric := range keys {
assert.True(t, acc.HasInt64Field("rethinkdb_engine", metric)) require.True(t, acc.HasInt64Field("rethinkdb_engine", metric))
} }
} }
@ -67,7 +68,7 @@ func TestAddEngineStatsPartial(t *testing.T) {
engine.AddEngineStats(keys, &acc, tags) engine.AddEngineStats(keys, &acc, tags)
for _, metric := range missingKeys { for _, metric := range missingKeys {
assert.False(t, acc.HasInt64Field("rethinkdb", metric)) require.False(t, acc.HasInt64Field("rethinkdb", metric))
} }
} }
@ -107,6 +108,6 @@ func TestAddStorageStats(t *testing.T) {
storage.AddStats(&acc, tags) storage.AddStats(&acc, tags)
for _, metric := range keys { for _, metric := range keys {
assert.True(t, acc.HasInt64Field("rethinkdb", metric)) require.True(t, acc.HasInt64Field("rethinkdb", metric))
} }
} }

View File

@ -9,9 +9,9 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/influxdata/telegraf"
"gopkg.in/gorethink/gorethink.v3" "gopkg.in/gorethink/gorethink.v3"
"github.com/influxdata/telegraf"
) )
type Server struct { type Server struct {
@ -37,7 +37,7 @@ func (s *Server) gatherData(acc telegraf.Accumulator) error {
return fmt.Errorf("error adding member stats, %s", err.Error()) return fmt.Errorf("error adding member stats, %s", err.Error())
} }
if err := s.addTableStats(acc); err != nil { if err := s.addTablesStats(acc); err != nil {
return fmt.Errorf("error adding table stats, %s", err.Error()) return fmt.Errorf("error adding table stats, %s", err.Error())
} }
@ -49,7 +49,7 @@ func (s *Server) validateVersion() error {
return errors.New("could not determine the RethinkDB server version: process.version key missing") return errors.New("could not determine the RethinkDB server version: process.version key missing")
} }
versionRegexp := regexp.MustCompile("\\d.\\d.\\d") versionRegexp := regexp.MustCompile(`\d.\d.\d`)
versionString := versionRegexp.FindString(s.serverStatus.Process.Version) versionString := versionRegexp.FindString(s.serverStatus.Process.Version)
if versionString == "" { if versionString == "" {
return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", s.serverStatus.Process.Version) return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", s.serverStatus.Process.Version)
@ -161,7 +161,7 @@ var TableTracking = []string{
"total_writes", "total_writes",
} }
func (s *Server) addTableStats(acc telegraf.Accumulator) error { func (s *Server) addTablesStats(acc telegraf.Accumulator) error {
tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session) tablesCursor, err := gorethink.DB("rethinkdb").Table("table_status").Run(s.session)
if err != nil { if err != nil {
return fmt.Errorf("table stats query error, %s", err.Error()) return fmt.Errorf("table stats query error, %s", err.Error())
@ -174,23 +174,33 @@ func (s *Server) addTableStats(acc telegraf.Accumulator) error {
return errors.New("could not parse table_status results") return errors.New("could not parse table_status results")
} }
for _, table := range tables { for _, table := range tables {
cursor, err := gorethink.DB("rethinkdb").Table("stats"). err = s.addTableStats(acc, table)
Get([]string{"table_server", table.ID, s.serverStatus.ID}).
Run(s.session)
if err != nil { if err != nil {
return fmt.Errorf("table stats query error, %s", err.Error()) return err
} }
defer cursor.Close()
var ts tableStats
if err := cursor.One(&ts); err != nil {
return fmt.Errorf("failure to parse table stats, %s", err.Error())
}
tags := s.getDefaultTags()
tags["type"] = "data"
tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name)
ts.Engine.AddEngineStats(TableTracking, acc, tags)
ts.Storage.AddStats(acc, tags)
} }
return nil return nil
} }
func (s *Server) addTableStats(acc telegraf.Accumulator, table tableStatus) error {
cursor, err := gorethink.DB("rethinkdb").Table("stats").
Get([]string{"table_server", table.ID, s.serverStatus.ID}).
Run(s.session)
if err != nil {
return fmt.Errorf("table stats query error, %s", err.Error())
}
defer cursor.Close()
var ts tableStats
if err := cursor.One(&ts); err != nil {
return fmt.Errorf("failure to parse table stats, %s", err.Error())
}
tags := s.getDefaultTags()
tags["type"] = "data"
tags["ns"] = fmt.Sprintf("%s.%s", table.DB, table.Name)
ts.Engine.AddEngineStats(TableTracking, acc, tags)
ts.Storage.AddStats(acc, tags)
return nil
}

View File

@ -6,9 +6,9 @@ package rethinkdb
import ( import (
"testing" "testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
) )
func TestValidateVersion(t *testing.T) { func TestValidateVersion(t *testing.T) {
@ -39,7 +39,7 @@ func TestAddClusterStats(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
for _, metric := range ClusterTracking { for _, metric := range ClusterTracking {
assert.True(t, acc.HasIntValue(metric)) require.True(t, acc.HasIntValue(metric))
} }
} }
@ -50,7 +50,7 @@ func TestAddMemberStats(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
for _, metric := range MemberTracking { for _, metric := range MemberTracking {
assert.True(t, acc.HasIntValue(metric)) require.True(t, acc.HasIntValue(metric))
} }
} }
@ -61,7 +61,7 @@ func TestAddTableStats(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
for _, metric := range TableTracking { for _, metric := range TableTracking {
assert.True(t, acc.HasIntValue(metric)) require.True(t, acc.HasIntValue(metric))
} }
keys := []string{ keys := []string{
@ -77,6 +77,6 @@ func TestAddTableStats(t *testing.T) {
} }
for _, metric := range keys { for _, metric := range keys {
assert.True(t, acc.HasIntValue(metric)) require.True(t, acc.HasIntValue(metric))
} }
} }

View File

@ -7,7 +7,6 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io" "io"
"log"
"net" "net"
"os" "os"
"os/signal" "os/signal"
@ -37,12 +36,12 @@ type RiemannSocketListener struct {
wg sync.WaitGroup wg sync.WaitGroup
Log telegraf.Logger Log telegraf.Logger `toml:"-"`
telegraf.Accumulator telegraf.Accumulator
} }
type setReadBufferer interface { type setReadBufferer interface {
SetReadBuffer(bytes int) error SetReadBuffer(sizeInBytes int) error
} }
type riemannListener struct { type riemannListener struct {
@ -162,13 +161,6 @@ func readMessages(r io.Reader, p []byte) error {
return nil return nil
} }
func checkError(err error) {
log.Println("The error is")
if err != nil {
log.Println(err.Error())
}
}
func (rsl *riemannListener) read(conn net.Conn) { func (rsl *riemannListener) read(conn net.Conn) {
defer rsl.removeConnection(conn) defer rsl.removeConnection(conn)
defer conn.Close() defer conn.Close()
@ -187,7 +179,7 @@ func (rsl *riemannListener) read(conn net.Conn) {
if err = binary.Read(conn, binary.BigEndian, &header); err != nil { if err = binary.Read(conn, binary.BigEndian, &header); err != nil {
if err.Error() != "EOF" { if err.Error() != "EOF" {
rsl.Log.Debugf("Failed to read header") rsl.Log.Debugf("Failed to read header")
riemannReturnErrorResponse(conn, err.Error()) rsl.riemannReturnErrorResponse(conn, err.Error())
return return
} }
return return
@ -196,19 +188,19 @@ func (rsl *riemannListener) read(conn net.Conn) {
if err = readMessages(conn, data); err != nil { if err = readMessages(conn, data); err != nil {
rsl.Log.Debugf("Failed to read body: %s", err.Error()) rsl.Log.Debugf("Failed to read body: %s", err.Error())
riemannReturnErrorResponse(conn, "Failed to read body") rsl.riemannReturnErrorResponse(conn, "Failed to read body")
return return
} }
if err = proto.Unmarshal(data, messagePb); err != nil { if err = proto.Unmarshal(data, messagePb); err != nil {
rsl.Log.Debugf("Failed to unmarshal: %s", err.Error()) rsl.Log.Debugf("Failed to unmarshal: %s", err.Error())
riemannReturnErrorResponse(conn, "Failed to unmarshal") rsl.riemannReturnErrorResponse(conn, "Failed to unmarshal")
return return
} }
riemannEvents := riemanngo.ProtocolBuffersToEvents(messagePb.Events) riemannEvents := riemanngo.ProtocolBuffersToEvents(messagePb.Events)
for _, m := range riemannEvents { for _, m := range riemannEvents {
if m.Service == "" { if m.Service == "" {
riemannReturnErrorResponse(conn, "No Service Name") rsl.riemannReturnErrorResponse(conn, "No Service Name")
return return
} }
tags := make(map[string]string) tags := make(map[string]string)
@ -224,53 +216,52 @@ func (rsl *riemannListener) read(conn net.Conn) {
singleMetric := metric.New(m.Service, tags, fieldValues, m.Time, telegraf.Untyped) singleMetric := metric.New(m.Service, tags, fieldValues, m.Time, telegraf.Untyped)
rsl.AddMetric(singleMetric) rsl.AddMetric(singleMetric)
} }
riemannReturnResponse(conn) rsl.riemannReturnResponse(conn)
} }
} }
func riemannReturnResponse(conn net.Conn) { func (rsl *riemannListener) riemannReturnResponse(conn net.Conn) {
t := true t := true
message := new(riemangoProto.Msg) message := new(riemangoProto.Msg)
message.Ok = &t message.Ok = &t
returnData, err := proto.Marshal(message) returnData, err := proto.Marshal(message)
if err != nil { if err != nil {
checkError(err) rsl.Log.Errorf("The error is: %v", err)
return return
} }
b := new(bytes.Buffer) b := new(bytes.Buffer)
if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil { if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil {
checkError(err) rsl.Log.Errorf("The error is: %v", err)
} }
// send the msg length // send the msg length
if _, err = conn.Write(b.Bytes()); err != nil { if _, err = conn.Write(b.Bytes()); err != nil {
checkError(err) rsl.Log.Errorf("The error is: %v", err)
} }
if _, err = conn.Write(returnData); err != nil { if _, err = conn.Write(returnData); err != nil {
checkError(err) rsl.Log.Errorf("The error is: %v", err)
} }
} }
func riemannReturnErrorResponse(conn net.Conn, errorMessage string) { func (rsl *riemannListener) riemannReturnErrorResponse(conn net.Conn, errorMessage string) {
t := false t := false
message := new(riemangoProto.Msg) message := new(riemangoProto.Msg)
message.Ok = &t message.Ok = &t
message.Error = &errorMessage message.Error = &errorMessage
returnData, err := proto.Marshal(message) returnData, err := proto.Marshal(message)
if err != nil { if err != nil {
checkError(err) rsl.Log.Errorf("The error is: %v", err)
return return
} }
b := new(bytes.Buffer) b := new(bytes.Buffer)
if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil { if err = binary.Write(b, binary.BigEndian, uint32(len(returnData))); err != nil {
checkError(err) rsl.Log.Errorf("The error is: %v", err)
} }
// send the msg length // send the msg length
if _, err = conn.Write(b.Bytes()); err != nil { if _, err = conn.Write(b.Bytes()); err != nil {
checkError(err) rsl.Log.Errorf("The error is: %v", err)
} }
if _, err = conn.Write(returnData); err != nil { if _, err = conn.Write(returnData); err != nil {
log.Println("Somethign") rsl.Log.Errorf("The error is: %v", err)
checkError(err)
} }
} }
@ -314,7 +305,7 @@ func (rsl *RiemannSocketListener) Gather(_ telegraf.Accumulator) error {
func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error { func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error {
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
go processOsSignals(cancelFunc) go rsl.processOsSignals(cancelFunc)
rsl.Accumulator = acc rsl.Accumulator = acc
if rsl.ServiceAddress == "" { if rsl.ServiceAddress == "" {
rsl.Log.Warnf("Using default service_address tcp://:5555") rsl.Log.Warnf("Using default service_address tcp://:5555")
@ -367,14 +358,13 @@ func (rsl *RiemannSocketListener) Start(acc telegraf.Accumulator) error {
} }
// Handle cancellations from the process // Handle cancellations from the process
func processOsSignals(cancelFunc context.CancelFunc) { func (rsl *RiemannSocketListener) processOsSignals(cancelFunc context.CancelFunc) {
signalChan := make(chan os.Signal, 1) signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt) signal.Notify(signalChan, os.Interrupt)
for { for {
sig := <-signalChan sig := <-signalChan
switch sig { if sig == os.Interrupt {
case os.Interrupt: rsl.Log.Warn("Signal SIGINT is received, probably due to `Ctrl-C`, exiting...")
log.Println("Signal SIGINT is received, probably due to `Ctrl-C`, exiting ...")
cancelFunc() cancelFunc()
return return
} }

View File

@ -7,7 +7,6 @@ import (
riemanngo "github.com/riemann/riemann-go-client" riemanngo "github.com/riemann/riemann-go-client"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"gotest.tools/assert"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -29,26 +28,26 @@ func TestSocketListener_tcp(t *testing.T) {
testStats(t) testStats(t)
testMissingService(t) testMissingService(t)
} }
func testStats(t *testing.T) { func testStats(t *testing.T) {
c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second) c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second)
err := c.Connect() err := c.Connect()
if err != nil { require.NoError(t, err)
log.Println("Error")
panic(err)
}
defer c.Close() defer c.Close()
result, err := riemanngo.SendEvent(c, &riemanngo.Event{ result, err := riemanngo.SendEvent(c, &riemanngo.Event{
Service: "hello", Service: "hello",
}) })
assert.Equal(t, result.GetOk(), true) require.NoError(t, err)
require.Equal(t, result.GetOk(), true)
} }
func testMissingService(t *testing.T) { func testMissingService(t *testing.T) {
c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second) c := riemanngo.NewTCPClient("127.0.0.1:5555", 5*time.Second)
err := c.Connect() err := c.Connect()
if err != nil { require.NoError(t, err)
panic(err)
}
defer c.Close() defer c.Close()
result, err := riemanngo.SendEvent(c, &riemanngo.Event{}) result, err := riemanngo.SendEvent(c, &riemanngo.Event{})
assert.Equal(t, result.GetOk(), false) require.Equal(t, false, result.GetOk())
require.Equal(t, "No Service Name", result.GetError())
require.NoError(t, err)
} }

View File

@ -10,13 +10,13 @@ import (
"time" "time"
"github.com/gosnmp/gosnmp" "github.com/gosnmp/gosnmp"
"github.com/influxdata/toml"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/snmp" "github.com/influxdata/telegraf/internal/snmp"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/influxdata/toml"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
type testSNMPConnection struct { type testSNMPConnection struct {
@ -139,11 +139,10 @@ func TestFieldInit(t *testing.T) {
for _, txl := range translations { for _, txl := range translations {
f := Field{Oid: txl.inputOid, Name: txl.inputName, Conversion: txl.inputConversion} f := Field{Oid: txl.inputOid, Name: txl.inputName, Conversion: txl.inputConversion}
err := f.init() err := f.init()
if !assert.NoError(t, err, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName) { require.NoError(t, err, "inputOid='%s' inputName='%s'", txl.inputOid, txl.inputName)
continue
} require.Equal(t, txl.expectedOid, f.Oid, "inputOid='%s' inputName='%s' inputConversion='%s'", txl.inputOid, txl.inputName, txl.inputConversion)
assert.Equal(t, txl.expectedOid, f.Oid, "inputOid='%s' inputName='%s' inputConversion='%s'", txl.inputOid, txl.inputName, txl.inputConversion) require.Equal(t, txl.expectedName, f.Name, "inputOid='%s' inputName='%s' inputConversion='%s'", txl.inputOid, txl.inputName, txl.inputConversion)
assert.Equal(t, txl.expectedName, f.Name, "inputOid='%s' inputName='%s' inputConversion='%s'", txl.inputOid, txl.inputName, txl.inputConversion)
} }
} }
@ -158,14 +157,14 @@ func TestTableInit(t *testing.T) {
err := tbl.Init() err := tbl.Init()
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "testTable", tbl.Name) require.Equal(t, "testTable", tbl.Name)
assert.Len(t, tbl.Fields, 5) require.Len(t, tbl.Fields, 5)
assert.Contains(t, tbl.Fields, Field{Oid: ".999", Name: "foo", initialized: true}) require.Contains(t, tbl.Fields, Field{Oid: ".999", Name: "foo", initialized: true})
assert.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.1", Name: "server", IsTag: true, initialized: true}) require.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.1", Name: "server", IsTag: true, initialized: true})
assert.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.2", Name: "connections", initialized: true}) require.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.2", Name: "connections", initialized: true})
assert.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.3", Name: "latency", initialized: true}) require.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.3", Name: "latency", initialized: true})
assert.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.4", Name: "description", IsTag: true, initialized: true}) require.Contains(t, tbl.Fields, Field{Oid: ".1.0.0.0.1.4", Name: "description", IsTag: true, initialized: true})
} }
func TestSnmpInit(t *testing.T) { func TestSnmpInit(t *testing.T) {
@ -181,13 +180,13 @@ func TestSnmpInit(t *testing.T) {
err := s.init() err := s.init()
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, s.Tables[0].Fields, 4) require.Len(t, s.Tables[0].Fields, 4)
assert.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.1", Name: "server", IsTag: true, initialized: true}) require.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.1", Name: "server", IsTag: true, initialized: true})
assert.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.2", Name: "connections", initialized: true}) require.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.2", Name: "connections", initialized: true})
assert.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.3", Name: "latency", initialized: true}) require.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.3", Name: "latency", initialized: true})
assert.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.4", Name: "description", initialized: true}) require.Contains(t, s.Tables[0].Fields, Field{Oid: ".1.0.0.0.1.4", Name: "description", initialized: true})
assert.Equal(t, Field{ require.Equal(t, Field{
Oid: ".1.0.0.1.1", Oid: ".1.0.0.1.1",
Name: "hostname", Name: "hostname",
initialized: true, initialized: true,
@ -220,29 +219,29 @@ func TestSnmpInit_noTranslate(t *testing.T) {
err := s.init() err := s.init()
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, ".1.1.1.1", s.Fields[0].Oid) require.Equal(t, ".1.1.1.1", s.Fields[0].Oid)
assert.Equal(t, "one", s.Fields[0].Name) require.Equal(t, "one", s.Fields[0].Name)
assert.Equal(t, true, s.Fields[0].IsTag) require.Equal(t, true, s.Fields[0].IsTag)
assert.Equal(t, ".1.1.1.2", s.Fields[1].Oid) require.Equal(t, ".1.1.1.2", s.Fields[1].Oid)
assert.Equal(t, "two", s.Fields[1].Name) require.Equal(t, "two", s.Fields[1].Name)
assert.Equal(t, false, s.Fields[1].IsTag) require.Equal(t, false, s.Fields[1].IsTag)
assert.Equal(t, ".1.1.1.3", s.Fields[2].Oid) require.Equal(t, ".1.1.1.3", s.Fields[2].Oid)
assert.Equal(t, ".1.1.1.3", s.Fields[2].Name) require.Equal(t, ".1.1.1.3", s.Fields[2].Name)
assert.Equal(t, false, s.Fields[2].IsTag) require.Equal(t, false, s.Fields[2].IsTag)
assert.Equal(t, ".1.1.1.4", s.Tables[0].Fields[0].Oid) require.Equal(t, ".1.1.1.4", s.Tables[0].Fields[0].Oid)
assert.Equal(t, "four", s.Tables[0].Fields[0].Name) require.Equal(t, "four", s.Tables[0].Fields[0].Name)
assert.Equal(t, true, s.Tables[0].Fields[0].IsTag) require.Equal(t, true, s.Tables[0].Fields[0].IsTag)
assert.Equal(t, ".1.1.1.5", s.Tables[0].Fields[1].Oid) require.Equal(t, ".1.1.1.5", s.Tables[0].Fields[1].Oid)
assert.Equal(t, "five", s.Tables[0].Fields[1].Name) require.Equal(t, "five", s.Tables[0].Fields[1].Name)
assert.Equal(t, false, s.Tables[0].Fields[1].IsTag) require.Equal(t, false, s.Tables[0].Fields[1].IsTag)
assert.Equal(t, ".1.1.1.6", s.Tables[0].Fields[2].Oid) require.Equal(t, ".1.1.1.6", s.Tables[0].Fields[2].Oid)
assert.Equal(t, ".1.1.1.6", s.Tables[0].Fields[2].Name) require.Equal(t, ".1.1.1.6", s.Tables[0].Fields[2].Name)
assert.Equal(t, false, s.Tables[0].Fields[2].IsTag) require.Equal(t, false, s.Tables[0].Fields[2].IsTag)
} }
func TestSnmpInit_noName_noOid(t *testing.T) { func TestSnmpInit_noName_noOid(t *testing.T) {
@ -276,25 +275,25 @@ func TestGetSNMPConnection_v2(t *testing.T) {
gsc, err := s.getConnection(0) gsc, err := s.getConnection(0)
require.NoError(t, err) require.NoError(t, err)
gs := gsc.(snmp.GosnmpWrapper) gs := gsc.(snmp.GosnmpWrapper)
assert.Equal(t, "1.2.3.4", gs.Target) require.Equal(t, "1.2.3.4", gs.Target)
assert.EqualValues(t, 567, gs.Port) require.EqualValues(t, 567, gs.Port)
assert.Equal(t, gosnmp.Version2c, gs.Version) require.Equal(t, gosnmp.Version2c, gs.Version)
assert.Equal(t, "foo", gs.Community) require.Equal(t, "foo", gs.Community)
assert.Equal(t, "udp", gs.Transport) require.Equal(t, "udp", gs.Transport)
gsc, err = s.getConnection(1) gsc, err = s.getConnection(1)
require.NoError(t, err) require.NoError(t, err)
gs = gsc.(snmp.GosnmpWrapper) gs = gsc.(snmp.GosnmpWrapper)
assert.Equal(t, "1.2.3.4", gs.Target) require.Equal(t, "1.2.3.4", gs.Target)
assert.EqualValues(t, 161, gs.Port) require.EqualValues(t, 161, gs.Port)
assert.Equal(t, "udp", gs.Transport) require.Equal(t, "udp", gs.Transport)
gsc, err = s.getConnection(2) gsc, err = s.getConnection(2)
require.NoError(t, err) require.NoError(t, err)
gs = gsc.(snmp.GosnmpWrapper) gs = gsc.(snmp.GosnmpWrapper)
assert.Equal(t, "127.0.0.1", gs.Target) require.Equal(t, "127.0.0.1", gs.Target)
assert.EqualValues(t, 161, gs.Port) require.EqualValues(t, 161, gs.Port)
assert.Equal(t, "udp", gs.Transport) require.Equal(t, "udp", gs.Transport)
} }
func TestGetSNMPConnectionTCP(t *testing.T) { func TestGetSNMPConnectionTCP(t *testing.T) {
@ -313,9 +312,9 @@ func TestGetSNMPConnectionTCP(t *testing.T) {
gsc, err := s.getConnection(0) gsc, err := s.getConnection(0)
require.NoError(t, err) require.NoError(t, err)
gs := gsc.(snmp.GosnmpWrapper) gs := gsc.(snmp.GosnmpWrapper)
assert.Equal(t, "127.0.0.1", gs.Target) require.Equal(t, "127.0.0.1", gs.Target)
assert.EqualValues(t, 56789, gs.Port) require.EqualValues(t, 56789, gs.Port)
assert.Equal(t, "tcp", gs.Transport) require.Equal(t, "tcp", gs.Transport)
wg.Wait() wg.Wait()
} }
@ -353,20 +352,20 @@ func TestGetSNMPConnection_v3(t *testing.T) {
gsc, err := s.getConnection(0) gsc, err := s.getConnection(0)
require.NoError(t, err) require.NoError(t, err)
gs := gsc.(snmp.GosnmpWrapper) gs := gsc.(snmp.GosnmpWrapper)
assert.Equal(t, gs.Version, gosnmp.Version3) require.Equal(t, gs.Version, gosnmp.Version3)
sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters) sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters)
assert.Equal(t, "1.2.3.4", gsc.Host()) require.Equal(t, "1.2.3.4", gsc.Host())
assert.EqualValues(t, 20, gs.MaxRepetitions) require.EqualValues(t, 20, gs.MaxRepetitions)
assert.Equal(t, "mycontext", gs.ContextName) require.Equal(t, "mycontext", gs.ContextName)
assert.Equal(t, gosnmp.AuthPriv, gs.MsgFlags&gosnmp.AuthPriv) require.Equal(t, gosnmp.AuthPriv, gs.MsgFlags&gosnmp.AuthPriv)
assert.Equal(t, "myuser", sp.UserName) require.Equal(t, "myuser", sp.UserName)
assert.Equal(t, gosnmp.MD5, sp.AuthenticationProtocol) require.Equal(t, gosnmp.MD5, sp.AuthenticationProtocol)
assert.Equal(t, "password123", sp.AuthenticationPassphrase) require.Equal(t, "password123", sp.AuthenticationPassphrase)
assert.Equal(t, gosnmp.DES, sp.PrivacyProtocol) require.Equal(t, gosnmp.DES, sp.PrivacyProtocol)
assert.Equal(t, "321drowssap", sp.PrivacyPassphrase) require.Equal(t, "321drowssap", sp.PrivacyPassphrase)
assert.Equal(t, "myengineid", sp.AuthoritativeEngineID) require.Equal(t, "myengineid", sp.AuthoritativeEngineID)
assert.EqualValues(t, 1, sp.AuthoritativeEngineBoots) require.EqualValues(t, 1, sp.AuthoritativeEngineBoots)
assert.EqualValues(t, 2, sp.AuthoritativeEngineTime) require.EqualValues(t, 2, sp.AuthoritativeEngineTime)
} }
func TestGetSNMPConnection_v3_blumenthal(t *testing.T) { func TestGetSNMPConnection_v3_blumenthal(t *testing.T) {
@ -470,20 +469,20 @@ func TestGetSNMPConnection_v3_blumenthal(t *testing.T) {
gsc, err := s.getConnection(0) gsc, err := s.getConnection(0)
require.NoError(t, err) require.NoError(t, err)
gs := gsc.(snmp.GosnmpWrapper) gs := gsc.(snmp.GosnmpWrapper)
assert.Equal(t, gs.Version, gosnmp.Version3) require.Equal(t, gs.Version, gosnmp.Version3)
sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters) sp := gs.SecurityParameters.(*gosnmp.UsmSecurityParameters)
assert.Equal(t, "1.2.3.4", gsc.Host()) require.Equal(t, "1.2.3.4", gsc.Host())
assert.EqualValues(t, 20, gs.MaxRepetitions) require.EqualValues(t, 20, gs.MaxRepetitions)
assert.Equal(t, "mycontext", gs.ContextName) require.Equal(t, "mycontext", gs.ContextName)
assert.Equal(t, gosnmp.AuthPriv, gs.MsgFlags&gosnmp.AuthPriv) require.Equal(t, gosnmp.AuthPriv, gs.MsgFlags&gosnmp.AuthPriv)
assert.Equal(t, "myuser", sp.UserName) require.Equal(t, "myuser", sp.UserName)
assert.Equal(t, gosnmp.MD5, sp.AuthenticationProtocol) require.Equal(t, gosnmp.MD5, sp.AuthenticationProtocol)
assert.Equal(t, "password123", sp.AuthenticationPassphrase) require.Equal(t, "password123", sp.AuthenticationPassphrase)
assert.Equal(t, tc.Algorithm, sp.PrivacyProtocol) require.Equal(t, tc.Algorithm, sp.PrivacyProtocol)
assert.Equal(t, "password123", sp.PrivacyPassphrase) require.Equal(t, "password123", sp.PrivacyPassphrase)
assert.Equal(t, "myengineid", sp.AuthoritativeEngineID) require.Equal(t, "myengineid", sp.AuthoritativeEngineID)
assert.EqualValues(t, 1, sp.AuthoritativeEngineBoots) require.EqualValues(t, 1, sp.AuthoritativeEngineBoots)
assert.EqualValues(t, 2, sp.AuthoritativeEngineTime) require.EqualValues(t, 2, sp.AuthoritativeEngineTime)
}) })
} }
} }
@ -502,9 +501,9 @@ func TestGetSNMPConnection_caching(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
gs4, err := s.getConnection(2) gs4, err := s.getConnection(2)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, gs1 == gs2) require.Equal(t, gs1, gs2)
assert.False(t, gs2 == gs3) require.NotEqual(t, gs2, gs3)
assert.False(t, gs3 == gs4) require.NotEqual(t, gs3, gs4)
} }
func TestGosnmpWrapper_walk_retry(t *testing.T) { func TestGosnmpWrapper_walk_retry(t *testing.T) {
@ -554,11 +553,11 @@ func TestGosnmpWrapper_walk_retry(t *testing.T) {
GoSNMP: gs, GoSNMP: gs,
} }
err = gsw.Walk(".1.0.0", func(_ gosnmp.SnmpPDU) error { return nil }) err = gsw.Walk(".1.0.0", func(_ gosnmp.SnmpPDU) error { return nil })
assert.NoError(t, srvr.Close()) require.NoError(t, srvr.Close())
wg.Wait() wg.Wait()
assert.Error(t, err) require.Error(t, err)
assert.False(t, gs.Conn == conn) require.NotEqual(t, gs.Conn, conn)
assert.Equal(t, (gs.Retries+1)*2, reqCount) require.Equal(t, (gs.Retries+1)*2, reqCount)
} }
func TestGosnmpWrapper_get_retry(t *testing.T) { func TestGosnmpWrapper_get_retry(t *testing.T) {
@ -609,9 +608,9 @@ func TestGosnmpWrapper_get_retry(t *testing.T) {
_, err = gsw.Get([]string{".1.0.0"}) _, err = gsw.Get([]string{".1.0.0"})
require.NoError(t, srvr.Close()) require.NoError(t, srvr.Close())
wg.Wait() wg.Wait()
assert.Error(t, err) require.Error(t, err)
assert.False(t, gs.Conn == conn) require.NotEqual(t, gs.Conn, conn)
assert.Equal(t, (gs.Retries+1)*2, reqCount) require.Equal(t, (gs.Retries+1)*2, reqCount)
} }
func TestTableBuild_walk(t *testing.T) { func TestTableBuild_walk(t *testing.T) {
@ -659,7 +658,7 @@ func TestTableBuild_walk(t *testing.T) {
tb, err := tbl.Build(tsc, true) tb, err := tbl.Build(tsc, true)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, tb.Name, "mytable") require.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{ rtr1 := RTableRow{
Tags: map[string]string{ Tags: map[string]string{
"myfield1": "foo", "myfield1": "foo",
@ -703,11 +702,11 @@ func TestTableBuild_walk(t *testing.T) {
"myfield3": float64(9.999), "myfield3": float64(9.999),
}, },
} }
assert.Len(t, tb.Rows, 4) require.Len(t, tb.Rows, 4)
assert.Contains(t, tb.Rows, rtr1) require.Contains(t, tb.Rows, rtr1)
assert.Contains(t, tb.Rows, rtr2) require.Contains(t, tb.Rows, rtr2)
assert.Contains(t, tb.Rows, rtr3) require.Contains(t, tb.Rows, rtr3)
assert.Contains(t, tb.Rows, rtr4) require.Contains(t, tb.Rows, rtr4)
} }
func TestTableBuild_noWalk(t *testing.T) { func TestTableBuild_noWalk(t *testing.T) {
@ -746,8 +745,8 @@ func TestTableBuild_noWalk(t *testing.T) {
Tags: map[string]string{"myfield1": "baz", "myfield3": "234"}, Tags: map[string]string{"myfield1": "baz", "myfield3": "234"},
Fields: map[string]interface{}{"myfield2": 234}, Fields: map[string]interface{}{"myfield2": 234},
} }
assert.Len(t, tb.Rows, 1) require.Len(t, tb.Rows, 1)
assert.Contains(t, tb.Rows, rtr) require.Contains(t, tb.Rows, rtr)
} }
func TestGather(t *testing.T) { func TestGather(t *testing.T) {
@ -796,21 +795,21 @@ func TestGather(t *testing.T) {
require.Len(t, acc.Metrics, 2) require.Len(t, acc.Metrics, 2)
m := acc.Metrics[0] m := acc.Metrics[0]
assert.Equal(t, "mytable", m.Measurement) require.Equal(t, "mytable", m.Measurement)
assert.Equal(t, "tsc", m.Tags[s.AgentHostTag]) require.Equal(t, "tsc", m.Tags[s.AgentHostTag])
assert.Equal(t, "baz", m.Tags["myfield1"]) require.Equal(t, "baz", m.Tags["myfield1"])
assert.Len(t, m.Fields, 2) require.Len(t, m.Fields, 2)
assert.Equal(t, 234, m.Fields["myfield2"]) require.Equal(t, 234, m.Fields["myfield2"])
assert.Equal(t, "baz", m.Fields["myfield3"]) require.Equal(t, "baz", m.Fields["myfield3"])
assert.True(t, !tstart.After(m.Time)) require.False(t, tstart.After(m.Time))
assert.True(t, !tstop.Before(m.Time)) require.False(t, tstop.Before(m.Time))
m2 := acc.Metrics[1] m2 := acc.Metrics[1]
assert.Equal(t, "myOtherTable", m2.Measurement) require.Equal(t, "myOtherTable", m2.Measurement)
assert.Equal(t, "tsc", m2.Tags[s.AgentHostTag]) require.Equal(t, "tsc", m2.Tags[s.AgentHostTag])
assert.Equal(t, "baz", m2.Tags["myfield1"]) require.Equal(t, "baz", m2.Tags["myfield1"])
assert.Len(t, m2.Fields, 1) require.Len(t, m2.Fields, 1)
assert.Equal(t, 123456, m2.Fields["myOtherField"]) require.Equal(t, 123456, m2.Fields["myOtherField"])
} }
func TestGather_host(t *testing.T) { func TestGather_host(t *testing.T) {
@ -841,7 +840,7 @@ func TestGather_host(t *testing.T) {
require.Len(t, acc.Metrics, 1) require.Len(t, acc.Metrics, 1)
m := acc.Metrics[0] m := acc.Metrics[0]
assert.Equal(t, "baz", m.Tags["host"]) require.Equal(t, "baz", m.Tags["host"])
} }
func TestFieldConvert(t *testing.T) { func TestFieldConvert(t *testing.T) {
@ -874,7 +873,7 @@ func TestFieldConvert(t *testing.T) {
{[]byte("123123123123"), "int", int64(123123123123)}, {[]byte("123123123123"), "int", int64(123123123123)},
{float32(12.3), "int", int64(12)}, {float32(12.3), "int", int64(12)},
{float64(12.3), "int", int64(12)}, {float64(12.3), "int", int64(12)},
{int(123), "int", int64(123)}, {123, "int", int64(123)},
{int8(123), "int", int64(123)}, {int8(123), "int", int64(123)},
{int16(123), "int", int64(123)}, {int16(123), "int", int64(123)},
{int32(123), "int", int64(123)}, {int32(123), "int", int64(123)},
@ -899,10 +898,8 @@ func TestFieldConvert(t *testing.T) {
for _, tc := range testTable { for _, tc := range testTable {
act, err := fieldConvert(tc.conv, tc.input) act, err := fieldConvert(tc.conv, tc.input)
if !assert.NoError(t, err, "input=%T(%v) conv=%s expected=%T(%v)", tc.input, tc.input, tc.conv, tc.expected, tc.expected) { require.NoError(t, err, "input=%T(%v) conv=%s expected=%T(%v)", tc.input, tc.input, tc.conv, tc.expected, tc.expected)
continue require.EqualValues(t, tc.expected, act, "input=%T(%v) conv=%s expected=%T(%v)", tc.input, tc.input, tc.conv, tc.expected, tc.expected)
}
assert.EqualValues(t, tc.expected, act, "input=%T(%v) conv=%s expected=%T(%v)", tc.input, tc.input, tc.conv, tc.expected, tc.expected)
} }
} }
@ -910,14 +907,14 @@ func TestSnmpTranslateCache_miss(t *testing.T) {
snmpTranslateCaches = nil snmpTranslateCaches = nil
oid := "IF-MIB::ifPhysAddress.1" oid := "IF-MIB::ifPhysAddress.1"
mibName, oidNum, oidText, conversion, err := SnmpTranslate(oid) mibName, oidNum, oidText, conversion, err := SnmpTranslate(oid)
assert.Len(t, snmpTranslateCaches, 1) require.Len(t, snmpTranslateCaches, 1)
stc := snmpTranslateCaches[oid] stc := snmpTranslateCaches[oid]
require.NotNil(t, stc) require.NotNil(t, stc)
assert.Equal(t, mibName, stc.mibName) require.Equal(t, mibName, stc.mibName)
assert.Equal(t, oidNum, stc.oidNum) require.Equal(t, oidNum, stc.oidNum)
assert.Equal(t, oidText, stc.oidText) require.Equal(t, oidText, stc.oidText)
assert.Equal(t, conversion, stc.conversion) require.Equal(t, conversion, stc.conversion)
assert.Equal(t, err, stc.err) require.Equal(t, err, stc.err)
} }
func TestSnmpTranslateCache_hit(t *testing.T) { func TestSnmpTranslateCache_hit(t *testing.T) {
@ -931,11 +928,11 @@ func TestSnmpTranslateCache_hit(t *testing.T) {
}, },
} }
mibName, oidNum, oidText, conversion, err := SnmpTranslate("foo") mibName, oidNum, oidText, conversion, err := SnmpTranslate("foo")
assert.Equal(t, "a", mibName) require.Equal(t, "a", mibName)
assert.Equal(t, "b", oidNum) require.Equal(t, "b", oidNum)
assert.Equal(t, "c", oidText) require.Equal(t, "c", oidText)
assert.Equal(t, "d", conversion) require.Equal(t, "d", conversion)
assert.Equal(t, fmt.Errorf("e"), err) require.Equal(t, fmt.Errorf("e"), err)
snmpTranslateCaches = nil snmpTranslateCaches = nil
} }
@ -943,14 +940,14 @@ func TestSnmpTableCache_miss(t *testing.T) {
snmpTableCaches = nil snmpTableCaches = nil
oid := ".1.0.0.0" oid := ".1.0.0.0"
mibName, oidNum, oidText, fields, err := snmpTable(oid) mibName, oidNum, oidText, fields, err := snmpTable(oid)
assert.Len(t, snmpTableCaches, 1) require.Len(t, snmpTableCaches, 1)
stc := snmpTableCaches[oid] stc := snmpTableCaches[oid]
require.NotNil(t, stc) require.NotNil(t, stc)
assert.Equal(t, mibName, stc.mibName) require.Equal(t, mibName, stc.mibName)
assert.Equal(t, oidNum, stc.oidNum) require.Equal(t, oidNum, stc.oidNum)
assert.Equal(t, oidText, stc.oidText) require.Equal(t, oidText, stc.oidText)
assert.Equal(t, fields, stc.fields) require.Equal(t, fields, stc.fields)
assert.Equal(t, err, stc.err) require.Equal(t, err, stc.err)
} }
func TestSnmpTableCache_hit(t *testing.T) { func TestSnmpTableCache_hit(t *testing.T) {
@ -964,11 +961,11 @@ func TestSnmpTableCache_hit(t *testing.T) {
}, },
} }
mibName, oidNum, oidText, fields, err := snmpTable("foo") mibName, oidNum, oidText, fields, err := snmpTable("foo")
assert.Equal(t, "a", mibName) require.Equal(t, "a", mibName)
assert.Equal(t, "b", oidNum) require.Equal(t, "b", oidNum)
assert.Equal(t, "c", oidText) require.Equal(t, "c", oidText)
assert.Equal(t, []Field{{Name: "d"}}, fields) require.Equal(t, []Field{{Name: "d"}}, fields)
assert.Equal(t, fmt.Errorf("e"), err) require.Equal(t, fmt.Errorf("e"), err)
} }
func TestTableJoin_walk(t *testing.T) { func TestTableJoin_walk(t *testing.T) {
@ -1007,7 +1004,7 @@ func TestTableJoin_walk(t *testing.T) {
tb, err := tbl.Build(tsc, true) tb, err := tbl.Build(tsc, true)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, tb.Name, "mytable") require.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{ rtr1 := RTableRow{
Tags: map[string]string{ Tags: map[string]string{
"myfield1": "instance", "myfield1": "instance",
@ -1041,10 +1038,10 @@ func TestTableJoin_walk(t *testing.T) {
"myfield3": 3, "myfield3": 3,
}, },
} }
assert.Len(t, tb.Rows, 3) require.Len(t, tb.Rows, 3)
assert.Contains(t, tb.Rows, rtr1) require.Contains(t, tb.Rows, rtr1)
assert.Contains(t, tb.Rows, rtr2) require.Contains(t, tb.Rows, rtr2)
assert.Contains(t, tb.Rows, rtr3) require.Contains(t, tb.Rows, rtr3)
} }
func TestTableOuterJoin_walk(t *testing.T) { func TestTableOuterJoin_walk(t *testing.T) {
@ -1084,7 +1081,7 @@ func TestTableOuterJoin_walk(t *testing.T) {
tb, err := tbl.Build(tsc, true) tb, err := tbl.Build(tsc, true)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, tb.Name, "mytable") require.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{ rtr1 := RTableRow{
Tags: map[string]string{ Tags: map[string]string{
"myfield1": "instance", "myfield1": "instance",
@ -1127,11 +1124,11 @@ func TestTableOuterJoin_walk(t *testing.T) {
"myfield5": 1, "myfield5": 1,
}, },
} }
assert.Len(t, tb.Rows, 4) require.Len(t, tb.Rows, 4)
assert.Contains(t, tb.Rows, rtr1) require.Contains(t, tb.Rows, rtr1)
assert.Contains(t, tb.Rows, rtr2) require.Contains(t, tb.Rows, rtr2)
assert.Contains(t, tb.Rows, rtr3) require.Contains(t, tb.Rows, rtr3)
assert.Contains(t, tb.Rows, rtr4) require.Contains(t, tb.Rows, rtr4)
} }
func TestTableJoinNoIndexAsTag_walk(t *testing.T) { func TestTableJoinNoIndexAsTag_walk(t *testing.T) {
@ -1170,7 +1167,7 @@ func TestTableJoinNoIndexAsTag_walk(t *testing.T) {
tb, err := tbl.Build(tsc, true) tb, err := tbl.Build(tsc, true)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, tb.Name, "mytable") require.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{ rtr1 := RTableRow{
Tags: map[string]string{ Tags: map[string]string{
"myfield1": "instance", "myfield1": "instance",
@ -1204,8 +1201,8 @@ func TestTableJoinNoIndexAsTag_walk(t *testing.T) {
"myfield3": 3, "myfield3": 3,
}, },
} }
assert.Len(t, tb.Rows, 3) require.Len(t, tb.Rows, 3)
assert.Contains(t, tb.Rows, rtr1) require.Contains(t, tb.Rows, rtr1)
assert.Contains(t, tb.Rows, rtr2) require.Contains(t, tb.Rows, rtr2)
assert.Contains(t, tb.Rows, rtr3) require.Contains(t, tb.Rows, rtr3)
} }

View File

@ -62,7 +62,7 @@ package udp_listener
// // } // // }
// // listener.Stop() // // listener.Stop()
// // assert.Equal(t, uint64(100000), acc.NMetrics()) // // require.Equal(t, uint64(100000), acc.NMetrics())
// // } // // }
// func TestConnectUDP(t *testing.T) { // func TestConnectUDP(t *testing.T) {

View File

@ -3,23 +3,25 @@ package filestack
import ( import (
"encoding/json" "encoding/json"
"io" "io"
"log"
"net/http" "net/http"
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type FilestackWebhook struct { type FilestackWebhook struct {
Path string Path string
acc telegraf.Accumulator acc telegraf.Accumulator
log telegraf.Logger
} }
func (fs *FilestackWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { func (fs *FilestackWebhook) Register(router *mux.Router, acc telegraf.Accumulator, log telegraf.Logger) {
router.HandleFunc(fs.Path, fs.eventHandler).Methods("POST") router.HandleFunc(fs.Path, fs.eventHandler).Methods("POST")
log.Printf("I! Started the webhooks_filestack on %s\n", fs.Path) fs.log = log
fs.log.Infof("Started the webhooks_filestack on %s", fs.Path)
fs.acc = acc fs.acc = acc
} }

View File

@ -6,10 +6,10 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"io" "io"
"log"
"net/http" "net/http"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
@ -17,11 +17,14 @@ type GithubWebhook struct {
Path string Path string
Secret string Secret string
acc telegraf.Accumulator acc telegraf.Accumulator
log telegraf.Logger
} }
func (gh *GithubWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { func (gh *GithubWebhook) Register(router *mux.Router, acc telegraf.Accumulator, log telegraf.Logger) {
router.HandleFunc(gh.Path, gh.eventHandler).Methods("POST") router.HandleFunc(gh.Path, gh.eventHandler).Methods("POST")
log.Printf("I! Started the webhooks_github on %s\n", gh.Path)
gh.log = log
gh.log.Infof("Started the webhooks_github on %s", gh.Path)
gh.acc = acc gh.acc = acc
} }
@ -35,12 +38,12 @@ func (gh *GithubWebhook) eventHandler(w http.ResponseWriter, r *http.Request) {
} }
if gh.Secret != "" && !checkSignature(gh.Secret, data, r.Header.Get("X-Hub-Signature")) { if gh.Secret != "" && !checkSignature(gh.Secret, data, r.Header.Get("X-Hub-Signature")) {
log.Printf("E! Fail to check the github webhook signature\n") gh.log.Error("Fail to check the github webhook signature")
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
e, err := NewEvent(data, eventType) e, err := gh.NewEvent(data, eventType)
if err != nil { if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
@ -69,8 +72,8 @@ func (e *newEventError) Error() string {
return e.s return e.s
} }
func NewEvent(data []byte, name string) (Event, error) { func (gh *GithubWebhook) NewEvent(data []byte, name string) (Event, error) {
log.Printf("D! New %v event received", name) gh.log.Debugf("New %v event received", name)
switch name { switch name {
case "commit_comment": case "commit_comment":
return generateEvent(data, &CommitCommentEvent{}) return generateEvent(data, &CommitCommentEvent{})

View File

@ -11,7 +11,7 @@ import (
func GithubWebhookRequest(event string, jsonString string, t *testing.T) { func GithubWebhookRequest(event string, jsonString string, t *testing.T) {
var acc testutil.Accumulator var acc testutil.Accumulator
gh := &GithubWebhook{Path: "/github", acc: &acc} gh := &GithubWebhook{Path: "/github", acc: &acc, log: testutil.Logger{}}
req, _ := http.NewRequest("POST", "/github", strings.NewReader(jsonString)) req, _ := http.NewRequest("POST", "/github", strings.NewReader(jsonString))
req.Header.Add("X-Github-Event", event) req.Header.Add("X-Github-Event", event)
w := httptest.NewRecorder() w := httptest.NewRecorder()
@ -23,7 +23,7 @@ func GithubWebhookRequest(event string, jsonString string, t *testing.T) {
func GithubWebhookRequestWithSignature(event string, jsonString string, t *testing.T, signature string, expectedStatus int) { func GithubWebhookRequestWithSignature(event string, jsonString string, t *testing.T, signature string, expectedStatus int) {
var acc testutil.Accumulator var acc testutil.Accumulator
gh := &GithubWebhook{Path: "/github", Secret: "signature", acc: &acc} gh := &GithubWebhook{Path: "/github", Secret: "signature", acc: &acc, log: testutil.Logger{}}
req, _ := http.NewRequest("POST", "/github", strings.NewReader(jsonString)) req, _ := http.NewRequest("POST", "/github", strings.NewReader(jsonString))
req.Header.Add("X-Github-Event", event) req.Header.Add("X-Github-Event", event)
req.Header.Add("X-Hub-Signature", signature) req.Header.Add("X-Hub-Signature", signature)

View File

@ -3,25 +3,27 @@ package mandrill
import ( import (
"encoding/json" "encoding/json"
"io" "io"
"log"
"net/http" "net/http"
"net/url" "net/url"
"time" "time"
"github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/gorilla/mux"
) )
type MandrillWebhook struct { type MandrillWebhook struct {
Path string Path string
acc telegraf.Accumulator acc telegraf.Accumulator
log telegraf.Logger
} }
func (md *MandrillWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { func (md *MandrillWebhook) Register(router *mux.Router, acc telegraf.Accumulator, log telegraf.Logger) {
router.HandleFunc(md.Path, md.returnOK).Methods("HEAD") router.HandleFunc(md.Path, md.returnOK).Methods("HEAD")
router.HandleFunc(md.Path, md.eventHandler).Methods("POST") router.HandleFunc(md.Path, md.eventHandler).Methods("POST")
log.Printf("I! Started the webhooks_mandrill on %s\n", md.Path) md.log = log
md.log.Infof("Started the webhooks_mandrill on %s", md.Path)
md.acc = acc md.acc = acc
} }

View File

@ -3,22 +3,24 @@ package papertrail
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"net/http" "net/http"
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type PapertrailWebhook struct { type PapertrailWebhook struct {
Path string Path string
acc telegraf.Accumulator acc telegraf.Accumulator
log telegraf.Logger
} }
func (pt *PapertrailWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { func (pt *PapertrailWebhook) Register(router *mux.Router, acc telegraf.Accumulator, log telegraf.Logger) {
router.HandleFunc(pt.Path, pt.eventHandler).Methods("POST") router.HandleFunc(pt.Path, pt.eventHandler).Methods("POST")
log.Printf("I! Started the papertrail_webhook on %s", pt.Path) pt.log = log
pt.log.Infof("Started the papertrail_webhook on %s", pt.Path)
pt.acc = acc pt.acc = acc
} }
@ -64,7 +66,7 @@ func (pt *PapertrailWebhook) eventHandler(w http.ResponseWriter, r *http.Request
} }
pt.acc.AddFields("papertrail", fields, tags, e.ReceivedAt) pt.acc.AddFields("papertrail", fields, tags, e.ReceivedAt)
} }
} else if payload.Counts != nil { } else if payload.Counts != nil { //nolint:revive // Not simplifying here to stay in the structure for better understanding the code
// Handle count-based payload // Handle count-based payload
for _, c := range payload.Counts { for _, c := range payload.Counts {
for ts, count := range *c.TimeSeries { for ts, count := range *c.TimeSeries {

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
@ -38,10 +39,13 @@ func (e *event) Time() (time.Time, error) {
type ParticleWebhook struct { type ParticleWebhook struct {
Path string Path string
acc telegraf.Accumulator acc telegraf.Accumulator
log telegraf.Logger
} }
func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { func (rb *ParticleWebhook) Register(router *mux.Router, acc telegraf.Accumulator, log telegraf.Logger) {
router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST") router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST")
rb.log = log
rb.log.Infof("Started the webhooks_particle on %s", rb.Path)
rb.acc = acc rb.acc = acc
} }

View File

@ -4,22 +4,24 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"io" "io"
"log"
"net/http" "net/http"
"time" "time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type RollbarWebhook struct { type RollbarWebhook struct {
Path string Path string
acc telegraf.Accumulator acc telegraf.Accumulator
log telegraf.Logger
} }
func (rb *RollbarWebhook) Register(router *mux.Router, acc telegraf.Accumulator) { func (rb *RollbarWebhook) Register(router *mux.Router, acc telegraf.Accumulator, log telegraf.Logger) {
router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST") router.HandleFunc(rb.Path, rb.eventHandler).Methods("POST")
log.Printf("I! Started the webhooks_rollbar on %s\n", rb.Path) rb.log = log
rb.log.Infof("Started the webhooks_rollbar on %s", rb.Path)
rb.acc = acc rb.acc = acc
} }

View File

@ -7,9 +7,9 @@ import (
"reflect" "reflect"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/filestack" "github.com/influxdata/telegraf/plugins/inputs/webhooks/filestack"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/github" "github.com/influxdata/telegraf/plugins/inputs/webhooks/github"
"github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill" "github.com/influxdata/telegraf/plugins/inputs/webhooks/mandrill"
@ -19,7 +19,7 @@ import (
) )
type Webhook interface { type Webhook interface {
Register(router *mux.Router, acc telegraf.Accumulator) Register(router *mux.Router, acc telegraf.Accumulator, log telegraf.Logger)
} }
func init() { func init() {
@ -79,7 +79,7 @@ func (wb *Webhooks) Gather(_ telegraf.Accumulator) error {
return nil return nil
} }
// Looks for fields which implement Webhook interface // AvailableWebhooks Looks for fields which implement Webhook interface
func (wb *Webhooks) AvailableWebhooks() []Webhook { func (wb *Webhooks) AvailableWebhooks() []Webhook {
webhooks := make([]Webhook, 0) webhooks := make([]Webhook, 0)
s := reflect.ValueOf(wb).Elem() s := reflect.ValueOf(wb).Elem()
@ -104,7 +104,7 @@ func (wb *Webhooks) Start(acc telegraf.Accumulator) error {
r := mux.NewRouter() r := mux.NewRouter()
for _, webhook := range wb.AvailableWebhooks() { for _, webhook := range wb.AvailableWebhooks() {
webhook.Register(r, acc) webhook.Register(r, acc, wb.Log)
} }
wb.srv = &http.Server{Handler: r} wb.srv = &http.Server{Handler: r}