fix: Fix batching logic with write records, introduce concurrent requests (#8947)

This commit is contained in:
Nirmesh 2022-01-06 14:28:23 -08:00 committed by GitHub
parent 3bbd6be2fa
commit ad1694b1d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 312 additions and 156 deletions

View File

@ -2,12 +2,11 @@ package timestream
import ( import (
"context" "context"
"encoding/binary"
"errors" "errors"
"fmt" "fmt"
"hash/fnv"
"reflect" "reflect"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws"
@ -33,6 +32,7 @@ type (
CreateTableMagneticStoreRetentionPeriodInDays int64 `toml:"create_table_magnetic_store_retention_period_in_days"` CreateTableMagneticStoreRetentionPeriodInDays int64 `toml:"create_table_magnetic_store_retention_period_in_days"`
CreateTableMemoryStoreRetentionPeriodInHours int64 `toml:"create_table_memory_store_retention_period_in_hours"` CreateTableMemoryStoreRetentionPeriodInHours int64 `toml:"create_table_memory_store_retention_period_in_hours"`
CreateTableTags map[string]string `toml:"create_table_tags"` CreateTableTags map[string]string `toml:"create_table_tags"`
MaxWriteGoRoutinesCount int `toml:"max_write_go_routines"`
Log telegraf.Logger Log telegraf.Logger
svc WriteClient svc WriteClient
@ -57,6 +57,10 @@ const (
// MaxRecordsPerCall reflects Timestream limit of WriteRecords API call // MaxRecordsPerCall reflects Timestream limit of WriteRecords API call
const MaxRecordsPerCall = 100 const MaxRecordsPerCall = 100
// Default value for maximum number of parallel go routines to ingest/write data
// when max_write_go_routines is not specified in the config
const MaxWriteRoutinesDefault = 1
var sampleConfig = ` var sampleConfig = `
## Amazon Region ## Amazon Region
region = "us-east-1" region = "us-east-1"
@ -169,6 +173,10 @@ var sampleConfig = `
## Specifies the Timestream table tags. ## Specifies the Timestream table tags.
## Check Timestream documentation for more details ## Check Timestream documentation for more details
# create_table_tags = { "foo" = "bar", "environment" = "dev"} # create_table_tags = { "foo" = "bar", "environment" = "dev"}
## Specify the maximum number of parallel go routines to ingest/write data
## If not specified, defaulted to 1 go routines
max_write_go_routines = 25
` `
// WriteFactory function provides a way to mock the client instantiation for testing purposes. // WriteFactory function provides a way to mock the client instantiation for testing purposes.
@ -225,6 +233,10 @@ func (t *Timestream) Connect() error {
} }
} }
if t.MaxWriteGoRoutinesCount <= 0 {
t.MaxWriteGoRoutinesCount = MaxWriteRoutinesDefault
}
t.Log.Infof("Constructing Timestream client for '%s' mode", t.MappingMode) t.Log.Infof("Constructing Timestream client for '%s' mode", t.MappingMode)
svc, err := WriteFactory(&t.CredentialConfig) svc, err := WriteFactory(&t.CredentialConfig)
@ -270,11 +282,55 @@ func init() {
func (t *Timestream) Write(metrics []telegraf.Metric) error { func (t *Timestream) Write(metrics []telegraf.Metric) error {
writeRecordsInputs := t.TransformMetrics(metrics) writeRecordsInputs := t.TransformMetrics(metrics)
for _, writeRecordsInput := range writeRecordsInputs {
if err := t.writeToTimestream(writeRecordsInput, true); err != nil { maxWriteJobs := t.MaxWriteGoRoutinesCount
numberOfWriteRecordsInputs := len(writeRecordsInputs)
if numberOfWriteRecordsInputs < maxWriteJobs {
maxWriteJobs = numberOfWriteRecordsInputs
}
var wg sync.WaitGroup
errs := make(chan error, numberOfWriteRecordsInputs)
writeJobs := make(chan *timestreamwrite.WriteRecordsInput, maxWriteJobs)
start := time.Now()
for i := 0; i < maxWriteJobs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for writeJob := range writeJobs {
if err := t.writeToTimestream(writeJob, true); err != nil {
errs <- err
}
}
}()
}
for i := range writeRecordsInputs {
writeJobs <- writeRecordsInputs[i]
}
// Close channel once all jobs are added
close(writeJobs)
wg.Wait()
elapsed := time.Since(start)
close(errs)
t.Log.Infof("##WriteToTimestream - Metrics size: %d request size: %d time(ms): %d",
len(metrics), len(writeRecordsInputs), elapsed.Milliseconds())
// On partial failures, Telegraf will reject the entire batch of metrics and
// retry. writeToTimestream will return retryable exceptions only.
for err := range errs {
if err != nil {
return err return err
} }
} }
return nil return nil
} }
@ -378,35 +434,33 @@ func (t *Timestream) createTable(tableName *string) error {
// Telegraf Metrics are grouped by Name, Tag Keys and Time to use Timestream CommonAttributes. // Telegraf Metrics are grouped by Name, Tag Keys and Time to use Timestream CommonAttributes.
// Returns collection of write requests to be performed to Timestream. // Returns collection of write requests to be performed to Timestream.
func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwrite.WriteRecordsInput { func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwrite.WriteRecordsInput {
writeRequests := make(map[uint64]*timestreamwrite.WriteRecordsInput, len(metrics)) writeRequests := make(map[string]*timestreamwrite.WriteRecordsInput, len(metrics))
for _, m := range metrics { for _, m := range metrics {
// build MeasureName, MeasureValue, MeasureValueType // build MeasureName, MeasureValue, MeasureValueType
records := t.buildWriteRecords(m) records := t.buildWriteRecords(m)
if len(records) == 0 { if len(records) == 0 {
continue continue
} }
id := hashFromMetricTimeNameTagKeys(m)
if curr, ok := writeRequests[id]; !ok { var tableName string
// No current CommonAttributes/WriteRecordsInput found for current Telegraf Metric
dimensions := t.buildDimensions(m) if t.MappingMode == MappingModeSingleTable {
timeUnit, timeValue := getTimestreamTime(m.Time()) tableName = t.SingleTableName
}
if t.MappingMode == MappingModeMultiTable {
tableName = m.Name()
}
if curr, ok := writeRequests[tableName]; !ok {
newWriteRecord := &timestreamwrite.WriteRecordsInput{ newWriteRecord := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(t.DatabaseName), DatabaseName: aws.String(t.DatabaseName),
Records: records, TableName: aws.String(tableName),
CommonAttributes: &types.Record{ Records: records,
Dimensions: dimensions, CommonAttributes: &types.Record{},
Time: aws.String(timeValue),
TimeUnit: timeUnit,
},
}
if t.MappingMode == MappingModeSingleTable {
newWriteRecord.TableName = &t.SingleTableName
}
if t.MappingMode == MappingModeMultiTable {
newWriteRecord.TableName = aws.String(m.Name())
} }
writeRequests[id] = newWriteRecord writeRequests[tableName] = newWriteRecord
} else { } else {
curr.Records = append(curr.Records, records...) curr.Records = append(curr.Records, records...)
} }
@ -432,27 +486,6 @@ func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwr
return result return result
} }
func hashFromMetricTimeNameTagKeys(m telegraf.Metric) uint64 {
h := fnv.New64a()
h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
for _, tag := range m.TagList() {
if tag.Key == "" {
continue
}
h.Write([]byte(tag.Key)) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte(tag.Value)) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
}
b := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error"
h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error"
return h.Sum64()
}
func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension { func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension {
var dimensions []types.Dimension var dimensions []types.Dimension
for tagName, tagValue := range point.Tags() { for tagName, tagValue := range point.Tags() {
@ -478,6 +511,9 @@ func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension {
// It returns an array of Timestream write records. // It returns an array of Timestream write records.
func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record { func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record {
var records []types.Record var records []types.Record
dimensions := t.buildDimensions(point)
for fieldName, fieldValue := range point.Fields() { for fieldName, fieldValue := range point.Fields() {
stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue) stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue)
if !ok { if !ok {
@ -486,10 +522,16 @@ func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record {
fieldName, reflect.TypeOf(fieldValue)) fieldName, reflect.TypeOf(fieldValue))
continue continue
} }
timeUnit, timeValue := getTimestreamTime(point.Time())
record := types.Record{ record := types.Record{
MeasureName: aws.String(fieldName), MeasureName: aws.String(fieldName),
MeasureValueType: stringFieldValueType, MeasureValueType: stringFieldValueType,
MeasureValue: aws.String(stringFieldValue), MeasureValue: aws.String(stringFieldValue),
Dimensions: dimensions,
Time: aws.String(timeValue),
TimeUnit: timeUnit,
} }
records = append(records, record) records = append(records, record)
} }

View File

@ -28,6 +28,7 @@ const testSingleTableDim = "namespace"
var time1 = time.Date(2009, time.November, 10, 22, 0, 0, 0, time.UTC) var time1 = time.Date(2009, time.November, 10, 22, 0, 0, 0, time.UTC)
const time1Epoch = "1257890400" const time1Epoch = "1257890400"
const timeUnit = types.TimeUnitSeconds
var time2 = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) var time2 = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
@ -250,37 +251,58 @@ func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
time1, time1,
) )
expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ records := buildRecords([]SimpleInput{
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName1}, tableName: metricName1,
measureValues: map[string]string{"value": "10"}, dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName1},
}) measureValues: map[string]string{"value": "10"},
expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ },
t: time1Epoch,
tableName: testSingleTableName, {
dimensions: map[string]string{testSingleTableDim: metricName1}, t: time1Epoch,
measureValues: map[string]string{"value": "20"}, tableName: metricName1,
dimensions: map[string]string{testSingleTableDim: metricName1},
measureValues: map[string]string{"value": "20"},
},
}) })
expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName),
TableName: aws.String(testSingleTableName),
Records: records,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeSingleTable, comparisonTest(t, MappingModeSingleTable,
[]telegraf.Metric{input1, input2, input3}, []telegraf.Metric{input1, input2, input3},
[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ recordsMulti := buildRecords([]SimpleInput{
t: time1Epoch, {
tableName: metricName1, t: time1Epoch,
dimensions: map[string]string{"tag2": "value2"}, tableName: metricName1,
measureValues: map[string]string{"value": "10"}, dimensions: map[string]string{"tag2": "value2"},
}) measureValues: map[string]string{"value": "10"},
expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ },
t: time1Epoch, {
tableName: metricName1, t: time1Epoch,
dimensions: map[string]string{}, tableName: metricName1,
measureValues: map[string]string{"value": "20"}, dimensions: map[string]string{},
measureValues: map[string]string{"value": "20"},
},
}) })
expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName),
TableName: aws.String(metricName1),
Records: recordsMulti,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeMultiTable, comparisonTest(t, MappingModeMultiTable,
[]telegraf.Metric{input1, input2, input3}, []telegraf.Metric{input1, input2, input3},
[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable}) []*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
} }
func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) { func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
@ -305,13 +327,13 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
resultFields[fieldName] = "10" resultFields[fieldName] = "10"
} }
expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ expectedResult1SingleTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: testSingleTableName, tableName: testSingleTableName,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
measureValues: resultFields, measureValues: resultFields,
}) })
expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ expectedResult2SingleTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: testSingleTableName, tableName: testSingleTableName,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
@ -321,13 +343,13 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
inputs, inputs,
[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable})
expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ expectedResult1MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
measureValues: resultFields, measureValues: resultFields,
}) })
expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ expectedResult2MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
@ -339,6 +361,7 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
} }
func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) { func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) {
input1 := testutil.MustMetric( input1 := testutil.MustMetric(
metricName1, metricName1,
map[string]string{"tag1": "value1"}, map[string]string{"tag1": "value1"},
@ -347,8 +370,9 @@ func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *
}, },
time1, time1,
) )
input2 := testutil.MustMetric( input2 := testutil.MustMetric(
metricName1, metricName2,
map[string]string{"tag2": "value2"}, map[string]string{"tag2": "value2"},
map[string]interface{}{ map[string]interface{}{
"value_supported3": float64(30), "value_supported3": float64(30),
@ -356,32 +380,42 @@ func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *
time1, time1,
) )
expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ recordsSingle := buildRecords([]SimpleInput{
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
}) measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ },
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName1}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported3": "30"}, dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName2},
measureValues: map[string]string{"value_supported3": "30"},
},
}) })
expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName),
TableName: aws.String(testSingleTableName),
Records: recordsSingle,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeSingleTable, comparisonTest(t, MappingModeSingleTable,
[]telegraf.Metric{input1, input2}, []telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ expectedResult1MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
}) })
expectedResult2MultiTable := buildExpectedRecords(SimpleInput{
expectedResult2MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName2,
dimensions: map[string]string{"tag2": "value2"}, dimensions: map[string]string{"tag2": "value2"},
measureValues: map[string]string{"value_supported3": "30"}, measureValues: map[string]string{"value_supported3": "30"},
}) })
@ -401,7 +435,7 @@ func TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparat
time1, time1,
) )
input2 := testutil.MustMetric( input2 := testutil.MustMetric(
metricName1, metricName2,
map[string]string{"tag1": "value2"}, map[string]string{"tag1": "value2"},
map[string]interface{}{ map[string]interface{}{
"value_supported1": float64(20), "value_supported1": float64(20),
@ -409,32 +443,41 @@ func TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparat
time1, time1,
) )
expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ recordsSingle := buildRecords([]SimpleInput{
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported1": "10"}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
}) measureValues: map[string]string{"value_supported1": "10"},
expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ },
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag1": "value2", testSingleTableDim: metricName1}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported1": "20"}, dimensions: map[string]string{"tag1": "value2", testSingleTableDim: metricName2},
measureValues: map[string]string{"value_supported1": "20"},
},
}) })
expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName),
TableName: aws.String(testSingleTableName),
Records: recordsSingle,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeSingleTable, comparisonTest(t, MappingModeSingleTable,
[]telegraf.Metric{input1, input2}, []telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ expectedResult1MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
measureValues: map[string]string{"value_supported1": "10"}, measureValues: map[string]string{"value_supported1": "10"},
}) })
expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ expectedResult2MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName2,
dimensions: map[string]string{"tag1": "value2"}, dimensions: map[string]string{"tag1": "value2"},
measureValues: map[string]string{"value_supported1": "20"}, measureValues: map[string]string{"value_supported1": "20"},
}) })
@ -462,39 +505,57 @@ func TestTransformMetricsSameDimensionsDifferentTimestampsAreWrittenSeparate(t *
time2, time2,
) )
expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ recordsSingle := buildRecords([]SimpleInput{
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
}) measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ },
t: time2Epoch, {
tableName: testSingleTableName, t: time2Epoch,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported3": "30"}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
measureValues: map[string]string{"value_supported3": "30"},
},
}) })
expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName),
TableName: aws.String(testSingleTableName),
Records: recordsSingle,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeSingleTable, comparisonTest(t, MappingModeSingleTable,
[]telegraf.Metric{input1, input2}, []telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ recordsMultiTable := buildRecords([]SimpleInput{
t: time1Epoch, {
tableName: metricName1, t: time1Epoch,
dimensions: map[string]string{"tag1": "value1"}, tableName: metricName1,
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, dimensions: map[string]string{"tag1": "value1"},
}) measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ },
t: time2Epoch, {
tableName: metricName1, t: time2Epoch,
dimensions: map[string]string{"tag1": "value1"}, tableName: metricName1,
measureValues: map[string]string{"value_supported3": "30"}, dimensions: map[string]string{"tag1": "value1"},
measureValues: map[string]string{"value_supported3": "30"},
},
}) })
expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName),
TableName: aws.String(metricName1),
Records: recordsMultiTable,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeMultiTable, comparisonTest(t, MappingModeMultiTable,
[]telegraf.Metric{input1, input2}, []telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable}) []*timestreamwrite.WriteRecordsInput{expectedResultMultiTable})
} }
func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testing.T) { func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testing.T) {
@ -515,7 +576,7 @@ func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testi
time1, time1,
) )
expectedResultSingleTable := buildExpectedRecords(SimpleInput{ expectedResultSingleTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: testSingleTableName, tableName: testSingleTableName,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
@ -526,7 +587,7 @@ func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testi
[]telegraf.Metric{input1, input2}, []telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
expectedResultMultiTable := buildExpectedRecords(SimpleInput{ expectedResultMultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
@ -556,30 +617,39 @@ func TestTransformMetricsDifferentMetricsAreWrittenToDifferentTablesInMultiTable
time1, time1,
) )
expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ recordsSingle := buildRecords([]SimpleInput{
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
}) measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ },
t: time1Epoch, {
tableName: testSingleTableName, t: time1Epoch,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName2}, tableName: testSingleTableName,
measureValues: map[string]string{"value_supported3": "30"}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName2},
measureValues: map[string]string{"value_supported3": "30"},
},
}) })
expectedResultSingleTable := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName),
TableName: aws.String(testSingleTableName),
Records: recordsSingle,
CommonAttributes: &types.Record{},
}
comparisonTest(t, MappingModeSingleTable, comparisonTest(t, MappingModeSingleTable,
[]telegraf.Metric{input1, input2}, []telegraf.Metric{input1, input2},
[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ expectedResult1MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"},
}) })
expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ expectedResult2MultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName2, tableName: metricName2,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
@ -600,7 +670,7 @@ func TestTransformMetricsUnsupportedFieldsAreSkipped(t *testing.T) {
}, },
time1, time1,
) )
expectedResultSingleTable := buildExpectedRecords(SimpleInput{ expectedResultSingleTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: testSingleTableName, tableName: testSingleTableName,
dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
@ -611,7 +681,7 @@ func TestTransformMetricsUnsupportedFieldsAreSkipped(t *testing.T) {
[]telegraf.Metric{metricWithUnsupportedField}, []telegraf.Metric{metricWithUnsupportedField},
[]*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable})
expectedResultMultiTable := buildExpectedRecords(SimpleInput{ expectedResultMultiTable := buildExpectedInput(SimpleInput{
t: time1Epoch, t: time1Epoch,
tableName: metricName1, tableName: metricName1,
dimensions: map[string]string{"tag1": "value1"}, dimensions: map[string]string{"tag1": "value1"},
@ -646,6 +716,15 @@ func comparisonTest(t *testing.T,
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
} }
comparison(t, plugin, mappingMode, telegrafMetrics, timestreamRecords)
}
func comparison(t *testing.T,
plugin Timestream,
mappingMode string,
telegrafMetrics []telegraf.Metric,
timestreamRecords []*timestreamwrite.WriteRecordsInput) {
result := plugin.TransformMetrics(telegrafMetrics) result := plugin.TransformMetrics(telegrafMetrics)
require.Equal(t, len(timestreamRecords), len(result), "The number of transformed records was expected to be different") require.Equal(t, len(timestreamRecords), len(result), "The number of transformed records was expected to be different")
@ -698,7 +777,7 @@ type SimpleInput struct {
measureValues map[string]string measureValues map[string]string
} }
func buildExpectedRecords(i SimpleInput) *timestreamwrite.WriteRecordsInput { func buildExpectedInput(i SimpleInput) *timestreamwrite.WriteRecordsInput {
var tsDimensions []types.Dimension var tsDimensions []types.Dimension
for k, v := range i.dimensions { for k, v := range i.dimensions {
tsDimensions = append(tsDimensions, types.Dimension{ tsDimensions = append(tsDimensions, types.Dimension{
@ -713,19 +792,54 @@ func buildExpectedRecords(i SimpleInput) *timestreamwrite.WriteRecordsInput {
MeasureName: aws.String(k), MeasureName: aws.String(k),
MeasureValue: aws.String(v), MeasureValue: aws.String(v),
MeasureValueType: types.MeasureValueTypeDouble, MeasureValueType: types.MeasureValueTypeDouble,
Dimensions: tsDimensions,
Time: aws.String(i.t),
TimeUnit: timeUnit,
}) })
} }
result := &timestreamwrite.WriteRecordsInput{ result := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(tsDbName), DatabaseName: aws.String(tsDbName),
TableName: aws.String(i.tableName), TableName: aws.String(i.tableName),
Records: tsRecords, Records: tsRecords,
CommonAttributes: &types.Record{ CommonAttributes: &types.Record{},
Dimensions: tsDimensions,
Time: aws.String(i.t),
TimeUnit: types.TimeUnitSeconds,
},
} }
return result return result
} }
func buildRecords(inputs []SimpleInput) []types.Record {
var tsRecords []types.Record
for _, inp := range inputs {
tsRecords = append(tsRecords, buildRecord(inp)...)
}
return tsRecords
}
func buildRecord(input SimpleInput) []types.Record {
var tsRecords []types.Record
var tsDimensions []types.Dimension
for k, v := range input.dimensions {
tsDimensions = append(tsDimensions, types.Dimension{
Name: aws.String(k),
Value: aws.String(v),
})
}
for k, v := range input.measureValues {
tsRecords = append(tsRecords, types.Record{
MeasureName: aws.String(k),
MeasureValue: aws.String(v),
MeasureValueType: types.MeasureValueTypeDouble,
Dimensions: tsDimensions,
Time: aws.String(input.t),
TimeUnit: timeUnit,
})
}
return tsRecords
}