Switching kinesis output plugin to use telegraf.Logger (#8929)

This commit is contained in:
Jeff Ashton 2021-03-03 09:49:05 -05:00 committed by GitHub
parent 851136f16c
commit d50a52ff2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 11 deletions

View File

@ -1,7 +1,6 @@
package kinesis package kinesis
import ( import (
"log"
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@ -30,9 +29,10 @@ type (
RandomPartitionKey bool `toml:"use_random_partitionkey"` RandomPartitionKey bool `toml:"use_random_partitionkey"`
Partition *Partition `toml:"partition"` Partition *Partition `toml:"partition"`
Debug bool `toml:"debug"` Debug bool `toml:"debug"`
svc kinesisiface.KinesisAPI
Log telegraf.Logger `toml:"-"`
serializer serializers.Serializer serializer serializers.Serializer
svc kinesisiface.KinesisAPI
} }
Partition struct { Partition struct {
@ -118,13 +118,13 @@ func (k *KinesisOutput) Description() string {
func (k *KinesisOutput) Connect() error { func (k *KinesisOutput) Connect() error {
if k.Partition == nil { if k.Partition == nil {
log.Print("E! kinesis : Deprecated partitionkey configuration in use, please consider using outputs.kinesis.partition") k.Log.Error("Deprecated partitionkey configuration in use, please consider using outputs.kinesis.partition")
} }
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials. // environment variables, and then Shared Credentials.
if k.Debug { if k.Debug {
log.Printf("I! kinesis: Establishing a connection to Kinesis in %s", k.Region) k.Log.Infof("Establishing a connection to Kinesis in %s", k.Region)
} }
credentialConfig := &internalaws.CredentialConfig{ credentialConfig := &internalaws.CredentialConfig{
@ -165,17 +165,17 @@ func (k *KinesisOutput) writeKinesis(r []*kinesis.PutRecordsRequestEntry) time.D
resp, err := k.svc.PutRecords(payload) resp, err := k.svc.PutRecords(payload)
if err != nil { if err != nil {
log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error()) k.Log.Errorf("Unable to write to Kinesis : %s", err.Error())
return time.Since(start) return time.Since(start)
} }
if k.Debug { if k.Debug {
log.Printf("I! Wrote: '%+v'", resp) k.Log.Infof("Wrote: '%+v'", resp)
} }
failed := *resp.FailedRecordCount failed := *resp.FailedRecordCount
if failed > 0 { if failed > 0 {
log.Printf("E! kinesis: Unable to write %+v of %+v record(s) to Kinesis", failed, len(r)) k.Log.Errorf("Unable to write %+v of %+v record(s) to Kinesis", failed, len(r))
} }
return time.Since(start) return time.Since(start)
@ -203,7 +203,7 @@ func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string {
// Default partition name if default is not set // Default partition name if default is not set
return "telegraf" return "telegraf"
default: default:
log.Printf("E! kinesis : You have configured a Partition method of '%s' which is not supported", k.Partition.Method) k.Log.Errorf("You have configured a Partition method of '%s' which is not supported", k.Partition.Method)
} }
} }
if k.RandomPartitionKey { if k.RandomPartitionKey {
@ -230,7 +230,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
values, err := k.serializer.Serialize(metric) values, err := k.serializer.Serialize(metric)
if err != nil { if err != nil {
log.Printf("D! [outputs.kinesis] Could not serialize metric: %v", err) k.Log.Debugf("Could not serialize metric: %v", err)
continue continue
} }
@ -246,7 +246,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
if sz == 500 { if sz == 500 {
// Max Messages Per PutRecordRequest is 500 // Max Messages Per PutRecordRequest is 500
elapsed := k.writeKinesis(r) elapsed := k.writeKinesis(r)
log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
sz = 0 sz = 0
r = nil r = nil
} }
@ -254,7 +254,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
} }
if sz > 0 { if sz > 0 {
elapsed := k.writeKinesis(r) elapsed := k.writeKinesis(r)
log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
} }
return nil return nil

View File

@ -20,6 +20,7 @@ func TestPartitionKey(t *testing.T) {
testPoint := testutil.TestMetric(1) testPoint := testutil.TestMetric(1)
k := KinesisOutput{ k := KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{ Partition: &Partition{
Method: "static", Method: "static",
Key: "-", Key: "-",
@ -28,6 +29,7 @@ func TestPartitionKey(t *testing.T) {
assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{ Partition: &Partition{
Method: "tag", Method: "tag",
Key: "tag1", Key: "tag1",
@ -36,6 +38,7 @@ func TestPartitionKey(t *testing.T) {
assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'") assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{ Partition: &Partition{
Method: "tag", Method: "tag",
Key: "doesnotexist", Key: "doesnotexist",
@ -45,6 +48,7 @@ func TestPartitionKey(t *testing.T) {
assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default") assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{ Partition: &Partition{
Method: "tag", Method: "tag",
Key: "doesnotexist", Key: "doesnotexist",
@ -53,6 +57,7 @@ func TestPartitionKey(t *testing.T) {
assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf") assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{ Partition: &Partition{
Method: "not supported", Method: "not supported",
}, },
@ -60,6 +65,7 @@ func TestPartitionKey(t *testing.T) {
assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''") assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{ Partition: &Partition{
Method: "measurement", Method: "measurement",
}, },
@ -67,6 +73,7 @@ func TestPartitionKey(t *testing.T) {
assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name") assert.Equal(testPoint.Name(), k.getPartitionKey(testPoint), "PartitionKey should be value of measurement name")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
Partition: &Partition{ Partition: &Partition{
Method: "random", Method: "random",
}, },
@ -77,11 +84,13 @@ func TestPartitionKey(t *testing.T) {
assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4") assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
PartitionKey: "-", PartitionKey: "-",
} }
assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'") assert.Equal("-", k.getPartitionKey(testPoint), "PartitionKey should be '-'")
k = KinesisOutput{ k = KinesisOutput{
Log: testutil.Logger{},
RandomPartitionKey: true, RandomPartitionKey: true,
} }
partitionKey = k.getPartitionKey(testPoint) partitionKey = k.getPartitionKey(testPoint)
@ -120,6 +129,7 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) {
) )
k := KinesisOutput{ k := KinesisOutput{
Log: testutil.Logger{},
StreamName: streamName, StreamName: streamName,
svc: svc, svc: svc,
} }
@ -165,6 +175,7 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
) )
k := KinesisOutput{ k := KinesisOutput{
Log: testutil.Logger{},
StreamName: streamName, StreamName: streamName,
svc: svc, svc: svc,
} }
@ -200,6 +211,7 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
) )
k := KinesisOutput{ k := KinesisOutput{
Log: testutil.Logger{},
StreamName: streamName, StreamName: streamName,
svc: svc, svc: svc,
} }