fix(outputs.timestream): Clip uint64 values (#14213)
This commit is contained in:
parent
fa1ba97540
commit
7b7c7b6505
|
|
@ -123,6 +123,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
##
|
||||
```
|
||||
|
||||
### Unsigned Integers
|
||||
|
||||
Timestream does **DOES NOT** support unsigned int64 values. Values using uint64,
|
||||
which are less than the maximum signed int64 are returned as expected. Any
|
||||
larger value is caped at the maximum int64 value.
|
||||
|
||||
### Batching
|
||||
|
||||
Timestream WriteInputRequest.CommonAttributes are used to efficiently write data
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
_ "embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
|
@ -609,7 +610,11 @@ func convertValue(v interface{}) (value string, valueType types.MeasureValueType
|
|||
value = strconv.FormatUint(uint64(t), 10)
|
||||
case uint64:
|
||||
valueType = types.MeasureValueTypeBigint
|
||||
value = strconv.FormatUint(t, 10)
|
||||
if t <= uint64(math.MaxInt64) {
|
||||
value = strconv.FormatUint(t, 10)
|
||||
} else {
|
||||
value = strconv.FormatUint(math.MaxInt64, 10)
|
||||
}
|
||||
case float32:
|
||||
valueType = types.MeasureValueTypeDouble
|
||||
value = strconv.FormatFloat(float64(t), 'f', -1, 32)
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package timestream
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
|
@ -381,6 +382,24 @@ func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
|
|||
time1,
|
||||
)
|
||||
|
||||
input5 := testutil.MustMetric(
|
||||
metricName1,
|
||||
map[string]string{"tag5": "value5"},
|
||||
map[string]interface{}{
|
||||
"measureMaxUint64": uint64(math.MaxUint64),
|
||||
},
|
||||
time1,
|
||||
)
|
||||
|
||||
input6 := testutil.MustMetric(
|
||||
metricName1,
|
||||
map[string]string{"tag6": "value6"},
|
||||
map[string]interface{}{
|
||||
"measureSmallUint64": uint64(123456),
|
||||
},
|
||||
time1,
|
||||
)
|
||||
|
||||
expectedResultMultiTable := buildExpectedMultiRecords("config-multi-measure-name", metricName1)
|
||||
|
||||
plugin := Timestream{
|
||||
|
|
@ -396,7 +415,7 @@ func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
|
|||
require.NoError(t, err, "Invalid configuration")
|
||||
|
||||
// validate multi-record generation with MappingModeMultiTable
|
||||
result := plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4})
|
||||
result := plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4, input5, input6})
|
||||
require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")
|
||||
|
||||
require.EqualValues(t, result[0], expectedResultMultiTable)
|
||||
|
|
@ -421,7 +440,7 @@ func TestBuildMultiMeasuresInSingleAndMultiTableMode(t *testing.T) {
|
|||
expectedResultSingleTable := buildExpectedMultiRecords(metricName1, "singleTableName")
|
||||
|
||||
// validate multi-record generation with MappingModeSingleTable
|
||||
result = plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4})
|
||||
result = plugin.TransformMetrics([]telegraf.Metric{input1, input2, input3, input4, input5, input6})
|
||||
require.Len(t, result, 1, "Expected 1 WriteRecordsInput requests")
|
||||
|
||||
require.EqualValues(t, result[0], expectedResultSingleTable)
|
||||
|
|
@ -473,6 +492,28 @@ func buildExpectedMultiRecords(multiMeasureName string, tableName string) *times
|
|||
|
||||
recordsMultiTableMode = append(recordsMultiTableMode, recordBool...)
|
||||
|
||||
recordMaxUint64 := buildMultiRecords([]SimpleInput{
|
||||
{
|
||||
t: time1Epoch,
|
||||
tableName: metricName1,
|
||||
dimensions: map[string]string{"tag5": "value5"},
|
||||
measureValues: map[string]string{"measureMaxUint64": "9223372036854775807"},
|
||||
},
|
||||
}, multiMeasureName, types.MeasureValueTypeBigint)
|
||||
|
||||
recordsMultiTableMode = append(recordsMultiTableMode, recordMaxUint64...)
|
||||
|
||||
recordUint64 := buildMultiRecords([]SimpleInput{
|
||||
{
|
||||
t: time1Epoch,
|
||||
tableName: metricName1,
|
||||
dimensions: map[string]string{"tag6": "value6"},
|
||||
measureValues: map[string]string{"measureSmallUint64": "123456"},
|
||||
},
|
||||
}, multiMeasureName, types.MeasureValueTypeBigint)
|
||||
|
||||
recordsMultiTableMode = append(recordsMultiTableMode, recordUint64...)
|
||||
|
||||
expectedResultMultiTable := ×treamwrite.WriteRecordsInput{
|
||||
DatabaseName: aws.String(tsDbName),
|
||||
TableName: aws.String(tableName),
|
||||
|
|
|
|||
Loading…
Reference in New Issue