feat(outputs.timestream): Support ingesting multi-measures (#11385)

This commit is contained in:
Lohith 2022-09-20 04:45:57 -07:00 committed by GitHub
parent 40fe9daf76
commit 8dd7ec04c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 742 additions and 127 deletions

View File

@ -24,7 +24,7 @@ API endpoint. In the following order the plugin will attempt to authenticate.
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## Credentials are loaded in the following order:
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
@ -57,76 +57,50 @@ API endpoint. In the following order the plugin will attempt to authenticate.
## and therefore the Telegraf agent will not start.
describe_database_on_start = false
## The mapping mode specifies how Telegraf records are represented in Timestream.
## Specifies how the data is organized in Timestream.
## Valid values are: single-table, multi-table.
## For example, consider the following data in line protocol format:
## weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200
## airquality,location=us-west no2=5,pm25=16 1465839830100400200
## where weather and airquality are the measurement names, location and season are tags,
## and temperature, humidity, no2, pm25 are fields.
## In multi-table mode:
## - first line will be ingested to table named weather
## - second line will be ingested to table named airquality
## - the tags will be represented as dimensions
## - first table (weather) will have two records:
## one with measurement name equals to temperature,
## another with measurement name equals to humidity
## - second table (airquality) will have two records:
## one with measurement name equals to no2,
## another with measurement name equals to pm25
## - the Timestream tables from the example will look like this:
## TABLE "weather":
## time | location | season | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82
## 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71
## TABLE "airquality":
## time | location | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-west | no2 | 5
## 2016-06-13 17:43:50 | us-west | pm25 | 16
## In single-table mode:
## - the data will be ingested to a single table, whose name is valueOf(single_table_name)
## - the measurement name will be stored in the dimension named valueOf(single_table_dimension_name_for_telegraf_measurement_name)
## - location and season will be represented as dimensions
## - temperature, humidity, no2, pm25 will be represented as measurement name
## - the Timestream table from the example will look like this:
## Assuming:
## - single_table_name = "my_readings"
## - single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## TABLE "my_readings":
## time | location | season | namespace | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82
## 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71
## 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5
## 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16
## In most cases, using multi-table mapping mode is recommended.
## However, you can consider using single-table in situations when you have thousands of measurement names.
## When mapping_mode is set to single-table, all of the data is stored in a single table.
## When mapping_mode is set to multi-table, the data is organized and stored in multiple tables.
## The default is multi-table.
mapping_mode = "multi-table"
## Only valid and required for mapping_mode = "single-table"
## Specifies the Timestream table where the metrics will be uploaded.
# single_table_name = "yourTableNameHere"
## Only valid and required for mapping_mode = "single-table"
## Describes what will be the Timestream dimension name for the Telegraf
## measurement name.
# single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## Specifies if the plugin should create the table, if the table do not exist.
## The plugin writes the data without prior checking if the table exists.
## When the table does not exist, the error returned from Timestream will cause
## the plugin to create the table, if this parameter is set to true.
## Specifies if the plugin should create the table, if the table does not exist.
create_table_if_not_exists = true
## Only valid and required if create_table_if_not_exists = true
## Specifies the Timestream table magnetic store retention period in days.
## Check Timestream documentation for more details.
## NOTE: This property is valid when create_table_if_not_exists = true.
create_table_magnetic_store_retention_period_in_days = 365
## Only valid and required if create_table_if_not_exists = true
## Specifies the Timestream table memory store retention period in hours.
## Check Timestream documentation for more details.
## NOTE: This property is valid when create_table_if_not_exists = true.
create_table_memory_store_retention_period_in_hours = 24
## Specifies how the data is written into Timestream.
## Valid values are: true, false
## When use_multi_measure_records is set to true, all of the tags and fields are stored
## as a single row in a Timestream table.
## When use_multi_measure_records is set to false, Timestream stores each field in a
## separate table row, thereby storing the tags multiple times (once for each field).
## The recommended setting is true.
## The default is false.
use_multi_measure_records = "false"
## Specifies the measure_name to use when sending multi-measure records.
## NOTE: This property is valid when use_multi_measure_records=true and mapping_mode=multi-table
measure_name_for_multi_measure_records = "telegraf_measure"
## Specifies the name of the table to write data into
## NOTE: This property is valid when mapping_mode=single-table.
# single_table_name = ""
## Specifies the name of dimension when all of the data is being stored in a single table
## and the measurement name is transformed into the dimension value
## (see Mapping data from Influx to Timestream for details)
## NOTE: This property is valid when mapping_mode=single-table.
# single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## Only valid and optional if create_table_if_not_exists = true
## Specifies the Timestream table tags.
## Check Timestream documentation for more details
@ -135,6 +109,9 @@ API endpoint. In the following order the plugin will attempt to authenticate.
## Specify the maximum number of parallel go routines to ingest/write data
## If not specified, defaults to 1 go routine
max_write_go_routines = 25
## Please see README.md to know how line protocol data is mapped to Timestream
##
```
### Batching
@ -181,8 +158,111 @@ Execute unit tests with:
go test -v ./plugins/outputs/timestream/...
```
[Amazon Timestream]: https://aws.amazon.com/timestream/
[Assumed credentials via STS]: https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/credentials/stscreds
[Environment Variables]: https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#environment-variables
[Shared Credentials]: https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#shared-credentials-file
[EC2 Instance Profile]: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
### Mapping data from Influx to Timestream
When writing data from Influx to Timestream,
data is written by default as follows:
1. The timestamp is written as the time field.
2. Tags are written as dimensions.
3. Fields are written as measures.
4. Measurements are written as table names.
For example, consider the following data in line protocol format:
> weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200
> airquality,location=us-west no2=5,pm25=16 1465839830100400200
where:
`weather` and `airquality` are the measurement names,
`location` and `season` are tags,
`temperature`, `humidity`, `no2`, `pm25` are fields.
When you choose to create a separate table for each measurement and store
multiple fields in a single table row, the data will be written into
Timestream as:
1. The plugin will create 2 tables, namely, weather and airquality (mapping_mode=multi-table).
2. The tables may contain multiple fields in a single table row (use_multi_measure_records=true).
3. The table weather will contain the following columns and data:
| time | location | season | measure_name | temperature | humidity |
| :--- | :--- | :--- | :--- | :--- | :--- |
| 2016-06-13 17:43:50 | us-midwest | summer | `<measure_name_for_multi_measure_records>` | 82 | 71|
4. The table airquality will contain the following columns and data:
| time | location | measure_name | no2 | pm25 |
| :--- | :--- | :--- | :--- | :--- |
|2016-06-13 17:43:50 | us-west | `<measure_name_for_multi_measure_records>` | 5 | 16 |
NOTE:
`<measure_name_for_multi_measure_records>` represents the actual
value of that property.
You can also choose to create a separate table per measurement and store
each field in a separate row per table. In that case:
1. The plugin will create 2 tables, namely, weather and airquality (mapping_mode=multi-table).
2. Each table row will contain a single field only (use_multi_measure_records=false).
3. The table weather will contain the following columns and data:
| time | location | season | measure_name | measure_value::bigint |
| :--- | :--- | :--- | :--- | :--- |
| 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82 |
| 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71 |
4. The table airquality will contain the following columns and data:
| time | location | measure_name | measure_value::bigint |
| :--- | :--- | :--- | :--- |
| 2016-06-13 17:43:50 | us-west | no2 | 5 |
| 2016-06-13 17:43:50 | us-west | pm25 | 16 |
You can also choose to store all the measurements in a single table and
store all fields in a single table row. In that case:
1. This plugin will create a table with name <single_table_name> (mapping_mode=single-table).
2. The table may contain multiple fields in a single table row (use_multi_measure_records=true).
3. The table will contain the following column and data:
| time | location | season | `<single_table_dimension_name_for_telegraf_measurement_name>`| measure_name | temperature | humidity | no2 | pm25 |
| :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- |
| 2016-06-13 17:43:50 | us-midwest | summer | weather | `<measure_name_for_multi_measure_records>` | 82 | 71 | null | null |
| 2016-06-13 17:43:50 | us-west | null | airquality | `<measure_name_for_multi_measure_records>` | null | null | 5 | 16 |
NOTE:
`<single_table_name>` represents the actual value of that property.
`<single_table_dimension_name_for_telegraf_measurement_name>` represents
the actual value of that property.
`<measure_name_for_multi_measure_records>` represents the actual value of
that property.
Furthermore, you can choose to store all the measurements in a single table
and store each field in a separate table row. In that case:
1. Timestream will create a table with name <single_table_name> (mapping_mode=single-table).
2. Each table row will contain a single field only (use_multi_measure_records=false).
3. The table will contain the following column and data:
| time | location | season | namespace | measure_name | measure_value::bigint |
| :--- | :--- | :--- | :--- | :--- | :--- |
| 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82 |
| 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71 |
| 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5 |
| 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16 |
NOTE:
`<single_table_name>` represents the actual value of that property.
`<single_table_dimension_name_for_telegraf_measurement_name>` represents the
actual value of that property.
`<measure_name_for_multi_measure_records>` represents the actual value of
that property.
### References
- [Amazon Timestream](https://aws.amazon.com/timestream/)
- [Assumed credentials via STS](https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/credentials/stscreds)
- [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#environment-variables)
- [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#shared-credentials-file)
- [EC2 Instance Profile](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)

View File

@ -4,7 +4,7 @@
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order
## Credentials are loaded in the following order:
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) explicit credentials from 'access_key' and 'secret_key'
@ -37,76 +37,50 @@
## and therefore the Telegraf agent will not start.
describe_database_on_start = false
## The mapping mode specifies how Telegraf records are represented in Timestream.
## Specifies how the data is organized in Timestream.
## Valid values are: single-table, multi-table.
## For example, consider the following data in line protocol format:
## weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200
## airquality,location=us-west no2=5,pm25=16 1465839830100400200
## where weather and airquality are the measurement names, location and season are tags,
## and temperature, humidity, no2, pm25 are fields.
## In multi-table mode:
## - first line will be ingested to table named weather
## - second line will be ingested to table named airquality
## - the tags will be represented as dimensions
## - first table (weather) will have two records:
## one with measurement name equals to temperature,
## another with measurement name equals to humidity
## - second table (airquality) will have two records:
## one with measurement name equals to no2,
## another with measurement name equals to pm25
## - the Timestream tables from the example will look like this:
## TABLE "weather":
## time | location | season | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82
## 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71
## TABLE "airquality":
## time | location | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-west | no2 | 5
## 2016-06-13 17:43:50 | us-west | pm25 | 16
## In single-table mode:
## - the data will be ingested to a single table, whose name is valueOf(single_table_name)
## - the measurement name will be stored in the dimension named valueOf(single_table_dimension_name_for_telegraf_measurement_name)
## - location and season will be represented as dimensions
## - temperature, humidity, no2, pm25 will be represented as measurement name
## - the Timestream table from the example will look like this:
## Assuming:
## - single_table_name = "my_readings"
## - single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## TABLE "my_readings":
## time | location | season | namespace | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82
## 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71
## 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5
## 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16
## In most cases, using multi-table mapping mode is recommended.
## However, you can consider using single-table in situations when you have thousands of measurement names.
## When mapping_mode is set to single-table, all of the data is stored in a single table.
## When mapping_mode is set to multi-table, the data is organized and stored in multiple tables.
## The default is multi-table.
mapping_mode = "multi-table"
## Only valid and required for mapping_mode = "single-table"
## Specifies the Timestream table where the metrics will be uploaded.
# single_table_name = "yourTableNameHere"
## Only valid and required for mapping_mode = "single-table"
## Describes what will be the Timestream dimension name for the Telegraf
## measurement name.
# single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## Specifies if the plugin should create the table, if the table do not exist.
## The plugin writes the data without prior checking if the table exists.
## When the table does not exist, the error returned from Timestream will cause
## the plugin to create the table, if this parameter is set to true.
## Specifies if the plugin should create the table, if the table does not exist.
create_table_if_not_exists = true
## Only valid and required if create_table_if_not_exists = true
## Specifies the Timestream table magnetic store retention period in days.
## Check Timestream documentation for more details.
## NOTE: This property is valid when create_table_if_not_exists = true.
create_table_magnetic_store_retention_period_in_days = 365
## Only valid and required if create_table_if_not_exists = true
## Specifies the Timestream table memory store retention period in hours.
## Check Timestream documentation for more details.
## NOTE: This property is valid when create_table_if_not_exists = true.
create_table_memory_store_retention_period_in_hours = 24
## Specifies how the data is written into Timestream.
## Valid values are: true, false
## When use_multi_measure_records is set to true, all of the tags and fields are stored
## as a single row in a Timestream table.
## When use_multi_measure_records is set to false, Timestream stores each field in a
## separate table row, thereby storing the tags multiple times (once for each field).
## The recommended setting is true.
## The default is false.
use_multi_measure_records = "false"
## Specifies the measure_name to use when sending multi-measure records.
## NOTE: This property is valid when use_multi_measure_records=true and mapping_mode=multi-table
measure_name_for_multi_measure_records = "telegraf_measure"
## Specifies the name of the table to write data into
## NOTE: This property is valid when mapping_mode=single-table.
# single_table_name = ""
## Specifies the name of dimension when all of the data is being stored in a single table
## and the measurement name is transformed into the dimension value
## (see Mapping data from Influx to Timestream for details)
## NOTE: This property is valid when mapping_mode=single-table.
# single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## Only valid and optional if create_table_if_not_exists = true
## Specifies the Timestream table tags.
## Check Timestream documentation for more details
@ -115,3 +89,6 @@
## Specify the maximum number of parallel go routines to ingest/write data
## If not specified, defaults to 1 go routine
max_write_go_routines = 25
## Please see README.md to know how line protocol data is mapped to Timestream
##

View File

@ -34,6 +34,9 @@ type (
SingleTableName string `toml:"single_table_name"`
SingleTableDimensionNameForTelegrafMeasurementName string `toml:"single_table_dimension_name_for_telegraf_measurement_name"`
UseMultiMeasureRecords bool `toml:"use_multi_measure_records"`
MeasureNameForMultiMeasureRecords string `toml:"measure_name_for_multi_measure_records"`
CreateTableIfNotExists bool `toml:"create_table_if_not_exists"`
CreateTableMagneticStoreRetentionPeriodInDays int64 `toml:"create_table_magnetic_store_retention_period_in_days"`
CreateTableMemoryStoreRetentionPeriodInHours int64 `toml:"create_table_memory_store_retention_period_in_hours"`
@ -132,10 +135,16 @@ func (t *Timestream) Connect() error {
return fmt.Errorf("in '%s' mapping mode, SingleTableName key is required", MappingModeSingleTable)
}
if t.SingleTableDimensionNameForTelegrafMeasurementName == "" {
if t.SingleTableDimensionNameForTelegrafMeasurementName == "" && !t.UseMultiMeasureRecords {
return fmt.Errorf("in '%s' mapping mode, SingleTableDimensionNameForTelegrafMeasurementName key is required",
MappingModeSingleTable)
}
// When using MappingModeSingleTable with UseMultiMeasureRecords enabled,
// measurementName ( from line protocol ) is mapped to multiMeasure name in timestream.
if t.UseMultiMeasureRecords && t.MeasureNameForMultiMeasureRecords != "" {
return fmt.Errorf("in '%s' mapping mode, with multi-measure enabled, key MeasureNameForMultiMeasureRecords is invalid", MappingModeMultiTable)
}
}
if t.MappingMode == MappingModeMultiTable {
@ -146,6 +155,13 @@ func (t *Timestream) Connect() error {
if t.SingleTableDimensionNameForTelegrafMeasurementName != "" {
return fmt.Errorf("in '%s' mapping mode, do not specify SingleTableDimensionNameForTelegrafMeasurementName key", MappingModeMultiTable)
}
// When using MappingModeMultiTable ( data is ingested to multiple tables ) with
// UseMultiMeasureRecords enabled, measurementName is used as tableName in timestream and
// we require MeasureNameForMultiMeasureRecords to be configured.
if t.UseMultiMeasureRecords && t.MeasureNameForMultiMeasureRecords == "" {
return fmt.Errorf("in '%s' mapping mode, with multi-measure enabled, key MeasureNameForMultiMeasureRecords is required", MappingModeMultiTable)
}
}
if t.CreateTableIfNotExists {
@ -252,8 +268,6 @@ func (t *Timestream) Write(metrics []telegraf.Metric) error {
}
func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteRecordsInput, resourceNotFoundRetry bool) error {
t.Log.Debugf("Writing to Timestream: '%v' with ResourceNotFoundRetry: '%t'", writeRecordsInput, resourceNotFoundRetry)
_, err := t.svc.WriteRecords(context.Background(), writeRecordsInput)
if err != nil {
// Telegraf will retry ingesting the metrics if an error is returned from the plugin.
@ -266,11 +280,18 @@ func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteR
return t.createTableAndRetry(writeRecordsInput)
}
t.logWriteToTimestreamError(notFound, writeRecordsInput.TableName)
// log error and return error to telegraf to retry in next flush interval
// We need this to avoid data drop when there are no tables present in the database
return fmt.Errorf("failed to write to Timestream database '%s' table '%s', Error: '%s'",
t.DatabaseName, *writeRecordsInput.TableName, err)
}
var rejected *types.RejectedRecordsException
if errors.As(err, &rejected) {
t.logWriteToTimestreamError(err, writeRecordsInput.TableName)
for _, rr := range rejected.RejectedRecords {
t.Log.Errorf("reject reason: '%s', record index: '%d'", aws.ToString(rr.Reason), rr.RecordIndex)
}
return nil
}
@ -412,7 +433,7 @@ func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension {
}
dimensions = append(dimensions, dimension)
}
if t.MappingMode == MappingModeSingleTable {
if t.MappingMode == MappingModeSingleTable && !t.UseMultiMeasureRecords {
dimension := types.Dimension{
Name: aws.String(t.SingleTableDimensionNameForTelegrafMeasurementName),
Value: aws.String(point.Name()),
@ -427,6 +448,13 @@ func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension {
// Records with unsupported Metric Field type are skipped.
// It returns an array of Timestream write records.
// buildWriteRecords converts a Telegraf metric into Timestream write records,
// dispatching on the configured record layout: multi-measure (all fields on
// one record) when UseMultiMeasureRecords is enabled, otherwise one record
// per field.
func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record {
	if !t.UseMultiMeasureRecords {
		return t.buildSingleWriteRecords(point)
	}
	return t.buildMultiMeasureWriteRecords(point)
}
func (t *Timestream) buildSingleWriteRecords(point telegraf.Metric) []types.Record {
var records []types.Record
dimensions := t.buildDimensions(point)
@ -434,7 +462,7 @@ func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record {
for fieldName, fieldValue := range point.Fields() {
stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue)
if !ok {
t.Log.Errorf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. "+
t.Log.Warnf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. "+
"Supported values are: [int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool]",
fieldName, reflect.TypeOf(fieldValue))
continue
@ -455,6 +483,48 @@ func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record {
return records
}
// buildMultiMeasureWriteRecords maps one Telegraf metric onto a single
// Timestream multi-measure record: tags become dimensions and every field
// with a supported type becomes one MeasureValue on that record.
// Fields whose Go type has no Timestream equivalent are skipped with a warning.
func (t *Timestream) buildMultiMeasureWriteRecords(point telegraf.Metric) []types.Record {
	// In single-table mode the metric's own name is used as the measure name;
	// in multi-table mode the configured name is used instead.
	measureName := t.MeasureNameForMultiMeasureRecords
	if t.MappingMode == MappingModeSingleTable {
		measureName = point.Name()
	}

	var measures []types.MeasureValue
	for name, value := range point.Fields() {
		converted, valueType, ok := convertValue(value)
		if !ok {
			t.Log.Warnf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. "+
				"Supported values are: [int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool]",
				name, reflect.TypeOf(value))
			continue
		}
		measures = append(measures, types.MeasureValue{
			Name:  aws.String(name),
			Type:  valueType,
			Value: aws.String(converted),
		})
	}

	unit, timestamp := getTimestreamTime(point.Time())
	return []types.Record{{
		MeasureName:      aws.String(measureName),
		MeasureValueType: "MULTI",
		MeasureValues:    measures,
		Dimensions:       t.buildDimensions(point),
		Time:             aws.String(timestamp),
		TimeUnit:         unit,
	}}
}
// partitionRecords splits the Timestream records into smaller slices of a max size
// so that are under the limit for the Timestream API call.
// It returns the array of array of records.

View File

@ -37,12 +37,15 @@ const time2Epoch = "1257894000"
const metricName1 = "metricName1"
const metricName2 = "metricName2"
type mockTimestreamClient struct{}
type mockTimestreamClient struct {
WriteRecordsRequestCount int
}
// CreateTable is a no-op stub satisfying the Timestream write-client
// interface; it always reports success so tests never hit the real service.
func (m *mockTimestreamClient) CreateTable(context.Context, *timestreamwrite.CreateTableInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.CreateTableOutput, error) {
return nil, nil
}
// WriteRecords counts how many times it was invoked so tests can assert the
// number of WriteRecords batches the plugin issued; it always succeeds.
func (m *mockTimestreamClient) WriteRecords(context.Context, *timestreamwrite.WriteRecordsInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error) {
m.WriteRecordsRequestCount++
return nil, nil
}
func (m *mockTimestreamClient) DescribeDatabase(context.Context, *timestreamwrite.DescribeDatabaseInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.DescribeDatabaseOutput, error) {
@ -70,6 +73,49 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
}
require.Contains(t, incorrectMappingMode.Connect().Error(), "single-table")
//multi-measure config validation multi table mode
validConfigMultiMeasureMultiTableMode := Timestream{
DatabaseName: tsDbName,
MappingMode: MappingModeMultiTable,
UseMultiMeasureRecords: true,
MeasureNameForMultiMeasureRecords: "multi-measure-name",
Log: testutil.Logger{},
}
require.Nil(t, validConfigMultiMeasureMultiTableMode.Connect())
invalidConfigMultiMeasureMultiTableMode := Timestream{
DatabaseName: tsDbName,
MappingMode: MappingModeMultiTable,
UseMultiMeasureRecords: true,
// without MeasureNameForMultiMeasureRecords set we expect validation failure
Log: testutil.Logger{},
}
require.Contains(t, invalidConfigMultiMeasureMultiTableMode.Connect().Error(), "MeasureNameForMultiMeasureRecords")
// multi-measure config validation single table mode
validConfigMultiMeasureSingleTableMode := Timestream{
DatabaseName: tsDbName,
MappingMode: MappingModeSingleTable,
SingleTableName: testSingleTableName,
UseMultiMeasureRecords: true, // MeasureNameForMultiMeasureRecords is not needed as
// measurement name (from telegraf metric) is used as multi-measure name in TS
Log: testutil.Logger{},
}
require.Nil(t, validConfigMultiMeasureSingleTableMode.Connect())
invalidConfigMultiMeasureSingleTableMode := Timestream{
DatabaseName: tsDbName,
MappingMode: MappingModeSingleTable,
SingleTableName: testSingleTableName,
UseMultiMeasureRecords: true,
MeasureNameForMultiMeasureRecords: "multi-measure-name",
// value of MeasureNameForMultiMeasureRecords will be ignored and
// measurement name (from telegraf metric) is used as multi-measure name in TS
Log: testutil.Logger{},
}
err := invalidConfigMultiMeasureSingleTableMode.Connect()
require.ErrorContains(t, err, "MeasureNameForMultiMeasureRecords")
// multi-table arguments
validMappingModeMultiTable := Timestream{
DatabaseName: tsDbName,
@ -161,6 +207,269 @@ func TestConnectValidatesConfigParameters(t *testing.T) {
require.Contains(t, describeTableInvoked.Connect().Error(), "hello from DescribeDatabase")
}
// TestWriteMultiMeasuresSingleTableMode verifies that in single-table mapping
// mode with multi-measure records enabled, the measurement name of each input
// metric (not a configured name) is used as the Timestream measure name, and
// that 101 records are split across two WriteRecords requests.
func TestWriteMultiMeasuresSingleTableMode(t *testing.T) {
const recordCount = 100
mockClient := &mockTimestreamClient{0}
// Inject the mock so Write() never talks to the real Timestream service.
WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
return mockClient, nil
}
localTime, _ := strconv.Atoi(time1Epoch)
var inputs []telegraf.Metric
// Build recordCount+1 (101) metrics, each with two float fields, a unique
// timestamp, and a shared measurement name.
for i := 1; i <= recordCount+1; i++ {
localTime++
fieldName1 := "value_supported1" + strconv.Itoa(i)
fieldName2 := "value_supported2" + strconv.Itoa(i)
inputs = append(inputs, testutil.MustMetric(
"multi_measure_name",
map[string]string{"tag1": "value1"},
map[string]interface{}{
fieldName1: float64(10),
fieldName2: float64(20),
},
time.Unix(int64(localTime), 0),
))
}
plugin := Timestream{
MappingMode: MappingModeSingleTable,
SingleTableName: "test-multi-single-table-mode",
DatabaseName: tsDbName,
UseMultiMeasureRecords: true, // use multi
Log: testutil.Logger{},
}
// validate config correctness
require.NoError(t, plugin.Connect())
// validate multi-record generation
result := plugin.TransformMetrics(inputs)
// 'inputs' holds 101 metrics, transformed into 101 records and partitioned
// into 2 WriteRecordsInput requests to Timestream.
require.Equal(t, 2, len(result), "Expected 2 WriteRecordsInput requests")
var transformedRecords []types.Record
for _, r := range result {
transformedRecords = append(transformedRecords, r.Records...)
// Assert that we use measure name from input
require.Equal(t, *r.Records[0].MeasureName, "multi_measure_name")
}
// Expected 101 records
require.Equal(t, recordCount+1, len(transformedRecords), "Expected 101 records after transforming")
// validate write to TS
err := plugin.Write(inputs)
require.Nil(t, err, "Write to Timestream failed")
require.Equal(t, mockClient.WriteRecordsRequestCount, 2, "Expected 2 WriteRecords calls")
}
// TestWriteMultiMeasuresMultiTableMode verifies that in multi-table mapping
// mode with multi-measure records enabled, the measure name comes from the
// plugin configuration and all 100 metrics for the single table are batched
// into exactly one WriteRecords request.
func TestWriteMultiMeasuresMultiTableMode(t *testing.T) {
	const recordCount = 100
	mockClient := &mockTimestreamClient{0}
	// Inject the mock so Write() never talks to the real Timestream service.
	WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}

	localTime, _ := strconv.Atoi(time1Epoch)

	var inputs []telegraf.Metric
	// Build recordCount (100) metrics, each with two float fields, a unique
	// timestamp, and a shared measurement name.
	for i := 1; i <= recordCount; i++ {
		localTime++
		fieldName1 := "value_supported1" + strconv.Itoa(i)
		fieldName2 := "value_supported2" + strconv.Itoa(i)
		inputs = append(inputs, testutil.MustMetric(
			"multi_measure_name",
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName1: float64(10),
				fieldName2: float64(20),
			},
			time.Unix(int64(localTime), 0),
		))
	}

	plugin := Timestream{
		MappingMode:                       MappingModeMultiTable,
		DatabaseName:                      tsDbName,
		UseMultiMeasureRecords:            true, // use multi
		MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
		Log:                               testutil.Logger{},
	}

	// validate config correctness
	require.NoError(t, plugin.Connect(), "Invalid configuration")

	// validate multi-record generation
	result := plugin.TransformMetrics(inputs)
	// 'inputs' holds 100 metrics for a single table, transformed into
	// 1 WriteRecordsInput request to Timestream.
	require.Equal(t, 1, len(result), "Expected 1 WriteRecordsInput requests")

	// Assert that we use measure name from config
	require.Equal(t, *result[0].Records[0].MeasureName, "config-multi-measure-name")

	var transformedRecords []types.Record
	for _, r := range result {
		transformedRecords = append(transformedRecords, r.Records...)
	}
	// Expected 100 records
	require.Equal(t, recordCount, len(transformedRecords), "Expected 100 records after transforming")

	// validate successful write to TS
	require.NoError(t, plugin.Write(inputs), "Write to Timestream failed")
	require.Equal(t, mockClient.WriteRecordsRequestCount, 1, "Expected 1 WriteRecords call")
}
func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
input1 := testutil.MustMetric(
metricName1,
map[string]string{"tag1": "value1"},
map[string]interface{}{
"measureDouble": aws.Float64(10),
},
time1,
)
input2 := testutil.MustMetric(
metricName1,
map[string]string{"tag2": "value2"},
map[string]interface{}{
"measureBigint": aws.Int32(20),
},
time1,
)
input3 := testutil.MustMetric(
metricName1,
map[string]string{"tag3": "value3"},
map[string]interface{}{
"measureVarchar": "DUMMY",
},
time1,
)
input4 := testutil.MustMetric(
metricName1,
map[string]string{"tag4": "value4"},
map[string]interface{}{
"measureBool": true,
},
time1,
)
expectedResultMultiTable := buildExpectedMultiRecords("config-multi-measure-name", metricName1)
plugin := Timestream{
MappingMode: MappingModeMultiTable,
DatabaseName: tsDbName,
UseMultiMeasureRecords: true, // use multi
MeasureNameForMultiMeasureRecords: "config-multi-measure-name",
Log: testutil.Logger{},
}
// validate config correctness
err := plugin.Connect()
require.Nil(t, err, "Invalid configuration")
// validate multi-record generation with MappingModeMultiTable
result := plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4})
require.Equal(t, 1, len(result), "Expected 1 WriteRecordsInput requests")
require.EqualValues(t, result[0], expectedResultMultiTable)
require.True(t, arrayContains(result, expectedResultMultiTable), "Expected that the list of requests to Timestream: %+v\n "+
"will contain request: %+v\n\n", result, expectedResultMultiTable)
// singleTableMode
plugin = Timestream{
MappingMode: MappingModeSingleTable,
SingleTableName: "singleTableName",
DatabaseName: tsDbName,
UseMultiMeasureRecords: true, // use multi
Log: testutil.Logger{},
}
// validate config correctness
err = plugin.Connect()
require.Nil(t, err, "Invalid configuration")
expectedResultSingleTable := buildExpectedMultiRecords(metricName1, "singleTableName")
// validate multi-record generation with MappingModeSingleTable
result = plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4})
require.Equal(t, 1, len(result), "Expected 1 WriteRecordsInput requests")
require.EqualValues(t, result[0], expectedResultSingleTable)
require.True(t, arrayContains(result, expectedResultSingleTable), "Expected that the list of requests to Timestream: %+v\n "+
"will contain request: %+v\n\n", result, expectedResultSingleTable)
}
// buildExpectedMultiRecords builds the WriteRecordsInput that the
// multi-measure tests expect: one record per measure value type (DOUBLE,
// BIGINT, VARCHAR, BOOLEAN), all sharing multiMeasureName, targeted at
// tableName in the test database.
func buildExpectedMultiRecords(multiMeasureName string, tableName string) *timestreamwrite.WriteRecordsInput {
	// One entry per Timestream measure value type, in the same order the test
	// inputs are generated.
	cases := []struct {
		dimensions    map[string]string
		measureValues map[string]string
		measureType   types.MeasureValueType
	}{
		{map[string]string{"tag1": "value1"}, map[string]string{"measureDouble": "10"}, types.MeasureValueTypeDouble},
		{map[string]string{"tag2": "value2"}, map[string]string{"measureBigint": "20"}, types.MeasureValueTypeBigint},
		{map[string]string{"tag3": "value3"}, map[string]string{"measureVarchar": "DUMMY"}, types.MeasureValueTypeVarchar},
		{map[string]string{"tag4": "value4"}, map[string]string{"measureBool": "true"}, types.MeasureValueTypeBoolean},
	}

	var recordsMultiTableMode []types.Record
	for _, c := range cases {
		recordsMultiTableMode = append(recordsMultiTableMode, buildMultiRecords([]SimpleInput{{
			t:             time1Epoch,
			tableName:     metricName1,
			dimensions:    c.dimensions,
			measureValues: c.measureValues,
		}}, multiMeasureName, c.measureType)...)
	}

	return &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDbName),
		TableName:        aws.String(tableName),
		Records:          recordsMultiTableMode,
		CommonAttributes: &types.Record{},
	}
}
// mockTimestreamErrorClient is a test double for the Timestream write client
// used to simulate write failures; ErrorToReturnOnWriteRecords holds the error
// its WriteRecords implementation surfaces (method defined elsewhere in this
// file — presumably returns this error unconditionally; verify there).
type mockTimestreamErrorClient struct {
	ErrorToReturnOnWriteRecords error
}
@ -226,6 +535,85 @@ func TestRejectedRecordsErrorResultsInMetricsBeingSkipped(t *testing.T) {
require.Nil(t, err, "Expected to silently swallow the RejectedRecordsException, "+
"as retrying this error doesn't make sense.")
}
// TestWriteWhenRequestsGreaterThanMaxWriteGoRoutinesCount verifies that when
// more write requests are produced than there are writer goroutines, the pool
// still performs one WriteRecords call per request.
func TestWriteWhenRequestsGreaterThanMaxWriteGoRoutinesCount(t *testing.T) {
	t.Skip("Skipping test due to data race, will be re-visited")
	const maxWriteRecordsCalls = 5
	const maxRecordsInWriteRecordsCall = 100
	const totalRecords = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
	mockClient := &mockTimestreamClient{0}
	WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}
	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDbName,
		// Spawn only 2 go routines to serve all 5 write requests, so each
		// routine has to process more than one request.
		MaxWriteGoRoutinesCount: 2,
		Log:                     testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())
	var inputs []telegraf.Metric
	for i := 1; i <= totalRecords; i++ {
		fieldName := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, testutil.MustMetric(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName: float64(10),
			},
			time1,
		))
	}
	err := plugin.Write(inputs)
	require.NoError(t, err, "Expected to write without any errors")
	require.Equal(t, maxWriteRecordsCalls, mockClient.WriteRecordsRequestCount, "Expected 5 calls to WriteRecords")
}
// TestWriteWhenRequestsLesserThanMaxWriteGoRoutinesCount verifies that when
// the goroutine pool is larger than the number of write requests, exactly one
// WriteRecords call is still made per request (no duplicates from idle
// goroutines).
func TestWriteWhenRequestsLesserThanMaxWriteGoRoutinesCount(t *testing.T) {
	t.Skip("Skipping test due to data race, will be re-visited")
	const maxWriteRecordsCalls = 2
	const maxRecordsInWriteRecordsCall = 100
	const totalRecords = maxWriteRecordsCalls * maxRecordsInWriteRecordsCall
	mockClient := &mockTimestreamClient{0}
	WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
		return mockClient, nil
	}
	plugin := Timestream{
		MappingMode:  MappingModeMultiTable,
		DatabaseName: tsDbName,
		// Spawn 5 parallel go routines to serve 2 write requests
		// In this case only 2 of the 5 go routines will process the write requests
		MaxWriteGoRoutinesCount: 5,
		Log:                     testutil.Logger{},
	}
	require.NoError(t, plugin.Connect())
	var inputs []telegraf.Metric
	for i := 1; i <= totalRecords; i++ {
		fieldName := "value_supported" + strconv.Itoa(i)
		inputs = append(inputs, testutil.MustMetric(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				fieldName: float64(10),
			},
			time1,
		))
	}
	err := plugin.Write(inputs)
	require.NoError(t, err, "Expected to write without any errors")
	require.Equal(t, maxWriteRecordsCalls, mockClient.WriteRecordsRequestCount, "Expected 2 calls to WriteRecords")
}
func TestTransformMetricsSkipEmptyMetric(t *testing.T) {
input1 := testutil.MustMetric(
@ -360,6 +748,72 @@ func TestTransformMetricsRequestsAboveLimitAreSplit(t *testing.T) {
[]*timestreamwrite.WriteRecordsInput{expectedResult1MultiTable, expectedResult2MultiTable})
}
// TestTransformMetricsRequestsAboveLimitAreSplitSingleTable verifies that in
// single-table mode, 101 metrics are split into two WriteRecords requests:
// the first carrying 100 records and the second carrying the overflow record.
func TestTransformMetricsRequestsAboveLimitAreSplitSingleTable(t *testing.T) {
	const maxRecordsInWriteRecordsCall = 100

	baseTime, _ := strconv.Atoi(time1Epoch)

	// One metric per unique field name and timestamp, one past the limit.
	var inputs []telegraf.Metric
	for i := 1; i <= maxRecordsInWriteRecordsCall+1; i++ {
		inputs = append(inputs, testutil.MustMetric(
			metricName1,
			map[string]string{"tag1": "value1"},
			map[string]interface{}{
				"value_supported" + strconv.Itoa(i): float64(10),
			},
			time.Unix(int64(baseTime+i), 0),
		))
	}

	// expectedRecord builds the single-table record for the i-th input metric.
	expectedRecord := func(i int) []types.Record {
		return buildRecord(SimpleInput{
			t:             strconv.Itoa(baseTime + i),
			tableName:     testSingleTableName,
			dimensions:    map[string]string{"tag1": "value1", testSingleTableDim: metricName1},
			measureValues: map[string]string{"value_supported" + strconv.Itoa(i): "10"},
		})
	}

	var recordsFirstReq []types.Record
	for i := 1; i <= maxRecordsInWriteRecordsCall; i++ {
		recordsFirstReq = append(recordsFirstReq, expectedRecord(i)...)
	}
	expectedResult1SingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDbName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsFirstReq,
		CommonAttributes: &types.Record{},
	}

	var recordsSecondReq []types.Record
	recordsSecondReq = append(recordsSecondReq, expectedRecord(maxRecordsInWriteRecordsCall+1)...)
	expectedResult2SingleTable := &timestreamwrite.WriteRecordsInput{
		DatabaseName:     aws.String(tsDbName),
		TableName:        aws.String(testSingleTableName),
		Records:          recordsSecondReq,
		CommonAttributes: &types.Record{},
	}

	comparisonTest(t, MappingModeSingleTable,
		inputs,
		[]*timestreamwrite.WriteRecordsInput{expectedResult1SingleTable, expectedResult2SingleTable})
}
func TestTransformMetricsDifferentDimensionsSameTimestampsAreWrittenSeparate(t *testing.T) {
input1 := testutil.MustMetric(
metricName1,
@ -858,3 +1312,37 @@ func buildRecord(input SimpleInput) []types.Record {
return tsRecords
}
// buildMultiRecords converts each SimpleInput into one multi-measure
// Timestream record: input.dimensions become the record's dimensions and each
// entry of input.measureValues becomes a MeasureValue of measureType. Every
// record uses multiMeasureName as its measure name and MeasureValueType
// "MULTI". Note: input.tableName is not used here.
func buildMultiRecords(inputs []SimpleInput, multiMeasureName string, measureType types.MeasureValueType) []types.Record {
	// Keep nil-slice semantics (no pre-sizing): expected records built here
	// are compared with DeepEqual-based assertions against plugin output.
	var tsRecords []types.Record
	for _, input := range inputs {
		var dims []types.Dimension
		for name, value := range input.dimensions {
			dims = append(dims, types.Dimension{
				Name:  aws.String(name),
				Value: aws.String(value),
			})
		}

		var measures []types.MeasureValue
		for name, value := range input.measureValues {
			measures = append(measures, types.MeasureValue{
				Name:  aws.String(name),
				Value: aws.String(value),
				Type:  measureType,
			})
		}

		tsRecords = append(tsRecords, types.Record{
			MeasureName:      aws.String(multiMeasureName),
			MeasureValueType: "MULTI",
			MeasureValues:    measures,
			Dimensions:       dims,
			Time:             aws.String(input.t),
			TimeUnit:         timeUnit,
		})
	}
	return tsRecords
}