outputs.kinesis - log record error count (#8817)

This commit is contained in:
Jeff Ashton 2021-03-01 10:56:17 -05:00 committed by GitHub
parent b362ee4665
commit a65a3052a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 248 additions and 16 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/gofrs/uuid"
"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws"
@ -29,7 +30,7 @@ type (
RandomPartitionKey bool `toml:"use_random_partitionkey"`
Partition *Partition `toml:"partition"`
Debug bool `toml:"debug"`
svc *kinesis.Kinesis
svc kinesisiface.KinesisAPI
serializer serializers.Serializer
}
@ -154,26 +155,29 @@ func (k *KinesisOutput) SetSerializer(serializer serializers.Serializer) {
k.serializer = serializer
}
func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration {
func (k *KinesisOutput) writeKinesis(r []*kinesis.PutRecordsRequestEntry) time.Duration {
start := time.Now()
payload := &kinesis.PutRecordsInput{
Records: r,
StreamName: aws.String(k.StreamName),
}
if k.Debug {
resp, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error())
}
log.Printf("I! Wrote: '%+v'", resp)
} else {
_, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error())
}
resp, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("E! kinesis: Unable to write to Kinesis : %s", err.Error())
return time.Since(start)
}
if k.Debug {
log.Printf("I! Wrote: '%+v'", resp)
}
failed := *resp.FailedRecordCount
if failed > 0 {
log.Printf("E! kinesis: Unable to write %+v of %+v record(s) to Kinesis", failed, len(r))
}
return time.Since(start)
}
@ -241,7 +245,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
if sz == 500 {
// Max Messages Per PutRecordRequest is 500
elapsed := writekinesis(k, r)
elapsed := k.writeKinesis(r)
log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
sz = 0
r = nil
@ -249,7 +253,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
}
if sz > 0 {
elapsed := writekinesis(k, r)
elapsed := k.writeKinesis(r)
log.Printf("D! Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
}

View File

@ -1,13 +1,19 @@
package kinesis
import (
"fmt"
"testing"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/gofrs/uuid"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)
const zero int64 = 0
func TestPartitionKey(t *testing.T) {
assert := assert.New(t)
@ -83,3 +89,225 @@ func TestPartitionKey(t *testing.T) {
assert.Nil(err, "Issue parsing UUID")
assert.Equal(byte(4), u.Version(), "PartitionKey should be UUIDv4")
}
func TestWriteKinesis_WhenSuccess(t *testing.T) {
assert := assert.New(t)
partitionKey := "partitionKey"
shard := "shard"
sequenceNumber := "sequenceNumber"
streamName := "stream"
records := []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
Data: []byte{0x65},
},
}
svc := &mockKinesisPutRecords{}
svc.SetupResponse(
0,
[]*kinesis.PutRecordsResultEntry{
{
ErrorCode: nil,
ErrorMessage: nil,
SequenceNumber: &sequenceNumber,
ShardId: &shard,
},
},
)
k := KinesisOutput{
StreamName: streamName,
svc: svc,
}
elapsed := k.writeKinesis(records)
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: records,
},
})
}
func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
assert := assert.New(t)
errorCode := "InternalFailure"
errorMessage := "Internal Service Failure"
partitionKey := "partitionKey"
streamName := "stream"
records := []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
Data: []byte{0x66},
},
}
svc := &mockKinesisPutRecords{}
svc.SetupResponse(
1,
[]*kinesis.PutRecordsResultEntry{
{
ErrorCode: &errorCode,
ErrorMessage: &errorMessage,
SequenceNumber: nil,
ShardId: nil,
},
},
)
k := KinesisOutput{
StreamName: streamName,
svc: svc,
}
elapsed := k.writeKinesis(records)
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: records,
},
})
}
func TestWriteKinesis_WhenServiceError(t *testing.T) {
assert := assert.New(t)
partitionKey := "partitionKey"
streamName := "stream"
records := []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
Data: []byte{},
},
}
svc := &mockKinesisPutRecords{}
svc.SetupErrorResponse(
awserr.New("InvalidArgumentException", "Invalid record", nil),
)
k := KinesisOutput{
StreamName: streamName,
svc: svc,
}
elapsed := k.writeKinesis(records)
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
Records: records,
},
})
}
type mockKinesisPutRecordsResponse struct {
Output *kinesis.PutRecordsOutput
Err error
}
type mockKinesisPutRecords struct {
kinesisiface.KinesisAPI
requests []*kinesis.PutRecordsInput
responses []*mockKinesisPutRecordsResponse
}
func (m *mockKinesisPutRecords) SetupResponse(
failedRecordCount int64,
records []*kinesis.PutRecordsResultEntry,
) {
m.responses = append(m.responses, &mockKinesisPutRecordsResponse{
Err: nil,
Output: &kinesis.PutRecordsOutput{
FailedRecordCount: &failedRecordCount,
Records: records,
},
})
}
func (m *mockKinesisPutRecords) SetupErrorResponse(err error) {
m.responses = append(m.responses, &mockKinesisPutRecordsResponse{
Err: err,
Output: nil,
})
}
func (m *mockKinesisPutRecords) PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) {
reqNum := len(m.requests)
if reqNum > len(m.responses) {
return nil, fmt.Errorf("Response for request %+v not setup", reqNum)
}
m.requests = append(m.requests, input)
resp := m.responses[reqNum]
return resp.Output, resp.Err
}
func (m *mockKinesisPutRecords) AssertRequests(
assert *assert.Assertions,
expected []*kinesis.PutRecordsInput,
) {
assert.Equal(
len(expected),
len(m.requests),
fmt.Sprintf("Expected %v requests", len(expected)),
)
for i, expectedInput := range expected {
actualInput := m.requests[i]
assert.Equal(
expectedInput.StreamName,
actualInput.StreamName,
fmt.Sprintf("Expected request %v to have correct StreamName", i),
)
assert.Equal(
len(expectedInput.Records),
len(actualInput.Records),
fmt.Sprintf("Expected request %v to have %v Records", i, len(expectedInput.Records)),
)
for r, expectedRecord := range expectedInput.Records {
actualRecord := actualInput.Records[r]
assert.Equal(
&expectedRecord.PartitionKey,
&actualRecord.PartitionKey,
fmt.Sprintf("Expected (request %v, record %v) to have correct PartitionKey", i, r),
)
assert.Equal(
&expectedRecord.ExplicitHashKey,
&actualRecord.ExplicitHashKey,
fmt.Sprintf("Expected (request %v, record %v) to have correct ExplicitHashKey", i, r),
)
assert.Equal(
expectedRecord.Data,
actualRecord.Data,
fmt.Sprintf("Expected (request %v, record %v) to have correct Data", i, r),
)
}
}
}