diff --git a/plugins/outputs/timestream/timestream.go b/plugins/outputs/timestream/timestream.go index 91d73de38..3f87bb1ee 100644 --- a/plugins/outputs/timestream/timestream.go +++ b/plugins/outputs/timestream/timestream.go @@ -2,12 +2,11 @@ package timestream import ( "context" - "encoding/binary" "errors" "fmt" - "hash/fnv" "reflect" "strconv" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -33,6 +32,7 @@ type ( CreateTableMagneticStoreRetentionPeriodInDays int64 `toml:"create_table_magnetic_store_retention_period_in_days"` CreateTableMemoryStoreRetentionPeriodInHours int64 `toml:"create_table_memory_store_retention_period_in_hours"` CreateTableTags map[string]string `toml:"create_table_tags"` + MaxWriteGoRoutinesCount int `toml:"max_write_go_routines"` Log telegraf.Logger svc WriteClient @@ -57,6 +57,10 @@ const ( // MaxRecordsPerCall reflects Timestream limit of WriteRecords API call const MaxRecordsPerCall = 100 +// Default value for maximum number of parallel go routines to ingest/write data +// when max_write_go_routines is not specified in the config +const MaxWriteRoutinesDefault = 1 + var sampleConfig = ` ## Amazon Region region = "us-east-1" @@ -169,6 +173,10 @@ var sampleConfig = ` ## Specifies the Timestream table tags. ## Check Timestream documentation for more details # create_table_tags = { "foo" = "bar", "environment" = "dev"} + + ## Specify the maximum number of parallel go routines to ingest/write data + ## If not specified, defaulted to 1 go routines + max_write_go_routines = 25 ` // WriteFactory function provides a way to mock the client instantiation for testing purposes. @@ -225,6 +233,10 @@ func (t *Timestream) Connect() error { } } + if t.MaxWriteGoRoutinesCount <= 0 { + t.MaxWriteGoRoutinesCount = MaxWriteRoutinesDefault + } + t.Log.Infof("Constructing Timestream client for '%s' mode", t.MappingMode) svc, err := WriteFactory(&t.CredentialConfig) @@ -270,11 +282,55 @@ func init() { func (t *Timestream) Write(metrics []telegraf.Metric) error { writeRecordsInputs := t.TransformMetrics(metrics) - for _, writeRecordsInput := range writeRecordsInputs { - if err := t.writeToTimestream(writeRecordsInput, true); err != nil { + + maxWriteJobs := t.MaxWriteGoRoutinesCount + numberOfWriteRecordsInputs := len(writeRecordsInputs) + + if numberOfWriteRecordsInputs < maxWriteJobs { + maxWriteJobs = numberOfWriteRecordsInputs + } + + var wg sync.WaitGroup + errs := make(chan error, numberOfWriteRecordsInputs) + writeJobs := make(chan *timestreamwrite.WriteRecordsInput, maxWriteJobs) + + start := time.Now() + + for i := 0; i < maxWriteJobs; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for writeJob := range writeJobs { + if err := t.writeToTimestream(writeJob, true); err != nil { + errs <- err + } + } + }() + } + + for i := range writeRecordsInputs { + writeJobs <- writeRecordsInputs[i] + } + + // Close channel once all jobs are added + close(writeJobs) + + wg.Wait() + elapsed := time.Since(start) + + close(errs) + + t.Log.Infof("##WriteToTimestream - Metrics size: %d request size: %d time(ms): %d", + len(metrics), len(writeRecordsInputs), elapsed.Milliseconds()) + + // On partial failures, Telegraf will reject the entire batch of metrics and + // retry. writeToTimestream will return retryable exceptions only. + for err := range errs { + if err != nil { return err } } + return nil } @@ -378,35 +434,33 @@ func (t *Timestream) createTable(tableName *string) error { // Telegraf Metrics are grouped by Name, Tag Keys and Time to use Timestream CommonAttributes. // Returns collection of write requests to be performed to Timestream. func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwrite.WriteRecordsInput { - writeRequests := make(map[uint64]*timestreamwrite.WriteRecordsInput, len(metrics)) + writeRequests := make(map[string]*timestreamwrite.WriteRecordsInput, len(metrics)) for _, m := range metrics { // build MeasureName, MeasureValue, MeasureValueType records := t.buildWriteRecords(m) if len(records) == 0 { continue } - id := hashFromMetricTimeNameTagKeys(m) - if curr, ok := writeRequests[id]; !ok { - // No current CommonAttributes/WriteRecordsInput found for current Telegraf Metric - dimensions := t.buildDimensions(m) - timeUnit, timeValue := getTimestreamTime(m.Time()) + + var tableName string + + if t.MappingMode == MappingModeSingleTable { + tableName = t.SingleTableName + } + + if t.MappingMode == MappingModeMultiTable { + tableName = m.Name() + } + + if curr, ok := writeRequests[tableName]; !ok { newWriteRecord := ×treamwrite.WriteRecordsInput{ - DatabaseName: aws.String(t.DatabaseName), - Records: records, - CommonAttributes: &types.Record{ - Dimensions: dimensions, - Time: aws.String(timeValue), - TimeUnit: timeUnit, - }, - } - if t.MappingMode == MappingModeSingleTable { - newWriteRecord.TableName = &t.SingleTableName - } - if t.MappingMode == MappingModeMultiTable { - newWriteRecord.TableName = aws.String(m.Name()) + DatabaseName: aws.String(t.DatabaseName), + TableName: aws.String(tableName), + Records: records, + CommonAttributes: &types.Record{}, } - writeRequests[id] = newWriteRecord + writeRequests[tableName] = newWriteRecord } else { curr.Records = append(curr.Records, records...) } @@ -432,27 +486,6 @@ func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwr return result } -func hashFromMetricTimeNameTagKeys(m telegraf.Metric) uint64 { - h := fnv.New64a() - h.Write([]byte(m.Name())) //nolint:revive // from hash.go: "It never returns an error" - h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" - for _, tag := range m.TagList() { - if tag.Key == "" { - continue - } - - h.Write([]byte(tag.Key)) //nolint:revive // from hash.go: "It never returns an error" - h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" - h.Write([]byte(tag.Value)) //nolint:revive // from hash.go: "It never returns an error" - h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" - } - b := make([]byte, binary.MaxVarintLen64) - n := binary.PutUvarint(b, uint64(m.Time().UnixNano())) - h.Write(b[:n]) //nolint:revive // from hash.go: "It never returns an error" - h.Write([]byte("\n")) //nolint:revive // from hash.go: "It never returns an error" - return h.Sum64() -} - func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension { var dimensions []types.Dimension for tagName, tagValue := range point.Tags() { @@ -478,6 +511,9 @@ func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension { // It returns an array of Timestream write records. func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record { var records []types.Record + + dimensions := t.buildDimensions(point) + for fieldName, fieldValue := range point.Fields() { stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue) if !ok { @@ -486,10 +522,16 @@ func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record { fieldName, reflect.TypeOf(fieldValue)) continue } + + timeUnit, timeValue := getTimestreamTime(point.Time()) + record := types.Record{ MeasureName: aws.String(fieldName), MeasureValueType: stringFieldValueType, MeasureValue: aws.String(stringFieldValue), + Dimensions: dimensions, + Time: aws.String(timeValue), + TimeUnit: timeUnit, } records = append(records, record) } diff --git a/plugins/outputs/timestream/timestream_test.go b/plugins/outputs/timestream/timestream_test.go index 7be25c255..70f81c8cb 100644 --- a/plugins/outputs/timestream/timestream_test.go +++ b/plugins/outputs/timestream/timestream_test.go @@ -28,6 +28,7 @@ const testSingleTableDim = "namespace" var time1 = time.Date(2009, time.November, 10, 22, 0, 0, 0, time.UTC) const time1Epoch = "1257890400" +const timeUnit = types.TimeUnitSeconds var time2 = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) @@ -250,37 +251,58 @@ func TestTransformMetricsSkipEmptyMetric(t *testing.T) { time1, ) - expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value": "10"}, - }) - expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{testSingleTableDim: metricName1}, - measureValues: map[string]string{"value": "20"}, + records := buildRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: metricName1, + dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName1}, + measureValues: map[string]string{"value": "10"}, + }, + + { + t: time1Epoch, + tableName: metricName1, + dimensions: map[string]string{testSingleTableDim: metricName1}, + measureValues: map[string]string{"value": "20"}, + }, }) + + expectedResultSingleTable := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(tsDbName), + TableName: aws.String(testSingleTableName), + Records: records, + CommonAttributes: &types.Record{}, + } + comparisonTest(t, MappingModeSingleTable, []telegraf.Metric{input1, input2, input3}, - []*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) + []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) - expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: metricName1, - dimensions: map[string]string{"tag2": "value2"}, - measureValues: map[string]string{"value": "10"}, - }) - expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: metricName1, - dimensions: map[string]string{}, - measureValues: map[string]string{"value": "20"}, + recordsMulti := buildRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: metricName1, + dimensions: map[string]string{"tag2": "value2"}, + measureValues: map[string]string{"value": "10"}, + }, + { + t: time1Epoch, + tableName: metricName1, + dimensions: map[string]string{}, + measureValues: map[string]string{"value": "20"}, + }, }) + + expectedResultMultiTable := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(tsDbName), + TableName: aws.String(metricName1), + Records: recordsMulti, + CommonAttributes: &types.Record{}, + } + comparisonTest(t, MappingModeMultiTable, []telegraf.Metric{input1, input2, input3}, - []*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable}) + []*timestreamwrite.WriteRecordsInput{expectedResultMultiTable}) } func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) { @@ -305,13 +327,13 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) { resultFields[fieldName] = "10" } - expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ + expectedResult1SingleTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: testSingleTableName, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, measureValues: resultFields, }) - expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ + expectedResult2SingleTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: testSingleTableName, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, @@ -321,13 +343,13 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) { inputs, []*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) - expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ + expectedResult1MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName1, dimensions: map[string]string{"tag1": "value1"}, measureValues: resultFields, }) - expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ + expectedResult2MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName1, dimensions: map[string]string{"tag1": "value1"}, @@ -339,6 +361,7 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) { } func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) { + input1 := testutil.MustMetric( metricName1, map[string]string{"tag1": "value1"}, @@ -347,8 +370,9 @@ func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t * }, time1, ) + input2 := testutil.MustMetric( - metricName1, + metricName2, map[string]string{"tag2": "value2"}, map[string]interface{}{ "value_supported3": float64(30), @@ -356,32 +380,42 @@ func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t * time1, ) - expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, - }) - expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value_supported3": "30"}, + recordsSingle := buildRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, + measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, + }, + { + t: time1Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag2": "value2", testSingleTableDim: metricName2}, + measureValues: map[string]string{"value_supported3": "30"}, + }, }) + expectedResultSingleTable := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(tsDbName), + TableName: aws.String(testSingleTableName), + Records: recordsSingle, + CommonAttributes: &types.Record{}, + } + comparisonTest(t, MappingModeSingleTable, []telegraf.Metric{input1, input2}, - []*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) + []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) - expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ + expectedResult1MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName1, dimensions: map[string]string{"tag1": "value1"}, measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, }) - expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ + + expectedResult2MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, - tableName: metricName1, + tableName: metricName2, dimensions: map[string]string{"tag2": "value2"}, measureValues: map[string]string{"value_supported3": "30"}, }) @@ -401,7 +435,7 @@ func TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparat time1, ) input2 := testutil.MustMetric( - metricName1, + metricName2, map[string]string{"tag1": "value2"}, map[string]interface{}{ "value_supported1": float64(20), @@ -409,32 +443,41 @@ func TestTransformMetricsSameDimensionsDifferentDimensionValuesAreWrittenSeparat time1, ) - expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value_supported1": "10"}, - }) - expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag1": "value2", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value_supported1": "20"}, + recordsSingle := buildRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, + measureValues: map[string]string{"value_supported1": "10"}, + }, + { + t: time1Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag1": "value2", testSingleTableDim: metricName2}, + measureValues: map[string]string{"value_supported1": "20"}, + }, }) + expectedResultSingleTable := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(tsDbName), + TableName: aws.String(testSingleTableName), + Records: recordsSingle, + CommonAttributes: &types.Record{}, + } + comparisonTest(t, MappingModeSingleTable, []telegraf.Metric{input1, input2}, - []*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) + []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) - expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ + expectedResult1MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName1, dimensions: map[string]string{"tag1": "value1"}, measureValues: map[string]string{"value_supported1": "10"}, }) - expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ + expectedResult2MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, - tableName: metricName1, + tableName: metricName2, dimensions: map[string]string{"tag1": "value2"}, measureValues: map[string]string{"value_supported1": "20"}, }) @@ -462,39 +505,57 @@ func TestTransformMetricsSameDimensionsDifferentTimestampsAreWrittenSeparate(t * time2, ) - expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, - }) - expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ - t: time2Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value_supported3": "30"}, + recordsSingle := buildRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, + measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, + }, + { + t: time2Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, + measureValues: map[string]string{"value_supported3": "30"}, + }, }) + expectedResultSingleTable := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(tsDbName), + TableName: aws.String(testSingleTableName), + Records: recordsSingle, + CommonAttributes: &types.Record{}, + } + comparisonTest(t, MappingModeSingleTable, []telegraf.Metric{input1, input2}, - []*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) + []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) - expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: metricName1, - dimensions: map[string]string{"tag1": "value1"}, - measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, - }) - expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ - t: time2Epoch, - tableName: metricName1, - dimensions: map[string]string{"tag1": "value1"}, - measureValues: map[string]string{"value_supported3": "30"}, + recordsMultiTable := buildRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: metricName1, + dimensions: map[string]string{"tag1": "value1"}, + measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, + }, + { + t: time2Epoch, + tableName: metricName1, + dimensions: map[string]string{"tag1": "value1"}, + measureValues: map[string]string{"value_supported3": "30"}, + }, }) + expectedResultMultiTable := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(tsDbName), + TableName: aws.String(metricName1), + Records: recordsMultiTable, + CommonAttributes: &types.Record{}, + } + comparisonTest(t, MappingModeMultiTable, []telegraf.Metric{input1, input2}, - []*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable}) + []*timestreamwrite.WriteRecordsInput{expectedResultMultiTable}) } func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testing.T) { @@ -515,7 +576,7 @@ func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testi time1, ) - expectedResultSingleTable := buildExpectedRecords(SimpleInput{ + expectedResultSingleTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: testSingleTableName, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, @@ -526,7 +587,7 @@ func TestTransformMetricsSameDimensionsSameTimestampsAreWrittenTogether(t *testi []telegraf.Metric{input1, input2}, []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) - expectedResultMultiTable := buildExpectedRecords(SimpleInput{ + expectedResultMultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName1, dimensions: map[string]string{"tag1": "value1"}, @@ -556,30 +617,39 @@ func TestTransformMetricsDifferentMetricsAreWrittenToDifferentTablesInMultiTable time1, ) - expectedResult1SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, - measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, - }) - expectedResult2SingleTable := buildExpectedRecords(SimpleInput{ - t: time1Epoch, - tableName: testSingleTableName, - dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName2}, - measureValues: map[string]string{"value_supported3": "30"}, + recordsSingle := buildRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, + measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, + }, + { + t: time1Epoch, + tableName: testSingleTableName, + dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName2}, + measureValues: map[string]string{"value_supported3": "30"}, + }, }) + expectedResultSingleTable := ×treamwrite.WriteRecordsInput{ + DatabaseName: aws.String(tsDbName), + TableName: aws.String(testSingleTableName), + Records: recordsSingle, + CommonAttributes: &types.Record{}, + } + comparisonTest(t, MappingModeSingleTable, []telegraf.Metric{input1, input2}, - []*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable}) + []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) - expectedResult1MultiTable := buildExpectedRecords(SimpleInput{ + expectedResult1MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName1, dimensions: map[string]string{"tag1": "value1"}, measureValues: map[string]string{"value_supported1": "10", "value_supported2": "20"}, }) - expectedResult2MultiTable := buildExpectedRecords(SimpleInput{ + expectedResult2MultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName2, dimensions: map[string]string{"tag1": "value1"}, @@ -600,7 +670,7 @@ func TestTransformMetricsUnsupportedFieldsAreSkipped(t *testing.T) { }, time1, ) - expectedResultSingleTable := buildExpectedRecords(SimpleInput{ + expectedResultSingleTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: testSingleTableName, dimensions: map[string]string{"tag1": "value1", testSingleTableDim: metricName1}, @@ -611,7 +681,7 @@ func TestTransformMetricsUnsupportedFieldsAreSkipped(t *testing.T) { []telegraf.Metric{metricWithUnsupportedField}, []*timestreamwrite.WriteRecordsInput{expectedResultSingleTable}) - expectedResultMultiTable := buildExpectedRecords(SimpleInput{ + expectedResultMultiTable := buildExpectedInput(SimpleInput{ t: time1Epoch, tableName: metricName1, dimensions: map[string]string{"tag1": "value1"}, @@ -646,6 +716,15 @@ func comparisonTest(t *testing.T, Log: testutil.Logger{}, } } + + comparison(t, plugin, mappingMode, telegrafMetrics, timestreamRecords) +} + +func comparison(t *testing.T, + plugin Timestream, + mappingMode string, + telegrafMetrics []telegraf.Metric, + timestreamRecords []*timestreamwrite.WriteRecordsInput) { result := plugin.TransformMetrics(telegrafMetrics) require.Equal(t, len(timestreamRecords), len(result), "The number of transformed records was expected to be different") @@ -698,7 +777,7 @@ type SimpleInput struct { measureValues map[string]string } -func buildExpectedRecords(i SimpleInput) *timestreamwrite.WriteRecordsInput { +func buildExpectedInput(i SimpleInput) *timestreamwrite.WriteRecordsInput { var tsDimensions []types.Dimension for k, v := range i.dimensions { tsDimensions = append(tsDimensions, types.Dimension{ @@ -713,19 +792,54 @@ func buildExpectedRecords(i SimpleInput) *timestreamwrite.WriteRecordsInput { MeasureName: aws.String(k), MeasureValue: aws.String(v), MeasureValueType: types.MeasureValueTypeDouble, + Dimensions: tsDimensions, + Time: aws.String(i.t), + TimeUnit: timeUnit, }) } result := ×treamwrite.WriteRecordsInput{ - DatabaseName: aws.String(tsDbName), - TableName: aws.String(i.tableName), - Records: tsRecords, - CommonAttributes: &types.Record{ - Dimensions: tsDimensions, - Time: aws.String(i.t), - TimeUnit: types.TimeUnitSeconds, - }, + DatabaseName: aws.String(tsDbName), + TableName: aws.String(i.tableName), + Records: tsRecords, + CommonAttributes: &types.Record{}, } return result } + +func buildRecords(inputs []SimpleInput) []types.Record { + var tsRecords []types.Record + + for _, inp := range inputs { + tsRecords = append(tsRecords, buildRecord(inp)...) + } + + return tsRecords +} + +func buildRecord(input SimpleInput) []types.Record { + var tsRecords []types.Record + + var tsDimensions []types.Dimension + + for k, v := range input.dimensions { + tsDimensions = append(tsDimensions, types.Dimension{ + Name: aws.String(k), + Value: aws.String(v), + }) + } + + for k, v := range input.measureValues { + tsRecords = append(tsRecords, types.Record{ + MeasureName: aws.String(k), + MeasureValue: aws.String(v), + MeasureValueType: types.MeasureValueTypeDouble, + Dimensions: tsDimensions, + Time: aws.String(input.t), + TimeUnit: timeUnit, + }) + } + + return tsRecords +}