fix: Linter fixes for plugins/outputs/[p-z]* (#10139)

Co-authored-by: Pawel Zak <Pawel Zak>
This commit is contained in:
Paweł Żak 2021-11-24 20:33:45 +01:00 committed by GitHub
parent 64aee2c87b
commit 64bc0ae9c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 307 additions and 270 deletions

View File

@ -10,6 +10,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
@ -17,8 +21,6 @@ import (
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v1" "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v1"
"github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v2" "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
var ( var (
@ -121,9 +123,15 @@ func (p *PrometheusClient) Init() error {
for collector := range defaultCollectors { for collector := range defaultCollectors {
switch collector { switch collector {
case "gocollector": case "gocollector":
registry.Register(prometheus.NewGoCollector()) err := registry.Register(collectors.NewGoCollector())
if err != nil {
return err
}
case "process": case "process":
registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) err := registry.Register(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
if err != nil {
return err
}
default: default:
return fmt.Errorf("unrecognized collector %s", collector) return fmt.Errorf("unrecognized collector %s", collector)
} }
@ -160,7 +168,10 @@ func (p *PrometheusClient) Init() error {
rangeHandler := internal.IPRangeHandler(ipRange, onError) rangeHandler := internal.IPRangeHandler(ipRange, onError)
promHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}) promHandler := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError})
landingPageHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { landingPageHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Telegraf Output Plugin: Prometheus Client ")) _, err := w.Write([]byte("Telegraf Output Plugin: Prometheus Client "))
if err != nil {
p.Log.Errorf("Error occurred when writing HTTP reply: %v", err)
}
}) })
mux := http.NewServeMux() mux := http.NewServeMux()
@ -229,7 +240,7 @@ func onError(rw http.ResponseWriter, code int) {
http.Error(rw, http.StatusText(code), code) http.Error(rw, http.StatusText(code), code)
} }
// Address returns the address the plugin is listening on. If not listening // URL returns the address the plugin is listening on. If not listening
// an empty string is returned. // an empty string is returned.
func (p *PrometheusClient) URL() string { func (p *PrometheusClient) URL() string {
if p.url != nil { if p.url != nil {

View File

@ -10,14 +10,15 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
func TestMetricVersion1(t *testing.T) { func TestMetricVersion1(t *testing.T) {
Logger := testutil.Logger{Name: "outputs.prometheus_client"} logger := testutil.Logger{Name: "outputs.prometheus_client"}
tests := []struct { tests := []struct {
name string name string
output *PrometheusClient output *PrometheusClient
@ -31,7 +32,7 @@ func TestMetricVersion1(t *testing.T) {
MetricVersion: 1, MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -58,7 +59,7 @@ cpu_time_idle{host="example.org"} 42
MetricVersion: 1, MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -85,7 +86,7 @@ cpu_time_idle{host="example.org"} 42
MetricVersion: 1, MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -114,7 +115,7 @@ cpu_time_idle{host="example.org"} 42
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
StringAsLabel: true, StringAsLabel: true,
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -141,7 +142,7 @@ cpu_time_idle{host_name="example.org"} 42
MetricVersion: 1, MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -169,7 +170,7 @@ cpu_time_idle{host="example.org"} 42
MetricVersion: 1, MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -209,7 +210,7 @@ http_request_duration_seconds_count 144320
MetricVersion: 1, MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -272,7 +273,7 @@ rpc_duration_seconds_count 2693
} }
func TestRoundTripMetricVersion1(t *testing.T) { func TestRoundTripMetricVersion1(t *testing.T) {
Logger := testutil.Logger{Name: "outputs.prometheus_client"} logger := testutil.Logger{Name: "outputs.prometheus_client"}
tests := []struct { tests := []struct {
name string name string
data []byte data []byte
@ -348,17 +349,18 @@ rpc_duration_seconds_count 2693
ts := httptest.NewServer(http.NotFoundHandler()) ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close() defer ts.Close()
url := fmt.Sprintf("http://%s", ts.Listener.Addr()) address := fmt.Sprintf("http://%s", ts.Listener.Addr())
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write(tt.data) _, err := w.Write(tt.data)
require.NoError(t, err)
}) })
input := &inputs.Prometheus{ input := &inputs.Prometheus{
URLs: []string{url}, URLs: []string{address},
URLTag: "", URLTag: "",
MetricVersion: 1, MetricVersion: 1,
} }
@ -375,7 +377,7 @@ rpc_duration_seconds_count 2693
Listen: "127.0.0.1:0", Listen: "127.0.0.1:0",
Path: defaultPath, Path: defaultPath,
MetricVersion: 1, MetricVersion: 1,
Log: Logger, Log: logger,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
} }
err = output.Init() err = output.Init()
@ -391,6 +393,7 @@ rpc_duration_seconds_count 2693
resp, err := http.Get(output.URL()) resp, err := http.Get(output.URL())
require.NoError(t, err) require.NoError(t, err)
defer resp.Body.Close()
actual, err := io.ReadAll(resp.Body) actual, err := io.ReadAll(resp.Body)
require.NoError(t, err) require.NoError(t, err)
@ -403,12 +406,12 @@ rpc_duration_seconds_count 2693
} }
func TestLandingPage(t *testing.T) { func TestLandingPage(t *testing.T) {
Logger := testutil.Logger{Name: "outputs.prometheus_client"} logger := testutil.Logger{Name: "outputs.prometheus_client"}
output := PrometheusClient{ output := PrometheusClient{
Listen: ":0", Listen: ":0",
CollectorsExclude: []string{"process"}, CollectorsExclude: []string{"process"},
MetricVersion: 1, MetricVersion: 1,
Log: Logger, Log: logger,
} }
expected := "Telegraf Output Plugin: Prometheus Client" expected := "Telegraf Output Plugin: Prometheus Client"
@ -419,8 +422,11 @@ func TestLandingPage(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
u, err := url.Parse(fmt.Sprintf("http://%s/", output.url.Host)) u, err := url.Parse(fmt.Sprintf("http://%s/", output.url.Host))
require.NoError(t, err)
resp, err := http.Get(u.String()) resp, err := http.Get(u.String())
require.NoError(t, err) require.NoError(t, err)
defer resp.Body.Close()
actual, err := io.ReadAll(resp.Body) actual, err := io.ReadAll(resp.Body)
require.NoError(t, err) require.NoError(t, err)

View File

@ -9,14 +9,15 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
) )
func TestMetricVersion2(t *testing.T) { func TestMetricVersion2(t *testing.T) {
Logger := testutil.Logger{Name: "outputs.prometheus_client"} logger := testutil.Logger{Name: "outputs.prometheus_client"}
tests := []struct { tests := []struct {
name string name string
output *PrometheusClient output *PrometheusClient
@ -30,7 +31,7 @@ func TestMetricVersion2(t *testing.T) {
MetricVersion: 2, MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -57,7 +58,7 @@ cpu_time_idle{host="example.org"} 42
MetricVersion: 2, MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -86,7 +87,7 @@ rpc_duration_seconds_count 2693
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
ExportTimestamp: true, ExportTimestamp: true,
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -114,7 +115,7 @@ cpu_time_idle{host="example.org"} 42 0
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
StringAsLabel: true, StringAsLabel: true,
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -141,7 +142,7 @@ cpu_time_idle{host="example.org"} 42
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
StringAsLabel: false, StringAsLabel: false,
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -167,7 +168,7 @@ cpu_time_idle 42
MetricVersion: 2, MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -194,7 +195,7 @@ cpu_time_idle{host="example.org"} 42
MetricVersion: 2, MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -276,7 +277,7 @@ cpu_usage_idle_count{cpu="cpu1"} 20
MetricVersion: 2, MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics", Path: "/metrics",
Log: Logger, Log: logger,
}, },
metrics: []telegraf.Metric{ metrics: []telegraf.Metric{
testutil.MustMetric( testutil.MustMetric(
@ -332,7 +333,7 @@ cpu_usage_idle_count{cpu="cpu1"} 20
} }
func TestRoundTripMetricVersion2(t *testing.T) { func TestRoundTripMetricVersion2(t *testing.T) {
Logger := testutil.Logger{Name: "outputs.prometheus_client"} logger := testutil.Logger{Name: "outputs.prometheus_client"}
tests := []struct { tests := []struct {
name string name string
data []byte data []byte
@ -414,7 +415,8 @@ rpc_duration_seconds_count 2693
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write(tt.data) _, err := w.Write(tt.data)
require.NoError(t, err)
}) })
input := &inputs.Prometheus{ input := &inputs.Prometheus{
@ -435,7 +437,7 @@ rpc_duration_seconds_count 2693
Listen: "127.0.0.1:0", Listen: "127.0.0.1:0",
Path: defaultPath, Path: defaultPath,
MetricVersion: 2, MetricVersion: 2,
Log: Logger, Log: logger,
CollectorsExclude: []string{"gocollector", "process"}, CollectorsExclude: []string{"gocollector", "process"},
} }
err = output.Init() err = output.Init()
@ -451,6 +453,7 @@ rpc_duration_seconds_count 2693
resp, err := http.Get(output.URL()) resp, err := http.Get(output.URL())
require.NoError(t, err) require.NoError(t, err)
defer resp.Body.Close()
actual, err := io.ReadAll(resp.Body) actual, err := io.ReadAll(resp.Body)
require.NoError(t, err) require.NoError(t, err)

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/amir/raidman" "github.com/amir/raidman"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
@ -78,12 +79,12 @@ func (r *Riemann) Connect() error {
return nil return nil
} }
func (r *Riemann) Close() error { func (r *Riemann) Close() (err error) {
if r.client != nil { if r.client != nil {
r.client.Close() err = r.client.Close()
r.client = nil r.client = nil
} }
return nil return err
} }
func (r *Riemann) SampleConfig() string { func (r *Riemann) SampleConfig() string {
@ -113,7 +114,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
} }
if err := r.client.SendMulti(events); err != nil { if err := r.client.SendMulti(events); err != nil {
r.Close() r.Close() //nolint:revive // There is another error which will be returned here
return fmt.Errorf("failed to send riemann message: %s", err) return fmt.Errorf("failed to send riemann message: %s", err)
} }
return nil return nil

View File

@ -7,6 +7,7 @@ import (
"strings" "strings"
"github.com/amir/raidman" "github.com/amir/raidman"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@ -48,9 +49,9 @@ func (r *Riemann) Close() error {
if r.client == nil { if r.client == nil {
return nil return nil
} }
r.client.Close() err := r.client.Close()
r.client = nil r.client = nil
return nil return err
} }
func (r *Riemann) SampleConfig() string { func (r *Riemann) SampleConfig() string {
@ -82,7 +83,7 @@ func (r *Riemann) Write(metrics []telegraf.Metric) error {
var senderr = r.client.SendMulti(events) var senderr = r.client.SendMulti(events)
if senderr != nil { if senderr != nil {
r.Close() // always returns nil r.Close() //nolint:revive // There is another error which will be returned here
return fmt.Errorf("failed to send riemann message (will try to reconnect), error: %s", senderr) return fmt.Errorf("failed to send riemann message (will try to reconnect), error: %s", senderr)
} }

View File

@ -296,10 +296,10 @@ func (s *Sensu) Write(metrics []telegraf.Metric) error {
return err return err
} }
return s.write(reqBody) return s.writeMetrics(reqBody)
} }
func (s *Sensu) write(reqBody []byte) error { func (s *Sensu) writeMetrics(reqBody []byte) error {
var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody) var reqBodyBuffer io.Reader = bytes.NewBuffer(reqBody)
method := http.MethodPost method := http.MethodPost

View File

@ -6,12 +6,13 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/signalfx/golib/v3/datapoint" "github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/datapoint/dpsink" "github.com/signalfx/golib/v3/datapoint/dpsink"
"github.com/signalfx/golib/v3/event" "github.com/signalfx/golib/v3/event"
"github.com/signalfx/golib/v3/sfxclient" "github.com/signalfx/golib/v3/sfxclient"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
) )
//init initializes the plugin context //init initializes the plugin context
@ -106,7 +107,7 @@ func (s *SignalFx) Connect() error {
if s.IngestURL != "" { if s.IngestURL != "" {
client.DatapointEndpoint = datapointEndpointForIngestURL(s.IngestURL) client.DatapointEndpoint = datapointEndpointForIngestURL(s.IngestURL)
client.EventEndpoint = eventEndpointForIngestURL(s.IngestURL) client.EventEndpoint = eventEndpointForIngestURL(s.IngestURL)
} else if s.SignalFxRealm != "" { } else if s.SignalFxRealm != "" { //nolint: revive // "Simplifying" if c {...} else {... return } would not simplify anything at all in this case
client.DatapointEndpoint = datapointEndpointForRealm(s.SignalFxRealm) client.DatapointEndpoint = datapointEndpointForRealm(s.SignalFxRealm)
client.EventEndpoint = eventEndpointForRealm(s.SignalFxRealm) client.EventEndpoint = eventEndpointForRealm(s.SignalFxRealm)
} else { } else {
@ -144,7 +145,7 @@ func (s *SignalFx) ConvertToSignalFx(metrics []telegraf.Metric) ([]*datapoint.Da
if metricValue, err := datapoint.CastMetricValueWithBool(val); err == nil { if metricValue, err := datapoint.CastMetricValueWithBool(val); err == nil {
var dp = datapoint.New(metricName, var dp = datapoint.New(metricName,
metricDims, metricDims,
metricValue.(datapoint.Value), metricValue,
metricType, metricType,
timestamp) timestamp)

View File

@ -7,13 +7,14 @@ import (
"testing" "testing"
"time" "time"
"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/event"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/event"
"github.com/stretchr/testify/require"
) )
type sink struct { type sink struct {
@ -436,7 +437,9 @@ func TestSignalFx_SignalFx(t *testing.T) {
measurements = append(measurements, m) measurements = append(measurements, m)
} }
s.Write(measurements) err := s.Write(measurements)
require.NoError(t, err)
require.Eventually(t, func() bool { return len(s.client.(*sink).dps) == len(tt.want.datapoints) }, 5*time.Second, 100*time.Millisecond) require.Eventually(t, func() bool { return len(s.client.(*sink).dps) == len(tt.want.datapoints) }, 5*time.Second, 100*time.Millisecond)
require.Eventually(t, func() bool { return len(s.client.(*sink).evs) == len(tt.want.events) }, 5*time.Second, 100*time.Millisecond) require.Eventually(t, func() bool { return len(s.client.(*sink).evs) == len(tt.want.events) }, 5*time.Second, 100*time.Millisecond)
@ -596,7 +599,8 @@ func TestSignalFx_Errors(t *testing.T) {
measurement.name, measurement.tags, measurement.fields, measurement.time, measurement.tp, measurement.name, measurement.tags, measurement.fields, measurement.time, measurement.tp,
) )
s.Write([]telegraf.Metric{m}) err := s.Write([]telegraf.Metric{m})
require.Error(t, err)
} }
for !(len(s.client.(*errorsink).dps) == len(tt.want.datapoints) && len(s.client.(*errorsink).evs) == len(tt.want.events)) { for !(len(s.client.(*errorsink).dps) == len(tt.want.datapoints) && len(s.client.(*errorsink).evs) == len(tt.want.events)) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)

View File

@ -3,7 +3,6 @@ package socket_writer
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"log"
"net" "net"
"strings" "strings"
"time" "time"
@ -21,6 +20,7 @@ type SocketWriter struct {
Address string Address string
KeepAlivePeriod *config.Duration KeepAlivePeriod *config.Duration
tlsint.ClientConfig tlsint.ClientConfig
Log telegraf.Logger `toml:"-"`
serializers.Serializer serializers.Serializer
@ -99,7 +99,7 @@ func (sw *SocketWriter) Connect() error {
} }
if err := sw.setKeepAlive(c); err != nil { if err := sw.setKeepAlive(c); err != nil {
log.Printf("unable to configure keep alive (%s): %s", sw.Address, err) sw.Log.Debugf("Unable to configure keep alive (%s): %s", sw.Address, err)
} }
//set encoder //set encoder
sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding) sw.encoder, err = internal.NewContentEncoder(sw.ContentEncoding)
@ -142,13 +142,13 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
for _, m := range metrics { for _, m := range metrics {
bs, err := sw.Serialize(m) bs, err := sw.Serialize(m)
if err != nil { if err != nil {
log.Printf("D! [outputs.socket_writer] Could not serialize metric: %v", err) sw.Log.Debugf("Could not serialize metric: %v", err)
continue continue
} }
bs, err = sw.encoder.Encode(bs) bs, err = sw.encoder.Encode(bs)
if err != nil { if err != nil {
log.Printf("D! [outputs.socket_writer] Could not encode metric: %v", err) sw.Log.Debugf("Could not encode metric: %v", err)
continue continue
} }
@ -156,7 +156,7 @@ func (sw *SocketWriter) Write(metrics []telegraf.Metric) error {
//TODO log & keep going with remaining strings //TODO log & keep going with remaining strings
if err, ok := err.(net.Error); !ok || !err.Temporary() { if err, ok := err.(net.Error); !ok || !err.Temporary() {
// permanent error. close the connection // permanent error. close the connection
sw.Close() sw.Close() //nolint:revive // There is another error which will be returned here
sw.Conn = nil sw.Conn = nil
return fmt.Errorf("closing connection: %v", err) return fmt.Errorf("closing connection: %v", err)
} }

View File

@ -9,10 +9,10 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestSocketWriter_tcp(t *testing.T) { func TestSocketWriter_tcp(t *testing.T) {
@ -105,8 +105,8 @@ func testSocketWriterStream(t *testing.T, sw *SocketWriter, lconn net.Conn) {
require.True(t, scnr.Scan()) require.True(t, scnr.Scan())
mstr2in := scnr.Text() + "\n" mstr2in := scnr.Text() + "\n"
assert.Equal(t, string(mbs1out), mstr1in) require.Equal(t, string(mbs1out), mstr1in)
assert.Equal(t, string(mbs2out), mstr2in) require.Equal(t, string(mbs2out), mstr2in)
} }
func testSocketWriterPacket(t *testing.T, sw *SocketWriter, lconn net.PacketConn) { func testSocketWriterPacket(t *testing.T, sw *SocketWriter, lconn net.PacketConn) {
@ -132,8 +132,8 @@ func testSocketWriterPacket(t *testing.T, sw *SocketWriter, lconn net.PacketConn
} }
require.Len(t, mstrins, 2) require.Len(t, mstrins, 2)
assert.Equal(t, mbs1str, mstrins[0]) require.Equal(t, mbs1str, mstrins[0])
assert.Equal(t, mbs2str, mstrins[1]) require.Equal(t, mbs2str, mstrins[1])
} }
func TestSocketWriter_Write_err(t *testing.T) { func TestSocketWriter_Write_err(t *testing.T) {
@ -145,20 +145,26 @@ func TestSocketWriter_Write_err(t *testing.T) {
err = sw.Connect() err = sw.Connect()
require.NoError(t, err) require.NoError(t, err)
sw.Conn.(*net.TCPConn).SetReadBuffer(256) err = sw.Conn.(*net.TCPConn).SetReadBuffer(256)
require.NoError(t, err)
lconn, err := listener.Accept() lconn, err := listener.Accept()
require.NoError(t, err) require.NoError(t, err)
lconn.(*net.TCPConn).SetWriteBuffer(256) err = lconn.(*net.TCPConn).SetWriteBuffer(256)
require.NoError(t, err)
metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")} metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")}
// close the socket to generate an error // close the socket to generate an error
lconn.Close() err = lconn.Close()
sw.Conn.Close() require.NoError(t, err)
err = sw.Conn.Close()
require.NoError(t, err)
err = sw.Write(metrics) err = sw.Write(metrics)
require.Error(t, err) require.Error(t, err)
assert.Nil(t, sw.Conn) require.Nil(t, sw.Conn)
} }
func TestSocketWriter_Write_reconnect(t *testing.T) { func TestSocketWriter_Write_reconnect(t *testing.T) {
@ -170,12 +176,16 @@ func TestSocketWriter_Write_reconnect(t *testing.T) {
err = sw.Connect() err = sw.Connect()
require.NoError(t, err) require.NoError(t, err)
sw.Conn.(*net.TCPConn).SetReadBuffer(256) err = sw.Conn.(*net.TCPConn).SetReadBuffer(256)
require.NoError(t, err)
lconn, err := listener.Accept() lconn, err := listener.Accept()
require.NoError(t, err) require.NoError(t, err)
lconn.(*net.TCPConn).SetWriteBuffer(256) err = lconn.(*net.TCPConn).SetWriteBuffer(256)
lconn.Close() require.NoError(t, err)
err = lconn.Close()
require.NoError(t, err)
sw.Conn = nil sw.Conn = nil
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -191,13 +201,13 @@ func TestSocketWriter_Write_reconnect(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wg.Wait() wg.Wait()
assert.NoError(t, lerr) require.NoError(t, lerr)
mbsout, _ := sw.Serialize(metrics[0]) mbsout, _ := sw.Serialize(metrics[0])
buf := make([]byte, 256) buf := make([]byte, 256)
n, err := lconn.Read(buf) n, err := lconn.Read(buf)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, string(mbsout), string(buf[:n])) require.Equal(t, string(mbsout), string(buf[:n]))
} }
func TestSocketWriter_udp_gzip(t *testing.T) { func TestSocketWriter_udp_gzip(t *testing.T) {

View File

@ -4,20 +4,20 @@ import (
"context" "context"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"log"
"path" "path"
"sort" "sort"
"strings" "strings"
monitoring "cloud.google.com/go/monitoring/apiv3/v2" // Imports the Stackdriver Monitoring client package. monitoring "cloud.google.com/go/monitoring/apiv3/v2" // Imports the Stackdriver Monitoring client package.
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"google.golang.org/api/option" "google.golang.org/api/option"
metricpb "google.golang.org/genproto/googleapis/api/metric" metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
) )
// Stackdriver is the Google Stackdriver config info. // Stackdriver is the Google Stackdriver config info.
@ -26,6 +26,7 @@ type Stackdriver struct {
Namespace string Namespace string
ResourceType string `toml:"resource_type"` ResourceType string `toml:"resource_type"`
ResourceLabels map[string]string `toml:"resource_labels"` ResourceLabels map[string]string `toml:"resource_labels"`
Log telegraf.Logger `toml:"-"`
client *monitoring.MetricClient client *monitoring.MetricClient
} }
@ -46,9 +47,9 @@ const (
// MaxInt is the max int64 value. // MaxInt is the max int64 value.
MaxInt = int(^uint(0) >> 1) MaxInt = int(^uint(0) >> 1)
errStringPointsOutOfOrder = "One or more of the points specified had an older end time than the most recent point" errStringPointsOutOfOrder = "one or more of the points specified had an older end time than the most recent point"
errStringPointsTooOld = "Data points cannot be written more than 24h in the past" errStringPointsTooOld = "data points cannot be written more than 24h in the past"
errStringPointsTooFrequent = "One or more points were written more frequently than the maximum sampling period configured for the metric" errStringPointsTooFrequent = "one or more points were written more frequently than the maximum sampling period configured for the metric"
) )
var sampleConfig = ` var sampleConfig = `
@ -118,15 +119,15 @@ type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries
func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) { func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) {
h := fnv.New64a() h := fnv.New64a()
h.Write([]byte(m.Name())) h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte{'\n'}) h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte(f.Key)) h.Write([]byte(f.Key)) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte{'\n'}) h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error"
for key, value := range m.Tags() { for key, value := range m.Tags() {
h.Write([]byte(key)) h.Write([]byte(key)) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte{'\n'}) h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte(value)) h.Write([]byte(value)) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte{'\n'}) h.Write([]byte{'\n'}) //nolint:revive // from hash.go: "It never returns an error"
} }
k := h.Sum64() k := h.Sum64()
@ -145,7 +146,7 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
for _, f := range m.FieldList() { for _, f := range m.FieldList() {
value, err := getStackdriverTypedValue(f.Value) value, err := getStackdriverTypedValue(f.Value)
if err != nil { if err != nil {
log.Printf("E! [outputs.stackdriver] get type failed: %s", err) s.Log.Errorf("Get type failed: %s", err)
continue continue
} }
@ -155,13 +156,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
metricKind, err := getStackdriverMetricKind(m.Type()) metricKind, err := getStackdriverMetricKind(m.Type())
if err != nil { if err != nil {
log.Printf("E! [outputs.stackdriver] get metric failed: %s", err) s.Log.Errorf("Get metric failed: %s", err)
continue continue
} }
timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix()) timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix())
if err != nil { if err != nil {
log.Printf("E! [outputs.stackdriver] get time interval failed: %s", err) s.Log.Errorf("Get time interval failed: %s", err)
continue continue
} }
@ -175,7 +176,7 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
timeSeries := &monitoringpb.TimeSeries{ timeSeries := &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{ Metric: &metricpb.Metric{
Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key), Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
Labels: getStackdriverLabels(m.TagList()), Labels: s.getStackdriverLabels(m.TagList()),
}, },
MetricKind: metricKind, MetricKind: metricKind,
Resource: &monitoredrespb.MonitoredResource{ Resource: &monitoredrespb.MonitoredResource{
@ -228,10 +229,10 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
if strings.Contains(err.Error(), errStringPointsOutOfOrder) || if strings.Contains(err.Error(), errStringPointsOutOfOrder) ||
strings.Contains(err.Error(), errStringPointsTooOld) || strings.Contains(err.Error(), errStringPointsTooOld) ||
strings.Contains(err.Error(), errStringPointsTooFrequent) { strings.Contains(err.Error(), errStringPointsTooFrequent) {
log.Printf("D! [outputs.stackdriver] unable to write to Stackdriver: %s", err) s.Log.Debugf("Unable to write to Stackdriver: %s", err)
return nil return nil
} }
log.Printf("E! [outputs.stackdriver] unable to write to Stackdriver: %s", err) s.Log.Errorf("Unable to write to Stackdriver: %s", err)
return err return err
} }
} }
@ -306,7 +307,7 @@ func getStackdriverTypedValue(value interface{}) (*monitoringpb.TypedValue, erro
case float64: case float64:
return &monitoringpb.TypedValue{ return &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_DoubleValue{ Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: float64(v), DoubleValue: v,
}, },
}, nil }, nil
case bool: case bool:
@ -323,39 +324,26 @@ func getStackdriverTypedValue(value interface{}) (*monitoringpb.TypedValue, erro
} }
} }
func getStackdriverLabels(tags []*telegraf.Tag) map[string]string { func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
labels := make(map[string]string) labels := make(map[string]string)
for _, t := range tags { for _, t := range tags {
labels[t.Key] = t.Value labels[t.Key] = t.Value
} }
for k, v := range labels { for k, v := range labels {
if len(k) > QuotaStringLengthForLabelKey { if len(k) > QuotaStringLengthForLabelKey {
log.Printf( s.Log.Warnf("Removing tag [%s] key exceeds string length for label key [%d]", k, QuotaStringLengthForLabelKey)
"W! [outputs.stackdriver] removing tag [%s] key exceeds string length for label key [%d]",
k,
QuotaStringLengthForLabelKey,
)
delete(labels, k) delete(labels, k)
continue continue
} }
if len(v) > QuotaStringLengthForLabelValue { if len(v) > QuotaStringLengthForLabelValue {
log.Printf( s.Log.Warnf("Removing tag [%s] value exceeds string length for label value [%d]", k, QuotaStringLengthForLabelValue)
"W! [outputs.stackdriver] removing tag [%s] value exceeds string length for label value [%d]",
k,
QuotaStringLengthForLabelValue,
)
delete(labels, k) delete(labels, k)
continue continue
} }
} }
if len(labels) > QuotaLabelsPerMetricDescriptor { if len(labels) > QuotaLabelsPerMetricDescriptor {
excess := len(labels) - QuotaLabelsPerMetricDescriptor excess := len(labels) - QuotaLabelsPerMetricDescriptor
log.Printf( s.Log.Warnf("Tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags", len(labels), QuotaLabelsPerMetricDescriptor, excess)
"W! [outputs.stackdriver] tag count [%d] exceeds quota for stackdriver labels [%d] removing [%d] random tags",
len(labels),
QuotaLabelsPerMetricDescriptor,
excess,
)
for k := range labels { for k := range labels {
if excess == 0 { if excess == 0 {
break break

View File

@ -12,8 +12,6 @@ import (
"time" "time"
monitoring "cloud.google.com/go/monitoring/apiv3/v2" monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/api/option" "google.golang.org/api/option"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
@ -22,6 +20,9 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
) )
// clientOpt is the option tests should use to connect to the test server. // clientOpt is the option tests should use to connect to the test server.
@ -65,6 +66,9 @@ func TestMain(m *testing.M) {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// Ignore the returned error as the tests will fail anyway
//nolint:errcheck,revive
go serv.Serve(lis) go serv.Serve(lis)
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
@ -90,6 +94,7 @@ func TestWrite(t *testing.T) {
s := &Stackdriver{ s := &Stackdriver{
Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
Namespace: "test", Namespace: "test",
Log: testutil.Logger{},
client: c, client: c,
} }
@ -121,6 +126,7 @@ func TestWriteResourceTypeAndLabels(t *testing.T) {
ResourceLabels: map[string]string{ ResourceLabels: map[string]string{
"mylabel": "myvalue", "mylabel": "myvalue",
}, },
Log: testutil.Logger{},
client: c, client: c,
} }
@ -149,6 +155,7 @@ func TestWriteAscendingTime(t *testing.T) {
s := &Stackdriver{ s := &Stackdriver{
Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
Namespace: "test", Namespace: "test",
Log: testutil.Logger{},
client: c, client: c,
} }
@ -221,6 +228,7 @@ func TestWriteBatchable(t *testing.T) {
s := &Stackdriver{ s := &Stackdriver{
Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
Namespace: "test", Namespace: "test",
Log: testutil.Logger{},
client: c, client: c,
} }
@ -398,6 +406,7 @@ func TestWriteIgnoredErrors(t *testing.T) {
s := &Stackdriver{ s := &Stackdriver{
Project: fmt.Sprintf("projects/%s", "[PROJECT]"), Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
Namespace: "test", Namespace: "test",
Log: testutil.Logger{},
client: c, client: c,
} }
@ -431,6 +440,10 @@ func TestGetStackdriverLabels(t *testing.T) {
{Key: "valuequota", Value: "icym5wcpejnhljcvy2vwk15svmhrtueoppwlvix61vlbaeedufn1g6u4jgwjoekwew9s2dboxtgrkiyuircnl8h1lbzntt9gzcf60qunhxurhiz0g2bynzy1v6eyn4ravndeiiugobsrsj2bfaguahg4gxn7nx4irwfknunhkk6jdlldevawj8levebjajcrcbeugewd14fa8o34ycfwx2ymalyeqxhfqrsksxnii2deqq6cghrzi6qzwmittkzdtye3imoygqmjjshiskvnzz1e4ipd9c6wfor5jsygn1kvcg6jm4clnsl1fnxotbei9xp4swrkjpgursmfmkyvxcgq9hoy435nwnolo3ipnvdlhk6pmlzpdjn6gqi3v9gv7jn5ro2p1t5ufxzfsvqq1fyrgoi7gvmttil1banh3cftkph1dcoaqfhl7y0wkvhwwvrmslmmxp1wedyn8bacd7akmjgfwdvcmrymbzvmrzfvq1gs1xnmmg8rsfxci2h6r1ralo3splf4f3bdg4c7cy0yy9qbxzxhcmdpwekwc7tdjs8uj6wmofm2aor4hum8nwyfwwlxy3yvsnbjy32oucsrmhcnu6l2i8laujkrhvsr9fcix5jflygznlydbqw5uhw1rg1g5wiihqumwmqgggemzoaivm3ut41vjaff4uqtqyuhuwblmuiphfkd7si49vgeeswzg7tpuw0oxmkesgibkcjtev2h9ouxzjs3eb71jffhdacyiuyhuxwvm5bnrjewbm4x2kmhgbirz3eoj7ijgplggdkx5vixufg65ont8zi1jabsuxx0vsqgprunwkugqkxg2r7iy6fmgs4lob4dlseinowkst6gp6x1ejreauyzjz7atzm3hbmr5rbynuqp4lxrnhhcbuoun69mavvaaki0bdz5ybmbbbz5qdv0odtpjo2aezat5uosjuhzbvic05jlyclikynjgfhencdkz3qcqzbzhnsynj1zdke0sk4zfpvfyryzsxv9pu0qm"}, {Key: "valuequota", Value: "icym5wcpejnhljcvy2vwk15svmhrtueoppwlvix61vlbaeedufn1g6u4jgwjoekwew9s2dboxtgrkiyuircnl8h1lbzntt9gzcf60qunhxurhiz0g2bynzy1v6eyn4ravndeiiugobsrsj2bfaguahg4gxn7nx4irwfknunhkk6jdlldevawj8levebjajcrcbeugewd14fa8o34ycfwx2ymalyeqxhfqrsksxnii2deqq6cghrzi6qzwmittkzdtye3imoygqmjjshiskvnzz1e4ipd9c6wfor5jsygn1kvcg6jm4clnsl1fnxotbei9xp4swrkjpgursmfmkyvxcgq9hoy435nwnolo3ipnvdlhk6pmlzpdjn6gqi3v9gv7jn5ro2p1t5ufxzfsvqq1fyrgoi7gvmttil1banh3cftkph1dcoaqfhl7y0wkvhwwvrmslmmxp1wedyn8bacd7akmjgfwdvcmrymbzvmrzfvq1gs1xnmmg8rsfxci2h6r1ralo3splf4f3bdg4c7cy0yy9qbxzxhcmdpwekwc7tdjs8uj6wmofm2aor4hum8nwyfwwlxy3yvsnbjy32oucsrmhcnu6l2i8laujkrhvsr9fcix5jflygznlydbqw5uhw1rg1g5wiihqumwmqgggemzoaivm3ut41vjaff4uqtqyuhuwblmuiphfkd7si49vgeeswzg7tpuw0oxmkesgibkcjtev2h9ouxzjs3eb71jffhdacyiuyhuxwvm5bnrjewbm4x2kmhgbirz3eoj7ijgplggdkx5vixufg65ont8zi1jabsuxx0vsqgprunwkugqkxg2r7iy6fmgs4lob4dlseinowkst6gp6x1ejreauyzjz7atzm3hbmr5rbynuqp4lxrnhhcbuoun69mavvaaki0bdz5ybmbbbz5qdv0odtpjo2aezat5uosjuhzbvic05jlyclikynjgfhencdkz3qcqzbzhnsynj1zdke0sk4zfpvfyryzsxv9pu0qm"},
} }
labels := getStackdriverLabels(tags) s := &Stackdriver{
Log: testutil.Logger{},
}
labels := s.getStackdriverLabels(tags)
require.Equal(t, QuotaLabelsPerMetricDescriptor, len(labels)) require.Equal(t, QuotaLabelsPerMetricDescriptor, len(labels))
} }

View File

@ -3,7 +3,6 @@ package sumologic
import ( import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"log"
"net/http" "net/http"
"time" "time"
@ -198,19 +197,19 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
return s.writeRequestChunks(chunks) return s.writeRequestChunks(chunks)
} }
return s.write(reqBody) return s.writeRequestChunk(reqBody)
} }
func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error { func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
for _, reqChunk := range chunks { for _, reqChunk := range chunks {
if err := s.write(reqChunk); err != nil { if err := s.writeRequestChunk(reqChunk); err != nil {
s.Log.Errorf("Error sending chunk: %v", err) s.Log.Errorf("Error sending chunk: %v", err)
} }
} }
return nil return nil
} }
func (s *SumoLogic) write(reqBody []byte) error { func (s *SumoLogic) writeRequestChunk(reqBody []byte) error {
var ( var (
err error err error
buff bytes.Buffer buff bytes.Buffer
@ -284,32 +283,32 @@ func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error)
if la+len(chunkBody) > int(s.MaxRequstBodySize) { if la+len(chunkBody) > int(s.MaxRequstBodySize) {
// ... and it's just the right size, without currently processed chunk. // ... and it's just the right size, without currently processed chunk.
break break
} else { }
// ... we can try appending more. // ... we can try appending more.
i++ i++
toAppend = append(toAppend, chunkBody...) toAppend = append(toAppend, chunkBody...)
continue continue
} }
} else { // la == 0
// la == 0
i++ i++
toAppend = chunkBody toAppend = chunkBody
if len(chunkBody) > int(s.MaxRequstBodySize) { if len(chunkBody) > int(s.MaxRequstBodySize) {
log.Printf( s.Log.Warnf(
"W! [SumoLogic] max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split", "max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
s.MaxRequstBodySize, len(chunkBody), s.MaxRequstBodySize, len(chunkBody),
) )
// The serialized metric is too big but we have no choice // The serialized metric is too big, but we have no choice
// but to send it. // but to send it.
// max_request_body_size was set so small that it wouldn't // max_request_body_size was set so small that it wouldn't
// even accomodate a single metric. // even accommodate a single metric.
break break
} }
continue continue
} }
}
if toAppend == nil { if toAppend == nil {
break break

View File

@ -5,6 +5,7 @@ import (
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"fmt" "fmt"
"github.com/influxdata/telegraf/testutil"
"io" "io"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -13,7 +14,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -25,7 +25,7 @@ import (
"github.com/influxdata/telegraf/plugins/serializers/prometheus" "github.com/influxdata/telegraf/plugins/serializers/prometheus"
) )
func getMetric(t *testing.T) telegraf.Metric { func getMetric() telegraf.Metric {
m := metric.New( m := metric.New(
"cpu", "cpu",
map[string]string{}, map[string]string{},
@ -37,7 +37,7 @@ func getMetric(t *testing.T) telegraf.Metric {
return m return m
} }
func getMetrics(t *testing.T) []telegraf.Metric { func getMetrics() []telegraf.Metric {
const count = 100 const count = 100
var metrics = make([]telegraf.Metric, count) var metrics = make([]telegraf.Metric, count)
@ -105,7 +105,7 @@ func TestMethod(t *testing.T) {
} }
require.NoError(t, err) require.NoError(t, err)
err = plugin.Write([]telegraf.Metric{getMetric(t)}) err = plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err) require.NoError(t, err)
}) })
} }
@ -177,7 +177,7 @@ func TestStatusCode(t *testing.T) {
err = tt.plugin.Connect() err = tt.plugin.Connect()
require.NoError(t, err) require.NoError(t, err)
err = tt.plugin.Write([]telegraf.Metric{getMetric(t)}) err = tt.plugin.Write([]telegraf.Metric{getMetric()})
tt.errFunc(t, err) tt.errFunc(t, err)
}) })
} }
@ -247,7 +247,8 @@ func TestContentType(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gz, err := gzip.NewReader(r.Body) gz, err := gzip.NewReader(r.Body)
require.NoError(t, err) require.NoError(t, err)
io.Copy(&body, gz) _, err = io.Copy(&body, gz)
require.NoError(t, err)
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
})) }))
defer ts.Close() defer ts.Close()
@ -260,7 +261,7 @@ func TestContentType(t *testing.T) {
require.NoError(t, plugin.Connect()) require.NoError(t, plugin.Connect())
err = plugin.Write([]telegraf.Metric{getMetric(t)}) err = plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err) require.NoError(t, err)
if tt.expectedBody != nil { if tt.expectedBody != nil {
@ -302,7 +303,7 @@ func TestContentEncodingGzip(t *testing.T) {
payload, err := io.ReadAll(body) payload, err := io.ReadAll(body)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, string(payload), "metric=cpu field=value 42 0\n") require.Equal(t, string(payload), "metric=cpu field=value 42 0\n")
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
}) })
@ -316,14 +317,12 @@ func TestContentEncodingGzip(t *testing.T) {
err = plugin.Connect() err = plugin.Connect()
require.NoError(t, err) require.NoError(t, err)
err = plugin.Write([]telegraf.Metric{getMetric(t)}) err = plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err) require.NoError(t, err)
}) })
} }
} }
type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
func TestDefaultUserAgent(t *testing.T) { func TestDefaultUserAgent(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler()) ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close() defer ts.Close()
@ -349,7 +348,7 @@ func TestDefaultUserAgent(t *testing.T) {
err = plugin.Connect() err = plugin.Connect()
require.NoError(t, err) require.NoError(t, err)
err = plugin.Write([]telegraf.Metric{getMetric(t)}) err = plugin.Write([]telegraf.Metric{getMetric()})
require.NoError(t, err) require.NoError(t, err)
}) })
} }
@ -463,7 +462,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String() s.URL = u.String()
return s return s
}, },
metrics: []telegraf.Metric{getMetric(t)}, metrics: []telegraf.Metric{getMetric()},
expectedError: false, expectedError: false,
expectedRequestCount: 1, expectedRequestCount: 1,
expectedMetricLinesCount: 1, expectedMetricLinesCount: 1,
@ -475,7 +474,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String() s.URL = u.String()
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 1, expectedRequestCount: 1,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -490,7 +489,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 43_749 s.MaxRequstBodySize = 43_749
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 2, expectedRequestCount: 2,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -503,7 +502,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 10_000 s.MaxRequstBodySize = 10_000
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 5, expectedRequestCount: 5,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -516,7 +515,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 5_000 s.MaxRequstBodySize = 5_000
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 10, expectedRequestCount: 10,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -529,7 +528,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 2_500 s.MaxRequstBodySize = 2_500
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 20, expectedRequestCount: 20,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -542,7 +541,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 1_000 s.MaxRequstBodySize = 1_000
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 50, expectedRequestCount: 50,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -555,7 +554,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 500 s.MaxRequstBodySize = 500
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 100, expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -568,7 +567,7 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 300 s.MaxRequstBodySize = 300
return s return s
}, },
metrics: getMetrics(t), metrics: getMetrics(),
expectedError: false, expectedError: false,
expectedRequestCount: 100, expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
@ -596,6 +595,7 @@ func TestMaxRequestBodySize(t *testing.T) {
plugin := tt.plugin() plugin := tt.plugin()
plugin.SetSerializer(serializer) plugin.SetSerializer(serializer)
plugin.Log = testutil.Logger{}
err = plugin.Connect() err = plugin.Connect()
require.NoError(t, err) require.NoError(t, err)

View File

@ -3,7 +3,6 @@ package syslog
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"log"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@ -11,6 +10,7 @@ import (
"github.com/influxdata/go-syslog/v3/nontransparent" "github.com/influxdata/go-syslog/v3/nontransparent"
"github.com/influxdata/go-syslog/v3/rfc5424" "github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
framing "github.com/influxdata/telegraf/internal/syslog" framing "github.com/influxdata/telegraf/internal/syslog"
@ -29,6 +29,7 @@ type Syslog struct {
Separator string `toml:"sdparam_separator"` Separator string `toml:"sdparam_separator"`
Framing framing.Framing Framing framing.Framing
Trailer nontransparent.TrailerType Trailer nontransparent.TrailerType
Log telegraf.Logger `toml:"-"`
net.Conn net.Conn
tlsint.ClientConfig tlsint.ClientConfig
mapper *SyslogMapper mapper *SyslogMapper
@ -135,7 +136,7 @@ func (s *Syslog) Connect() error {
} }
if err := s.setKeepAlive(c); err != nil { if err := s.setKeepAlive(c); err != nil {
log.Printf("unable to configure keep alive (%s): %s", s.Address, err) s.Log.Warnf("unable to configure keep alive (%s): %s", s.Address, err)
} }
s.Conn = c s.Conn = c
@ -186,17 +187,17 @@ func (s *Syslog) Write(metrics []telegraf.Metric) (err error) {
for _, metric := range metrics { for _, metric := range metrics {
var msg *rfc5424.SyslogMessage var msg *rfc5424.SyslogMessage
if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil { if msg, err = s.mapper.MapMetricToSyslogMessage(metric); err != nil {
log.Printf("E! [outputs.syslog] Failed to create syslog message: %v", err) s.Log.Errorf("Failed to create syslog message: %v", err)
continue continue
} }
var msgBytesWithFraming []byte var msgBytesWithFraming []byte
if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil { if msgBytesWithFraming, err = s.getSyslogMessageBytesWithFraming(msg); err != nil {
log.Printf("E! [outputs.syslog] Failed to convert syslog message with framing: %v", err) s.Log.Errorf("Failed to convert syslog message with framing: %v", err)
continue continue
} }
if _, err = s.Conn.Write(msgBytesWithFraming); err != nil { if _, err = s.Conn.Write(msgBytesWithFraming); err != nil {
if netErr, ok := err.(net.Error); !ok || !netErr.Temporary() { if netErr, ok := err.(net.Error); !ok || !netErr.Temporary() {
s.Close() s.Close() //nolint:revive // There is another error which will be returned here
s.Conn = nil s.Conn = nil
return fmt.Errorf("closing connection: %v", netErr) return fmt.Errorf("closing connection: %v", netErr)
} }

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/go-syslog/v3/rfc5424" "github.com/influxdata/go-syslog/v3/rfc5424"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
@ -90,8 +91,7 @@ func mapMsgID(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
func mapVersion(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { func mapVersion(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
if value, ok := metric.GetField("version"); ok { if value, ok := metric.GetField("version"); ok {
switch v := value.(type) { if v, ok := value.(uint64); ok {
case uint64:
msg.SetVersion(uint16(v)) msg.SetVersion(uint16(v))
return return
} }
@ -142,9 +142,9 @@ func mapHostname(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
func mapTimestamp(metric telegraf.Metric, msg *rfc5424.SyslogMessage) { func mapTimestamp(metric telegraf.Metric, msg *rfc5424.SyslogMessage) {
timestamp := metric.Time() timestamp := metric.Time()
//nolint: revive // Need switch with only one case to handle `.(type)`
if value, ok := metric.GetField("timestamp"); ok { if value, ok := metric.GetField("timestamp"); ok {
switch v := value.(type) { if v, ok := value.(int64); ok {
case int64:
timestamp = time.Unix(0, v).UTC() timestamp = time.Unix(0, v).UTC()
} }
} }

View File

@ -5,9 +5,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/metric"
) )
func TestSyslogMapperWithDefaults(t *testing.T) { func TestSyslogMapperWithDefaults(t *testing.T) {
@ -22,11 +22,11 @@ func TestSyslogMapperWithDefaults(t *testing.T) {
time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC),
) )
hostname, err := os.Hostname() hostname, err := os.Hostname()
assert.NoError(t, err) require.NoError(t, err)
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
require.NoError(t, err) require.NoError(t, err)
str, _ := syslogMessage.String() str, _ := syslogMessage.String()
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z "+hostname+" Telegraf - testmetric -", str, "Wrong syslog message") require.Equal(t, "<13>1 2010-11-10T23:00:00Z "+hostname+" Telegraf - testmetric -", str, "Wrong syslog message")
} }
func TestSyslogMapperWithHostname(t *testing.T) { func TestSyslogMapperWithHostname(t *testing.T) {
@ -47,7 +47,7 @@ func TestSyslogMapperWithHostname(t *testing.T) {
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
require.NoError(t, err) require.NoError(t, err)
str, _ := syslogMessage.String() str, _ := syslogMessage.String()
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", str, "Wrong syslog message") require.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", str, "Wrong syslog message")
} }
func TestSyslogMapperWithHostnameSourceFallback(t *testing.T) { func TestSyslogMapperWithHostnameSourceFallback(t *testing.T) {
s := newSyslog() s := newSyslog()
@ -66,7 +66,7 @@ func TestSyslogMapperWithHostnameSourceFallback(t *testing.T) {
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
require.NoError(t, err) require.NoError(t, err)
str, _ := syslogMessage.String() str, _ := syslogMessage.String()
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z sourcevalue Telegraf - testmetric -", str, "Wrong syslog message") require.Equal(t, "<13>1 2010-11-10T23:00:00Z sourcevalue Telegraf - testmetric -", str, "Wrong syslog message")
} }
func TestSyslogMapperWithHostnameHostFallback(t *testing.T) { func TestSyslogMapperWithHostnameHostFallback(t *testing.T) {
@ -85,7 +85,7 @@ func TestSyslogMapperWithHostnameHostFallback(t *testing.T) {
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
require.NoError(t, err) require.NoError(t, err)
str, _ := syslogMessage.String() str, _ := syslogMessage.String()
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z hostvalue Telegraf - testmetric -", str, "Wrong syslog message") require.Equal(t, "<13>1 2010-11-10T23:00:00Z hostvalue Telegraf - testmetric -", str, "Wrong syslog message")
} }
func TestSyslogMapperWithDefaultSdid(t *testing.T) { func TestSyslogMapperWithDefaultSdid(t *testing.T) {
@ -120,7 +120,7 @@ func TestSyslogMapperWithDefaultSdid(t *testing.T) {
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
require.NoError(t, err) require.NoError(t, err)
str, _ := syslogMessage.String() str, _ := syslogMessage.String()
assert.Equal(t, "<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message", str, "Wrong syslog message") require.Equal(t, "<27>2 2010-11-10T23:30:00Z testhost testapp 25 555 [default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"foo\" value3=\"1.2\"] Test message", str, "Wrong syslog message")
} }
func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) { func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) {
@ -158,7 +158,7 @@ func TestSyslogMapperWithDefaultSdidAndOtherSdids(t *testing.T) {
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
require.NoError(t, err) require.NoError(t, err)
str, _ := syslogMessage.String() str, _ := syslogMessage.String()
assert.Equal(t, "<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message", str, "Wrong syslog message") require.Equal(t, "<25>2 2010-11-10T23:30:00Z testhost testapp 25 555 [bar@123 tag3=\"barfoobar\" value3=\"2\"][default@32473 tag1=\"bar\" tag2=\"foobar\" value1=\"2\" value2=\"default\"][foo@456 value4=\"foo\"] Test message", str, "Wrong syslog message")
} }
func TestSyslogMapperWithNoSdids(t *testing.T) { func TestSyslogMapperWithNoSdids(t *testing.T) {
@ -196,5 +196,5 @@ func TestSyslogMapperWithNoSdids(t *testing.T) {
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(m1)
require.NoError(t, err) require.NoError(t, err)
str, _ := syslogMessage.String() str, _ := syslogMessage.String()
assert.Equal(t, "<26>2 2010-11-10T23:30:00Z testhost testapp 25 555 - Test message", str, "Wrong syslog message") require.Equal(t, "<26>2 2010-11-10T23:30:00Z testhost testapp 25 555 - Test message", str, "Wrong syslog message")
} }

View File

@ -6,12 +6,12 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
framing "github.com/influxdata/telegraf/internal/syslog" framing "github.com/influxdata/telegraf/internal/syslog"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) { func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) {
@ -34,7 +34,7 @@ func TestGetSyslogMessageWithFramingOctectCounting(t *testing.T) {
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "59 <13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", string(messageBytesWithFraming), "Incorrect Octect counting framing") require.Equal(t, "59 <13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -", string(messageBytesWithFraming), "Incorrect Octect counting framing")
} }
func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) { func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) {
@ -58,7 +58,7 @@ func TestGetSyslogMessageWithFramingNonTransparent(t *testing.T) {
messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage) messageBytesWithFraming, err := s.getSyslogMessageBytesWithFraming(syslogMessage)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -\x00", string(messageBytesWithFraming), "Incorrect Octect counting framing") require.Equal(t, "<13>1 2010-11-10T23:00:00Z testhost Telegraf - testmetric -\x00", string(messageBytesWithFraming), "Incorrect Octect counting framing")
} }
func TestSyslogWriteWithTcp(t *testing.T) { func TestSyslogWriteWithTcp(t *testing.T) {
@ -110,7 +110,7 @@ func testSyslogWriteWithStream(t *testing.T, s *Syslog, lconn net.Conn) {
buf := make([]byte, 256) buf := make([]byte, 256)
n, err := lconn.Read(buf) n, err := lconn.Read(buf)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) require.Equal(t, string(messageBytesWithFraming), string(buf[:n]))
} }
func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) { func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) {
@ -134,7 +134,7 @@ func testSyslogWriteWithPacket(t *testing.T, s *Syslog, lconn net.PacketConn) {
buf := make([]byte, 256) buf := make([]byte, 256)
n, _, err := lconn.ReadFrom(buf) n, _, err := lconn.ReadFrom(buf)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) require.Equal(t, string(messageBytesWithFraming), string(buf[:n]))
} }
func TestSyslogWriteErr(t *testing.T) { func TestSyslogWriteErr(t *testing.T) {
@ -146,20 +146,26 @@ func TestSyslogWriteErr(t *testing.T) {
err = s.Connect() err = s.Connect()
require.NoError(t, err) require.NoError(t, err)
s.Conn.(*net.TCPConn).SetReadBuffer(256) err = s.Conn.(*net.TCPConn).SetReadBuffer(256)
require.NoError(t, err)
lconn, err := listener.Accept() lconn, err := listener.Accept()
require.NoError(t, err) require.NoError(t, err)
lconn.(*net.TCPConn).SetWriteBuffer(256) err = lconn.(*net.TCPConn).SetWriteBuffer(256)
require.NoError(t, err)
metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")} metrics := []telegraf.Metric{testutil.TestMetric(1, "testerr")}
// close the socket to generate an error // close the socket to generate an error
lconn.Close() err = lconn.Close()
s.Conn.Close() require.NoError(t, err)
err = s.Conn.Close()
require.NoError(t, err)
err = s.Write(metrics) err = s.Write(metrics)
require.Error(t, err) require.Error(t, err)
assert.Nil(t, s.Conn) require.Nil(t, s.Conn)
} }
func TestSyslogWriteReconnect(t *testing.T) { func TestSyslogWriteReconnect(t *testing.T) {
@ -171,12 +177,15 @@ func TestSyslogWriteReconnect(t *testing.T) {
err = s.Connect() err = s.Connect()
require.NoError(t, err) require.NoError(t, err)
s.Conn.(*net.TCPConn).SetReadBuffer(256) err = s.Conn.(*net.TCPConn).SetReadBuffer(256)
require.NoError(t, err)
lconn, err := listener.Accept() lconn, err := listener.Accept()
require.NoError(t, err) require.NoError(t, err)
lconn.(*net.TCPConn).SetWriteBuffer(256) err = lconn.(*net.TCPConn).SetWriteBuffer(256)
lconn.Close() require.NoError(t, err)
err = lconn.Close()
require.NoError(t, err)
s.Conn = nil s.Conn = nil
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -192,7 +201,7 @@ func TestSyslogWriteReconnect(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
wg.Wait() wg.Wait()
assert.NoError(t, lerr) require.NoError(t, lerr)
syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0]) syslogMessage, err := s.mapper.MapMetricToSyslogMessage(metrics[0])
require.NoError(t, err) require.NoError(t, err)
@ -201,5 +210,5 @@ func TestSyslogWriteReconnect(t *testing.T) {
buf := make([]byte, 256) buf := make([]byte, 256)
n, err := lconn.Read(buf) n, err := lconn.Read(buf)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, string(messageBytesWithFraming), string(buf[:n])) require.Equal(t, string(messageBytesWithFraming), string(buf[:n]))
} }

View File

@ -10,14 +10,14 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
"github.com/aws/smithy-go" "github.com/aws/smithy-go"
"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws" internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/plugins/outputs"
) )
type ( type (
@ -332,12 +332,12 @@ func (t *Timestream) logWriteToTimestreamError(err error, tableName *string) {
func (t *Timestream) createTableAndRetry(writeRecordsInput *timestreamwrite.WriteRecordsInput) error { func (t *Timestream) createTableAndRetry(writeRecordsInput *timestreamwrite.WriteRecordsInput) error {
if t.CreateTableIfNotExists { if t.CreateTableIfNotExists {
t.Log.Infof("Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'.", *writeRecordsInput.TableName, t.DatabaseName) t.Log.Infof("Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'.", *writeRecordsInput.TableName, t.DatabaseName)
if err := t.createTable(writeRecordsInput.TableName); err != nil { err := t.createTable(writeRecordsInput.TableName)
t.Log.Errorf("Failed to create table '%s' in database '%s': %s. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName, err) if err == nil {
} else {
t.Log.Infof("Table '%s' in database '%s' created. Retrying writing.", *writeRecordsInput.TableName, t.DatabaseName) t.Log.Infof("Table '%s' in database '%s' created. Retrying writing.", *writeRecordsInput.TableName, t.DatabaseName)
return t.writeToTimestream(writeRecordsInput, false) return t.writeToTimestream(writeRecordsInput, false)
} }
t.Log.Errorf("Failed to create table '%s' in database '%s': %s. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName, err)
} else { } else {
t.Log.Errorf("Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName) t.Log.Errorf("Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName)
} }
@ -434,22 +434,22 @@ func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwr
func hashFromMetricTimeNameTagKeys(m telegraf.Metric) uint64 { func hashFromMetricTimeNameTagKeys(m telegraf.Metric) uint64 {
h := fnv.New64a() h := fnv.New64a()
h.Write([]byte(m.Name())) h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
for _, tag := range m.TagList() { for _, tag := range m.TagList() {
if tag.Key == "" { if tag.Key == "" {
continue continue
} }
h.Write([]byte(tag.Key)) h.Write([]byte(tag.Key)) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte(tag.Value)) h.Write([]byte(tag.Value)) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
} }
b := make([]byte, binary.MaxVarintLen64) b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, uint64(m.Time().UnixNano())) n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
h.Write(b[:n]) h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
return h.Sum64() return h.Sum64()
} }
@ -537,7 +537,7 @@ func getTimestreamTime(t time.Time) (timeUnit types.TimeUnit, timeValue string)
timeUnit = types.TimeUnitNanoseconds timeUnit = types.TimeUnitNanoseconds
timeValue = strconv.FormatInt(nanosTime, 10) timeValue = strconv.FormatInt(nanosTime, 10)
} }
return return timeUnit, timeValue
} }
// convertValue converts single Field value from Telegraf Metric and produces // convertValue converts single Field value from Telegraf Metric and produces
@ -595,7 +595,7 @@ func convertValue(v interface{}) (value string, valueType types.MeasureValueType
default: default:
// Skip unsupported type. // Skip unsupported type.
ok = false ok = false
return return value, valueType, ok
} }
return return value, valueType, ok
} }

View File

@ -4,40 +4,36 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
"github.com/stretchr/testify/assert"
) )
func TestGetTimestreamTime(t *testing.T) { func TestGetTimestreamTime(t *testing.T) {
assertions := assert.New(t)
tWithNanos := time.Date(2020, time.November, 10, 23, 44, 20, 123, time.UTC) tWithNanos := time.Date(2020, time.November, 10, 23, 44, 20, 123, time.UTC)
tWithMicros := time.Date(2020, time.November, 10, 23, 44, 20, 123000, time.UTC) tWithMicros := time.Date(2020, time.November, 10, 23, 44, 20, 123000, time.UTC)
tWithMillis := time.Date(2020, time.November, 10, 23, 44, 20, 123000000, time.UTC) tWithMillis := time.Date(2020, time.November, 10, 23, 44, 20, 123000000, time.UTC)
tOnlySeconds := time.Date(2020, time.November, 10, 23, 44, 20, 0, time.UTC) tOnlySeconds := time.Date(2020, time.November, 10, 23, 44, 20, 0, time.UTC)
tUnitNanos, tValueNanos := getTimestreamTime(tWithNanos) tUnitNanos, tValueNanos := getTimestreamTime(tWithNanos)
assertions.Equal(types.TimeUnitNanoseconds, tUnitNanos) require.Equal(t, types.TimeUnitNanoseconds, tUnitNanos)
assertions.Equal("1605051860000000123", tValueNanos) require.Equal(t, "1605051860000000123", tValueNanos)
tUnitMicros, tValueMicros := getTimestreamTime(tWithMicros) tUnitMicros, tValueMicros := getTimestreamTime(tWithMicros)
assertions.Equal(types.TimeUnitMicroseconds, tUnitMicros) require.Equal(t, types.TimeUnitMicroseconds, tUnitMicros)
assertions.Equal("1605051860000123", tValueMicros) require.Equal(t, "1605051860000123", tValueMicros)
tUnitMillis, tValueMillis := getTimestreamTime(tWithMillis) tUnitMillis, tValueMillis := getTimestreamTime(tWithMillis)
assertions.Equal(types.TimeUnitMilliseconds, tUnitMillis) require.Equal(t, types.TimeUnitMilliseconds, tUnitMillis)
assertions.Equal("1605051860123", tValueMillis) require.Equal(t, "1605051860123", tValueMillis)
tUnitSeconds, tValueSeconds := getTimestreamTime(tOnlySeconds) tUnitSeconds, tValueSeconds := getTimestreamTime(tOnlySeconds)
assertions.Equal(types.TimeUnitSeconds, tUnitSeconds) require.Equal(t, types.TimeUnitSeconds, tUnitSeconds)
assertions.Equal("1605051860", tValueSeconds) require.Equal(t, "1605051860", tValueSeconds)
} }
func TestPartitionRecords(t *testing.T) { func TestPartitionRecords(t *testing.T) {
assertions := assert.New(t)
testDatum := types.Record{ testDatum := types.Record{
MeasureName: aws.String("Foo"), MeasureName: aws.String("Foo"),
MeasureValueType: types.MeasureValueTypeDouble, MeasureValueType: types.MeasureValueTypeDouble,
@ -49,11 +45,11 @@ func TestPartitionRecords(t *testing.T) {
twoDatum := []types.Record{testDatum, testDatum} twoDatum := []types.Record{testDatum, testDatum}
threeDatum := []types.Record{testDatum, testDatum, testDatum} threeDatum := []types.Record{testDatum, testDatum, testDatum}
assertions.Equal([][]types.Record{}, partitionRecords(2, zeroDatum)) require.Equal(t, [][]types.Record{}, partitionRecords(2, zeroDatum))
assertions.Equal([][]types.Record{oneDatum}, partitionRecords(2, oneDatum)) require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum))
assertions.Equal([][]types.Record{oneDatum}, partitionRecords(2, oneDatum)) require.Equal(t, [][]types.Record{oneDatum}, partitionRecords(2, oneDatum))
assertions.Equal([][]types.Record{twoDatum}, partitionRecords(2, twoDatum)) require.Equal(t, [][]types.Record{twoDatum}, partitionRecords(2, twoDatum))
assertions.Equal([][]types.Record{twoDatum, oneDatum}, partitionRecords(2, threeDatum)) require.Equal(t, [][]types.Record{twoDatum, oneDatum}, partitionRecords(2, threeDatum))
} }
func TestConvertValueSupported(t *testing.T) { func TestConvertValueSupported(t *testing.T) {
@ -74,18 +70,16 @@ func TestConvertValueSupported(t *testing.T) {
} }
func TestConvertValueUnsupported(t *testing.T) { func TestConvertValueUnsupported(t *testing.T) {
assertions := assert.New(t)
_, _, ok := convertValue(time.Date(2020, time.November, 10, 23, 44, 20, 0, time.UTC)) _, _, ok := convertValue(time.Date(2020, time.November, 10, 23, 44, 20, 0, time.UTC))
assertions.False(ok, "Expected unsuccessful conversion") require.False(t, ok, "Expected unsuccessful conversion")
} }
func testConvertValueSupportedCases(t *testing.T, func testConvertValueSupportedCases(t *testing.T,
inputValues []interface{}, outputValues []string, outputValueTypes []types.MeasureValueType) { inputValues []interface{}, outputValues []string, outputValueTypes []types.MeasureValueType) {
assertions := assert.New(t)
for i, inputValue := range inputValues { for i, inputValue := range inputValues {
v, vt, ok := convertValue(inputValue) v, vt, ok := convertValue(inputValue)
assertions.Equal(true, ok, "Expected successful conversion") require.Equal(t, true, ok, "Expected successful conversion")
assertions.Equal(outputValues[i], v, "Expected different string representation of converted value") require.Equal(t, outputValues[i], v, "Expected different string representation of converted value")
assertions.Equal(outputValueTypes[i], vt, "Expected different value type of converted value") require.Equal(t, outputValueTypes[i], vt, "Expected different value type of converted value")
} }
} }

View File

@ -13,11 +13,11 @@ import (
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types" "github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws" internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
) )
const tsDbName = "testDb" const tsDbName = "testDb"
@ -49,26 +49,25 @@ func (m *mockTimestreamClient) DescribeDatabase(context.Context, *timestreamwrit
} }
func TestConnectValidatesConfigParameters(t *testing.T) { func TestConnectValidatesConfigParameters(t *testing.T) {
assertions := assert.New(t)
WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
return &mockTimestreamClient{}, nil return &mockTimestreamClient{}, nil
} }
// checking base arguments // checking base arguments
noDatabaseName := Timestream{Log: testutil.Logger{}} noDatabaseName := Timestream{Log: testutil.Logger{}}
assertions.Contains(noDatabaseName.Connect().Error(), "DatabaseName") require.Contains(t, noDatabaseName.Connect().Error(), "DatabaseName")
noMappingMode := Timestream{ noMappingMode := Timestream{
DatabaseName: tsDbName, DatabaseName: tsDbName,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(noMappingMode.Connect().Error(), "MappingMode") require.Contains(t, noMappingMode.Connect().Error(), "MappingMode")
incorrectMappingMode := Timestream{ incorrectMappingMode := Timestream{
DatabaseName: tsDbName, DatabaseName: tsDbName,
MappingMode: "foo", MappingMode: "foo",
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(incorrectMappingMode.Connect().Error(), "single-table") require.Contains(t, incorrectMappingMode.Connect().Error(), "single-table")
// multi-table arguments // multi-table arguments
validMappingModeMultiTable := Timestream{ validMappingModeMultiTable := Timestream{
@ -76,7 +75,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
MappingMode: MappingModeMultiTable, MappingMode: MappingModeMultiTable,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Nil(validMappingModeMultiTable.Connect()) require.Nil(t, validMappingModeMultiTable.Connect())
singleTableNameWithMultiTable := Timestream{ singleTableNameWithMultiTable := Timestream{
DatabaseName: tsDbName, DatabaseName: tsDbName,
@ -84,7 +83,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
SingleTableName: testSingleTableName, SingleTableName: testSingleTableName,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(singleTableNameWithMultiTable.Connect().Error(), "SingleTableName") require.Contains(t, singleTableNameWithMultiTable.Connect().Error(), "SingleTableName")
singleTableDimensionWithMultiTable := Timestream{ singleTableDimensionWithMultiTable := Timestream{
DatabaseName: tsDbName, DatabaseName: tsDbName,
@ -92,7 +91,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim, SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(singleTableDimensionWithMultiTable.Connect().Error(), require.Contains(t, singleTableDimensionWithMultiTable.Connect().Error(),
"SingleTableDimensionNameForTelegrafMeasurementName") "SingleTableDimensionNameForTelegrafMeasurementName")
// single-table arguments // single-table arguments
@ -101,7 +100,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
MappingMode: MappingModeSingleTable, MappingMode: MappingModeSingleTable,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(noTableNameMappingModeSingleTable.Connect().Error(), "SingleTableName") require.Contains(t, noTableNameMappingModeSingleTable.Connect().Error(), "SingleTableName")
noDimensionNameMappingModeSingleTable := Timestream{ noDimensionNameMappingModeSingleTable := Timestream{
DatabaseName: tsDbName, DatabaseName: tsDbName,
@ -109,7 +108,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
SingleTableName: testSingleTableName, SingleTableName: testSingleTableName,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(noDimensionNameMappingModeSingleTable.Connect().Error(), require.Contains(t, noDimensionNameMappingModeSingleTable.Connect().Error(),
"SingleTableDimensionNameForTelegrafMeasurementName") "SingleTableDimensionNameForTelegrafMeasurementName")
validConfigurationMappingModeSingleTable := Timestream{ validConfigurationMappingModeSingleTable := Timestream{
@ -119,7 +118,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim, SingleTableDimensionNameForTelegrafMeasurementName: testSingleTableDim,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Nil(validConfigurationMappingModeSingleTable.Connect()) require.Nil(t, validConfigurationMappingModeSingleTable.Connect())
// create table arguments // create table arguments
createTableNoMagneticRetention := Timestream{ createTableNoMagneticRetention := Timestream{
@ -128,7 +127,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
CreateTableIfNotExists: true, CreateTableIfNotExists: true,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(createTableNoMagneticRetention.Connect().Error(), require.Contains(t, createTableNoMagneticRetention.Connect().Error(),
"CreateTableMagneticStoreRetentionPeriodInDays") "CreateTableMagneticStoreRetentionPeriodInDays")
createTableNoMemoryRetention := Timestream{ createTableNoMemoryRetention := Timestream{
@ -138,7 +137,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
CreateTableMagneticStoreRetentionPeriodInDays: 3, CreateTableMagneticStoreRetentionPeriodInDays: 3,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(createTableNoMemoryRetention.Connect().Error(), require.Contains(t, createTableNoMemoryRetention.Connect().Error(),
"CreateTableMemoryStoreRetentionPeriodInHours") "CreateTableMemoryStoreRetentionPeriodInHours")
createTableValid := Timestream{ createTableValid := Timestream{
@ -149,7 +148,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
CreateTableMemoryStoreRetentionPeriodInHours: 3, CreateTableMemoryStoreRetentionPeriodInHours: 3,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Nil(createTableValid.Connect()) require.Nil(t, createTableValid.Connect())
// describe table on start arguments // describe table on start arguments
describeTableInvoked := Timestream{ describeTableInvoked := Timestream{
@ -158,7 +157,7 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
DescribeDatabaseOnStart: true, DescribeDatabaseOnStart: true,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.Contains(describeTableInvoked.Connect().Error(), "hello from DescribeDatabase") require.Contains(t, describeTableInvoked.Connect().Error(), "hello from DescribeDatabase")
} }
type mockTimestreamErrorClient struct { type mockTimestreamErrorClient struct {
@ -176,7 +175,6 @@ func (m *mockTimestreamErrorClient) DescribeDatabase(context.Context, *timestrea
} }
func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) { func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) {
assertions := assert.New(t)
WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
return &mockTimestreamErrorClient{ return &mockTimestreamErrorClient{
ErrorToReturnOnWriteRecords: &types.ThrottlingException{Message: aws.String("Throttling Test")}, ErrorToReturnOnWriteRecords: &types.ThrottlingException{Message: aws.String("Throttling Test")},
@ -188,7 +186,7 @@ func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) {
DatabaseName: tsDbName, DatabaseName: tsDbName,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.NoError(plugin.Connect()) require.NoError(t, plugin.Connect())
input := testutil.MustMetric( input := testutil.MustMetric(
metricName1, metricName1,
map[string]string{"tag1": "value1"}, map[string]string{"tag1": "value1"},
@ -198,12 +196,11 @@ func TestThrottlingErrorIsReturnedToTelegraf(t *testing.T) {
err := plugin.Write([]telegraf.Metric{input}) err := plugin.Write([]telegraf.Metric{input})
assertions.NotNil(err, "Expected an error to be returned to Telegraf, "+ require.NotNil(t, err, "Expected an error to be returned to Telegraf, "+
"so that the write will be retried by Telegraf later.") "so that the write will be retried by Telegraf later.")
} }
func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) { func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
assertions := assert.New(t)
WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
return &mockTimestreamErrorClient{ return &mockTimestreamErrorClient{
ErrorToReturnOnWriteRecords: &types.RejectedRecordsException{Message: aws.String("RejectedRecords Test")}, ErrorToReturnOnWriteRecords: &types.RejectedRecordsException{Message: aws.String("RejectedRecords Test")},
@ -215,7 +212,7 @@ func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
DatabaseName: tsDbName, DatabaseName: tsDbName,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
assertions.NoError(plugin.Connect()) require.NoError(t, plugin.Connect())
input := testutil.MustMetric( input := testutil.MustMetric(
metricName1, metricName1,
map[string]string{"tag1": "value1"}, map[string]string{"tag1": "value1"},
@ -225,7 +222,7 @@ func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
err := plugin.Write([]telegraf.Metric{input}) err := plugin.Write([]telegraf.Metric{input})
assertions.Nil(err, "Expected to silently swallow the RejectedRecordsException, "+ require.Nil(t, err, "Expected to silently swallow the RejectedRecordsException, "+
"as retrying this error doesn't make sense.") "as retrying this error doesn't make sense.")
} }
@ -649,13 +646,11 @@ func comparisonTest(t *testing.T,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
} }
assertions := assert.New(t)
result := plugin.TransformMetrics(telegrafMetrics) result := plugin.TransformMetrics(telegrafMetrics)
assertions.Equal(len(timestreamRecords), len(result), "The number of transformed records was expected to be different") require.Equal(t, len(timestreamRecords), len(result), "The number of transformed records was expected to be different")
for _, tsRecord := range timestreamRecords { for _, tsRecord := range timestreamRecords {
assertions.True(arrayContains(result, tsRecord), "Expected that the list of requests to Timestream: \n%s\n\n "+ require.True(t, arrayContains(result, tsRecord), "Expected that the list of requests to Timestream: \n%s\n\n "+
"will contain request: \n%s\n\nUsed MappingMode: %s", result, tsRecord, mappingMode) "will contain request: \n%s\n\nUsed MappingMode: %s", result, tsRecord, mappingMode)
} }
} }

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"log"
"math" "math"
"net/http" "net/http"
"net/url" "net/url"
@ -33,6 +32,7 @@ type Warp10 struct {
MaxStringErrorSize int `toml:"max_string_error_size"` MaxStringErrorSize int `toml:"max_string_error_size"`
client *http.Client client *http.Client
tls.ClientConfig tls.ClientConfig
Log telegraf.Logger `toml:"-"`
} }
var sampleConfig = ` var sampleConfig = `
@ -114,7 +114,7 @@ func (w *Warp10) GenWarp10Payload(metrics []telegraf.Metric) string {
metricValue, err := buildValue(field.Value) metricValue, err := buildValue(field.Value)
if err != nil { if err != nil {
log.Printf("E! [outputs.warp10] Could not encode value: %v", err) w.Log.Errorf("Could not encode value: %v", err)
continue continue
} }
metric.Value = metricValue metric.Value = metricValue
@ -199,7 +199,7 @@ func buildValue(v interface{}) (string, error) {
retv = strconv.FormatInt(math.MaxInt64, 10) retv = strconv.FormatInt(math.MaxInt64, 10)
} }
case float64: case float64:
retv = floatToString(float64(p)) retv = floatToString(p)
default: default:
return "", fmt.Errorf("unsupported type: %T", v) return "", fmt.Errorf("unsupported type: %T", v)
} }

View File

@ -5,9 +5,10 @@ import (
"regexp" "regexp"
"strings" "strings"
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
wavefront "github.com/wavefronthq/wavefront-sdk-go/senders"
) )
const maxTagLength = 254 const maxTagLength = 254
@ -51,7 +52,7 @@ var strictSanitizedChars = strings.NewReplacer(
) )
// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer // instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer
var sanitizedRegex = regexp.MustCompile("[^a-zA-Z\\d_.-]") var sanitizedRegex = regexp.MustCompile(`[^a-zA-Z\d_.-]`)
var tagValueReplacer = strings.NewReplacer("*", "-") var tagValueReplacer = strings.NewReplacer("*", "-")