fix(outputs): Linter issues (#11595)

This commit is contained in:
Sven Rebhan 2022-08-03 14:20:51 +02:00 committed by GitHub
parent b741f3288a
commit ac26d786fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 66 additions and 54 deletions

View File

@ -300,7 +300,7 @@ func (q *AMQP) makeClientConfig() (*ClientConfig, error) {
} }
func connect(clientConfig *ClientConfig) (Client, error) { func connect(clientConfig *ClientConfig) (Client, error) {
return Connect(clientConfig) return newClient(clientConfig)
} }
func init() { func init() {

View File

@ -35,8 +35,8 @@ type client struct {
config *ClientConfig config *ClientConfig
} }
// Connect opens a connection to one of the brokers at random // newClient opens a connection to one of the brokers at random
func Connect(config *ClientConfig) (*client, error) { func newClient(config *ClientConfig) (*client, error) {
client := &client{ client := &client{
config: config, config: config,
} }

View File

@ -160,7 +160,6 @@ func (*CloudWatch) SampleConfig() string {
} }
func (c *CloudWatch) Connect() error { func (c *CloudWatch) Connect() error {
cfg, err := c.CredentialConfig.Credentials() cfg, err := c.CredentialConfig.Credentials()
if err != nil { if err != nil {

View File

@ -4,7 +4,6 @@ package kafka
import ( import (
_ "embed" _ "embed"
"fmt" "fmt"
"log"
"strings" "strings"
"time" "time"
@ -68,22 +67,21 @@ type TopicSuffix struct {
// DebugLogger logs messages from sarama at the debug level. // DebugLogger logs messages from sarama at the debug level.
type DebugLogger struct { type DebugLogger struct {
Log telegraf.Logger
} }
func (*DebugLogger) Print(v ...interface{}) { func (l *DebugLogger) Print(v ...interface{}) {
args := make([]interface{}, 0, len(v)+1) args := make([]interface{}, 0, len(v)+1)
args = append(append(args, "D! [sarama] "), v...) args = append(append(args, "[sarama] "), v...)
log.Print(args...) l.Log.Debug(args...)
} }
func (*DebugLogger) Printf(format string, v ...interface{}) { func (l *DebugLogger) Printf(format string, v ...interface{}) {
log.Printf("D! [sarama] "+format, v...) l.Log.Debugf("[sarama] "+format, v...)
} }
func (*DebugLogger) Println(v ...interface{}) { func (l *DebugLogger) Println(v ...interface{}) {
args := make([]interface{}, 0, len(v)+1) l.Print(v)
args = append(append(args, "D! [sarama] "), v...)
log.Println(args...)
} }
func ValidateTopicSuffixMethod(method string) error { func ValidateTopicSuffixMethod(method string) error {
@ -140,6 +138,8 @@ func (k *Kafka) SetSerializer(serializer serializers.Serializer) {
} }
func (k *Kafka) Init() error { func (k *Kafka) Init() error {
sarama.Logger = &DebugLogger{Log: k.Log}
err := ValidateTopicSuffixMethod(k.TopicSuffix.Method) err := ValidateTopicSuffixMethod(k.TopicSuffix.Method)
if err != nil { if err != nil {
return err return err
@ -259,7 +259,6 @@ func (k *Kafka) Write(metrics []telegraf.Metric) error {
} }
func init() { func init() {
sarama.Logger = &DebugLogger{}
outputs.Add("kafka", func() telegraf.Output { outputs.Add("kafka", func() telegraf.Output {
return &Kafka{ return &Kafka{
WriteConfig: kafka.WriteConfig{ WriteConfig: kafka.WriteConfig{

View File

@ -80,6 +80,7 @@ func TestConnectAndWriteIntegration(t *testing.T) {
k := &Kafka{ k := &Kafka{
Brokers: brokers, Brokers: brokers,
Topic: "Test", Topic: "Test",
Log: testutil.Logger{},
serializer: s, serializer: s,
producerFunc: sarama.NewSyncProducer, producerFunc: sarama.NewSyncProducer,
} }
@ -133,6 +134,7 @@ func TestTopicSuffixes(t *testing.T) {
k := &Kafka{ k := &Kafka{
Topic: topic, Topic: topic,
TopicSuffix: topicSuffix, TopicSuffix: topicSuffix,
Log: testutil.Logger{},
} }
_, topic := k.GetTopicName(m) _, topic := k.GetTopicName(m)
@ -200,6 +202,7 @@ func TestRoutingKey(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
tt.kafka.Log = testutil.Logger{}
key, err := tt.kafka.routingKey(tt.metric) key, err := tt.kafka.routingKey(tt.metric)
require.NoError(t, err) require.NoError(t, err)
tt.check(t, key) tt.check(t, key)
@ -328,6 +331,8 @@ func TestTopicTag(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
tt.plugin.Log = testutil.Logger{}
s, err := serializers.NewInfluxSerializer() s, err := serializers.NewInfluxSerializer()
require.NoError(t, err) require.NoError(t, err)
tt.plugin.SetSerializer(s) tt.plugin.SetSerializer(s)

View File

@ -22,9 +22,7 @@ var sampleConfig string
const ( const (
defaultLogzioURL = "https://listener.logz.io:8071" defaultLogzioURL = "https://listener.logz.io:8071"
logzioType = "telegraf"
logzioDescription = "Send aggregate metrics to Logz.io"
logzioType = "telegraf"
) )
type Logzio struct { type Logzio struct {

View File

@ -12,6 +12,8 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
// Blank import to allow gzip encoding
_ "google.golang.org/grpc/encoding/gzip" _ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"

View File

@ -114,6 +114,7 @@ func (o *OpenTSDB) WriteHTTP(metrics []telegraf.Metric, u *url.URL) error {
BatchSize: o.HTTPBatchSize, BatchSize: o.HTTPBatchSize,
Path: o.HTTPPath, Path: o.HTTPPath,
Debug: o.Debug, Debug: o.Debug,
log: o.Log,
} }
for _, m := range metrics { for _, m := range metrics {
@ -218,7 +219,7 @@ func buildValue(v interface{}) (string, error) {
case uint64: case uint64:
retv = UIntToString(p) retv = UIntToString(p)
case float64: case float64:
retv = FloatToString(float64(p)) retv = FloatToString(p)
default: default:
return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v) return retv, fmt.Errorf("unexpected type %T with value %v for OpenTSDB", v, v)
} }

View File

@ -6,10 +6,11 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"net/url" "net/url"
"github.com/influxdata/telegraf"
) )
type HTTPMetric struct { type HTTPMetric struct {
@ -28,6 +29,8 @@ type openTSDBHttp struct {
Path string Path string
Debug bool Debug bool
log telegraf.Logger
metricCounter int metricCounter int
body requestBody body requestBody
} }
@ -62,18 +65,20 @@ func (r *requestBody) reset(debug bool) {
r.enc = json.NewEncoder(r.w) r.enc = json.NewEncoder(r.w)
io.WriteString(r.w, "[") _, _ = io.WriteString(r.w, "[")
r.empty = true r.empty = true
} }
func (r *requestBody) addMetric(metric *HTTPMetric) error { func (r *requestBody) addMetric(metric *HTTPMetric) error {
if !r.empty { if !r.empty {
io.WriteString(r.w, ",") if _, err := io.WriteString(r.w, ","); err != nil {
return err
}
} }
if err := r.enc.Encode(metric); err != nil { if err := r.enc.Encode(metric); err != nil {
return fmt.Errorf("Metric serialization error %s", err.Error()) return fmt.Errorf("metric serialization error %w", err)
} }
r.empty = false r.empty = false
@ -82,10 +87,12 @@ func (r *requestBody) addMetric(metric *HTTPMetric) error {
} }
func (r *requestBody) close() error { func (r *requestBody) close() error {
io.WriteString(r.w, "]") if _, err := io.WriteString(r.w, "]"); err != nil {
return err
}
if err := r.g.Close(); err != nil { if err := r.g.Close(); err != nil {
return fmt.Errorf("Error when closing gzip writer: %s", err.Error()) return fmt.Errorf("error when closing gzip writer: %w", err)
} }
return nil return nil
@ -117,7 +124,9 @@ func (o *openTSDBHttp) flush() error {
return nil return nil
} }
o.body.close() if err := o.body.close(); err != nil {
return err
}
u := url.URL{ u := url.URL{
Scheme: o.Scheme, Scheme: o.Scheme,
@ -132,7 +141,7 @@ func (o *openTSDBHttp) flush() error {
req, err := http.NewRequest("POST", u.String(), &o.body.b) req, err := http.NewRequest("POST", u.String(), &o.body.b)
if err != nil { if err != nil {
return fmt.Errorf("Error when building request: %s", err.Error()) return fmt.Errorf("error when building request: %w", err)
} }
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip") req.Header.Set("Content-Encoding", "gzip")
@ -140,7 +149,7 @@ func (o *openTSDBHttp) flush() error {
if o.Debug { if o.Debug {
dump, err := httputil.DumpRequestOut(req, false) dump, err := httputil.DumpRequestOut(req, false)
if err != nil { if err != nil {
return fmt.Errorf("Error when dumping request: %s", err.Error()) return fmt.Errorf("error when dumping request: %w", err)
} }
fmt.Printf("Sending metrics:\n%s", dump) fmt.Printf("Sending metrics:\n%s", dump)
@ -149,14 +158,14 @@ func (o *openTSDBHttp) flush() error {
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return fmt.Errorf("Error when sending metrics: %s", err.Error()) return fmt.Errorf("error when sending metrics: %w", err)
} }
defer resp.Body.Close() defer resp.Body.Close()
if o.Debug { if o.Debug {
dump, err := httputil.DumpResponse(resp, true) dump, err := httputil.DumpResponse(resp, true)
if err != nil { if err != nil {
return fmt.Errorf("Error when dumping response: %s", err.Error()) return fmt.Errorf("error when dumping response: %w", err)
} }
fmt.Printf("Received response\n%s\n\n", dump) fmt.Printf("Received response\n%s\n\n", dump)
@ -165,14 +174,11 @@ func (o *openTSDBHttp) flush() error {
_, _ = io.Copy(io.Discard, resp.Body) _, _ = io.Copy(io.Discard, resp.Body)
} }
if resp.StatusCode/100 != 2 { if resp.StatusCode < 200 || resp.StatusCode > 299 {
if resp.StatusCode/100 == 4 { if resp.StatusCode < 400 || resp.StatusCode > 499 {
log.Printf("E! Received %d status code. Dropping metrics to avoid overflowing buffer.", return fmt.Errorf("error sending metrics (status %d)", resp.StatusCode)
resp.StatusCode)
} else {
return fmt.Errorf("Error when sending metrics. Received status %d",
resp.StatusCode)
} }
o.log.Errorf("Received %d status code. Dropping metrics to avoid overflowing buffer.", resp.StatusCode)
} }
return nil return nil

View File

@ -126,16 +126,16 @@ func TestSanitize(t *testing.T) {
} }
func BenchmarkHttpSend(b *testing.B) { func BenchmarkHttpSend(b *testing.B) {
const BatchSize = 50 const batchSize = 50
const MetricsCount = 4 * BatchSize const metricsCount = 4 * batchSize
metrics := make([]telegraf.Metric, MetricsCount) metrics := make([]telegraf.Metric, metricsCount)
for i := 0; i < MetricsCount; i++ { for i := 0; i < metricsCount; i++ {
metrics[i] = testutil.TestMetric(1.0) metrics[i] = testutil.TestMetric(1.0)
} }
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
fmt.Fprintln(w, "{}") _, _ = fmt.Fprintln(w, "{}")
})) }))
defer ts.Close() defer ts.Close()
@ -155,13 +155,13 @@ func BenchmarkHttpSend(b *testing.B) {
Host: ts.URL, Host: ts.URL,
Port: port, Port: port,
Prefix: "", Prefix: "",
HTTPBatchSize: BatchSize, HTTPBatchSize: batchSize,
HTTPPath: "/api/put", HTTPPath: "/api/put",
} }
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
o.Write(metrics) _ = o.Write(metrics)
} }
} }
func TestWriteIntegration(t *testing.T) { func TestWriteIntegration(t *testing.T) {

View File

@ -212,14 +212,16 @@ func (c *Collector) Add(metrics []telegraf.Metric) error {
// fields to labels if enabled. // fields to labels if enabled.
if c.StringAsLabel { if c.StringAsLabel {
for fn, fv := range point.Fields() { for fn, fv := range point.Fields() {
switch fv := fv.(type) { sfv, ok := fv.(string)
case string: if !ok {
name, ok := serializer.SanitizeLabelName(fn) continue
if !ok {
continue
}
labels[name] = fv
} }
name, ok := serializer.SanitizeLabelName(fn)
if !ok {
continue
}
labels[name] = sfv
} }
} }

View File

@ -17,6 +17,7 @@ import (
metricpb "google.golang.org/genproto/googleapis/api/metric" metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
@ -72,7 +73,8 @@ func TestMain(m *testing.M) {
//nolint:errcheck,revive //nolint:errcheck,revive
go serv.Serve(lis) go serv.Serve(lis)
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure()) opt := grpc.WithTransportCredentials(insecure.NewCredentials())
conn, err := grpc.Dial(lis.Addr().String(), opt)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -70,7 +70,6 @@ const MaxWriteRoutinesDefault = 1
// WriteFactory function provides a way to mock the client instantiation for testing purposes. // WriteFactory function provides a way to mock the client instantiation for testing purposes.
var WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { var WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
awsCreds, awsErr := credentialConfig.Credentials() awsCreds, awsErr := credentialConfig.Credentials()
if awsErr != nil { if awsErr != nil {
panic("Unable to load credentials config " + awsErr.Error()) panic("Unable to load credentials config " + awsErr.Error())

View File

@ -361,7 +361,6 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
} }
func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) { func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) {
input1 := testutil.MustMetric( input1 := testutil.MustMetric(
metricName1, metricName1,
map[string]string{"tag1": "value1"}, map[string]string{"tag1": "value1"},