Linter fixes (unhandled errors) -- Part 2 (#9122)

Sven Rebhan 2021-04-22 23:08:03 +02:00 committed by GitHub
parent 4d00e21630
commit 03b2daeb1b
56 changed files with 641 additions and 461 deletions
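
All changes below follow the same linter-driven pattern: return values (usually errors) that were previously discarded are now either asserted on in tests, wrapped and propagated in plugin code, or explicitly annotated where nothing useful can be done. As an illustrative sketch of the test-side pattern only (not code from this commit; the Queues type is a minimal stand-in):

package example

import (
	"encoding/xml"
	"testing"

	"github.com/stretchr/testify/require"
)

// Queues is a stand-in for a plugin's XML result type.
type Queues struct {
	XMLName xml.Name `xml:"queues"`
}

func TestUnmarshalChecked(t *testing.T) {
	s := "<queues></queues>"

	var queues Queues
	// Before: xml.Unmarshal([]byte(s), &queues) -- the error was silently dropped.
	// After: a malformed fixture now fails the test instead of passing vacuously.
	require.NoError(t, xml.Unmarshal([]byte(s), &queues))
}

The same shape (require.NoError wrapped around a call whose error used to be ignored) recurs throughout the test diffs below.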

View File

@ -30,7 +30,7 @@ func TestGatherQueuesMetrics(t *testing.T) {
queues := Queues{}
xml.Unmarshal([]byte(s), &queues)
require.NoError(t, xml.Unmarshal([]byte(s), &queues))
records := make(map[string]interface{})
tags := make(map[string]string)
@ -49,7 +49,7 @@ func TestGatherQueuesMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
activeMQ.Init()
require.NoError(t, activeMQ.Init())
activeMQ.GatherQueuesMetrics(&acc, queues)
acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
@ -76,7 +76,7 @@ func TestGatherTopicsMetrics(t *testing.T) {
topics := Topics{}
xml.Unmarshal([]byte(s), &topics)
require.NoError(t, xml.Unmarshal([]byte(s), &topics))
records := make(map[string]interface{})
tags := make(map[string]string)
@ -95,7 +95,7 @@ func TestGatherTopicsMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
activeMQ.Init()
require.NoError(t, activeMQ.Init())
activeMQ.GatherTopicsMetrics(&acc, topics)
acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
@ -110,7 +110,7 @@ func TestGatherSubscribersMetrics(t *testing.T) {
subscribers := Subscribers{}
xml.Unmarshal([]byte(s), &subscribers)
require.NoError(t, xml.Unmarshal([]byte(s), &subscribers))
records := make(map[string]interface{})
tags := make(map[string]string)
@ -135,7 +135,7 @@ func TestGatherSubscribersMetrics(t *testing.T) {
activeMQ := new(ActiveMQ)
activeMQ.Server = "localhost"
activeMQ.Port = 8161
activeMQ.Init()
require.NoError(t, activeMQ.Init())
activeMQ.GatherSubscribersMetrics(&acc, subscribers)
acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
@ -149,13 +149,16 @@ func TestURLs(t *testing.T) {
switch r.URL.Path {
case "/admin/xml/queues.jsp":
w.WriteHeader(http.StatusOK)
w.Write([]byte("<queues></queues>"))
_, err := w.Write([]byte("<queues></queues>"))
require.NoError(t, err)
case "/admin/xml/topics.jsp":
w.WriteHeader(http.StatusOK)
w.Write([]byte("<topics></topics>"))
_, err := w.Write([]byte("<topics></topics>"))
require.NoError(t, err)
case "/admin/xml/subscribers.jsp":
w.WriteHeader(http.StatusOK)
w.Write([]byte("<subscribers></subscribers>"))
_, err := w.Write([]byte("<subscribers></subscribers>"))
require.NoError(t, err)
default:
w.WriteHeader(http.StatusNotFound)
t.Fatalf("unexpected path: " + r.URL.Path)

View File

@ -31,7 +31,8 @@ Scoreboard: WW_____W_RW_R_W__RRR____WR_W___WW________W_WW_W_____R__R_WR__WRWR_RR
func TestHTTPApache(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, apacheStatus)
_, err := fmt.Fprintln(w, apacheStatus)
require.NoError(t, err)
}))
defer ts.Close()

View File

@ -43,7 +43,7 @@ func listen(ctx context.Context, t *testing.T, out [][]byte) (string, error) {
continue
}
defer conn.Close()
conn.SetReadDeadline(time.Now().Add(time.Second))
require.NoError(t, conn.SetReadDeadline(time.Now().Add(time.Second)))
in := make([]byte, 128)
n, err := conn.Read(in)

View File

@ -190,7 +190,9 @@ func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, err
if err != nil {
return Unknown, err
}
resp.Body.Close()
if err := resp.Body.Close(); err != nil {
return Unknown, fmt.Errorf("closing body failed: %v", err)
}
switch resp.StatusCode {
case http.StatusOK:

View File

@ -46,7 +46,8 @@ func TestAurora(t *testing.T) {
"variable_scrape_micros_total_per_sec": 1485.0
}`
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
_, err := w.Write([]byte(body))
require.NoError(t, err)
},
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
require.NoError(t, err)
@ -86,7 +87,8 @@ func TestAurora(t *testing.T) {
},
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("{}"))
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
},
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
require.NoError(t, err)
@ -104,7 +106,8 @@ func TestAurora(t *testing.T) {
"foo": "bar"
}`
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
_, err := w.Write([]byte(body))
require.NoError(t, err)
},
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
require.NoError(t, err)
@ -123,7 +126,8 @@ func TestAurora(t *testing.T) {
"foo": 1e309
}`
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
_, err := w.Write([]byte(body))
require.NoError(t, err)
},
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
require.NoError(t, err)
@ -142,7 +146,8 @@ func TestAurora(t *testing.T) {
"foo": 9223372036854775808
}`
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
_, err := w.Write([]byte(body))
require.NoError(t, err)
},
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
require.NoError(t, err)
@ -158,7 +163,8 @@ func TestAurora(t *testing.T) {
varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
body := `{]`
w.WriteHeader(http.StatusOK)
w.Write([]byte(body))
_, err := w.Write([]byte(body))
require.NoError(t, err)
},
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
require.NoError(t, err)
@ -176,7 +182,8 @@ func TestAurora(t *testing.T) {
"value": 42
}`
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(body))
_, err := w.Write([]byte(body))
require.NoError(t, err)
},
check: func(t *testing.T, err error, acc *testutil.Accumulator) {
require.NoError(t, err)
@ -244,7 +251,8 @@ func TestBasicAuth(t *testing.T) {
require.Equal(t, tt.username, username)
require.Equal(t, tt.password, password)
w.WriteHeader(http.StatusOK)
w.Write([]byte("{}"))
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
})
var acc testutil.Accumulator

View File

@ -6,6 +6,7 @@ package bcache
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@ -128,7 +129,7 @@ func (b *Bcache) Gather(acc telegraf.Accumulator) error {
}
bdevs, _ := filepath.Glob(bcachePath + "/*/bdev*")
if len(bdevs) < 1 {
return errors.New("Can't find any bcache device")
return errors.New("can't find any bcache device")
}
for _, bdev := range bdevs {
if restrictDevs {
@ -137,7 +138,9 @@ func (b *Bcache) Gather(acc telegraf.Accumulator) error {
continue
}
}
b.gatherBcache(bdev, acc)
if err := b.gatherBcache(bdev, acc); err != nil {
return fmt.Errorf("gathering bcache failed: %v", err)
}
}
return nil
}
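In the non-test code paths (bcache above, and similarly bind, aurora, and dovecot further down) the fix is to wrap the formerly ignored error with context and return or accumulate it rather than assert on it. A minimal sketch of that shape, using hypothetical names (gatherDevice and gatherAll are not functions from this commit):

package example

import "fmt"

// gatherDevice stands in for a per-device collection routine such as
// (*Bcache).gatherBcache; it is hypothetical.
func gatherDevice(dev string) error {
	// ... collect and emit metrics for dev ...
	return nil
}

// gatherAll shows the wrapping pattern: the error is no longer dropped,
// it is annotated with which device failed and propagated to the caller.
func gatherAll(devs []string) error {
	for _, dev := range devs {
		if err := gatherDevice(dev); err != nil {
			return fmt.Errorf("gathering %q failed: %v", dev, err)
		}
	}
	return nil
}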

View File

@ -62,7 +62,10 @@ func (b *Beanstalkd) Gather(acc telegraf.Accumulator) error {
for _, tube := range tubes {
wg.Add(1)
go func(tube string) {
b.gatherTubeStats(connection, tube, acc)
err := b.gatherTubeStats(connection, tube, acc)
if err != nil {
acc.AddError(err)
}
wg.Done()
}(tube)
}

View File

@ -22,6 +22,7 @@ func TestBeanstalkd(t *testing.T) {
tubesConfig []string
expectedTubes []tubeStats
notExpectedTubes []tubeStats
expectedError string
}{
{
name: "All tubes stats",
@ -50,15 +51,14 @@ func TestBeanstalkd(t *testing.T) {
{name: "default", fields: defaultTubeFields},
{name: "test", fields: testTubeFields},
},
expectedError: "input does not match format",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server, err := startTestServer(t)
if err != nil {
t.Fatalf("Unable to create test server")
}
require.NoError(t, err, "Unable to create test server")
defer server.Close()
serverAddress := server.Addr().String()
@ -68,8 +68,13 @@ func TestBeanstalkd(t *testing.T) {
}
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))
err = acc.GatherError(plugin.Gather)
if test.expectedError == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Equal(t, test.expectedError, err.Error())
}
acc.AssertContainsTaggedFields(t, "beanstalkd_overview",
overviewFields,
getOverviewTags(serverAddress),
@ -110,8 +115,8 @@ func startTestServer(t *testing.T) (net.Listener, error) {
tp := textproto.NewConn(connection)
defer tp.Close()
sendSuccessResponse := func(body string) {
tp.PrintfLine("OK %d\r\n%s", len(body), body)
sendSuccessResponse := func(body string) error {
return tp.PrintfLine("OK %d\r\n%s", len(body), body)
}
for {
@ -125,15 +130,30 @@ func startTestServer(t *testing.T) (net.Listener, error) {
switch cmd {
case "list-tubes":
sendSuccessResponse(listTubesResponse)
if err := sendSuccessResponse(listTubesResponse); err != nil {
t.Logf("sending response %q failed: %v", listTubesResponse, err)
return
}
case "stats":
sendSuccessResponse(statsResponse)
if err := sendSuccessResponse(statsResponse); err != nil {
t.Logf("sending response %q failed: %v", statsResponse, err)
return
}
case "stats-tube default":
sendSuccessResponse(statsTubeDefaultResponse)
if err := sendSuccessResponse(statsTubeDefaultResponse); err != nil {
t.Logf("sending response %q failed: %v", statsTubeDefaultResponse, err)
return
}
case "stats-tube test":
sendSuccessResponse(statsTubeTestResponse)
if err := sendSuccessResponse(statsTubeTestResponse); err != nil {
t.Logf("sending response %q failed: %v", statsTubeTestResponse, err)
return
}
case "stats-tube unknown":
tp.PrintfLine("NOT_FOUND")
if err := tp.PrintfLine("NOT_FOUND"); err != nil {
t.Logf("sending response %q failed: %v", "NOT_FOUND", err)
return
}
default:
t.Log("Test server: unknown command")
}

View File

@ -10,18 +10,15 @@ import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func Test_BeatStats(test *testing.T) {
func Test_BeatStats(t *testing.T) {
var beat6StatsAccumulator testutil.Accumulator
var beatTest = NewBeat()
// System stats are disabled by default
beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"}
err := beatTest.Init()
if err != nil {
panic(fmt.Sprintf("could not init beat: %s", err))
}
require.NoError(t, beatTest.Init())
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) {
var jsonFilePath string
@ -31,35 +28,26 @@ func Test_BeatStats(test *testing.T) {
case suffixStats:
jsonFilePath = "beat6_stats.json"
default:
panic("Cannot handle request")
require.FailNow(t, "cannot handle request")
}
data, err := ioutil.ReadFile(jsonFilePath)
if err != nil {
panic(fmt.Sprintf("could not read from data file %s", jsonFilePath))
}
w.Write(data)
require.NoErrorf(t, err, "could not read from data file %s", jsonFilePath)
_, err = w.Write(data)
require.NoError(t, err, "could not write data")
}))
requestURL, err := url.Parse(beatTest.URL)
if err != nil {
test.Logf("Can't parse URL %s", beatTest.URL)
}
require.NoErrorf(t, err, "can't parse URL %s", beatTest.URL)
fakeServer.Listener, err = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
if err != nil {
test.Logf("Can't listen for %s: %v", requestURL, err)
}
require.NoErrorf(t, err, "can't listen for %s: %v", requestURL, err)
fakeServer.Start()
defer fakeServer.Close()
err = beatTest.Gather(&beat6StatsAccumulator)
if err != nil {
test.Logf("Can't gather stats")
}
require.NoError(t, beatTest.Gather(&beat6StatsAccumulator))
beat6StatsAccumulator.AssertContainsTaggedFields(
test,
t,
"beat",
map[string]interface{}{
"cpu_system_ticks": float64(626970),
@ -85,7 +73,7 @@ func Test_BeatStats(test *testing.T) {
},
)
beat6StatsAccumulator.AssertContainsTaggedFields(
test,
t,
"beat_filebeat",
map[string]interface{}{
"events_active": float64(0),
@ -108,7 +96,7 @@ func Test_BeatStats(test *testing.T) {
},
)
beat6StatsAccumulator.AssertContainsTaggedFields(
test,
t,
"beat_libbeat",
map[string]interface{}{
"config_module_running": float64(0),
@ -148,7 +136,7 @@ func Test_BeatStats(test *testing.T) {
},
)
beat6StatsAccumulator.AssertContainsTaggedFields(
test,
t,
"beat_system",
map[string]interface{}{
"cpu_cores": float64(32),
@ -169,15 +157,12 @@ func Test_BeatStats(test *testing.T) {
)
}
func Test_BeatRequest(test *testing.T) {
func Test_BeatRequest(t *testing.T) {
var beat6StatsAccumulator testutil.Accumulator
beatTest := NewBeat()
// System stats are disabled by default
beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"}
err := beatTest.Init()
if err != nil {
panic(fmt.Sprintf("could not init beat: %s", err))
}
require.NoError(t, beatTest.Init())
fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) {
var jsonFilePath string
@ -187,30 +172,24 @@ func Test_BeatRequest(test *testing.T) {
case suffixStats:
jsonFilePath = "beat6_stats.json"
default:
panic("Cannot handle request")
require.FailNow(t, "cannot handle request")
}
data, err := ioutil.ReadFile(jsonFilePath)
require.NoErrorf(t, err, "could not read from data file %s", jsonFilePath)
require.Equal(t, request.Host, "beat.test.local")
require.Equal(t, request.Method, "POST")
require.Equal(t, request.Header.Get("Authorization"), "Basic YWRtaW46UFdE")
require.Equal(t, request.Header.Get("X-Test"), "test-value")
if err != nil {
panic(fmt.Sprintf("could not read from data file %s", jsonFilePath))
}
assert.Equal(test, request.Host, "beat.test.local")
assert.Equal(test, request.Method, "POST")
assert.Equal(test, request.Header.Get("Authorization"), "Basic YWRtaW46UFdE")
assert.Equal(test, request.Header.Get("X-Test"), "test-value")
w.Write(data)
_, err = w.Write(data)
require.NoError(t, err, "could not write data")
}))
requestURL, err := url.Parse(beatTest.URL)
if err != nil {
test.Logf("Can't parse URL %s", beatTest.URL)
}
require.NoErrorf(t, err, "can't parse URL %s", beatTest.URL)
fakeServer.Listener, err = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
if err != nil {
test.Logf("Can't listen for %s: %v", requestURL, err)
}
require.NoErrorf(t, err, "can't listen for %s: %v", requestURL, err)
fakeServer.Start()
defer fakeServer.Close()
@ -220,8 +199,5 @@ func Test_BeatRequest(test *testing.T) {
beatTest.Username = "admin"
beatTest.Password = "PWD"
err = beatTest.Gather(&beat6StatsAccumulator)
if err != nil {
test.Logf("Can't gather stats")
}
require.NoError(t, beatTest.Gather(&beat6StatsAccumulator))
}

View File

@ -58,7 +58,9 @@ func addJSONCounter(acc telegraf.Accumulator, commonTags map[string]string, stat
tags[k] = v
}
grouper.Add("bind_counter", tags, ts, name, value)
if err := grouper.Add("bind_counter", tags, ts, name, value); err != nil {
acc.AddError(fmt.Errorf("adding field %q to group failed: %v", name, err))
}
}
//Add grouped metrics
@ -133,7 +135,9 @@ func (b *Bind) addStatsJSON(stats jsonStats, acc telegraf.Accumulator, urlTag st
"type": cntrType,
}
grouper.Add("bind_counter", tags, ts, cntrName, value)
if err := grouper.Add("bind_counter", tags, ts, cntrName, value); err != nil {
acc.AddError(fmt.Errorf("adding tags %q to group failed: %v", tags, err))
}
}
}
}

View File

@ -75,7 +75,9 @@ func addXMLv2Counter(acc telegraf.Accumulator, commonTags map[string]string, sta
tags[k] = v
}
grouper.Add("bind_counter", tags, ts, c.Name, c.Value)
if err := grouper.Add("bind_counter", tags, ts, c.Name, c.Value); err != nil {
acc.AddError(fmt.Errorf("adding field %q to group failed: %v", c.Name, err))
}
}
//Add grouped metrics

View File

@ -81,7 +81,9 @@ func (b *Bind) addStatsXMLv3(stats v3Stats, acc telegraf.Accumulator, hostPort s
tags := map[string]string{"url": hostPort, "source": host, "port": port, "type": cg.Type}
grouper.Add("bind_counter", tags, ts, c.Name, c.Value)
if err := grouper.Add("bind_counter", tags, ts, c.Name, c.Value); err != nil {
acc.AddError(fmt.Errorf("adding tags %q to group failed: %v", tags, err))
}
}
}
@ -118,7 +120,9 @@ func (b *Bind) addStatsXMLv3(stats v3Stats, acc telegraf.Accumulator, hostPort s
"type": cg.Type,
}
grouper.Add("bind_counter", tags, ts, c.Name, c.Value)
if err := grouper.Add("bind_counter", tags, ts, c.Name, c.Value); err != nil {
acc.AddError(fmt.Errorf("adding tags %q to group failed: %v", tags, err))
}
}
}
}

View File

@ -4,6 +4,7 @@ import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
var sampleTest802 = `
@ -65,12 +66,12 @@ func TestGatherBondInterface(t *testing.T) {
var acc testutil.Accumulator
bond := &Bond{}
bond.gatherBondInterface("bond802", sampleTest802, &acc)
require.NoError(t, bond.gatherBondInterface("bond802", sampleTest802, &acc))
acc.AssertContainsTaggedFields(t, "bond", map[string]interface{}{"status": 1}, map[string]string{"bond": "bond802"})
acc.AssertContainsTaggedFields(t, "bond_slave", map[string]interface{}{"failures": 0, "status": 1}, map[string]string{"bond": "bond802", "interface": "eth1"})
acc.AssertContainsTaggedFields(t, "bond_slave", map[string]interface{}{"failures": 3, "status": 1}, map[string]string{"bond": "bond802", "interface": "eth2"})
bond.gatherBondInterface("bondAB", sampleTestAB, &acc)
require.NoError(t, bond.gatherBondInterface("bondAB", sampleTestAB, &acc))
acc.AssertContainsTaggedFields(t, "bond", map[string]interface{}{"active_slave": "eth2", "status": 1}, map[string]string{"bond": "bondAB"})
acc.AssertContainsTaggedFields(t, "bond_slave", map[string]interface{}{"failures": 2, "status": 0}, map[string]string{"bond": "bondAB", "interface": "eth3"})
acc.AssertContainsTaggedFields(t, "bond_slave", map[string]interface{}{"failures": 0, "status": 1}, map[string]string{"bond": "bondAB", "interface": "eth2"})

View File

@ -253,8 +253,10 @@ func (m *metric) name() string {
buf := bytes.Buffer{}
for i := len(m.pathStack) - 1; i >= 0; i-- {
if buf.Len() > 0 {
//nolint:errcheck,revive // should never return an error
buf.WriteString(".")
}
//nolint:errcheck,revive // should never return an error
buf.WriteString(m.pathStack[i])
}
return buf.String()

View File

@ -10,7 +10,7 @@ import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
@ -25,41 +25,41 @@ type expectedResult struct {
func TestParseSockId(t *testing.T) {
s := parseSockID(sockFile(osdPrefix, 1), osdPrefix, sockSuffix)
assert.Equal(t, s, "1")
require.Equal(t, s, "1")
}
func TestParseMonDump(t *testing.T) {
dump, err := parseDump(monPerfDump)
assert.NoError(t, err)
assert.InEpsilon(t, int64(5678670180), dump["cluster"]["osd_kb_used"], epsilon)
assert.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon)
require.NoError(t, err)
require.InEpsilon(t, int64(5678670180), dump["cluster"]["osd_kb_used"], epsilon)
require.InEpsilon(t, 6866.540527000, dump["paxos"]["store_state_latency.sum"], epsilon)
}
func TestParseOsdDump(t *testing.T) {
dump, err := parseDump(osdPerfDump)
assert.NoError(t, err)
assert.InEpsilon(t, 552132.109360000, dump["filestore"]["commitcycle_interval.sum"], epsilon)
assert.Equal(t, float64(0), dump["mutex-FileJournal::finisher_lock"]["wait.avgcount"])
require.NoError(t, err)
require.InEpsilon(t, 552132.109360000, dump["filestore"]["commitcycle_interval.sum"], epsilon)
require.Equal(t, float64(0), dump["mutex-FileJournal::finisher_lock"]["wait.avgcount"])
}
func TestParseMdsDump(t *testing.T) {
dump, err := parseDump(mdsPerfDump)
assert.NoError(t, err)
assert.InEpsilon(t, 2408386.600934982, dump["mds"]["reply_latency.sum"], epsilon)
assert.Equal(t, float64(0), dump["throttle-write_buf_throttle"]["wait.avgcount"])
require.NoError(t, err)
require.InEpsilon(t, 2408386.600934982, dump["mds"]["reply_latency.sum"], epsilon)
require.Equal(t, float64(0), dump["throttle-write_buf_throttle"]["wait.avgcount"])
}
func TestParseRgwDump(t *testing.T) {
dump, err := parseDump(rgwPerfDump)
assert.NoError(t, err)
assert.InEpsilon(t, 0.002219876, dump["rgw"]["get_initial_lat.sum"], epsilon)
assert.Equal(t, float64(0), dump["rgw"]["put_initial_lat.avgcount"])
require.NoError(t, err)
require.InEpsilon(t, 0.002219876, dump["rgw"]["get_initial_lat.sum"], epsilon)
require.Equal(t, float64(0), dump["rgw"]["put_initial_lat.avgcount"])
}
func TestDecodeStatus(t *testing.T) {
acc := &testutil.Accumulator{}
err := decodeStatus(acc, clusterStatusDump)
assert.NoError(t, err)
require.NoError(t, err)
for _, r := range cephStatusResults {
acc.AssertContainsTaggedFields(t, r.metric, r.fields, r.tags)
@ -69,7 +69,7 @@ func TestDecodeStatus(t *testing.T) {
func TestDecodeDf(t *testing.T) {
acc := &testutil.Accumulator{}
err := decodeDf(acc, cephDFDump)
assert.NoError(t, err)
require.NoError(t, err)
for _, r := range cephDfResults {
acc.AssertContainsTaggedFields(t, r.metric, r.fields, r.tags)
@ -79,14 +79,14 @@ func TestDecodeDf(t *testing.T) {
func TestDecodeOSDPoolStats(t *testing.T) {
acc := &testutil.Accumulator{}
err := decodeOsdPoolStats(acc, cephODSPoolStatsDump)
assert.NoError(t, err)
require.NoError(t, err)
for _, r := range cephOSDPoolStatsResults {
acc.AssertContainsTaggedFields(t, r.metric, r.fields, r.tags)
}
}
func TestGather(_ *testing.T) {
func TestGather(t *testing.T) {
saveFind := findSockets
saveDump := perfDump
defer func() {
@ -104,15 +104,15 @@ func TestGather(_ *testing.T) {
acc := &testutil.Accumulator{}
c := &Ceph{}
c.Gather(acc)
require.NoError(t, c.Gather(acc))
}
func TestFindSockets(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "socktest")
assert.NoError(t, err)
require.NoError(t, err)
defer func() {
err := os.Remove(tmpdir)
assert.NoError(t, err)
require.NoError(t, err)
}()
c := &Ceph{
CephBinary: "foo",
@ -129,10 +129,10 @@ func TestFindSockets(t *testing.T) {
}
for _, st := range sockTestParams {
createTestFiles(tmpdir, st)
require.NoError(t, createTestFiles(tmpdir, st))
sockets, err := findSockets(c)
assert.NoError(t, err)
require.NoError(t, err)
for i := 1; i <= st.osds; i++ {
assertFoundSocket(t, tmpdir, typeOsd, i, sockets)
@ -147,7 +147,7 @@ func TestFindSockets(t *testing.T) {
for i := 1; i <= st.rgws; i++ {
assertFoundSocket(t, tmpdir, typeRgw, i, sockets)
}
cleanupTestFiles(tmpdir, st)
require.NoError(t, cleanupTestFiles(tmpdir, st))
}
}
@ -165,54 +165,61 @@ func assertFoundSocket(t *testing.T, dir, sockType string, i int, sockets []*soc
expected := filepath.Join(dir, sockFile(prefix, i))
found := false
for _, s := range sockets {
fmt.Printf("Checking %s\n", s.socket)
_, err := fmt.Printf("Checking %s\n", s.socket)
require.NoError(t, err)
if s.socket == expected {
found = true
assert.Equal(t, s.sockType, sockType, "Unexpected socket type for '%s'", s)
assert.Equal(t, s.sockID, strconv.Itoa(i))
require.Equal(t, s.sockType, sockType, "Unexpected socket type for '%s'", s)
require.Equal(t, s.sockID, strconv.Itoa(i))
}
}
assert.True(t, found, "Did not find socket: %s", expected)
require.True(t, found, "Did not find socket: %s", expected)
}
func sockFile(prefix string, i int) string {
return strings.Join([]string{prefix, strconv.Itoa(i), sockSuffix}, ".")
}
func createTestFiles(dir string, st *SockTest) {
writeFile := func(prefix string, i int) {
func createTestFiles(dir string, st *SockTest) error {
writeFile := func(prefix string, i int) error {
f := sockFile(prefix, i)
fpath := filepath.Join(dir, f)
ioutil.WriteFile(fpath, []byte(""), 0777)
return ioutil.WriteFile(fpath, []byte(""), 0777)
}
tstFileApply(st, writeFile)
return tstFileApply(st, writeFile)
}
func cleanupTestFiles(dir string, st *SockTest) {
rmFile := func(prefix string, i int) {
func cleanupTestFiles(dir string, st *SockTest) error {
rmFile := func(prefix string, i int) error {
f := sockFile(prefix, i)
fpath := filepath.Join(dir, f)
err := os.Remove(fpath)
if err != nil {
fmt.Printf("Error removing test file %s: %v\n", fpath, err)
return os.Remove(fpath)
}
return tstFileApply(st, rmFile)
}
func tstFileApply(st *SockTest, fn func(string, int) error) error {
for i := 1; i <= st.osds; i++ {
if err := fn(osdPrefix, i); err != nil {
return err
}
}
tstFileApply(st, rmFile)
}
func tstFileApply(st *SockTest, fn func(prefix string, i int)) {
for i := 1; i <= st.osds; i++ {
fn(osdPrefix, i)
}
for i := 1; i <= st.mons; i++ {
fn(monPrefix, i)
if err := fn(monPrefix, i); err != nil {
return err
}
}
for i := 1; i <= st.mdss; i++ {
fn(mdsPrefix, i)
if err := fn(mdsPrefix, i); err != nil {
return err
}
}
for i := 1; i <= st.rgws; i++ {
fn(rgwPrefix, i)
if err := fn(rgwPrefix, i); err != nil {
return err
}
}
return nil
}
type SockTest struct {

View File

@ -94,11 +94,14 @@ Leap status : Not synchronized
if cmd == "chronyc" {
if args[0] == "tracking" {
//nolint:errcheck,revive // test will fail anyway
fmt.Fprint(os.Stdout, lookup+mockData)
} else {
//nolint:errcheck,revive // test will fail anyway
fmt.Fprint(os.Stdout, noLookup+mockData)
}
} else {
//nolint:errcheck,revive // test will fail anyway
fmt.Fprint(os.Stdout, "command not found")
os.Exit(1)
}

View File

@ -152,6 +152,7 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
var opts []grpc.ServerOption
tlsConfig, err := c.ServerConfig.TLSConfig()
if err != nil {
//nolint:errcheck,revive // we cannot do anything if the closing fails
c.listener.Close()
return err
} else if tlsConfig != nil {
@ -167,11 +168,14 @@ func (c *CiscoTelemetryMDT) Start(acc telegraf.Accumulator) error {
c.wg.Add(1)
go func() {
c.grpcServer.Serve(c.listener)
if err := c.grpcServer.Serve(c.listener); err != nil {
c.Log.Errorf("serving GRPC server failed: %v", err)
}
c.wg.Done()
}()
default:
//nolint:errcheck,revive // we cannot do anything if the closing fails
c.listener.Close()
return fmt.Errorf("invalid Cisco MDT transport: %s", c.Transport)
}
@ -210,7 +214,9 @@ func (c *CiscoTelemetryMDT) acceptTCPClients() {
delete(clients, conn)
mutex.Unlock()
conn.Close()
if err := conn.Close(); err != nil {
c.Log.Warnf("closing connection failed: %v", err)
}
c.wg.Done()
}()
}
@ -295,7 +301,9 @@ func (c *CiscoTelemetryMDT) MdtDialout(stream dialout.GRPCMdtDialout_MdtDialoutS
if packet.TotalSize == 0 {
c.handleTelemetry(packet.Data)
} else if int(packet.TotalSize) <= c.MaxMsgSize {
chunkBuffer.Write(packet.Data)
if _, err := chunkBuffer.Write(packet.Data); err != nil {
c.acc.AddError(fmt.Errorf("writing packet %q failed: %v", packet.Data, err))
}
if chunkBuffer.Len() >= int(packet.TotalSize) {
c.handleTelemetry(chunkBuffer.Bytes())
chunkBuffer.Reset()
@ -460,7 +468,9 @@ func (c *CiscoTelemetryMDT) parseRib(grouper *metric.SeriesGrouper, field *telem
tags[subfield.Name] = decodeTag(subfield)
}
if value := decodeValue(subfield); value != nil {
grouper.Add(measurement, tags, timestamp, subfield.Name, value)
if err := grouper.Add(measurement, tags, timestamp, subfield.Name, value); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", subfield.Name, err)
}
}
if subfield.Name != "nextHop" {
continue
@ -475,7 +485,9 @@ func (c *CiscoTelemetryMDT) parseRib(grouper *metric.SeriesGrouper, field *telem
}
if value := decodeValue(ff); value != nil {
name := "nextHop/" + ff.Name
grouper.Add(measurement, tags, timestamp, name, value)
if err := grouper.Add(measurement, tags, timestamp, name, value); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", name, err)
}
}
}
}
@ -540,9 +552,13 @@ func (c *CiscoTelemetryMDT) parseContentField(grouper *metric.SeriesGrouper, fie
}
if val := c.nxosValueXform(field, value, encodingPath); val != nil {
grouper.Add(measurement, tags, timestamp, name, val)
if err := grouper.Add(measurement, tags, timestamp, name, val); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", name, err)
}
} else {
grouper.Add(measurement, tags, timestamp, name, value)
if err := grouper.Add(measurement, tags, timestamp, name, value); err != nil {
c.Log.Errorf("adding field %q to group failed: %v", name, err)
}
}
return
}
@ -652,9 +668,11 @@ func (c *CiscoTelemetryMDT) Address() net.Addr {
func (c *CiscoTelemetryMDT) Stop() {
if c.grpcServer != nil {
// Stop server and terminate all running dialout routines
//nolint:errcheck,revive // we cannot do anything if the stopping fails
c.grpcServer.Stop()
}
if c.listener != nil {
//nolint:errcheck,revive // we cannot do anything if the closing fails
c.listener.Close()
}
c.wg.Wait()
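Where the error cannot usefully be handled -- shutdown and cleanup paths, or background serve loops that only terminate on close -- the commit either logs it or marks the call with a //nolint:errcheck,revive directive so the linters accept the intentional discard. A condensed sketch of both cases (the server and listener wiring here is assumed, not taken from the plugin):

package example

import (
	"log"
	"net"
	"net/http"
)

// run serves HTTP in the background and surfaces the terminal error instead
// of dropping it; http.ErrServerClosed is the expected result of a shutdown.
func run(srv *http.Server, l net.Listener) {
	go func() {
		if err := srv.Serve(l); err != nil && err != http.ErrServerClosed {
			log.Printf("serving failed: %v", err)
		}
	}()
}

// stop closes the listener; there is nothing sensible to do with a Close
// error during teardown, so the discard is annotated for the linters.
func stop(l net.Listener) {
	//nolint:errcheck,revive // we cannot do anything if the closing fails
	l.Close()
}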

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/binary"
"errors"
"io"
"net"
"testing"
@ -78,7 +79,8 @@ func TestHandleTelemetryTwoSimple(t *testing.T) {
},
},
}
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
@ -149,7 +151,8 @@ func TestHandleTelemetrySingleNested(t *testing.T) {
},
},
}
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
@ -218,7 +221,8 @@ func TestHandleEmbeddedTags(t *testing.T) {
},
},
}
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
@ -306,7 +310,8 @@ func TestHandleNXAPI(t *testing.T) {
},
},
}
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
@ -382,7 +387,8 @@ func TestHandleNXAPIXformNXAPI(t *testing.T) {
},
},
}
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
@ -467,7 +473,8 @@ func TestHandleNXXformMulti(t *testing.T) {
},
},
}
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
@ -539,7 +546,8 @@ func TestHandleNXDME(t *testing.T) {
},
},
}
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
c.handleTelemetry(data)
require.Empty(t, acc.Errors)
@ -566,9 +574,10 @@ func TestTCPDialoutOverflow(t *testing.T) {
addr := c.Address()
conn, err := net.Dial(addr.Network(), addr.String())
require.NoError(t, err)
binary.Write(conn, binary.BigEndian, hdr)
conn.Read([]byte{0})
conn.Close()
require.NoError(t, binary.Write(conn, binary.BigEndian, hdr))
_, err = conn.Read([]byte{0})
require.True(t, err == nil || err == io.EOF)
require.NoError(t, conn.Close())
c.Stop()
@ -629,32 +638,42 @@ func TestTCPDialoutMultiple(t *testing.T) {
conn, err := net.Dial(addr.Network(), addr.String())
require.NoError(t, err)
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
hdr.MsgLen = uint32(len(data))
binary.Write(conn, binary.BigEndian, hdr)
conn.Write(data)
require.NoError(t, binary.Write(conn, binary.BigEndian, hdr))
_, err = conn.Write(data)
require.NoError(t, err)
conn2, err := net.Dial(addr.Network(), addr.String())
require.NoError(t, err)
telemetry.EncodingPath = "type:model/parallel/path"
data, _ = proto.Marshal(telemetry)
data, err = proto.Marshal(telemetry)
require.NoError(t, err)
hdr.MsgLen = uint32(len(data))
binary.Write(conn2, binary.BigEndian, hdr)
conn2.Write(data)
conn2.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0})
conn2.Read([]byte{0})
conn2.Close()
require.NoError(t, binary.Write(conn2, binary.BigEndian, hdr))
_, err = conn2.Write(data)
require.NoError(t, err)
_, err = conn2.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0})
require.NoError(t, err)
_, err = conn2.Read([]byte{0})
require.True(t, err == nil || err == io.EOF)
require.NoError(t, conn2.Close())
telemetry.EncodingPath = "type:model/other/path"
data, _ = proto.Marshal(telemetry)
data, err = proto.Marshal(telemetry)
require.NoError(t, err)
hdr.MsgLen = uint32(len(data))
binary.Write(conn, binary.BigEndian, hdr)
conn.Write(data)
conn.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0})
conn.Read([]byte{0})
require.NoError(t, binary.Write(conn, binary.BigEndian, hdr))
_, err = conn.Write(data)
require.NoError(t, err)
_, err = conn.Write([]byte{0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0})
require.NoError(t, err)
_, err = conn.Read([]byte{0})
require.True(t, err == nil || err == io.EOF)
c.Stop()
conn.Close()
require.NoError(t, conn.Close())
// We use the invalid dialout flags to let the server close the connection
require.Equal(t, acc.Errors, []error{errors.New("invalid dialout flags: 257"), errors.New("invalid dialout flags: 257")})
@ -679,15 +698,18 @@ func TestGRPCDialoutError(t *testing.T) {
require.NoError(t, err)
addr := c.Address()
conn, _ := grpc.Dial(addr.String(), grpc.WithInsecure())
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
require.NoError(t, err)
client := dialout.NewGRPCMdtDialoutClient(conn)
stream, _ := client.MdtDialout(context.Background())
stream, err := client.MdtDialout(context.Background())
require.NoError(t, err)
args := &dialout.MdtDialoutArgs{Errors: "foobar"}
stream.Send(args)
require.NoError(t, stream.Send(args))
// Wait for the server to close
stream.Recv()
_, err = stream.Recv()
require.True(t, err == nil || err == io.EOF)
c.Stop()
require.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: foobar")})
@ -702,35 +724,44 @@ func TestGRPCDialoutMultiple(t *testing.T) {
telemetry := mockTelemetryMessage()
addr := c.Address()
conn, _ := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
require.NoError(t, err)
client := dialout.NewGRPCMdtDialoutClient(conn)
stream, _ := client.MdtDialout(context.TODO())
stream, err := client.MdtDialout(context.TODO())
require.NoError(t, err)
data, _ := proto.Marshal(telemetry)
data, err := proto.Marshal(telemetry)
require.NoError(t, err)
args := &dialout.MdtDialoutArgs{Data: data, ReqId: 456}
stream.Send(args)
require.NoError(t, stream.Send(args))
conn2, _ := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
conn2, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock())
require.NoError(t, err)
client2 := dialout.NewGRPCMdtDialoutClient(conn2)
stream2, _ := client2.MdtDialout(context.TODO())
stream2, err := client2.MdtDialout(context.TODO())
require.NoError(t, err)
telemetry.EncodingPath = "type:model/parallel/path"
data, _ = proto.Marshal(telemetry)
data, err = proto.Marshal(telemetry)
require.NoError(t, err)
args = &dialout.MdtDialoutArgs{Data: data}
stream2.Send(args)
stream2.Send(&dialout.MdtDialoutArgs{Errors: "testclose"})
stream2.Recv()
conn2.Close()
require.NoError(t, stream2.Send(args))
require.NoError(t, stream2.Send(&dialout.MdtDialoutArgs{Errors: "testclose"}))
_, err = stream2.Recv()
require.True(t, err == nil || err == io.EOF)
require.NoError(t, conn2.Close())
telemetry.EncodingPath = "type:model/other/path"
data, _ = proto.Marshal(telemetry)
data, err = proto.Marshal(telemetry)
require.NoError(t, err)
args = &dialout.MdtDialoutArgs{Data: data}
stream.Send(args)
stream.Send(&dialout.MdtDialoutArgs{Errors: "testclose"})
stream.Recv()
require.NoError(t, stream.Send(args))
require.NoError(t, stream.Send(&dialout.MdtDialoutArgs{Errors: "testclose"}))
_, err = stream.Recv()
require.True(t, err == nil || err == io.EOF)
c.Stop()
conn.Close()
require.NoError(t, conn.Close())
require.Equal(t, acc.Errors, []error{errors.New("GRPC dialout error: testclose"), errors.New("GRPC dialout error: testclose")})

View File

@ -57,7 +57,7 @@ func TestGather(t *testing.T) {
enc := json.NewEncoder(w)
switch query := r.URL.Query().Get("query"); {
case strings.Contains(query, "system.parts"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Database string `json:"database"`
Table string `json:"table"`
@ -74,8 +74,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.events"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Metric string `json:"metric"`
Value chUInt64 `json:"value"`
@ -90,8 +91,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.metrics"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Metric string `json:"metric"`
Value chUInt64 `json:"value"`
@ -106,8 +108,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.asynchronous_metrics"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Metric string `json:"metric"`
Value chUInt64 `json:"value"`
@ -122,8 +125,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "zk_exists"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
ZkExists chUInt64 `json:"zk_exists"`
}{
@ -132,8 +136,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "zk_root_nodes"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
ZkRootNodes chUInt64 `json:"zk_root_nodes"`
}{
@ -142,8 +147,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "replication_queue_exists"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
ReplicationQueueExists chUInt64 `json:"replication_queue_exists"`
}{
@ -152,8 +158,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "replication_too_many_tries_replicas"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
TooManyTriesReplicas chUInt64 `json:"replication_too_many_tries_replicas"`
NumTriesReplicas chUInt64 `json:"replication_num_tries_replicas"`
@ -164,8 +171,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.detached_parts"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
DetachedParts chUInt64 `json:"detached_parts"`
}{
@ -174,8 +182,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.dictionaries"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Origin string `json:"origin"`
Status string `json:"status"`
@ -188,8 +197,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.mutations"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Failed chUInt64 `json:"failed"`
Completed chUInt64 `json:"completed"`
@ -202,8 +212,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.disks"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Name string `json:"name"`
Path string `json:"path"`
@ -218,8 +229,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.processes"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
QueryType string `json:"query_type"`
Percentile50 float64 `json:"p50"`
@ -246,8 +258,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "text_log_exists"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
TextLogExists chUInt64 `json:"text_log_exists"`
}{
@ -256,8 +269,9 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "system.text_log"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Level string `json:"level"`
LastMessagesLast10Min chUInt64 `json:"messages_last_10_min"`
@ -284,6 +298,7 @@ func TestGather(t *testing.T) {
},
},
})
assert.NoError(t, err)
}
}))
ch = &ClickHouse{
@ -294,7 +309,7 @@ func TestGather(t *testing.T) {
acc = &testutil.Accumulator{}
)
defer ts.Close()
ch.Gather(acc)
assert.NoError(t, ch.Gather(acc))
acc.AssertContainsTaggedFields(t, "clickhouse_tables",
map[string]interface{}{
@ -427,7 +442,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
enc := json.NewEncoder(w)
switch query := r.URL.Query().Get("query"); {
case strings.Contains(query, "zk_exists"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
ZkExists chUInt64 `json:"zk_exists"`
}{
@ -436,8 +451,9 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "replication_queue_exists"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
ReplicationQueueExists chUInt64 `json:"replication_queue_exists"`
}{
@ -446,8 +462,9 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
},
},
})
assert.NoError(t, err)
case strings.Contains(query, "text_log_exists"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
TextLogExists chUInt64 `json:"text_log_exists"`
}{
@ -456,6 +473,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
},
},
})
assert.NoError(t, err)
}
}))
ch = &ClickHouse{
@ -467,7 +485,7 @@ func TestGatherWithSomeTablesNotExists(t *testing.T) {
acc = &testutil.Accumulator{}
)
defer ts.Close()
ch.Gather(acc)
assert.NoError(t, ch.Gather(acc))
acc.AssertDoesNotContainMeasurement(t, "clickhouse_zookeeper")
acc.AssertDoesNotContainMeasurement(t, "clickhouse_replication_queue")
@ -482,9 +500,10 @@ func TestWrongJSONMarshalling(t *testing.T) {
}
enc := json.NewEncoder(w)
//wrong data section json
enc.Encode(result{
err := enc.Encode(result{
Data: []struct{}{},
})
assert.NoError(t, err)
}))
ch = &ClickHouse{
Servers: []string{
@ -495,7 +514,7 @@ func TestWrongJSONMarshalling(t *testing.T) {
acc = &testutil.Accumulator{}
)
defer ts.Close()
ch.Gather(acc)
assert.NoError(t, ch.Gather(acc))
assert.Equal(t, 0, len(acc.Metrics))
allMeasurements := []string{
@ -528,7 +547,7 @@ func TestOfflineServer(t *testing.T) {
},
}
)
ch.Gather(acc)
assert.NoError(t, ch.Gather(acc))
assert.Equal(t, 0, len(acc.Metrics))
allMeasurements := []string{
@ -548,7 +567,7 @@ func TestOfflineServer(t *testing.T) {
assert.GreaterOrEqual(t, len(allMeasurements), len(acc.Errors))
}
func TestAutoDiscovery(_ *testing.T) {
func TestAutoDiscovery(t *testing.T) {
var (
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
type result struct {
@ -557,7 +576,7 @@ func TestAutoDiscovery(_ *testing.T) {
enc := json.NewEncoder(w)
switch query := r.URL.Query().Get("query"); {
case strings.Contains(query, "system.clusters"):
enc.Encode(result{
err := enc.Encode(result{
Data: []struct {
Cluster string `json:"test"`
Hostname string `json:"localhost"`
@ -570,6 +589,7 @@ func TestAutoDiscovery(_ *testing.T) {
},
},
})
assert.NoError(t, err)
}
}))
ch = &ClickHouse{
@ -582,5 +602,5 @@ func TestAutoDiscovery(_ *testing.T) {
acc = &testutil.Accumulator{}
)
defer ts.Close()
ch.Gather(acc)
assert.NoError(t, ch.Gather(acc))
}

View File

@ -169,9 +169,13 @@ func (p *PubSubPush) Start(acc telegraf.Accumulator) error {
go func() {
defer p.wg.Done()
if tlsConf != nil {
p.server.ListenAndServeTLS("", "")
if err := p.server.ListenAndServeTLS("", ""); err != nil && err != http.ErrServerClosed {
p.Log.Errorf("listening and serving TLS failed: %v", err)
}
} else {
p.server.ListenAndServe()
if err := p.server.ListenAndServe(); err != nil {
p.Log.Errorf("listening and serving TLS failed: %v", err)
}
}
}()
@ -181,6 +185,7 @@ func (p *PubSubPush) Start(acc telegraf.Accumulator) error {
// Stop cleans up all resources
func (p *PubSubPush) Stop() {
p.cancel()
//nolint:errcheck,revive // we cannot do anything if the shutdown fails
p.server.Shutdown(p.ctx)
p.wg.Wait()
}

View File

@ -156,6 +156,7 @@ func TestServeHTTP(t *testing.T) {
defer wg.Done()
for m := range d {
ro.AddMetric(m)
//nolint:errcheck,revive // test will fail anyway if the write fails
ro.Write()
}
}(dst)

View File

@ -584,7 +584,9 @@ func (c *CloudWatch) aggregateMetrics(
tags["region"] = c.Region
for i := range result.Values {
grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i])
if err := grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]); err != nil {
acc.AddError(err)
}
}
}

View File

@ -241,7 +241,7 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
Period: internalDuration,
}
c.initializeCloudWatch()
require.NoError(t, c.initializeCloudWatch())
now := time.Now()
@ -278,7 +278,7 @@ func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
Period: internalDuration,
}
c.initializeCloudWatch()
require.NoError(t, c.initializeCloudWatch())
now := time.Now()

View File

@ -11,7 +11,7 @@ import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func restoreDflts(savedFiles, savedDirs []string) {
@ -28,18 +28,18 @@ func TestNoFilesFound(t *testing.T) {
acc := &testutil.Accumulator{}
err := c.Gather(acc)
assert.EqualError(t, err, "Conntrack input failed to collect metrics. "+
require.EqualError(t, err, "Conntrack input failed to collect metrics. "+
"Is the conntrack kernel module loaded?")
}
func TestDefaultsUsed(t *testing.T) {
defer restoreDflts(dfltFiles, dfltDirs)
tmpdir, err := ioutil.TempDir("", "tmp1")
assert.NoError(t, err)
require.NoError(t, err)
defer os.Remove(tmpdir)
tmpFile, err := ioutil.TempFile(tmpdir, "ip_conntrack_count")
assert.NoError(t, err)
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
dfltDirs = []string{tmpdir}
@ -47,11 +47,11 @@ func TestDefaultsUsed(t *testing.T) {
dfltFiles = []string{fname}
count := 1234321
ioutil.WriteFile(tmpFile.Name(), []byte(strconv.Itoa(count)), 0660)
require.NoError(t, ioutil.WriteFile(tmpFile.Name(), []byte(strconv.Itoa(count)), 0660))
c := &Conntrack{}
acc := &testutil.Accumulator{}
c.Gather(acc)
require.NoError(t, c.Gather(acc))
acc.AssertContainsFields(t, inputName, map[string]interface{}{
fname: float64(count)})
}
@ -59,12 +59,13 @@ func TestDefaultsUsed(t *testing.T) {
func TestConfigsUsed(t *testing.T) {
defer restoreDflts(dfltFiles, dfltDirs)
tmpdir, err := ioutil.TempDir("", "tmp1")
assert.NoError(t, err)
require.NoError(t, err)
defer os.Remove(tmpdir)
cntFile, err := ioutil.TempFile(tmpdir, "nf_conntrack_count")
require.NoError(t, err)
maxFile, err := ioutil.TempFile(tmpdir, "nf_conntrack_max")
assert.NoError(t, err)
require.NoError(t, err)
defer os.Remove(cntFile.Name())
defer os.Remove(maxFile.Name())
@ -75,12 +76,12 @@ func TestConfigsUsed(t *testing.T) {
count := 1234321
max := 9999999
ioutil.WriteFile(cntFile.Name(), []byte(strconv.Itoa(count)), 0660)
ioutil.WriteFile(maxFile.Name(), []byte(strconv.Itoa(max)), 0660)
require.NoError(t, ioutil.WriteFile(cntFile.Name(), []byte(strconv.Itoa(count)), 0660))
require.NoError(t, ioutil.WriteFile(maxFile.Name(), []byte(strconv.Itoa(max)), 0660))
c := &Conntrack{}
acc := &testutil.Accumulator{}
c.Gather(acc)
require.NoError(t, c.Gather(acc))
fix := func(s string) string {
return strings.Replace(s, "nf_", "ip_", 1)

View File

@ -140,9 +140,9 @@ func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host stri
req.SetBasicAuth(c.BasicUsername, c.BasicPassword)
}
response, error := c.client.Do(req)
if error != nil {
return error
response, err := c.client.Do(req)
if err != nil {
return err
}
defer response.Body.Close()
@ -152,7 +152,9 @@ func (c *CouchDB) fetchAndInsertData(accumulator telegraf.Accumulator, host stri
stats := Stats{}
decoder := json.NewDecoder(response.Body)
decoder.Decode(&stats)
if err := decoder.Decode(&stats); err != nil {
return fmt.Errorf("failed to decode stats from couchdb: HTTP body %q", response.Body)
}
fields := map[string]interface{}{}

View File

@ -292,6 +292,7 @@ func (c *ClusterClient) doGet(ctx context.Context, url string, v interface{}) er
return err
}
defer func() {
//nolint:errcheck,revive // we cannot do anything if the closing fails
resp.Body.Close()
<-c.semaphore
}()

View File

@ -39,7 +39,7 @@ const sampleConfig = `
## The amount of time a file is allowed to sit in the directory before it is picked up.
## This time can generally be low but if you choose to have a very large file written to the directory and it's potentially slow,
## set this higher so that the plugin will wait until the file is fully copied to the directory.
# directory_duration_threshold = "50ms"
# directory_duration_threshold = "50ms"
#
## A list of the only file names to monitor, if necessary. Supports regex. If left blank, all files are ingested.
# files_to_monitor = ["^.*\.csv"]
@ -118,6 +118,7 @@ func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
// We've been cancelled via Stop().
if monitor.context.Err() != nil {
//nolint:nilerr // context cancelation is not an error
return nil
}
@ -266,7 +267,9 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read
firstLine = false
}
monitor.sendMetrics(metrics)
if err := monitor.sendMetrics(metrics); err != nil {
return err
}
}
return nil
@ -295,13 +298,16 @@ func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte, f
}
}
func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) {
func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error {
// Report the metrics for the file.
for _, m := range metrics {
// Block until metric can be written.
monitor.sem.Acquire(monitor.context, 1)
if err := monitor.sem.Acquire(monitor.context, 1); err != nil {
return err
}
monitor.acc.AddTrackingMetricGroup([]telegraf.Metric{m})
}
return nil
}
func (monitor *DirectoryMonitor) moveFile(filePath string, directory string) {
@ -344,7 +350,7 @@ func (monitor *DirectoryMonitor) SetParserFunc(fn parsers.ParserFunc) {
func (monitor *DirectoryMonitor) Init() error {
if monitor.Directory == "" || monitor.FinishedDirectory == "" {
return errors.New("Missing one of the following required config options: directory, finished_directory.")
return errors.New("missing one of the following required config options: directory, finished_directory")
}
if monitor.FileQueueSize <= 0 {

View File

@ -49,15 +49,20 @@ func TestCSVGZImport(t *testing.T) {
// Write csv file to process into the 'process' directory.
f, err := os.Create(filepath.Join(processDirectory, testCsvFile))
require.NoError(t, err)
f.WriteString("thing,color\nsky,blue\ngrass,green\nclifford,red\n")
f.Close()
_, err = f.WriteString("thing,color\nsky,blue\ngrass,green\nclifford,red\n")
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)
// Write csv.gz file to process into the 'process' directory.
var b bytes.Buffer
w := gzip.NewWriter(&b)
w.Write([]byte("thing,color\nsky,blue\ngrass,green\nclifford,red\n"))
w.Close()
_, err = w.Write([]byte("thing,color\nsky,blue\ngrass,green\nclifford,red\n"))
require.NoError(t, err)
err = w.Close()
require.NoError(t, err)
err = ioutil.WriteFile(filepath.Join(processDirectory, testCsvGzFile), b.Bytes(), 0666)
require.NoError(t, err)
// Start plugin before adding file.
err = r.Start(&acc)
@ -112,8 +117,10 @@ func TestMultipleJSONFileImports(t *testing.T) {
// Write csv file to process into the 'process' directory.
f, err := os.Create(filepath.Join(processDirectory, testJSONFile))
require.NoError(t, err)
f.WriteString("{\"Name\": \"event1\",\"Speed\": 100.1,\"Length\": 20.1}\n{\"Name\": \"event2\",\"Speed\": 500,\"Length\": 1.4}\n{\"Name\": \"event3\",\"Speed\": 200,\"Length\": 10.23}\n{\"Name\": \"event4\",\"Speed\": 80,\"Length\": 250}\n{\"Name\": \"event5\",\"Speed\": 120.77,\"Length\": 25.97}")
f.Close()
_, err = f.WriteString("{\"Name\": \"event1\",\"Speed\": 100.1,\"Length\": 20.1}\n{\"Name\": \"event2\",\"Speed\": 500,\"Length\": 1.4}\n{\"Name\": \"event3\",\"Speed\": 200,\"Length\": 10.23}\n{\"Name\": \"event4\",\"Speed\": 80,\"Length\": 250}\n{\"Name\": \"event5\",\"Speed\": 120.77,\"Length\": 25.97}")
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)
err = r.Start(&acc)
r.Log = testutil.Logger{}

View File

@ -58,10 +58,10 @@ func (d *DiskIO) diskInfo(devName string) (map[string]string, error) {
}
// Final open of the confirmed (or the previously detected/used) udev file
f, err := os.Open(udevDataPath)
defer f.Close()
if err != nil {
return nil, err
}
defer f.Close()
di := map[string]string{}
@ -80,9 +80,12 @@ func (d *DiskIO) diskInfo(devName string) (map[string]string, error) {
}
if l[:2] == "S:" {
if devlinks.Len() > 0 {
//nolint:errcheck,revive // this will never fail
devlinks.WriteString(" ")
}
//nolint:errcheck,revive // this will never fail
devlinks.WriteString("/dev/")
//nolint:errcheck,revive // this will never fail
devlinks.WriteString(l[2:])
continue
}

View File

@ -7,7 +7,6 @@ import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -19,7 +18,7 @@ S:foo/bar/devlink1
`)
// setupNullDisk sets up fake udev info as if /dev/null were a disk.
func setupNullDisk(t *testing.T, s *DiskIO, devName string) func() error {
func setupNullDisk(t *testing.T, s *DiskIO, devName string) func() {
td, err := ioutil.TempFile("", ".telegraf.DiskInfoTest")
require.NoError(t, err)
@ -37,9 +36,10 @@ func setupNullDisk(t *testing.T, s *DiskIO, devName string) func() error {
}
origUdevPath := ic.udevDataPath
cleanFunc := func() error {
cleanFunc := func() {
ic.udevDataPath = origUdevPath
return os.Remove(td.Name())
//nolint:errcheck,revive // we cannot do anything if file cannot be removed
os.Remove(td.Name())
}
ic.udevDataPath = td.Name()
@ -58,19 +58,18 @@ func TestDiskInfo(t *testing.T) {
defer clean()
di, err := s.diskInfo("null")
require.NoError(t, err)
assert.Equal(t, "myval1", di["MY_PARAM_1"])
assert.Equal(t, "myval2", di["MY_PARAM_2"])
assert.Equal(t, "/dev/foo/bar/devlink /dev/foo/bar/devlink1", di["DEVLINKS"])
require.Equal(t, "myval1", di["MY_PARAM_1"])
require.Equal(t, "myval2", di["MY_PARAM_2"])
require.Equal(t, "/dev/foo/bar/devlink /dev/foo/bar/devlink1", di["DEVLINKS"])
// test that data is cached
err = clean()
require.NoError(t, err)
clean()
di, err = s.diskInfo("null")
require.NoError(t, err)
assert.Equal(t, "myval1", di["MY_PARAM_1"])
assert.Equal(t, "myval2", di["MY_PARAM_2"])
assert.Equal(t, "/dev/foo/bar/devlink /dev/foo/bar/devlink1", di["DEVLINKS"])
require.Equal(t, "myval1", di["MY_PARAM_1"])
require.Equal(t, "myval2", di["MY_PARAM_2"])
require.Equal(t, "/dev/foo/bar/devlink /dev/foo/bar/devlink1", di["DEVLINKS"])
// unfortunately we can't adjust mtime on /dev/null to test cache invalidation
}
@ -98,7 +97,7 @@ func TestDiskIOStats_diskName(t *testing.T) {
}
defer setupNullDisk(t, &s, "null")()
name, _ := s.diskName("null")
assert.Equal(t, tc.expected, name, "Templates: %#v", tc.templates)
require.Equal(t, tc.expected, name, "Templates: %#v", tc.templates)
}
}
@ -110,5 +109,5 @@ func TestDiskIOStats_diskTags(t *testing.T) {
}
defer setupNullDisk(t, s, "null")()
dt := s.diskTags("null")
assert.Equal(t, map[string]string{"MY_PARAM_2": "myval2"}, dt)
require.Equal(t, map[string]string{"MY_PARAM_2": "myval2"}, dt)
}

View File

@ -68,8 +68,7 @@ func (d *Disque) Gather(acc telegraf.Accumulator) error {
url := &url.URL{
Host: ":7711",
}
d.gatherServer(url, acc)
return nil
return d.gatherServer(url, acc)
}
var wg sync.WaitGroup
@ -114,7 +113,9 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
if addr.User != nil {
pwd, set := addr.User.Password()
if set && pwd != "" {
c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd)))
if _, err := c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd))); err != nil {
return err
}
r := bufio.NewReader(c)
@ -132,9 +133,13 @@ func (d *Disque) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
}
// Extend connection
d.c.SetDeadline(time.Now().Add(defaultTimeout))
if err := d.c.SetDeadline(time.Now().Add(defaultTimeout)); err != nil {
return err
}
d.c.Write([]byte("info\r\n"))
if _, err := d.c.Write([]byte("info\r\n")); err != nil {
return err
}
r := bufio.NewReader(d.c)

View File

@ -38,8 +38,12 @@ func TestDisqueGeneratesMetricsIntegration(t *testing.T) {
return
}
fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
if _, err := fmt.Fprintf(c, "$%d\n", len(testOutput)); err != nil {
return
}
if _, err := c.Write([]byte(testOutput)); err != nil {
return
}
}
}()
@ -104,8 +108,12 @@ func TestDisqueCanPullStatsFromMultipleServersIntegration(t *testing.T) {
return
}
fmt.Fprintf(c, "$%d\n", len(testOutput))
c.Write([]byte(testOutput))
if _, err := fmt.Fprintf(c, "$%d\n", len(testOutput)); err != nil {
return
}
if _, err := c.Write([]byte(testOutput)); err != nil {
return
}
}
}()

View File

@ -911,7 +911,7 @@ func TestDockerGatherSwarmInfo(t *testing.T) {
err := acc.GatherError(d.Gather)
require.NoError(t, err)
d.gatherSwarmInfo(&acc)
require.NoError(t, d.gatherSwarmInfo(&acc))
// test docker_container_net measurement
acc.AssertContainsTaggedFields(t,

View File

@ -398,8 +398,11 @@ func tailMultiplexed(
}()
_, err := stdcopy.StdCopy(outWriter, errWriter, src)
//nolint:errcheck,revive // we cannot do anything if the closing fails
outWriter.Close()
//nolint:errcheck,revive // we cannot do anything if the closing fails
errWriter.Close()
//nolint:errcheck,revive // we cannot do anything if the closing fails
src.Close()
wg.Wait()
return err

View File

@ -138,8 +138,8 @@ func Test(t *testing.T) {
ContainerLogsF: func(ctx context.Context, containerID string, options types.ContainerLogsOptions) (io.ReadCloser, error) {
var buf bytes.Buffer
w := stdcopy.NewStdWriter(&buf, stdcopy.Stdout)
w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout"))
return &Response{Reader: &buf}, nil
_, err := w.Write([]byte("2020-04-28T18:42:16.432691200Z hello from stdout"))
return &Response{Reader: &buf}, err
},
},
expected: []telegraf.Metric{

View File

@ -90,7 +90,9 @@ func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype stri
defer c.Close()
// Extend connection
c.SetDeadline(time.Now().Add(defaultTimeout))
if err := c.SetDeadline(time.Now().Add(defaultTimeout)); err != nil {
return fmt.Errorf("setting deadline failed for dovecot server '%s': %s", addr, err)
}
msg := fmt.Sprintf("EXPORT\t%s", qtype)
if len(filter) > 0 {
@ -98,9 +100,13 @@ func (d *Dovecot) gatherServer(addr string, acc telegraf.Accumulator, qtype stri
}
msg += "\n"
c.Write([]byte(msg))
if _, err := c.Write([]byte(msg)); err != nil {
return fmt.Errorf("writing message %q failed for dovecot server '%s': %s", msg, addr, err)
}
var buf bytes.Buffer
io.Copy(&buf, c)
if _, err := io.Copy(&buf, c); err != nil {
return fmt.Errorf("copying message failed for dovecot server '%s': %s", addr, err)
}
host, _, _ := net.SplitHostPort(addr)

View File

@ -6,7 +6,7 @@ import (
"time"
"github.com/docker/docker/api/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// codified golden objects for tests
@ -800,10 +800,10 @@ func TestResolveEndpoint(t *testing.T) {
{
name: "Endpoint is not set, ECS_CONTAINER_METADATA_URI is set => use v3 metadata",
preF: func() {
os.Setenv("ECS_CONTAINER_METADATA_URI", "v3-endpoint.local")
require.NoError(t, os.Setenv("ECS_CONTAINER_METADATA_URI", "v3-endpoint.local"))
},
afterF: func() {
os.Unsetenv("ECS_CONTAINER_METADATA_URI")
require.NoError(t, os.Unsetenv("ECS_CONTAINER_METADATA_URI"))
},
given: Ecs{
EndpointURL: "",
@ -825,7 +825,7 @@ func TestResolveEndpoint(t *testing.T) {
act := tt.given
resolveEndpoint(&act)
assert.Equal(t, tt.exp, act)
require.Equal(t, tt.exp, act)
})
}
}

View File

@ -8,7 +8,6 @@ import (
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -49,14 +48,7 @@ func (t *transportMock) RoundTrip(r *http.Request) (*http.Response, error) {
return res, nil
}
func (t *transportMock) CancelRequest(_ *http.Request) {
}
func checkIsMaster(es *Elasticsearch, server string, expected bool, t *testing.T) {
if es.serverInfo[server].isMaster() != expected {
assert.Fail(t, "IsMaster set incorrectly")
}
}
func (t *transportMock) CancelRequest(_ *http.Request) {}
func checkNodeStatsResult(t *testing.T, acc *testutil.Accumulator) {
tags := defaultTags()
@ -79,11 +71,8 @@ func TestGather(t *testing.T) {
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
var acc testutil.Accumulator
if err := acc.GatherError(es.Gather); err != nil {
t.Fatal(err)
}
checkIsMaster(es, es.Servers[0], false, t)
require.NoError(t, acc.GatherError(es.Gather))
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
checkNodeStatsResult(t, &acc)
}
@ -96,11 +85,8 @@ func TestGatherIndividualStats(t *testing.T) {
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
var acc testutil.Accumulator
if err := acc.GatherError(es.Gather); err != nil {
t.Fatal(err)
}
checkIsMaster(es, es.Servers[0], false, t)
require.NoError(t, acc.GatherError(es.Gather))
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
tags := defaultTags()
acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices", nodestatsIndicesExpected, tags)
@ -122,11 +108,8 @@ func TestGatherNodeStats(t *testing.T) {
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
var acc testutil.Accumulator
if err := es.gatherNodeStats("junk", &acc); err != nil {
t.Fatal(err)
}
checkIsMaster(es, es.Servers[0], false, t)
require.NoError(t, es.gatherNodeStats("junk", &acc))
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
checkNodeStatsResult(t, &acc)
}
@ -141,8 +124,7 @@ func TestGatherClusterHealthEmptyClusterHealth(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, es.Servers[0], false, t)
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
@ -168,8 +150,7 @@ func TestGatherClusterHealthSpecificClusterHealth(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, es.Servers[0], false, t)
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
@ -195,8 +176,7 @@ func TestGatherClusterHealthAlsoIndicesHealth(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, es.gatherClusterHealth("junk", &acc))
checkIsMaster(es, es.Servers[0], false, t)
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
acc.AssertContainsTaggedFields(t, "elasticsearch_cluster_health",
clusterHealthExpected,
@ -227,19 +207,14 @@ func TestGatherClusterStatsMaster(t *testing.T) {
es.serverInfo["http://example.com:9200"] = info
IsMasterResultTokens := strings.Split(string(IsMasterResult), " ")
if masterID != IsMasterResultTokens[0] {
assert.Fail(t, "catmaster is incorrect")
}
require.Equal(t, masterID, IsMasterResultTokens[0], "catmaster is incorrect")
// now get node status, which determines whether we're master
var acc testutil.Accumulator
es.Local = true
es.client.Transport = newTransportMock(nodeStatsResponse)
if err := es.gatherNodeStats("junk", &acc); err != nil {
t.Fatal(err)
}
checkIsMaster(es, es.Servers[0], true, t)
require.NoError(t, es.gatherNodeStats("junk", &acc))
require.True(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
checkNodeStatsResult(t, &acc)
// now test the clusterstats method
@ -270,20 +245,16 @@ func TestGatherClusterStatsNonMaster(t *testing.T) {
require.NoError(t, err)
IsNotMasterResultTokens := strings.Split(string(IsNotMasterResult), " ")
if masterID != IsNotMasterResultTokens[0] {
assert.Fail(t, "catmaster is incorrect")
}
require.Equal(t, masterID, IsNotMasterResultTokens[0], "catmaster is incorrect")
// now get node status, which determines whether we're master
var acc testutil.Accumulator
es.Local = true
es.client.Transport = newTransportMock(nodeStatsResponse)
if err := es.gatherNodeStats("junk", &acc); err != nil {
t.Fatal(err)
}
require.NoError(t, es.gatherNodeStats("junk", &acc))
// ensure flag is clear so Cluster Stats would not be done
checkIsMaster(es, es.Servers[0], false, t)
require.False(t, es.serverInfo[es.Servers[0]].isMaster(), "IsMaster set incorrectly")
checkNodeStatsResult(t, &acc)
}
@ -296,9 +267,7 @@ func TestGatherClusterIndicesStats(t *testing.T) {
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
var acc testutil.Accumulator
if err := es.gatherIndicesStats("junk", &acc); err != nil {
t.Fatal(err)
}
require.NoError(t, es.gatherIndicesStats("junk", &acc))
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,
@ -313,12 +282,10 @@ func TestGatherDateStampedIndicesStats(t *testing.T) {
es.client.Transport = newTransportMock(dateStampedIndicesResponse)
es.serverInfo = make(map[string]serverInfo)
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
es.Init()
require.NoError(t, es.Init())
var acc testutil.Accumulator
if err := es.gatherIndicesStats(es.Servers[0]+"/"+strings.Join(es.IndicesInclude, ",")+"/_stats", &acc); err != nil {
t.Fatal(err)
}
require.NoError(t, es.gatherIndicesStats(es.Servers[0]+"/"+strings.Join(es.IndicesInclude, ",")+"/_stats", &acc))
// includes 2 most recent indices for "twitter", only expect the most recent two.
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
@ -357,9 +324,7 @@ func TestGatherClusterIndiceShardsStats(t *testing.T) {
es.serverInfo["http://example.com:9200"] = defaultServerInfo()
var acc testutil.Accumulator
if err := es.gatherIndicesStats("junk", &acc); err != nil {
t.Fatal(err)
}
require.NoError(t, es.gatherIndicesStats("junk", &acc))
acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
clusterIndicesExpected,

View File

@ -111,6 +111,7 @@ func (c CommandRunner) truncate(buf bytes.Buffer) bytes.Buffer {
buf.Truncate(i)
}
if didTruncate {
//nolint:errcheck,revive // Will always return nil or panic
buf.WriteString("...")
}
return buf
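
The //nolint:errcheck,revive directive added above covers a narrower case: the write methods of bytes.Buffer are documented to always return a nil error (they panic instead if the buffer grows too large), so there is no reachable error to handle. A small sketch of that shape, with illustrative names:

package bufexample

import "bytes"

// appendMarker appends the truncation marker. bytes.Buffer.WriteString is
// documented to always return a nil error, so the check is suppressed
// rather than handled.
func appendMarker(buf *bytes.Buffer) {
	//nolint:errcheck,revive // bytes.Buffer.WriteString always returns a nil error
	buf.WriteString("...")
}

The tests below take the other route and assert the same buffer writes with require.NoError, which keeps the linter satisfied without a directive.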

View File

@ -200,12 +200,14 @@ func TestTruncate(t *testing.T) {
name: "should not truncate",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
_, err := b.WriteString("hello world")
require.NoError(t, err)
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world")
_, err := b.WriteString("hello world")
require.NoError(t, err)
return &b
},
},
@ -213,12 +215,14 @@ func TestTruncate(t *testing.T) {
name: "should truncate up to the new line",
bufF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world\nand all the people")
_, err := b.WriteString("hello world\nand all the people")
require.NoError(t, err)
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
b.WriteString("hello world...")
_, err := b.WriteString("hello world...")
require.NoError(t, err)
return &b
},
},
@ -227,16 +231,17 @@ func TestTruncate(t *testing.T) {
bufF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < 2*MaxStderrBytes; i++ {
b.WriteByte('b')
require.NoError(t, b.WriteByte('b'))
}
return &b
},
expF: func() *bytes.Buffer {
var b bytes.Buffer
for i := 0; i < MaxStderrBytes; i++ {
b.WriteByte('b')
require.NoError(t, b.WriteByte('b'))
}
b.WriteString("...")
_, err := b.WriteString("...")
require.NoError(t, err)
return &b
},
},

View File

@ -23,17 +23,19 @@ func (e *Execd) Gather(_ telegraf.Accumulator) error {
}
switch e.Signal {
case "SIGHUP":
osProcess.Signal(syscall.SIGHUP)
return osProcess.Signal(syscall.SIGHUP)
case "SIGUSR1":
osProcess.Signal(syscall.SIGUSR1)
return osProcess.Signal(syscall.SIGUSR1)
case "SIGUSR2":
osProcess.Signal(syscall.SIGUSR2)
return osProcess.Signal(syscall.SIGUSR2)
case "STDIN":
if osStdin, ok := e.process.Stdin.(*os.File); ok {
osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second))
if err := osStdin.SetWriteDeadline(time.Now().Add(1 * time.Second)); err != nil {
return fmt.Errorf("setting write deadline failed: %s", err)
}
}
if _, err := io.WriteString(e.process.Stdin, "\n"); err != nil {
return fmt.Errorf("Error writing to stdin: %s", err)
return fmt.Errorf("writing to stdin failed: %s", err)
}
case "none":
default:

View File

@ -153,19 +153,22 @@ var counter = flag.Bool("counter", false,
func TestMain(m *testing.M) {
flag.Parse()
if *counter {
runCounterProgram()
if err := runCounterProgram(); err != nil {
os.Exit(1)
}
os.Exit(0)
}
code := m.Run()
os.Exit(code)
}
func runCounterProgram() {
func runCounterProgram() error {
i := 0
serializer, err := serializers.NewInfluxSerializer()
if err != nil {
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprintln(os.Stderr, "ERR InfluxSerializer failed to load")
os.Exit(1)
return err
}
scanner := bufio.NewScanner(os.Stdin)
@ -181,9 +184,13 @@ func runCounterProgram() {
b, err := serializer.Serialize(m)
if err != nil {
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprintf(os.Stderr, "ERR %v\n", err)
os.Exit(1)
return err
}
if _, err := fmt.Fprint(os.Stdout, string(b)); err != nil {
return err
}
fmt.Fprint(os.Stdout, string(b))
}
return nil
}
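
The refactor above converts runCounterProgram from a helper that calls os.Exit itself into one that returns an error, so TestMain alone decides the process exit code. Reduced to its skeleton, the flag-driven helper-binary pattern looks roughly like the sketch below; the package, the helper flag name, and runHelper are illustrative and would live in a _test.go file:

package helperexample

import (
	"flag"
	"os"
	"testing"
)

var asHelper = flag.Bool("helper", false, "run as the external helper program instead of the tests")

// TestMain either runs the helper program (when a test re-executes its own
// binary with -helper) or runs the normal test suite, and owns the exit
// code in both cases.
func TestMain(m *testing.M) {
	flag.Parse()
	if *asHelper {
		if err := runHelper(); err != nil {
			os.Exit(1)
		}
		os.Exit(0)
	}
	os.Exit(m.Run())
}

// runHelper stands in for the real helper logic; it returns an error instead
// of calling os.Exit so the exit policy lives only in TestMain.
func runHelper() error {
	_, err := os.Stdout.WriteString("helper ran\n")
	return err
}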

View File

@ -57,8 +57,7 @@ var (
// New creates a new shim interface
func New() *Shim {
fmt.Fprintf(os.Stderr, "%s is deprecated; please change your import to %s\n",
oldpkg, newpkg)
_, _ = fmt.Fprintf(os.Stderr, "%s is deprecated; please change your import to %s\n", oldpkg, newpkg)
return &Shim{
stdin: os.Stdin,
stdout: os.Stdout,
@ -155,7 +154,9 @@ loop:
return fmt.Errorf("failed to serialize metric: %s", err)
}
// Write this to stdout
fmt.Fprint(s.stdout, string(b))
if _, err := fmt.Fprint(s.stdout, string(b)); err != nil {
return fmt.Errorf("failed to write %q to stdout: %s", string(b), err)
}
}
}
@ -232,11 +233,17 @@ func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc tel
return
case <-gatherPromptCh:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err)
if _, perr := fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err); perr != nil {
acc.AddError(err)
acc.AddError(perr)
}
}
case <-t.C:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err)
if _, perr := fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err); perr != nil {
acc.AddError(err)
acc.AddError(perr)
}
}
}
}

View File

@ -37,7 +37,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
return // test is done
default:
// test isn't done, keep going.
process.Signal(syscall.SIGUSR1)
require.NoError(t, process.Signal(syscall.SIGUSR1))
time.Sleep(200 * time.Millisecond)
}
}
@ -51,7 +51,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out)
stdinWriter.Close()
require.NoError(t, stdinWriter.Close())
readUntilEmpty(r)
<-exited

View File

@ -36,7 +36,8 @@ func TestShimStdinSignalingWorks(t *testing.T) {
metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil)
stdinWriter.Write([]byte("\n"))
_, err := stdinWriter.Write([]byte("\n"))
require.NoError(t, err)
<-metricProcessed
@ -45,7 +46,7 @@ func TestShimStdinSignalingWorks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out)
stdinWriter.Close()
require.NoError(t, stdinWriter.Close())
readUntilEmpty(r)
@ -71,7 +72,7 @@ func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdou
shim.stderr = stderr
}
shim.AddInput(inp)
require.NoError(t, shim.AddInput(inp))
go func() {
err := shim.Run(interval)
require.NoError(t, err)
@ -112,8 +113,8 @@ func (i *testInput) Stop() {
}
func TestLoadConfig(t *testing.T) {
os.Setenv("SECRET_TOKEN", "xxxxxxxxxx")
os.Setenv("SECRET_VALUE", `test"\test`)
require.NoError(t, os.Setenv("SECRET_TOKEN", "xxxxxxxxxx"))
require.NoError(t, os.Setenv("SECRET_VALUE", `test"\test`))
inputs.Add("test", func() telegraf.Input {
return &serviceInput{}

View File

@ -101,25 +101,31 @@ func TestHelperProcess(_ *testing.T) {
cmd, args := args[3], args[4:]
if !strings.HasSuffix(cmd, "fail2ban-client") {
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprint(os.Stdout, "command not found")
os.Exit(1)
}
if len(args) == 1 && args[0] == "status" {
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprint(os.Stdout, execStatusOutput)
os.Exit(0)
} else if len(args) == 2 && args[0] == "status" {
if args[1] == "sshd" {
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprint(os.Stdout, execStatusSshdOutput)
os.Exit(0)
} else if args[1] == "postfix" {
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprint(os.Stdout, execStatusPostfixOutput)
os.Exit(0)
} else if args[1] == "dovecot" {
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprint(os.Stdout, execStatusDovecotOutput)
os.Exit(0)
}
}
//nolint:errcheck,revive // Test will fail anyway
fmt.Fprint(os.Stdout, "invalid argument")
os.Exit(1)
}

View File

@ -7,7 +7,6 @@ import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -162,7 +161,8 @@ func TestJSONSuccess(t *testing.T) {
payload = devicesJSON
}
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, payload)
_, err := fmt.Fprintln(w, payload)
require.NoError(t, err)
}))
defer ts.Close()
@ -178,7 +178,7 @@ func TestJSONSuccess(t *testing.T) {
require.NoError(t, err)
// Gather should add 5 metrics
assert.Equal(t, uint64(5), acc.NMetrics())
require.Equal(t, uint64(5), acc.NMetrics())
// Ensure fields / values are correct - Device 1
tags := map[string]string{"deviceId": "1", "section": "Section 1", "room": "Room 1", "name": "Device 1", "type": "com.fibaro.binarySwitch"}

View File

@ -15,7 +15,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -29,7 +28,7 @@ func TestRefreshFilePaths(t *testing.T) {
err = r.refreshFilePaths()
require.NoError(t, err)
assert.Equal(t, 2, len(r.filenames))
require.Equal(t, 2, len(r.filenames))
}
func TestFileTag(t *testing.T) {
@ -47,7 +46,7 @@ func TestFileTag(t *testing.T) {
DataFormat: "json",
}
nParser, err := parsers.NewParser(&parserConfig)
assert.NoError(t, err)
require.NoError(t, err)
r.parser = nParser
err = r.Gather(&acc)
@ -55,8 +54,8 @@ func TestFileTag(t *testing.T) {
for _, m := range acc.Metrics {
for key, value := range m.Tags {
assert.Equal(t, r.FileTag, key)
assert.Equal(t, filepath.Base(r.Files[0]), value)
require.Equal(t, r.FileTag, key)
require.Equal(t, filepath.Base(r.Files[0]), value)
}
}
}
@ -74,12 +73,12 @@ func TestJSONParserCompile(t *testing.T) {
TagKeys: []string{"parent_ignored_child"},
}
nParser, err := parsers.NewParser(&parserConfig)
assert.NoError(t, err)
require.NoError(t, err)
r.parser = nParser
r.Gather(&acc)
assert.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
assert.Equal(t, 5, len(acc.Metrics[0].Fields))
require.NoError(t, r.Gather(&acc))
require.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
require.Equal(t, 5, len(acc.Metrics[0].Fields))
}
func TestGrokParser(t *testing.T) {
@ -98,10 +97,10 @@ func TestGrokParser(t *testing.T) {
nParser, err := parsers.NewParser(&parserConfig)
r.parser = nParser
assert.NoError(t, err)
require.NoError(t, err)
err = r.Gather(&acc)
assert.Equal(t, len(acc.Metrics), 2)
require.Equal(t, len(acc.Metrics), 2)
}
func TestCharacterEncoding(t *testing.T) {

View File

@ -35,7 +35,7 @@ func TestNoFiltersOnChildDir(t *testing.T) {
tags := map[string]string{"directory": getTestdataDir() + "/subdir"}
acc := testutil.Accumulator{}
acc.GatherError(fc.Gather)
require.NoError(t, acc.GatherError(fc.Gather))
require.True(t, acc.HasPoint("filecount", tags, "count", int64(len(matches))))
require.True(t, acc.HasPoint("filecount", tags, "size_bytes", int64(600)))
}
@ -48,7 +48,7 @@ func TestNoRecursiveButSuperMeta(t *testing.T) {
tags := map[string]string{"directory": getTestdataDir() + "/subdir"}
acc := testutil.Accumulator{}
acc.GatherError(fc.Gather)
require.NoError(t, acc.GatherError(fc.Gather))
require.True(t, acc.HasPoint("filecount", tags, "count", int64(len(matches))))
require.True(t, acc.HasPoint("filecount", tags, "size_bytes", int64(200)))
@ -77,7 +77,7 @@ func TestDoubleAndSimpleStar(t *testing.T) {
tags := map[string]string{"directory": getTestdataDir() + "/subdir/nested2"}
acc := testutil.Accumulator{}
acc.GatherError(fc.Gather)
require.NoError(t, acc.GatherError(fc.Gather))
require.True(t, acc.HasPoint("filecount", tags, "count", int64(len(matches))))
require.True(t, acc.HasPoint("filecount", tags, "size_bytes", int64(400)))
@ -235,7 +235,7 @@ func getFakeFileSystem(basePath string) fakeFileSystem {
func fileCountEquals(t *testing.T, fc FileCount, expectedCount int, expectedSize int) {
tags := map[string]string{"directory": getTestdataDir()}
acc := testutil.Accumulator{}
acc.GatherError(fc.Gather)
require.NoError(t, acc.GatherError(fc.Gather))
require.True(t, acc.HasPoint("filecount", tags, "count", int64(expectedCount)))
require.True(t, acc.HasPoint("filecount", tags, "size_bytes", int64(expectedSize)))
}

View File

@ -10,7 +10,6 @@ import (
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/testutil"
@ -30,7 +29,7 @@ func TestGatherNoMd5(t *testing.T) {
}
acc := testutil.Accumulator{}
acc.GatherError(fs.Gather)
require.NoError(t, acc.GatherError(fs.Gather))
tags1 := map[string]string{
"file": filepath.Join(testdataDir, "log1.log"),
@ -61,7 +60,7 @@ func TestGatherExplicitFiles(t *testing.T) {
}
acc := testutil.Accumulator{}
acc.GatherError(fs.Gather)
require.NoError(t, acc.GatherError(fs.Gather))
tags1 := map[string]string{
"file": filepath.Join(testdataDir, "log1.log"),
@ -94,10 +93,10 @@ func TestNonExistentFile(t *testing.T) {
require.NoError(t, acc.GatherError(fs.Gather))
acc.AssertContainsFields(t, "filestat", map[string]interface{}{"exists": int64(0)})
assert.False(t, acc.HasField("filestat", "error"))
assert.False(t, acc.HasField("filestat", "md5_sum"))
assert.False(t, acc.HasField("filestat", "size_bytes"))
assert.False(t, acc.HasField("filestat", "modification_time"))
require.False(t, acc.HasField("filestat", "error"))
require.False(t, acc.HasField("filestat", "md5_sum"))
require.False(t, acc.HasField("filestat", "size_bytes"))
require.False(t, acc.HasField("filestat", "modification_time"))
}
func TestGatherGlob(t *testing.T) {
@ -109,7 +108,7 @@ func TestGatherGlob(t *testing.T) {
}
acc := testutil.Accumulator{}
acc.GatherError(fs.Gather)
require.NoError(t, acc.GatherError(fs.Gather))
tags1 := map[string]string{
"file": filepath.Join(testdataDir, "log1.log"),
@ -135,7 +134,7 @@ func TestGatherSuperAsterisk(t *testing.T) {
}
acc := testutil.Accumulator{}
acc.GatherError(fs.Gather)
require.NoError(t, acc.GatherError(fs.Gather))
tags1 := map[string]string{
"file": filepath.Join(testdataDir, "log1.log"),
@ -167,7 +166,7 @@ func TestModificationTime(t *testing.T) {
}
acc := testutil.Accumulator{}
acc.GatherError(fs.Gather)
require.NoError(t, acc.GatherError(fs.Gather))
tags1 := map[string]string{
"file": filepath.Join(testdataDir, "log1.log"),
@ -185,7 +184,7 @@ func TestNoModificationTime(t *testing.T) {
}
acc := testutil.Accumulator{}
acc.GatherError(fs.Gather)
require.NoError(t, acc.GatherError(fs.Gather))
tags1 := map[string]string{
"file": filepath.Join(testdataDir, "non_existent_file"),
@ -196,11 +195,11 @@ func TestNoModificationTime(t *testing.T) {
func TestGetMd5(t *testing.T) {
md5, err := getMd5(filepath.Join(testdataDir, "test.conf"))
assert.NoError(t, err)
assert.Equal(t, "5a7e9b77fa25e7bb411dbd17cf403c1f", md5)
require.NoError(t, err)
require.Equal(t, "5a7e9b77fa25e7bb411dbd17cf403c1f", md5)
md5, err = getMd5("/tmp/foo/bar/fooooo")
assert.Error(t, err)
require.Error(t, err)
}
func getTestdataDir() string {

View File

@ -16,7 +16,8 @@ func TestFireboard(t *testing.T) {
// Create a test server with the const response JSON
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, response)
_, err := fmt.Fprintln(w, response)
require.NoError(t, err)
}))
defer ts.Close()
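
The fireboard change above repeats a pattern used by most of the HTTP-based input tests in this commit: the httptest handler now checks the result of writing the canned response instead of discarding it. A condensed, self-contained sketch of that handler shape, assuming testify; the package name, newTestServer, and cannedResponse are illustrative:

package httpexample

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/stretchr/testify/require"
)

const cannedResponse = `{"ok": true}` // illustrative payload, not the plugin's fixture

// newTestServer returns an httptest server whose handler reports a test
// failure if writing the canned response ever returns an error, instead of
// ignoring the result of fmt.Fprintln.
func newTestServer(t *testing.T) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, err := fmt.Fprintln(w, cannedResponse)
		require.NoError(t, err)
	}))
}

Callers would defer ts.Close() exactly as the test above does.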

View File

@ -9,7 +9,7 @@ import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// sampleJSON from fluentd version '0.14.9'
@ -122,7 +122,8 @@ func Test_Gather(t *testing.T) {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, "%s", string(sampleJSON))
_, err := fmt.Fprintf(w, "%s", string(sampleJSON))
require.NoError(t, err)
}))
requestURL, err := url.Parse(fluentdTest.Endpoint)
@ -144,15 +145,15 @@ func Test_Gather(t *testing.T) {
t.Errorf("acc.HasMeasurement: expected fluentd")
}
assert.Equal(t, expectedOutput[0].PluginID, acc.Metrics[0].Tags["plugin_id"])
assert.Equal(t, expectedOutput[0].PluginType, acc.Metrics[0].Tags["plugin_type"])
assert.Equal(t, expectedOutput[0].PluginCategory, acc.Metrics[0].Tags["plugin_category"])
assert.Equal(t, *expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"])
require.Equal(t, expectedOutput[0].PluginID, acc.Metrics[0].Tags["plugin_id"])
require.Equal(t, expectedOutput[0].PluginType, acc.Metrics[0].Tags["plugin_type"])
require.Equal(t, expectedOutput[0].PluginCategory, acc.Metrics[0].Tags["plugin_category"])
require.Equal(t, *expectedOutput[0].RetryCount, acc.Metrics[0].Fields["retry_count"])
assert.Equal(t, expectedOutput[1].PluginID, acc.Metrics[1].Tags["plugin_id"])
assert.Equal(t, expectedOutput[1].PluginType, acc.Metrics[1].Tags["plugin_type"])
assert.Equal(t, expectedOutput[1].PluginCategory, acc.Metrics[1].Tags["plugin_category"])
assert.Equal(t, *expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"])
assert.Equal(t, *expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"])
assert.Equal(t, *expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"])
require.Equal(t, expectedOutput[1].PluginID, acc.Metrics[1].Tags["plugin_id"])
require.Equal(t, expectedOutput[1].PluginType, acc.Metrics[1].Tags["plugin_type"])
require.Equal(t, expectedOutput[1].PluginCategory, acc.Metrics[1].Tags["plugin_category"])
require.Equal(t, *expectedOutput[1].RetryCount, acc.Metrics[1].Fields["retry_count"])
require.Equal(t, *expectedOutput[1].BufferQueueLength, acc.Metrics[1].Fields["buffer_queue_length"])
require.Equal(t, *expectedOutput[1].BufferTotalQueuedSize, acc.Metrics[1].Fields["buffer_total_queued_size"])
}

View File

@ -114,8 +114,14 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
return err
}
longPath, _ := c.handlePath(gnmiLongPath, nil, "")
shortPath, _ := c.handlePath(gnmiShortPath, nil, "")
longPath, _, err := c.handlePath(gnmiLongPath, nil, "")
if err != nil {
return fmt.Errorf("handling long-path failed: %v", err)
}
shortPath, _, err := c.handlePath(gnmiShortPath, nil, "")
if err != nil {
return fmt.Errorf("handling short-path failed: %v", err)
}
name := subscription.Name
// If the user didn't provide a measurement name, use last path element
@ -257,7 +263,10 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmi.Subs
prefixTags := make(map[string]string)
if response.Update.Prefix != nil {
prefix, prefixAliasPath = c.handlePath(response.Update.Prefix, prefixTags, "")
var err error
if prefix, prefixAliasPath, err = c.handlePath(response.Update.Prefix, prefixTags, ""); err != nil {
c.Log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
}
}
prefixTags["source"], _, _ = net.SplitHostPort(address)
prefixTags["path"] = prefix
@ -307,7 +316,9 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmi.Subs
}
}
grouper.Add(name, tags, timestamp, key, v)
if err := grouper.Add(name, tags, timestamp, key, v); err != nil {
c.Log.Errorf("cannot add to grouper: %v", err)
}
}
lastAliasPath = aliasPath
@ -321,14 +332,17 @@ func (c *GNMI) handleSubscribeResponseUpdate(address string, response *gnmi.Subs
// HandleTelemetryField and add it to a measurement
func (c *GNMI) handleTelemetryField(update *gnmi.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
path, aliasPath := c.handlePath(update.Path, tags, prefix)
gpath, aliasPath, err := c.handlePath(update.Path, tags, prefix)
if err != nil {
c.Log.Errorf("handling path %q failed: %v", update.Path, err)
}
var value interface{}
var jsondata []byte
// Make sure a value is actually set
if update.Val == nil || update.Val.Value == nil {
c.Log.Infof("Discarded empty or legacy type value with path: %q", path)
c.Log.Infof("Discarded empty or legacy type value with path: %q", gpath)
return aliasPath, nil
}
@ -355,7 +369,7 @@ func (c *GNMI) handleTelemetryField(update *gnmi.Update, tags map[string]string,
jsondata = val.JsonVal
}
name := strings.Replace(path, "-", "_", -1)
name := strings.Replace(gpath, "-", "_", -1)
fields := make(map[string]interface{})
if value != nil {
fields[name] = value
@ -364,28 +378,38 @@ func (c *GNMI) handleTelemetryField(update *gnmi.Update, tags map[string]string,
c.acc.AddError(fmt.Errorf("failed to parse JSON value: %v", err))
} else {
flattener := jsonparser.JSONFlattener{Fields: fields}
flattener.FullFlattenJSON(name, value, true, true)
if err := flattener.FullFlattenJSON(name, value, true, true); err != nil {
c.acc.AddError(fmt.Errorf("failed to flatten JSON: %v", err))
}
}
}
return aliasPath, fields
}
// Parse path to path-buffer and tag-field
func (c *GNMI) handlePath(path *gnmi.Path, tags map[string]string, prefix string) (string, string) {
func (c *GNMI) handlePath(path *gnmi.Path, tags map[string]string, prefix string) (string, string, error) {
var aliasPath string
builder := bytes.NewBufferString(prefix)
// Prefix with origin
if len(path.Origin) > 0 {
builder.WriteString(path.Origin)
builder.WriteRune(':')
if _, err := builder.WriteString(path.Origin); err != nil {
return "", "", err
}
if _, err := builder.WriteRune(':'); err != nil {
return "", "", err
}
}
// Parse generic keys from prefix
for _, elem := range path.Elem {
if len(elem.Name) > 0 {
builder.WriteRune('/')
builder.WriteString(elem.Name)
if _, err := builder.WriteRune('/'); err != nil {
return "", "", err
}
if _, err := builder.WriteString(elem.Name); err != nil {
return "", "", err
}
}
name := builder.String()
@ -407,7 +431,7 @@ func (c *GNMI) handlePath(path *gnmi.Path, tags map[string]string, prefix string
}
}
return builder.String(), aliasPath
return builder.String(), aliasPath, nil
}
//ParsePath from XPath-like string to gNMI path structure

View File

@ -231,13 +231,18 @@ func TestNotification(t *testing.T) {
server: &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
notification := mockGNMINotification()
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}})
err := server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
if err != nil {
return err
}
err = server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_SyncResponse{SyncResponse: true}})
if err != nil {
return err
}
notification.Prefix.Elem[0].Key["foo"] = "bar2"
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_JsonVal{JsonVal: []byte{'"', '1', '2', '3', '"'}}}
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
},
},
expected: []telegraf.Metric{
@ -348,8 +353,7 @@ func TestNotification(t *testing.T) {
},
},
}
server.Send(response)
return nil
return server.Send(response)
},
},
expected: []telegraf.Metric{
@ -419,10 +423,9 @@ func TestSubscribeResponseError(t *testing.T) {
var mc uint32 = 7
ml := &MockLogger{}
plugin := &GNMI{Log: ml}
errorResponse := &gnmi.SubscribeResponse_Error{
Error: &gnmi.Error{Message: me, Code: mc}}
plugin.handleSubscribeResponse(
"127.0.0.1:0", &gnmi.SubscribeResponse{Response: errorResponse})
// TODO: FIX SA1019: gnmi.Error is deprecated: Do not use.
errorResponse := &gnmi.SubscribeResponse_Error{Error: &gnmi.Error{Message: me, Code: mc}}
plugin.handleSubscribeResponse("127.0.0.1:0", &gnmi.SubscribeResponse{Response: errorResponse})
require.NotEmpty(t, ml.lastFormat)
require.Equal(t, ml.lastArgs, []interface{}{mc, me})
}
@ -442,8 +445,7 @@ func TestRedial(t *testing.T) {
gnmiServer := &MockServer{
SubscribeF: func(server gnmi.GNMI_SubscribeServer) error {
notification := mockGNMINotification()
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
},
GRPCServer: grpcServer,
}
@ -476,8 +478,7 @@ func TestRedial(t *testing.T) {
notification.Prefix.Elem[0].Key["foo"] = "bar2"
notification.Update[0].Path.Elem[1].Key["name"] = "str2"
notification.Update[0].Val = &gnmi.TypedValue{Value: &gnmi.TypedValue_BoolVal{BoolVal: false}}
server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
return nil
return server.Send(&gnmi.SubscribeResponse{Response: &gnmi.SubscribeResponse_Update{Update: notification}})
},
GRPCServer: grpcServer,
}

View File

@ -13,7 +13,6 @@ import (
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -27,13 +26,15 @@ func (s statServer) serverSocket(l net.Listener) {
}
go func(c net.Conn) {
defer c.Close()
buf := make([]byte, 1024)
n, _ := c.Read(buf)
data := buf[:n]
if string(data) == "show stat\n" {
//nolint:errcheck,revive // we return anyway
c.Write([]byte(csvOutputSample))
c.Close()
}
}(conn)
}
@ -45,15 +46,18 @@ func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
username, password, ok := r.BasicAuth()
if !ok {
w.WriteHeader(http.StatusNotFound)
fmt.Fprint(w, "Unauthorized")
_, err := fmt.Fprint(w, "Unauthorized")
require.NoError(t, err)
return
}
if username == "user" && password == "password" {
fmt.Fprint(w, csvOutputSample)
_, err := fmt.Fprint(w, csvOutputSample)
require.NoError(t, err)
} else {
w.WriteHeader(http.StatusNotFound)
fmt.Fprint(w, "Unauthorized")
_, err := fmt.Fprint(w, "Unauthorized")
require.NoError(t, err)
}
}))
defer ts.Close()
@ -83,13 +87,14 @@ func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
Servers: []string{ts.URL},
}
r.Gather(&acc)
require.NoError(t, r.Gather(&acc))
require.NotEmpty(t, acc.Errors)
}
func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, csvOutputSample)
_, err := fmt.Fprint(w, csvOutputSample)
require.NoError(t, err)
}))
defer ts.Close()
@ -99,8 +104,7 @@ func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) {
var acc testutil.Accumulator
err := r.Gather(&acc)
require.NoError(t, err)
require.NoError(t, r.Gather(&acc))
tags := map[string]string{
"server": ts.Listener.Addr().String(),
@ -121,7 +125,7 @@ func TestHaproxyGeneratesMetricsUsingSocket(t *testing.T) {
_badmask := filepath.Join(os.TempDir(), "test-fail-haproxy*.sock")
for i := 0; i < 5; i++ {
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
require.NoError(t, binary.Read(rand.Reader, binary.LittleEndian, &randomNumber))
sockname := filepath.Join(os.TempDir(), fmt.Sprintf("test-haproxy%d.sock", randomNumber))
sock, err := net.Listen("unix", sockname)
@ -161,7 +165,7 @@ func TestHaproxyGeneratesMetricsUsingSocket(t *testing.T) {
// This mask should not match any socket
r.Servers = []string{_badmask}
r.Gather(&acc)
require.NoError(t, r.Gather(&acc))
require.NotEmpty(t, acc.Errors)
}
@ -174,12 +178,13 @@ func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
err := r.Gather(&acc)
require.Error(t, err)
assert.Contains(t, err.Error(), "127.0.0.1:1936/haproxy?stats/;csv")
require.Contains(t, err.Error(), "127.0.0.1:1936/haproxy?stats/;csv")
}
func TestHaproxyKeepFieldNames(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, csvOutputSample)
_, err := fmt.Fprint(w, csvOutputSample)
require.NoError(t, err)
}))
defer ts.Close()
@ -190,8 +195,7 @@ func TestHaproxyKeepFieldNames(t *testing.T) {
var acc testutil.Accumulator
err := r.Gather(&acc)
require.NoError(t, err)
require.NoError(t, r.Gather(&acc))
tags := map[string]string{
"server": ts.Listener.Addr().String(),

View File

@ -6,21 +6,21 @@ import (
"github.com/influxdata/telegraf/selfstat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSelfPlugin(t *testing.T) {
s := NewSelf()
acc := &testutil.Accumulator{}
s.Gather(acc)
assert.True(t, acc.HasMeasurement("internal_memstats"))
require.NoError(t, s.Gather(acc))
require.True(t, acc.HasMeasurement("internal_memstats"))
// test that a registered stat is incremented
stat := selfstat.Register("mytest", "test", map[string]string{"test": "foo"})
stat.Incr(1)
stat.Incr(2)
s.Gather(acc)
require.NoError(t, s.Gather(acc))
acc.AssertContainsTaggedFields(t, "internal_mytest",
map[string]interface{}{
"test": int64(3),
@ -34,7 +34,7 @@ func TestSelfPlugin(t *testing.T) {
// test that a registered stat is set properly
stat.Set(101)
s.Gather(acc)
require.NoError(t, s.Gather(acc))
acc.AssertContainsTaggedFields(t, "internal_mytest",
map[string]interface{}{
"test": int64(101),
@ -51,7 +51,7 @@ func TestSelfPlugin(t *testing.T) {
timing := selfstat.RegisterTiming("mytest", "test_ns", map[string]string{"test": "foo"})
timing.Incr(100)
timing.Incr(200)
s.Gather(acc)
require.NoError(t, s.Gather(acc))
acc.AssertContainsTaggedFields(t, "internal_mytest",
map[string]interface{}{
"test": int64(101),