fix(parsers.xpath): Allow resolving extensions (#15586)

This commit is contained in:
Sven Rebhan 2024-07-02 20:58:27 +02:00 committed by GitHub
parent 2b2f826ac0
commit 94e45a1e66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 3589 additions and 41 deletions

View File

@ -24,9 +24,9 @@ lib]. The only exception are _integer_ fields that need to be specified in a
For using the protocol-buffer format you need to specify additional
(_mandatory_) properties for the parser. Those options are described here.
#### `xpath_protobuf_file` (mandatory)
#### `xpath_protobuf_files` (mandatory)
Use this option to specify the name of the protocol-buffer definition file
Use this option to specify the name of the protocol-buffer definition files
(`.proto`).
#### `xpath_protobuf_type` (mandatory)
@ -67,7 +67,7 @@ You should use the following setting
files = ["example.dat"]
data_format = "xpath_protobuf"
xpath_protobuf_file = "A.proto"
xpath_protobuf_files = ["A.proto"]
xpath_protobuf_type = "foo.Measurement"
xpath_protobuf_import_paths = [".", "/data/my_proto_files"]
@ -114,7 +114,7 @@ XPath expressions.
## PROTOCOL-BUFFER definitions
## Protocol-buffer definition file
# xpath_protobuf_file = "sparkplug_b.proto"
# xpath_protobuf_files = ["sparkplug_b.proto"]
## Name of the protocol-buffer message type to use in a fully qualified form.
# xpath_protobuf_type = "org.eclipse.tahu.protobuf.Payload"
## List of paths to use when looking up imported protocol-buffer definition files.
@ -218,7 +218,7 @@ in the metric.
## PROTOCOL-BUFFER definitions
## Protocol-buffer definition file
# xpath_protobuf_file = "sparkplug_b.proto"
# xpath_protobuf_file = ["sparkplug_b.proto"]
## Name of the protocol-buffer message type to use in a fully qualified form.
# xpath_protobuf_type = "org.eclipse.tahu.protobuf.Payload"
## List of paths to use when looking up imported protocol-buffer definition files.

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"reflect"
"slices"
"strconv"
"strings"
"time"
@ -35,19 +36,20 @@ type dataDocument interface {
}
type Parser struct {
Format string `toml:"-"`
ProtobufMessageDef string `toml:"xpath_protobuf_file"`
ProtobufMessageType string `toml:"xpath_protobuf_type"`
ProtobufImportPaths []string `toml:"xpath_protobuf_import_paths"`
ProtobufSkipBytes int64 `toml:"xpath_protobuf_skip_bytes"`
PrintDocument bool `toml:"xpath_print_document"`
AllowEmptySelection bool `toml:"xpath_allow_empty_selection"`
NativeTypes bool `toml:"xpath_native_types"`
Trace bool `toml:"xpath_trace"`
Configs []Config `toml:"xpath"`
DefaultMetricName string `toml:"-"`
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`
Format string `toml:"-"`
ProtobufMessageFiles []string `toml:"xpath_protobuf_files"`
ProtobufMessageDef string `toml:"xpath_protobuf_file" deprecated:"1.32.0;1.40.0;use 'xpath_protobuf_files' instead"`
ProtobufMessageType string `toml:"xpath_protobuf_type"`
ProtobufImportPaths []string `toml:"xpath_protobuf_import_paths"`
ProtobufSkipBytes int64 `toml:"xpath_protobuf_skip_bytes"`
PrintDocument bool `toml:"xpath_print_document"`
AllowEmptySelection bool `toml:"xpath_allow_empty_selection"`
NativeTypes bool `toml:"xpath_native_types"`
Trace bool `toml:"xpath_trace"`
Configs []Config `toml:"xpath"`
DefaultMetricName string `toml:"-"`
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`
// Required for backward compatibility
ConfigsXML []Config `toml:"xml" deprecated:"1.23.1;1.35.0;use 'xpath' instead"`
@ -126,12 +128,15 @@ func (p *Parser) Init() error {
})
}
case "xpath_protobuf":
if p.ProtobufMessageDef != "" && !slices.Contains(p.ProtobufMessageFiles, p.ProtobufMessageDef) {
p.ProtobufMessageFiles = append(p.ProtobufMessageFiles, p.ProtobufMessageDef)
}
pbdoc := protobufDocument{
MessageDefinition: p.ProtobufMessageDef,
MessageType: p.ProtobufMessageType,
ImportPaths: p.ProtobufImportPaths,
SkipBytes: p.ProtobufSkipBytes,
Log: p.Log,
MessageFiles: p.ProtobufMessageFiles,
MessageType: p.ProtobufMessageType,
ImportPaths: p.ProtobufImportPaths,
SkipBytes: p.ProtobufSkipBytes,
Log: p.Log,
}
if err := pbdoc.Init(); err != nil {
return err

View File

@ -13,6 +13,7 @@ import (
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"
"github.com/srebhan/protobufquery"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
@ -22,18 +23,20 @@ import (
)
type protobufDocument struct {
MessageDefinition string
MessageType string
ImportPaths []string
SkipBytes int64
Log telegraf.Logger
msg *dynamicpb.Message
MessageFiles []string
MessageType string
ImportPaths []string
SkipBytes int64
Log telegraf.Logger
msg *dynamicpb.Message
unmarshaller proto.UnmarshalOptions
}
func (d *protobufDocument) Init() error {
// Check the message definition and type
if d.MessageDefinition == "" {
return errors.New("protocol-buffer message-definition not set")
if len(d.MessageFiles) == 0 {
return errors.New("protocol-buffer files not set")
}
if d.MessageType == "" {
return errors.New("protocol-buffer message-type not set")
@ -44,12 +47,12 @@ func (d *protobufDocument) Init() error {
ImportPaths: d.ImportPaths,
InferImportPaths: true,
}
fds, err := parser.ParseFiles(d.MessageDefinition)
fds, err := parser.ParseFiles(d.MessageFiles...)
if err != nil {
return fmt.Errorf("parsing protocol-buffer definition in %q failed: %w", d.MessageDefinition, err)
return fmt.Errorf("parsing protocol-buffer definition failed: %w", err)
}
if len(fds) < 1 {
return fmt.Errorf("file %q does not contain file descriptors", d.MessageDefinition)
return errors.New("files do not contain a file descriptor")
}
// Register all definitions in the file in the global registry
@ -57,6 +60,10 @@ func (d *protobufDocument) Init() error {
if err != nil {
return fmt.Errorf("constructing registry failed: %w", err)
}
d.unmarshaller = proto.UnmarshalOptions{
RecursionLimit: protowire.DefaultRecursionLimit,
Resolver: dynamicpb.NewTypes(registry),
}
// Lookup given type in the loaded file descriptors
msgFullName := protoreflect.FullName(d.MessageType)
@ -97,7 +104,7 @@ func (d *protobufDocument) Parse(buf []byte) (dataNode, error) {
msg := d.msg.New()
// Unmarshal the received buffer
if err := proto.Unmarshal(buf[d.SkipBytes:], msg.Interface()); err != nil {
if err := d.unmarshaller.Unmarshal(buf[d.SkipBytes:], msg.Interface()); err != nil {
hexbuf := hex.EncodeToString(buf)
d.Log.Debugf("raw data (hex): %q (skip %d bytes)", hexbuf, d.SkipBytes)
return nil, err

View File

@ -3,7 +3,7 @@
data_format = "xpath_protobuf"
xpath_native_types = true
xpath_protobuf_file = "message.proto"
xpath_protobuf_files = ["message.proto"]
xpath_protobuf_type = "native_type.Message"
xpath_protobuf_import_paths = [".", "./testcases/native_types_protobuf"]

View File

@ -2,7 +2,7 @@
files = ["./testcases/protobuf_benchmark/message.bin"]
data_format = "xpath_protobuf"
xpath_protobuf_file = "benchmark.proto"
xpath_protobuf_files = ["benchmark.proto"]
xpath_protobuf_type = "benchmark.BenchmarkData"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_benchmark"]

View File

@ -3,7 +3,7 @@
data_format = "xpath_protobuf"
xpath_native_types = true
xpath_protobuf_file = "issue.proto"
xpath_protobuf_files = ["issue.proto"]
xpath_protobuf_type = "dunedaq.ersschema.IssueChain"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_issue_13715"]

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1 @@
test component_id=0u,enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_1="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_10="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_11="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_12="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_13="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_14="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_15="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_16="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_17="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_18="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_19="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_2="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_3="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_4="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_5="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_6="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_7="ge-0/0/1",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_8="ge-0/0/0",enterprise_juniperNetworks_jnpr_interface_ext_interface_stats_if_name_9="ge-0/0/1",sensor_name="ROUTER-INF:/junos/system/linecard/interface/:/junos/system/linecard/interface/:PFE",sequence_number=103837u,sub_component_id=0u,system_id="ST-Justin:10.55.60.125",timestamp=1719150965216u,version_major=1u,version_minor=1u 1719150965216000000

View File

@ -0,0 +1,226 @@
//
// Copyrights (c) 2015, 2016, Juniper Networks, Inc.
// All rights reserved.
//
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
//
// Nitin Kumar, Jan 2015
//
// This file defines the messages in Protocol Buffers format used by
// the port sensor. The-top level messages is Port.
//
// Version 1.1
//
syntax = "proto2";
import "telemetry_top.proto";
//
// This occupies branch 3 from JuniperNetworksSensors
//
extend JuniperNetworksSensors {
optional Port jnpr_interface_ext = 3;
}
//
// Top-level message
//
message Port {
repeated InterfaceInfos interface_stats = 1;
}
//
// Interface information
//
message InterfaceInfos {
// Interface name, e.g., xe-0/0/0
required string if_name = 1 [(telemetry_options).is_key = true];
// Time when interface is created
optional uint64 init_time = 2;
// Global Index
optional uint32 snmp_if_index = 3;
// Name of parent for AE interface, if applicable
optional string parent_ae_name = 4;
// Egress queue information
repeated QueueStats egress_queue_info = 5;
// Ingress queue information
repeated QueueStats ingress_queue_info = 6;
// Inbound traffic statistics
optional InterfaceStats ingress_stats = 7;
// Outbound traffic statistics
optional InterfaceStats egress_stats = 8;
// Inbound traffic errors
optional IngressInterfaceErrors ingress_errors = 9;
// Interface administration status
optional string if_administration_status = 10;
// Interface operational status
optional string if_operational_status = 11;
// Interface description
optional string if_description = 12;
// Counter: number of carrier transitions on this interface
optional uint64 if_transitions = 13 [(telemetry_options).is_counter = true];
// This corresponds to the ifLastChange object in the standard interface MIB
optional uint32 ifLastChange = 14;
// This corresponds to the ifHighSpeed object in the standard interface MIB
optional uint32 ifHighSpeed = 15;
// Outbound traffic errors
optional EgressInterfaceErrors egress_errors = 16;
}
//
// Interface queue statistics
//
message QueueStats {
// Queue number
optional uint32 queue_number = 1 [(telemetry_options).is_key = true];
// The total number of packets that have been added to this queue
optional uint64 packets = 2 [(telemetry_options).is_counter = true];
// The total number of bytes that have been added to this queue
optional uint64 bytes = 3 [(telemetry_options).is_counter = true];
// The total number of tail dropped packets
optional uint64 tail_drop_packets = 4 [(telemetry_options).is_counter = true];
// The total number of rate-limited packets
optional uint64 rl_drop_packets = 5 [(telemetry_options).is_counter = true];
// The total number of rate-limited bytes
optional uint64 rl_drop_bytes = 6 [(telemetry_options).is_counter = true];
// The total number of red-dropped packets
optional uint64 red_drop_packets = 7 [(telemetry_options).is_counter = true];
// The total number of red-dropped bytes
optional uint64 red_drop_bytes = 8 [(telemetry_options).is_counter = true];
// Average queue depth, in packets
optional uint64 avg_buffer_occupancy = 9 [(telemetry_options).is_gauge = true];
// Current queue depth, in packets
optional uint64 cur_buffer_occupancy = 10 [(telemetry_options).is_gauge = true];
// The max measured queue depth, in packets, across all measurements since boot
optional uint64 peak_buffer_occupancy = 11 [(telemetry_options).is_gauge = true];
// Allocated buffer size
optional uint64 allocated_buffer_size = 12 [(telemetry_options).is_gauge = true];
}
//
// Interface statistics
//
message InterfaceStats {
// The total number of packets sent/received by this interface
optional uint64 if_pkts = 1 [(telemetry_options).is_counter = true];
// The total number of bytes sent/received by this interface
optional uint64 if_octets = 2 [(telemetry_options).is_counter = true];
// The rate at which packets are sent/received by this interface (in packets/sec)
optional uint64 if_1sec_pkts = 3 [(telemetry_options).is_gauge = true];
// The rate at which bytes are sent/received by this interface
optional uint64 if_1sec_octets = 4 [(telemetry_options).is_gauge = true];
// Total number of unicast packets sent/received by this interface
optional uint64 if_uc_pkts = 5 [(telemetry_options).is_counter = true];
// Total number of multicast packets sent/received by this interface
optional uint64 if_mc_pkts = 6 [(telemetry_options).is_counter = true];
// Total number of broadcast packets sent/received by this interface
optional uint64 if_bc_pkts = 7 [(telemetry_options).is_counter = true];
// Counter: total no of error packets sent/rcvd by this interface
optional uint64 if_error = 8 [(telemetry_options).is_counter = true];
// Counter: total no of PAUSE packets sent/rcvd by this interface
optional uint64 if_pause_pkts = 9 [(telemetry_options).is_counter = true];
// Counter: total no of UNKNOWN proto packets sent/rcvd by this interface
optional uint64 if_unknown_proto_pkts = 10 [(telemetry_options).is_counter = true];
}
//
// Inbound traffic error statistics
//
message IngressInterfaceErrors {
// The number of packets that contained errors
optional uint64 if_errors = 1 [(telemetry_options).is_counter = true];
// The number of packets dropped by the input queue of the I/O Manager ASIC
optional uint64 if_in_qdrops = 2 [(telemetry_options).is_counter = true];
// The number of packets which were misaligned
optional uint64 if_in_frame_errors = 3 [(telemetry_options).is_counter = true];
// The number of non-error packets which were chosen to be discarded
optional uint64 if_discards = 4 [(telemetry_options).is_counter = true];
// The number of runt packets
optional uint64 if_in_runts = 5 [(telemetry_options).is_counter = true];
// The number of packets that fail Layer 3 sanity checks of the header
optional uint64 if_in_l3_incompletes = 6 [(telemetry_options).is_counter = true];
// The number of packets for which the software could not find a valid logical interface
optional uint64 if_in_l2chan_errors = 7 [(telemetry_options).is_counter = true];
// The number of malform or short packets
optional uint64 if_in_l2_mismatch_timeouts = 8 [(telemetry_options).is_counter = true];
// The number of FIFO errors
optional uint64 if_in_fifo_errors = 9 [(telemetry_options).is_counter = true];
// The number of resource errors
optional uint64 if_in_resource_errors = 10 [(telemetry_options).is_counter = true];
}
//
// Outbound traffic error statistics
//
message EgressInterfaceErrors {
// The number of packets that contained errors
optional uint64 if_errors = 1 [(telemetry_options).is_counter = true];
// The number of non-error packets which were chosen to be discarded
optional uint64 if_discards = 2 [(telemetry_options).is_counter = true];
}

View File

@ -0,0 +1,16 @@
[[inputs.file]]
files = ["./testcases/protobuf_issue_15571/message.bin"]
data_format = "xpath_protobuf"
xpath_print_document = true
xpath_native_types = true
xpath_protobuf_files = ["telemetry_top.proto", "port.proto"]
xpath_protobuf_type = "TelemetryStream"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_issue_15571"]
[[inputs.file.xpath]]
metric_name = "'test'"
field_selection = "* | //if_name"
field_name_expansion = true
timestamp = "//timestamp"
timestamp_format = "unix_ms"

View File

@ -0,0 +1,99 @@
//
// Copyrights (c) 2015, 2016, Juniper Networks, Inc.
// All rights reserved.
//
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
//
// This file defines the top level message used for all Juniper
// Telemetry packets encoded to the protocol buffer format.
// The top level message is TelemetryStream.
//
syntax = "proto2";
import "google/protobuf/descriptor.proto";
extend google.protobuf.FieldOptions {
optional TelemetryFieldOptions telemetry_options = 1024;
}
message TelemetryFieldOptions {
optional bool is_key = 1;
optional bool is_timestamp = 2;
optional bool is_counter = 3;
optional bool is_gauge = 4;
}
message TelemetryStream {
// router hostname
// (or, just in the case of legacy (microkernel) PFEs, the IP address)
required string system_id = 1 [(telemetry_options).is_key = true];
// line card / RE (slot number). For RE, it will be 65535
optional uint32 component_id = 2 [(telemetry_options).is_key = true];
// PFE (if applicable)
optional uint32 sub_component_id = 3 [(telemetry_options).is_key = true];
// Overload sensor name with "senor name, internal path, external path
// and component" seperated by ":". For RE sensors, component will be
// daemon-name and for PFE sensors it will be "PFE".
optional string sensor_name = 4 [(telemetry_options).is_key = true];
// sequence number, monotonically increasing for each
// system_id, component_id, sub_component_id + sensor_name.
optional uint32 sequence_number = 5;
// timestamp (milliseconds since 00:00:00 UTC 1/1/1970)
optional uint64 timestamp = 6 [(telemetry_options).is_timestamp = true];
// major version
optional uint32 version_major = 7;
// minor version
optional uint32 version_minor = 8;
// end-of-message marker, set to true when the end of wrap is reached
optional bool eom = 9;
optional IETFSensors ietf = 100;
optional EnterpriseSensors enterprise = 101;
}
message IETFSensors {
extensions 1 to max;
}
message EnterpriseSensors {
extensions 1 to max;
}
extend EnterpriseSensors {
// re-use IANA assigned numbers
optional JuniperNetworksSensors juniperNetworks = 2636;
}
message JuniperNetworksSensors {
extensions 1 to max;
}

View File

@ -3,7 +3,7 @@
data_format = "xpath_protobuf"
xpath_native_types = true
xpath_protobuf_file = "message.proto"
xpath_protobuf_files = ["message.proto"]
xpath_protobuf_type = "native_type.Message"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_skip_bytes_grpc"]
#xpath_protobuf_skip_bytes = 5

View File

@ -3,7 +3,7 @@
data_format = "xpath_protobuf"
xpath_native_types = true
xpath_protobuf_file = "powerdns_message.proto"
xpath_protobuf_files = ["powerdns_message.proto"]
xpath_protobuf_type = "PBDNSMessage"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_powerdns_hex"]
xpath_protobuf_skip_bytes = 2

View File

@ -3,7 +3,7 @@
data_format = "xpath_protobuf"
xpath_native_types = true
xpath_protobuf_file = "message.proto"
xpath_protobuf_files = ["message.proto"]
xpath_protobuf_type = "native_type.Message"
xpath_protobuf_import_paths = [".", "./testcases/protobuf_skip_bytes_grpc"]
xpath_protobuf_skip_bytes = 5