From d50a52ff2ff4671cb02861cef389102093a2bd79 Mon Sep 17 00:00:00 2001 From: Jeff Ashton Date: Wed, 3 Mar 2021 09:49:05 -0500 Subject: [PATCH] Switching kinesis output plugin to use telegraf.Logger (#8929) --- plugins/outputs/kinesis/kinesis.go | 22 +++++++++++----------- plugins/outputs/kinesis/kinesis_test.go | 12 ++++++++++++ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 75f790f33..fd233e5b8 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -1,7 +1,6 @@ package kinesis import ( - "log" "time" "github.com/aws/aws-sdk-go/aws" @@ -30,9 +29,10 @@ type ( RandomPartitionKey bool `toml:"use_random_partitionkey"` Partition *Partition `toml:"partition"` Debug bool `toml:"debug"` - svc kinesisiface.KinesisAPI + Log telegraf.Logger `toml:"-"` serializer serializers.Serializer + svc kinesisiface.KinesisAPI } Partition struct { @@ -118,13 +118,13 @@ func (k *KinesisOutput) Description() string { func (k *KinesisOutput) Connect() error { if k.Partition == nil { - log.Print("E! kinesis : Deprecated partitionkey configuration in use, please consider using outputs.kinesis.partition") + k.Log.Error("Deprecated partitionkey configuration in use, please consider using outputs.kinesis.partition") } // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // environment variables, and then Shared Credentials. if k.Debug { - log.Printf("I! kinesis: Establishing a connection to Kinesis in %s", k.Region) + k.Log.Infof("Establishing a connection to Kinesis in %s", k.Region) } credentialConfig := &internalaws.CredentialConfig{ @@ -165,17 +165,17 @@ func (k *KinesisOutput) writeKinesis(r []*kinesis.PutRecordsRequestEntry) time.D resp, err := k.svc.PutRecords(payload) if err != nil { - log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error()) + k.Log.Errorf("Unable to write to Kinesis : %s", err.Error()) return time.Since(start) } if k.Debug { - log.Printf("I! Wrote: '%+v'", resp) + k.Log.Infof("Wrote: '%+v'", resp) } failed := *resp.FailedRecordCount if failed > 0 { - log.Printf("E! kinesis: Unable to write %+v of %+v record(s) to Kinesis", failed, len(r)) + k.Log.Errorf("Unable to write %+v of %+v record(s) to Kinesis", failed, len(r)) } return time.Since(start) @@ -203,7 +203,7 @@ func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string { // Default partition name if default is not set return "telegraf" default: - log.Printf("E! kinesis : You have configured a Partition method of '%s' which is not supported", k.Partition.Method) + k.Log.Errorf("You have configured a Partition method of '%s' which is not supported", k.Partition.Method) } } if k.RandomPartitionKey { @@ -230,7 +230,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { values, err := k.serializer.Serialize(metric) if err != nil { - log.Printf("D! [outputs.kinesis] Could not serialize metric: %v", err) + k.Log.Debugf("Could not serialize metric: %v", err) continue } @@ -246,7 +246,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { if sz == 500 { // Max Messages Per PutRecordRequest is 500 elapsed := k.writeKinesis(r) - log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) + k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) sz = 0 r = nil } @@ -254,7 +254,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { } if sz > 0 { elapsed := k.writeKinesis(r) - log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) + k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) } return nil diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 293ec86fb..49cfcedd5 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -20,6 +20,7 @@ func TestPartitionKey(t *testing.T) { testPoint := testutil.TestMetric(1) k := KinesisOutput{ + Log: testutil.Logger{}, Partition: &Partition{ Method: "static", Key: "-", @@ -28,6 +29,7 @@ func TestPartitionKey(t *testing.T) { assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") k = KinesisOutput{ + Log: testutil.Logger{}, Partition: &Partition{ Method: "tag", Key: "tag1", @@ -36,6 +38,7 @@ func TestPartitionKey(t *testing.T) { assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'") k = KinesisOutput{ + Log: testutil.Logger{}, Partition: &Partition{ Method: "tag", Key: "doesnotexist", @@ -45,6 +48,7 @@ func TestPartitionKey(t *testing.T) { assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default") k = KinesisOutput{ + Log: testutil.Logger{}, Partition: &Partition{ Method: "tag", Key: "doesnotexist", @@ -53,6 +57,7 @@ func TestPartitionKey(t *testing.T) { assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf") k = KinesisOutput{ + Log: testutil.Logger{}, Partition: &Partition{ Method: "not supported", }, @@ -60,6 +65,7 @@ func TestPartitionKey(t *testing.T) { assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''") k = KinesisOutput{ + Log: testutil.Logger{}, Partition: &Partition{ Method: "measurement", }, @@ -67,6 +73,7 @@ func TestPartitionKey(t *testing.T) { assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name") k = KinesisOutput{ + Log: testutil.Logger{}, Partition: &Partition{ Method: "random", }, @@ -77,11 +84,13 @@ func TestPartitionKey(t *testing.T) { assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4") k = KinesisOutput{ + Log: testutil.Logger{}, PartitionKey: "-", } assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") k = KinesisOutput{ + Log: testutil.Logger{}, RandomPartitionKey: true, } partitionKey = k.getPartitionKey(testPoint) @@ -120,6 +129,7 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) { ) k := KinesisOutput{ + Log: testutil.Logger{}, StreamName: streamName, svc: svc, } @@ -165,6 +175,7 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) { ) k := KinesisOutput{ + Log: testutil.Logger{}, StreamName: streamName, svc: svc, } @@ -200,6 +211,7 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) { ) k := KinesisOutput{ + Log: testutil.Logger{}, StreamName: streamName, svc: svc, }