609 lines
22 KiB
Go
609 lines
22 KiB
Go
|
|
package timestream
|
||
|
|
|
||
|
|
import (
|
||
|
|
"encoding/binary"
|
||
|
|
"fmt"
|
||
|
|
"hash/fnv"
|
||
|
|
"reflect"
|
||
|
|
"strconv"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/influxdata/telegraf"
|
||
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||
|
|
|
||
|
|
"github.com/aws/aws-sdk-go/aws"
|
||
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||
|
|
"github.com/aws/aws-sdk-go/service/timestreamwrite"
|
||
|
|
internalaws "github.com/influxdata/telegraf/config/aws"
|
||
|
|
)
|
||
|
|
|
||
|
|
type (
|
||
|
|
Timestream struct {
|
||
|
|
Region string `toml:"region"`
|
||
|
|
AccessKey string `toml:"access_key"`
|
||
|
|
SecretKey string `toml:"secret_key"`
|
||
|
|
RoleARN string `toml:"role_arn"`
|
||
|
|
Profile string `toml:"profile"`
|
||
|
|
Filename string `toml:"shared_credential_file"`
|
||
|
|
Token string `toml:"token"`
|
||
|
|
EndpointURL string `toml:"endpoint_url"`
|
||
|
|
|
||
|
|
MappingMode string `toml:"mapping_mode"`
|
||
|
|
DescribeDatabaseOnStart bool `toml:"describe_database_on_start"`
|
||
|
|
DatabaseName string `toml:"database_name"`
|
||
|
|
|
||
|
|
SingleTableName string `toml:"single_table_name"`
|
||
|
|
SingleTableDimensionNameForTelegrafMeasurementName string `toml:"single_table_dimension_name_for_telegraf_measurement_name"`
|
||
|
|
|
||
|
|
CreateTableIfNotExists bool `toml:"create_table_if_not_exists"`
|
||
|
|
CreateTableMagneticStoreRetentionPeriodInDays int64 `toml:"create_table_magnetic_store_retention_period_in_days"`
|
||
|
|
CreateTableMemoryStoreRetentionPeriodInHours int64 `toml:"create_table_memory_store_retention_period_in_hours"`
|
||
|
|
CreateTableTags map[string]string `toml:"create_table_tags"`
|
||
|
|
|
||
|
|
Log telegraf.Logger
|
||
|
|
svc WriteClient
|
||
|
|
}
|
||
|
|
|
||
|
|
WriteClient interface {
|
||
|
|
CreateTable(*timestreamwrite.CreateTableInput) (*timestreamwrite.CreateTableOutput, error)
|
||
|
|
WriteRecords(*timestreamwrite.WriteRecordsInput) (*timestreamwrite.WriteRecordsOutput, error)
|
||
|
|
DescribeDatabase(*timestreamwrite.DescribeDatabaseInput) (*timestreamwrite.DescribeDatabaseOutput, error)
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
// Mapping modes select how the Telegraf data model is represented in the
// Timestream data model. The sample configuration describes both in detail.
const (
	// MappingModeSingleTable writes all metrics into one configured table.
	MappingModeSingleTable = "single-table"
	// MappingModeMultiTable writes each measurement into a table named after it.
	MappingModeMultiTable = "multi-table"
)

// MaxRecordsPerCall is the Timestream limit on the number of records in a
// single WriteRecords API call.
const MaxRecordsPerCall = 100
|
||
|
|
|
||
|
|
var sampleConfig = `
|
||
|
|
## Amazon Region
|
||
|
|
region = "us-east-1"
|
||
|
|
|
||
|
|
## Amazon Credentials
|
||
|
|
## Credentials are loaded in the following order:
|
||
|
|
## 1) Assumed credentials via STS if role_arn is specified
|
||
|
|
## 2) Explicit credentials from 'access_key' and 'secret_key'
|
||
|
|
## 3) Shared profile from 'profile'
|
||
|
|
## 4) Environment variables
|
||
|
|
## 5) Shared credentials file
|
||
|
|
## 6) EC2 Instance Profile
|
||
|
|
#access_key = ""
|
||
|
|
#secret_key = ""
|
||
|
|
#token = ""
|
||
|
|
#role_arn = ""
|
||
|
|
#profile = ""
|
||
|
|
#shared_credential_file = ""
|
||
|
|
|
||
|
|
## Endpoint to make request against, the correct endpoint is automatically
|
||
|
|
## determined and this option should only be set if you wish to override the
|
||
|
|
## default.
|
||
|
|
## ex: endpoint_url = "http://localhost:8000"
|
||
|
|
# endpoint_url = ""
|
||
|
|
|
||
|
|
## Timestream database where the metrics will be inserted.
|
||
|
|
## The database must exist prior to starting Telegraf.
|
||
|
|
database_name = "yourDatabaseNameHere"
|
||
|
|
|
||
|
|
## Specifies if the plugin should describe the Timestream database upon starting
|
||
|
|
## to validate if it has access necessary permissions, connection, etc., as a safety check.
|
||
|
|
## If the describe operation fails, the plugin will not start
|
||
|
|
## and therefore the Telegraf agent will not start.
|
||
|
|
describe_database_on_start = false
|
||
|
|
|
||
|
|
## The mapping mode specifies how Telegraf records are represented in Timestream.
|
||
|
|
## Valid values are: single-table, multi-table.
|
||
|
|
## For example, consider the following data in line protocol format:
|
||
|
|
## weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200
|
||
|
|
## airquality,location=us-west no2=5,pm25=16 1465839830100400200
|
||
|
|
## where weather and airquality are the measurement names, location and season are tags,
|
||
|
|
## and temperature, humidity, no2, pm25 are fields.
|
||
|
|
## In multi-table mode:
|
||
|
|
## - first line will be ingested to table named weather
|
||
|
|
## - second line will be ingested to table named airquality
|
||
|
|
## - the tags will be represented as dimensions
|
||
|
|
## - first table (weather) will have two records:
|
||
|
|
## one with measurement name equals to temperature,
|
||
|
|
## another with measurement name equals to humidity
|
||
|
|
## - second table (airquality) will have two records:
|
||
|
|
## one with measurement name equals to no2,
|
||
|
|
## another with measurement name equals to pm25
|
||
|
|
## - the Timestream tables from the example will look like this:
|
||
|
|
## TABLE "weather":
|
||
|
|
## time | location | season | measure_name | measure_value::bigint
|
||
|
|
## 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82
|
||
|
|
## 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71
|
||
|
|
## TABLE "airquality":
|
||
|
|
## time | location | measure_name | measure_value::bigint
|
||
|
|
## 2016-06-13 17:43:50 | us-west | no2 | 5
|
||
|
|
## 2016-06-13 17:43:50 | us-west | pm25 | 16
|
||
|
|
## In single-table mode:
|
||
|
|
## - the data will be ingested to a single table, which name will be valueOf(single_table_name)
|
||
|
|
## - measurement name will stored in dimension named valueOf(single_table_dimension_name_for_telegraf_measurement_name)
|
||
|
|
## - location and season will be represented as dimensions
|
||
|
|
## - temperature, humidity, no2, pm25 will be represented as measurement name
|
||
|
|
## - the Timestream table from the example will look like this:
|
||
|
|
## Assuming:
|
||
|
|
## - single_table_name = "my_readings"
|
||
|
|
## - single_table_dimension_name_for_telegraf_measurement_name = "namespace"
|
||
|
|
## TABLE "my_readings":
|
||
|
|
## time | location | season | namespace | measure_name | measure_value::bigint
|
||
|
|
## 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82
|
||
|
|
## 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71
|
||
|
|
## 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5
|
||
|
|
## 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16
|
||
|
|
## In most cases, using multi-table mapping mode is recommended.
|
||
|
|
## However, you can consider using single-table in situations when you have thousands of measurement names.
|
||
|
|
mapping_mode = "multi-table"
|
||
|
|
|
||
|
|
## Only valid and required for mapping_mode = "single-table"
|
||
|
|
## Specifies the Timestream table where the metrics will be uploaded.
|
||
|
|
# single_table_name = "yourTableNameHere"
|
||
|
|
|
||
|
|
## Only valid and required for mapping_mode = "single-table"
|
||
|
|
## Describes what will be the Timestream dimension name for the Telegraf
|
||
|
|
## measurement name.
|
||
|
|
# single_table_dimension_name_for_telegraf_measurement_name = "namespace"
|
||
|
|
|
||
|
|
## Specifies if the plugin should create the table, if the table do not exist.
|
||
|
|
## The plugin writes the data without prior checking if the table exists.
|
||
|
|
## When the table does not exist, the error returned from Timestream will cause
|
||
|
|
## the plugin to create the table, if this parameter is set to true.
|
||
|
|
create_table_if_not_exists = true
|
||
|
|
|
||
|
|
## Only valid and required if create_table_if_not_exists = true
|
||
|
|
## Specifies the Timestream table magnetic store retention period in days.
|
||
|
|
## Check Timestream documentation for more details.
|
||
|
|
create_table_magnetic_store_retention_period_in_days = 365
|
||
|
|
|
||
|
|
## Only valid and required if create_table_if_not_exists = true
|
||
|
|
## Specifies the Timestream table memory store retention period in hours.
|
||
|
|
## Check Timestream documentation for more details.
|
||
|
|
create_table_memory_store_retention_period_in_hours = 24
|
||
|
|
|
||
|
|
## Only valid and optional if create_table_if_not_exists = true
|
||
|
|
## Specifies the Timestream table tags.
|
||
|
|
## Check Timestream documentation for more details
|
||
|
|
# create_table_tags = { "foo" = "bar", "environment" = "dev"}
|
||
|
|
`
|
||
|
|
|
||
|
|
// WriteFactory function provides a way to mock the client instantiation for testing purposes.
|
||
|
|
var WriteFactory = func(credentialConfig *internalaws.CredentialConfig) WriteClient {
|
||
|
|
configProvider := credentialConfig.Credentials()
|
||
|
|
return timestreamwrite.New(configProvider)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) Connect() error {
|
||
|
|
if t.DatabaseName == "" {
|
||
|
|
return fmt.Errorf("DatabaseName key is required")
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.MappingMode == "" {
|
||
|
|
return fmt.Errorf("MappingMode key is required")
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.MappingMode != MappingModeSingleTable && t.MappingMode != MappingModeMultiTable {
|
||
|
|
return fmt.Errorf("correct MappingMode key values are: '%s', '%s'",
|
||
|
|
MappingModeSingleTable, MappingModeMultiTable)
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.MappingMode == MappingModeSingleTable {
|
||
|
|
if t.SingleTableName == "" {
|
||
|
|
return fmt.Errorf("in '%s' mapping mode, SingleTableName key is required", MappingModeSingleTable)
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.SingleTableDimensionNameForTelegrafMeasurementName == "" {
|
||
|
|
return fmt.Errorf("in '%s' mapping mode, SingleTableDimensionNameForTelegrafMeasurementName key is required",
|
||
|
|
MappingModeSingleTable)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.MappingMode == MappingModeMultiTable {
|
||
|
|
if t.SingleTableName != "" {
|
||
|
|
return fmt.Errorf("in '%s' mapping mode, do not specify SingleTableName key", MappingModeMultiTable)
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.SingleTableDimensionNameForTelegrafMeasurementName != "" {
|
||
|
|
return fmt.Errorf("in '%s' mapping mode, do not specify SingleTableDimensionNameForTelegrafMeasurementName key", MappingModeMultiTable)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.CreateTableIfNotExists {
|
||
|
|
if t.CreateTableMagneticStoreRetentionPeriodInDays < 1 {
|
||
|
|
return fmt.Errorf("if Telegraf should create tables, CreateTableMagneticStoreRetentionPeriodInDays key should have a value greater than 0")
|
||
|
|
}
|
||
|
|
|
||
|
|
if t.CreateTableMemoryStoreRetentionPeriodInHours < 1 {
|
||
|
|
return fmt.Errorf("if Telegraf should create tables, CreateTableMemoryStoreRetentionPeriodInHours key should have a value greater than 0")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
t.Log.Infof("Constructing Timestream client for '%s' mode", t.MappingMode)
|
||
|
|
|
||
|
|
credentialConfig := &internalaws.CredentialConfig{
|
||
|
|
Region: t.Region,
|
||
|
|
AccessKey: t.AccessKey,
|
||
|
|
SecretKey: t.SecretKey,
|
||
|
|
RoleARN: t.RoleARN,
|
||
|
|
Profile: t.Profile,
|
||
|
|
Filename: t.Filename,
|
||
|
|
Token: t.Token,
|
||
|
|
EndpointURL: t.EndpointURL,
|
||
|
|
}
|
||
|
|
svc := WriteFactory(credentialConfig)
|
||
|
|
|
||
|
|
if t.DescribeDatabaseOnStart {
|
||
|
|
t.Log.Infof("Describing database '%s' in region '%s'", t.DatabaseName, t.Region)
|
||
|
|
|
||
|
|
describeDatabaseInput := ×treamwrite.DescribeDatabaseInput{
|
||
|
|
DatabaseName: aws.String(t.DatabaseName),
|
||
|
|
}
|
||
|
|
describeDatabaseOutput, err := svc.DescribeDatabase(describeDatabaseInput)
|
||
|
|
if err != nil {
|
||
|
|
t.Log.Errorf("Couldn't describe database '%s'. Check error, fix permissions, connectivity, create database.", t.DatabaseName)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
t.Log.Infof("Describe database '%s' returned: '%s'.", t.DatabaseName, describeDatabaseOutput)
|
||
|
|
}
|
||
|
|
|
||
|
|
t.svc = svc
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) Close() error {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) SampleConfig() string {
|
||
|
|
return sampleConfig
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) Description() string {
|
||
|
|
return "Configuration for Amazon Timestream output."
|
||
|
|
}
|
||
|
|
|
||
|
|
func init() {
|
||
|
|
outputs.Add("timestream", func() telegraf.Output {
|
||
|
|
return &Timestream{}
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) Write(metrics []telegraf.Metric) error {
|
||
|
|
writeRecordsInputs := t.TransformMetrics(metrics)
|
||
|
|
for _, writeRecordsInput := range writeRecordsInputs {
|
||
|
|
if err := t.writeToTimestream(writeRecordsInput, true); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteRecordsInput, resourceNotFoundRetry bool) error {
|
||
|
|
t.Log.Debugf("Writing to Timestream: '%v' with ResourceNotFoundRetry: '%t'", writeRecordsInput, resourceNotFoundRetry)
|
||
|
|
|
||
|
|
_, err := t.svc.WriteRecords(writeRecordsInput)
|
||
|
|
if err != nil {
|
||
|
|
// Telegraf will retry ingesting the metrics if an error is returned from the plugin.
|
||
|
|
// Therefore, return error only for retryable exceptions: ThrottlingException and 5xx exceptions.
|
||
|
|
if e, ok := err.(awserr.Error); ok {
|
||
|
|
switch e.Code() {
|
||
|
|
case timestreamwrite.ErrCodeResourceNotFoundException:
|
||
|
|
if resourceNotFoundRetry {
|
||
|
|
t.Log.Warnf("Failed to write to Timestream database '%s' table '%s'. Error: '%s'",
|
||
|
|
t.DatabaseName, *writeRecordsInput.TableName, e)
|
||
|
|
return t.createTableAndRetry(writeRecordsInput)
|
||
|
|
}
|
||
|
|
t.logWriteToTimestreamError(err, writeRecordsInput.TableName)
|
||
|
|
case timestreamwrite.ErrCodeThrottlingException:
|
||
|
|
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
|
||
|
|
t.DatabaseName, *writeRecordsInput.TableName, err)
|
||
|
|
case timestreamwrite.ErrCodeInternalServerException:
|
||
|
|
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
|
||
|
|
t.DatabaseName, *writeRecordsInput.TableName, err)
|
||
|
|
default:
|
||
|
|
t.logWriteToTimestreamError(err, writeRecordsInput.TableName)
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
// Retry other, non-aws errors.
|
||
|
|
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
|
||
|
|
t.DatabaseName, *writeRecordsInput.TableName, err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) logWriteToTimestreamError(err error, tableName *string) {
|
||
|
|
t.Log.Errorf("Failed to write to Timestream database '%s' table '%s'. Skipping metric! Error: '%s'",
|
||
|
|
t.DatabaseName, *tableName, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) createTableAndRetry(writeRecordsInput *timestreamwrite.WriteRecordsInput) error {
|
||
|
|
if t.CreateTableIfNotExists {
|
||
|
|
t.Log.Infof("Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'.", *writeRecordsInput.TableName, t.DatabaseName)
|
||
|
|
if err := t.createTable(writeRecordsInput.TableName); err != nil {
|
||
|
|
t.Log.Errorf("Failed to create table '%s' in database '%s': %s. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName, err)
|
||
|
|
} else {
|
||
|
|
t.Log.Infof("Table '%s' in database '%s' created. Retrying writing.", *writeRecordsInput.TableName, t.DatabaseName)
|
||
|
|
return t.writeToTimestream(writeRecordsInput, false)
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
t.Log.Errorf("Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName)
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// createTable creates a Timestream table according to the configuration.
|
||
|
|
func (t *Timestream) createTable(tableName *string) error {
|
||
|
|
createTableInput := ×treamwrite.CreateTableInput{
|
||
|
|
DatabaseName: aws.String(t.DatabaseName),
|
||
|
|
TableName: aws.String(*tableName),
|
||
|
|
RetentionProperties: ×treamwrite.RetentionProperties{
|
||
|
|
MagneticStoreRetentionPeriodInDays: aws.Int64(t.CreateTableMagneticStoreRetentionPeriodInDays),
|
||
|
|
MemoryStoreRetentionPeriodInHours: aws.Int64(t.CreateTableMemoryStoreRetentionPeriodInHours),
|
||
|
|
},
|
||
|
|
}
|
||
|
|
var tags []*timestreamwrite.Tag
|
||
|
|
for key, val := range t.CreateTableTags {
|
||
|
|
tags = append(tags, ×treamwrite.Tag{
|
||
|
|
Key: aws.String(key),
|
||
|
|
Value: aws.String(val),
|
||
|
|
})
|
||
|
|
}
|
||
|
|
createTableInput.SetTags(tags)
|
||
|
|
|
||
|
|
_, err := t.svc.CreateTable(createTableInput)
|
||
|
|
if err != nil {
|
||
|
|
if e, ok := err.(awserr.Error); ok {
|
||
|
|
// if the table was created in the meantime, it's ok.
|
||
|
|
if e.Code() == timestreamwrite.ErrCodeConflictException {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// TransformMetrics transforms a collection of Telegraf Metrics into write requests to Timestream.
|
||
|
|
// Telegraf Metrics are grouped by Name, Tag Keys and Time to use Timestream CommonAttributes.
|
||
|
|
// Returns collection of write requests to be performed to Timestream.
|
||
|
|
func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwrite.WriteRecordsInput {
|
||
|
|
writeRequests := make(map[uint64]*timestreamwrite.WriteRecordsInput, len(metrics))
|
||
|
|
for _, m := range metrics {
|
||
|
|
// build MeasureName, MeasureValue, MeasureValueType
|
||
|
|
records := t.buildWriteRecords(m)
|
||
|
|
if len(records) == 0 {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
id := hashFromMetricTimeNameTagKeys(m)
|
||
|
|
if curr, ok := writeRequests[id]; !ok {
|
||
|
|
// No current CommonAttributes/WriteRecordsInput found for current Telegraf Metric
|
||
|
|
dimensions := t.buildDimensions(m)
|
||
|
|
timeUnit, timeValue := getTimestreamTime(m.Time())
|
||
|
|
newWriteRecord := ×treamwrite.WriteRecordsInput{
|
||
|
|
DatabaseName: aws.String(t.DatabaseName),
|
||
|
|
Records: records,
|
||
|
|
CommonAttributes: ×treamwrite.Record{
|
||
|
|
Dimensions: dimensions,
|
||
|
|
Time: aws.String(timeValue),
|
||
|
|
TimeUnit: aws.String(timeUnit),
|
||
|
|
},
|
||
|
|
}
|
||
|
|
if t.MappingMode == MappingModeSingleTable {
|
||
|
|
newWriteRecord.SetTableName(t.SingleTableName)
|
||
|
|
}
|
||
|
|
if t.MappingMode == MappingModeMultiTable {
|
||
|
|
newWriteRecord.SetTableName(m.Name())
|
||
|
|
}
|
||
|
|
|
||
|
|
writeRequests[id] = newWriteRecord
|
||
|
|
} else {
|
||
|
|
curr.Records = append(curr.Records, records...)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// Create result as array of WriteRecordsInput. Split requests over records count limit to smaller requests.
|
||
|
|
var result []*timestreamwrite.WriteRecordsInput
|
||
|
|
for _, writeRequest := range writeRequests {
|
||
|
|
if len(writeRequest.Records) > MaxRecordsPerCall {
|
||
|
|
for _, recordsPartition := range partitionRecords(MaxRecordsPerCall, writeRequest.Records) {
|
||
|
|
newWriteRecord := ×treamwrite.WriteRecordsInput{
|
||
|
|
DatabaseName: writeRequest.DatabaseName,
|
||
|
|
TableName: writeRequest.TableName,
|
||
|
|
Records: recordsPartition,
|
||
|
|
CommonAttributes: writeRequest.CommonAttributes,
|
||
|
|
}
|
||
|
|
result = append(result, newWriteRecord)
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
result = append(result, writeRequest)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return result
|
||
|
|
}
|
||
|
|
|
||
|
|
func hashFromMetricTimeNameTagKeys(m telegraf.Metric) uint64 {
|
||
|
|
h := fnv.New64a()
|
||
|
|
h.Write([]byte(m.Name()))
|
||
|
|
h.Write([]byte("\n"))
|
||
|
|
for _, tag := range m.TagList() {
|
||
|
|
if tag.Key == "" {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
|
||
|
|
h.Write([]byte(tag.Key))
|
||
|
|
h.Write([]byte("\n"))
|
||
|
|
h.Write([]byte(tag.Value))
|
||
|
|
h.Write([]byte("\n"))
|
||
|
|
}
|
||
|
|
b := make([]byte, binary.MaxVarintLen64)
|
||
|
|
n := binary.PutUvarint(b, uint64(m.Time().UnixNano()))
|
||
|
|
h.Write(b[:n])
|
||
|
|
h.Write([]byte("\n"))
|
||
|
|
return h.Sum64()
|
||
|
|
}
|
||
|
|
|
||
|
|
func (t *Timestream) buildDimensions(point telegraf.Metric) []*timestreamwrite.Dimension {
|
||
|
|
var dimensions []*timestreamwrite.Dimension
|
||
|
|
for tagName, tagValue := range point.Tags() {
|
||
|
|
dimension := ×treamwrite.Dimension{
|
||
|
|
Name: aws.String(tagName),
|
||
|
|
Value: aws.String(tagValue),
|
||
|
|
}
|
||
|
|
dimensions = append(dimensions, dimension)
|
||
|
|
}
|
||
|
|
if t.MappingMode == MappingModeSingleTable {
|
||
|
|
dimension := ×treamwrite.Dimension{
|
||
|
|
Name: aws.String(t.SingleTableDimensionNameForTelegrafMeasurementName),
|
||
|
|
Value: aws.String(point.Name()),
|
||
|
|
}
|
||
|
|
dimensions = append(dimensions, dimension)
|
||
|
|
}
|
||
|
|
return dimensions
|
||
|
|
}
|
||
|
|
|
||
|
|
// buildWriteRecords builds the Timestream write records from Metric Fields only.
|
||
|
|
// Tags and time are not included - common attributes are built separately.
|
||
|
|
// Records with unsupported Metric Field type are skipped.
|
||
|
|
// It returns an array of Timestream write records.
|
||
|
|
func (t *Timestream) buildWriteRecords(point telegraf.Metric) []*timestreamwrite.Record {
|
||
|
|
var records []*timestreamwrite.Record
|
||
|
|
for fieldName, fieldValue := range point.Fields() {
|
||
|
|
stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue)
|
||
|
|
if !ok {
|
||
|
|
t.Log.Errorf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. "+
|
||
|
|
"Supported values are: [int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool]",
|
||
|
|
fieldName, reflect.TypeOf(fieldValue))
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
record := ×treamwrite.Record{
|
||
|
|
MeasureName: aws.String(fieldName),
|
||
|
|
MeasureValueType: aws.String(stringFieldValueType),
|
||
|
|
MeasureValue: aws.String(stringFieldValue),
|
||
|
|
}
|
||
|
|
records = append(records, record)
|
||
|
|
}
|
||
|
|
return records
|
||
|
|
}
|
||
|
|
|
||
|
|
// partitionRecords splits the Timestream records into smaller slices of a max size
|
||
|
|
// so that are under the limit for the Timestream API call.
|
||
|
|
// It returns the array of array of records.
|
||
|
|
func partitionRecords(size int, records []*timestreamwrite.Record) [][]*timestreamwrite.Record {
|
||
|
|
numberOfPartitions := len(records) / size
|
||
|
|
if len(records)%size != 0 {
|
||
|
|
numberOfPartitions++
|
||
|
|
}
|
||
|
|
|
||
|
|
partitions := make([][]*timestreamwrite.Record, numberOfPartitions)
|
||
|
|
|
||
|
|
for i := 0; i < numberOfPartitions; i++ {
|
||
|
|
start := size * i
|
||
|
|
end := size * (i + 1)
|
||
|
|
if end > len(records) {
|
||
|
|
end = len(records)
|
||
|
|
}
|
||
|
|
|
||
|
|
partitions[i] = records[start:end]
|
||
|
|
}
|
||
|
|
|
||
|
|
return partitions
|
||
|
|
}
|
||
|
|
|
||
|
|
// getTimestreamTime produces the Timestream TimeUnit and TimeValue for t,
// choosing the coarsest unit (seconds, milliseconds, microseconds or
// nanoseconds) that still represents the timestamp without losing
// information. timeValue is the decimal count of that unit since the Unix
// epoch.
func getTimestreamTime(t time.Time) (timeUnit string, timeValue string) {
	const (
		TimeUnitS  = "SECONDS"
		TimeUnitMS = "MILLISECONDS"
		TimeUnitUS = "MICROSECONDS"
		TimeUnitNS = "NANOSECONDS"
	)

	nanosTime := t.UnixNano()
	// Try the coarsest unit first and fall through to finer ones.
	switch {
	case nanosTime%1e9 == 0:
		return TimeUnitS, strconv.FormatInt(nanosTime/1e9, 10)
	case nanosTime%1e6 == 0:
		return TimeUnitMS, strconv.FormatInt(nanosTime/1e6, 10)
	case nanosTime%1e3 == 0:
		return TimeUnitUS, strconv.FormatInt(nanosTime/1e3, 10)
	default:
		return TimeUnitNS, strconv.FormatInt(nanosTime, 10)
	}
}
|
||
|
|
|
||
|
|
// convertValue converts a single Field value from a Telegraf Metric into its
// Timestream representation: the value rendered as a string and the
// Timestream measure value type.
//
// All signed and unsigned integer kinds map to BIGINT, float32 and float64
// to DOUBLE, bool to BOOLEAN and string to VARCHAR. For any other type ok is
// false and both strings are empty.
func convertValue(v interface{}) (value string, valueType string, ok bool) {
	const (
		bigint  = "BIGINT"
		double  = "DOUBLE"
		boolean = "BOOLEAN"
		varchar = "VARCHAR"
	)

	switch x := v.(type) {
	case int:
		return strconv.FormatInt(int64(x), 10), bigint, true
	case int8:
		return strconv.FormatInt(int64(x), 10), bigint, true
	case int16:
		return strconv.FormatInt(int64(x), 10), bigint, true
	case int32:
		return strconv.FormatInt(int64(x), 10), bigint, true
	case int64:
		return strconv.FormatInt(x, 10), bigint, true
	case uint:
		return strconv.FormatUint(uint64(x), 10), bigint, true
	case uint8:
		return strconv.FormatUint(uint64(x), 10), bigint, true
	case uint16:
		return strconv.FormatUint(uint64(x), 10), bigint, true
	case uint32:
		return strconv.FormatUint(uint64(x), 10), bigint, true
	case uint64:
		return strconv.FormatUint(x, 10), bigint, true
	case float32:
		// Format with 32-bit precision so the shortest float32 form is used.
		return strconv.FormatFloat(float64(x), 'f', -1, 32), double, true
	case float64:
		return strconv.FormatFloat(x, 'f', -1, 64), double, true
	case bool:
		return strconv.FormatBool(x), boolean, true
	case string:
		return x, varchar, true
	}

	// Skip unsupported type.
	return "", "", false
}
|