Writing unit tests for Kinesis output plugin Write method (#8930)
This commit is contained in:
parent
ed468f4aa7
commit
06e97756c8
|
|
@ -13,6 +13,9 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/serializers"
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Limit set by AWS (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)
|
||||||
|
const maxRecordsPerRequest uint32 = 500
|
||||||
|
|
||||||
type (
|
type (
|
||||||
KinesisOutput struct {
|
KinesisOutput struct {
|
||||||
Region string `toml:"region"`
|
Region string `toml:"region"`
|
||||||
|
|
@ -243,8 +246,7 @@ func (k *KinesisOutput) Write(metrics []telegraf.Metric) error {
|
||||||
|
|
||||||
r = append(r, &d)
|
r = append(r, &d)
|
||||||
|
|
||||||
if sz == 500 {
|
if sz == maxRecordsPerRequest {
|
||||||
// Max Messages Per PutRecordRequest is 500
|
|
||||||
elapsed := k.writeKinesis(r)
|
elapsed := k.writeKinesis(r)
|
||||||
k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
|
k.Log.Debugf("Wrote a %d point batch to Kinesis in %+v.", sz, elapsed)
|
||||||
sz = 0
|
sz = 0
|
||||||
|
|
|
||||||
|
|
@ -8,8 +8,12 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
"github.com/gofrs/uuid"
|
"github.com/gofrs/uuid"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
|
"github.com/influxdata/telegraf/plugins/serializers/influx"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
const zero int64 = 0
|
const zero int64 = 0
|
||||||
|
|
@ -227,6 +231,272 @@ func TestWriteKinesis_WhenServiceError(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWrite_NoMetrics(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
serializer := influx.NewSerializer()
|
||||||
|
svc := &mockKinesisPutRecords{}
|
||||||
|
|
||||||
|
k := KinesisOutput{
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "static",
|
||||||
|
Key: "partitionKey",
|
||||||
|
},
|
||||||
|
StreamName: "stream",
|
||||||
|
serializer: serializer,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := k.Write([]telegraf.Metric{})
|
||||||
|
assert.Nil(err, "Should not return error")
|
||||||
|
|
||||||
|
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_SingleMetric(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
serializer := influx.NewSerializer()
|
||||||
|
partitionKey := "partitionKey"
|
||||||
|
streamName := "stream"
|
||||||
|
|
||||||
|
svc := &mockKinesisPutRecords{}
|
||||||
|
svc.SetupGenericResponse(1, 0)
|
||||||
|
|
||||||
|
k := KinesisOutput{
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "static",
|
||||||
|
Key: partitionKey,
|
||||||
|
},
|
||||||
|
StreamName: streamName,
|
||||||
|
serializer: serializer,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
|
||||||
|
metric, metricData := createTestMetric(t, "metric1", serializer)
|
||||||
|
err := k.Write([]telegraf.Metric{metric})
|
||||||
|
assert.Nil(err, "Should not return error")
|
||||||
|
|
||||||
|
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: []*kinesis.PutRecordsRequestEntry{
|
||||||
|
{
|
||||||
|
PartitionKey: &partitionKey,
|
||||||
|
Data: metricData,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_MultipleMetrics_SinglePartialRequest(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
serializer := influx.NewSerializer()
|
||||||
|
partitionKey := "partitionKey"
|
||||||
|
streamName := "stream"
|
||||||
|
|
||||||
|
svc := &mockKinesisPutRecords{}
|
||||||
|
svc.SetupGenericResponse(3, 0)
|
||||||
|
|
||||||
|
k := KinesisOutput{
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "static",
|
||||||
|
Key: partitionKey,
|
||||||
|
},
|
||||||
|
StreamName: streamName,
|
||||||
|
serializer: serializer,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, metricsData := createTestMetrics(t, 3, serializer)
|
||||||
|
err := k.Write(metrics)
|
||||||
|
assert.Nil(err, "Should not return error")
|
||||||
|
|
||||||
|
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: createPutRecordsRequestEntries(
|
||||||
|
metricsData,
|
||||||
|
&partitionKey,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_MultipleMetrics_SingleFullRequest(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
serializer := influx.NewSerializer()
|
||||||
|
partitionKey := "partitionKey"
|
||||||
|
streamName := "stream"
|
||||||
|
|
||||||
|
svc := &mockKinesisPutRecords{}
|
||||||
|
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
|
||||||
|
|
||||||
|
k := KinesisOutput{
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "static",
|
||||||
|
Key: partitionKey,
|
||||||
|
},
|
||||||
|
StreamName: streamName,
|
||||||
|
serializer: serializer,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest, serializer)
|
||||||
|
err := k.Write(metrics)
|
||||||
|
assert.Nil(err, "Should not return error")
|
||||||
|
|
||||||
|
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: createPutRecordsRequestEntries(
|
||||||
|
metricsData,
|
||||||
|
&partitionKey,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_MultipleMetrics_MultipleRequests(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
serializer := influx.NewSerializer()
|
||||||
|
partitionKey := "partitionKey"
|
||||||
|
streamName := "stream"
|
||||||
|
|
||||||
|
svc := &mockKinesisPutRecords{}
|
||||||
|
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
|
||||||
|
svc.SetupGenericResponse(1, 0)
|
||||||
|
|
||||||
|
k := KinesisOutput{
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "static",
|
||||||
|
Key: partitionKey,
|
||||||
|
},
|
||||||
|
StreamName: streamName,
|
||||||
|
serializer: serializer,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest+1, serializer)
|
||||||
|
err := k.Write(metrics)
|
||||||
|
assert.Nil(err, "Should not return error")
|
||||||
|
|
||||||
|
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: createPutRecordsRequestEntries(
|
||||||
|
metricsData[0:maxRecordsPerRequest],
|
||||||
|
&partitionKey,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: createPutRecordsRequestEntries(
|
||||||
|
metricsData[maxRecordsPerRequest:],
|
||||||
|
&partitionKey,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_MultipleMetrics_MultipleFullRequests(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
serializer := influx.NewSerializer()
|
||||||
|
partitionKey := "partitionKey"
|
||||||
|
streamName := "stream"
|
||||||
|
|
||||||
|
svc := &mockKinesisPutRecords{}
|
||||||
|
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
|
||||||
|
svc.SetupGenericResponse(maxRecordsPerRequest, 0)
|
||||||
|
|
||||||
|
k := KinesisOutput{
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "static",
|
||||||
|
Key: partitionKey,
|
||||||
|
},
|
||||||
|
StreamName: streamName,
|
||||||
|
serializer: serializer,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics, metricsData := createTestMetrics(t, maxRecordsPerRequest*2, serializer)
|
||||||
|
err := k.Write(metrics)
|
||||||
|
assert.Nil(err, "Should not return error")
|
||||||
|
|
||||||
|
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: createPutRecordsRequestEntries(
|
||||||
|
metricsData[0:maxRecordsPerRequest],
|
||||||
|
&partitionKey,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: createPutRecordsRequestEntries(
|
||||||
|
metricsData[maxRecordsPerRequest:],
|
||||||
|
&partitionKey,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWrite_SerializerError(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
serializer := influx.NewSerializer()
|
||||||
|
partitionKey := "partitionKey"
|
||||||
|
streamName := "stream"
|
||||||
|
|
||||||
|
svc := &mockKinesisPutRecords{}
|
||||||
|
svc.SetupGenericResponse(2, 0)
|
||||||
|
|
||||||
|
k := KinesisOutput{
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
Partition: &Partition{
|
||||||
|
Method: "static",
|
||||||
|
Key: partitionKey,
|
||||||
|
},
|
||||||
|
StreamName: streamName,
|
||||||
|
serializer: serializer,
|
||||||
|
svc: svc,
|
||||||
|
}
|
||||||
|
|
||||||
|
metric1, metric1Data := createTestMetric(t, "metric1", serializer)
|
||||||
|
metric2, metric2Data := createTestMetric(t, "metric2", serializer)
|
||||||
|
|
||||||
|
// metric is invalid because of empty name
|
||||||
|
invalidMetric := testutil.TestMetric(3, "")
|
||||||
|
|
||||||
|
err := k.Write([]telegraf.Metric{
|
||||||
|
metric1,
|
||||||
|
invalidMetric,
|
||||||
|
metric2,
|
||||||
|
})
|
||||||
|
assert.Nil(err, "Should not return error")
|
||||||
|
|
||||||
|
// remaining valid metrics should still get written
|
||||||
|
svc.AssertRequests(assert, []*kinesis.PutRecordsInput{
|
||||||
|
{
|
||||||
|
StreamName: &streamName,
|
||||||
|
Records: []*kinesis.PutRecordsRequestEntry{
|
||||||
|
{
|
||||||
|
PartitionKey: &partitionKey,
|
||||||
|
Data: metric1Data,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
PartitionKey: &partitionKey,
|
||||||
|
Data: metric2Data,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
type mockKinesisPutRecordsResponse struct {
|
type mockKinesisPutRecordsResponse struct {
|
||||||
Output *kinesis.PutRecordsOutput
|
Output *kinesis.PutRecordsOutput
|
||||||
Err error
|
Err error
|
||||||
|
|
@ -253,6 +523,35 @@ func (m *mockKinesisPutRecords) SetupResponse(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockKinesisPutRecords) SetupGenericResponse(
|
||||||
|
successfulRecordCount uint32,
|
||||||
|
failedRecordCount uint32,
|
||||||
|
) {
|
||||||
|
|
||||||
|
errorCode := "InternalFailure"
|
||||||
|
errorMessage := "Internal Service Failure"
|
||||||
|
shard := "shardId-000000000003"
|
||||||
|
|
||||||
|
records := []*kinesis.PutRecordsResultEntry{}
|
||||||
|
|
||||||
|
for i := uint32(0); i < successfulRecordCount; i++ {
|
||||||
|
sequenceNumber := fmt.Sprintf("%d", i)
|
||||||
|
records = append(records, &kinesis.PutRecordsResultEntry{
|
||||||
|
SequenceNumber: &sequenceNumber,
|
||||||
|
ShardId: &shard,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := uint32(0); i < failedRecordCount; i++ {
|
||||||
|
records = append(records, &kinesis.PutRecordsResultEntry{
|
||||||
|
ErrorCode: &errorCode,
|
||||||
|
ErrorMessage: &errorMessage,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
m.SetupResponse(int64(failedRecordCount), records)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockKinesisPutRecords) SetupErrorResponse(err error) {
|
func (m *mockKinesisPutRecords) SetupErrorResponse(err error) {
|
||||||
|
|
||||||
m.responses = append(m.responses, &mockKinesisPutRecordsResponse{
|
m.responses = append(m.responses, &mockKinesisPutRecordsResponse{
|
||||||
|
|
@ -323,3 +622,54 @@ func (m *mockKinesisPutRecords) AssertRequests(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createTestMetric(
|
||||||
|
t *testing.T,
|
||||||
|
name string,
|
||||||
|
serializer serializers.Serializer,
|
||||||
|
) (telegraf.Metric, []byte) {
|
||||||
|
|
||||||
|
metric := testutil.TestMetric(1, name)
|
||||||
|
|
||||||
|
data, err := serializer.Serialize(metric)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return metric, data
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTestMetrics(
|
||||||
|
t *testing.T,
|
||||||
|
count uint32,
|
||||||
|
serializer serializers.Serializer,
|
||||||
|
) ([]telegraf.Metric, [][]byte) {
|
||||||
|
|
||||||
|
metrics := make([]telegraf.Metric, count)
|
||||||
|
metricsData := make([][]byte, count)
|
||||||
|
|
||||||
|
for i := uint32(0); i < count; i++ {
|
||||||
|
name := fmt.Sprintf("metric%d", i)
|
||||||
|
metric, data := createTestMetric(t, name, serializer)
|
||||||
|
metrics[i] = metric
|
||||||
|
metricsData[i] = data
|
||||||
|
}
|
||||||
|
|
||||||
|
return metrics, metricsData
|
||||||
|
}
|
||||||
|
|
||||||
|
func createPutRecordsRequestEntries(
|
||||||
|
metricsData [][]byte,
|
||||||
|
partitionKey *string,
|
||||||
|
) []*kinesis.PutRecordsRequestEntry {
|
||||||
|
|
||||||
|
count := len(metricsData)
|
||||||
|
records := make([]*kinesis.PutRecordsRequestEntry, count)
|
||||||
|
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
records[i] = &kinesis.PutRecordsRequestEntry{
|
||||||
|
PartitionKey: partitionKey,
|
||||||
|
Data: metricsData[i],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return records
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue