2022-05-25 22:48:59 +08:00
//go:generate ../../../tools/readme_config_includer/generator
2020-10-16 01:51:17 +08:00
package timestream
import (
2021-10-22 05:32:10 +08:00
"context"
2022-05-25 22:48:59 +08:00
_ "embed"
2021-10-22 05:32:10 +08:00
"errors"
2020-10-16 01:51:17 +08:00
"fmt"
"reflect"
"strconv"
2022-01-07 06:28:23 +08:00
"sync"
2020-10-16 01:51:17 +08:00
"time"
2021-10-22 05:32:10 +08:00
"github.com/aws/aws-sdk-go-v2/aws"
2022-07-29 22:55:11 +08:00
"github.com/aws/aws-sdk-go-v2/config"
2021-10-22 05:32:10 +08:00
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
"github.com/aws/smithy-go"
2021-11-25 03:33:45 +08:00
"github.com/influxdata/telegraf"
2020-10-16 01:51:17 +08:00
internalaws "github.com/influxdata/telegraf/config/aws"
2021-11-25 03:33:45 +08:00
"github.com/influxdata/telegraf/plugins/outputs"
2020-10-16 01:51:17 +08:00
)
2022-05-27 21:13:47 +08:00
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
//go:embed sample.conf
var sampleConfig string
2020-10-16 01:51:17 +08:00
type (
Timestream struct {
MappingMode string ` toml:"mapping_mode" `
DescribeDatabaseOnStart bool ` toml:"describe_database_on_start" `
DatabaseName string ` toml:"database_name" `
SingleTableName string ` toml:"single_table_name" `
SingleTableDimensionNameForTelegrafMeasurementName string ` toml:"single_table_dimension_name_for_telegraf_measurement_name" `
CreateTableIfNotExists bool ` toml:"create_table_if_not_exists" `
CreateTableMagneticStoreRetentionPeriodInDays int64 ` toml:"create_table_magnetic_store_retention_period_in_days" `
CreateTableMemoryStoreRetentionPeriodInHours int64 ` toml:"create_table_memory_store_retention_period_in_hours" `
CreateTableTags map [ string ] string ` toml:"create_table_tags" `
2022-01-07 06:28:23 +08:00
MaxWriteGoRoutinesCount int ` toml:"max_write_go_routines" `
2020-10-16 01:51:17 +08:00
Log telegraf . Logger
svc WriteClient
2021-08-04 05:29:26 +08:00
internalaws . CredentialConfig
2020-10-16 01:51:17 +08:00
}
WriteClient interface {
2021-10-22 05:32:10 +08:00
CreateTable ( context . Context , * timestreamwrite . CreateTableInput , ... func ( * timestreamwrite . Options ) ) ( * timestreamwrite . CreateTableOutput , error )
WriteRecords ( context . Context , * timestreamwrite . WriteRecordsInput , ... func ( * timestreamwrite . Options ) ) ( * timestreamwrite . WriteRecordsOutput , error )
DescribeDatabase ( context . Context , * timestreamwrite . DescribeDatabaseInput , ... func ( * timestreamwrite . Options ) ) ( * timestreamwrite . DescribeDatabaseOutput , error )
2020-10-16 01:51:17 +08:00
}
)
// Mapping modes specify how Telegraf model should be represented in Timestream model.
// See sample config for more details.
const (
	// MappingModeSingleTable writes all metrics to one configured table.
	MappingModeSingleTable = "single-table"
	// MappingModeMultiTable writes each metric to a table named after it.
	MappingModeMultiTable = "multi-table"
)

// MaxRecordsPerCall reflects Timestream limit of WriteRecords API call
const MaxRecordsPerCall = 100

// MaxWriteRoutinesDefault is the default value for maximum number of parallel
// go routines to ingest/write data when max_write_go_routines is not
// specified in the config.
const MaxWriteRoutinesDefault = 1
2020-10-16 01:51:17 +08:00
// WriteFactory function provides a way to mock the client instantiation for testing purposes.
2021-10-08 04:47:56 +08:00
var WriteFactory = func ( credentialConfig * internalaws . CredentialConfig ) ( WriteClient , error ) {
2022-07-29 22:55:11 +08:00
awsCreds , awsErr := credentialConfig . Credentials ( )
if awsErr != nil {
panic ( "Unable to load credentials config " + awsErr . Error ( ) )
}
cfg , cfgErr := config . LoadDefaultConfig ( context . TODO ( ) )
if cfgErr != nil {
panic ( "Unable to load SDK config for Timestream " + cfgErr . Error ( ) )
2021-10-08 04:47:56 +08:00
}
2022-07-29 22:55:11 +08:00
if credentialConfig . EndpointURL != "" && credentialConfig . Region != "" {
customResolver := aws . EndpointResolverWithOptionsFunc ( func ( service , region string , options ... interface { } ) ( aws . Endpoint , error ) {
return aws . Endpoint {
PartitionID : "aws" ,
URL : credentialConfig . EndpointURL ,
SigningRegion : credentialConfig . Region ,
} , nil
} )
cfg , err := config . LoadDefaultConfig ( context . TODO ( ) , config . WithEndpointResolverWithOptions ( customResolver ) )
if err != nil {
panic ( "unable to load SDK config for Timestream " + err . Error ( ) )
}
cfg . Credentials = awsCreds . Credentials
return timestreamwrite . NewFromConfig ( cfg , func ( o * timestreamwrite . Options ) {
o . Region = credentialConfig . Region
o . EndpointDiscovery . EnableEndpointDiscovery = aws . EndpointDiscoveryDisabled
} ) , nil
}
cfg . Credentials = awsCreds . Credentials
return timestreamwrite . NewFromConfig ( cfg , func ( o * timestreamwrite . Options ) {
o . Region = credentialConfig . Region
} ) , nil
2020-10-16 01:51:17 +08:00
}
2022-05-25 22:48:59 +08:00
// SampleConfig returns the embedded sample configuration for the plugin.
func (*Timestream) SampleConfig() string {
	return sampleConfig
}
2020-10-16 01:51:17 +08:00
func ( t * Timestream ) Connect ( ) error {
if t . DatabaseName == "" {
return fmt . Errorf ( "DatabaseName key is required" )
}
if t . MappingMode == "" {
return fmt . Errorf ( "MappingMode key is required" )
}
if t . MappingMode != MappingModeSingleTable && t . MappingMode != MappingModeMultiTable {
return fmt . Errorf ( "correct MappingMode key values are: '%s', '%s'" ,
MappingModeSingleTable , MappingModeMultiTable )
}
if t . MappingMode == MappingModeSingleTable {
if t . SingleTableName == "" {
return fmt . Errorf ( "in '%s' mapping mode, SingleTableName key is required" , MappingModeSingleTable )
}
if t . SingleTableDimensionNameForTelegrafMeasurementName == "" {
return fmt . Errorf ( "in '%s' mapping mode, SingleTableDimensionNameForTelegrafMeasurementName key is required" ,
MappingModeSingleTable )
}
}
if t . MappingMode == MappingModeMultiTable {
if t . SingleTableName != "" {
return fmt . Errorf ( "in '%s' mapping mode, do not specify SingleTableName key" , MappingModeMultiTable )
}
if t . SingleTableDimensionNameForTelegrafMeasurementName != "" {
return fmt . Errorf ( "in '%s' mapping mode, do not specify SingleTableDimensionNameForTelegrafMeasurementName key" , MappingModeMultiTable )
}
}
if t . CreateTableIfNotExists {
if t . CreateTableMagneticStoreRetentionPeriodInDays < 1 {
return fmt . Errorf ( "if Telegraf should create tables, CreateTableMagneticStoreRetentionPeriodInDays key should have a value greater than 0" )
}
if t . CreateTableMemoryStoreRetentionPeriodInHours < 1 {
return fmt . Errorf ( "if Telegraf should create tables, CreateTableMemoryStoreRetentionPeriodInHours key should have a value greater than 0" )
}
}
2022-01-07 06:28:23 +08:00
if t . MaxWriteGoRoutinesCount <= 0 {
t . MaxWriteGoRoutinesCount = MaxWriteRoutinesDefault
}
2020-10-16 01:51:17 +08:00
t . Log . Infof ( "Constructing Timestream client for '%s' mode" , t . MappingMode )
2021-10-08 04:47:56 +08:00
svc , err := WriteFactory ( & t . CredentialConfig )
if err != nil {
return err
}
2020-10-16 01:51:17 +08:00
if t . DescribeDatabaseOnStart {
t . Log . Infof ( "Describing database '%s' in region '%s'" , t . DatabaseName , t . Region )
describeDatabaseInput := & timestreamwrite . DescribeDatabaseInput {
DatabaseName : aws . String ( t . DatabaseName ) ,
}
2021-10-22 05:32:10 +08:00
describeDatabaseOutput , err := svc . DescribeDatabase ( context . Background ( ) , describeDatabaseInput )
2020-10-16 01:51:17 +08:00
if err != nil {
t . Log . Errorf ( "Couldn't describe database '%s'. Check error, fix permissions, connectivity, create database." , t . DatabaseName )
return err
}
t . Log . Infof ( "Describe database '%s' returned: '%s'." , t . DatabaseName , describeDatabaseOutput )
}
t . svc = svc
return nil
}
// Close is a no-op: the Timestream client holds no resources that need
// explicit release.
func (t *Timestream) Close() error {
	return nil
}
// init registers the plugin with Telegraf's output registry under the name
// "timestream".
func init() {
	outputs.Add("timestream", func() telegraf.Output {
		return &Timestream{}
	})
}
func ( t * Timestream ) Write ( metrics [ ] telegraf . Metric ) error {
writeRecordsInputs := t . TransformMetrics ( metrics )
2022-01-07 06:28:23 +08:00
maxWriteJobs := t . MaxWriteGoRoutinesCount
numberOfWriteRecordsInputs := len ( writeRecordsInputs )
if numberOfWriteRecordsInputs < maxWriteJobs {
maxWriteJobs = numberOfWriteRecordsInputs
}
var wg sync . WaitGroup
errs := make ( chan error , numberOfWriteRecordsInputs )
writeJobs := make ( chan * timestreamwrite . WriteRecordsInput , maxWriteJobs )
start := time . Now ( )
for i := 0 ; i < maxWriteJobs ; i ++ {
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
for writeJob := range writeJobs {
if err := t . writeToTimestream ( writeJob , true ) ; err != nil {
errs <- err
}
}
} ( )
}
for i := range writeRecordsInputs {
writeJobs <- writeRecordsInputs [ i ]
}
// Close channel once all jobs are added
close ( writeJobs )
wg . Wait ( )
elapsed := time . Since ( start )
close ( errs )
t . Log . Infof ( "##WriteToTimestream - Metrics size: %d request size: %d time(ms): %d" ,
len ( metrics ) , len ( writeRecordsInputs ) , elapsed . Milliseconds ( ) )
// On partial failures, Telegraf will reject the entire batch of metrics and
// retry. writeToTimestream will return retryable exceptions only.
for err := range errs {
if err != nil {
2020-10-16 01:51:17 +08:00
return err
}
}
2022-01-07 06:28:23 +08:00
2020-10-16 01:51:17 +08:00
return nil
}
func ( t * Timestream ) writeToTimestream ( writeRecordsInput * timestreamwrite . WriteRecordsInput , resourceNotFoundRetry bool ) error {
t . Log . Debugf ( "Writing to Timestream: '%v' with ResourceNotFoundRetry: '%t'" , writeRecordsInput , resourceNotFoundRetry )
2021-10-22 05:32:10 +08:00
_ , err := t . svc . WriteRecords ( context . Background ( ) , writeRecordsInput )
2020-10-16 01:51:17 +08:00
if err != nil {
// Telegraf will retry ingesting the metrics if an error is returned from the plugin.
// Therefore, return error only for retryable exceptions: ThrottlingException and 5xx exceptions.
2021-10-22 05:32:10 +08:00
var notFound * types . ResourceNotFoundException
if errors . As ( err , & notFound ) {
if resourceNotFoundRetry {
t . Log . Warnf ( "Failed to write to Timestream database '%s' table '%s'. Error: '%s'" ,
t . DatabaseName , * writeRecordsInput . TableName , notFound )
return t . createTableAndRetry ( writeRecordsInput )
2020-10-16 01:51:17 +08:00
}
2021-10-22 05:32:10 +08:00
t . logWriteToTimestreamError ( notFound , writeRecordsInput . TableName )
}
var rejected * types . RejectedRecordsException
if errors . As ( err , & rejected ) {
t . logWriteToTimestreamError ( err , writeRecordsInput . TableName )
return nil
}
var throttling * types . ThrottlingException
if errors . As ( err , & throttling ) {
return fmt . Errorf ( "unable to write to Timestream database '%s' table '%s'. Error: %s" ,
t . DatabaseName , * writeRecordsInput . TableName , throttling )
}
var internal * types . InternalServerException
if errors . As ( err , & internal ) {
return fmt . Errorf ( "unable to write to Timestream database '%s' table '%s'. Error: %s" ,
t . DatabaseName , * writeRecordsInput . TableName , internal )
}
var operation * smithy . OperationError
if ! errors . As ( err , & operation ) {
2020-10-16 01:51:17 +08:00
// Retry other, non-aws errors.
return fmt . Errorf ( "unable to write to Timestream database '%s' table '%s'. Error: %s" ,
t . DatabaseName , * writeRecordsInput . TableName , err )
}
2021-10-22 05:32:10 +08:00
t . logWriteToTimestreamError ( err , writeRecordsInput . TableName )
2020-10-16 01:51:17 +08:00
}
return nil
}
// logWriteToTimestreamError logs a non-retryable write failure; the affected
// metrics are skipped rather than re-queued.
func (t *Timestream) logWriteToTimestreamError(err error, tableName *string) {
	t.Log.Errorf("Failed to write to Timestream database '%s' table '%s'. Skipping metric! Error: '%s'",
		t.DatabaseName, *tableName, err)
}
func ( t * Timestream ) createTableAndRetry ( writeRecordsInput * timestreamwrite . WriteRecordsInput ) error {
if t . CreateTableIfNotExists {
t . Log . Infof ( "Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'." , * writeRecordsInput . TableName , t . DatabaseName )
2021-11-25 03:33:45 +08:00
err := t . createTable ( writeRecordsInput . TableName )
if err == nil {
2020-10-16 01:51:17 +08:00
t . Log . Infof ( "Table '%s' in database '%s' created. Retrying writing." , * writeRecordsInput . TableName , t . DatabaseName )
return t . writeToTimestream ( writeRecordsInput , false )
}
2021-11-25 03:33:45 +08:00
t . Log . Errorf ( "Failed to create table '%s' in database '%s': %s. Skipping metric!" , * writeRecordsInput . TableName , t . DatabaseName , err )
2020-10-16 01:51:17 +08:00
} else {
t . Log . Errorf ( "Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!" , * writeRecordsInput . TableName , t . DatabaseName )
}
return nil
}
// createTable creates a Timestream table according to the configuration.
func ( t * Timestream ) createTable ( tableName * string ) error {
createTableInput := & timestreamwrite . CreateTableInput {
DatabaseName : aws . String ( t . DatabaseName ) ,
TableName : aws . String ( * tableName ) ,
2021-10-22 05:32:10 +08:00
RetentionProperties : & types . RetentionProperties {
MagneticStoreRetentionPeriodInDays : t . CreateTableMagneticStoreRetentionPeriodInDays ,
MemoryStoreRetentionPeriodInHours : t . CreateTableMemoryStoreRetentionPeriodInHours ,
2020-10-16 01:51:17 +08:00
} ,
}
2021-10-22 05:32:10 +08:00
var tags [ ] types . Tag
2020-10-16 01:51:17 +08:00
for key , val := range t . CreateTableTags {
2021-10-22 05:32:10 +08:00
tags = append ( tags , types . Tag {
2020-10-16 01:51:17 +08:00
Key : aws . String ( key ) ,
Value : aws . String ( val ) ,
} )
}
2021-10-22 05:32:10 +08:00
createTableInput . Tags = tags
2020-10-16 01:51:17 +08:00
2021-10-22 05:32:10 +08:00
_ , err := t . svc . CreateTable ( context . Background ( ) , createTableInput )
2020-10-16 01:51:17 +08:00
if err != nil {
2021-10-22 05:32:10 +08:00
if _ , ok := err . ( * types . ConflictException ) ; ok {
2020-10-16 01:51:17 +08:00
// if the table was created in the meantime, it's ok.
2021-10-22 05:32:10 +08:00
return nil
2020-10-16 01:51:17 +08:00
}
return err
}
return nil
}
// TransformMetrics transforms a collection of Telegraf Metrics into write requests to Timestream.
// Telegraf Metrics are grouped by Name, Tag Keys and Time to use Timestream CommonAttributes.
// Returns collection of write requests to be performed to Timestream.
func ( t * Timestream ) TransformMetrics ( metrics [ ] telegraf . Metric ) [ ] * timestreamwrite . WriteRecordsInput {
2022-01-07 06:28:23 +08:00
writeRequests := make ( map [ string ] * timestreamwrite . WriteRecordsInput , len ( metrics ) )
2020-10-16 01:51:17 +08:00
for _ , m := range metrics {
// build MeasureName, MeasureValue, MeasureValueType
records := t . buildWriteRecords ( m )
if len ( records ) == 0 {
continue
}
2022-01-07 06:28:23 +08:00
var tableName string
if t . MappingMode == MappingModeSingleTable {
tableName = t . SingleTableName
}
if t . MappingMode == MappingModeMultiTable {
tableName = m . Name ( )
}
if curr , ok := writeRequests [ tableName ] ; ! ok {
2020-10-16 01:51:17 +08:00
newWriteRecord := & timestreamwrite . WriteRecordsInput {
2022-01-07 06:28:23 +08:00
DatabaseName : aws . String ( t . DatabaseName ) ,
TableName : aws . String ( tableName ) ,
Records : records ,
CommonAttributes : & types . Record { } ,
2020-10-16 01:51:17 +08:00
}
2022-01-07 06:28:23 +08:00
writeRequests [ tableName ] = newWriteRecord
2020-10-16 01:51:17 +08:00
} else {
curr . Records = append ( curr . Records , records ... )
}
}
// Create result as array of WriteRecordsInput. Split requests over records count limit to smaller requests.
var result [ ] * timestreamwrite . WriteRecordsInput
for _ , writeRequest := range writeRequests {
if len ( writeRequest . Records ) > MaxRecordsPerCall {
for _ , recordsPartition := range partitionRecords ( MaxRecordsPerCall , writeRequest . Records ) {
newWriteRecord := & timestreamwrite . WriteRecordsInput {
DatabaseName : writeRequest . DatabaseName ,
TableName : writeRequest . TableName ,
Records : recordsPartition ,
CommonAttributes : writeRequest . CommonAttributes ,
}
result = append ( result , newWriteRecord )
}
} else {
result = append ( result , writeRequest )
}
}
return result
}
2021-10-22 05:32:10 +08:00
func ( t * Timestream ) buildDimensions ( point telegraf . Metric ) [ ] types . Dimension {
var dimensions [ ] types . Dimension
2020-10-16 01:51:17 +08:00
for tagName , tagValue := range point . Tags ( ) {
2021-10-22 05:32:10 +08:00
dimension := types . Dimension {
2020-10-16 01:51:17 +08:00
Name : aws . String ( tagName ) ,
Value : aws . String ( tagValue ) ,
}
dimensions = append ( dimensions , dimension )
}
if t . MappingMode == MappingModeSingleTable {
2021-10-22 05:32:10 +08:00
dimension := types . Dimension {
2020-10-16 01:51:17 +08:00
Name : aws . String ( t . SingleTableDimensionNameForTelegrafMeasurementName ) ,
Value : aws . String ( point . Name ( ) ) ,
}
dimensions = append ( dimensions , dimension )
}
return dimensions
}
// buildWriteRecords builds the Timestream write records from Metric Fields only.
// Tags and time are not included - common attributes are built separately.
// Records with unsupported Metric Field type are skipped.
// It returns an array of Timestream write records.
2021-10-22 05:32:10 +08:00
func ( t * Timestream ) buildWriteRecords ( point telegraf . Metric ) [ ] types . Record {
var records [ ] types . Record
2022-01-07 06:28:23 +08:00
dimensions := t . buildDimensions ( point )
2020-10-16 01:51:17 +08:00
for fieldName , fieldValue := range point . Fields ( ) {
stringFieldValue , stringFieldValueType , ok := convertValue ( fieldValue )
if ! ok {
t . Log . Errorf ( "Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. " +
"Supported values are: [int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool]" ,
fieldName , reflect . TypeOf ( fieldValue ) )
continue
}
2022-01-07 06:28:23 +08:00
timeUnit , timeValue := getTimestreamTime ( point . Time ( ) )
2021-10-22 05:32:10 +08:00
record := types . Record {
2020-10-16 01:51:17 +08:00
MeasureName : aws . String ( fieldName ) ,
2021-10-22 05:32:10 +08:00
MeasureValueType : stringFieldValueType ,
2020-10-16 01:51:17 +08:00
MeasureValue : aws . String ( stringFieldValue ) ,
2022-01-07 06:28:23 +08:00
Dimensions : dimensions ,
Time : aws . String ( timeValue ) ,
TimeUnit : timeUnit ,
2020-10-16 01:51:17 +08:00
}
records = append ( records , record )
}
return records
}
// partitionRecords splits the Timestream records into smaller slices of a max size
// so that are under the limit for the Timestream API call.
// It returns the array of array of records.
2021-10-22 05:32:10 +08:00
func partitionRecords ( size int , records [ ] types . Record ) [ ] [ ] types . Record {
2020-10-16 01:51:17 +08:00
numberOfPartitions := len ( records ) / size
if len ( records ) % size != 0 {
numberOfPartitions ++
}
2021-10-22 05:32:10 +08:00
partitions := make ( [ ] [ ] types . Record , numberOfPartitions )
2020-10-16 01:51:17 +08:00
for i := 0 ; i < numberOfPartitions ; i ++ {
start := size * i
end := size * ( i + 1 )
if end > len ( records ) {
end = len ( records )
}
partitions [ i ] = records [ start : end ]
}
return partitions
}
// getTimestreamTime produces Timestream TimeUnit and TimeValue with minimum possible granularity
// while maintaining the same information.
2021-10-22 05:32:10 +08:00
func getTimestreamTime ( t time . Time ) ( timeUnit types . TimeUnit , timeValue string ) {
nanosTime := t . UnixNano ( )
2020-10-16 01:51:17 +08:00
if nanosTime % 1e9 == 0 {
2021-10-22 05:32:10 +08:00
timeUnit = types . TimeUnitSeconds
2020-10-16 01:51:17 +08:00
timeValue = strconv . FormatInt ( nanosTime / 1e9 , 10 )
} else if nanosTime % 1e6 == 0 {
2021-10-22 05:32:10 +08:00
timeUnit = types . TimeUnitMilliseconds
2020-10-16 01:51:17 +08:00
timeValue = strconv . FormatInt ( nanosTime / 1e6 , 10 )
} else if nanosTime % 1e3 == 0 {
2021-10-22 05:32:10 +08:00
timeUnit = types . TimeUnitMicroseconds
2020-10-16 01:51:17 +08:00
timeValue = strconv . FormatInt ( nanosTime / 1e3 , 10 )
} else {
2021-10-22 05:32:10 +08:00
timeUnit = types . TimeUnitNanoseconds
2020-10-16 01:51:17 +08:00
timeValue = strconv . FormatInt ( nanosTime , 10 )
}
2021-11-25 03:33:45 +08:00
return timeUnit , timeValue
2020-10-16 01:51:17 +08:00
}
// convertValue converts single Field value from Telegraf Metric and produces
// value, valueType Timestream representation.
2021-10-22 05:32:10 +08:00
func convertValue ( v interface { } ) ( value string , valueType types . MeasureValueType , ok bool ) {
2020-10-16 01:51:17 +08:00
ok = true
switch t := v . ( type ) {
case int :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatInt ( int64 ( t ) , 10 )
case int8 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatInt ( int64 ( t ) , 10 )
case int16 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatInt ( int64 ( t ) , 10 )
case int32 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatInt ( int64 ( t ) , 10 )
case int64 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatInt ( t , 10 )
case uint :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatUint ( uint64 ( t ) , 10 )
case uint8 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatUint ( uint64 ( t ) , 10 )
case uint16 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatUint ( uint64 ( t ) , 10 )
case uint32 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatUint ( uint64 ( t ) , 10 )
case uint64 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBigint
2020-10-16 01:51:17 +08:00
value = strconv . FormatUint ( t , 10 )
case float32 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeDouble
2020-10-16 01:51:17 +08:00
value = strconv . FormatFloat ( float64 ( t ) , 'f' , - 1 , 32 )
case float64 :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeDouble
2020-10-16 01:51:17 +08:00
value = strconv . FormatFloat ( t , 'f' , - 1 , 64 )
case bool :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeBoolean
2020-10-16 01:51:17 +08:00
if t {
value = "true"
} else {
value = "false"
}
case string :
2021-10-22 05:32:10 +08:00
valueType = types . MeasureValueTypeVarchar
2020-10-16 01:51:17 +08:00
value = t
default :
// Skip unsupported type.
ok = false
2021-11-25 03:33:45 +08:00
return value , valueType , ok
2020-10-16 01:51:17 +08:00
}
2021-11-25 03:33:45 +08:00
return value , valueType , ok
2020-10-16 01:51:17 +08:00
}