feat(processors.converter): Add support for base64 encoded IEEE floats (#16214)
This commit is contained in:
parent
ef098557ba
commit
b58c6a4416
|
|
@ -1,12 +1,12 @@
|
|||
# Converter Processor Plugin
|
||||
|
||||
The converter processor is used to change the type of tag or field values. In
|
||||
The converter processor is used to change the type of tag or field values. In
|
||||
addition to changing field types it can convert between fields and tags.
|
||||
|
||||
Values that cannot be converted are dropped.
|
||||
|
||||
**Note:** When converting tags to fields, take care to ensure the series is
|
||||
still uniquely identifiable. Fields with the same series key (measurement +
|
||||
still uniquely identifiable. Fields with the same series key (measurement +
|
||||
tags) will overwrite one another.
|
||||
|
||||
**Note on large strings being converted to numeric types:** When converting a
|
||||
|
|
@ -67,6 +67,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
boolean = []
|
||||
float = []
|
||||
|
||||
## Optional field to use for converting base64 encoding of IEEE 754 Float32 values
|
||||
## i.e. data_json_content_state_openconfig-platform-psu:output-power":"RKeAAA=="
|
||||
## into a float32 value 1340
|
||||
# base64_ieee_float32 = []
|
||||
|
||||
## Optional field to use as metric timestamp
|
||||
# timestamp = []
|
||||
|
||||
|
|
|
|||
|
|
@ -3,9 +3,12 @@ package converter
|
|||
|
||||
import (
|
||||
_ "embed"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
|
|
@ -18,15 +21,16 @@ import (
|
|||
var sampleConfig string
|
||||
|
||||
type Conversion struct {
|
||||
Measurement []string `toml:"measurement"`
|
||||
Tag []string `toml:"tag"`
|
||||
String []string `toml:"string"`
|
||||
Integer []string `toml:"integer"`
|
||||
Unsigned []string `toml:"unsigned"`
|
||||
Boolean []string `toml:"boolean"`
|
||||
Float []string `toml:"float"`
|
||||
Timestamp []string `toml:"timestamp"`
|
||||
TimestampFormat string `toml:"timestamp_format"`
|
||||
Measurement []string `toml:"measurement"`
|
||||
Tag []string `toml:"tag"`
|
||||
String []string `toml:"string"`
|
||||
Integer []string `toml:"integer"`
|
||||
Unsigned []string `toml:"unsigned"`
|
||||
Boolean []string `toml:"boolean"`
|
||||
Float []string `toml:"float"`
|
||||
Timestamp []string `toml:"timestamp"`
|
||||
TimestampFormat string `toml:"timestamp_format"`
|
||||
Base64IEEEFloat32 []string `toml:"base64_ieee_float32"`
|
||||
}
|
||||
|
||||
type Converter struct {
|
||||
|
|
@ -39,14 +43,15 @@ type Converter struct {
|
|||
}
|
||||
|
||||
type ConversionFilter struct {
|
||||
Measurement filter.Filter
|
||||
Tag filter.Filter
|
||||
String filter.Filter
|
||||
Integer filter.Filter
|
||||
Unsigned filter.Filter
|
||||
Boolean filter.Filter
|
||||
Float filter.Filter
|
||||
Timestamp filter.Filter
|
||||
Measurement filter.Filter
|
||||
Tag filter.Filter
|
||||
String filter.Filter
|
||||
Integer filter.Filter
|
||||
Unsigned filter.Filter
|
||||
Boolean filter.Filter
|
||||
Float filter.Filter
|
||||
Timestamp filter.Filter
|
||||
Base64IEEEFloat32 filter.Filter
|
||||
}
|
||||
|
||||
func (*Converter) SampleConfig() string {
|
||||
|
|
@ -132,6 +137,11 @@ func compileFilter(conv *Conversion) (*ConversionFilter, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
cf.Base64IEEEFloat32, err = filter.Compile(conv.Base64IEEEFloat32)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cf, nil
|
||||
}
|
||||
|
||||
|
|
@ -249,6 +259,14 @@ func (p *Converter) convertFields(metric telegraf.Metric) {
|
|||
metric.SetTime(time)
|
||||
metric.RemoveField(key)
|
||||
}
|
||||
|
||||
case p.fieldConversions.Base64IEEEFloat32 != nil && p.fieldConversions.Base64IEEEFloat32.Match(key):
|
||||
if v, err := base64ToFloat32(value.(string)); err != nil {
|
||||
p.Log.Errorf("Converting to base64_ieee_float32 [%T] failed: %v", value, err)
|
||||
metric.RemoveField(key)
|
||||
} else {
|
||||
metric.AddField(key, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -345,6 +363,32 @@ func toFloat(v interface{}) (float64, error) {
|
|||
return internal.ToFloat64(v)
|
||||
}
|
||||
|
||||
func base64ToFloat32(encoded string) (float32, error) {
|
||||
// Decode the Base64 string to bytes
|
||||
decodedBytes, err := base64.StdEncoding.DecodeString(encoded)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Check if the byte length matches a float32 (4 bytes)
|
||||
if len(decodedBytes) != 4 {
|
||||
return 0, errors.New("decoded byte length is not 4 bytes")
|
||||
}
|
||||
|
||||
// Convert the bytes to a string representation as per IEEE 754 of the bits
|
||||
bitsStrRepresentation := fmt.Sprintf("%08b%08b%08b%08b", decodedBytes[0], decodedBytes[1], decodedBytes[2], decodedBytes[3])
|
||||
|
||||
// Convert the bits to a uint32
|
||||
bits, err := strconv.ParseUint(bitsStrRepresentation, 2, 32)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Convert the uint32 (bits) to a float32 based on IEEE 754 binary representation
|
||||
return math.Float32frombits(uint32(bits)), nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
processors.Add("converter", func() telegraf.Processor {
|
||||
return &Converter{}
|
||||
|
|
|
|||
|
|
@ -770,6 +770,34 @@ func TestMeasurement(t *testing.T) {
|
|||
),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "float32 from ieee754 float32 encoded as base64",
|
||||
converter: &Converter{
|
||||
Fields: &Conversion{
|
||||
Base64IEEEFloat32: []string{"a", "b"},
|
||||
},
|
||||
},
|
||||
input: testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": "QlAAAA==",
|
||||
"b": "QlgAAA==",
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
expected: []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"a": float32(52),
|
||||
"b": float32(54),
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -35,6 +35,11 @@
|
|||
boolean = []
|
||||
float = []
|
||||
|
||||
## Optional field to use for converting base64 encoding of IEEE 754 Float32 values
|
||||
## i.e. data_json_content_state_openconfig-platform-psu:output-power":"RKeAAA=="
|
||||
## into a float32 value 1340
|
||||
# base64_ieee_float32 = []
|
||||
|
||||
## Optional field to use as metric timestamp
|
||||
# timestamp = []
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue