feat(parsers.binary): Allow base64-encoded input data (#14961)
commit 03700b5983 (parent f674099fad)
@@ -23,8 +23,9 @@ user-specified configurations.
 ## where "host" means the same endianness as the machine running Telegraf.
 # endianness = "host"

-## Interpret input as string containing hex-encoded data.
-# hex_encoding = false
+## Interpret input using the specified encoding
+## Available values are "none" (raw bytes), "hex" and "base64"
+# binary_encoding = "none"

 ## Multiple parsing sections are allowed
 [[inputs.file.binary]]
@@ -112,11 +113,17 @@ machine share the same endianness.
 Alternatively, you can explicitly specify big-endian format (`"be"`) or
 little-endian format (`"le"`).

-#### `hex_encoding` (optional)
+#### `binary_encoding` (optional)

-If `true`, the input data is interpreted as a string containing hex-encoded
-data like `C0 C7 21 A9`. The value is _case insensitive_ and can handle spaces,
-however prefixes like `0x` or `x` are _not_ allowed.
+If this option is not specified or set to `none`, the input data contains the
+binary data as raw bytes. This is the default.
+
+If set to `hex`, the input data is interpreted as a string containing
+hex-encoded data like `C0 C7 21 A9`. The value is _case insensitive_ and can
+handle spaces and prefixes like `0x` or `x`.
+
+If set to `base64` the input data is interpreted as a string containing
+padded base64 data `RDLAAA==`.

 ### Non-byte aligned value extraction
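For reference, all three encodings describe the same raw payload. The sketch below is not part of the commit; it decodes one of the new testcase values in each of the three forms and always arrives at the big-endian float32 `715`, matching the `expected.out` entries added further down:

package main

import (
    "encoding/base64"
    "encoding/binary"
    "encoding/hex"
    "fmt"
    "math"
)

func main() {
    // binary_encoding = "none": the input already is the raw payload.
    raw := []byte{0x44, 0x32, 0xC0, 0x00}

    // binary_encoding = "hex": the same payload as a hex string.
    fromHex, _ := hex.DecodeString("4432c000")

    // binary_encoding = "base64": the same payload as padded base64.
    fromB64, _ := base64.StdEncoding.DecodeString("RDLAAA==")

    for _, buf := range [][]byte{raw, fromHex, fromB64} {
        fmt.Println(math.Float32frombits(binary.BigEndian.Uint32(buf))) // 715
    }
}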
@@ -1,6 +1,7 @@
 package binary

 import (
+    "encoding/base64"
     "encoding/binary"
     "encoding/hex"
     "errors"
@@ -18,7 +19,8 @@ type Parser struct {
     Endianess   string          `toml:"endianess" deprecated:"1.27.4;use 'endianness' instead"`
     Endianness  string          `toml:"endianness"`
     Configs     []Config        `toml:"binary"`
-    HexEncoding bool            `toml:"hex_encoding"`
+    HexEncoding bool            `toml:"hex_encoding" deprecated:"1.30.0;use 'binary_encoding' instead"`
+    Encoding    string          `toml:"binary_encoding"`
     Log         telegraf.Logger `toml:"-"`

     metricName string
@@ -27,9 +29,16 @@ type Parser struct {
 }

 func (p *Parser) Init() error {
+    // Keep backward compatibility
     if p.Endianess != "" && p.Endianness == "" {
         p.Endianness = p.Endianess
     }
+    if p.HexEncoding {
+        if p.Encoding != "" && p.Encoding != "hex" {
+            return errors.New("conflicting settings between 'hex_encoding' and 'binary_encoding'")
+        }
+        p.Encoding = "hex"
+    }

     switch p.Endianness {
     case "le":
@@ -42,6 +51,12 @@ func (p *Parser) Init() error {
         return fmt.Errorf("unknown endianness %q", p.Endianness)
     }

+    switch p.Encoding {
+    case "", "none", "hex", "base64":
+    default:
+        return fmt.Errorf("unknown encoding %q", p.Encoding)
+    }
+
     // Pre-process the configurations
     if len(p.Configs) == 0 {
         return errors.New("no configuration given")
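The validation above keeps the deprecated `hex_encoding` flag working while rejecting contradictory settings. A minimal sketch of the resulting behaviour, not part of this commit and assuming the parser package is imported from its usual place in the Telegraf tree:

package main

import (
    "fmt"

    binparser "github.com/influxdata/telegraf/plugins/parsers/binary"
)

func main() {
    // Deprecated flag and new option disagree: Init() refuses the configuration.
    p := &binparser.Parser{
        Endianness:  "be",
        HexEncoding: true,     // hex_encoding = true
        Encoding:    "base64", // binary_encoding = "base64"
    }
    fmt.Println(p.Init()) // conflicting settings between 'hex_encoding' and 'binary_encoding'

    // Backward compatibility: hex_encoding=true alone is mapped to binary_encoding="hex".
    // Init() still errors afterwards because no [[...binary]] sections are configured.
    q := &binparser.Parser{Endianness: "be", HexEncoding: true}
    fmt.Println(q.Init(), q.Encoding) // no configuration given hex
}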
@@ -61,14 +76,25 @@ func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) {

     // If the data is encoded in HEX, we need to decode it first
     buf := data
-    if p.HexEncoding {
-        s := strings.ReplaceAll(string(data), " ", "")
+    switch p.Encoding {
+    case "hex":
+        s := strings.TrimPrefix(string(data), "0x")
+        s = strings.TrimPrefix(s, "x")
+        s = strings.TrimSpace(s)
+        s = strings.ReplaceAll(s, " ", "")
         s = strings.ReplaceAll(s, "\t", "")
         var err error
         buf, err = hex.DecodeString(s)
         if err != nil {
             return nil, fmt.Errorf("decoding hex failed: %w", err)
         }
+    case "base64":
+        decoder := base64.StdEncoding.WithPadding(base64.StdPadding)
+        var err error
+        buf, err = decoder.DecodeString(strings.TrimSpace(string(data)))
+        if err != nil {
+            return nil, fmt.Errorf("decoding base64 failed: %w", err)
+        }
     }

     matches := 0
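The Parse() changes above only normalize and decode the input before handing it to the existing extraction logic. A condensed, standalone restatement of that switch (a sketch with a hypothetical `decode` helper, not code from the commit) shows what the two string encodings accept:

package main

import (
    "encoding/base64"
    "encoding/hex"
    "fmt"
    "strings"
)

// decode mirrors the new encoding switch in Parse() as a standalone helper
// (illustration only).
func decode(encoding string, data []byte) ([]byte, error) {
    switch encoding {
    case "hex":
        s := strings.TrimPrefix(string(data), "0x")
        s = strings.TrimPrefix(s, "x")
        s = strings.TrimSpace(s)
        s = strings.ReplaceAll(s, " ", "")
        s = strings.ReplaceAll(s, "\t", "")
        return hex.DecodeString(s)
    case "base64":
        decoder := base64.StdEncoding.WithPadding(base64.StdPadding)
        return decoder.DecodeString(strings.TrimSpace(string(data)))
    default: // "" or "none": raw bytes pass through untouched
        return data, nil
    }
}

func main() {
    hexBuf, _ := decode("hex", []byte("0x44 32 C0 00"))
    b64Buf, _ := decode("base64", []byte("RDLAAA==\n"))
    fmt.Printf("% X\n% X\n", hexBuf, b64Buf) // both print 44 32 C0 00
}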
@@ -11,16 +11,15 @@ import (
     "time"

     "github.com/google/go-cmp/cmp"
+    "github.com/google/go-cmp/cmp/cmpopts"
     "github.com/stretchr/testify/require"

     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/config"
     "github.com/influxdata/telegraf/internal"
     "github.com/influxdata/telegraf/metric"
-    "github.com/influxdata/telegraf/plugins/inputs"
-    "github.com/influxdata/telegraf/plugins/inputs/file"
-    "github.com/influxdata/telegraf/plugins/parsers/influx"
     "github.com/influxdata/telegraf/testutil"
+    test "github.com/influxdata/telegraf/testutil/plugin_input"
 )

 var dummyEntry = Entry{
@@ -1410,60 +1409,45 @@ func TestCases(t *testing.T) {
     require.NoError(t, err)
     require.NotEmpty(t, folders)

-    // Register the plugin
-    inputs.Add("file", func() telegraf.Input {
-        return &file.File{}
-    })
-
-    // Prepare the influx parser for expectations
-    parser := &influx.Parser{}
-    require.NoError(t, parser.Init())
-
     for _, f := range folders {
         testcasePath := filepath.Join("testcases", f.Name())
         configFilename := filepath.Join(testcasePath, "telegraf.conf")
-        expectedFilename := filepath.Join(testcasePath, "expected.out")
-        expectedErrorFilename := filepath.Join(testcasePath, "expected.err")

         t.Run(f.Name(), func(t *testing.T) {
-            // Read the expected output if any
-            var expected []telegraf.Metric
-            if _, err := os.Stat(expectedFilename); err == nil {
-                var err error
-                expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
-                require.NoError(t, err)
-            }
-
-            // Read the expected errors if any
-            var expectedErrors []string
-            if _, err := os.Stat(expectedErrorFilename); err == nil {
-                var err error
-                expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
-                require.NoError(t, err)
-                require.NotEmpty(t, expectedErrors)
-            }
-
             // Configure the plugin
             cfg := config.NewConfig()
             require.NoError(t, cfg.LoadConfig(configFilename))
             require.NoError(t, err)
+            require.Len(t, cfg.Inputs, 1)

-            // Gather the metrics from the input file configure
+            // Tune the test-plugin
+            plugin := cfg.Inputs[0].Input.(*test.Plugin)
+            plugin.Path = testcasePath
+            require.NoError(t, plugin.Init())
+
+            // Gather the metrics and check for potential errors
             var acc testutil.Accumulator
-            var actualErrors []string
-            for _, input := range cfg.Inputs {
-                require.NoError(t, input.Init())
-                if err := input.Gather(&acc); err != nil {
-                    actualErrors = append(actualErrors, err.Error())
-                }
+            err := plugin.Gather(&acc)
+            switch len(plugin.ExpectedErrors) {
+            case 0:
+                require.NoError(t, err)
+            case 1:
+                require.ErrorContains(t, err, plugin.ExpectedErrors[0])
+            default:
+                require.Contains(t, plugin.ExpectedErrors, err.Error())
             }

-            // Check for potential errors
-            require.ElementsMatch(t, actualErrors, expectedErrors)
+            // Determine checking options
+            options := []cmp.Option{
+                cmpopts.EquateApprox(0, 1e-6),
+            }
+            if plugin.ShouldIgnoreTimestamp {
+                options = append(options, testutil.IgnoreTime())
+            }

             // Process expected metrics and compare with resulting metrics
             actual := acc.GetTelegrafMetrics()
-            testutil.RequireMetricsEqual(t, expected, actual)
+            testutil.RequireMetricsEqual(t, plugin.Expected, actual, options...)
         })
     }
 }
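The rewritten test compares metrics with approximate float equality. This matters because the new testcase payloads are float32 values whose decimal rendering in expected.out is rounded; a small sketch (not part of the commit, using the same go-cmp option as the test) shows why `EquateApprox(0, 1e-6)` is needed:

package main

import (
    "fmt"
    "math"

    "github.com/google/go-cmp/cmp"
    "github.com/google/go-cmp/cmp/cmpopts"
)

func main() {
    // messageC in the new testcases is 0x3ef126e9 / "PvEm6Q==",
    // i.e. the float32 closest to 0.471.
    got := float64(math.Float32frombits(0x3ef126e9)) // 0.47099998...
    want := 0.471                                     // value in expected.out

    fmt.Println(got == want)                                         // false
    fmt.Println(cmp.Equal(got, want, cmpopts.EquateApprox(0, 1e-6))) // true
}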
@@ -1478,8 +1462,8 @@ func TestHexEncoding(t *testing.T) {
     }

     parser := &Parser{
         Endianness: "be",
-        HexEncoding: true,
+        Encoding: "hex",
         Configs: []Config{
             {
                 Entries: []Entry{dummyEntry},
@@ -0,0 +1,3 @@
+test value=715
+test value=208.5
+test value=0.471

@@ -0,0 +1 @@
+RDLAAA==

@@ -0,0 +1 @@
+Q1CAAA==

@@ -0,0 +1 @@
+PvEm6Q==

@@ -0,0 +1,8 @@
+[[inputs.test]]
+files = ["messageA.bin", "messageB.bin", "messageC.bin"]
+data_format = "binary"
+endianness = "be"
+binary_encoding = "base64"
+
+[[inputs.test.binary]]
+entries = [{ name = "value", type = "float32" }]
@@ -0,0 +1,3 @@
+test value=715
+test value=208.5
+test value=0.471

@@ -0,0 +1 @@
+0x4432c000

@@ -0,0 +1 @@
+0x43508000

@@ -0,0 +1 @@
+0x3ef126e9

@@ -0,0 +1,8 @@
+[[inputs.test]]
+files = ["messageA.bin", "messageB.bin", "messageC.bin"]
+data_format = "binary"
+endianness = "be"
+binary_encoding = "hex"
+
+[[inputs.test.binary]]
+entries = [{ name = "value", type = "float32" }]
@@ -1,9 +1,9 @@
-[[inputs.file]]
-files = ["./testcases/multiple_messages/messageA.bin", "./testcases/multiple_messages/messageB.bin", "./testcases/multiple_messages/messageC.bin"]
+[[inputs.test]]
+files = ["messageA.bin", "messageB.bin", "messageC.bin"]
 data_format = "binary"
 endianness = "le"

-[[inputs.file.binary]]
+[[inputs.test.binary]]
 metric_name = "metricA"

 entries = [

@@ -15,12 +15,12 @@
 { type = "unix", assignment = "time" },
 ]

-[inputs.file.binary.filter]
+[inputs.test.binary.filter]
 selection = [
 { offset = 16, bits = 8, match = "0x0A" },
 ]

-[[inputs.file.binary]]
+[[inputs.test.binary]]
 metric_name = "metricB"

 entries = [

@@ -29,10 +29,10 @@
 { type = "unix", assignment = "time" },
 ]

-[inputs.file.binary.filter]
+[inputs.test.binary.filter]
 selection = [{ offset = 16, bits = 8, match = "0x0B" }]

-[[inputs.file.binary]]
+[[inputs.test.binary]]
 metric_name = "metricC"

 entries = [

@@ -42,5 +42,5 @@
 { type = "unix", assignment = "time" },
 ]

-[inputs.file.binary.filter]
+[inputs.test.binary.filter]
 selection = [{ offset = 16, bits = 8, match = "0x0C" }]