feat(parsers.xpath): Add option to skip (header) bytes (#11933)

This commit is contained in:
Sven Rebhan 2022-10-11 17:27:07 +02:00 committed by GitHub
parent 8c00fe3d45
commit 8ca3b9262a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 143 additions and 53 deletions

View File

@ -73,6 +73,23 @@ You should use the following setting
... ...
``` ```
#### `xpath_protobuf_skip_bytes` (optional)
This option allows to skip a number of bytes before trying to parse
the protocol-buffer message. This is useful in cases where the raw data
has a header e.g. for the message length or in case of GRPC messages.
This is a list of known headers and the corresponding values for
`xpath_protobuf_skip_bytes`
| name | setting | comment |
| --------------------------------------- | ------- | ------- |
| [GRPC protocol][GRPC] | 5 | GRPC adds a 5-byte header for _Length-Prefixed-Messages_ |
| [PowerDNS logging][PDNS] | 2 | Sent messages contain a 2-byte header containing the message length |
[GRPC]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
[PDNS]: https://docs.powerdns.com/recursor/lua-config/protobuf.html
## Configuration ## Configuration
```toml ```toml
@ -92,6 +109,8 @@ You should use the following setting
# xpath_protobuf_type = "org.eclipse.tahu.protobuf.Payload" # xpath_protobuf_type = "org.eclipse.tahu.protobuf.Payload"
## List of paths to use when looking up imported protocol-buffer definition files. ## List of paths to use when looking up imported protocol-buffer definition files.
# xpath_protobuf_import_paths = ["."] # xpath_protobuf_import_paths = ["."]
## Number of (header) bytes to ignore before parsing the message.
# xpath_protobuf_skip_bytes = 0
## Print the internal XML document when in debug logging mode. ## Print the internal XML document when in debug logging mode.
## This is especially useful when using the parser with non-XML formats like protocol-buffers ## This is especially useful when using the parser with non-XML formats like protocol-buffers

View File

@ -34,6 +34,7 @@ type Parser struct {
ProtobufMessageDef string `toml:"xpath_protobuf_file"` ProtobufMessageDef string `toml:"xpath_protobuf_file"`
ProtobufMessageType string `toml:"xpath_protobuf_type"` ProtobufMessageType string `toml:"xpath_protobuf_type"`
ProtobufImportPaths []string `toml:"xpath_protobuf_import_paths"` ProtobufImportPaths []string `toml:"xpath_protobuf_import_paths"`
ProtobufSkipBytes int64 `toml:"xpath_protobuf_skip_bytes"`
PrintDocument bool `toml:"xpath_print_document"` PrintDocument bool `toml:"xpath_print_document"`
AllowEmptySelection bool `toml:"xpath_allow_empty_selection"` AllowEmptySelection bool `toml:"xpath_allow_empty_selection"`
NativeTypes bool `toml:"xpath_native_types"` NativeTypes bool `toml:"xpath_native_types"`
@ -94,6 +95,7 @@ func (p *Parser) Init() error {
MessageDefinition: p.ProtobufMessageDef, MessageDefinition: p.ProtobufMessageDef,
MessageType: p.ProtobufMessageType, MessageType: p.ProtobufMessageType,
ImportPaths: p.ProtobufImportPaths, ImportPaths: p.ProtobufImportPaths,
SkipBytes: p.ProtobufSkipBytes,
Log: p.Log, Log: p.Log,
} }
if err := pbdoc.Init(); err != nil { if err := pbdoc.Init(); err != nil {

View File

@ -1,9 +1,6 @@
package xpath package xpath
import ( import (
"bufio"
"errors"
"fmt"
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
@ -11,6 +8,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
@ -1402,73 +1400,82 @@ func TestMultipleConfigs(t *testing.T) {
// Make sure the folder contains data // Make sure the folder contains data
require.NotEmpty(t, folders) require.NotEmpty(t, folders)
// Register the wrapper plugin
inputs.Add("file", func() telegraf.Input {
return &file.File{}
})
// Prepare the influx parser for expectations
parser := &influx.Parser{}
require.NoError(t, parser.Init())
// Compare options
options := []cmp.Option{
testutil.IgnoreTime(),
testutil.SortMetrics(),
}
for _, f := range folders { for _, f := range folders {
if !f.IsDir() { // Only handle folders
if !f.IsDir() || f.Name() == "protos" {
continue continue
} }
t.Run(f.Name(), func(t *testing.T) { testcasePath := filepath.Join("testcases", f.Name())
configFilename := filepath.Join("testcases", f.Name(), "telegraf.conf") configFilename := filepath.Join(testcasePath, "telegraf.conf")
expectedFilename := filepath.Join("testcases", f.Name(), "expected.out") expectedFilename := filepath.Join(testcasePath, "expected.out")
expectedErrorFilename := filepath.Join(testcasePath, "expected.err")
// Process the telegraf config file for the test t.Run(f.Name(), func(t *testing.T) {
buf, err := os.ReadFile(configFilename) // Read the expected output if any
if err != nil && errors.Is(err, os.ErrNotExist) { var expected []telegraf.Metric
return if _, err := os.Stat(expectedFilename); err == nil {
var err error
expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
require.NoError(t, err)
} }
require.NoError(t, err)
inputs.Add("file", func() telegraf.Input { // Read the expected output if any
return &file.File{} var expectedErrors []string
}) if _, err := os.Stat(expectedErrorFilename); err == nil {
var err error
expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
require.NoError(t, err)
require.NotEmpty(t, expectedErrors)
}
// Configure the plugin
cfg := config.NewConfig() cfg := config.NewConfig()
require.NoError(t, cfg.LoadConfigData(buf)) require.NoError(t, cfg.LoadConfig(configFilename))
require.NotEmpty(t, cfg.Inputs)
// Gather the metrics from the input file configure // Gather the metrics from the input file configure
acc := testutil.Accumulator{} var acc testutil.Accumulator
var errs []error
for _, input := range cfg.Inputs { for _, input := range cfg.Inputs {
require.NoError(t, input.Init()) require.NoError(t, input.Init())
require.NoError(t, input.Gather(&acc)) err := input.Gather(&acc)
if err != nil {
errs = append(errs, err)
}
}
// Check for errors if we expect any
if len(expectedErrors) > 0 {
require.Len(t, errs, len(expectedErrors))
for i, err := range errs {
require.ErrorContains(t, err, expectedErrors[i])
}
} else {
require.Empty(t, errs)
} }
// Process expected metrics and compare with resulting metrics // Process expected metrics and compare with resulting metrics
expected, err := readMetricFile(expectedFilename)
require.NoError(t, err)
actual := acc.GetTelegrafMetrics() actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime()) testutil.RequireMetricsEqual(t, expected, actual, options...)
}) })
} }
} }
func readMetricFile(filename string) ([]telegraf.Metric, error) {
var metrics []telegraf.Metric
f, err := os.Open(filename)
if err != nil {
return metrics, err
}
defer f.Close()
parser := &influx.Parser{}
if err := parser.Init(); err != nil {
return nil, err
}
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
if line != "" {
m, err := parser.ParseLine(line)
if err != nil {
return nil, fmt.Errorf("unable to parse metric in %q failed: %v", line, err)
}
// The timezone needs to be UTC to match the timestamp test results
m.SetTime(m.Time().UTC())
metrics = append(metrics, m)
}
}
return metrics, nil
}
func loadTestConfiguration(filename string) (*xpath.Config, []string, error) { func loadTestConfiguration(filename string) (*xpath.Config, []string, error) {
buf, err := os.ReadFile(filename) buf, err := os.ReadFile(filename)
if err != nil { if err != nil {

View File

@ -1,6 +1,7 @@
package xpath package xpath
import ( import (
"encoding/hex"
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
@ -23,6 +24,7 @@ type protobufDocument struct {
MessageDefinition string MessageDefinition string
MessageType string MessageType string
ImportPaths []string ImportPaths []string
SkipBytes int64
Log telegraf.Logger Log telegraf.Logger
msg *dynamicpb.Message msg *dynamicpb.Message
} }
@ -94,7 +96,9 @@ func (d *protobufDocument) Parse(buf []byte) (dataNode, error) {
msg := d.msg.New() msg := d.msg.New()
// Unmarshal the received buffer // Unmarshal the received buffer
if err := proto.Unmarshal(buf, msg.Interface()); err != nil { if err := proto.Unmarshal(buf[d.SkipBytes:], msg.Interface()); err != nil {
hexbuf := hex.EncodeToString(buf)
d.Log.Debugf("raw data (hex): %q (skip %d bytes)", hexbuf, d.SkipBytes)
return nil, err return nil, err
} }

View File

@ -0,0 +1 @@
cannot parse invalid wire-format data

View File

@ -0,0 +1,10 @@
syntax = "proto3";
package native_type;
message Message {
string a = 1;
double b = 2;
int32 c = 3;
bool d = 4;
}

View File

@ -0,0 +1,18 @@
[[inputs.file]]
files = ["./testcases/protobuf_skip_bytes_grpc/test.dat"]
data_format = "xpath_protobuf"
xpath_native_types = true
xpath_protobuf_file = "message.proto"
xpath_protobuf_type = "native_type.Message"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_skip_bytes_grpc"]
#xpath_protobuf_skip_bytes = 5
[[inputs.file.xpath]]
metric_name = "'native_types'"
[inputs.file.xpath.fields]
value_a = "//a"
value_b = "//b"
value_c = "//c"
value_d = "//d"

View File

@ -0,0 +1 @@
native_types value_a="a string",value_b=3.1415,value_c=42i,value_d=true

View File

@ -0,0 +1,10 @@
syntax = "proto3";
package native_type;
message Message {
string a = 1;
double b = 2;
int32 c = 3;
bool d = 4;
}

View File

@ -0,0 +1,18 @@
[[inputs.file]]
files = ["./testcases/protobuf_skip_bytes_grpc/test.dat"]
data_format = "xpath_protobuf"
xpath_native_types = true
xpath_protobuf_file = "message.proto"
xpath_protobuf_type = "native_type.Message"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_skip_bytes_grpc"]
xpath_protobuf_skip_bytes = 5
[[inputs.file.xpath]]
metric_name = "'native_types'"
[inputs.file.xpath.fields]
value_a = "//a"
value_b = "//b"
value_c = "//c"
value_d = "//d"