diff --git a/plugins/outputs/timestream/README.md b/plugins/outputs/timestream/README.md index bec1b217e..681f95b1f 100644 --- a/plugins/outputs/timestream/README.md +++ b/plugins/outputs/timestream/README.md @@ -24,7 +24,7 @@ API endpoint. In the following order the plugin will attempt to authenticate. region = "us-east-1" ## Amazon Credentials - ## Credentials are loaded in the following order + ## Credentials are loaded in the following order: ## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified ## 2) Assumed credentials via STS if role_arn is specified ## 3) explicit credentials from 'access_key' and 'secret_key' @@ -57,76 +57,50 @@ API endpoint. In the following order the plugin will attempt to authenticate. ## and therefore the Telegraf agent will not start. describe_database_on_start = false - ## The mapping mode specifies how Telegraf records are represented in Timestream. + ## Specifies how the data is organized in Timestream. ## Valid values are: single-table, multi-table. - ## For example, consider the following data in line protocol format: - ## weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200 - ## airquality,location=us-west no2=5,pm25=16 1465839830100400200 - ## where weather and airquality are the measurement names, location and season are tags, - ## and temperature, humidity, no2, pm25 are fields. 
- ## In multi-table mode: - ## - first line will be ingested to table named weather - ## - second line will be ingested to table named airquality - ## - the tags will be represented as dimensions - ## - first table (weather) will have two records: - ## one with measurement name equals to temperature, - ## another with measurement name equals to humidity - ## - second table (airquality) will have two records: - ## one with measurement name equals to no2, - ## another with measurement name equals to pm25 - ## - the Timestream tables from the example will look like this: - ## TABLE "weather": - ## time | location | season | measure_name | measure_value::bigint - ## 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82 - ## 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71 - ## TABLE "airquality": - ## time | location | measure_name | measure_value::bigint - ## 2016-06-13 17:43:50 | us-west | no2 | 5 - ## 2016-06-13 17:43:50 | us-west | pm25 | 16 - ## In single-table mode: - ## - the data will be ingested to a single table, which name will be valueOf(single_table_name) - ## - measurement name will stored in dimension named valueOf(single_table_dimension_name_for_telegraf_measurement_name) - ## - location and season will be represented as dimensions - ## - temperature, humidity, no2, pm25 will be represented as measurement name - ## - the Timestream table from the example will look like this: - ## Assuming: - ## - single_table_name = "my_readings" - ## - single_table_dimension_name_for_telegraf_measurement_name = "namespace" - ## TABLE "my_readings": - ## time | location | season | namespace | measure_name | measure_value::bigint - ## 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82 - ## 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71 - ## 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5 - ## 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16 - ## In most cases, using multi-table mapping mode 
is recommended. - ## However, you can consider using single-table in situations when you have thousands of measurement names. + ## When mapping_mode is set to single-table, all of the data is stored in a single table. + ## When mapping_mode is set to multi-table, the data is organized and stored in multiple tables. + ## The default is multi-table. mapping_mode = "multi-table" - ## Only valid and required for mapping_mode = "single-table" - ## Specifies the Timestream table where the metrics will be uploaded. - # single_table_name = "yourTableNameHere" - - ## Only valid and required for mapping_mode = "single-table" - ## Describes what will be the Timestream dimension name for the Telegraf - ## measurement name. - # single_table_dimension_name_for_telegraf_measurement_name = "namespace" - - ## Specifies if the plugin should create the table, if the table do not exist. - ## The plugin writes the data without prior checking if the table exists. - ## When the table does not exist, the error returned from Timestream will cause - ## the plugin to create the table, if this parameter is set to true. + ## Specifies if the plugin should create the table, if the table does not exist. create_table_if_not_exists = true - ## Only valid and required if create_table_if_not_exists = true ## Specifies the Timestream table magnetic store retention period in days. ## Check Timestream documentation for more details. + ## NOTE: This property is valid when create_table_if_not_exists = true. create_table_magnetic_store_retention_period_in_days = 365 - ## Only valid and required if create_table_if_not_exists = true ## Specifies the Timestream table memory store retention period in hours. ## Check Timestream documentation for more details. + ## NOTE: This property is valid when create_table_if_not_exists = true. create_table_memory_store_retention_period_in_hours = 24 + ## Specifies how the data is written into Timestream. 
+ ## Valid values are: true, false + ## When use_multi_measure_records is set to true, all of the tags and fields are stored + ## as a single row in a Timestream table. + ## When use_multi_measure_record is set to false, Timestream stores each field in a + ## separate table row, thereby storing the tags multiple times (once for each field). + ## The recommended setting is true. + ## The default is false. + use_multi_measure_records = "false" + + ## Specifies the measure_name to use when sending multi-measure records. + ## NOTE: This property is valid when use_multi_measure_records=true and mapping_mode=multi-table + measure_name_for_multi_measure_records = "telegraf_measure" + + ## Specifies the name of the table to write data into + ## NOTE: This property is valid when mapping_mode=single-table. + # single_table_name = "" + + ## Specifies the name of dimension when all of the data is being stored in a single table + ## and the measurement name is transformed into the dimension value + ## (see Mapping data from Influx to Timestream for details) + ## NOTE: This property is valid when mapping_mode=single-table. + # single_table_dimension_name_for_telegraf_measurement_name = "namespace" + ## Only valid and optional if create_table_if_not_exists = true ## Specifies the Timestream table tags. ## Check Timestream documentation for more details @@ -135,6 +109,9 @@ API endpoint. In the following order the plugin will attempt to authenticate. ## Specify the maximum number of parallel go routines to ingest/write data ## If not specified, defaulted to 1 go routines max_write_go_routines = 25 + + ## Please see README.md to know how line protocol data is mapped to Timestream + ## ``` ### Batching @@ -181,8 +158,111 @@ Execute unit tests with: go test -v ./plugins/outputs/timestream/... 
``` -[Amazon Timestream]: https://aws.amazon.com/timestream/ -[Assumed credentials via STS]: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/credentials/stscreds -[Environment Variables]: https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#environment-variables -[Shared Credentials]: https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#shared-credentials-file -[EC2 Instance Profile]: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html +### Mapping data from Influx to Timestream + +When writing data from Influx to Timestream, +data is written by default as follows: + + 1. The timestamp is written as the time field. + 2. Tags are written as dimensions. + 3. Fields are written as measures. + 4. Measurements are written as table names. + + For example, consider the following data in line protocol format: + + > weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200 + > airquality,location=us-west no2=5,pm25=16 1465839830100400200 + +where: + `weather` and `airquality` are the measurement names, + `location` and `season` are tags, + `temperature`, `humidity`, `no2`, `pm25` are fields. + +When you choose to create a separate table for each measurement and store +multiple fields in a single table row, the data will be written into +Timestream as: + + 1. The plugin will create 2 tables, namely, weather and airquality (mapping_mode=multi-table). + 2. The tables may contain multiple fields in a single table row (use_multi_measure_records=true). + 3. The table weather will contain the following columns and data: + + | time | location | season | measure_name | temperature | humidity | + | :--- | :--- | :--- | :--- | :--- | :--- | + | 2016-06-13 17:43:50 | us-midwest | summer | `` | 82 | 71| + + 4. 
The table airquality will contain the following columns and data: + + | time | location | measure_name | no2 | pm25 | + | :--- | :--- | :--- | :--- | :--- | + |2016-06-13 17:43:50 | us-west | `` | 5 | 16 | + + NOTE: + `` represents the actual + value of that property. + +You can also choose to create a separate table per measurement and store +each field in a separate row per table. In that case: + + 1. The plugin will create 2 tables, namely, weather and airquality (mapping_mode=multi-table). + 2. Each table row will contain a single field only (use_multi_measure_records=false). + 3. The table weather will contain the following columns and data: + + | time | location | season | measure_name | measure_value::bigint | + | :--- | :--- | :--- | :--- | :--- | + | 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82 | + | 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71 | + + 4. The table airquality will contain the following columns and data: + + | time | location | measure_name | measure_value::bigint | + | :--- | :--- | :--- | :--- | + | 2016-06-13 17:43:50 | us-west | no2 | 5 | + | 2016-06-13 17:43:50 | us-west | pm25 | 16 | + +You can also choose to store all the measurements in a single table and +store all fields in a single table row. In that case: + + 1. This plugin will create a table with name (mapping_mode=single-table). + 2. The table may contain multiple fields in a single table row (use_multi_measure_records=true). + 3. The table will contain the following column and data: + + | time | location | season | ``| measure_name | temperature | humidity | no2 | pm25 | + | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | + | 2016-06-13 17:43:50 | us-midwest | summer | weather | `` | 82 | 71 | null | null | + | 2016-06-13 17:43:50 | us-west | null | airquality | `` | null | null | 5 | 16 | + + NOTE: + `` represents the actual value of that property. + `` represents + the actual value of that property. 
+ `` represents the actual value of + that property. + +Furthermore, you can choose to store all the measurements in a single table +and store each field in a separate table row. In that case: + + 1. Timestream will create a table with name (mapping_mode=single-table). + 2. Each table row will contain a single field only (use_multi_measure_records=false). + 3. The table will contain the following column and data: + + | time | location | season | namespace | measure_name | measure_value::bigint | + | :--- | :--- | :--- | :--- | :--- | :--- | + | 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82 | + | 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71 | + | 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5 | + | 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16 | + + NOTE: + `` represents the actual value of that property. + `` represents the + actual value of that property. + `` represents the actual value of + that property. + +### References + +- [Amazon Timestream](https://aws.amazon.com/timestream/) +- [Assumed credentials via STS](https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/credentials/stscreds) +- [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#environment-variables) +- [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#shared-credentials-file) +- [EC2 Instance Profile](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) diff --git a/plugins/outputs/timestream/sample.conf b/plugins/outputs/timestream/sample.conf index b32c69856..aa8691d0d 100644 --- a/plugins/outputs/timestream/sample.conf +++ b/plugins/outputs/timestream/sample.conf @@ -4,7 +4,7 @@ region = "us-east-1" ## Amazon Credentials - ## Credentials are loaded in the following order + ## Credentials are loaded in the following order: ## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified ## 2) Assumed 
credentials via STS if role_arn is specified ## 3) explicit credentials from 'access_key' and 'secret_key' @@ -37,76 +37,50 @@ ## and therefore the Telegraf agent will not start. describe_database_on_start = false - ## The mapping mode specifies how Telegraf records are represented in Timestream. + ## Specifies how the data is organized in Timestream. ## Valid values are: single-table, multi-table. - ## For example, consider the following data in line protocol format: - ## weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200 - ## airquality,location=us-west no2=5,pm25=16 1465839830100400200 - ## where weather and airquality are the measurement names, location and season are tags, - ## and temperature, humidity, no2, pm25 are fields. - ## In multi-table mode: - ## - first line will be ingested to table named weather - ## - second line will be ingested to table named airquality - ## - the tags will be represented as dimensions - ## - first table (weather) will have two records: - ## one with measurement name equals to temperature, - ## another with measurement name equals to humidity - ## - second table (airquality) will have two records: - ## one with measurement name equals to no2, - ## another with measurement name equals to pm25 - ## - the Timestream tables from the example will look like this: - ## TABLE "weather": - ## time | location | season | measure_name | measure_value::bigint - ## 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82 - ## 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71 - ## TABLE "airquality": - ## time | location | measure_name | measure_value::bigint - ## 2016-06-13 17:43:50 | us-west | no2 | 5 - ## 2016-06-13 17:43:50 | us-west | pm25 | 16 - ## In single-table mode: - ## - the data will be ingested to a single table, which name will be valueOf(single_table_name) - ## - measurement name will stored in dimension named valueOf(single_table_dimension_name_for_telegraf_measurement_name) - 
## - location and season will be represented as dimensions - ## - temperature, humidity, no2, pm25 will be represented as measurement name - ## - the Timestream table from the example will look like this: - ## Assuming: - ## - single_table_name = "my_readings" - ## - single_table_dimension_name_for_telegraf_measurement_name = "namespace" - ## TABLE "my_readings": - ## time | location | season | namespace | measure_name | measure_value::bigint - ## 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82 - ## 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71 - ## 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5 - ## 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16 - ## In most cases, using multi-table mapping mode is recommended. - ## However, you can consider using single-table in situations when you have thousands of measurement names. + ## When mapping_mode is set to single-table, all of the data is stored in a single table. + ## When mapping_mode is set to multi-table, the data is organized and stored in multiple tables. + ## The default is multi-table. mapping_mode = "multi-table" - ## Only valid and required for mapping_mode = "single-table" - ## Specifies the Timestream table where the metrics will be uploaded. - # single_table_name = "yourTableNameHere" - - ## Only valid and required for mapping_mode = "single-table" - ## Describes what will be the Timestream dimension name for the Telegraf - ## measurement name. - # single_table_dimension_name_for_telegraf_measurement_name = "namespace" - - ## Specifies if the plugin should create the table, if the table do not exist. - ## The plugin writes the data without prior checking if the table exists. - ## When the table does not exist, the error returned from Timestream will cause - ## the plugin to create the table, if this parameter is set to true. + ## Specifies if the plugin should create the table, if the table does not exist. 
create_table_if_not_exists = true - ## Only valid and required if create_table_if_not_exists = true ## Specifies the Timestream table magnetic store retention period in days. ## Check Timestream documentation for more details. + ## NOTE: This property is valid when create_table_if_not_exists = true. create_table_magnetic_store_retention_period_in_days = 365 - ## Only valid and required if create_table_if_not_exists = true ## Specifies the Timestream table memory store retention period in hours. ## Check Timestream documentation for more details. + ## NOTE: This property is valid when create_table_if_not_exists = true. create_table_memory_store_retention_period_in_hours = 24 + ## Specifies how the data is written into Timestream. + ## Valid values are: true, false + ## When use_multi_measure_records is set to true, all of the tags and fields are stored + ## as a single row in a Timestream table. + ## When use_multi_measure_record is set to false, Timestream stores each field in a + ## separate table row, thereby storing the tags multiple times (once for each field). + ## The recommended setting is true. + ## The default is false. + use_multi_measure_records = "false" + + ## Specifies the measure_name to use when sending multi-measure records. + ## NOTE: This property is valid when use_multi_measure_records=true and mapping_mode=multi-table + measure_name_for_multi_measure_records = "telegraf_measure" + + ## Specifies the name of the table to write data into + ## NOTE: This property is valid when mapping_mode=single-table. + # single_table_name = "" + + ## Specifies the name of dimension when all of the data is being stored in a single table + ## and the measurement name is transformed into the dimension value + ## (see Mapping data from Influx to Timestream for details) + ## NOTE: This property is valid when mapping_mode=single-table. 
+ # single_table_dimension_name_for_telegraf_measurement_name = "namespace" + ## Only valid and optional if create_table_if_not_exists = true ## Specifies the Timestream table tags. ## Check Timestream documentation for more details @@ -115,3 +89,6 @@ ## Specify the maximum number of parallel go routines to ingest/write data ## If not specified, defaulted to 1 go routines max_write_go_routines = 25 + + ## Please see README.md to know how line protocol data is mapped to Timestream + ## diff --git a/plugins/outputs/timestream/timestream.go b/plugins/outputs/timestream/timestream.go index 2064e9223..596416fca 100644 --- a/plugins/outputs/timestream/timestream.go +++ b/plugins/outputs/timestream/timestream.go @@ -34,6 +34,9 @@ type ( SingleTableName string `toml:"single_table_name"` SingleTableDimensionNameForTelegrafMeasurementName string `toml:"single_table_dimension_name_for_telegraf_measurement_name"` + UseMultiMeasureRecords bool `toml:"use_multi_measure_records"` + MeasureNameForMultiMeasureRecords string `toml:"measure_name_for_multi_measure_records"` + CreateTableIfNotExists bool `toml:"create_table_if_not_exists"` CreateTableMagneticStoreRetentionPeriodInDays int64 `toml:"create_table_magnetic_store_retention_period_in_days"` CreateTableMemoryStoreRetentionPeriodInHours int64 `toml:"create_table_memory_store_retention_period_in_hours"` @@ -132,10 +135,16 @@ func (t *Timestream) Connect() error { return fmt.Errorf("in '%s' mapping mode, SingleTableName key is required", MappingModeSingleTable) } - if t.SingleTableDimensionNameForTelegrafMeasurementName == "" { + if t.SingleTableDimensionNameForTelegrafMeasurementName == "" && !t.UseMultiMeasureRecords { return fmt.Errorf("in '%s' mapping mode, SingleTableDimensionNameForTelegrafMeasurementName key is required", MappingModeSingleTable) } + + // When using MappingModeSingleTable with UseMultiMeasureRecords enabled, + // measurementName ( from line protocol ) is mapped to multiMeasure name in timestream. 
+ if t.UseMultiMeasureRecords && t.MeasureNameForMultiMeasureRecords != "" { + return fmt.Errorf("in '%s' mapping mode, with multi-measure enabled, key MeasureNameForMultiMeasureRecords is invalid", MappingModeMultiTable) + } } if t.MappingMode == MappingModeMultiTable { @@ -146,6 +155,13 @@ func (t *Timestream) Connect() error { if t.SingleTableDimensionNameForTelegrafMeasurementName != "" { return fmt.Errorf("in '%s' mapping mode, do not specify SingleTableDimensionNameForTelegrafMeasurementName key", MappingModeMultiTable) } + + // When using MappingModeMultiTable ( data is ingested to multiple tables ) with + // UseMultiMeasureRecords enabled, measurementName is used as tableName in timestream and + // we require MeasureNameForMultiMeasureRecords to be configured. + if t.UseMultiMeasureRecords && t.MeasureNameForMultiMeasureRecords == "" { + return fmt.Errorf("in '%s' mapping mode, with multi-measure enabled, key MeasureNameForMultiMeasureRecords is required", MappingModeMultiTable) + } } if t.CreateTableIfNotExists { @@ -252,8 +268,6 @@ func (t *Timestream) Write(metrics []telegraf.Metric) error { } func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteRecordsInput, resourceNotFoundRetry bool) error { - t.Log.Debugf("Writing to Timestream: '%v' with ResourceNotFoundRetry: '%t'", writeRecordsInput, resourceNotFoundRetry) - _, err := t.svc.WriteRecords(context.Background(), writeRecordsInput) if err != nil { // Telegraf will retry ingesting the metrics if an error is returned from the plugin. 
@@ -266,11 +280,18 @@ func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteR return t.createTableAndRetry(writeRecordsInput) } t.logWriteToTimestreamError(notFound, writeRecordsInput.TableName) + // log error and return error to telegraf to retry in next flush interval + // We need this is to avoid data drop when there are no tables present in the database + return fmt.Errorf("failed to write to Timestream database '%s' table '%s', Error: '%s'", + t.DatabaseName, *writeRecordsInput.TableName, err) } var rejected *types.RejectedRecordsException if errors.As(err, &rejected) { t.logWriteToTimestreamError(err, writeRecordsInput.TableName) + for _, rr := range rejected.RejectedRecords { + t.Log.Errorf("reject reason: '%s', record index: '%d'", aws.ToString(rr.Reason), rr.RecordIndex) + } return nil } @@ -412,7 +433,7 @@ func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension { } dimensions = append(dimensions, dimension) } - if t.MappingMode == MappingModeSingleTable { + if t.MappingMode == MappingModeSingleTable && !t.UseMultiMeasureRecords { dimension := types.Dimension{ Name: aws.String(t.SingleTableDimensionNameForTelegrafMeasurementName), Value: aws.String(point.Name()), @@ -427,6 +448,13 @@ func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension { // Records with unsupported Metric Field type are skipped. // It returns an array of Timestream write records. 
func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record { + if t.UseMultiMeasureRecords { + return t.buildMultiMeasureWriteRecords(point) + } + return t.buildSingleWriteRecords(point) +} + +func (t *Timestream) buildSingleWriteRecords(point telegraf.Metric) []types.Record { var records []types.Record dimensions := t.buildDimensions(point) @@ -434,7 +462,7 @@ func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record { for fieldName, fieldValue := range point.Fields() { stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue) if !ok { - t.Log.Errorf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. "+ + t.Log.Warnf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. "+ "Supported values are: [int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool]", fieldName, reflect.TypeOf(fieldValue)) continue @@ -455,6 +483,48 @@ func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record { return records } +func (t *Timestream) buildMultiMeasureWriteRecords(point telegraf.Metric) []types.Record { + var records []types.Record + dimensions := t.buildDimensions(point) + + multiMeasureName := t.MeasureNameForMultiMeasureRecords + if t.MappingMode == MappingModeSingleTable { + multiMeasureName = point.Name() + } + + var multiMeasures []types.MeasureValue + + for fieldName, fieldValue := range point.Fields() { + stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue) + if !ok { + t.Log.Warnf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. 
"+ + "Supported values are: [int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool]", + fieldName, reflect.TypeOf(fieldValue)) + continue + } + multiMeasures = append(multiMeasures, types.MeasureValue{ + Name: aws.String(fieldName), + Type: stringFieldValueType, + Value: aws.String(stringFieldValue), + }) + } + + timeUnit, timeValue := getTimestreamTime(point.Time()) + + record := types.Record{ + MeasureName: aws.String(multiMeasureName), + MeasureValueType: "MULTI", + MeasureValues: multiMeasures, + Dimensions: dimensions, + Time: aws.String(timeValue), + TimeUnit: timeUnit, + } + + records = append(records, record) + + return records +} + // partitionRecords splits the Timestream records into smaller slices of a max size // so that are under the limit for the Timestream API call. // It returns the array of array of records. diff --git a/plugins/outputs/timestream/timestream_test.go b/plugins/outputs/timestream/timestream_test.go index dfde353a3..b7da46837 100644 --- a/plugins/outputs/timestream/timestream_test.go +++ b/plugins/outputs/timestream/timestream_test.go @@ -37,12 +37,15 @@ const time2Epoch = "1257894000" const metricName1 = "metricName1" const metricName2 = "metricName2" -type mockTimestreamClient struct{} +type mockTimestreamClient struct { + WriteRecordsRequestCount int +} func (m *mockTimestreamClient) CreateTable(context.Context, *timestreamwrite.CreateTableInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.CreateTableOutput, error) { return nil, nil } func (m *mockTimestreamClient) WriteRecords(context.Context, *timestreamwrite.WriteRecordsInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) { + m.WriteRecordsRequestCount++ return nil, nil } func (m *mockTimestreamClient) DescribeDatabase(context.Context, *timestreamwrite.DescribeDatabaseInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.DescribeDatabaseOutput, error) { @@ -70,6 +73,49 @@ func 
TestConnectValidatesConfigParameters(t *testing.T) { } require.Contains(t, incorrectMappingMode.Connect().Error(), "single-table") + //multi-measure config validation multi table mode + validConfigMultiMeasureMultiTableMode := Timestream{ + DatabaseName: tsDbName, + MappingMode: MappingModeMultiTable, + UseMultiMeasureRecords: true, + MeasureNameForMultiMeasureRecords: "multi-measure-name", + Log: testutil.Logger{}, + } + require.Nil(t, validConfigMultiMeasureMultiTableMode.Connect()) + + invalidConfigMultiMeasureMultiTableMode := Timestream{ + DatabaseName: tsDbName, + MappingMode: MappingModeMultiTable, + UseMultiMeasureRecords: true, + // without MeasureNameForMultiMeasureRecords set we expect validation failure + Log: testutil.Logger{}, + } + require.Contains(t, invalidConfigMultiMeasureMultiTableMode.Connect().Error(), "MeasureNameForMultiMeasureRecords") + + // multi-measure config validation single table mode + validConfigMultiMeasureSingleTableMode := Timestream{ + DatabaseName: tsDbName, + MappingMode: MappingModeSingleTable, + SingleTableName: testSingleTableName, + UseMultiMeasureRecords: true, // MeasureNameForMultiMeasureRecords is not needed as + // measurement name (from telegraf metric) is used as multi-measure name in TS + Log: testutil.Logger{}, + } + require.Nil(t, validConfigMultiMeasureSingleTableMode.Connect()) + + invalidConfigMultiMeasureSingleTableMode := Timestream{ + DatabaseName: tsDbName, + MappingMode: MappingModeSingleTable, + SingleTableName: testSingleTableName, + UseMultiMeasureRecords: true, + MeasureNameForMultiMeasureRecords: "multi-measure-name", + // value of MeasureNameForMultiMeasureRecords will be ignored and + // measurement name (from telegraf metric) is used as multi-measure name in TS + Log: testutil.Logger{}, + } + err := invalidConfigMultiMeasureSingleTableMode.Connect() + require.ErrorContains(t, err, "MeasureNameForMultiMeasureRecords") + // multi-table arguments validMappingModeMultiTable := Timestream{ 
DatabaseName: tsDbName, @@ -161,6 +207,269 @@ func TestConnectValidatesConfigParameters(t *testing.T) { require.Contains(t, describeTableInvoked.Connect().Error(), "hello from DescribeDatabase") } +func TestWriteMultiMeasuresSingleTableMode(t *testing.T) { + const recordCount = 100 + mockClient := &mockTimestreamClient{0} + + WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { + return mockClient, nil + } + + localTime, _ := strconv.Atoi(time1Epoch) + + var inputs []telegraf.Metric + + for i := 1; i <= recordCount+1; i++ { + localTime++ + + fieldName1 := "value_supported1" + strconv.Itoa(i) + fieldName2 := "value_supported2" + strconv.Itoa(i) + inputs = append(inputs, testutil.MustMetric( + "multi_measure_name", + map[string]string{"tag1": "value1"}, + map[string]interface{}{ + fieldName1: float64(10), + fieldName2: float64(20), + }, + time.Unix(int64(localTime), 0), + )) + } + + plugin := Timestream{ + MappingMode: MappingModeSingleTable, + SingleTableName: "test-multi-single-table-mode", + DatabaseName: tsDbName, + UseMultiMeasureRecords: true, // use multi + Log: testutil.Logger{}, + } + + // validate config correctness + require.NoError(t, plugin.Connect()) + + // validate multi-record generation + result := plugin.TransformMetrics(inputs) + // 'inputs' has a total of 101 metrics transformed to 2 writeRecord calls to TS + require.Equal(t, 2, len(result), "Expected 2 WriteRecordsInput requests") + + var transformedRecords []types.Record + for _, r := range result { + transformedRecords = append(transformedRecords, r.Records...) 
+ // Assert that we use measure name from input + require.Equal(t, *r.Records[0].MeasureName, "multi_measure_name") + } + // Expected 101 records + require.Equal(t, recordCount+1, len(transformedRecords), "Expected 101 records after transforming") + // validate write to TS + err := plugin.Write(inputs) + require.Nil(t, err, "Write to Timestream failed") + require.Equal(t, mockClient.WriteRecordsRequestCount, 2, "Expected 2 WriteRecords calls") +} + +func TestWriteMultiMeasuresMultiTableMode(t *testing.T) { + const recordCount = 100 + mockClient := &mockTimestreamClient{0} + + WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) { + return mockClient, nil + } + + localTime, _ := strconv.Atoi(time1Epoch) + + var inputs []telegraf.Metric + + for i := 1; i <= recordCount; i++ { + localTime++ + + fieldName1 := "value_supported1" + strconv.Itoa(i) + fieldName2 := "value_supported2" + strconv.Itoa(i) + inputs = append(inputs, testutil.MustMetric( + "multi_measure_name", + map[string]string{"tag1": "value1"}, + map[string]interface{}{ + fieldName1: float64(10), + fieldName2: float64(20), + }, + time.Unix(int64(localTime), 0), + )) + } + + plugin := Timestream{ + MappingMode: MappingModeMultiTable, + DatabaseName: tsDbName, + UseMultiMeasureRecords: true, // use multi + MeasureNameForMultiMeasureRecords: "config-multi-measure-name", + Log: testutil.Logger{}, + } + + // validate config correctness + err := plugin.Connect() + require.Nil(t, err, "Invalid configuration") + + // validate multi-record generation + result := plugin.TransformMetrics(inputs) + // 'inputs' has a total of 101 metrics transformed to 2 writeRecord calls to TS + require.Equal(t, 1, len(result), "Expected 1 WriteRecordsInput requests") + + // Assert that we use measure name from config + require.Equal(t, *result[0].Records[0].MeasureName, "config-multi-measure-name") + + var transformedRecords []types.Record + for _, r := range result { + transformedRecords = 
append(transformedRecords, r.Records...) + } + // Expected 100 records + require.Equal(t, recordCount, len(transformedRecords), "Expected 100 records after transforming") + + for _, input := range inputs { + fmt.Println("Input", input) + fmt.Println(*result[0].Records[0].MeasureName) + break + } + + // validate successful write to TS + err = plugin.Write(inputs) + require.Nil(t, err, "Write to Timestream failed") + require.Equal(t, mockClient.WriteRecordsRequestCount, 1, "Expected 1 WriteRecords call") +} + +func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) { + input1 := testutil.MustMetric( + metricName1, + map[string]string{"tag1": "value1"}, + map[string]interface{}{ + "measureDouble": aws.Float64(10), + }, + time1, + ) + + input2 := testutil.MustMetric( + metricName1, + map[string]string{"tag2": "value2"}, + map[string]interface{}{ + "measureBigint": aws.Int32(20), + }, + time1, + ) + + input3 := testutil.MustMetric( + metricName1, + map[string]string{"tag3": "value3"}, + map[string]interface{}{ + "measureVarchar": "DUMMY", + }, + time1, + ) + + input4 := testutil.MustMetric( + metricName1, + map[string]string{"tag4": "value4"}, + map[string]interface{}{ + "measureBool": true, + }, + time1, + ) + + expectedResultMultiTable := buildExpectedMultiRecords("config-multi-measure-name", metricName1) + + plugin := Timestream{ + MappingMode: MappingModeMultiTable, + DatabaseName: tsDbName, + UseMultiMeasureRecords: true, // use multi + MeasureNameForMultiMeasureRecords: "config-multi-measure-name", + Log: testutil.Logger{}, + } + + // validate config correctness + err := plugin.Connect() + require.Nil(t, err, "Invalid configuration") + + // validate multi-record generation with MappingModeMultiTable + result := plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4}) + require.Equal(t, 1, len(result), "Expected 1 WriteRecordsInput requests") + + require.EqualValues(t, result[0], expectedResultMultiTable) + + require.True(t, 
arrayContains(result, expectedResultMultiTable), "Expected that the list of requests to Timestream: %+v\n "+ + "will contain request: %+v\n\n", result, expectedResultMultiTable) + + // singleTableMode + + plugin = Timestream{ + MappingMode: MappingModeSingleTable, + SingleTableName: "singleTableName", + DatabaseName: tsDbName, + UseMultiMeasureRecords: true, // use multi + Log: testutil.Logger{}, + } + + // validate config correctness + err = plugin.Connect() + require.Nil(t, err, "Invalid configuration") + + expectedResultSingleTable := buildExpectedMultiRecords(metricName1, "singleTableName") + + // validate multi-record generation with MappingModeSingleTable + result = plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4}) + require.Equal(t, 1, len(result), "Expected 1 WriteRecordsInput requests") + + require.EqualValues(t, result[0], expectedResultSingleTable) + + require.True(t, arrayContains(result, expectedResultSingleTable), "Expected that the list of requests to Timestream: %+v\n "+ + "will contain request: %+v\n\n", result, expectedResultSingleTable) +} + +func buildExpectedMultiRecords(multiMeasureName string, tableName string) *timestreamwrite.WriteRecordsInput { + var recordsMultiTableMode []types.Record + recordDouble := buildMultiRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: metricName1, + dimensions: map[string]string{"tag1": "value1"}, + measureValues: map[string]string{"measureDouble": "10"}, + }}, multiMeasureName, types.MeasureValueTypeDouble) + + recordsMultiTableMode = append(recordsMultiTableMode, recordDouble...) + + recordBigint := buildMultiRecords([]SimpleInput{ + { + t: time1Epoch, + tableName: metricName1, + dimensions: map[string]string{"tag2": "value2"}, + measureValues: map[string]string{"measureBigint": "20"}, + }}, multiMeasureName, types.MeasureValueTypeBigint) + + recordsMultiTableMode = append(recordsMultiTableMode, recordBigint...) 
+
+	recordVarchar := buildMultiRecords([]SimpleInput{
+		{
+			t:             time1Epoch,
+			tableName:     metricName1,
+			dimensions:    map[string]string{"tag3": "value3"},
+			measureValues: map[string]string{"measureVarchar": "DUMMY"},
+		}}, multiMeasureName, types.MeasureValueTypeVarchar)
+
+	recordsMultiTableMode = append(recordsMultiTableMode, recordVarchar...)
+
+	recordBool := buildMultiRecords([]SimpleInput{
+		{
+			t:             time1Epoch,
+			tableName:     metricName1,
+			dimensions:    map[string]string{"tag4": "value4"},
+			measureValues: map[string]string{"measureBool": "true"},
+		},
+	}, multiMeasureName, types.MeasureValueTypeBoolean)
+
+	recordsMultiTableMode = append(recordsMultiTableMode, recordBool...)
+
+	expectedResultMultiTable := &timestreamwrite.WriteRecordsInput{
+		DatabaseName:     aws.String(tsDbName),
+		TableName:        aws.String(tableName),
+		Records:          recordsMultiTableMode,
+		CommonAttributes: &types.Record{},
+	}
+	return expectedResultMultiTable
+}
+
 type mockTimestreamErrorClient struct {
 	ErrorToReturnOnWriteRecords error
 }
@@ -226,6 +535,85 @@ func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
 	require.Nil(t, err, "Expected to silently swallow the RejectedRecordsException, "+
 		"as retrying this error doesn't make sense.")
 }
+func TestWriteWhenRequestsGreaterThanMaxWriteGoRoutinesCount(t *testing.T) {
+	t.Skip("Skipping test due to data race, will be re-visited")
+	const maxWriteRecordsCalls = 5
+	const maxRecordsInWriteRecordsCall = 100
+	const totalRecords = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
+	mockClient := &mockTimestreamClient{0}
+
+	WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
+		return mockClient, nil
+	}
+
+	plugin := Timestream{
+		MappingMode:  MappingModeMultiTable,
+		DatabaseName: tsDbName,
+		// Spawn only 2 go routines to serve all 5 write requests
+		MaxWriteGoRoutinesCount: 2,
+		Log:                     testutil.Logger{},
+	}
+
+	require.NoError(t, plugin.Connect())
+
+	var inputs []telegraf.Metric
+
+	for i := 1; i <= totalRecords; i++ {
+		fieldName := "value_supported" + strconv.Itoa(i)
+		inputs = append(inputs, testutil.MustMetric(
+			metricName1,
+			map[string]string{"tag1": "value1"},
+			map[string]interface{}{
+				fieldName: float64(10),
+			},
+			time1,
+		))
+	}
+
+	err := plugin.Write(inputs)
+	require.Nil(t, err, "Expected to write without any errors ")
+	require.Equal(t, mockClient.WriteRecordsRequestCount, maxWriteRecordsCalls, "Expected 5 calls to WriteRecords")
+}
+
+func TestWriteWhenRequestsLesserThanMaxWriteGoRoutinesCount(t *testing.T) {
+	t.Skip("Skipping test due to data race, will be re-visited")
+	const maxWriteRecordsCalls = 2
+	const maxRecordsInWriteRecordsCall = 100
+	const totalRecords = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
+	mockClient := &mockTimestreamClient{0}
+
+	WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
+		return mockClient, nil
+	}
+
+	plugin := Timestream{
+		MappingMode:  MappingModeMultiTable,
+		DatabaseName: tsDbName,
+		// Spawn 5 parallel go routines to serve 2 write requests
+		// In this case only 2 of the 5 go routines will process the write requests
+		MaxWriteGoRoutinesCount: 5,
+		Log:                     testutil.Logger{},
+	}
+	require.NoError(t, plugin.Connect())
+
+	var inputs []telegraf.Metric
+
+	for i := 1; i <= totalRecords; i++ {
+		fieldName := "value_supported" + strconv.Itoa(i)
+		inputs = append(inputs, testutil.MustMetric(
+			metricName1,
+			map[string]string{"tag1": "value1"},
+			map[string]interface{}{
+				fieldName: float64(10),
+			},
+			time1,
+		))
+	}
+
+	err := plugin.Write(inputs)
+	require.Nil(t, err, "Expected to write without any errors ")
+	require.Equal(t, mockClient.WriteRecordsRequestCount, maxWriteRecordsCalls, "Expected 2 calls to WriteRecords")
+}
 
 func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
 	input1 := testutil.MustMetric(
 		metricName1,
@@ -360,6 +748,72 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
 		[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable})
 }
 
+func TestTransformMetricsRequestsAboveLimitAreSplitSingleTable(t *testing.T) {
+	const maxRecordsInWriteRecordsCall = 100
+
+	localTime, _ := strconv.Atoi(time1Epoch)
+
+	var inputs []telegraf.Metric
+
+	for i := 1; i <= maxRecordsInWriteRecordsCall+1; i++ {
+		localTime++
+
+		fieldName := "value_supported" + strconv.Itoa(i)
+		inputs = append(inputs, testutil.MustMetric(
+			metricName1,
+			map[string]string{"tag1": "value1"},
+			map[string]interface{}{
+				fieldName: float64(10),
+			},
+			time.Unix(int64(localTime), 0),
+		))
+	}
+
+	localTime, _ = strconv.Atoi(time1Epoch)
+
+	var recordsFirstReq []types.Record
+
+	for i := 1; i <= maxRecordsInWriteRecordsCall; i++ {
+		localTime++
+
+		recordsFirstReq = append(recordsFirstReq, buildRecord(SimpleInput{
+			t:             strconv.Itoa(localTime),
+			tableName:     testSingleTableName,
+			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
+			measureValues: map[string]string{"value_supported" + strconv.Itoa(i): "10"},
+		})...)
+	}
+
+	expectedResult1SingleTable := &timestreamwrite.WriteRecordsInput{
+		DatabaseName:     aws.String(tsDbName),
+		TableName:        aws.String(testSingleTableName),
+		Records:          recordsFirstReq,
+		CommonAttributes: &types.Record{},
+	}
+
+	var recordsSecondReq []types.Record
+
+	localTime++
+
+	recordsSecondReq = append(recordsSecondReq, buildRecord(SimpleInput{
+		t:             strconv.Itoa(localTime),
+		tableName:     testSingleTableName,
+		dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
+		measureValues: map[string]string{"value_supported" + strconv.Itoa(maxRecordsInWriteRecordsCall+1): "10"},
+	})...)
+
+	expectedResult2SingleTable := &timestreamwrite.WriteRecordsInput{
+		DatabaseName:     aws.String(tsDbName),
+		TableName:        aws.String(testSingleTableName),
+		Records:          recordsSecondReq,
+		CommonAttributes: &types.Record{},
+	}
+
+	comparisonTest(t, MappingModeSingleTable,
+		inputs,
+		[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable})
+}
+
 func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) {
 	input1 := testutil.MustMetric(
 		metricName1,
@@ -858,3 +1312,37 @@ func buildRecord(input SimpleInput) []types.Record {
 
 	return tsRecords
 }
+
+func buildMultiRecords(inputs []SimpleInput, multiMeasureName string, measureType types.MeasureValueType) []types.Record {
+	var tsRecords []types.Record
+	for _, input := range inputs {
+		var multiMeasures []types.MeasureValue
+		var tsDimensions []types.Dimension
+
+		for k, v := range input.dimensions {
+			tsDimensions = append(tsDimensions, types.Dimension{
+				Name:  aws.String(k),
+				Value: aws.String(v),
+			})
+		}
+
+		for k, v := range input.measureValues {
+			multiMeasures = append(multiMeasures, types.MeasureValue{
+				Name:  aws.String(k),
+				Value: aws.String(v),
+				Type:  measureType,
+			})
+		}
+
+		tsRecords = append(tsRecords, types.Record{
+			MeasureName:      aws.String(multiMeasureName),
+			MeasureValueType: "MULTI",
+			MeasureValues:    multiMeasures,
+			Dimensions:       tsDimensions,
+			Time:             aws.String(input.t),
+			TimeUnit:         timeUnit,
+		})
+	}
+
+	return tsRecords
+}