Simplifying the kinesis output tests (#8970)

This commit is contained in:
Jeff Ashton 2021-03-24 14:29:22 -04:00 committed by GitHub
parent 29ac77906d
commit 8564d928df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 74 additions and 110 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
@ -16,6 +17,10 @@ import (
"github.com/stretchr/testify/require"
)
const testPartitionKey = "partitionKey"
const testShardID = "shardId-000000000003"
const testSequenceNumber = "49543463076570308322303623326179887152428262250726293588"
const testStreamName = "streamName"
const zero int64 = 0
func TestPartitionKey(t *testing.T) {
@ -105,14 +110,9 @@ func TestPartitionKey(t *testing.T) {
func TestWriteKinesis_WhenSuccess(t *testing.T) {
assert := assert.New(t)
partitionKey := "partitionKey"
shard := "shard"
sequenceNumber := "sequenceNumber"
streamName := "stream"
records := []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
PartitionKey: aws.String(testPartitionKey),
Data: []byte{0x65},
},
}
@ -122,26 +122,24 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) {
0,
[]*kinesis.PutRecordsResultEntry{
{
ErrorCode: nil,
ErrorMessage: nil,
SequenceNumber: &sequenceNumber,
ShardId: &shard,
SequenceNumber: aws.String(testSequenceNumber),
ShardId: aws.String(testShardID),
},
},
)
k := KinesisOutput{
Log: testutil.Logger{},
StreamName: streamName,
StreamName: testStreamName,
svc: svc,
}
elapsed := k.writeKinesis(records)
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: records,
},
})
@ -150,14 +148,9 @@ func TestWriteKinesis_WhenSuccess(t *testing.T) {
func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
assert := assert.New(t)
errorCode := "InternalFailure"
errorMessage := "Internal Service Failure"
partitionKey := "partitionKey"
streamName := "stream"
records := []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
PartitionKey: aws.String(testPartitionKey),
Data: []byte{0x66},
},
}
@ -167,26 +160,24 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
1,
[]*kinesis.PutRecordsResultEntry{
{
ErrorCode: &errorCode,
ErrorMessage: &errorMessage,
SequenceNumber: nil,
ShardId: nil,
ErrorCode: aws.String("InternalFailure"),
ErrorMessage: aws.String("Internal Service Failure"),
},
},
)
k := KinesisOutput{
Log: testutil.Logger{},
StreamName: streamName,
StreamName: testStreamName,
svc: svc,
}
elapsed := k.writeKinesis(records)
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: records,
},
})
@ -195,12 +186,9 @@ func TestWriteKinesis_WhenRecordErrors(t *testing.T) {
func TestWriteKinesis_WhenServiceError(t *testing.T) {
assert := assert.New(t)
partitionKey := "partitionKey"
streamName := "stream"
records := []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
PartitionKey: aws.String(testPartitionKey),
Data: []byte{},
},
}
@ -212,16 +200,16 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
k := KinesisOutput{
Log: testutil.Logger{},
StreamName: streamName,
StreamName: testStreamName,
svc: svc,
}
elapsed := k.writeKinesis(records)
assert.GreaterOrEqual(elapsed.Nanoseconds(), zero)
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: records,
},
})
@ -246,14 +234,12 @@ func TestWrite_NoMetrics(t *testing.T) {
err := k.Write([]telegraf.Metric{})
assert.Nil(err, "Should not return error")
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{})
svc.AssertRequests(t, []*kinesis.PutRecordsInput{})
}
func TestWrite_SingleMetric(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(1, 0)
@ -262,9 +248,9 @@ func TestWrite_SingleMetric(t *testing.T) {
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
Key: testPartitionKey,
},
StreamName: streamName,
StreamName: testStreamName,
serializer: serializer,
svc: svc,
}
@ -273,12 +259,12 @@ func TestWrite_SingleMetric(t *testing.T) {
err := k.Write([]telegraf.Metric{metric})
assert.Nil(err, "Should not return error")
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
PartitionKey: aws.String(testPartitionKey),
Data: metricData,
},
},
@ -289,8 +275,6 @@ func TestWrite_SingleMetric(t *testing.T) {
func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(3, 0)
@ -299,9 +283,9 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
Key: testPartitionKey,
},
StreamName: streamName,
StreamName: testStreamName,
serializer: serializer,
svc: svc,
}
@ -310,12 +294,11 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
err := k.Write(metrics)
assert.Nil(err, "Should not return error")
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: createPutRecordsRequestEntries(
metricsData,
&partitionKey,
),
},
})
@ -324,8 +307,6 @@ func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
@ -334,9 +315,9 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
Key: testPartitionKey,
},
StreamName: streamName,
StreamName: testStreamName,
serializer: serializer,
svc: svc,
}
@ -345,12 +326,11 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
err := k.Write(metrics)
assert.Nil(err, "Should not return error")
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: createPutRecordsRequestEntries(
metricsData,
&partitionKey,
),
},
})
@ -359,8 +339,6 @@ func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
@ -370,9 +348,9 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
Key: testPartitionKey,
},
StreamName: streamName,
StreamName: testStreamName,
serializer: serializer,
svc: svc,
}
@ -381,19 +359,17 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
err := k.Write(metrics)
assert.Nil(err, "Should not return error")
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: createPutRecordsRequestEntries(
metricsData[0:maxRecordsPerRequest],
&partitionKey,
),
},
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: createPutRecordsRequestEntries(
metricsData[maxRecordsPerRequest:],
&partitionKey,
),
},
})
@ -402,8 +378,6 @@ func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
@ -413,9 +387,9 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
Key: testPartitionKey,
},
StreamName: streamName,
StreamName: testStreamName,
serializer: serializer,
svc: svc,
}
@ -424,19 +398,17 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
err := k.Write(metrics)
assert.Nil(err, "Should not return error")
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: createPutRecordsRequestEntries(
metricsData[0:maxRecordsPerRequest],
&partitionKey,
),
},
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: createPutRecordsRequestEntries(
metricsData[maxRecordsPerRequest:],
&partitionKey,
),
},
})
@ -445,8 +417,6 @@ func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
func TestWrite_SerializerError(t *testing.T) {
assert := assert.New(t)
serializer := influx.NewSerializer()
partitionKey := "partitionKey"
streamName := "stream"
svc := &mockKinesisPutRecords{}
svc.SetupGenericResponse(2, 0)
@ -455,9 +425,9 @@ func TestWrite_SerializerError(t *testing.T) {
Log: testutil.Logger{},
Partition: &Partition{
Method: "static",
Key: partitionKey,
Key: testPartitionKey,
},
StreamName: streamName,
StreamName: testStreamName,
serializer: serializer,
svc: svc,
}
@ -476,16 +446,16 @@ func TestWrite_SerializerError(t *testing.T) {
assert.Nil(err, "Should not return error")
// remaining valid metrics should still get written
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
svc.AssertRequests(t, []*kinesis.PutRecordsInput{
{
StreamName: &streamName,
StreamName: aws.String(testStreamName),
Records: []*kinesis.PutRecordsRequestEntry{
{
PartitionKey: &partitionKey,
PartitionKey: aws.String(testPartitionKey),
Data: metric1Data,
},
{
PartitionKey: &partitionKey,
PartitionKey: aws.String(testPartitionKey),
Data: metric2Data,
},
},
@ -512,7 +482,7 @@ func (m *mockKinesisPutRecords) SetupResponse(
m.responses = append(m.responses, &mockKinesisPutRecordsResponse{
Err: nil,
Output: &kinesis.PutRecordsOutput{
FailedRecordCount: &failedRecordCount,
FailedRecordCount: aws.Int64(failedRecordCount),
Records: records,
},
})
@ -522,24 +492,19 @@ func (m *mockKinesisPutRecords) SetupGenericResponse(
successfulRecordCount uint32,
failedRecordCount uint32,
) {
errorCode := "InternalFailure"
errorMessage := "Internal Service Failure"
shard := "shardId-000000000003"
records := []*kinesis.PutRecordsResultEntry{}
for i := uint32(0); i < successfulRecordCount; i++ {
sequenceNumber := fmt.Sprintf("%d", i)
records = append(records, &kinesis.PutRecordsResultEntry{
SequenceNumber: &sequenceNumber,
ShardId: &shard,
SequenceNumber: aws.String(testSequenceNumber),
ShardId: aws.String(testShardID),
})
}
for i := uint32(0); i < failedRecordCount; i++ {
records = append(records, &kinesis.PutRecordsResultEntry{
ErrorCode: &errorCode,
ErrorMessage: &errorMessage,
ErrorCode: aws.String("InternalFailure"),
ErrorMessage: aws.String("Internal Service Failure"),
})
}
@ -566,49 +531,49 @@ func (m *mockKinesisPutRecords) PutRecords(input *kinesis.PutRecordsInput) (*kin
}
func (m *mockKinesisPutRecords) AssertRequests(
assert *assert.Assertions,
t *testing.T,
expected []*kinesis.PutRecordsInput,
) {
assert.Equal(
require.Equalf(t,
len(expected),
len(m.requests),
fmt.Sprintf("Expected %v requests", len(expected)),
"Expected %v requests", len(expected),
)
for i, expectedInput := range expected {
actualInput := m.requests[i]
assert.Equal(
require.Equalf(t,
expectedInput.StreamName,
actualInput.StreamName,
fmt.Sprintf("Expected request %v to have correct StreamName", i),
"Expected request %v to have correct StreamName", i,
)
assert.Equal(
require.Equalf(t,
len(expectedInput.Records),
len(actualInput.Records),
fmt.Sprintf("Expected request %v to have %v Records", i, len(expectedInput.Records)),
"Expected request %v to have %v Records", i, len(expectedInput.Records),
)
for r, expectedRecord := range expectedInput.Records {
actualRecord := actualInput.Records[r]
assert.Equal(
&expectedRecord.PartitionKey,
&actualRecord.PartitionKey,
fmt.Sprintf("Expected (request %v, record %v) to have correct PartitionKey", i, r),
require.Equalf(t,
expectedRecord.PartitionKey,
actualRecord.PartitionKey,
"Expected (request %v, record %v) to have correct PartitionKey", i, r,
)
assert.Equal(
&expectedRecord.ExplicitHashKey,
&actualRecord.ExplicitHashKey,
fmt.Sprintf("Expected (request %v, record %v) to have correct ExplicitHashKey", i, r),
require.Equalf(t,
expectedRecord.ExplicitHashKey,
actualRecord.ExplicitHashKey,
"Expected (request %v, record %v) to have correct ExplicitHashKey", i, r,
)
assert.Equal(
require.Equalf(t,
expectedRecord.Data,
actualRecord.Data,
fmt.Sprintf("Expected (request %v, record %v) to have correct Data", i, r),
"Expected (request %v, record %v) to have correct Data", i, r,
)
}
}
@ -647,14 +612,13 @@ func createTestMetrics(
func createPutRecordsRequestEntries(
metricsData [][]byte,
partitionKey *string,
) []*kinesis.PutRecordsRequestEntry {
count := len(metricsData)
records := make([]*kinesis.PutRecordsRequestEntry, count)
for i := 0; i < count; i++ {
records[i] = &kinesis.PutRecordsRequestEntry{
PartitionKey: partitionKey,
PartitionKey: aws.String(testPartitionKey),
Data: metricsData[i],
}
}