diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 88620fa70..75f790f33 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -6,6 +6,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/gofrs/uuid" "github.com/influxdata/telegraf" internalaws "github.com/influxdata/telegraf/config/aws" @@ -29,7 +30,7 @@ type ( RandomPartitionKey bool `toml:"use_random_partitionkey"` Partition *Partition `toml:"partition"` Debug bool `toml:"debug"` - svc *kinesis.Kinesis + svc kinesisiface.KinesisAPI serializer serializers.Serializer } @@ -154,26 +155,29 @@ func (k *KinesisOutput) SetSerializer(serializer serializers.Serializer) { k.serializer = serializer } -func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration { +func (k *KinesisOutput) writeKinesis(r []*kinesis.PutRecordsRequestEntry) time.Duration { + start := time.Now() payload := &kinesis.PutRecordsInput{ Records: r, StreamName: aws.String(k.StreamName), } - if k.Debug { - resp, err := k.svc.PutRecords(payload) - if err != nil { - log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error()) - } - log.Printf("I! Wrote: '%+v'", resp) - - } else { - _, err := k.svc.PutRecords(payload) - if err != nil { - log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error()) - } + resp, err := k.svc.PutRecords(payload) + if err != nil { + log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error()) + return time.Since(start) } + + if k.Debug { + log.Printf("I! Wrote: '%+v'", resp) + } + + failed := *resp.FailedRecordCount + if failed > 0 { + log.Printf("E! kinesis: Unable to write %+v of %+v record(s) to Kinesis", failed, len(r)) + } + return time.Since(start) } @@ -241,7 +245,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { if sz == 500 { // Max Messages Per PutRecordRequest is 500 - elapsed := writekinesis(k, r) + elapsed := k.writeKinesis(r) log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) sz = 0 r = nil @@ -249,7 +253,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { } if sz > 0 { - elapsed := writekinesis(k, r) + elapsed := k.writeKinesis(r) log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) } diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 9d4f6729b..293ec86fb 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -1,13 +1,19 @@ package kinesis import ( + "fmt" "testing" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/gofrs/uuid" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" ) +const zero int64 = 0 + func TestPartitionKey(t *testing.T) { assert := assert.New(t) @@ -83,3 +89,225 @@ func TestPartitionKey(t *testing.T) { assert.Nil(err, "Issue parsing UUID") assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4") } + +func TestWriteKinesis_WhenSuccess(t *testing.T) { + + assert := assert.New(t) + + partitionKey := "partitionKey" + shard := "shard" + sequenceNumber := "sequenceNumber" + streamName := "stream" + + records := []*kinesis.PutRecordsRequestEntry{ + { + PartitionKey: &partitionKey, + Data: []byte{0x65}, + }, + } + + svc := &mockKinesisPutRecords{} + svc.SetupResponse( + 0, + []*kinesis.PutRecordsResultEntry{ + { + ErrorCode: nil, + ErrorMessage: nil, + SequenceNumber: &sequenceNumber, + ShardId: &shard, + }, + }, + ) + + k := KinesisOutput{ + StreamName: streamName, + svc: svc, + } + + elapsed := k.writeKinesis(records) + assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: records, + }, + }) +} + +func TestWriteKinesis_WhenRecordErrors(t *testing.T) { + + assert := assert.New(t) + + errorCode := "InternalFailure" + errorMessage := "Internal Service Failure" + partitionKey := "partitionKey" + streamName := "stream" + + records := []*kinesis.PutRecordsRequestEntry{ + { + PartitionKey: &partitionKey, + Data: []byte{0x66}, + }, + } + + svc := &mockKinesisPutRecords{} + svc.SetupResponse( + 1, + []*kinesis.PutRecordsResultEntry{ + { + ErrorCode: &errorCode, + ErrorMessage: &errorMessage, + SequenceNumber: nil, + ShardId: nil, + }, + }, + ) + + k := KinesisOutput{ + StreamName: streamName, + svc: svc, + } + + elapsed := k.writeKinesis(records) + assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: records, + }, + }) +} + +func TestWriteKinesis_WhenServiceError(t *testing.T) { + + assert := assert.New(t) + + partitionKey := "partitionKey" + streamName := "stream" + + records := []*kinesis.PutRecordsRequestEntry{ + { + PartitionKey: &partitionKey, + Data: []byte{}, + }, + } + + svc := &mockKinesisPutRecords{} + svc.SetupErrorResponse( + awserr.New("InvalidArgumentException", "Invalid record", nil), + ) + + k := KinesisOutput{ + StreamName: streamName, + svc: svc, + } + + elapsed := k.writeKinesis(records) + assert.GreaterOrEqual(elapsed.Nanoseconds(), zero) + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: records, + }, + }) +} + +type mockKinesisPutRecordsResponse struct { + Output *kinesis.PutRecordsOutput + Err error +} + +type mockKinesisPutRecords struct { + kinesisiface.KinesisAPI + + requests []*kinesis.PutRecordsInput + responses []*mockKinesisPutRecordsResponse +} + +func (m *mockKinesisPutRecords) SetupResponse( + failedRecordCount int64, + records []*kinesis.PutRecordsResultEntry, +) { + + m.responses = append(m.responses, &mockKinesisPutRecordsResponse{ + Err: nil, + Output: &kinesis.PutRecordsOutput{ + FailedRecordCount: &failedRecordCount, + Records: records, + }, + }) +} + +func (m *mockKinesisPutRecords) SetupErrorResponse(err error) { + + m.responses = append(m.responses, &mockKinesisPutRecordsResponse{ + Err: err, + Output: nil, + }) +} + +func (m *mockKinesisPutRecords) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) { + + reqNum := len(m.requests) + if reqNum > len(m.responses) { + return nil, fmt.Errorf("Response for request %+v not setup", reqNum) + } + + m.requests = append(m.requests, input) + + resp := m.responses[reqNum] + return resp.Output, resp.Err +} + +func (m *mockKinesisPutRecords) AssertRequests( + assert *assert.Assertions, + expected []*kinesis.PutRecordsInput, +) { + + assert.Equal( + len(expected), + len(m.requests), + fmt.Sprintf("Expected %v requests", len(expected)), + ) + + for i, expectedInput := range expected { + actualInput := m.requests[i] + + assert.Equal( + expectedInput.StreamName, + actualInput.StreamName, + fmt.Sprintf("Expected request %v to have correct StreamName", i), + ) + + assert.Equal( + len(expectedInput.Records), + len(actualInput.Records), + fmt.Sprintf("Expected request %v to have %v Records", i, len(expectedInput.Records)), + ) + + for r, expectedRecord := range expectedInput.Records { + actualRecord := actualInput.Records[r] + + assert.Equal( + &expectedRecord.PartitionKey, + &actualRecord.PartitionKey, + fmt.Sprintf("Expected (request %v, record %v) to have correct PartitionKey", i, r), + ) + + assert.Equal( + &expectedRecord.ExplicitHashKey, + &actualRecord.ExplicitHashKey, + fmt.Sprintf("Expected (request %v, record %v) to have correct ExplicitHashKey", i, r), + ) + + assert.Equal( + expectedRecord.Data, + actualRecord.Data, + fmt.Sprintf("Expected (request %v, record %v) to have correct Data", i, r), + ) + } + } +}