diff --git a/plugins/inputs/netflow/netflow_decoder.go b/plugins/inputs/netflow/netflow_decoder.go index aadfb96a2..391b46524 100644 --- a/plugins/inputs/netflow/netflow_decoder.go +++ b/plugins/inputs/netflow/netflow_decoder.go @@ -17,7 +17,7 @@ import ( var regexpIPFIXPENMapping = regexp.MustCompile(`\d+\.\d+`) -type decoderFunc func([]byte) interface{} +type decoderFunc func([]byte) (interface{}, error) type fieldMapping struct { name string @@ -62,8 +62,8 @@ var fieldMappingsNetflowCommon = map[uint16][]fieldMapping{ 30: {{"dst_mask", decodeUint}}, // IPV6_DST_MASK / destinationIPv6PrefixLength 31: {{"flow_label", decodeHex}}, // IPV6_FLOW_LABEL / flowLabelIPv6 32: { - {"icmp_type", func(b []byte) interface{} { return b[0] }}, // ICMP_TYPE / icmpTypeCodeIPv4 - {"icmp_code", func(b []byte) interface{} { return b[1] }}, + {"icmp_type", decodeByteFunc(0)}, // ICMP_TYPE / icmpTypeCodeIPv4 + {"icmp_code", decodeByteFunc(1)}, }, 33: {{"igmp_type", decodeUint}}, // MUL_IGMP_TYPE / igmpType 34: {{"sampling_interval", decodeUint}}, // SAMPLING_INTERVAL / samplingInterval (deprecated) @@ -187,8 +187,8 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{ 137: {{"common_properties_id", decodeUint}}, // commonPropertiesId 138: {{"observation_point_id", decodeUint}}, // observationPointId 139: { - {"icmp_type", func(b []byte) interface{} { return b[0] }}, // icmpTypeCodeIPv6 - {"icmp_code", func(b []byte) interface{} { return b[1] }}, + {"icmp_type", decodeByteFunc(0)}, // icmpTypeCodeIPv6 + {"icmp_code", decodeByteFunc(1)}, }, 140: {{"mpls_top_label_ip", decodeIP}}, // mplsTopLabelIPv6Address 141: {{"linecard_id", decodeUint}}, // lineCardId @@ -468,7 +468,7 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{ 431: {{"layer2_frames_total", decodeUint}}, // layer2FrameTotalCount 432: {{"pseudo_wire_dst", decodeIP}}, // pseudoWireDestinationIPv4Address 433: {{"ignored_layer2_frames_total", decodeUint}}, // ignoredLayer2FrameTotalCount - 434: {{"mib_obj_value_int", decodeInt32}}, // mibObjectValueInteger + 434: {{"mib_obj_value_int", decodeInt}}, // mibObjectValueInteger 435: {{"mib_obj_value_str", decodeString}}, // mibObjectValueOctetString 436: {{"mib_obj_value_oid", decodeHex}}, // mibObjectValueOID 437: {{"mib_obj_value_bits", decodeHex}}, // mibObjectValueBits @@ -584,7 +584,12 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric } fields := make(map[string]interface{}) for _, value := range record.Values { - for _, field := range d.decodeValueV9(value) { + decodedFields, err := d.decodeValueV9(value) + if err != nil { + d.Log.Errorf("decoding record %+v failed: %v", record, err) + continue + } + for _, field := range decodedFields { fields[field.Key] = field.Value } } @@ -607,7 +612,12 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric fields := make(map[string]interface{}) t := time.Now() for _, value := range record.Values { - for _, field := range d.decodeValueIPFIX(value) { + decodedFields, err := d.decodeValueIPFIX(value) + if err != nil { + d.Log.Errorf("decoding value %+v failed: %v", value, err) + continue + } + for _, field := range decodedFields { fields[field.Key] = field.Value } } @@ -657,37 +667,43 @@ func (d *netflowDecoder) Init() error { return nil } -func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field { +func (d *netflowDecoder) decodeValueV9(field netflow.DataField) ([]telegraf.Field, error) { raw := field.Value.([]byte) elementID := field.Type // Check the user-specified mapping if m, found := d.mappingsV9[elementID]; found { - return []telegraf.Field{{Key: m.name, Value: m.decoder(raw)}} + v, err := m.decoder(raw) + if err != nil { + return nil, err + } + return []telegraf.Field{{Key: m.name, Value: v}}, nil } // Check the version specific default field mappings if mappings, found := fieldMappingsNetflowV9[elementID]; found { var fields []telegraf.Field for _, m := range mappings { - fields = append(fields, telegraf.Field{ - Key: m.name, - Value: m.decoder(raw), - }) + v, err := m.decoder(raw) + if err != nil { + return nil, err + } + fields = append(fields, telegraf.Field{Key: m.name, Value: v}) } - return fields + return fields, nil } // Check the common default field mappings if mappings, found := fieldMappingsNetflowCommon[elementID]; found { var fields []telegraf.Field for _, m := range mappings { - fields = append(fields, telegraf.Field{ - Key: m.name, - Value: m.decoder(raw), - }) + v, err := m.decoder(raw) + if err != nil { + return nil, err + } + fields = append(fields, telegraf.Field{Key: m.name, Value: v}) } - return fields + return fields, nil } // Return the raw data if no mapping was found @@ -695,10 +711,15 @@ func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field if !d.logged[key] { d.Log.Debugf("unknown Netflow v9 data field %v", field) } - return []telegraf.Field{{Key: key, Value: decodeHex(raw)}} + v, err := decodeHex(raw) + if err != nil { + return nil, err + } + + return []telegraf.Field{{Key: key, Value: v}}, nil } -func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Field { +func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) ([]telegraf.Field, error) { raw := field.Value.([]byte) // Checking for reverse elements according to RFC5103 @@ -714,42 +735,56 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Fi key := fmt.Sprintf("%d.%d", field.Pen, elementID) if m, found := d.mappingsPEN[key]; found { name := prefix + m.name - return []telegraf.Field{{Key: name, Value: m.decoder(raw)}} + v, err := m.decoder(raw) + if err != nil { + return nil, err + } + return []telegraf.Field{{Key: name, Value: v}}, nil } if !d.logged[key] { d.Log.Debugf("unknown IPFIX PEN data field %v", field) } name := fmt.Sprintf("type_%d_%s%d", field.Pen, prefix, elementID) - return []telegraf.Field{{Key: name, Value: decodeHex(raw)}} + v, err := decodeHex(raw) + if err != nil { + return nil, err + } + return []telegraf.Field{{Key: name, Value: v}}, nil } // Check the user-specified mapping if m, found := d.mappingsIPFIX[elementID]; found { - return []telegraf.Field{{Key: prefix + m.name, Value: m.decoder(raw)}} + v, err := m.decoder(raw) + if err != nil { + return nil, err + } + return []telegraf.Field{{Key: prefix + m.name, Value: v}}, nil } // Check the version specific default field mappings if mappings, found := fieldMappingsIPFIX[elementID]; found { var fields []telegraf.Field for _, m := range mappings { - fields = append(fields, telegraf.Field{ - Key: prefix + m.name, - Value: m.decoder(raw), - }) + v, err := m.decoder(raw) + if err != nil { + return nil, err + } + fields = append(fields, telegraf.Field{Key: prefix + m.name, Value: v}) } - return fields + return fields, nil } // Check the common default field mappings if mappings, found := fieldMappingsNetflowCommon[elementID]; found { var fields []telegraf.Field for _, m := range mappings { - fields = append(fields, telegraf.Field{ - Key: prefix + m.name, - Value: m.decoder(raw), - }) + v, err := m.decoder(raw) + if err != nil { + return nil, err + } + fields = append(fields, telegraf.Field{Key: prefix + m.name, Value: v}) } - return fields + return fields, nil } // Return the raw data if no mapping was found @@ -757,5 +792,9 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Fi if !d.logged[key] { d.Log.Debugf("unknown IPFIX data field %v", field) } - return []telegraf.Field{{Key: key, Value: decodeHex(raw)}} + v, err := decodeHex(raw) + if err != nil { + return nil, err + } + return []telegraf.Field{{Key: key, Value: v}}, nil } diff --git a/plugins/inputs/netflow/netflow_v5.go b/plugins/inputs/netflow/netflow_v5.go index 6107f0a0a..a29bcb4ee 100644 --- a/plugins/inputs/netflow/netflow_v5.go +++ b/plugins/inputs/netflow/netflow_v5.go @@ -50,11 +50,7 @@ func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metr "sys_uptime": msg.SysUptime, "seq_number": msg.FlowSequence, "engine_type": mapEngineType(msg.EngineType), - "engine_id": decodeHex([]byte{msg.EngineId}), "sampling_interval": msg.SamplingInterval, - "src": decodeIPFromUint32(record.SrcAddr), - "dst": decodeIPFromUint32(record.DstAddr), - "next_hop": decodeIPFromUint32(record.NextHop), "in_snmp": record.Input, "out_snmp": record.Output, "in_packets": record.DPkts, @@ -65,12 +61,34 @@ func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metr "dst_port": record.DstPort, "tcp_flags": mapTCPFlags(record.TCPFlags), "protocol": mapL4Proto(record.Proto), - "src_tos": decodeHex([]byte{record.Tos}), "bgp_src_as": record.SrcAS, "bgp_dst_as": record.DstAS, "src_mask": record.SrcMask, "dst_mask": record.DstMask, } + + var err error + fields["engine_id"], err = decodeHex([]byte{msg.EngineId}) + if err != nil { + return nil, fmt.Errorf("decoding 'engine_id' failed: %w", err) + } + fields["src"], err = decodeIPFromUint32(record.SrcAddr) + if err != nil { + return nil, fmt.Errorf("decoding 'src' failed: %w", err) + } + fields["dst"], err = decodeIPFromUint32(record.DstAddr) + if err != nil { + return nil, fmt.Errorf("decoding 'dst' failed: %w", err) + } + fields["next_hop"], err = decodeIPFromUint32(record.NextHop) + if err != nil { + return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err) + } + fields["src_tos"], err = decodeHex([]byte{record.Tos}) + if err != nil { + return nil, fmt.Errorf("decoding 'src_tos' failed: %w", err) + } + metrics = append(metrics, metric.New("netflow", tags, fields, t)) } diff --git a/plugins/inputs/netflow/sflow_v5.go b/plugins/inputs/netflow/sflow_v5.go index 299eaba18..903abbb41 100644 --- a/plugins/inputs/netflow/sflow_v5.go +++ b/plugins/inputs/netflow/sflow_v5.go @@ -62,7 +62,6 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric fields := map[string]interface{}{ "ip_version": decodeSflowIPVersion(msg.IPVersion), "sys_uptime": msg.Uptime, - "agent_ip": decodeIP(msg.AgentIP), "agent_subid": msg.SubAgentId, "seq_number": sample.Header.SampleSequenceNumber, "sampling_interval": sample.SamplingRate, @@ -70,6 +69,12 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric "sampling_drops": sample.Drops, "in_snmp": sample.Input, } + + fields["agent_ip"], err = decodeIP(msg.AgentIP) + if err != nil { + return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err) + } + if sample.Output>>31 == 0 { fields["out_snmp"] = sample.Output & 0x7fffffff } @@ -95,7 +100,6 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric fields := map[string]interface{}{ "ip_version": decodeSflowIPVersion(msg.IPVersion), "sys_uptime": msg.Uptime, - "agent_ip": decodeIP(msg.AgentIP), "agent_subid": msg.SubAgentId, "seq_number": sample.Header.SampleSequenceNumber, "sampling_interval": sample.SamplingRate, @@ -103,6 +107,11 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric "sampling_drops": sample.Drops, "in_snmp": sample.InputIfValue, } + fields["agent_ip"], err = decodeIP(msg.AgentIP) + if err != nil { + return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err) + } + if sample.OutputIfFormat == 0 { fields["out_snmp"] = sample.OutputIfValue } @@ -128,10 +137,14 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric fields := map[string]interface{}{ "ip_version": decodeSflowIPVersion(msg.IPVersion), "sys_uptime": msg.Uptime, - "agent_ip": decodeIP(msg.AgentIP), "agent_subid": msg.SubAgentId, "seq_number": sample.Header.SampleSequenceNumber, } + fields["agent_ip"], err = decodeIP(msg.AgentIP) + if err != nil { + return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err) + } + // Decode the source information if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" { fields[name] = sample.Header.SourceIdValue @@ -170,41 +183,79 @@ func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[stri fields[k] = v } case sflow.SampledEthernet: + var err error fields["eth_total_len"] = record.Length - fields["in_src_mac"] = decodeMAC(record.SrcMac) - fields["out_dst_mac"] = decodeMAC(record.DstMac) + fields["in_src_mac"], err = decodeMAC(record.SrcMac) + if err != nil { + return nil, fmt.Errorf("decoding 'in_src_mac' failed: %w", err) + } + fields["out_dst_mac"], err = decodeMAC(record.DstMac) + if err != nil { + return nil, fmt.Errorf("decoding 'out_dst_mac' failed: %w", err) + } fields["datalink_frame_type"] = layers.EthernetType(record.EthType & 0x0000ffff).String() case sflow.SampledIPv4: + var err error fields["ipv4_total_len"] = record.Base.Length fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff)) - fields["src"] = decodeIP(record.Base.SrcIP) - fields["dst"] = decodeIP(record.Base.DstIP) + fields["src"], err = decodeIP(record.Base.SrcIP) + if err != nil { + return nil, fmt.Errorf("decoding 'src' failed: %w", err) + } + fields["dst"], err = decodeIP(record.Base.DstIP) + if err != nil { + return nil, fmt.Errorf("decoding 'dst' failed: %w", err) + } fields["src_port"] = record.Base.SrcPort fields["dst_port"] = record.Base.DstPort fields["src_tos"] = record.Tos - fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + if err != nil { + return nil, fmt.Errorf("decoding 'tcp_flags' failed: %w", err) + } case sflow.SampledIPv6: + var err error fields["ipv6_total_len"] = record.Base.Length fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff)) - fields["src"] = decodeIP(record.Base.SrcIP) - fields["dst"] = decodeIP(record.Base.DstIP) + fields["src"], err = decodeIP(record.Base.SrcIP) + if err != nil { + return nil, fmt.Errorf("decoding 'src' failed: %w", err) + } + fields["dst"], err = decodeIP(record.Base.DstIP) + if err != nil { + return nil, fmt.Errorf("decoding 'dst' failed: %w", err) + } fields["src_port"] = record.Base.SrcPort fields["dst_port"] = record.Base.DstPort - fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + if err != nil { + return nil, fmt.Errorf("decoding 'tcp_flags' failed: %w", err) + } case sflow.ExtendedSwitch: fields["vlan_src"] = record.SrcVlan fields["vlan_src_priority"] = record.SrcPriority fields["vlan_dst"] = record.DstVlan fields["vlan_dst_priority"] = record.DstPriority case sflow.ExtendedRouter: - fields["next_hop"] = decodeIP(record.NextHop) + var err error + fields["next_hop"], err = decodeIP(record.NextHop) + if err != nil { + return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err) + } fields["src_mask"] = record.SrcMaskLen fields["dst_mask"] = record.DstMaskLen case sflow.ExtendedGateway: - fields["next_hop"] = decodeIP(record.NextHop) + var err error + fields["next_hop"], err = decodeIP(record.NextHop) + if err != nil { + return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err) + } fields["bgp_src_as"] = record.SrcAS fields["bgp_dst_as"] = record.ASDestinations - fields["bgp_next_hop"] = decodeIP(record.NextHop) + fields["bgp_next_hop"], err = decodeIP(record.NextHop) + if err != nil { + return nil, fmt.Errorf("decoding 'bgp_next_hop' failed: %w", err) + } fields["bgp_prev_as"] = record.SrcPeerAS if len(record.ASPath) > 0 { fields["bgp_next_as"] = record.ASPath[0] diff --git a/plugins/inputs/netflow/testcases/issue_14370/expected.out b/plugins/inputs/netflow/testcases/issue_14370/expected.out new file mode 100644 index 000000000..b423df4a3 --- /dev/null +++ b/plugins/inputs/netflow/testcases/issue_14370/expected.out @@ -0,0 +1,2 @@ +netflow,source=127.0.0.1,version=IPFIX bgp_dst_as=2152u,bgp_src_as=65535u,direction="ingress",dst="101.104.199.17",dst_port=50202u,flow_end=0u,flow_start=0u,flows=1u,in_bytes=256u,in_packets=1u,in_snmp=101u,out_snmp=201u,protocol="tcp",src="0.0.0.0",src_port=443u 1701440347872006922 +netflow,source=127.0.0.1,version=IPFIX bgp_dst_as=2152u,bgp_src_as=65535u,direction="ingress",dst="2001:db8:0:1::1",dst_port=50202u,flow_end=0u,flow_start=0u,flows=1u,in_bytes=256u,in_packets=1u,in_snmp=101u,out_snmp=201u,protocol="tcp",src="::6568:c711:0:0:6568:c711",src_port=443u 1701440347872025552 \ No newline at end of file diff --git a/plugins/inputs/netflow/testcases/issue_14370/message.bin b/plugins/inputs/netflow/testcases/issue_14370/message.bin new file mode 100644 index 000000000..0c1caff69 Binary files /dev/null and b/plugins/inputs/netflow/testcases/issue_14370/message.bin differ diff --git a/plugins/inputs/netflow/testcases/issue_14370/telegraf.conf b/plugins/inputs/netflow/testcases/issue_14370/telegraf.conf new file mode 100644 index 000000000..cfd23d363 --- /dev/null +++ b/plugins/inputs/netflow/testcases/issue_14370/telegraf.conf @@ -0,0 +1,2 @@ +[[inputs.netflow]] + service_address = "udp://127.0.0.1:0" diff --git a/plugins/inputs/netflow/type_conversion.go b/plugins/inputs/netflow/type_conversion.go index 772ed71d9..49db1a9ef 100644 --- a/plugins/inputs/netflow/type_conversion.go +++ b/plugins/inputs/netflow/type_conversion.go @@ -83,69 +83,86 @@ func initIPv4OptionMapping() error { return nil } -func decodeInt32(b []byte) interface{} { - return int64(int32(binary.BigEndian.Uint32(b))) -} - -func decodeUint(b []byte) interface{} { +func decodeInt(b []byte) (interface{}, error) { switch len(b) { + case 0: + return int64(0), nil case 1: - return uint64(b[0]) + return int64(int8(b[0])), nil case 2: - return uint64(binary.BigEndian.Uint16(b)) + return int64(int16(binary.BigEndian.Uint16(b))), nil case 4: - return uint64(binary.BigEndian.Uint32(b)) + return int64(int32(binary.BigEndian.Uint32(b))), nil case 8: - return binary.BigEndian.Uint64(b) + return int64(binary.BigEndian.Uint64(b)), nil } - panic(fmt.Errorf("invalid length for uint buffer %v", b)) + return nil, fmt.Errorf("invalid length for int buffer %v", b) } -func decodeFloat64(b []byte) interface{} { +func decodeUint(b []byte) (interface{}, error) { + switch len(b) { + case 0: + return uint64(0), nil + case 1: + return uint64(b[0]), nil + case 2: + return uint64(binary.BigEndian.Uint16(b)), nil + case 4: + return uint64(binary.BigEndian.Uint32(b)), nil + case 8: + return binary.BigEndian.Uint64(b), nil + } + return nil, fmt.Errorf("invalid length for uint buffer %v", b) +} + +func decodeFloat64(b []byte) (interface{}, error) { raw := binary.BigEndian.Uint64(b) - return math.Float64frombits(raw) + return math.Float64frombits(raw), nil } // According to https://www.rfc-editor.org/rfc/rfc5101#section-6.1.5 -func decodeBool(b []byte) interface{} { +func decodeBool(b []byte) (interface{}, error) { + if len(b) == 0 { + return nil, errors.New("empty data") + } if b[0] == 1 { - return true + return true, nil } if b[0] == 2 { - return false + return false, nil } - return b[0] + return b[0], nil } -func decodeHex(b []byte) interface{} { +func decodeHex(b []byte) (interface{}, error) { if len(b) == 0 { - return "" + return "", nil } - return "0x" + hex.EncodeToString(b) + return "0x" + hex.EncodeToString(b), nil } -func decodeString(b []byte) interface{} { - return string(b) +func decodeString(b []byte) (interface{}, error) { + return string(b), nil } -func decodeMAC(b []byte) interface{} { +func decodeMAC(b []byte) (interface{}, error) { mac := net.HardwareAddr(b) - return mac.String() + return mac.String(), nil } -func decodeIP(b []byte) interface{} { +func decodeIP(b []byte) (interface{}, error) { ip := net.IP(b) - return ip.String() + return ip.String(), nil } -func decodeIPFromUint32(a uint32) interface{} { +func decodeIPFromUint32(a uint32) (interface{}, error) { b := make([]byte, 4) binary.BigEndian.PutUint32(b, a) return decodeIP(b) } -func decodeL4Proto(b []byte) interface{} { - return mapL4Proto(b[0]) +func decodeL4Proto(b []byte) (interface{}, error) { + return mapL4Proto(b[0]), nil } func mapL4Proto(id uint8) string { @@ -156,7 +173,7 @@ func mapL4Proto(id uint8) string { return strconv.FormatUint(uint64(id), 10) } -func decodeIPv4Options(b []byte) interface{} { +func decodeIPv4Options(b []byte) (interface{}, error) { flags := binary.BigEndian.Uint32(b) var result []string @@ -170,16 +187,16 @@ func decodeIPv4Options(b []byte) interface{} { } } - return strings.Join(result, ",") + return strings.Join(result, ","), nil } -func decodeTCPFlags(b []byte) interface{} { - if len(b) < 1 { - return "" +func decodeTCPFlags(b []byte) (interface{}, error) { + if len(b) == 0 { + return "", nil } if len(b) == 1 { - return mapTCPFlags(b[0]) + return mapTCPFlags(b[0]), nil } // IPFIX has more flags @@ -192,7 +209,7 @@ func decodeTCPFlags(b []byte) interface{} { results = append(results, ".") } } - return strings.Join(results, "") + mapTCPFlags(b[1]) + return strings.Join(results, "") + mapTCPFlags(b[1]), nil } func mapTCPFlags(flags uint8) string { @@ -220,7 +237,7 @@ func mapTCPFlags(flags uint8) string { return strings.Join(result, "") } -func decodeFragmentFlags(b []byte) interface{} { +func decodeFragmentFlags(b []byte) (interface{}, error) { flagMapping := []string{ "*", // do not care "*", // do not care @@ -242,21 +259,21 @@ func decodeFragmentFlags(b []byte) interface{} { } } - return strings.Join(result, "") + return strings.Join(result, ""), nil } -func decodeSampleAlgo(b []byte) interface{} { +func decodeSampleAlgo(b []byte) (interface{}, error) { switch b[0] { case 1: - return "deterministic" + return "deterministic", nil case 2: - return "random" + return "random", nil } - return strconv.FormatUint(uint64(b[0]), 10) + return strconv.FormatUint(uint64(b[0]), 10), nil } -func decodeEngineType(b []byte) interface{} { - return mapEngineType(b[0]) +func decodeEngineType(b []byte) (interface{}, error) { + return mapEngineType(b[0]), nil } func mapEngineType(b uint8) string { @@ -271,211 +288,211 @@ func mapEngineType(b uint8) string { return strconv.FormatUint(uint64(b), 10) } -func decodeMPLSType(b []byte) interface{} { +func decodeMPLSType(b []byte) (interface{}, error) { switch b[0] { case 0: - return "unknown" + return "unknown", nil case 1: - return "TE-MIDPT" + return "TE-MIDPT", nil case 2: - return "Pseudowire" + return "Pseudowire", nil case 3: - return "VPN" + return "VPN", nil case 4: - return "BGP" + return "BGP", nil case 5: - return "LDP" + return "LDP", nil case 6: - return "Path computation element" + return "Path computation element", nil case 7: - return "OSPFv2" + return "OSPFv2", nil case 8: - return "OSPFv3" + return "OSPFv3", nil case 9: - return "IS-IS" + return "IS-IS", nil case 10: - return "BGP segment routing Prefix-SID" + return "BGP segment routing Prefix-SID", nil } - return strconv.FormatUint(uint64(b[0]), 10) + return strconv.FormatUint(uint64(b[0]), 10), nil } -func decodeIPVersion(b []byte) interface{} { +func decodeIPVersion(b []byte) (interface{}, error) { switch b[0] { case 4: - return "IPv4" + return "IPv4", nil case 6: - return "IPv6" + return "IPv6", nil } - return strconv.FormatUint(uint64(b[0]), 10) + return strconv.FormatUint(uint64(b[0]), 10), nil } -func decodeDirection(b []byte) interface{} { +func decodeDirection(b []byte) (interface{}, error) { switch b[0] { case 0: - return "ingress" + return "ingress", nil case 1: - return "egress" + return "egress", nil } - return strconv.FormatUint(uint64(b[0]), 10) + return strconv.FormatUint(uint64(b[0]), 10), nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-forwarding-status -func decodeFwdStatus(b []byte) interface{} { +func decodeFwdStatus(b []byte) (interface{}, error) { switch b[0] >> 6 { case 0: - return "unknown" + return "unknown", nil case 1: - return "forwarded" + return "forwarded", nil case 2: - return "dropped" + return "dropped", nil case 3: - return "consumed" + return "consumed", nil } - return "invalid" + return "invalid", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-forwarding-status -func decodeFwdReason(b []byte) interface{} { +func decodeFwdReason(b []byte) (interface{}, error) { switch b[0] { // unknown case 0: - return "unknown" + return "unknown", nil // forwarded case 64: - return "unknown" + return "unknown", nil case 65: - return "fragmented" + return "fragmented", nil case 66: - return "not fragmented" + return "not fragmented", nil // dropped case 128: - return "unknown" + return "unknown", nil case 129: - return "ACL deny" + return "ACL deny", nil case 130: - return "ACL drop" + return "ACL drop", nil case 131: - return "unroutable" + return "unroutable", nil case 132: - return "adjacency" + return "adjacency", nil case 133: - return "fragmentation and DF set" + return "fragmentation and DF set", nil case 134: - return "bad header checksum" + return "bad header checksum", nil case 135: - return "bad total length" + return "bad total length", nil case 136: - return "bad header length" + return "bad header length", nil case 137: - return "bad TTL" + return "bad TTL", nil case 138: - return "policer" + return "policer", nil case 139: - return "WRED" + return "WRED", nil case 140: - return "RPF" + return "RPF", nil case 141: - return "for us" + return "for us", nil case 142: - return "bad output interface" + return "bad output interface", nil case 143: - return "hardware" + return "hardware", nil // consumed case 192: - return "unknown" + return "unknown", nil case 193: - return "terminate punt adjacency" + return "terminate punt adjacency", nil case 194: - return "terminate incomplete adjacency" + return "terminate incomplete adjacency", nil case 195: - return "terminate for us" + return "terminate for us", nil case 14: - return "" + return "", nil } - return "invalid" + return "invalid", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-firewall-event -func decodeFWEvent(b []byte) interface{} { +func decodeFWEvent(b []byte) (interface{}, error) { switch b[0] { case 0: - return "ignore" + return "ignore", nil case 1: - return "flow created" + return "flow created", nil case 2: - return "flow deleted" + return "flow deleted", nil case 3: - return "flow denied" + return "flow denied", nil case 4: - return "flow alert" + return "flow alert", nil case 5: - return "flow update" + return "flow update", nil } - return strconv.FormatUint(uint64(b[0]), 10) + return strconv.FormatUint(uint64(b[0]), 10), nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-flow-end-reason -func decodeFlowEndReason(b []byte) interface{} { +func decodeFlowEndReason(b []byte) (interface{}, error) { switch b[0] { case 0: - return "reserved" + return "reserved", nil case 1: - return "idle timeout" + return "idle timeout", nil case 2: - return "active timeout" + return "active timeout", nil case 3: - return "end of flow" + return "end of flow", nil case 4: - return "forced end" + return "forced end", nil case 5: - return "lack of resources" + return "lack of resources", nil } - return "unassigned" + return "unassigned", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-biflow-direction -func decodeBiflowDirection(b []byte) interface{} { +func decodeBiflowDirection(b []byte) (interface{}, error) { switch b[0] { case 0: - return "arbitrary" + return "arbitrary", nil case 1: - return "initiator" + return "initiator", nil case 2: - return "reverse initiator" + return "reverse initiator", nil case 3: - return "perimeter" + return "perimeter", nil } - return "unassigned" + return "unassigned", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-observation-point-type -func decodeOpsPointType(b []byte) interface{} { +func decodeOpsPointType(b []byte) (interface{}, error) { switch b[0] { case 0: - return "invalid" + return "invalid", nil case 1: - return "physical port" + return "physical port", nil case 2: - return "port channel" + return "port channel", nil case 3: - return "vlan" + return "vlan", nil } - return "unassigned" + return "unassigned", nil } -func decodeAnonStabilityClass(b []byte) interface{} { +func decodeAnonStabilityClass(b []byte) (interface{}, error) { switch b[1] & 0x03 { case 1: - return "session" + return "session", nil case 2: - return "exporter-collector" + return "exporter-collector", nil case 3: - return "stable" + return "stable", nil } - return "undefined" + return "undefined", nil } -func decodeAnonFlags(b []byte) interface{} { +func decodeAnonFlags(b []byte) (interface{}, error) { var result []string if b[0]&(1<<2) != 0 { result = append(result, "PmA") @@ -485,156 +502,156 @@ func decodeAnonFlags(b []byte) interface{} { result = append(result, "LOR") } - return strings.Join(result, ",") + return strings.Join(result, ","), nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-anonymization-technique -func decodeAnonTechnique(b []byte) interface{} { +func decodeAnonTechnique(b []byte) (interface{}, error) { tech := binary.BigEndian.Uint16(b) switch tech { case 0: - return "undefined" + return "undefined", nil case 1: - return "none" + return "none", nil case 2: - return "precision degradation" + return "precision degradation", nil case 3: - return "binning" + return "binning", nil case 4: - return "enumeration" + return "enumeration", nil case 5: - return "permutation" + return "permutation", nil case 6: - return "structure permutation" + return "structure permutation", nil case 7: - return "reverse truncation" + return "reverse truncation", nil case 8: - return "noise" + return "noise", nil case 9: - return "offset" + return "offset", nil } - return "unassigned" + return "unassigned", nil } -func decodeTechnology(b []byte) interface{} { +func decodeTechnology(b []byte) (interface{}, error) { switch string(b) { case "yes", "y", "1": - return "yes" + return "yes", nil case "no", "n", "2": - return "no" + return "no", nil case "unassigned", "u", "0": - return "unassigned" + return "unassigned", nil } switch b[0] { case 0: - return "unassigned" + return "unassigned", nil case 1: - return "yes" + return "yes", nil case 2: - return "no" + return "no", nil } - return "undefined" + return "undefined", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-nat-type -func decodeIPNatType(b []byte) interface{} { +func decodeIPNatType(b []byte) (interface{}, error) { tech := binary.BigEndian.Uint16(b) switch tech { case 0: - return "unknown" + return "unknown", nil case 1: - return "NAT44" + return "NAT44", nil case 2: - return "NAT64" + return "NAT64", nil case 3: - return "NAT46" + return "NAT46", nil case 4: - return "IPv4 no NAT" + return "IPv4 no NAT", nil case 5: - return "NAT66" + return "NAT66", nil case 6: - return "IPv6 no NAT" + return "IPv6 no NAT", nil } - return "unassigned" + return "unassigned", nil } // https://www.iana.org/assignments/psamp-parameters/psamp-parameters.xhtml -func decodeSelectorAlgorithm(b []byte) interface{} { +func decodeSelectorAlgorithm(b []byte) (interface{}, error) { tech := binary.BigEndian.Uint16(b) switch tech { case 0: - return "reserved" + return "reserved", nil case 1: - return "systematic count-based sampling" + return "systematic count-based sampling", nil case 2: - return "systematic time-based sampling" + return "systematic time-based sampling", nil case 3: - return "random n-out-of-N sampling" + return "random n-out-of-N sampling", nil case 4: - return "uniform probabilistic sampling" + return "uniform probabilistic sampling", nil case 5: - return "property match filtering" + return "property match filtering", nil case 6: - return "hash based filtering using BOB" + return "hash based filtering using BOB", nil case 7: - return "hash based filtering using IPSX" + return "hash based filtering using IPSX", nil case 8: - return "hash based filtering using CRC" + return "hash based filtering using CRC", nil case 9: - return "flow-state dependent" + return "flow-state dependent", nil } - return "unassigned" + return "unassigned", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-value-distribution-method -func decodeValueDistMethod(b []byte) interface{} { +func decodeValueDistMethod(b []byte) (interface{}, error) { switch b[0] { case 0: - return "unspecified" + return "unspecified", nil case 1: - return "start interval" + return "start interval", nil case 2: - return "end interval" + return "end interval", nil case 3: - return "mid interval" + return "mid interval", nil case 4: - return "simple uniform distribution" + return "simple uniform distribution", nil case 5: - return "proportional uniform distribution" + return "proportional uniform distribution", nil case 6: - return "simulated process" + return "simulated process", nil case 7: - return "direct" + return "direct", nil } - return "unassigned" + return "unassigned", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-data-link-frame-type -func decodeDataLinkFrameType(b []byte) interface{} { +func decodeDataLinkFrameType(b []byte) (interface{}, error) { switch binary.BigEndian.Uint16(b) { case 0x0001: - return "IEEE802.3 ethernet" + return "IEEE802.3 ethernet", nil case 0x0002: - return "IEEE802.11 MAC" + return "IEEE802.11 MAC", nil } - return "unassigned" + return "unassigned", nil } // https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-mib-capture-time-semantics -func decodeCaptureTimeSemantics(b []byte) interface{} { +func decodeCaptureTimeSemantics(b []byte) (interface{}, error) { switch b[0] { case 0: - return "undefined" + return "undefined", nil case 1: - return "begin" + return "begin", nil case 2: - return "end" + return "end", nil case 3: - return "export" + return "export", nil case 4: - return "average" + return "average", nil } - return "unassigned" + return "unassigned", nil } func decodeSflowIPVersion(v uint32) string { @@ -692,3 +709,7 @@ func decodeSflowHeaderProtocol(t uint32) string { } return "unassigned" } + +func decodeByteFunc(idx int) decoderFunc { + return func(b []byte) (interface{}, error) { return b[idx], nil } +} diff --git a/plugins/inputs/netflow/type_conversion_test.go b/plugins/inputs/netflow/type_conversion_test.go index d54ef5467..90149bd4d 100644 --- a/plugins/inputs/netflow/type_conversion_test.go +++ b/plugins/inputs/netflow/type_conversion_test.go @@ -9,7 +9,9 @@ import ( func TestDecodeInt32(t *testing.T) { buf := []byte{0x82, 0xad, 0x80, 0x86} - out, ok := decodeInt32(buf).(int64) + v, err := decodeInt(buf) + require.NoError(t, err) + out, ok := v.(int64) require.True(t, ok) require.Equal(t, int64(-2102558586), out) } @@ -44,7 +46,9 @@ func TestDecodeUint(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - out, ok := decodeUint(tt.in).(uint64) + v, err := decodeUint(tt.in) + require.NoError(t, err) + out, ok := v.(uint64) require.True(t, ok) require.Equal(t, tt.expected, out) }) @@ -52,12 +56,15 @@ func TestDecodeUint(t *testing.T) { } func TestDecodeUintInvalid(t *testing.T) { - require.Panics(t, func() { decodeUint([]byte{0x00, 0x00, 0x00}) }) + _, err := decodeUint([]byte{0x00, 0x00, 0x00}) + require.ErrorContains(t, err, "invalid length") } func TestDecodeFloat64(t *testing.T) { buf := []byte{0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2e, 0xea} - out, ok := decodeFloat64(buf).(float64) + v, err := decodeFloat64(buf) + require.NoError(t, err) + out, ok := v.(float64) require.True(t, ok) require.Equal(t, float64(3.14159265359), out) } @@ -92,7 +99,8 @@ func TestDecodeBool(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - out := decodeBool(tt.in) + out, err := decodeBool(tt.in) + require.NoError(t, err) require.Equal(t, tt.expected, out) }) } @@ -100,21 +108,27 @@ func TestDecodeBool(t *testing.T) { func TestDecodeHex(t *testing.T) { buf := []byte{0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2e, 0xea} - out, ok := decodeHex(buf).(string) + v, err := decodeHex(buf) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, "0x400921fb54442eea", out) } func TestDecodeString(t *testing.T) { buf := []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x74, 0x65, 0x6c, 0x65, 0x67, 0x72, 0x61, 0x66} - out, ok := decodeString(buf).(string) + v, err := decodeString(buf) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, "hello telegraf", out) } func TestDecodeMAC(t *testing.T) { buf := []byte{0x2c, 0xf0, 0x5d, 0xe9, 0x04, 0x42} - out, ok := decodeMAC(buf).(string) + v, err := decodeMAC(buf) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, "2c:f0:5d:e9:04:42", out) } @@ -164,7 +178,9 @@ func TestDecodeIP(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - out, ok := decodeIP(tt.in).(string) + v, err := decodeIP(tt.in) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, tt.expected, out) }) @@ -173,7 +189,9 @@ func TestDecodeIP(t *testing.T) { func TestDecodeIPFromUint32(t *testing.T) { in := uint32(0x7f000001) - out, ok := decodeIPFromUint32(in).(string) + v, err := decodeIPFromUint32(in) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, "127.0.0.1", out) } @@ -230,7 +248,9 @@ func TestDecodeLayer4ProtocolNumber(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - out, ok := decodeL4Proto(tt.in).(string) + v, err := decodeL4Proto(tt.in) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, tt.expected, out) }) @@ -283,7 +303,9 @@ func TestDecodeIPv4Options(t *testing.T) { in := make([]byte, 4) binary.BigEndian.PutUint32(in, options) - out, ok := decodeIPv4Options(in).(string) + v, err := decodeIPv4Options(in) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, tt.expected, out) }) @@ -382,7 +404,9 @@ func TestDecodeTCPFlags(t *testing.T) { } in = []byte{options} } - out, ok := decodeTCPFlags(in).(string) + v, err := decodeTCPFlags(in) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, tt.expected, out) }) @@ -434,7 +458,9 @@ func TestDecodeFragmentFlags(t *testing.T) { flags |= 1 << bit } in := []byte{flags} - out, ok := decodeFragmentFlags(in).(string) + v, err := decodeFragmentFlags(in) + require.NoError(t, err) + out, ok := v.(string) require.True(t, ok) require.Equal(t, tt.expected, out) })