diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index fd233e5b8..47d7aa10f 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -13,6 +13,9 @@ import ( "github.com/influxdata/telegraf/plugins/serializers" ) +// Limit set by AWS (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) +const maxRecordsPerRequest uint32 = 500 + type ( KinesisOutput struct { Region string `toml:"region"` @@ -243,8 +246,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error { r = append(r, &d) - if sz == 500 { - // Max Messages Per PutRecordRequest is 500 + if sz == maxRecordsPerRequest { elapsed := k.writeKinesis(r) k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed) sz = 0 diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 49cfcedd5..4c7063c40 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -8,8 +8,12 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/gofrs/uuid" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const zero int64 = 0 @@ -227,6 +231,272 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) { }) } +func TestWrite_NoMetrics(t *testing.T) { + assert := assert.New(t) + serializer := influx.NewSerializer() + svc := &mockKinesisPutRecords{} + + k := KinesisOutput{ + Log: testutil.Logger{}, + Partition: &Partition{ + Method: "static", + Key: "partitionKey", + }, + StreamName: "stream", + serializer: serializer, + svc: svc, + } + + err := k.Write([]telegraf.Metric{}) + assert.Nil(err, "Should not return error") + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{}) +} + +func TestWrite_SingleMetric(t *testing.T) { + assert := assert.New(t) + serializer := influx.NewSerializer() + partitionKey := "partitionKey" + streamName := "stream" + + svc := &mockKinesisPutRecords{} + svc.SetupGenericResponse(1, 0) + + k := KinesisOutput{ + Log: testutil.Logger{}, + Partition: &Partition{ + Method: "static", + Key: partitionKey, + }, + StreamName: streamName, + serializer: serializer, + svc: svc, + } + + metric, metricData := createTestMetric(t, "metric1", serializer) + err := k.Write([]telegraf.Metric{metric}) + assert.Nil(err, "Should not return error") + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: []*kinesis.PutRecordsRequestEntry{ + { + PartitionKey: &partitionKey, + Data: metricData, + }, + }, + }, + }) +} + +func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) { + assert := assert.New(t) + serializer := influx.NewSerializer() + partitionKey := "partitionKey" + streamName := "stream" + + svc := &mockKinesisPutRecords{} + svc.SetupGenericResponse(3, 0) + + k := KinesisOutput{ + Log: testutil.Logger{}, + Partition: &Partition{ + Method: "static", + Key: partitionKey, + }, + StreamName: streamName, + serializer: serializer, + svc: svc, + } + + metrics, metricsData := createTestMetrics(t, 3, serializer) + err := k.Write(metrics) + assert.Nil(err, "Should not return error") + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: createPutRecordsRequestEntries( + metricsData, 
+ &partitionKey, + ), + }, + }) +} + +func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) { + assert := assert.New(t) + serializer := influx.NewSerializer() + partitionKey := "partitionKey" + streamName := "stream" + + svc := &mockKinesisPutRecords{} + svc.SetupGenericResponse(maxRecordsPerRequest, 0) + + k := KinesisOutput{ + Log: testutil.Logger{}, + Partition: &Partition{ + Method: "static", + Key: partitionKey, + }, + StreamName: streamName, + serializer: serializer, + svc: svc, + } + + metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest, serializer) + err := k.Write(metrics) + assert.Nil(err, "Should not return error") + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: createPutRecordsRequestEntries( + metricsData, + &partitionKey, + ), + }, + }) +} + +func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) { + assert := assert.New(t) + serializer := influx.NewSerializer() + partitionKey := "partitionKey" + streamName := "stream" + + svc := &mockKinesisPutRecords{} + svc.SetupGenericResponse(maxRecordsPerRequest, 0) + svc.SetupGenericResponse(1, 0) + + k := KinesisOutput{ + Log: testutil.Logger{}, + Partition: &Partition{ + Method: "static", + Key: partitionKey, + }, + StreamName: streamName, + serializer: serializer, + svc: svc, + } + + metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest+1, serializer) + err := k.Write(metrics) + assert.Nil(err, "Should not return error") + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: createPutRecordsRequestEntries( + metricsData[0:maxRecordsPerRequest], + &partitionKey, + ), + }, + { + StreamName: &streamName, + Records: createPutRecordsRequestEntries( + metricsData[maxRecordsPerRequest:], + &partitionKey, + ), + }, + }) +} + +func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) { + assert := assert.New(t) + serializer := influx.NewSerializer() + partitionKey := "partitionKey" + streamName := "stream" + + svc := &mockKinesisPutRecords{} + svc.SetupGenericResponse(maxRecordsPerRequest, 0) + svc.SetupGenericResponse(maxRecordsPerRequest, 0) + + k := KinesisOutput{ + Log: testutil.Logger{}, + Partition: &Partition{ + Method: "static", + Key: partitionKey, + }, + StreamName: streamName, + serializer: serializer, + svc: svc, + } + + metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest*2, serializer) + err := k.Write(metrics) + assert.Nil(err, "Should not return error") + + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: createPutRecordsRequestEntries( + metricsData[0:maxRecordsPerRequest], + &partitionKey, + ), + }, + { + StreamName: &streamName, + Records: createPutRecordsRequestEntries( + metricsData[maxRecordsPerRequest:], + &partitionKey, + ), + }, + }) +} + +func TestWrite_SerializerError(t *testing.T) { + assert := assert.New(t) + serializer := influx.NewSerializer() + partitionKey := "partitionKey" + streamName := "stream" + + svc := &mockKinesisPutRecords{} + svc.SetupGenericResponse(2, 0) + + k := KinesisOutput{ + Log: testutil.Logger{}, + Partition: &Partition{ + Method: "static", + Key: partitionKey, + }, + StreamName: streamName, + serializer: serializer, + svc: svc, + } + + metric1, metric1Data := createTestMetric(t, "metric1", serializer) + metric2, metric2Data := createTestMetric(t, "metric2", serializer) + + // metric is invalid because of empty name + invalidMetric := testutil.TestMetric(3, "") + + err := 
k.Write([]telegraf.Metric{ + metric1, + invalidMetric, + metric2, + }) + assert.Nil(err, "Should not return error") + + // remaining valid metrics should still get written + svc.AssertRequests(assert, []*kinesis.PutRecordsInput{ + { + StreamName: &streamName, + Records: []*kinesis.PutRecordsRequestEntry{ + { + PartitionKey: &partitionKey, + Data: metric1Data, + }, + { + PartitionKey: &partitionKey, + Data: metric2Data, + }, + }, + }, + }) +} + type mockKinesisPutRecordsResponse struct { Output *kinesis.PutRecordsOutput Err error @@ -253,6 +523,35 @@ func (m *mockKinesisPutRecords) SetupResponse( }) } +func (m *mockKinesisPutRecords) SetupGenericResponse( + successfulRecordCount uint32, + failedRecordCount uint32, +) { + + errorCode := "InternalFailure" + errorMessage := "Internal Service Failure" + shard := "shardId-000000000003" + + records := []*kinesis.PutRecordsResultEntry{} + + for i := uint32(0); i < successfulRecordCount; i++ { + sequenceNumber := fmt.Sprintf("%d", i) + records = append(records, &kinesis.PutRecordsResultEntry{ + SequenceNumber: &sequenceNumber, + ShardId: &shard, + }) + } + + for i := uint32(0); i < failedRecordCount; i++ { + records = append(records, &kinesis.PutRecordsResultEntry{ + ErrorCode: &errorCode, + ErrorMessage: &errorMessage, + }) + } + + m.SetupResponse(int64(failedRecordCount), records) +} + func (m *mockKinesisPutRecords) SetupErrorResponse(err error) { m.responses = append(m.responses, &mockKinesisPutRecordsResponse{ @@ -323,3 +622,54 @@ func (m *mockKinesisPutRecords) AssertRequests( } } } + +func createTestMetric( + t *testing.T, + name string, + serializer serializers.Serializer, +) (telegraf.Metric, []byte) { + + metric := testutil.TestMetric(1, name) + + data, err := serializer.Serialize(metric) + require.NoError(t, err) + + return metric, data +} + +func createTestMetrics( + t *testing.T, + count uint32, + serializer serializers.Serializer, +) ([]telegraf.Metric, [][]byte) { + + metrics := make([]telegraf.Metric, count) + metricsData := make([][]byte, count) + + for i := uint32(0); i < count; i++ { + name := fmt.Sprintf("metric%d", i) + metric, data := createTestMetric(t, name, serializer) + metrics[i] = metric + metricsData[i] = data + } + + return metrics, metricsData +} + +func createPutRecordsRequestEntries( + metricsData [][]byte, + partitionKey *string, +) []*kinesis.PutRecordsRequestEntry { + + count := len(metricsData) + records := make([]*kinesis.PutRecordsRequestEntry, count) + + for i := 0; i < count; i++ { + records[i] = &kinesis.PutRecordsRequestEntry{ + PartitionKey: partitionKey, + Data: metricsData[i], + } + } + + return records +}
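// --- Illustrative sketch (not part of the diff above) ---
// The hunk in kinesis.go only shows the flush condition inside Write; the rest
// of the batching loop is unchanged and therefore not visible here. The
// standalone program below sketches the pattern that maxRecordsPerRequest
// participates in. It is an assumption-laden sketch: flush is a hypothetical
// stand-in for KinesisOutput.writeKinesis, and plain strings stand in for
// *kinesis.PutRecordsRequestEntry values.
package main

import "fmt"

// Limit set by AWS (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)
const maxRecordsPerRequest uint32 = 500

// flush stands in for sending one PutRecords request with the accumulated batch.
func flush(batch []string) {
	fmt.Printf("flushing %d records\n", len(batch))
}

func main() {
	var sz uint32
	batch := []string{}

	for i := 0; i < 1201; i++ {
		sz++
		batch = append(batch, fmt.Sprintf("record%d", i))

		// Same check as the patched Write: once the batch reaches the AWS
		// PutRecords limit, send it and start a new, empty batch.
		if sz == maxRecordsPerRequest {
			flush(batch)
			sz = 0
			batch = nil
		}
	}

	// Any remainder goes out as a final, partial request, which is the case
	// the TestWrite_MultipleMetrics_MultipleRequests test exercises.
	if sz > 0 {
		flush(batch)
	}
}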