diff --git a/plugins/inputs/netflow/README.md b/plugins/inputs/netflow/README.md index c96b2631a..e72861220 100644 --- a/plugins/inputs/netflow/README.md +++ b/plugins/inputs/netflow/README.md @@ -59,12 +59,47 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## "sflow v5" -- sFlow v5 protocol # protocol = "ipfix" + ## Private Enterprise Numbers (PEN) mappings for decoding + ## This option allows to specify vendor-specific mapping files to use during + ## decoding. + # private_enterprise_number_files = [] + ## Dump incoming packets to the log ## This can be helpful to debug parsing issues. Only active if ## Telegraf is in debug mode. # dump_packets = false ``` +## Private Enterprise Number mapping + +Using the `private_enterprise_number_files` option you can specify mappings for +vendor-specific element-IDs with a PEN specification. The mapping has to be a +comma-separated-file (CSV) containing the element's `ID`, its `name` and the +`data-type`. A comma (`,`) is used as separator and comments are allowed using +the hash (`#`) prefix. +The element `ID` has the form `.`, the `name` has to be +a valid field-name and `data-type` denotes the mapping of the raw-byte value to +the field's type. For example + +```csv +# PEN.ID, name, data type +35632.349,in_src_osi_sap,hex +35632.471,nprobe_ipv4_address,ip +35632.1028,protocol_ntop,string +35632.1036,l4_srv_port,uint +``` + +specify four elements (`349`, `471`, `1028` and `1036`) for PEN `35632` (ntop) +with the corresponding name and data-type. + +Currently the following `data-type`s are supported: + +- `uint` unsigned integer with 8, 16, 32 or 64 bit +- `hex` hex-encoding of the raw byte sequence with `0x` prefix +- `string` string interpretation of the raw byte sequence +- `ip` IPv4 or IPv6 address +- `proto` mapping of layer-4 protocol numbers to names + ## Metrics Metrics depend on the format used as well as on the information provided diff --git a/plugins/inputs/netflow/mappings.go b/plugins/inputs/netflow/mappings.go new file mode 100644 index 000000000..147c98792 --- /dev/null +++ b/plugins/inputs/netflow/mappings.go @@ -0,0 +1,44 @@ +package netflow + +import ( + "encoding/csv" + "fmt" + "os" +) + +var funcMapping = map[string]decoderFunc{ + "uint": decodeUint, + "hex": decodeHex, + "string": decodeString, + "ip": decodeIP, + "proto": decodeL4Proto, +} + +func loadMapping(filename string) (map[string]fieldMapping, error) { + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("opening %q failed: %w", filename, err) + } + defer file.Close() + + reader := csv.NewReader(file) + reader.Comma = ',' + reader.Comment = '#' + reader.TrimLeadingSpace = true + records, err := reader.ReadAll() + if err != nil { + return nil, fmt.Errorf("reading csv failed: %w", err) + } + + mappings := make(map[string]fieldMapping, len(records)) + for _, r := range records { + id, name, dtype := r[0], r[1], r[2] + fun, found := funcMapping[dtype] + if !found { + return nil, fmt.Errorf("unknown data-type %q for id %q", dtype, id) + } + mappings[id] = fieldMapping{name, fun} + } + + return mappings, nil +} diff --git a/plugins/inputs/netflow/mappings_ipfix_pen/ntop-35632.csv b/plugins/inputs/netflow/mappings_ipfix_pen/ntop-35632.csv new file mode 100644 index 000000000..c497c00bc --- /dev/null +++ b/plugins/inputs/netflow/mappings_ipfix_pen/ntop-35632.csv @@ -0,0 +1,101 @@ +# The following data is extracted from +# https://www.ntop.org/guides/nprobe/flow_information_elements.html +# and contains IPFIX element definitions for Private Enterprise Number (PEN) +# 35632 (ntop) +# +# PEN.ID, name, data type +35632.1028,protocol_ntop,string +35632.1031,l4_src_port_name,string +35632.1035,l4_dst_port_name,string +35632.1036,l4_srv_port,uint +35632.1037,l4_srv_port_name,string +35632.80,src_fragments,uint +35632.81,dst_fragments,uint +35632.123,client_nw_latency_ms,uint +35632.124,server_nw_latency_ms,uint +35632.78,client_tcp_flags,uint +35632.79,server_tcp_flags,uint +35632.125,app_latency_ms,uint +35632.471,nprobe_ipv4_address,ip +35632.82,src_to_dst_max_throughput,uint +35632.83,src_to_dst_min_throughput,uint +35632.84,src_to_dst_avg_throughput,uint +35632.85,dst_to_src_max_throughput,uint +35632.86,dst_to_src_min_throughput,uint +35632.87,dst_to_src_avg_throughput,uint +35632.88,pkts_up_to_128_bytes,uint +35632.89,pkts_128_to_256_bytes,uint +35632.90,pkts_256_to_512_bytes,uint +35632.91,pkts_512_to_1024_bytes,uint +35632.92,pkts_1024_to_1514_bytes,uint +35632.93,pkts_over_1514_bytes,uint +35632.98,cumulative_icmp_type,uint +35632.101,src_ip_country,string +35632.102,src_ip_city,string +35632.103,dst_ip_country,string +35632.104,dst_ip_city,string +35632.448,src_ip_long,hex +35632.449,src_ip_lat,hex +35632.450,dst_ip_long,hex +35632.451,dst_ip_lat,hex +35632.105,flow_proto_port,uint +35632.106,upstream_tunnel_id,uint +35632.446,upstream_session_id,uint +35632.107,longest_flow_pkt,uint +35632.108,shortest_flow_pkt,uint +35632.127,retransmitted_in_bytes,uint +35632.109,retransmitted_in_pkts,uint +35632.128,retransmitted_out_bytes,uint +35632.110,retransmitted_out_pkts,uint +35632.111,ooorder_in_pkts,uint +35632.112,ooorder_out_pkts,uint +35632.113,untunneled_protocol,proto +35632.114,untunneled_ipv4_src_addr,ip +35632.115,untunneled_l4_src_port,uint +35632.116,untunneled_ipv4_dst_addr,ip +35632.117,untunneled_l4_dst_port,uint +35632.118,l7_proto,uint +35632.119,l7_proto_name,string +35632.120,downstream_tunnel_id,uint +35632.447,downstream_session_id,uint +35632.188,ssl_server_name,string +35632.189,bittorrent_hash,string +35632.121,flow_user_name,string +35632.122,flow_server_name,string +35632.126,plugin_name,string +35632.396,untunneled_ipv6_src_addr,ip +35632.397,untunneled_ipv6_dst_addr,ip +35632.347,pkts_ttl_eq_1,uint +35632.346,pkts_ttl_2_5,uint +35632.334,pkts_ttl_5_32,uint +35632.335,pkts_ttl_32_64,uint +35632.336,pkts_ttl_64_96,uint +35632.337,pkts_ttl_96_128,uint +35632.338,pkts_ttl_128_160,uint +35632.339,pkts_ttl_160_192,uint +35632.340,pkts_ttl_192_224,uint +35632.341,pkts_ttl_224_255,uint +35632.349,in_src_osi_sap,hex +35632.350,out_dst_osi_sap,hex +35632.391,duration_in,uint +35632.392,duration_out,uint +35632.415,tcp_win_min_in,uint +35632.416,tcp_win_max_in,uint +35632.417,tcp_win_mss_in,uint +35632.418,tcp_win_scale_in,uint +35632.419,tcp_win_min_out,uint +35632.420,tcp_win_max_out,uint +35632.421,tcp_win_mss_out,uint +35632.422,tcp_win_scale_out,uint +35632.438,payload_hash,uint +35632.443,src_as_name,string +35632.444,dst_as_name,string +35632.472,src_to_dst_second_bytes,uint +35632.473,dst_to_src_second_bytes,uint +35632.489,ja3c_hash,string +35632.490,ja3s_hash,string +35632.491,src_host_name,string +35632.492,dst_host_name,string +35632.493,ssl_cipher,uint +35632.494,ssl_unsafe_cipher,uint +35632.495,ssl_version,uint diff --git a/plugins/inputs/netflow/netflow.go b/plugins/inputs/netflow/netflow.go index 7c653da14..096b127a2 100644 --- a/plugins/inputs/netflow/netflow.go +++ b/plugins/inputs/netflow/netflow.go @@ -29,6 +29,7 @@ type NetFlow struct { ReadBufferSize config.Size `toml:"read_buffer_size"` Protocol string `toml:"protocol"` DumpPackets bool `toml:"dump_packets"` + PENFiles []string `toml:"private_enterprise_number_files"` Log telegraf.Logger `toml:"-"` conn *net.UDPConn @@ -55,15 +56,29 @@ func (n *NetFlow) Init() error { } switch strings.ToLower(n.Protocol) { - case "", "netflow v9", "ipfix": - n.decoder = &netflowDecoder{Log: n.Log} + case "netflow v9": + if len(n.PENFiles) != 0 { + n.Log.Warn("'private_enterprise_number_files' option will be ignored in 'netflow v9'") + } + n.decoder = &netflowDecoder{ + Log: n.Log, + } + case "", "ipfix": + n.decoder = &netflowDecoder{ + PENFiles: n.PENFiles, + Log: n.Log, + } case "netflow v5": + if len(n.PENFiles) != 0 { + n.Log.Warn("'private_enterprise_number_files' option will be ignored in 'netflow v5'") + } n.decoder = &netflowv5Decoder{} case "sflow", "sflow v5": n.decoder = &sflowv5Decoder{Log: n.Log} default: return fmt.Errorf("invalid protocol %q, only supports 'sflow', 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol) } + return n.decoder.Init() } diff --git a/plugins/inputs/netflow/netflow_decoder.go b/plugins/inputs/netflow/netflow_decoder.go index 70955bed6..aadfb96a2 100644 --- a/plugins/inputs/netflow/netflow_decoder.go +++ b/plugins/inputs/netflow/netflow_decoder.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "net" - "strconv" + "regexp" "sync" "time" @@ -15,9 +15,13 @@ import ( "github.com/influxdata/telegraf/metric" ) +var regexpIPFIXPENMapping = regexp.MustCompile(`\d+\.\d+`) + +type decoderFunc func([]byte) interface{} + type fieldMapping struct { name string - decoder func([]byte) interface{} + decoder decoderFunc } // Default field mappings common for Netflow version 9 and IPFIX @@ -526,13 +530,15 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{ // Decoder structure type netflowDecoder struct { - Log telegraf.Logger + PENFiles []string + Log telegraf.Logger templates map[string]*netflow.BasicTemplateSystem mappingsV9 map[uint16]fieldMapping mappingsIPFIX map[uint16]fieldMapping mappingsPEN map[string]fieldMapping + logged map[string]bool sync.Mutex } @@ -578,13 +584,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric } fields := make(map[string]interface{}) for _, value := range record.Values { - var extracted []telegraf.Field - if value.PenProvided { - extracted = d.decodeValuePEN(value) - } else { - extracted = d.decodeValueV9(value) - } - for _, field := range extracted { + for _, field := range d.decodeValueV9(value) { fields[field.Key] = field.Value } } @@ -607,13 +607,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric fields := make(map[string]interface{}) t := time.Now() for _, value := range record.Values { - var extracted []telegraf.Field - if value.PenProvided { - extracted = d.decodeValuePEN(value) - } else { - extracted = d.decodeValueIPFIX(value) - } - for _, field := range extracted { + for _, field := range d.decodeValueIPFIX(value) { fields[field.Key] = field.Value } } @@ -640,20 +634,40 @@ func (d *netflowDecoder) Init() error { d.mappingsV9 = make(map[uint16]fieldMapping) d.mappingsIPFIX = make(map[uint16]fieldMapping) d.mappingsPEN = make(map[string]fieldMapping) + for _, fn := range d.PENFiles { + d.Log.Debugf("Loading PEN mapping file %q...", fn) + mappings, err := loadMapping(fn) + if err != nil { + return err + } + for k, v := range mappings { + if !regexpIPFIXPENMapping.MatchString(k) { + return fmt.Errorf("key %q in file %q does not match pattern .; maybe wrong file", k, fn) + } + if _, found := d.mappingsPEN[k]; found { + return fmt.Errorf("duplicate entries for ID %q", k) + } + d.mappingsPEN[k] = v + } + } + d.Log.Infof("Loaded %d PEN mappings...", len(d.mappingsPEN)) + + d.logged = make(map[string]bool) return nil } func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field { raw := field.Value.([]byte) + elementID := field.Type // Check the user-specified mapping - if m, found := d.mappingsV9[field.Type]; found { + if m, found := d.mappingsV9[elementID]; found { return []telegraf.Field{{Key: m.name, Value: m.decoder(raw)}} } // Check the version specific default field mappings - if mappings, found := fieldMappingsNetflowV9[field.Type]; found { + if mappings, found := fieldMappingsNetflowV9[elementID]; found { var fields []telegraf.Field for _, m := range mappings { fields = append(fields, telegraf.Field{ @@ -665,7 +679,7 @@ func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field } // Check the common default field mappings - if mappings, found := fieldMappingsNetflowCommon[field.Type]; found { + if mappings, found := fieldMappingsNetflowCommon[elementID]; found { var fields []telegraf.Field for _, m := range mappings { fields = append(fields, telegraf.Field{ @@ -677,9 +691,11 @@ func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field } // Return the raw data if no mapping was found - d.Log.Debugf("unknown data field %v", field) - name := "type_" + strconv.FormatUint(uint64(field.Type), 10) - return []telegraf.Field{{Key: name, Value: decodeHex(raw)}} + key := fmt.Sprintf("type_%d", elementID) + if !d.logged[key] { + d.Log.Debugf("unknown Netflow v9 data field %v", field) + } + return []telegraf.Field{{Key: key, Value: decodeHex(raw)}} } func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Field { @@ -693,6 +709,20 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Fi elementID = field.Type & (0x4000 ^ 0xffff) } + // Handle messages with Private Enterprise Numbers (PENs) + if field.PenProvided { + key := fmt.Sprintf("%d.%d", field.Pen, elementID) + if m, found := d.mappingsPEN[key]; found { + name := prefix + m.name + return []telegraf.Field{{Key: name, Value: m.decoder(raw)}} + } + if !d.logged[key] { + d.Log.Debugf("unknown IPFIX PEN data field %v", field) + } + name := fmt.Sprintf("type_%d_%s%d", field.Pen, prefix, elementID) + return []telegraf.Field{{Key: name, Value: decodeHex(raw)}} + } + // Check the user-specified mapping if m, found := d.mappingsIPFIX[elementID]; found { return []telegraf.Field{{Key: prefix + m.name, Value: m.decoder(raw)}} @@ -723,28 +753,9 @@ func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Fi } // Return the raw data if no mapping was found - d.Log.Debugf("unknown data field %v", field) - name := "type_" + strconv.FormatUint(uint64(field.Type), 10) - return []telegraf.Field{{Key: name, Value: decodeHex(raw)}} -} - -func (d *netflowDecoder) decodeValuePEN(field netflow.DataField) []telegraf.Field { - raw := field.Value.([]byte) - - var prefix string - elementID := field.Type - if field.Type&0x4000 != 0 { - prefix = "rev_" - elementID = field.Type & (0x4000 ^ 0xffff) + key := fmt.Sprintf("type_%d", elementID) + if !d.logged[key] { + d.Log.Debugf("unknown IPFIX data field %v", field) } - - key := fmt.Sprintf("%d.%d", field.Pen, elementID) - if m, found := d.mappingsPEN[key]; found { - return []telegraf.Field{{Key: m.name, Value: m.decoder(raw)}} - } - - // Return the raw data if no mapping was found - d.Log.Debugf("unknown PEN data field %v", field) - name := fmt.Sprintf("type_%d_%s%d", field.Pen, prefix, elementID) - return []telegraf.Field{{Key: name, Value: decodeHex(raw)}} + return []telegraf.Field{{Key: key, Value: decodeHex(raw)}} } diff --git a/plugins/inputs/netflow/netflow_test.go b/plugins/inputs/netflow/netflow_test.go index ae3c742c2..ae5db6f3b 100644 --- a/plugins/inputs/netflow/netflow_test.go +++ b/plugins/inputs/netflow/netflow_test.go @@ -158,6 +158,17 @@ func TestMissingTemplate(t *testing.T) { require.True(t, found, "warning not found") } +func TestWrongMapping(t *testing.T) { + var logger testutil.CaptureLogger + plugin := &NetFlow{ + ServiceAddress: "udp://127.0.0.1:0", + Protocol: "ipfix", + PENFiles: []string{"testcases/netflow_mapping.csv"}, + Log: &logger, + } + require.ErrorContains(t, plugin.Init(), "does not match pattern") +} + func TestCases(t *testing.T) { // Get all directories in testdata folders, err := os.ReadDir("testcases") diff --git a/plugins/inputs/netflow/sample.conf b/plugins/inputs/netflow/sample.conf index 81e78418e..b91849414 100644 --- a/plugins/inputs/netflow/sample.conf +++ b/plugins/inputs/netflow/sample.conf @@ -19,6 +19,11 @@ ## "sflow v5" -- sFlow v5 protocol # protocol = "ipfix" + ## Private Enterprise Numbers (PEN) mappings for decoding + ## This option allows to specify vendor-specific mapping files to use during + ## decoding. + # private_enterprise_number_files = [] + ## Dump incoming packets to the log ## This can be helpful to debug parsing issues. Only active if ## Telegraf is in debug mode. diff --git a/plugins/inputs/netflow/testcases/ipfix_pen_35632/expected.out b/plugins/inputs/netflow/testcases/ipfix_pen_35632/expected.out new file mode 100644 index 000000000..f016cf82b --- /dev/null +++ b/plugins/inputs/netflow/testcases/ipfix_pen_35632/expected.out @@ -0,0 +1,2 @@ +netflow,source=127.0.0.1,version=IPFIX app_latency_ms=0u,flow_end_ms=1684767922502u,ssl_unsafe_cipher=0u,src_mask=0u,dst_port=44400u,in_src_mac="00:50:56:b3:86:e7",src_tos="0x00",ja3c_hash="",server_nw_latency_ms=0u,l7_proto=37u,last_switched=22474460u,tcp_flags="........",ja3s_hash="",in_snmp=0u,flow_start_ms=1684767922502u,ssl_version=0u,client_nw_latency_ms=0u,out_dst_mac="00:50:56:b3:a7:f8",src="192.168.2.203",dst_mask=0u,next_hop="0.0.0.0",flow_end=1684767922u,ssl_cipher=0u,src_port=51413u,dst_tos="0x00",ip_version="IPv4",retransmitted_out_bytes=0u,in_bytes=122u,out_snmp=0u,protocol="udp",first_switched=22474460u,retransmitted_in_pkts=0u,dst="189.127.188.175",retransmitted_in_bytes=0u,flow_start=1684767922u,ssl_server_name="",retransmitted_out_pkts=0u,in_packets=1u 1684928292858572674 +netflow,source=127.0.0.1,version=IPFIX app_latency_ms=0u,out_dst_mac="00:50:56:b3:a7:f8",protocol="udp",ja3c_hash="",ssl_unsafe_cipher=0u,dst_mask=0u,retransmitted_in_pkts=0u,out_snmp=0u,flow_start_ms=1684767922502u,ssl_cipher=0u,l7_proto=37u,in_snmp=0u,retransmitted_in_bytes=0u,src_tos="0x00",last_switched=22474460u,ssl_version=0u,in_packets=1u,first_switched=22474460u,flow_end=1684767922u,src="192.168.2.203",retransmitted_out_pkts=0u,src_port=51413u,client_nw_latency_ms=0u,next_hop="0.0.0.0",dst="177.234.165.79",server_nw_latency_ms=0u,tcp_flags="........",flow_start=1684767922u,src_mask=0u,dst_port=47707u,ssl_server_name="",ip_version="IPv4",retransmitted_out_bytes=0u,dst_tos="0x00",in_bytes=86u,flow_end_ms=1684767922502u,ja3s_hash="",in_src_mac="00:50:56:b3:86:e7" 1684928292858831665 diff --git a/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-1.bin b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-1.bin new file mode 100644 index 000000000..9b7dbd5e6 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-1.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-2.bin b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-2.bin new file mode 100644 index 000000000..7683df39d Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-2.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-3.bin b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-3.bin new file mode 100644 index 000000000..42df8e129 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-3.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-4.bin b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-4.bin new file mode 100644 index 000000000..dc5cdf211 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_pen_35632/message-4.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_pen_35632/telegraf.conf b/plugins/inputs/netflow/testcases/ipfix_pen_35632/telegraf.conf new file mode 100644 index 000000000..c7da2faf6 --- /dev/null +++ b/plugins/inputs/netflow/testcases/ipfix_pen_35632/telegraf.conf @@ -0,0 +1,3 @@ +[[inputs.netflow]] + service_address = "udp://127.0.0.1:0" + private_enterprise_number_files = ["mappings_ipfix_pen/ntop-35632.csv"] diff --git a/plugins/inputs/netflow/testcases/netflow_mapping.csv b/plugins/inputs/netflow/testcases/netflow_mapping.csv new file mode 100644 index 000000000..302cf4482 --- /dev/null +++ b/plugins/inputs/netflow/testcases/netflow_mapping.csv @@ -0,0 +1,7 @@ +58500,protocol_ntop,string +58503,l4_src_port_name,string +58507,l4_dst_port_name,string +58508,l4_srv_port,uint +58509,l4_srv_port_name,string +57552,src_fragments,uint +57553,dst_fragments,uint