fix(inputs.netflow): Handle malformed inputs gracefully (#14373)

This commit is contained in:
Sven Rebhan 2023-12-04 16:16:59 +01:00 committed by GitHub
parent 6ff28c7593
commit ce64421419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 422 additions and 263 deletions

View File

@ -17,7 +17,7 @@ import (
var regexpIPFIXPENMapping = regexp.MustCompile(`\d+\.\d+`)
type decoderFunc func([]byte) interface{}
type decoderFunc func([]byte) (interface{}, error)
type fieldMapping struct {
name string
@ -62,8 +62,8 @@ var fieldMappingsNetflowCommon = map[uint16][]fieldMapping{
30: {{"dst_mask", decodeUint}}, // IPV6_DST_MASK / destinationIPv6PrefixLength
31: {{"flow_label", decodeHex}}, // IPV6_FLOW_LABEL / flowLabelIPv6
32: {
{"icmp_type", func(b []byte) interface{} { return b[0] }}, // ICMP_TYPE / icmpTypeCodeIPv4
{"icmp_code", func(b []byte) interface{} { return b[1] }},
{"icmp_type", decodeByteFunc(0)}, // ICMP_TYPE / icmpTypeCodeIPv4
{"icmp_code", decodeByteFunc(1)},
},
33: {{"igmp_type", decodeUint}}, // MUL_IGMP_TYPE / igmpType
34: {{"sampling_interval", decodeUint}}, // SAMPLING_INTERVAL / samplingInterval (deprecated)
@ -187,8 +187,8 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{
137: {{"common_properties_id", decodeUint}}, // commonPropertiesId
138: {{"observation_point_id", decodeUint}}, // observationPointId
139: {
{"icmp_type", func(b []byte) interface{} { return b[0] }}, // icmpTypeCodeIPv6
{"icmp_code", func(b []byte) interface{} { return b[1] }},
{"icmp_type", decodeByteFunc(0)}, // icmpTypeCodeIPv6
{"icmp_code", decodeByteFunc(1)},
},
140: {{"mpls_top_label_ip", decodeIP}}, // mplsTopLabelIPv6Address
141: {{"linecard_id", decodeUint}}, // lineCardId
@ -468,7 +468,7 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{
431: {{"layer2_frames_total", decodeUint}}, // layer2FrameTotalCount
432: {{"pseudo_wire_dst", decodeIP}}, // pseudoWireDestinationIPv4Address
433: {{"ignored_layer2_frames_total", decodeUint}}, // ignoredLayer2FrameTotalCount
434: {{"mib_obj_value_int", decodeInt32}}, // mibObjectValueInteger
434: {{"mib_obj_value_int", decodeInt}}, // mibObjectValueInteger
435: {{"mib_obj_value_str", decodeString}}, // mibObjectValueOctetString
436: {{"mib_obj_value_oid", decodeHex}}, // mibObjectValueOID
437: {{"mib_obj_value_bits", decodeHex}}, // mibObjectValueBits
@ -584,7 +584,12 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
}
fields := make(map[string]interface{})
for _, value := range record.Values {
for _, field := range d.decodeValueV9(value) {
decodedFields, err := d.decodeValueV9(value)
if err != nil {
d.Log.Errorf("decoding record %+v failed: %v", record, err)
continue
}
for _, field := range decodedFields {
fields[field.Key] = field.Value
}
}
@ -607,7 +612,12 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
fields := make(map[string]interface{})
t := time.Now()
for _, value := range record.Values {
for _, field := range d.decodeValueIPFIX(value) {
decodedFields, err := d.decodeValueIPFIX(value)
if err != nil {
d.Log.Errorf("decoding value %+v failed: %v", value, err)
continue
}
for _, field := range decodedFields {
fields[field.Key] = field.Value
}
}
@ -657,37 +667,43 @@ func (d *netflowDecoder) Init() error {
return nil
}
func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field {
func (d *netflowDecoder) decodeValueV9(field netflow.DataField) ([]telegraf.Field, error) {
raw := field.Value.([]byte)
elementID := field.Type
// Check the user-specified mapping
if m, found := d.mappingsV9[elementID]; found {
return []telegraf.Field{{Key: m.name, Value: m.decoder(raw)}}
v, err := m.decoder(raw)
if err != nil {
return nil, err
}
return []telegraf.Field{{Key: m.name, Value: v}}, nil
}
// Check the version specific default field mappings
if mappings, found := fieldMappingsNetflowV9[elementID]; found {
var fields []telegraf.Field
for _, m := range mappings {
fields = append(fields, telegraf.Field{
Key: m.name,
Value: m.decoder(raw),
})
v, err := m.decoder(raw)
if err != nil {
return nil, err
}
return fields
fields = append(fields, telegraf.Field{Key: m.name, Value: v})
}
return fields, nil
}
// Check the common default field mappings
if mappings, found := fieldMappingsNetflowCommon[elementID]; found {
var fields []telegraf.Field
for _, m := range mappings {
fields = append(fields, telegraf.Field{
Key: m.name,
Value: m.decoder(raw),
})
v, err := m.decoder(raw)
if err != nil {
return nil, err
}
return fields
fields = append(fields, telegraf.Field{Key: m.name, Value: v})
}
return fields, nil
}
// Return the raw data if no mapping was found
@ -695,10 +711,15 @@ func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field
if !d.logged[key] {
d.Log.Debugf("unknown Netflow v9 data field %v", field)
}
return []telegraf.Field{{Key: key, Value: decodeHex(raw)}}
v, err := decodeHex(raw)
if err != nil {
return nil, err
}
return []telegraf.Field{{Key: key, Value: v}}, nil
}
func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Field {
func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) ([]telegraf.Field, error) {
raw := field.Value.([]byte)
// Checking for reverse elements according to RFC5103
@ -714,42 +735,56 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Fi
key := fmt.Sprintf("%d.%d", field.Pen, elementID)
if m, found := d.mappingsPEN[key]; found {
name := prefix + m.name
return []telegraf.Field{{Key: name, Value: m.decoder(raw)}}
v, err := m.decoder(raw)
if err != nil {
return nil, err
}
return []telegraf.Field{{Key: name, Value: v}}, nil
}
if !d.logged[key] {
d.Log.Debugf("unknown IPFIX PEN data field %v", field)
}
name := fmt.Sprintf("type_%d_%s%d", field.Pen, prefix, elementID)
return []telegraf.Field{{Key: name, Value: decodeHex(raw)}}
v, err := decodeHex(raw)
if err != nil {
return nil, err
}
return []telegraf.Field{{Key: name, Value: v}}, nil
}
// Check the user-specified mapping
if m, found := d.mappingsIPFIX[elementID]; found {
return []telegraf.Field{{Key: prefix + m.name, Value: m.decoder(raw)}}
v, err := m.decoder(raw)
if err != nil {
return nil, err
}
return []telegraf.Field{{Key: prefix + m.name, Value: v}}, nil
}
// Check the version specific default field mappings
if mappings, found := fieldMappingsIPFIX[elementID]; found {
var fields []telegraf.Field
for _, m := range mappings {
fields = append(fields, telegraf.Field{
Key: prefix + m.name,
Value: m.decoder(raw),
})
v, err := m.decoder(raw)
if err != nil {
return nil, err
}
return fields
fields = append(fields, telegraf.Field{Key: prefix + m.name, Value: v})
}
return fields, nil
}
// Check the common default field mappings
if mappings, found := fieldMappingsNetflowCommon[elementID]; found {
var fields []telegraf.Field
for _, m := range mappings {
fields = append(fields, telegraf.Field{
Key: prefix + m.name,
Value: m.decoder(raw),
})
v, err := m.decoder(raw)
if err != nil {
return nil, err
}
return fields
fields = append(fields, telegraf.Field{Key: prefix + m.name, Value: v})
}
return fields, nil
}
// Return the raw data if no mapping was found
@ -757,5 +792,9 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Fi
if !d.logged[key] {
d.Log.Debugf("unknown IPFIX data field %v", field)
}
return []telegraf.Field{{Key: key, Value: decodeHex(raw)}}
v, err := decodeHex(raw)
if err != nil {
return nil, err
}
return []telegraf.Field{{Key: key, Value: v}}, nil
}

View File

@ -50,11 +50,7 @@ func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metr
"sys_uptime": msg.SysUptime,
"seq_number": msg.FlowSequence,
"engine_type": mapEngineType(msg.EngineType),
"engine_id": decodeHex([]byte{msg.EngineId}),
"sampling_interval": msg.SamplingInterval,
"src": decodeIPFromUint32(record.SrcAddr),
"dst": decodeIPFromUint32(record.DstAddr),
"next_hop": decodeIPFromUint32(record.NextHop),
"in_snmp": record.Input,
"out_snmp": record.Output,
"in_packets": record.DPkts,
@ -65,12 +61,34 @@ func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metr
"dst_port": record.DstPort,
"tcp_flags": mapTCPFlags(record.TCPFlags),
"protocol": mapL4Proto(record.Proto),
"src_tos": decodeHex([]byte{record.Tos}),
"bgp_src_as": record.SrcAS,
"bgp_dst_as": record.DstAS,
"src_mask": record.SrcMask,
"dst_mask": record.DstMask,
}
var err error
fields["engine_id"], err = decodeHex([]byte{msg.EngineId})
if err != nil {
return nil, fmt.Errorf("decoding 'engine_id' failed: %w", err)
}
fields["src"], err = decodeIPFromUint32(record.SrcAddr)
if err != nil {
return nil, fmt.Errorf("decoding 'src' failed: %w", err)
}
fields["dst"], err = decodeIPFromUint32(record.DstAddr)
if err != nil {
return nil, fmt.Errorf("decoding 'dst' failed: %w", err)
}
fields["next_hop"], err = decodeIPFromUint32(record.NextHop)
if err != nil {
return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err)
}
fields["src_tos"], err = decodeHex([]byte{record.Tos})
if err != nil {
return nil, fmt.Errorf("decoding 'src_tos' failed: %w", err)
}
metrics = append(metrics, metric.New("netflow", tags, fields, t))
}

View File

@ -62,7 +62,6 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
fields := map[string]interface{}{
"ip_version": decodeSflowIPVersion(msg.IPVersion),
"sys_uptime": msg.Uptime,
"agent_ip": decodeIP(msg.AgentIP),
"agent_subid": msg.SubAgentId,
"seq_number": sample.Header.SampleSequenceNumber,
"sampling_interval": sample.SamplingRate,
@ -70,6 +69,12 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
"sampling_drops": sample.Drops,
"in_snmp": sample.Input,
}
fields["agent_ip"], err = decodeIP(msg.AgentIP)
if err != nil {
return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err)
}
if sample.Output>>31 == 0 {
fields["out_snmp"] = sample.Output & 0x7fffffff
}
@ -95,7 +100,6 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
fields := map[string]interface{}{
"ip_version": decodeSflowIPVersion(msg.IPVersion),
"sys_uptime": msg.Uptime,
"agent_ip": decodeIP(msg.AgentIP),
"agent_subid": msg.SubAgentId,
"seq_number": sample.Header.SampleSequenceNumber,
"sampling_interval": sample.SamplingRate,
@ -103,6 +107,11 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
"sampling_drops": sample.Drops,
"in_snmp": sample.InputIfValue,
}
fields["agent_ip"], err = decodeIP(msg.AgentIP)
if err != nil {
return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err)
}
if sample.OutputIfFormat == 0 {
fields["out_snmp"] = sample.OutputIfValue
}
@ -128,10 +137,14 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
fields := map[string]interface{}{
"ip_version": decodeSflowIPVersion(msg.IPVersion),
"sys_uptime": msg.Uptime,
"agent_ip": decodeIP(msg.AgentIP),
"agent_subid": msg.SubAgentId,
"seq_number": sample.Header.SampleSequenceNumber,
}
fields["agent_ip"], err = decodeIP(msg.AgentIP)
if err != nil {
return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err)
}
// Decode the source information
if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" {
fields[name] = sample.Header.SourceIdValue
@ -170,41 +183,79 @@ func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[stri
fields[k] = v
}
case sflow.SampledEthernet:
var err error
fields["eth_total_len"] = record.Length
fields["in_src_mac"] = decodeMAC(record.SrcMac)
fields["out_dst_mac"] = decodeMAC(record.DstMac)
fields["in_src_mac"], err = decodeMAC(record.SrcMac)
if err != nil {
return nil, fmt.Errorf("decoding 'in_src_mac' failed: %w", err)
}
fields["out_dst_mac"], err = decodeMAC(record.DstMac)
if err != nil {
return nil, fmt.Errorf("decoding 'out_dst_mac' failed: %w", err)
}
fields["datalink_frame_type"] = layers.EthernetType(record.EthType & 0x0000ffff).String()
case sflow.SampledIPv4:
var err error
fields["ipv4_total_len"] = record.Base.Length
fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff))
fields["src"] = decodeIP(record.Base.SrcIP)
fields["dst"] = decodeIP(record.Base.DstIP)
fields["src"], err = decodeIP(record.Base.SrcIP)
if err != nil {
return nil, fmt.Errorf("decoding 'src' failed: %w", err)
}
fields["dst"], err = decodeIP(record.Base.DstIP)
if err != nil {
return nil, fmt.Errorf("decoding 'dst' failed: %w", err)
}
fields["src_port"] = record.Base.SrcPort
fields["dst_port"] = record.Base.DstPort
fields["src_tos"] = record.Tos
fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)})
fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)})
if err != nil {
return nil, fmt.Errorf("decoding 'tcp_flags' failed: %w", err)
}
case sflow.SampledIPv6:
var err error
fields["ipv6_total_len"] = record.Base.Length
fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff))
fields["src"] = decodeIP(record.Base.SrcIP)
fields["dst"] = decodeIP(record.Base.DstIP)
fields["src"], err = decodeIP(record.Base.SrcIP)
if err != nil {
return nil, fmt.Errorf("decoding 'src' failed: %w", err)
}
fields["dst"], err = decodeIP(record.Base.DstIP)
if err != nil {
return nil, fmt.Errorf("decoding 'dst' failed: %w", err)
}
fields["src_port"] = record.Base.SrcPort
fields["dst_port"] = record.Base.DstPort
fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)})
fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)})
if err != nil {
return nil, fmt.Errorf("decoding 'tcp_flags' failed: %w", err)
}
case sflow.ExtendedSwitch:
fields["vlan_src"] = record.SrcVlan
fields["vlan_src_priority"] = record.SrcPriority
fields["vlan_dst"] = record.DstVlan
fields["vlan_dst_priority"] = record.DstPriority
case sflow.ExtendedRouter:
fields["next_hop"] = decodeIP(record.NextHop)
var err error
fields["next_hop"], err = decodeIP(record.NextHop)
if err != nil {
return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err)
}
fields["src_mask"] = record.SrcMaskLen
fields["dst_mask"] = record.DstMaskLen
case sflow.ExtendedGateway:
fields["next_hop"] = decodeIP(record.NextHop)
var err error
fields["next_hop"], err = decodeIP(record.NextHop)
if err != nil {
return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err)
}
fields["bgp_src_as"] = record.SrcAS
fields["bgp_dst_as"] = record.ASDestinations
fields["bgp_next_hop"] = decodeIP(record.NextHop)
fields["bgp_next_hop"], err = decodeIP(record.NextHop)
if err != nil {
return nil, fmt.Errorf("decoding 'bgp_next_hop' failed: %w", err)
}
fields["bgp_prev_as"] = record.SrcPeerAS
if len(record.ASPath) > 0 {
fields["bgp_next_as"] = record.ASPath[0]

View File

@ -0,0 +1,2 @@
netflow,source=127.0.0.1,version=IPFIX bgp_dst_as=2152u,bgp_src_as=65535u,direction="ingress",dst="101.104.199.17",dst_port=50202u,flow_end=0u,flow_start=0u,flows=1u,in_bytes=256u,in_packets=1u,in_snmp=101u,out_snmp=201u,protocol="tcp",src="0.0.0.0",src_port=443u 1701440347872006922
netflow,source=127.0.0.1,version=IPFIX bgp_dst_as=2152u,bgp_src_as=65535u,direction="ingress",dst="2001:db8:0:1::1",dst_port=50202u,flow_end=0u,flow_start=0u,flows=1u,in_bytes=256u,in_packets=1u,in_snmp=101u,out_snmp=201u,protocol="tcp",src="::6568:c711:0:0:6568:c711",src_port=443u 1701440347872025552

View File

@ -0,0 +1,2 @@
[[inputs.netflow]]
service_address = "udp://127.0.0.1:0"

View File

@ -83,69 +83,86 @@ func initIPv4OptionMapping() error {
return nil
}
func decodeInt32(b []byte) interface{} {
return int64(int32(binary.BigEndian.Uint32(b)))
}
func decodeUint(b []byte) interface{} {
func decodeInt(b []byte) (interface{}, error) {
switch len(b) {
case 0:
return int64(0), nil
case 1:
return uint64(b[0])
return int64(int8(b[0])), nil
case 2:
return uint64(binary.BigEndian.Uint16(b))
return int64(int16(binary.BigEndian.Uint16(b))), nil
case 4:
return uint64(binary.BigEndian.Uint32(b))
return int64(int32(binary.BigEndian.Uint32(b))), nil
case 8:
return binary.BigEndian.Uint64(b)
return int64(binary.BigEndian.Uint64(b)), nil
}
panic(fmt.Errorf("invalid length for uint buffer %v", b))
return nil, fmt.Errorf("invalid length for int buffer %v", b)
}
func decodeFloat64(b []byte) interface{} {
func decodeUint(b []byte) (interface{}, error) {
switch len(b) {
case 0:
return uint64(0), nil
case 1:
return uint64(b[0]), nil
case 2:
return uint64(binary.BigEndian.Uint16(b)), nil
case 4:
return uint64(binary.BigEndian.Uint32(b)), nil
case 8:
return binary.BigEndian.Uint64(b), nil
}
return nil, fmt.Errorf("invalid length for uint buffer %v", b)
}
func decodeFloat64(b []byte) (interface{}, error) {
raw := binary.BigEndian.Uint64(b)
return math.Float64frombits(raw)
return math.Float64frombits(raw), nil
}
// According to https://www.rfc-editor.org/rfc/rfc5101#section-6.1.5
func decodeBool(b []byte) interface{} {
func decodeBool(b []byte) (interface{}, error) {
if len(b) == 0 {
return nil, errors.New("empty data")
}
if b[0] == 1 {
return true
return true, nil
}
if b[0] == 2 {
return false
return false, nil
}
return b[0]
return b[0], nil
}
func decodeHex(b []byte) interface{} {
func decodeHex(b []byte) (interface{}, error) {
if len(b) == 0 {
return ""
return "", nil
}
return "0x" + hex.EncodeToString(b)
return "0x" + hex.EncodeToString(b), nil
}
func decodeString(b []byte) interface{} {
return string(b)
func decodeString(b []byte) (interface{}, error) {
return string(b), nil
}
func decodeMAC(b []byte) interface{} {
func decodeMAC(b []byte) (interface{}, error) {
mac := net.HardwareAddr(b)
return mac.String()
return mac.String(), nil
}
func decodeIP(b []byte) interface{} {
func decodeIP(b []byte) (interface{}, error) {
ip := net.IP(b)
return ip.String()
return ip.String(), nil
}
func decodeIPFromUint32(a uint32) interface{} {
func decodeIPFromUint32(a uint32) (interface{}, error) {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, a)
return decodeIP(b)
}
func decodeL4Proto(b []byte) interface{} {
return mapL4Proto(b[0])
func decodeL4Proto(b []byte) (interface{}, error) {
return mapL4Proto(b[0]), nil
}
func mapL4Proto(id uint8) string {
@ -156,7 +173,7 @@ func mapL4Proto(id uint8) string {
return strconv.FormatUint(uint64(id), 10)
}
func decodeIPv4Options(b []byte) interface{} {
func decodeIPv4Options(b []byte) (interface{}, error) {
flags := binary.BigEndian.Uint32(b)
var result []string
@ -170,16 +187,16 @@ func decodeIPv4Options(b []byte) interface{} {
}
}
return strings.Join(result, ",")
return strings.Join(result, ","), nil
}
func decodeTCPFlags(b []byte) interface{} {
if len(b) < 1 {
return ""
func decodeTCPFlags(b []byte) (interface{}, error) {
if len(b) == 0 {
return "", nil
}
if len(b) == 1 {
return mapTCPFlags(b[0])
return mapTCPFlags(b[0]), nil
}
// IPFIX has more flags
@ -192,7 +209,7 @@ func decodeTCPFlags(b []byte) interface{} {
results = append(results, ".")
}
}
return strings.Join(results, "") + mapTCPFlags(b[1])
return strings.Join(results, "") + mapTCPFlags(b[1]), nil
}
func mapTCPFlags(flags uint8) string {
@ -220,7 +237,7 @@ func mapTCPFlags(flags uint8) string {
return strings.Join(result, "")
}
func decodeFragmentFlags(b []byte) interface{} {
func decodeFragmentFlags(b []byte) (interface{}, error) {
flagMapping := []string{
"*", // do not care
"*", // do not care
@ -242,21 +259,21 @@ func decodeFragmentFlags(b []byte) interface{} {
}
}
return strings.Join(result, "")
return strings.Join(result, ""), nil
}
func decodeSampleAlgo(b []byte) interface{} {
func decodeSampleAlgo(b []byte) (interface{}, error) {
switch b[0] {
case 1:
return "deterministic"
return "deterministic", nil
case 2:
return "random"
return "random", nil
}
return strconv.FormatUint(uint64(b[0]), 10)
return strconv.FormatUint(uint64(b[0]), 10), nil
}
func decodeEngineType(b []byte) interface{} {
return mapEngineType(b[0])
func decodeEngineType(b []byte) (interface{}, error) {
return mapEngineType(b[0]), nil
}
func mapEngineType(b uint8) string {
@ -271,211 +288,211 @@ func mapEngineType(b uint8) string {
return strconv.FormatUint(uint64(b), 10)
}
func decodeMPLSType(b []byte) interface{} {
func decodeMPLSType(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "unknown"
return "unknown", nil
case 1:
return "TE-MIDPT"
return "TE-MIDPT", nil
case 2:
return "Pseudowire"
return "Pseudowire", nil
case 3:
return "VPN"
return "VPN", nil
case 4:
return "BGP"
return "BGP", nil
case 5:
return "LDP"
return "LDP", nil
case 6:
return "Path computation element"
return "Path computation element", nil
case 7:
return "OSPFv2"
return "OSPFv2", nil
case 8:
return "OSPFv3"
return "OSPFv3", nil
case 9:
return "IS-IS"
return "IS-IS", nil
case 10:
return "BGP segment routing Prefix-SID"
return "BGP segment routing Prefix-SID", nil
}
return strconv.FormatUint(uint64(b[0]), 10)
return strconv.FormatUint(uint64(b[0]), 10), nil
}
func decodeIPVersion(b []byte) interface{} {
func decodeIPVersion(b []byte) (interface{}, error) {
switch b[0] {
case 4:
return "IPv4"
return "IPv4", nil
case 6:
return "IPv6"
return "IPv6", nil
}
return strconv.FormatUint(uint64(b[0]), 10)
return strconv.FormatUint(uint64(b[0]), 10), nil
}
func decodeDirection(b []byte) interface{} {
func decodeDirection(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "ingress"
return "ingress", nil
case 1:
return "egress"
return "egress", nil
}
return strconv.FormatUint(uint64(b[0]), 10)
return strconv.FormatUint(uint64(b[0]), 10), nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-forwarding-status
func decodeFwdStatus(b []byte) interface{} {
func decodeFwdStatus(b []byte) (interface{}, error) {
switch b[0] >> 6 {
case 0:
return "unknown"
return "unknown", nil
case 1:
return "forwarded"
return "forwarded", nil
case 2:
return "dropped"
return "dropped", nil
case 3:
return "consumed"
return "consumed", nil
}
return "invalid"
return "invalid", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-forwarding-status
func decodeFwdReason(b []byte) interface{} {
func decodeFwdReason(b []byte) (interface{}, error) {
switch b[0] {
// unknown
case 0:
return "unknown"
return "unknown", nil
// forwarded
case 64:
return "unknown"
return "unknown", nil
case 65:
return "fragmented"
return "fragmented", nil
case 66:
return "not fragmented"
return "not fragmented", nil
// dropped
case 128:
return "unknown"
return "unknown", nil
case 129:
return "ACL deny"
return "ACL deny", nil
case 130:
return "ACL drop"
return "ACL drop", nil
case 131:
return "unroutable"
return "unroutable", nil
case 132:
return "adjacency"
return "adjacency", nil
case 133:
return "fragmentation and DF set"
return "fragmentation and DF set", nil
case 134:
return "bad header checksum"
return "bad header checksum", nil
case 135:
return "bad total length"
return "bad total length", nil
case 136:
return "bad header length"
return "bad header length", nil
case 137:
return "bad TTL"
return "bad TTL", nil
case 138:
return "policer"
return "policer", nil
case 139:
return "WRED"
return "WRED", nil
case 140:
return "RPF"
return "RPF", nil
case 141:
return "for us"
return "for us", nil
case 142:
return "bad output interface"
return "bad output interface", nil
case 143:
return "hardware"
return "hardware", nil
// consumed
case 192:
return "unknown"
return "unknown", nil
case 193:
return "terminate punt adjacency"
return "terminate punt adjacency", nil
case 194:
return "terminate incomplete adjacency"
return "terminate incomplete adjacency", nil
case 195:
return "terminate for us"
return "terminate for us", nil
case 14:
return ""
return "", nil
}
return "invalid"
return "invalid", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-firewall-event
func decodeFWEvent(b []byte) interface{} {
func decodeFWEvent(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "ignore"
return "ignore", nil
case 1:
return "flow created"
return "flow created", nil
case 2:
return "flow deleted"
return "flow deleted", nil
case 3:
return "flow denied"
return "flow denied", nil
case 4:
return "flow alert"
return "flow alert", nil
case 5:
return "flow update"
return "flow update", nil
}
return strconv.FormatUint(uint64(b[0]), 10)
return strconv.FormatUint(uint64(b[0]), 10), nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-flow-end-reason
func decodeFlowEndReason(b []byte) interface{} {
func decodeFlowEndReason(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "reserved"
return "reserved", nil
case 1:
return "idle timeout"
return "idle timeout", nil
case 2:
return "active timeout"
return "active timeout", nil
case 3:
return "end of flow"
return "end of flow", nil
case 4:
return "forced end"
return "forced end", nil
case 5:
return "lack of resources"
return "lack of resources", nil
}
return "unassigned"
return "unassigned", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-biflow-direction
func decodeBiflowDirection(b []byte) interface{} {
func decodeBiflowDirection(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "arbitrary"
return "arbitrary", nil
case 1:
return "initiator"
return "initiator", nil
case 2:
return "reverse initiator"
return "reverse initiator", nil
case 3:
return "perimeter"
return "perimeter", nil
}
return "unassigned"
return "unassigned", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-observation-point-type
func decodeOpsPointType(b []byte) interface{} {
func decodeOpsPointType(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "invalid"
return "invalid", nil
case 1:
return "physical port"
return "physical port", nil
case 2:
return "port channel"
return "port channel", nil
case 3:
return "vlan"
return "vlan", nil
}
return "unassigned"
return "unassigned", nil
}
func decodeAnonStabilityClass(b []byte) interface{} {
func decodeAnonStabilityClass(b []byte) (interface{}, error) {
switch b[1] & 0x03 {
case 1:
return "session"
return "session", nil
case 2:
return "exporter-collector"
return "exporter-collector", nil
case 3:
return "stable"
return "stable", nil
}
return "undefined"
return "undefined", nil
}
func decodeAnonFlags(b []byte) interface{} {
func decodeAnonFlags(b []byte) (interface{}, error) {
var result []string
if b[0]&(1<<2) != 0 {
result = append(result, "PmA")
@ -485,156 +502,156 @@ func decodeAnonFlags(b []byte) interface{} {
result = append(result, "LOR")
}
return strings.Join(result, ",")
return strings.Join(result, ","), nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-anonymization-technique
func decodeAnonTechnique(b []byte) interface{} {
func decodeAnonTechnique(b []byte) (interface{}, error) {
tech := binary.BigEndian.Uint16(b)
switch tech {
case 0:
return "undefined"
return "undefined", nil
case 1:
return "none"
return "none", nil
case 2:
return "precision degradation"
return "precision degradation", nil
case 3:
return "binning"
return "binning", nil
case 4:
return "enumeration"
return "enumeration", nil
case 5:
return "permutation"
return "permutation", nil
case 6:
return "structure permutation"
return "structure permutation", nil
case 7:
return "reverse truncation"
return "reverse truncation", nil
case 8:
return "noise"
return "noise", nil
case 9:
return "offset"
return "offset", nil
}
return "unassigned"
return "unassigned", nil
}
func decodeTechnology(b []byte) interface{} {
func decodeTechnology(b []byte) (interface{}, error) {
switch string(b) {
case "yes", "y", "1":
return "yes"
return "yes", nil
case "no", "n", "2":
return "no"
return "no", nil
case "unassigned", "u", "0":
return "unassigned"
return "unassigned", nil
}
switch b[0] {
case 0:
return "unassigned"
return "unassigned", nil
case 1:
return "yes"
return "yes", nil
case 2:
return "no"
return "no", nil
}
return "undefined"
return "undefined", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-nat-type
func decodeIPNatType(b []byte) interface{} {
func decodeIPNatType(b []byte) (interface{}, error) {
tech := binary.BigEndian.Uint16(b)
switch tech {
case 0:
return "unknown"
return "unknown", nil
case 1:
return "NAT44"
return "NAT44", nil
case 2:
return "NAT64"
return "NAT64", nil
case 3:
return "NAT46"
return "NAT46", nil
case 4:
return "IPv4 no NAT"
return "IPv4 no NAT", nil
case 5:
return "NAT66"
return "NAT66", nil
case 6:
return "IPv6 no NAT"
return "IPv6 no NAT", nil
}
return "unassigned"
return "unassigned", nil
}
// https://www.iana.org/assignments/psamp-parameters/psamp-parameters.xhtml
func decodeSelectorAlgorithm(b []byte) interface{} {
func decodeSelectorAlgorithm(b []byte) (interface{}, error) {
tech := binary.BigEndian.Uint16(b)
switch tech {
case 0:
return "reserved"
return "reserved", nil
case 1:
return "systematic count-based sampling"
return "systematic count-based sampling", nil
case 2:
return "systematic time-based sampling"
return "systematic time-based sampling", nil
case 3:
return "random n-out-of-N sampling"
return "random n-out-of-N sampling", nil
case 4:
return "uniform probabilistic sampling"
return "uniform probabilistic sampling", nil
case 5:
return "property match filtering"
return "property match filtering", nil
case 6:
return "hash based filtering using BOB"
return "hash based filtering using BOB", nil
case 7:
return "hash based filtering using IPSX"
return "hash based filtering using IPSX", nil
case 8:
return "hash based filtering using CRC"
return "hash based filtering using CRC", nil
case 9:
return "flow-state dependent"
return "flow-state dependent", nil
}
return "unassigned"
return "unassigned", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-value-distribution-method
func decodeValueDistMethod(b []byte) interface{} {
func decodeValueDistMethod(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "unspecified"
return "unspecified", nil
case 1:
return "start interval"
return "start interval", nil
case 2:
return "end interval"
return "end interval", nil
case 3:
return "mid interval"
return "mid interval", nil
case 4:
return "simple uniform distribution"
return "simple uniform distribution", nil
case 5:
return "proportional uniform distribution"
return "proportional uniform distribution", nil
case 6:
return "simulated process"
return "simulated process", nil
case 7:
return "direct"
return "direct", nil
}
return "unassigned"
return "unassigned", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-data-link-frame-type
func decodeDataLinkFrameType(b []byte) interface{} {
func decodeDataLinkFrameType(b []byte) (interface{}, error) {
switch binary.BigEndian.Uint16(b) {
case 0x0001:
return "IEEE802.3 ethernet"
return "IEEE802.3 ethernet", nil
case 0x0002:
return "IEEE802.11 MAC"
return "IEEE802.11 MAC", nil
}
return "unassigned"
return "unassigned", nil
}
// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-mib-capture-time-semantics
func decodeCaptureTimeSemantics(b []byte) interface{} {
func decodeCaptureTimeSemantics(b []byte) (interface{}, error) {
switch b[0] {
case 0:
return "undefined"
return "undefined", nil
case 1:
return "begin"
return "begin", nil
case 2:
return "end"
return "end", nil
case 3:
return "export"
return "export", nil
case 4:
return "average"
return "average", nil
}
return "unassigned"
return "unassigned", nil
}
func decodeSflowIPVersion(v uint32) string {
@ -692,3 +709,7 @@ func decodeSflowHeaderProtocol(t uint32) string {
}
return "unassigned"
}
func decodeByteFunc(idx int) decoderFunc {
return func(b []byte) (interface{}, error) { return b[idx], nil }
}

View File

@ -9,7 +9,9 @@ import (
func TestDecodeInt32(t *testing.T) {
buf := []byte{0x82, 0xad, 0x80, 0x86}
out, ok := decodeInt32(buf).(int64)
v, err := decodeInt(buf)
require.NoError(t, err)
out, ok := v.(int64)
require.True(t, ok)
require.Equal(t, int64(-2102558586), out)
}
@ -44,7 +46,9 @@ func TestDecodeUint(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, ok := decodeUint(tt.in).(uint64)
v, err := decodeUint(tt.in)
require.NoError(t, err)
out, ok := v.(uint64)
require.True(t, ok)
require.Equal(t, tt.expected, out)
})
@ -52,12 +56,15 @@ func TestDecodeUint(t *testing.T) {
}
func TestDecodeUintInvalid(t *testing.T) {
require.Panics(t, func() { decodeUint([]byte{0x00, 0x00, 0x00}) })
_, err := decodeUint([]byte{0x00, 0x00, 0x00})
require.ErrorContains(t, err, "invalid length")
}
func TestDecodeFloat64(t *testing.T) {
buf := []byte{0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2e, 0xea}
out, ok := decodeFloat64(buf).(float64)
v, err := decodeFloat64(buf)
require.NoError(t, err)
out, ok := v.(float64)
require.True(t, ok)
require.Equal(t, float64(3.14159265359), out)
}
@ -92,7 +99,8 @@ func TestDecodeBool(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out := decodeBool(tt.in)
out, err := decodeBool(tt.in)
require.NoError(t, err)
require.Equal(t, tt.expected, out)
})
}
@ -100,21 +108,27 @@ func TestDecodeBool(t *testing.T) {
func TestDecodeHex(t *testing.T) {
buf := []byte{0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2e, 0xea}
out, ok := decodeHex(buf).(string)
v, err := decodeHex(buf)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, "0x400921fb54442eea", out)
}
func TestDecodeString(t *testing.T) {
buf := []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x74, 0x65, 0x6c, 0x65, 0x67, 0x72, 0x61, 0x66}
out, ok := decodeString(buf).(string)
v, err := decodeString(buf)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, "hello telegraf", out)
}
func TestDecodeMAC(t *testing.T) {
buf := []byte{0x2c, 0xf0, 0x5d, 0xe9, 0x04, 0x42}
out, ok := decodeMAC(buf).(string)
v, err := decodeMAC(buf)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, "2c:f0:5d:e9:04:42", out)
}
@ -164,7 +178,9 @@ func TestDecodeIP(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, ok := decodeIP(tt.in).(string)
v, err := decodeIP(tt.in)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, tt.expected, out)
})
@ -173,7 +189,9 @@ func TestDecodeIP(t *testing.T) {
func TestDecodeIPFromUint32(t *testing.T) {
in := uint32(0x7f000001)
out, ok := decodeIPFromUint32(in).(string)
v, err := decodeIPFromUint32(in)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, "127.0.0.1", out)
}
@ -230,7 +248,9 @@ func TestDecodeLayer4ProtocolNumber(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out, ok := decodeL4Proto(tt.in).(string)
v, err := decodeL4Proto(tt.in)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, tt.expected, out)
})
@ -283,7 +303,9 @@ func TestDecodeIPv4Options(t *testing.T) {
in := make([]byte, 4)
binary.BigEndian.PutUint32(in, options)
out, ok := decodeIPv4Options(in).(string)
v, err := decodeIPv4Options(in)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, tt.expected, out)
})
@ -382,7 +404,9 @@ func TestDecodeTCPFlags(t *testing.T) {
}
in = []byte{options}
}
out, ok := decodeTCPFlags(in).(string)
v, err := decodeTCPFlags(in)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, tt.expected, out)
})
@ -434,7 +458,9 @@ func TestDecodeFragmentFlags(t *testing.T) {
flags |= 1 << bit
}
in := []byte{flags}
out, ok := decodeFragmentFlags(in).(string)
v, err := decodeFragmentFlags(in)
require.NoError(t, err)
out, ok := v.(string)
require.True(t, ok)
require.Equal(t, tt.expected, out)
})