diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index c19ec6eb1..9663db555 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -247,6 +247,7 @@ following works: - github.com/nats-io/nats.go [Apache License 2.0](https://github.com/nats-io/nats.go/blob/master/LICENSE) - github.com/nats-io/nkeys [Apache License 2.0](https://github.com/nats-io/nkeys/blob/master/LICENSE) - github.com/nats-io/nuid [Apache License 2.0](https://github.com/nats-io/nuid/blob/master/LICENSE) +- github.com/netsampler/goflow2 [BSD 3-Clause "New" or "Revised" License](https://github.com/netsampler/goflow2/blob/main/LICENSE) - github.com/newrelic/newrelic-telemetry-sdk-go [Apache License 2.0](https://github.com/newrelic/newrelic-telemetry-sdk-go/blob/master/LICENSE.md) - github.com/nsqio/go-nsq [MIT License](https://github.com/nsqio/go-nsq/blob/master/LICENSE) - github.com/olivere/elastic [MIT License](https://github.com/olivere/elastic/blob/release-branch.v7/LICENSE) diff --git a/go.mod b/go.mod index f460ba159..e9db1e7b6 100644 --- a/go.mod +++ b/go.mod @@ -123,6 +123,7 @@ require ( github.com/multiplay/go-ts3 v1.0.1 github.com/nats-io/nats-server/v2 v2.9.4 github.com/nats-io/nats.go v1.19.0 + github.com/netsampler/goflow2 v1.1.1 github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 github.com/nsqio/go-nsq v1.1.0 github.com/olivere/elastic v6.2.37+incompatible @@ -436,8 +437,8 @@ require ( k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.40.0 // indirect - modernc.org/ccgo/v3 v3.16.13-0.20221017192402-261537637ce8 // indirect - modernc.org/libc v1.21.2 // indirect + modernc.org/ccgo/v3 v3.16.12 // indirect + modernc.org/libc v1.20.3 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.4.0 // indirect modernc.org/opt v0.1.3 // indirect diff --git a/go.sum b/go.sum index c87eec060..c31182c26 100644 --- a/go.sum +++ b/go.sum @@ -1972,6 +1972,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= github.com/nbutton23/zxcvbn-go v0.0.0-20201221231540-e56b841a3c88/go.mod h1:KSVJerMDfblTH7p5MZaTt+8zaT2iEk3AkVb9PQdZuE8= github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= +github.com/netsampler/goflow2 v1.1.1 h1:GpVlvPq4yRbyzoiz0Vp3XilNr5js/0UhHcQI7Ol/MDk= +github.com/netsampler/goflow2 v1.1.1/go.mod h1:oNIeGj67SjwrRSTEukErjNT1zZ02W9+8M5mSgQCkZC8= github.com/networkplumbing/go-nft v0.2.0/go.mod h1:HnnM+tYvlGAsMU7yoYwXEVLLiDW9gdMmb5HoGcwpuQs= github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 h1:6OX5VXMuj2salqNBc41eXKz6K+nV6OB/hhlGnAKCbwU= github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E= @@ -3778,12 +3780,12 @@ lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= modernc.org/cc/v3 v3.40.0 h1:P3g79IUS/93SYhtoeaHW+kRCIrYaxJ27MFPv+7kaTOw= modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0= -modernc.org/ccgo/v3 v3.16.13-0.20221017192402-261537637ce8 h1:0+dsXf0zeLx9ixj4nilg6jKe5Bg1ilzBwSFq4kJmIUc= -modernc.org/ccgo/v3 v3.16.13-0.20221017192402-261537637ce8/go.mod h1:fUB3Vn0nVPReA+7IG7yZDfjv1TMWjhQP8gCxrFAtL5g= +modernc.org/ccgo/v3 v3.16.12 h1:gWAnL87wSqwM6EQ1a+36O9zMFjqx1FBj0p9rA4xbQCY= +modernc.org/ccgo/v3 v3.16.12/go.mod h1:fUB3Vn0nVPReA+7IG7yZDfjv1TMWjhQP8gCxrFAtL5g= modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= -modernc.org/libc v1.21.2 h1:V053DgNSpAY+IPrO3XlWqrFKUiQqHyPqG4dsx42Ulck= -modernc.org/libc v1.21.2/go.mod h1:przBsL5RDOZajTVslkugzLBj1evTue36jEomFQOoYuI= +modernc.org/libc v1.20.3 h1:BodaDPuUse7taQchAClMmbE/yZp3T2ZBiwCDFyBLEXw= +modernc.org/libc v1.20.3/go.mod h1:ZRfIaEkgrYgZDl6pa4W39HgN5G/yDW+NRmNKZBDFrk0= modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/memory v1.4.0 h1:crykUfNSnMAXaOJnnxcSzbUGMqkLWjklJKkBK2nwZwk= diff --git a/plugins/inputs/all/netflow.go b/plugins/inputs/all/netflow.go new file mode 100644 index 000000000..9bcbfbd1f --- /dev/null +++ b/plugins/inputs/all/netflow.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.netlow + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/netflow" // register plugin diff --git a/plugins/inputs/netflow/README.md b/plugins/inputs/netflow/README.md new file mode 100644 index 000000000..c6523081f --- /dev/null +++ b/plugins/inputs/netflow/README.md @@ -0,0 +1,109 @@ +# Netflow Input Plugin + +The `netflow` plugin acts as a collector for Netflow v5, Netflow v9 and IPFIX +flow information. The Layer 4 protocol numbers are gathered from the +[official IANA assignments][IANA assignments]. +The internal field mappings for Netflow v5 fields are defined according to +[Cisco's Netflow v5 documentation][CISCO NF5], Netflow v9 fields are defined +according to [Cisco's Netflow v9 documentation][CISCO NF9] and the +[ASA extensions][ASA extensions]. +Definitions for IPFIX are according to [IANA assignement document][IPFIX doc]. + +[IANA assignments]: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml +[CISCO NF5]: https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html#wp1006186 +[CISCO NF9]: https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html +[ASA extensions]: https://www.cisco.com/c/en/us/td/docs/security/asa/special/netflow/asa_netflow.html +[IPFIX doc]: https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-nat-type + +## Configuration + +```toml @sample.conf +# Netflow v5, Netflow v9 and IPFIX collector +[[inputs.netflow]] + ## Address to listen for netflow/ipfix packets. + ## example: service_address = "udp://:2055" + ## service_address = "udp4://:2055" + ## service_address = "udp6://:2055" + service_address = "udp://:2055" + + ## Set the size of the operating system's receive buffer. + ## example: read_buffer_size = "64KiB" + ## Uses the system's default if not set. + # read_buffer_size = "" + + ## Protocol version to use for decoding. + ## Available options are + ## "netflow v5" -- Netflow v5 protocol + ## "netflow v9" -- Netflow v9 protocol (also works for IPFIX) + ## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9) + # protocol = "ipfix" + + ## Dump incoming packets to the log + ## This can be helpful to debug parsing issues. Only active if + ## Telegraf is in debug mode. + # dump_packets = false +``` + +## Metrics + +Metrics depend on the format used as well as on the information provided +by the exporter. Furthermore, proprietary information might be sent requiring +further decoding information. Most exporters should provide at least the +following information + +- netflow + - tags: + - source (IP of the exporter sending the data) + - version (flow protocol version) + - fields: + - src (IP address, address of the source of the packets) + - src_mask (uint64, mask for the IP address in bits) + - dst (IP address, address of the destination of the packets) + - dst_mask (uint64, mask for the IP address in bits) + - src_port (uint64, source port) + - dst_port (uint64, destination port) + - protocol (string, Layer 4 protocol name) + - in_bytes (uint64, number of incoming bytes) + - in_packets (uint64, number of incomming packets) + - tcp_flags (string, TCP flags for the flow) + +## Example Output + +The specific fields vary for the different protocol versions, here are some +examples + +### Netflow v5 + +```shell +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.121.3",src_port=443u,dst="192.168.119.100",dst_port=55516u,flows=8u,in_bytes=87477u,in_packets=78u,first_switched=86400660u,last_switched=86403316u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.121.6",src_port=443u,dst="192.168.119.100",dst_port=36408u,flows=8u,in_bytes=5009u,in_packets=21u,first_switched=86400447u,last_switched=86403267u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.112.22",src_port=443u,dst="192.168.119.100",dst_port=39638u,flows=8u,in_bytes=925u,in_packets=6u,first_switched=86400324u,last_switched=86403214u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.114.26",src_port=443u,dst="192.168.119.100",dst_port=49398u,flows=8u,in_bytes=250u,in_packets=2u,first_switched=86403131u,last_switched=86403362u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=55516u,dst="140.82.121.3",dst_port=443u,flows=8u,in_bytes=4969u,in_packets=37u,first_switched=86400652u,last_switched=86403269u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=36408u,dst="140.82.121.6",dst_port=443u,flows=8u,in_bytes=2736u,in_packets=21u,first_switched=86400438u,last_switched=86403258u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=39638u,dst="140.82.112.22",dst_port=443u,flows=8u,in_bytes=1560u,in_packets=6u,first_switched=86400225u,last_switched=86403255u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=49398u,dst="140.82.114.26",dst_port=443u,flows=8u,in_bytes=697u,in_packets=4u,first_switched=86403030u,last_switched=86403362u,tcp_flags="...PA...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +``` + +### Netflow v9 + +```shell +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.121.3",src_port=443u,dst="192.168.119.100",dst_port=55516u,in_bytes=87477u,in_packets=78u,flow_start_ms=1666350478660u,flow_end_ms=1666350481316u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.121.6",src_port=443u,dst="192.168.119.100",dst_port=36408u,in_bytes=5009u,in_packets=21u,flow_start_ms=1666350478447u,flow_end_ms=1666350481267u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.112.22",src_port=443u,dst="192.168.119.100",dst_port=39638u,in_bytes=925u,in_packets=6u,flow_start_ms=1666350478324u,flow_end_ms=1666350481214u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.114.26",src_port=443u,dst="192.168.119.100",dst_port=49398u,in_bytes=250u,in_packets=2u,flow_start_ms=1666350481131u,flow_end_ms=1666350481362u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=55516u,dst="140.82.121.3",dst_port=443u,in_bytes=4969u,in_packets=37u,flow_start_ms=1666350478652u,flow_end_ms=1666350481269u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=36408u,dst="140.82.121.6",dst_port=443u,in_bytes=2736u,in_packets=21u,flow_start_ms=1666350478438u,flow_end_ms=1666350481258u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=39638u,dst="140.82.112.22",dst_port=443u,in_bytes=1560u,in_packets=6u,flow_start_ms=1666350478225u,flow_end_ms=1666350481255u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=49398u,dst="140.82.114.26",dst_port=443u,in_bytes=697u,in_packets=4u,flow_start_ms=1666350481030u,flow_end_ms=1666350481362u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +``` + +### IPFIX + +```shell +netflow,source=127.0.0.1,version=IPFIX protocol="tcp",vlan_src=0u,src_tos="0x00",flow_end_ms=1666345513807u,src="192.168.119.100",dst="44.233.90.52",src_port=51008u,total_bytes_exported=0u,flow_end_reason="end of flow",flow_start_ms=1666345513807u,in_total_bytes=52u,in_total_packets=1u,dst_port=443u +netflow,source=127.0.0.1,version=IPFIX src_tos="0x00",src_port=54330u,rev_total_bytes_exported=0u,last_switched=9u,vlan_src=0u,flow_start_ms=1666345513807u,in_total_packets=1u,flow_end_reason="end of flow",flow_end_ms=1666345513816u,in_total_bytes=40u,dst_port=443u,src="192.168.119.100",dst="104.17.240.92",total_bytes_exported=0u,protocol="tcp" +netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345513807u,flow_end_ms=1666345513977u,src="192.168.119.100",dst_port=443u,total_bytes_exported=0u,last_switched=170u,src_tos="0x00",in_total_bytes=40u,dst="44.233.90.52",src_port=51024u,protocol="tcp",flow_end_reason="end of flow",in_total_packets=1u,rev_total_bytes_exported=0u,vlan_src=0u +netflow,source=127.0.0.1,version=IPFIX src_port=58246u,total_bytes_exported=1u,flow_start_ms=1666345513806u,flow_end_ms=1666345513806u,in_total_bytes=156u,src="192.168.119.100",rev_total_bytes_exported=0u,last_switched=0u,flow_end_reason="forced end",dst="192.168.119.17",dst_port=53u,protocol="udp",in_total_packets=2u,vlan_src=0u,src_tos="0x00" +netflow,source=127.0.0.1,version=IPFIX protocol="udp",vlan_src=0u,src_port=58879u,dst_port=53u,flow_end_ms=1666345513832u,src_tos="0x00",src="192.168.119.100",total_bytes_exported=1u,rev_total_bytes_exported=0u,flow_end_reason="forced end",last_switched=33u,in_total_bytes=221u,in_total_packets=2u,flow_start_ms=1666345513799u,dst="192.168.119.17" +``` diff --git a/plugins/inputs/netflow/ipv4_options.csv b/plugins/inputs/netflow/ipv4_options.csv new file mode 100644 index 000000000..e5c7c163e --- /dev/null +++ b/plugins/inputs/netflow/ipv4_options.csv @@ -0,0 +1,27 @@ +Number,Name +0,EOOL +1,NOP +2,SEC +3,LSR +4,TS +5,E-SEC +6,CIPSO +7,RR +8,SID +9,SSR +10,ZSU +11,MTUP +12,MTUR +13,FINN +14,VISA +15,ENCODE +16,IMITD +17,EIP +18,TR +19,ADDEXT +20,RTRALT +21,SDB +23,DPS +24,UMP +25,QS +30,EXP diff --git a/plugins/inputs/netflow/layer4_protocol_numbers.csv b/plugins/inputs/netflow/layer4_protocol_numbers.csv new file mode 100644 index 000000000..67aff1881 --- /dev/null +++ b/plugins/inputs/netflow/layer4_protocol_numbers.csv @@ -0,0 +1,151 @@ +Decimal,Keyword +0,HOPOPT +1,ICMP +2,IGMP +3,GGP +4,IPv4 +5,ST +6,TCP +7,CBT +8,EGP +9,IGP +10,BBN-RCC-MON +11,NVP-II +12,PUP +13,ARGUS +14,EMCON +15,XNET +16,CHAOS +17,UDP +18,MUX +19,DCN-MEAS +20,HMP +21,PRM +22,XNS-IDP +23,TRUNK-1 +24,TRUNK-2 +25,LEAF-1 +26,LEAF-2 +27,RDP +28,IRTP +29,ISO-TP4 +30,NETBLT +31,MFE-NSP +32,MERIT-INP +33,DCCP +34,3PC +35,IDPR +36,XTP +37,DDP +38,IDPR-CMTP +39,TP++ +40,IL +41,IPv6 +42,SDRP +43,IPv6-Route +44,IPv6-Frag +45,IDRP +46,RSVP +47,GRE +48,DSR +49,BNA +50,ESP +51,AH +52,I-NLSP +53,SWIPE (deprecated) +54,NARP +55,MOBILE +56,TLSP +57,SKIP +58,IPv6-ICMP +59,IPv6-NoNxt +60,IPv6-Opts +61,host internal +62,CFTP +63,local network +64,SAT-EXPAK +65,KRYPTOLAN +66,RVD +67,IPPC +68,distributed FS +69,SAT-MON +70,VISA +71,IPCV +72,CPNX +73,CPHB +74,WSN +75,PVP +76,BR-SAT-MON +77,SUN-ND +78,WB-MON +79,WB-EXPAK +80,ISO-IP +81,VMTP +82,SECURE-VMTP +83,VINES +84,TTP +84,IPTM +85,NSFNET-IGP +86,DGP +87,TCF +88,EIGRP +89,OSPFIGP +90,Sprite-RPC +91,LARP +92,MTP +93,AX.25 +94,IPIP +95,MICP (deprecated) +96,SCC-SP +97,ETHERIP +98,ENCAP +99,private encryption scheme +100,GMTP +101,IFMP +102,PNNI +103,PIM +104,ARIS +105,SCPS +106,QNX +107,A/N +108,IPComp +109,SNP +110,Compaq-Peer +111,IPX-in-IP +112,VRRP +113,PGM +114,0-hop +115,L2TP +116,DDX +117,IATP +118,STP +119,SRP +120,UTI +121,SMP +122,SM (deprecated) +123,PTP +124,ISIS over IPv4 +125,FIRE +126,CRTP +127,CRUDP +128,SSCOPMCE +129,IPLT +130,SPS +131,PIPE +132,SCTP +133,FC +134,RSVP-E2E-IGNORE +135,Mobility Header +136,UDPLite +137,MPLS-in-IP +138,manet +139,HIP +140,Shim6 +141,WESP +142,ROHC +143,Ethernet +144,AGGFRAG +145-252, +253,experimental +254,experimental +255,Reserved diff --git a/plugins/inputs/netflow/netflow.go b/plugins/inputs/netflow/netflow.go new file mode 100644 index 000000000..20e44524e --- /dev/null +++ b/plugins/inputs/netflow/netflow.go @@ -0,0 +1,144 @@ +//go:generate ../../../tools/readme_config_includer/generator +package netflow + +import ( + _ "embed" + "encoding/hex" + "errors" + "fmt" + "net" + "net/url" + "strings" + "sync" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" +) + +//go:embed sample.conf +var sampleConfig string + +type protocolDecoder interface { + Init() error + Decode(net.IP, []byte) ([]telegraf.Metric, error) +} + +type NetFlow struct { + ServiceAddress string `toml:"service_address"` + ReadBufferSize config.Size `toml:"read_buffer_size"` + Protocol string `toml:"protocol"` + DumpPackets bool `toml:"dump_packets"` + Log telegraf.Logger `toml:"-"` + + conn *net.UDPConn + decoder protocolDecoder + wg sync.WaitGroup +} + +func (*NetFlow) SampleConfig() string { + return sampleConfig +} + +func (n *NetFlow) Init() error { + if n.ServiceAddress == "" { + return errors.New("service_address required") + } + u, err := url.Parse(n.ServiceAddress) + if err != nil { + return fmt.Errorf("invalid service address %q: %w", n.ServiceAddress, err) + } + switch u.Scheme { + case "udp", "udp4", "udp6": + default: + return fmt.Errorf("invalid scheme %q, should be 'udp', 'udp4' or 'udp6'", u.Scheme) + } + + switch strings.ToLower(n.Protocol) { + case "", "netflow v9", "ipfix": + n.decoder = &netflowDecoder{Log: n.Log} + case "netflow v5": + n.decoder = &netflowv5Decoder{} + default: + return fmt.Errorf("invalid protocol %q, only supports 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol) + } + return n.decoder.Init() +} + +func (n *NetFlow) Start(acc telegraf.Accumulator) error { + u, err := url.Parse(n.ServiceAddress) + if err != nil { + return err + } + addr, err := net.ResolveUDPAddr(u.Scheme, u.Host) + if err != nil { + return err + } + + conn, err := net.ListenUDP(u.Scheme, addr) + if err != nil { + return err + } + n.conn = conn + + if n.ReadBufferSize > 0 { + if err := conn.SetReadBuffer(int(n.ReadBufferSize)); err != nil { + return err + } + } + n.Log.Infof("Listening on %s://%s", n.conn.LocalAddr().Network(), n.conn.LocalAddr().String()) + + n.wg.Add(1) + go func() { + defer n.wg.Done() + n.read(acc) + }() + + return nil +} + +func (n *NetFlow) Stop() { + if n.conn != nil { + _ = n.conn.Close() + } + n.wg.Wait() +} + +func (n *NetFlow) read(acc telegraf.Accumulator) { + buf := make([]byte, 64*1024) // 64kB + for { + count, src, err := n.conn.ReadFromUDP(buf) + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + acc.AddError(err) + } + break + } + n.Log.Debugf("received %d bytes\n", count) + if count < 1 { + continue + } + if n.DumpPackets { + n.Log.Debugf("raw data: %s", hex.EncodeToString(buf[:count])) + } + metrics, err := n.decoder.Decode(src.IP, buf[:count]) + if err != nil { + acc.AddError(err) + continue + } + for _, m := range metrics { + acc.AddMetric(m) + } + } +} + +func (n *NetFlow) Gather(_ telegraf.Accumulator) error { + return nil +} + +// Register the plugin +func init() { + inputs.Add("netflow", func() telegraf.Input { + return &NetFlow{} + }) +} diff --git a/plugins/inputs/netflow/netflow_decoder.go b/plugins/inputs/netflow/netflow_decoder.go new file mode 100644 index 000000000..48c2f4551 --- /dev/null +++ b/plugins/inputs/netflow/netflow_decoder.go @@ -0,0 +1,709 @@ +package netflow + +import ( + "bytes" + "fmt" + "net" + "strconv" + "sync" + "time" + + "github.com/netsampler/goflow2/decoders/netflow" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +type fieldMapping struct { + name string + decoder func([]byte) interface{} +} + +// Default field mappings common for Netflow version 9 and IPFIX +// From documentations at +// - https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html +// - https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-information-elements +var fieldMappingsNetflowCommon = map[uint16][]fieldMapping{ + // 0: reserved + 1: {{"in_bytes", decodeUint}}, // IN_BYTES / octetDeltaCount + 2: {{"in_packets", decodeUint}}, // IN_PKTS / packetDeltaCount + 3: {{"flows", decodeUint}}, // FLOWS / deltaFlowCount + 4: {{"protocol", decodeL4Proto}}, // PROTOCOL / protocolIdentifier + 5: {{"src_tos", decodeHex}}, // SRC_TOS / ipClassOfService + 6: {{"tcp_flags", decodeTCPFlags}}, // TCP_FLAGS / tcpControlBits + 7: {{"src_port", decodeUint}}, // L4_SRC_PORT / sourceTransportPort + 8: {{"src", decodeIP}}, // IPV4_SRC_ADDR / sourceIPv4Address + 9: {{"src_mask", decodeUint}}, // SRC_MASK / sourceIPv4PrefixLength + 10: {{"in_snmp", decodeUint}}, // INPUT_SNMP / ingressInterface + 11: {{"dst_port", decodeUint}}, // L4_DST_PORT / destinationTransportPort + 12: {{"dst", decodeIP}}, // IPV4_DST_ADDR / destinationIPv4Address + 13: {{"dst_mask", decodeUint}}, // DST_MASK / destinationIPv4PrefixLength + 14: {{"out_snmp", decodeUint}}, // OUTPUT_SNMP / egressInterface + 15: {{"next_hop", decodeIP}}, // IPV4_NEXT_HOP / ipNextHopIPv4Address + 16: {{"bgp_src_as", decodeUint}}, // SRC_AS / bgpSourceAsNumber + 17: {{"bgp_dst_as", decodeUint}}, // DST_AS / bgpDestinationAsNumber + 18: {{"bgp_next_hop", decodeIP}}, // BGP_IPV4_NEXT_HOP / bgpNextHopIPv4Address + 19: {{"out_mcast_packets", decodeUint}}, // MUL_DST_PKTS / postMCastPacketDeltaCount + 20: {{"out_mcast_bytes", decodeUint}}, // MUL_DST_BYTES / postMCastOctetDeltaCount + 21: {{"last_switched", decodeUint}}, // LAST_SWITCHED / flowEndSysUpTime + 22: {{"first_switched", decodeUint}}, // FIRST_SWITCHED / flowStartSysUpTime + 23: {{"out_bytes", decodeUint}}, // OUT_BYTES / postOctetDeltaCount + 24: {{"out_packets", decodeUint}}, // OUT_PKTS / postPacketDeltaCount + 25: {{"min_packet_len", decodeUint}}, // MIN_PKT_LNGTH / minimumIpTotalLength + 26: {{"max_packet_len", decodeUint}}, // MAX_PKT_LNGTH / maximumIpTotalLength + 27: {{"src", decodeIP}}, // IPV6_SRC_ADDR / sourceIPv6Address + 28: {{"dst", decodeIP}}, // IPV6_DST_ADDR / destinationIPv6Address + 29: {{"src_mask", decodeUint}}, // IPV6_SRC_MASK / sourceIPv6PrefixLength + 30: {{"dst_mask", decodeUint}}, // IPV6_DST_MASK / destinationIPv6PrefixLength + 31: {{"flow_label", decodeHex}}, // IPV6_FLOW_LABEL / flowLabelIPv6 + 32: { + {"icmp_type", func(b []byte) interface{} { return b[0] }}, // ICMP_TYPE / icmpTypeCodeIPv4 + {"icmp_code", func(b []byte) interface{} { return b[1] }}, + }, + 33: {{"igmp_type", decodeUint}}, // MUL_IGMP_TYPE / igmpType + 34: {{"sampling_interval", decodeUint}}, // SAMPLING_INTERVAL / samplingInterval (deprecated) + 35: {{"sampling_algo", decodeSampleAlgo}}, // SAMPLING_ALGORITHM / samplingAlgorithm (deprecated) + 36: {{"flow_active_timeout", decodeUint}}, // FLOW_ACTIVE_TIMEOUT / flowActiveTimeout + 37: {{"flow_inactive_timeout", decodeUint}}, // FLOW_INACTIVE_TIMEOUT / flowIdleTimeout + 38: {{"engine_type", decodeEngineType}}, // ENGINE_TYPE / engineType (deprecated) + 39: {{"engine_id", decodeHex}}, // ENGINE_ID / engineId (deprecated) + 40: {{"total_bytes_exported", decodeUint}}, // TOTAL_BYTES_EXP / exportedOctetTotalCount + 41: {{"total_messages_exported", decodeUint}}, // TOTAL_PKTS_EXP / exportedMessageTotalCount + 42: {{"total_flows_exported", decodeUint}}, // TOTAL_FLOWS_EXP / exportedFlowRecordTotalCount + // 43: vendor proprietary / deprecated + 44: {{"ipv4_src_prefix", decodeIP}}, // IPV4_SRC_PREFIX / sourceIPv4Prefix + 45: {{"ipv4_dst_prefix", decodeIP}}, // IPV4_DST_PREFIX / destinationIPv4Prefix + 46: {{"mpls_top_label_type", decodeMPLSType}}, // MPLS_TOP_LABEL_TYPE / mplsTopLabelType + 47: {{"mpls_top_label_ip", decodeIP}}, // MPLS_TOP_LABEL_IP_ADDR / mplsTopLabelIPv4Address + 48: {{"flow_sampler_id", decodeUint}}, // FLOW_SAMPLER_ID / samplerId (deprecated) + 49: {{"flow_sampler_mode", decodeSampleAlgo}}, // FLOW_SAMPLER_MODE / samplerMode (deprecated) + 50: {{"flow_sampler_interval", decodeUint}}, // FLOW_SAMPLER_RANDOM_INTERVAL / samplerRandomInterval (deprecated) + // 51: vendor proprietary / deprecated + 52: {{"min_ttl", decodeUint}}, // MIN_TTL / minimumTTL + 53: {{"max_ttl", decodeUint}}, // MAX_TTL / maximumTTL + 54: {{"fragment_id", decodeHex}}, // IPV4_IDENT / fragmentIdentification + 55: {{"dst_tos", decodeHex}}, // DST_TOS / postIpClassOfService + 56: {{"in_src_mac", decodeHex}}, // IN_SRC_MAC / sourceMacAddress + 57: {{"out_dst_mac", decodeHex}}, // OUT_DST_MAC / postDestinationMacAddress + 58: {{"vlan_src", decodeUint}}, // SRC_VLAN / vlanId + 59: {{"vlan_dst", decodeUint}}, // DST_VLAN / postVlanId + 60: {{"ip_version", decodeIPVersion}}, // IP_PROTOCOL_VERSION / ipVersion + 61: {{"direction", decodeDirection}}, // DIRECTION / flowDirection + 62: {{"next_hop", decodeIP}}, // IPV6_NEXT_HOP / ipNextHopIPv6Address + 63: {{"bgp_next_hop", decodeIP}}, // BPG_IPV6_NEXT_HOP / bgpNextHopIPv6Address + 64: {{"ipv6_extensions", decodeHex}}, // IPV6_OPTION_HEADERS / ipv6ExtensionHeaders + // 65 - 69: vendor proprietary + 70: {{"mpls_label_1", decodeHex}}, // MPLS_LABEL_1 / mplsTopLabelStackSection + 71: {{"mpls_label_2", decodeHex}}, // MPLS_LABEL_2 / mplsLabelStackSection2 + 72: {{"mpls_label_3", decodeHex}}, // MPLS_LABEL_3 / mplsLabelStackSection3 + 73: {{"mpls_label_4", decodeHex}}, // MPLS_LABEL_4 / mplsLabelStackSection4 + 74: {{"mpls_label_5", decodeHex}}, // MPLS_LABEL_5 / mplsLabelStackSection5 + 75: {{"mpls_label_6", decodeHex}}, // MPLS_LABEL_6 / mplsLabelStackSection6 + 76: {{"mpls_label_7", decodeHex}}, // MPLS_LABEL_7 / mplsLabelStackSection7 + 77: {{"mpls_label_8", decodeHex}}, // MPLS_LABEL_8 / mplsLabelStackSection8 + 78: {{"mpls_label_9", decodeHex}}, // MPLS_LABEL_9 / mplsLabelStackSection9 + 79: {{"mpls_label_10", decodeHex}}, // MPLS_LABEL_10 / mplsLabelStackSection10 + 80: {{"in_dst_mac", decodeHex}}, // IN_DST_MAC / destinationMacAddress + 81: {{"out_src_mac", decodeHex}}, // OUT_SRC_MAC / postSourceMacAddress + 82: {{"interface", decodeString}}, // IF_NAME / interfaceName + 83: {{"interface_desc", decodeString}}, // IF_DESC / interfaceDescription + 84: {{"sampler_name", decodeString}}, // SAMPLER_NAME / samplerName + 85: {{"in_total_bytes", decodeUint}}, // IN_PERMANENT_BYTES / octetTotalCount + 86: {{"in_total_packets", decodeUint}}, // IN_PERMANENT_PKTS / packetTotalCount + // 87: vendor proprietary + 88: {{"fragment_offset", decodeUint}}, // FRAGMENT_OFFSET / fragmentOffset + 89: { + {"fwd_status", decodeFwdStatus}, // FORWARDING STATUS / forwardingStatus + {"fwd_reason", decodeFwdReason}, + }, + 90: {{"mpls_vpn_rd", decodeHex}}, // MPLS PAL RD / mplsVpnRouteDistinguisher + 91: {{"mpls_prefix_len", decodeUint}}, // MPLS PREFIX LEN / mplsTopLabelPrefixLength + 92: {{"src_traffic_index", decodeUint}}, // SRC TRAFFIC INDEX / srcTrafficIndex + 93: {{"dst_traffic_index", decodeUint}}, // DST TRAFFIC INDEX / dstTrafficIndex + 94: {{"app_desc", decodeString}}, // APPLICATION DESCRIPTION / applicationDescription + 95: {{"app_id", decodeHex}}, // APPLICATION TAG / applicationId + 96: {{"app_name", decodeString}}, // APPLICATION NAME / applicationName + // 97: undefined + 98: {{"out_dscp", decodeUint}}, // postipDiffServCodePoint / postIpDiffServCodePoint + 99: {{"replication_factor", decodeUint}}, // replication factor / multicastReplicationFactor + // 100: deprecated / className + 101: {{"classification_engine_id", decodeUint}}, // undefined / classificationEngineId + 102: {{"l2_packet_section_offset", decodeUint}}, // layer2packetSectionOffset + 103: {{"l2_packet_section_size", decodeUint}}, // layer2packetSectionSize + 104: {{"l2_packet_section_data", decodeHex}}, // layer2packetSectionData + // 105 - 127: reserved + + // Common between Netflow v9 ASA extension + // https://www.cisco.com/c/en/us/td/docs/security/asa/special/netflow/asa_netflow.html + // and IPFIX. + 148: {{"flow_id", decodeUint}}, // flowId + 152: {{"flow_start_ms", decodeUint}}, // NF_F_FLOW_CREATE_TIME_MSEC / flowStartMilliseconds + 153: {{"flow_end_ms", decodeUint}}, // NF_F_FLOW_END_TIME_MSEC / flowEndMilliseconds + 176: {{"icmp_type", decodeUint}}, // NF_F_ICMP_TYPE / icmpTypeIPv4 + 177: {{"icmp_code", decodeUint}}, // NF_F_ICMP_CODE / icmpCodeIPv4 + 178: {{"icmp_type", decodeUint}}, // NF_F_ICMP_TYPE_IPV6 / icmpTypeIPv6 + 179: {{"icmp_code", decodeUint}}, // NF_F_ICMP_CODE_IPV6 / icmpCodeIPv6 + 225: {{"xlat_src", decodeIP}}, // NF_F_XLATE_SRC_ADDR_IPV4 / postNATSourceIPv4Address + 226: {{"xlat_dst", decodeIP}}, // NF_F_XLATE_DST_ADDR_IPV4 / postNATDestinationIPv4Address + 227: {{"xlat_src_port", decodeUint}}, // NF_F_XLATE_SRC_PORT / postNAPTSourceTransportPort + 228: {{"xlat_dst_port", decodeUint}}, // NF_F_XLATE_DST_PORT / postNAPTDestinationTransportPort + 231: {{"initiator_bytes", decodeUint}}, // NF_F_FWD_FLOW_DELTA_BYTES / initiatorOctets + 232: {{"responder_bytes", decodeUint}}, // NF_F_REV_FLOW_DELTA_BYTES / responderOctets + 233: {{"fw_event", decodeFWEvent}}, // NF_F_FW_EVENT / firewallEvent + 281: {{"xlat_src", decodeIP}}, // NF_F_XLATE_SRC_ADDR_IPV6 / postNATSourceIPv6Address + 282: {{"xlat_dst", decodeIP}}, // NF_F_XLATE_DST_ADDR_IPV6 / postNATDestinationIPv6Address + 323: {{"event_time_ms", decodeUint}}, // NF_F_EVENT_TIME_MSEC / observationTimeMilliseconds + 324: {{"event_time_us", decodeUint}}, // NF_F_EVENT_TIME_USEC / observationTimeMicroseconds + 325: {{"event_time_ns", decodeUint}}, // NF_F_EVENT_TIME_NSEC / observationTimeNanoseconds +} + +// Default field mappings specific to Netflow version 9 +// From documentation at https://www.cisco.com/c/en/us/td/docs/security/asa/special/netflow/asa_netflow.html +var fieldMappingsNetflowV9 = map[uint16][]fieldMapping{ + 33000: {{"in_acl_id", decodeHex}}, // NF_F_INGRESS_ACL_ID + 33001: {{"out_acl_id", decodeHex}}, // NF_F_EGRESS_ACL_ID + 33002: {{"fw_event_ext", decodeHex}}, // NF_F_FW_EXT_EVENT + 40000: {{"username", decodeString}}, // NF_F_USERNAME +} + +// Default field mappings specific to Netflow version 9 +// From documentation at +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-information-elements +var fieldMappingsIPFIX = map[uint16][]fieldMapping{ + 128: {{"bgp_next_as", decodeUint}}, // bgpNextAdjacentAsNumber + 129: {{"bgp_prev_as", decodeUint}}, // bgpPrevAdjacentAsNumber + 130: {{"exporter", decodeIP}}, // exporterIPv4Address + 131: {{"exporter", decodeIP}}, // exporterIPv6Address + 132: {{"dropped_bytes", decodeUint}}, // droppedOctetDeltaCount + 133: {{"dropped_packets", decodeUint}}, // droppedPacketDeltaCount + 134: {{"dropped_bytes_total", decodeUint}}, // droppedOctetTotalCount + 135: {{"dropped_packets_total", decodeUint}}, // droppedPacketTotalCount + 136: {{"flow_end_reason", decodeFlowEndReason}}, // flowEndReason + 137: {{"common_properties_id", decodeUint}}, // commonPropertiesId + 138: {{"observation_point_id", decodeUint}}, // observationPointId + 139: { + {"icmp_type", func(b []byte) interface{} { return b[0] }}, // icmpTypeCodeIPv6 + {"icmp_code", func(b []byte) interface{} { return b[1] }}, + }, + 140: {{"mpls_top_label_ip", decodeIP}}, // mplsTopLabelIPv6Address + 141: {{"linecard_id", decodeUint}}, // lineCardId + 142: {{"port_id", decodeUint}}, // portId + 143: {{"metering_pid", decodeUint}}, // meteringProcessId + 144: {{"exporting_pid", decodeUint}}, // exportingProcessId + 145: {{"template_id", decodeUint}}, // templateId + 146: {{"wlan_channel", decodeUint}}, // wlanChannelId + 147: {{"wlan_ssid", decodeString}}, // wlanSSID + // 148: common + 149: {{"observation_domain_id", decodeUint}}, // observationDomainId + 150: {{"flow_start", decodeUint}}, // flowStartSeconds + // 151 - 152: common + 153: {{"flow_end_ms", decodeUint}}, // flowEndMilliseconds + 154: {{"flow_start_us", decodeUint}}, // flowStartMicroseconds + 155: {{"flow_end_us", decodeUint}}, // flowEndMicroseconds + 156: {{"flow_start_ns", decodeUint}}, // flowStartNanoseconds + 157: {{"flow_end_ns", decodeUint}}, // flowEndNanoseconds + 158: {{"flow_start_delta_us", decodeUint}}, // flowStartDeltaMicroseconds + 159: {{"flow_end_delta_us", decodeUint}}, // flowEndDeltaMicroseconds + 160: {{"system_init_ms", decodeUint}}, // systemInitTimeMilliseconds + 161: {{"flow_duration_ms", decodeUint}}, // flowDurationMilliseconds + 162: {{"flow_duration_us", decodeUint}}, // flowDurationMicroseconds + 163: {{"flow_count_total", decodeUint}}, // observedFlowTotalCount + 164: {{"ignored_packet_total", decodeUint}}, // ignoredPacketTotalCount + 165: {{"ignored_bytes_total", decodeUint}}, // ignoredOctetTotalCount + 166: {{"notsent_flow_count_total", decodeUint}}, // notSentFlowTotalCount + 167: {{"notsent_packet_total", decodeUint}}, // notSentPacketTotalCount + 168: {{"notsent_bytes_total", decodeUint}}, // notSentOctetTotalCount + 169: {{"ipv6_dst_prefix", decodeIP}}, // destinationIPv6Prefix + 170: {{"ipv6_src_prefix", decodeIP}}, // sourceIPv6Prefix + 171: {{"out_bytes_total", decodeUint}}, // postOctetTotalCount + 172: {{"out_packets_total", decodeUint}}, // postPacketTotalCount + 173: {{"flow_key_indicator", decodeHex}}, // flowKeyIndicator + 174: {{"out_mcast_packets_total", decodeUint}}, // postMCastPacketTotalCount + 175: {{"out_mcast_bytes_total", decodeUint}}, // postMCastOctetTotalCount + // 176 - 179: common + 180: {{"udp_src_port", decodeUint}}, // udpSourcePort + 181: {{"udp_dst_port", decodeUint}}, // udpDestinationPort + 182: {{"tcp_src_port", decodeUint}}, // tcpSourcePort + 183: {{"tcp_dst_port", decodeUint}}, // tcpDestinationPort + 184: {{"tcp_seq_number", decodeUint}}, // tcpSequenceNumber + 185: {{"tcp_ack_number", decodeUint}}, // tcpAcknowledgementNumber + 186: {{"tcp_window_size", decodeUint}}, // tcpWindowSize + 187: {{"tcp_urgent_ptr", decodeUint}}, // tcpUrgentPointer + 188: {{"tcp_header_len", decodeUint}}, // tcpHeaderLength + 189: {{"ip_header_len", decodeUint}}, // ipHeaderLength + 190: {{"ipv4_total_len", decodeUint}}, // totalLengthIPv4 + 191: {{"ipv6_payload_len", decodeUint}}, // payloadLengthIPv6 + 192: {{"ttl", decodeUint}}, // ipTTL + 193: {{"ipv6_next_header", decodeUint}}, // nextHeaderIPv6 + 194: {{"mpls_payload_len", decodeUint}}, // mplsPayloadLength + 195: {{"dscp", decodeUint}}, // ipDiffServCodePoint + 196: {{"precedence", decodeUint}}, // ipPrecedence + 197: {{"fragement_flags", decodeFragmentFlags}}, // fragmentFlags + 198: {{"bytes_sqr_sum", decodeUint}}, // octetDeltaSumOfSquares + 199: {{"bytes_sqr_sum_total", decodeUint}}, // octetTotalSumOfSquares + 200: {{"mpls_top_label_ttl", decodeUint}}, // mplsTopLabelTTL + 201: {{"mpls_stack_len", decodeUint}}, // mplsLabelStackLength + 202: {{"mpls_stack_depth", decodeUint}}, // mplsLabelStackDepth + 203: {{"mpls_top_label_exp", decodeUint}}, // mplsTopLabelExp + 204: {{"ip_payload_len", decodeUint}}, // ipPayloadLength + 205: {{"udp_msg_len", decodeUint}}, // udpMessageLength + 206: {{"mcast", decodeUint}}, // isMulticast + 207: {{"ipv4_inet_header_len", decodeUint}}, // ipv4IHL + 208: {{"ipv4_options", decodeIPv4Options}}, // ipv4Options + 209: {{"tcp_options", decodeHex}}, // tcpOptions + 210: {{"padding", decodeHex}}, // paddingOctets + 211: {{"collector", decodeIP}}, // collectorIPv4Address + 212: {{"collector", decodeIP}}, // collectorIPv6Address + 213: {{"export_interface", decodeUint}}, // exportInterface + 214: {{"export_proto_version", decodeUint}}, //exportProtocolVersion + 215: {{"export_transport_proto", decodeUint}}, //exportTransportProtocol + 216: {{"collector_transport_port", decodeUint}}, //collectorTransportPort + 217: {{"exporter_transport_port", decodeUint}}, //exporterTransportPort + 218: {{"tcp_syn_total", decodeUint}}, // tcpSynTotalCount + 219: {{"tcp_fin_total", decodeUint}}, // tcpFinTotalCount + 220: {{"tcp_rst_total", decodeUint}}, // tcpRstTotalCount + 221: {{"tcp_psh_total", decodeUint}}, // tcpPshTotalCount + 222: {{"tcp_ack_total", decodeUint}}, // tcpAckTotalCount + 223: {{"tcp_urg_total", decodeUint}}, // tcpUrgTotalCount + 224: {{"ip_total_len", decodeUint}}, // ipTotalLength + // 225 - 228: common + 229: {{"nat_origin_addr_realm", decodeUint}}, // natOriginatingAddressRealm + 230: {{"nat_event", decodeUint}}, // natEvent + // 231 - 233: common + 234: {{"in_vrf_id", decodeUint}}, // ingressVRFID + 235: {{"out_vrf_id", decodeUint}}, // egressVRFID + 236: {{"vrf_name", decodeString}}, // VRFname + 237: {{"out_mpls_top_label_exp", decodeUint}}, // postMplsTopLabelExp + 238: {{"tcp_window_scale", decodeUint}}, // tcpWindowScale + 239: {{"biflow_direction", decodeBiflowDirection}}, // biflowDirection + 240: {{"eth_header_len", decodeUint}}, // ethernetHeaderLength + 241: {{"eth_payload_len", decodeUint}}, // ethernetPayloadLength + 242: {{"eth_total_len", decodeUint}}, // ethernetTotalLength + 243: {{"vlan_id", decodeUint}}, // dot1qVlanId + 244: {{"vlan_priority", decodeUint}}, // dot1qPriority + 245: {{"vlan_customer_id", decodeUint}}, // dot1qCustomerVlanId + 246: {{"vlan_customer_priority", decodeUint}}, // dot1qCustomerPriority + 247: {{"metro_evc_id", decodeString}}, // metroEvcId + 248: {{"metro_evc_type", decodeUint}}, // metroEvcType + 249: {{"pseudo_wire_id", decodeUint}}, // pseudoWireId + 250: {{"pseudo_wire_type", decodeHex}}, // pseudoWireType + 251: {{"pseudo_wire_ctrl_word", decodeHex}}, // pseudoWireControlWord + 252: {{"in_phy_interface", decodeUint}}, // ingressPhysicalInterface + 253: {{"out_phy_interface", decodeUint}}, // egressPhysicalInterface + 254: {{"out_vlan_id", decodeUint}}, // postDot1qVlanId + 255: {{"out_vlan_customer_id", decodeUint}}, // postDot1qCustomerVlanId + 256: {{"eth_type", decodeHex}}, // ethernetType + 257: {{"out_precedence", decodeUint}}, // postIpPrecedence + 258: {{"collection_time_ms", decodeUint}}, // collectionTimeMilliseconds + 259: {{"export_sctp_stream_id", decodeUint}}, // exportSctpStreamId + 260: {{"max_export_time", decodeUint}}, // maxExportSeconds + 261: {{"max_flow_end_time", decodeUint}}, // maxFlowEndSeconds + 262: {{"msg_md5", decodeHex}}, // messageMD5Checksum + 263: {{"msg_scope", decodeUint}}, // messageScope + 264: {{"min_export_time", decodeUint}}, // minExportSeconds + 265: {{"min_flow_start_time", decodeUint}}, // minFlowStartSeconds + 266: {{"opaque_bytes", decodeUint}}, // opaqueOctets + 267: { /* MUST BE IGNORED according to standard */ }, // sessionScope + 268: {{"max_flow_end_time_us", decodeUint}}, // maxFlowEndMicroseconds + 269: {{"max_flow_end_time_ms", decodeUint}}, // maxFlowEndMilliseconds + 270: {{"max_flow_end_time_ns", decodeUint}}, // maxFlowEndNanoseconds + 271: {{"min_flow_start_time_us", decodeUint}}, // minFlowStartMicroseconds + 272: {{"min_flow_start_time_ms", decodeUint}}, // minFlowStartMilliseconds + 273: {{"min_flow_start_time_ns", decodeUint}}, // minFlowStartNanoseconds + 274: {{"collector_cert", decodeString}}, // collectorCertificate + 275: {{"exporter_cert", decodeString}}, // exporterCertificate + 276: {{"data_records_reliability", decodeBool}}, // dataRecordsReliability + 277: {{"observation_point_type", decodeOpsPointType}}, // observationPointType + 278: {{"connection_new_count", decodeUint}}, // newConnectionDeltaCount + 279: {{"connection_duration_sum", decodeUint}}, // connectionSumDurationSeconds + 280: {{"connection_transaction_id", decodeUint}}, // connectionTransactionId + // 281 - 282: common + 283: {{"nat_pool_id", decodeUint}}, // natPoolId + 284: {{"nat_pool_name", decodeString}}, // natPoolName + 285: { + {"anon_stability_class", decodeAnonStabilityClass}, // anonymizationFlags + {"anon_flags", decodeAnonFlags}, + }, + 286: {{"anon_technique", decodeAnonTechnique}}, // anonymizationTechnique + 287: {{"information_element", decodeUint}}, // informationElementIndex + 288: {{"p2p", decodeTechnology}}, // p2pTechnology + 289: {{"tunnel", decodeTechnology}}, // tunnelTechnology + 290: {{"encryption", decodeTechnology}}, // encryptedTechnology + 291: { /* IGNORED for parse-ability */ }, // basicList + 292: { /* IGNORED for parse-ability */ }, // subTemplateList + 293: { /* IGNORED for parse-ability */ }, // subTemplateMultiList + 294: {{"bgp_validity_state", decodeUint}}, // bgpValidityState + 295: {{"ipsec_spi", decodeUint}}, // IPSecSPI + 296: {{"gre_key", decodeUint}}, // greKey + 297: {{"nat_type", decodeIPNatType}}, // natType + 298: {{"initiator_packets", decodeUint}}, // initiatorPackets + 299: {{"responder_packets", decodeUint}}, // responderPackets + 300: {{"observation_domain_name", decodeString}}, // observationDomainName + 301: {{"observation_seq_id", decodeUint}}, // selectionSequenceId + 302: {{"selector_id", decodeUint}}, // selectorId + 303: {{"information_elem_id", decodeUint}}, // informationElementId + 304: {{"selector_algo", decodeSelectorAlgorithm}}, // selectorAlgorithm + 305: {{"sampling_packet_interval", decodeUint}}, // samplingPacketInterval + 306: {{"sampling_packet_space", decodeUint}}, // samplingPacketSpace + 307: {{"sampling_time_interval_us", decodeUint}}, // samplingTimeInterval + 308: {{"sampling_time_space_us", decodeUint}}, // samplingTimeSpace + 309: {{"sampling_size", decodeUint}}, // samplingSize + 310: {{"sampling_population", decodeUint}}, // samplingPopulation + 311: {{"sampling_probability", decodeFloat64}}, // samplingProbability + 312: {{"datalink_frame_size", decodeUint}}, // dataLinkFrameSize + 313: {{"ip_header_packet_section", decodeHex}}, // ipHeaderPacketSection + 314: {{"ip_payload_packet_section", decodeHex}}, // ipPayloadPacketSection + 315: {{"datalink_frame_section", decodeHex}}, // dataLinkFrameSection + 316: {{"mpls_label_stack_section", decodeHex}}, // mplsLabelStackSection + 317: {{"mpls_payload_packet_section", decodeHex}}, // mplsPayloadPacketSection + 318: {{"selector_total_packets_observed", decodeUint}}, // selectorIdTotalPktsObserved + 319: {{"selector_total_packets_selected", decodeUint}}, // selectorIdTotalPktsSelected + 320: {{"absolute_error", decodeFloat64}}, // absoluteError + 321: {{"relative_error", decodeFloat64}}, // relativeError + 322: {{"event_time", decodeUint}}, // observationTimeSeconds + // 323 - 325: common + 326: {{"hash_digest", decodeHex}}, // digestHashValue + 327: {{"hash_ip_payload_offset", decodeUint}}, // hashIPPayloadOffset + 328: {{"hash_ip_payload_size", decodeUint}}, // hashIPPayloadSize + 329: {{"hash_out_range_min", decodeUint}}, // hashOutputRangeMin + 330: {{"hash_out_range_max", decodeUint}}, // hashOutputRangeMax + 331: {{"hash_selected_range_min", decodeUint}}, // hashSelectedRangeMin + 332: {{"hash_selected_range_max", decodeUint}}, // hashSelectedRangeMax + 333: {{"hash_digest_out", decodeBool}}, // hashDigestOutput + 334: {{"hash_init_val", decodeUint}}, // hashInitialiserValue + 335: {{"selector_name", decodeString}}, // selectorName + 336: {{"upper_confidence_interval_limit", decodeFloat64}}, // upperCILimit + 337: {{"lower_confidence_interval_limit", decodeFloat64}}, // upperCILimit + 338: {{"confidence_level", decodeFloat64}}, // confidenceLevel + // 339 - 346: information element fields, do not map for now + 347: {{"virtual_station_interface_id", decodeHex}}, // virtualStationInterfaceId + 348: {{"virtual_station_interface_name", decodeString}}, // virtualStationInterfaceName + 349: {{"virtual_station_uuid", decodeHex}}, // virtualStationUUID + 350: {{"virtual_station_name", decodeString}}, // virtualStationName + 351: {{"l2_segment_id", decodeUint}}, // layer2SegmentId + 352: {{"l2_bytes", decodeUint}}, // layer2OctetDeltaCount + 353: {{"l2_bytes_total", decodeUint}}, // layer2OctetTotalCount + 354: {{"in_unicast_packets_total", decodeUint}}, // ingressUnicastPacketTotalCount + 355: {{"in_mcast_packets_total", decodeUint}}, // ingressMulticastPacketTotalCount + 356: {{"in_broadcast_packets_total", decodeUint}}, // ingressBroadcastPacketTotalCount + 357: {{"out_unicast_packets_total", decodeUint}}, // egressUnicastPacketTotalCount + 358: {{"out_broadcast_packets_total", decodeUint}}, // egressBroadcastPacketTotalCount + 359: {{"monitoring_interval_start_ms", decodeUint}}, // monitoringIntervalStartMilliSeconds + 360: {{"monitoring_interval_end_ms", decodeUint}}, // monitoringIntervalEndMilliSeconds + 361: {{"port_range_start", decodeUint}}, // portRangeStart + 362: {{"port_range_end", decodeUint}}, // portRangeEnd + 363: {{"port_range_step_size", decodeUint}}, // portRangeStepSize + 364: {{"port_range_ports", decodeUint}}, // portRangeNumPorts + 365: {{"station_mac", decodeHex}}, // staMacAddress + 366: {{"station", decodeIP}}, // staIPv4Address + 367: {{"wtp_mac", decodeHex}}, // wtpMacAddress + 368: {{"in_interface_type", decodeUint}}, // ingressInterfaceType + 369: {{"out_interface_type", decodeUint}}, // egressInterfaceType + 370: {{"rtp_seq_number", decodeUint}}, // rtpSequenceNumber + 371: {{"username", decodeString}}, // userName + 372: {{"app_category", decodeString}}, // applicationCategoryName + 373: {{"app_subcategory", decodeHex}}, // applicationSubCategoryName + 374: {{"app_group", decodeString}}, // applicationGroupName + 375: {{"flows_original_present", decodeUint}}, // originalFlowsPresent + 376: {{"flows_original_initiated", decodeUint}}, // originalFlowsInitiated + 377: {{"flows_original_completed", decodeUint}}, // originalFlowsCompleted + 378: {{"flow_src_ip_count", decodeUint}}, // distinctCountOfSourceIPAddress + 379: {{"flow_dst_ip_count", decodeUint}}, // distinctCountOfDestinationIPAddress + 380: {{"flow_src_ipv4_count", decodeUint}}, // distinctCountOfSourceIPv4Address + 381: {{"flow_dst_ipv4_count", decodeUint}}, // distinctCountOfDestinationIPv4Address + 382: {{"flow_src_ipv6_count", decodeUint}}, // distinctCountOfSourceIPv6Address + 383: {{"flow_dst_ipv6_count", decodeUint}}, // distinctCountOfDestinationIPv6Address + 384: {{"value_dist_method", decodeValueDistMethod}}, // valueDistributionMethod + 385: {{"rfc3550_jitter_ms", decodeUint}}, // rfc3550JitterMilliseconds + 386: {{"rfc3550_jitter_us", decodeUint}}, // rfc3550JitterMicroseconds + 387: {{"rfc3550_jitter_ns", decodeUint}}, // rfc3550JitterNanoseconds + 388: {{"vlan_dei", decodeBool}}, // dot1qDEI + 389: {{"vlan_customer_dei", decodeUint}}, // dot1qCustomerDEI + 390: {{"flow_selector_algo", decodeSelectorAlgorithm}}, // flowSelectorAlgorithm + 391: {{"flow_selected_byte_count", decodeUint}}, // flowSelectedOctetDeltaCount + 392: {{"flow_selected_packet_count", decodeUint}}, // flowSelectedOctetDeltaCount + 393: {{"flow_selected_count", decodeUint}}, // flowSelectedFlowDeltaCount + 394: {{"selector_id_flows_observed_total", decodeUint}}, // selectorIDTotalFlowsObserved + 395: {{"selector_id_flows_selected_total", decodeUint}}, // selectorIDTotalFlowsSelected + 396: {{"sampling_flow_interval_count", decodeUint}}, // samplingFlowInterval + 397: {{"sampling_flow_spacing_count", decodeUint}}, // samplingFlowSpacing + 398: {{"sampling_flow_interval_ms", decodeUint}}, // flowSamplingTimeInterval + 399: {{"sampling_flow_spacing_ms", decodeUint}}, // flowSamplingTimeSpacing + 400: {{"flow_domain_hash_element_id", decodeUint}}, // hashFlowDomain + 401: {{"transport_byte_count", decodeUint}}, // transportOctetDeltaCount + 402: {{"transport_packet_count", decodeUint}}, // transportPacketDeltaCount + 403: {{"exporter_original_ip", decodeUint}}, // originalExporterIPv4Address + 404: {{"exporter_original_ip", decodeUint}}, // originalExporterIPv6Address + 405: {{"exporter_original_domain", decodeHex}}, // originalObservationDomainId + 406: {{"intermediate_process_id", decodeHex}}, // intermediateProcessId + 407: {{"ignored_data_records_total", decodeUint}}, // ignoredDataRecordTotalCount + 408: {{"datalink_frame_type", decodeDataLinkFrameType}}, // dataLinkFrameType + 409: {{"section_offset", decodeUint}}, // sectionOffset + 410: {{"section_exported_bytes", decodeUint}}, // sectionExportedOctets + 411: {{"vlan_service_instance_tag", decodeHex}}, // dot1qServiceInstanceTag + 412: {{"vlan_service_instance_id", decodeUint}}, // dot1qServiceInstanceId + 413: {{"vlan_service_instance_priority", decodeUint}}, // dot1qServiceInstancePriority + 414: {{"vlan_customer_src_mac", decodeMAC}}, // dot1qCustomerSourceMacAddress + 415: {{"vlan_customer_dst_mac", decodeMAC}}, // dot1qCustomerDestinationMacAddress + // 416: deprecated + 417: {{"post_layer2_bytes", decodeUint}}, // postLayer2OctetDeltaCount + 418: {{"post_mcast_layer2_bytes", decodeUint}}, // postMCastLayer2OctetDeltaCount + // 419: deprecated + 420: {{"post_layer2_bytes_total", decodeUint}}, // postLayer2OctetTotalCount + 421: {{"post_mcast_layer2_bytes_total", decodeUint}}, // postMCastLayer2OctetTotalCount + 422: {{"min_layer2_total_length", decodeUint}}, // minimumLayer2TotalLength + 423: {{"max_layer2_total_length", decodeUint}}, // maximumLayer2TotalLength + 424: {{"dropped_layer2_bytes", decodeUint}}, // droppedLayer2OctetDeltaCount + 425: {{"dropped_layer2_bytes_total", decodeUint}}, // droppedLayer2OctetTotalCount + 426: {{"ignored_layer2_bytes_total", decodeUint}}, // ignoredLayer2OctetTotalCount + 427: {{"not_sent_layer2_bytes_total", decodeUint}}, // notSentLayer2OctetTotalCount + 428: {{"layer2_bytes_sumsqr", decodeUint}}, // layer2OctetDeltaSumOfSquares + 429: {{"layer2_bytes_total_sumsqr", decodeUint}}, // layer2OctetTotalSumOfSquares + 430: {{"layer2_frames", decodeUint}}, // layer2FrameDeltaCount + 431: {{"layer2_frames_total", decodeUint}}, // layer2FrameTotalCount + 432: {{"pseudo_wire_dst", decodeIP}}, // pseudoWireDestinationIPv4Address + 433: {{"ignored_layer2_frames_total", decodeUint}}, // ignoredLayer2FrameTotalCount + 434: {{"mib_obj_value_int", decodeInt32}}, // mibObjectValueInteger + 435: {{"mib_obj_value_str", decodeString}}, // mibObjectValueOctetString + 436: {{"mib_obj_value_oid", decodeHex}}, // mibObjectValueOID + 437: {{"mib_obj_value_bits", decodeHex}}, // mibObjectValueBits + 438: {{"mib_obj_value_ip", decodeIP}}, // mibObjectValueIPAddress + 439: {{"mib_obj_value_counter", decodeUint}}, // mibObjectValueCounter + 440: {{"mib_obj_value_gauge", decodeUint}}, // mibObjectValueGauge + 441: {{"mib_obj_value_time", decodeUint}}, // mibObjectValueTimeTicks + 442: {{"mib_obj_value_uint", decodeUint}}, // mibObjectValueUnsigned + 443: {{"mib_obj_value_table", decodeHex}}, // mibObjectValueTable + 444: {{"mib_obj_value_row", decodeHex}}, // mibObjectValueRow + 445: {{"mib_oid", decodeHex}}, // mibObjectIdentifier + 446: {{"mib_sub_id", decodeUint}}, // mibSubIdentifier + 447: {{"mib_index_indicator", decodeHex}}, // mibIndexIndicator + 448: {{"mib_capture_time_semantics", decodeCaptureTimeSemantics}}, // mibCaptureTimeSemantics + 449: {{"mib_context_engine_id", decodeHex}}, // mibContextEngineID + 450: {{"mib_context_name", decodeString}}, // mibContextName + 451: {{"mib_obj_name", decodeString}}, // mibObjectName + 452: {{"mib_obj_desc", decodeString}}, // mibObjectDescription + 453: {{"mib_obj_syntax", decodeString}}, // mibObjectSyntax + 454: {{"mib_module_name", decodeString}}, // mibModuleName + 455: {{"imsi", decodeString}}, // mobileIMSI + 456: {{"msisdn", decodeString}}, // mobileMSISDN + 457: {{"http_status_code", decodeUint}}, // httpStatusCode + 458: {{"src_transport_port_limit", decodeUint}}, // sourceTransportPortsLimit + 459: {{"http_request_method", decodeString}}, // httpRequestMethod + 460: {{"http_request_host", decodeString}}, // httpRequestHost + 461: {{"http_request_target", decodeString}}, // httpRequestTarget + 462: {{"http_msg_version", decodeString}}, // httpMessageVersion + 463: {{"nat_instance_id", decodeUint}}, // natInstanceID + 464: {{"internal_addr_realm", decodeHex}}, // internalAddressRealm + 465: {{"external_addr_realm", decodeHex}}, // externalAddressRealm + 466: {{"nat_quota_exceeded_event", decodeUint}}, // natQuotaExceededEvent + 467: {{"nat_threshold_event", decodeUint}}, // natThresholdEvent + 468: {{"http_user_agent", decodeString}}, // httpUserAgent + 469: {{"http_content_type", decodeString}}, // httpContentType + 470: {{"http_reason_phrase", decodeString}}, // httpReasonPhrase + 471: {{"max_session_entries", decodeUint}}, // maxSessionEntries + 472: {{"max_bib_entries", decodeUint}}, // maxBIBEntries + 473: {{"max_entries_per_user", decodeUint}}, // maxEntriesPerUser + 474: {{"max_subscribers", decodeUint}}, // maxSubscribers + 475: {{"max_fragments_pending_reassembly", decodeUint}}, // maxFragmentsPendingReassembly + 476: {{"addr_pool_threshold_high", decodeUint}}, // addressPoolHighThreshold + 477: {{"addr_pool_threshold_low", decodeUint}}, // addressPoolLowThreshold + 478: {{"addr_port_mapping_threshold_high", decodeUint}}, // addressPortMappingHighThreshold + 479: {{"addr_port_mapping_threshold_low", decodeUint}}, // addressPortMappingLowThreshold + 480: {{"addr_port_mapping_per_user_threshold_high", decodeUint}}, // addressPortMappingPerUserHighThreshold + 481: {{"global_addr_mapping_threshold_high", decodeUint}}, // globalAddressMappingHighThreshold + 482: {{"vpn_identifier", decodeIP}}, // vpnIdentifier + 483: {{"bgp_community", decodeUint}}, // bgpCommunity + 484: {{"bgp_src_community_list", decodeHex}}, // bgpSourceCommunityList + 485: {{"bgp_dst_community_list", decodeHex}}, // bgpDestinationCommunityList + 486: {{"bgp_extended_community", decodeHex}}, // bgpExtendedCommunity + 487: {{"bgp_src_extended_community_list", decodeHex}}, // bgpSourceExtendedCommunityList + 488: {{"bgp_dst_extended_community_list", decodeHex}}, // bgpDestinationExtendedCommunityList + 489: {{"bgp_large_community", decodeHex}}, // bgpLargeCommunity + 490: {{"bgp_src_large_community_list", decodeHex}}, // bgpSourceLargeCommunityList + 491: {{"bgp_dst_large_community_list", decodeHex}}, // bgpDestinationLargeCommunityList +} + +// Decoder structure +type netflowDecoder struct { + Log telegraf.Logger + + templates map[string]*netflow.BasicTemplateSystem + mappingsV9 map[uint16]fieldMapping + mappingsIPFIX map[uint16]fieldMapping + + sync.Mutex +} + +func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) { + var metrics []telegraf.Metric + + t := time.Now() + src := srcIP.String() + + // Prepare the templates used to decode the messages + d.Lock() + if _, ok := d.templates[src]; !ok { + d.templates[src] = netflow.CreateTemplateSystem() + } + templates := d.templates[src] + d.Unlock() + + // Decode the overall message + buf := bytes.NewBuffer(payload) + packet, err := netflow.DecodeMessage(buf, templates) + if err != nil { + return nil, err + } + + // Extract metrics + switch msg := packet.(type) { + case netflow.NFv9Packet: + for _, flowsets := range msg.FlowSets { + switch fs := flowsets.(type) { + case netflow.TemplateFlowSet: + case netflow.NFv9OptionsTemplateFlowSet: + case netflow.OptionsDataFlowSet: + case netflow.DataFlowSet: + for _, record := range fs.Records { + tags := map[string]string{ + "source": src, + "version": "NetFlowV9", + } + fields := make(map[string]interface{}) + for _, value := range record.Values { + for _, field := range d.decodeValueV9(value) { + fields[field.Key] = field.Value + } + } + metrics = append(metrics, metric.New("netflow", tags, fields, t)) + } + } + } + case netflow.IPFIXPacket: + for _, flowsets := range msg.FlowSets { + switch fs := flowsets.(type) { + case netflow.TemplateFlowSet: + case netflow.IPFIXOptionsTemplateFlowSet: + case netflow.OptionsDataFlowSet: + case netflow.DataFlowSet: + for _, record := range fs.Records { + tags := map[string]string{ + "source": srcIP.String(), + "version": "IPFIX", + } + fields := make(map[string]interface{}) + t := time.Now() + for _, value := range record.Values { + for _, field := range d.decodeValueIPFIX(value) { + fields[field.Key] = field.Value + } + } + metrics = append(metrics, metric.New("netflow", tags, fields, t)) + } + } + } + default: + return nil, fmt.Errorf("invalid message of type %T", packet) + } + + return metrics, nil +} + +func (d *netflowDecoder) Init() error { + if err := initL4ProtoMapping(); err != nil { + return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err) + } + if err := initIPv4OptionMapping(); err != nil { + return fmt.Errorf("initializing IPv4 options mapping failed: %w", err) + } + + d.templates = make(map[string]*netflow.BasicTemplateSystem) + d.mappingsV9 = make(map[uint16]fieldMapping) + d.mappingsIPFIX = make(map[uint16]fieldMapping) + + return nil +} + +func (d *netflowDecoder) decodeValueV9(field netflow.DataField) []telegraf.Field { + raw := field.Value.([]byte) + + // Check the user-specified mapping + if m, found := d.mappingsV9[field.Type]; found { + return []telegraf.Field{{Key: m.name, Value: m.decoder(raw)}} + } + + // Check the version specific default field mappings + if mappings, found := fieldMappingsNetflowV9[field.Type]; found { + var fields []telegraf.Field + for _, m := range mappings { + fields = append(fields, telegraf.Field{ + Key: m.name, + Value: m.decoder(raw), + }) + } + return fields + } + + // Check the common default field mappings + if mappings, found := fieldMappingsNetflowCommon[field.Type]; found { + var fields []telegraf.Field + for _, m := range mappings { + fields = append(fields, telegraf.Field{ + Key: m.name, + Value: m.decoder(raw), + }) + } + return fields + } + + // Return the raw data if no mapping was found + d.Log.Debugf("unknown data field %v", field) + name := "type_" + strconv.FormatUint(uint64(field.Type), 10) + return []telegraf.Field{{Key: name, Value: decodeHex(raw)}} +} + +func (d *netflowDecoder) decodeValueIPFIX(field netflow.DataField) []telegraf.Field { + raw := field.Value.([]byte) + + // Checking for reverse elements according to RFC5103 + var prefix string + elementID := field.Type + if field.Type&0x4000 != 0 { + prefix = "rev_" + elementID = field.Type & (0x4000 ^ 0xffff) + } + + // Check the user-specified mapping + if m, found := d.mappingsIPFIX[elementID]; found { + return []telegraf.Field{{Key: prefix + m.name, Value: m.decoder(raw)}} + } + + // Check the version specific default field mappings + if mappings, found := fieldMappingsIPFIX[elementID]; found { + var fields []telegraf.Field + for _, m := range mappings { + fields = append(fields, telegraf.Field{ + Key: prefix + m.name, + Value: m.decoder(raw), + }) + } + return fields + } + + // Check the common default field mappings + if mappings, found := fieldMappingsNetflowCommon[elementID]; found { + var fields []telegraf.Field + for _, m := range mappings { + fields = append(fields, telegraf.Field{ + Key: prefix + m.name, + Value: m.decoder(raw), + }) + } + return fields + } + + // Return the raw data if no mapping was found + d.Log.Debugf("unknown data field %v", field) + name := "type_" + strconv.FormatUint(uint64(field.Type), 10) + return []telegraf.Field{{Key: name, Value: decodeHex(raw)}} +} diff --git a/plugins/inputs/netflow/netflow_test.go b/plugins/inputs/netflow/netflow_test.go new file mode 100644 index 000000000..f3ba0f3ff --- /dev/null +++ b/plugins/inputs/netflow/netflow_test.go @@ -0,0 +1,240 @@ +package netflow + +import ( + "fmt" + "net" + "os" + "path/filepath" + "sort" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestInit(t *testing.T) { + tests := []struct { + name string + address string + protocol string + errmsg string + }{ + { + name: "Netflow v5", + address: "udp://:2055", + protocol: "netflow v5", + }, + { + name: "Netflow v5 (uppercase)", + address: "udp://:2055", + protocol: "Netflow v5", + }, + { + name: "Netflow v9", + address: "udp://:2055", + protocol: "netflow v9", + }, + { + name: "Netflow v9 (uppercase)", + address: "udp://:2055", + protocol: "Netflow v9", + }, + { + name: "IPFIX", + address: "udp://:2055", + protocol: "ipfix", + }, + { + name: "IPFIX (uppercase)", + address: "udp://:2055", + protocol: "IPFIX", + }, + { + name: "invalid protocol", + address: "udp://:2055", + protocol: "foo", + errmsg: "invalid protocol", + }, + { + name: "UDP", + address: "udp://:2055", + protocol: "netflow v5", + }, + { + name: "UDP4", + address: "udp4://:2055", + protocol: "netflow v5", + }, + { + name: "UDP6", + address: "udp6://:2055", + protocol: "netflow v5", + }, + { + name: "empty service address", + address: "", + protocol: "netflow v5", + errmsg: "service_address required", + }, + { + name: "invalid address scheme", + address: "tcp://:2055", + protocol: "netflow v5", + errmsg: "invalid scheme", + }, + { + name: "invalid service address", + address: "udp://198.168.1.290:la", + protocol: "netflow v5", + errmsg: "invalid service address", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &NetFlow{ + ServiceAddress: tt.address, + Protocol: tt.protocol, + Log: testutil.Logger{}, + } + err := plugin.Init() + if tt.errmsg != "" { + require.ErrorContains(t, err, tt.errmsg) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Register the plugin + inputs.Add("netflow", func() telegraf.Input { + return &NetFlow{} + }) + + // Prepare the influx parser for expectations + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + testcasePath := filepath.Join("testcases", f.Name()) + configFilename := filepath.Join(testcasePath, "telegraf.conf") + inputFiles := filepath.Join(testcasePath, "*.bin") + expectedFilename := filepath.Join(testcasePath, "expected.out") + expectedErrorFilename := filepath.Join(testcasePath, "expected.err") + + // Compare options + options := []cmp.Option{ + testutil.IgnoreTime(), + testutil.SortMetrics(), + } + + t.Run(f.Name(), func(t *testing.T) { + // Read the input data + var messages [][]byte + matches, err := filepath.Glob(inputFiles) + require.NoError(t, err) + require.NotEmpty(t, matches) + sort.Strings(matches) + for _, fn := range matches { + m, err := os.ReadFile(fn) + require.NoError(t, err) + messages = append(messages, m) + } + + // Read the expected output if any + var expected []telegraf.Metric + if _, err := os.Stat(expectedFilename); err == nil { + var err error + expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser) + require.NoError(t, err) + } + + // Read the expected output if any + var expectedErrors []string + if _, err := os.Stat(expectedErrorFilename); err == nil { + var err error + expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename) + require.NoError(t, err) + require.NotEmpty(t, expectedErrors) + } + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.Len(t, cfg.Inputs, 1) + + // Setup and start the plugin + var acc testutil.Accumulator + plugin := cfg.Inputs[0].Input.(*NetFlow) + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Create a client without TLS + addr := plugin.conn.LocalAddr() + client, err := createClient(plugin.ServiceAddress, addr) + require.NoError(t, err) + + // Write the given sequence + for i, msg := range messages { + _, err := client.Write(msg) + require.NoErrorf(t, err, "writing message from %q failed: %v", matches[i], err) + } + require.NoError(t, client.Close()) + + getNErrors := func() int { + acc.Lock() + defer acc.Unlock() + return len(acc.Errors) + } + require.Eventuallyf(t, func() bool { + return getNErrors() >= len(expectedErrors) + }, 3*time.Second, 100*time.Millisecond, "did not receive errors (%d/%d)", getNErrors(), len(expectedErrors)) + + require.Lenf(t, acc.Errors, len(expectedErrors), "got errors: %v", acc.Errors) + sort.SliceStable(acc.Errors, func(i, j int) bool { + return acc.Errors[i].Error() < acc.Errors[j].Error() + }) + for i, err := range acc.Errors { + require.ErrorContains(t, err, expectedErrors[i]) + } + + require.Eventuallyf(t, func() bool { + acc.Lock() + defer acc.Unlock() + return acc.NMetrics() >= uint64(len(expected)) + }, 3*time.Second, 100*time.Millisecond, "did not receive metrics (%d/%d)", acc.NMetrics(), len(expected)) + + // Check the metric nevertheless as we might get some metrics despite errors. + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, options...) + }) + } +} + +func createClient(endpoint string, addr net.Addr) (net.Conn, error) { + // Determine the protocol in a crude fashion + parts := strings.SplitN(endpoint, "://", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid endpoint %q", endpoint) + } + protocol := parts[0] + return net.Dial(protocol, addr.String()) +} diff --git a/plugins/inputs/netflow/netflow_v5.go b/plugins/inputs/netflow/netflow_v5.go new file mode 100644 index 000000000..2cb7eb716 --- /dev/null +++ b/plugins/inputs/netflow/netflow_v5.go @@ -0,0 +1,77 @@ +package netflow + +import ( + "bytes" + "fmt" + "net" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/netsampler/goflow2/decoders/netflowlegacy" +) + +// Decoder structure +type netflowv5Decoder struct{} + +func (d *netflowv5Decoder) Init() error { + if err := initL4ProtoMapping(); err != nil { + return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err) + } + return nil +} + +func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) { + src := srcIP.String() + + // Decode the message + buf := bytes.NewBuffer(payload) + packet, err := netflowlegacy.DecodeMessage(buf) + if err != nil { + return nil, err + } + + // Extract metrics + msg, ok := packet.(netflowlegacy.PacketNetFlowV5) + if !ok { + return nil, fmt.Errorf("unexpected message type %T", packet) + } + + t := time.Unix(int64(msg.UnixSecs), int64(msg.UnixNSecs)) + metrics := make([]telegraf.Metric, 0, len(msg.Records)) + for _, record := range msg.Records { + tags := map[string]string{ + "source": src, + "version": "NetFlowV5", + } + fields := map[string]interface{}{ + "flows": msg.Count, + "sys_uptime": msg.SysUptime, + "seq_number": msg.FlowSequence, + "engine_type": mapEngineType(msg.EngineType), + "engine_id": decodeHex([]byte{msg.EngineId}), + "sampling_interval": msg.SamplingInterval, + "src": decodeIPFromUint32(record.SrcAddr), + "dst": decodeIPFromUint32(record.DstAddr), + "next_hop": decodeIPFromUint32(record.NextHop), + "in_snmp": record.Input, + "out_snmp": record.Output, + "in_packets": record.DPkts, + "in_bytes": record.DOctets, + "first_switched": record.First, + "last_switched": record.Last, + "src_port": record.SrcPort, + "dst_port": record.DstPort, + "tcp_flags": mapTCPFlags(record.TCPFlags), + "protocol": mapL4Proto(record.Proto), + "src_tos": decodeHex([]byte{record.Tos}), + "bgp_src_as": record.SrcAS, + "bgp_dst_as": record.DstAS, + "src_mask": record.SrcMask, + "dst_mask": record.DstMask, + } + metrics = append(metrics, metric.New("netflow", tags, fields, t)) + } + + return metrics, nil +} diff --git a/plugins/inputs/netflow/sample.conf b/plugins/inputs/netflow/sample.conf new file mode 100644 index 000000000..b8e3ea1c0 --- /dev/null +++ b/plugins/inputs/netflow/sample.conf @@ -0,0 +1,24 @@ +# Netflow v5, Netflow v9 and IPFIX collector +[[inputs.netflow]] + ## Address to listen for netflow/ipfix packets. + ## example: service_address = "udp://:2055" + ## service_address = "udp4://:2055" + ## service_address = "udp6://:2055" + service_address = "udp://:2055" + + ## Set the size of the operating system's receive buffer. + ## example: read_buffer_size = "64KiB" + ## Uses the system's default if not set. + # read_buffer_size = "" + + ## Protocol version to use for decoding. + ## Available options are + ## "netflow v5" -- Netflow v5 protocol + ## "netflow v9" -- Netflow v9 protocol (also works for IPFIX) + ## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9) + # protocol = "ipfix" + + ## Dump incoming packets to the log + ## This can be helpful to debug parsing issues. Only active if + ## Telegraf is in debug mode. + # dump_packets = false diff --git a/plugins/inputs/netflow/testcases/ipfix_example/expected.out b/plugins/inputs/netflow/testcases/ipfix_example/expected.out new file mode 100644 index 000000000..a367795c4 --- /dev/null +++ b/plugins/inputs/netflow/testcases/ipfix_example/expected.out @@ -0,0 +1,29 @@ +netflow,source=127.0.0.1,version=IPFIX protocol="tcp",vlan_src=0u,src_tos="0x00",flow_end_ms=1666345513807u,src="192.168.119.100",dst="44.233.90.52",src_port=51008u,total_bytes_exported=0u,flow_end_reason="end of flow",flow_start_ms=1666345513807u,in_total_bytes=52u,in_total_packets=1u,dst_port=443u +netflow,source=127.0.0.1,version=IPFIX src_tos="0x00",src_port=54330u,rev_total_bytes_exported=0u,last_switched=9u,vlan_src=0u,flow_start_ms=1666345513807u,in_total_packets=1u,flow_end_reason="end of flow",flow_end_ms=1666345513816u,in_total_bytes=40u,dst_port=443u,src="192.168.119.100",dst="104.17.240.92",total_bytes_exported=0u,protocol="tcp" +netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345513807u,flow_end_ms=1666345513977u,src="192.168.119.100",dst_port=443u,total_bytes_exported=0u,last_switched=170u,src_tos="0x00",in_total_bytes=40u,dst="44.233.90.52",src_port=51024u,protocol="tcp",flow_end_reason="end of flow",in_total_packets=1u,rev_total_bytes_exported=0u,vlan_src=0u +netflow,source=127.0.0.1,version=IPFIX src_port=58246u,total_bytes_exported=1u,flow_start_ms=1666345513806u,flow_end_ms=1666345513806u,in_total_bytes=156u,src="192.168.119.100",rev_total_bytes_exported=0u,last_switched=0u,flow_end_reason="forced end",dst="192.168.119.17",dst_port=53u,protocol="udp",in_total_packets=2u,vlan_src=0u,src_tos="0x00" +netflow,source=127.0.0.1,version=IPFIX protocol="udp",vlan_src=0u,src_port=58879u,dst_port=53u,flow_end_ms=1666345513832u,src_tos="0x00",src="192.168.119.100",total_bytes_exported=1u,rev_total_bytes_exported=0u,flow_end_reason="forced end",last_switched=33u,in_total_bytes=221u,in_total_packets=2u,flow_start_ms=1666345513799u,dst="192.168.119.17" +netflow,source=127.0.0.1,version=IPFIX in_total_packets=2u,dst="192.168.119.17",last_switched=0u,in_total_bytes=522u,flow_end_reason="forced end",flow_start_ms=1666345514150u,src_tos="0x00",flow_end_ms=1666345514167u,src="192.168.119.100",src_port=56439u,dst_port=53u,total_bytes_exported=1u,rev_total_bytes_exported=0u,protocol="udp",vlan_src=0u +netflow,source=127.0.0.1,version=IPFIX in_total_packets=68u,last_switched=18u,in_total_bytes=70228u,dst="34.149.140.181",src_tos="0x00",flow_start_ms=1666345513832u,rev_total_bytes_exported=0u,protocol="udp",vlan_src=0u,total_bytes_exported=0u,src="192.168.119.100",src_port=57795u,dst_port=443u,flow_end_reason="forced end",flow_end_ms=1666345514328u +netflow,source=127.0.0.1,version=IPFIX in_total_packets=4u,src="192.168.119.100",dst="239.255.255.250",src_port=57622u,protocol="udp",vlan_src=0u,src_tos="0x00",flow_start_ms=1666345512753u,flow_end_ms=1666345515756u,in_total_bytes=784u,dst_port=1900u,total_bytes_exported=1u,flow_end_reason="forced end" +netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345512531u,in_total_bytes=92215u,src="192.168.119.100",rev_total_bytes_exported=0u,flow_end_reason="forced end",vlan_src=0u,src_tos="0x00",flow_end_ms=1666345519408u,in_total_packets=102u,dst="216.58.212.132",src_port=54458u,dst_port=443u,last_switched=17u,total_bytes_exported=0u,protocol="udp" +netflow,source=127.0.0.1,version=IPFIX dst="13.32.99.76",flow_start_ms=1666345519932u,src="192.168.119.100",vlan_src=0u,flow_end_ms=1666345519942u,flow_end_reason="forced end",dst_port=443u,rev_total_bytes_exported=0u,protocol="tcp",last_switched=10u,in_total_packets=1u,src_port=60758u,src_tos="0x00",in_total_bytes=52u,total_bytes_exported=0u +netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345519932u,total_bytes_exported=0u,protocol="tcp",last_switched=10u,vlan_src=0u,src_port=58432u,src_tos="0x00",flow_end_ms=1666345519942u,in_total_bytes=40u,in_total_packets=1u,rev_total_bytes_exported=0u,flow_end_reason="forced end",src="192.168.119.100",dst="104.17.146.91",dst_port=443u +netflow,source=127.0.0.1,version=IPFIX dst_port=53u,rev_total_bytes_exported=0u,src_tos="0x00",in_total_bytes=284u,dst="192.168.119.17",last_switched=0u,src_port=36397u,total_bytes_exported=1u,protocol="udp",flow_start_ms=1666345521006u,in_total_packets=2u,flow_end_reason="forced end",vlan_src=0u,flow_end_ms=1666345521006u,src="192.168.119.100" +netflow,source=127.0.0.1,version=IPFIX in_total_packets=2u,dst_port=53u,flow_start_ms=1666345520998u,flow_end_ms=1666345521019u,rev_total_bytes_exported=0u,last_switched=0u,src="192.168.119.100",dst="192.168.119.17",src_port=39786u,flow_end_reason="forced end",vlan_src=0u,src_tos="0x00",in_total_bytes=193u,total_bytes_exported=1u,protocol="udp" +netflow,source=127.0.0.1,version=IPFIX protocol="tcp",src_tos="0x00",flow_start_ms=1666345521006u,flow_end_ms=1666345521032u,rev_total_bytes_exported=0u,total_bytes_exported=0u,vlan_src=0u,in_total_packets=4u,src="192.168.119.100",src_port=52370u,dst_port=443u,flow_end_reason="forced end",last_switched=9u,in_total_bytes=653u,dst="185.199.109.154" +netflow,source=127.0.0.1,version=IPFIX dst="192.168.119.17",dst_port=53u,vlan_src=0u,flow_start_ms=1666345521742u,in_total_packets=2u,flow_end_reason="forced end",last_switched=0u,flow_end_ms=1666345521742u,src_port=44461u,total_bytes_exported=1u,rev_total_bytes_exported=0u,src="192.168.119.100",protocol="udp",src_tos="0x00",in_total_bytes=326u +netflow,source=127.0.0.1,version=IPFIX total_bytes_exported=0u,protocol="tcp",last_switched=9u,vlan_src=0u,flow_end_ms=1666345521771u,in_total_packets=4u,flow_end_reason="forced end",src_port=52376u,rev_total_bytes_exported=0u,in_total_bytes=653u,src="192.168.119.100",dst_port=443u,src_tos="0x00",flow_start_ms=1666345521742u,dst="185.199.109.154" +netflow,source=127.0.0.1,version=IPFIX src="192.168.119.100",rev_total_bytes_exported=0u,last_switched=0u,in_total_bytes=334u,vlan_src=0u,src_tos="0x00",in_total_packets=2u,dst="192.168.119.17",src_port=51858u,total_bytes_exported=1u,flow_end_reason="forced end",flow_start_ms=1666345521780u,flow_end_ms=1666345521780u,dst_port=53u,protocol="udp" +netflow,source=127.0.0.1,version=IPFIX flow_end_reason="forced end",src_tos="0x00",in_total_bytes=344u,rev_total_bytes_exported=0u,last_switched=13u,dst_port=53u,vlan_src=0u,flow_start_ms=1666345521780u,flow_end_ms=1666345521794u,src_port=34970u,total_bytes_exported=1u,protocol="udp",in_total_packets=2u,src="192.168.119.100",dst="192.168.119.17" +netflow,source=127.0.0.1,version=IPFIX dst="192.168.119.17",total_bytes_exported=1u,dst_port=53u,rev_total_bytes_exported=0u,flow_start_ms=1666345521813u,src_port=52794u,protocol="udp",flow_end_reason="forced end",vlan_src=0u,flow_end_ms=1666345521836u,in_total_bytes=290u,last_switched=23u,src_tos="0x00",in_total_packets=2u,src="192.168.119.100" +netflow,source=127.0.0.1,version=IPFIX in_total_bytes=318u,total_bytes_exported=1u,vlan_src=0u,src_tos="0x00",dst_port=53u,protocol="udp",flow_end_reason="forced end",flow_start_ms=1666345522036u,in_total_packets=2u,flow_end_ms=1666345522050u,src="192.168.119.100",dst="192.168.119.17",src_port=43629u,rev_total_bytes_exported=0u,last_switched=11u +netflow,source=127.0.0.1,version=IPFIX in_total_packets=2u,flow_end_reason="forced end",vlan_src=0u,flow_end_ms=1666345522240u,dst="192.168.119.17",src="192.168.119.100",total_bytes_exported=1u,rev_total_bytes_exported=0u,protocol="udp",last_switched=0u,src_tos="0x00",in_total_bytes=279u,src_port=48781u,dst_port=53u,flow_start_ms=1666345522229u +netflow,source=127.0.0.1,version=IPFIX src_tos="0x00",flow_start_ms=1666345522279u,dst="192.168.119.17",dst_port=53u,total_bytes_exported=1u,rev_total_bytes_exported=0u,last_switched=0u,in_total_bytes=201u,src_port=43078u,flow_end_reason="forced end",vlan_src=0u,flow_end_ms=1666345522291u,in_total_packets=2u,src="192.168.119.100",protocol="udp" +netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345521806u,in_total_bytes=19213u,src="192.168.119.100",src_tos="0x00",vlan_src=0u,flow_end_ms=1666345525312u,in_total_packets=98u,src_port=49880u,protocol="tcp",dst_port=443u,total_bytes_exported=0u,rev_total_bytes_exported=0u,last_switched=8u,dst="185.199.111.133",flow_end_reason="forced end" +netflow,source=127.0.0.1,version=IPFIX src="192.168.119.100",src_tos="0x00",flow_start_ms=1666345522240u,flow_end_ms=1666345525417u,vlan_src=0u,total_bytes_exported=0u,protocol="tcp",flow_end_reason="forced end",in_total_packets=15u,dst="140.82.113.21",dst_port=443u,rev_total_bytes_exported=0u,last_switched=102u,in_total_bytes=5660u,src_port=43438u +netflow,source=127.0.0.1,version=IPFIX rev_total_bytes_exported=0u,protocol="tcp",last_switched=9u,flow_start_ms=1666345522291u,in_total_bytes=9678u,dst="140.82.121.6",src_tos="0x00",total_bytes_exported=0u,vlan_src=0u,in_total_packets=50u,src="192.168.119.100",dst_port=443u,flow_end_ms=1666345525576u,src_port=59884u,flow_end_reason="forced end" +netflow,source=127.0.0.1,version=IPFIX rev_total_bytes_exported=0u,flow_end_reason="forced end",flow_end_ms=1666345525645u,in_total_bytes=3896u,in_total_packets=9u,last_switched=0u,src_tos="0x00",protocol="tcp",vlan_src=0u,src="140.82.113.25",dst="192.168.119.100",total_bytes_exported=0u,flow_start_ms=1666345518733u,src_port=443u,dst_port=49448u +netflow,source=127.0.0.1,version=IPFIX src="192.168.119.100",dst="142.250.186.170",rev_total_bytes_exported=0u,in_total_packets=21u,dst_port=443u,protocol="udp",last_switched=18u,vlan_src=0u,flow_start_ms=1666345514168u,flow_end_ms=1666345525871u,in_total_bytes=5520u,total_bytes_exported=0u,flow_end_reason="forced end",src_port=58246u,src_tos="0x00" +netflow,source=127.0.0.1,version=IPFIX flow_end_ms=1666345525880u,dst_port=443u,rev_total_bytes_exported=0u,flow_end_reason="forced end",src_tos="0x00",dst="140.82.121.3",src_port=37792u,vlan_src=0u,in_total_packets=212u,total_bytes_exported=0u,protocol="tcp",flow_start_ms=1666345521019u,in_total_bytes=254425u,src="192.168.119.100",last_switched=9u +netflow,source=127.0.0.1,version=IPFIX src="192.168.119.100",total_bytes_exported=1u,flow_end_reason="forced end",vlan_src=0u,flow_end_ms=1666345527739u,in_total_packets=2u,rev_total_bytes_exported=0u,last_switched=0u,flow_start_ms=1666345527739u,dst="192.168.119.17",protocol="udp",in_total_bytes=164u,dst_port=53u,src_port=50077u,src_tos="0x00" diff --git a/plugins/inputs/netflow/testcases/ipfix_example/ipfix_0.bin b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_0.bin new file mode 100644 index 000000000..b8b616ef8 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_0.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_example/ipfix_1.bin b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_1.bin new file mode 100644 index 000000000..0cc85f89c Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_1.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_example/ipfix_2.bin b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_2.bin new file mode 100644 index 000000000..8b02ee246 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_2.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_example/ipfix_3.bin b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_3.bin new file mode 100644 index 000000000..ce0b37ee2 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_3.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_example/ipfix_4.bin b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_4.bin new file mode 100644 index 000000000..e700906b6 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_4.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_example/ipfix_5.bin b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_5.bin new file mode 100644 index 000000000..467c16082 Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_5.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_example/ipfix_6.bin b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_6.bin new file mode 100644 index 000000000..80aa123cf Binary files /dev/null and b/plugins/inputs/netflow/testcases/ipfix_example/ipfix_6.bin differ diff --git a/plugins/inputs/netflow/testcases/ipfix_example/telegraf.conf b/plugins/inputs/netflow/testcases/ipfix_example/telegraf.conf new file mode 100644 index 000000000..cfd23d363 --- /dev/null +++ b/plugins/inputs/netflow/testcases/ipfix_example/telegraf.conf @@ -0,0 +1,2 @@ +[[inputs.netflow]] + service_address = "udp://127.0.0.1:0" diff --git a/plugins/inputs/netflow/testcases/netflow_v5_example/expected.out b/plugins/inputs/netflow/testcases/netflow_v5_example/expected.out new file mode 100644 index 000000000..3b5ca3119 --- /dev/null +++ b/plugins/inputs/netflow/testcases/netflow_v5_example/expected.out @@ -0,0 +1,8 @@ +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.121.3",src_port=443u,dst="192.168.119.100",dst_port=55516u,flows=8u,in_bytes=87477u,in_packets=78u,first_switched=86400660u,last_switched=86403316u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.121.6",src_port=443u,dst="192.168.119.100",dst_port=36408u,flows=8u,in_bytes=5009u,in_packets=21u,first_switched=86400447u,last_switched=86403267u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.112.22",src_port=443u,dst="192.168.119.100",dst_port=39638u,flows=8u,in_bytes=925u,in_packets=6u,first_switched=86400324u,last_switched=86403214u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="140.82.114.26",src_port=443u,dst="192.168.119.100",dst_port=49398u,flows=8u,in_bytes=250u,in_packets=2u,first_switched=86403131u,last_switched=86403362u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=55516u,dst="140.82.121.3",dst_port=443u,flows=8u,in_bytes=4969u,in_packets=37u,first_switched=86400652u,last_switched=86403269u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=36408u,dst="140.82.121.6",dst_port=443u,flows=8u,in_bytes=2736u,in_packets=21u,first_switched=86400438u,last_switched=86403258u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=39638u,dst="140.82.112.22",dst_port=443u,flows=8u,in_bytes=1560u,in_packets=6u,first_switched=86400225u,last_switched=86403255u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u +netflow,source=127.0.0.1,version=NetFlowV5 protocol="tcp",src="192.168.119.100",src_port=49398u,dst="140.82.114.26",dst_port=443u,flows=8u,in_bytes=697u,in_packets=4u,first_switched=86403030u,last_switched=86403362u,tcp_flags="...AP...",engine_type="19",engine_id="0x56",sys_uptime=90003000u,src_tos="0x00",bgp_src_as=0u,bgp_dst_as=0u,src_mask=0u,dst_mask=0u,in_snmp=0u,out_snmp=0u,next_hop="0.0.0.0",seq_number=0u,sampling_interval=0u \ No newline at end of file diff --git a/plugins/inputs/netflow/testcases/netflow_v5_example/netflow_v5.bin b/plugins/inputs/netflow/testcases/netflow_v5_example/netflow_v5.bin new file mode 100644 index 000000000..747001d7b Binary files /dev/null and b/plugins/inputs/netflow/testcases/netflow_v5_example/netflow_v5.bin differ diff --git a/plugins/inputs/netflow/testcases/netflow_v5_example/telegraf.conf b/plugins/inputs/netflow/testcases/netflow_v5_example/telegraf.conf new file mode 100644 index 000000000..e78977a11 --- /dev/null +++ b/plugins/inputs/netflow/testcases/netflow_v5_example/telegraf.conf @@ -0,0 +1,3 @@ +[[inputs.netflow]] + service_address = "udp://127.0.0.1:0" + protocol = "netflow v5" \ No newline at end of file diff --git a/plugins/inputs/netflow/testcases/netflow_v9_example/expected.out b/plugins/inputs/netflow/testcases/netflow_v9_example/expected.out new file mode 100644 index 000000000..27d9d335a --- /dev/null +++ b/plugins/inputs/netflow/testcases/netflow_v9_example/expected.out @@ -0,0 +1,8 @@ +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.121.3",src_port=443u,dst="192.168.119.100",dst_port=55516u,in_bytes=87477u,in_packets=78u,flow_start_ms=1666350478660u,flow_end_ms=1666350481316u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.121.6",src_port=443u,dst="192.168.119.100",dst_port=36408u,in_bytes=5009u,in_packets=21u,flow_start_ms=1666350478447u,flow_end_ms=1666350481267u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.112.22",src_port=443u,dst="192.168.119.100",dst_port=39638u,in_bytes=925u,in_packets=6u,flow_start_ms=1666350478324u,flow_end_ms=1666350481214u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="140.82.114.26",src_port=443u,dst="192.168.119.100",dst_port=49398u,in_bytes=250u,in_packets=2u,flow_start_ms=1666350481131u,flow_end_ms=1666350481362u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=55516u,dst="140.82.121.3",dst_port=443u,in_bytes=4969u,in_packets=37u,flow_start_ms=1666350478652u,flow_end_ms=1666350481269u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=36408u,dst="140.82.121.6",dst_port=443u,in_bytes=2736u,in_packets=21u,flow_start_ms=1666350478438u,flow_end_ms=1666350481258u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=39638u,dst="140.82.112.22",dst_port=443u,in_bytes=1560u,in_packets=6u,flow_start_ms=1666350478225u,flow_end_ms=1666350481255u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" +netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=49398u,dst="140.82.114.26",dst_port=443u,in_bytes=697u,in_packets=4u,flow_start_ms=1666350481030u,flow_end_ms=1666350481362u,tcp_flags="...AP...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00" diff --git a/plugins/inputs/netflow/testcases/netflow_v9_example/netflow_v9.bin b/plugins/inputs/netflow/testcases/netflow_v9_example/netflow_v9.bin new file mode 100644 index 000000000..e138a8521 Binary files /dev/null and b/plugins/inputs/netflow/testcases/netflow_v9_example/netflow_v9.bin differ diff --git a/plugins/inputs/netflow/testcases/netflow_v9_example/telegraf.conf b/plugins/inputs/netflow/testcases/netflow_v9_example/telegraf.conf new file mode 100644 index 000000000..cfd23d363 --- /dev/null +++ b/plugins/inputs/netflow/testcases/netflow_v9_example/telegraf.conf @@ -0,0 +1,2 @@ +[[inputs.netflow]] + service_address = "udp://127.0.0.1:0" diff --git a/plugins/inputs/netflow/type_conversion.go b/plugins/inputs/netflow/type_conversion.go new file mode 100644 index 000000000..3b7095728 --- /dev/null +++ b/plugins/inputs/netflow/type_conversion.go @@ -0,0 +1,635 @@ +package netflow + +import ( + "bytes" + _ "embed" + "encoding/binary" + "encoding/csv" + "encoding/hex" + "errors" + "fmt" + "math" + "net" + "strconv" + "strings" +) + +// From https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml +// +//go:embed layer4_protocol_numbers.csv +var l4ProtoFile []byte + +// From https://www.iana.org/assignments/ip-parameters/ip-parameters.xhtml +// +//go:embed ipv4_options.csv +var ip4OptionFile []byte + +var l4ProtoMapping map[uint8]string +var ipv4OptionMapping []string + +func initL4ProtoMapping() error { + buf := bytes.NewBuffer(l4ProtoFile) + reader := csv.NewReader(buf) + records, err := reader.ReadAll() + if err != nil { + return err + } + if len(records) < 2 { + return errors.New("empty file") + } + + l4ProtoMapping = make(map[uint8]string) + for _, r := range records[1:] { + if len(r) != 2 { + return fmt.Errorf("invalid record: %v", r) + } + name := strings.ToLower(r[1]) + if name == "" { + continue + } + id, err := strconv.ParseUint(r[0], 10, 8) + if err != nil { + return fmt.Errorf("%w: %v", err, r) + } + l4ProtoMapping[uint8(id)] = name + } + + return nil +} + +func initIPv4OptionMapping() error { + buf := bytes.NewBuffer(ip4OptionFile) + reader := csv.NewReader(buf) + records, err := reader.ReadAll() + if err != nil { + return err + } + if len(records) < 2 { + return errors.New("empty file") + } + + ipv4OptionMapping = make([]string, 32) + for _, r := range records[1:] { + if len(r) != 2 { + return fmt.Errorf("invalid record: %v", r) + } + idx, err := strconv.ParseUint(r[0], 10, 8) + if err != nil { + return fmt.Errorf("%w: %v", err, r) + } + ipv4OptionMapping[idx] = r[1] + } + + return nil +} + +func decodeInt32(b []byte) interface{} { + return int64(int32(binary.BigEndian.Uint32(b))) +} + +func decodeUint(b []byte) interface{} { + switch len(b) { + case 1: + return uint64(b[0]) + case 2: + return uint64(binary.BigEndian.Uint16(b)) + case 4: + return uint64(binary.BigEndian.Uint32(b)) + case 8: + return binary.BigEndian.Uint64(b) + } + panic(fmt.Errorf("invalid length for uint buffer %v", b)) +} + +func decodeFloat64(b []byte) interface{} { + raw := binary.BigEndian.Uint64(b) + return math.Float64frombits(raw) +} + +// According to https://www.rfc-editor.org/rfc/rfc5101#section-6.1.5 +func decodeBool(b []byte) interface{} { + if b[0] == 1 { + return true + } + if b[0] == 2 { + return false + } + return b[0] +} + +func decodeHex(b []byte) interface{} { + return "0x" + hex.EncodeToString(b) +} + +func decodeString(b []byte) interface{} { + return string(b) +} + +func decodeMAC(b []byte) interface{} { + mac := net.HardwareAddr(b) + return mac.String() +} + +func decodeIP(b []byte) interface{} { + ip := net.IP(b) + return ip.String() +} + +func decodeIPFromUint32(a uint32) interface{} { + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, a) + return decodeIP(b) +} + +func decodeL4Proto(b []byte) interface{} { + return mapL4Proto(b[0]) +} + +func mapL4Proto(id uint8) string { + name, found := l4ProtoMapping[id] + if found { + return name + } + return strconv.FormatUint(uint64(id), 10) +} + +func decodeIPv4Options(b []byte) interface{} { + flags := binary.BigEndian.Uint32(b) + + var result []string + for i := 0; i < 32; i++ { + name := ipv4OptionMapping[i] + if name == "" { + name = fmt.Sprintf("UA%d", i) + } + if (flags>>i)&0x01 != 0 { + result = append(result, name) + } + } + + return strings.Join(result, ",") +} + +func decodeTCPFlags(b []byte) interface{} { + if len(b) < 1 { + return "" + } + + if len(b) == 1 { + return mapTCPFlags(b[0]) + } + + // IPFIX has more flags + results := make([]string, 0, 8) + for i := 7; i >= 0; i-- { + if (b[0]>>i)&0x01 != 0 { + // Currently all flags are reserved so denote the bit set + results = append(results, "*") + } else { + results = append(results, ".") + } + } + return strings.Join(results, "") + mapTCPFlags(b[1]) +} + +func mapTCPFlags(flags uint8) string { + flagMapping := []string{ + "F", // FIN + "S", // SYN + "R", // RST + "P", // PSH + "A", // ACK + "U", // URG + "E", // ECE + "C", // CWR + } + + result := make([]string, 0, 8) + + for i := 7; i >= 0; i-- { + if (flags>>i)&0x01 != 0 { + result = append(result, flagMapping[i]) + } else { + result = append(result, ".") + } + } + + return strings.Join(result, "") +} + +func decodeFragmentFlags(b []byte) interface{} { + flagMapping := []string{ + "*", // do not care + "*", // do not care + "*", // do not care + "*", // do not care + "*", // do not care + "M", // MF -- more fragments + "D", // DF -- don't fragment + "R", // RS -- reserved + } + + flags := b[0] + result := make([]string, 0, 8) + for i := 7; i >= 0; i-- { + if (flags>>i)&0x01 != 0 { + result = append(result, flagMapping[i]) + } else { + result = append(result, ".") + } + } + + return strings.Join(result, "") +} + +func decodeSampleAlgo(b []byte) interface{} { + switch b[0] { + case 1: + return "deterministic" + case 2: + return "random" + } + return strconv.FormatUint(uint64(b[0]), 10) +} + +func decodeEngineType(b []byte) interface{} { + return mapEngineType(b[0]) +} + +func mapEngineType(b uint8) string { + switch b { + case 0: + return "RP" + case 1: + return "VIP/linecard" + case 2: + return "PFC/DFC" + } + return strconv.FormatUint(uint64(b), 10) +} + +func decodeMPLSType(b []byte) interface{} { + switch b[0] { + case 0: + return "unknown" + case 1: + return "TE-MIDPT" + case 2: + return "Pseudowire" + case 3: + return "VPN" + case 4: + return "BGP" + case 5: + return "LDP" + case 6: + return "Path computation element" + case 7: + return "OSPFv2" + case 8: + return "OSPFv3" + case 9: + return "IS-IS" + case 10: + return "BGP segment routing Prefix-SID" + } + return strconv.FormatUint(uint64(b[0]), 10) +} + +func decodeIPVersion(b []byte) interface{} { + switch b[0] { + case 4: + return "IPv4" + case 6: + return "IPv6" + } + return strconv.FormatUint(uint64(b[0]), 10) +} + +func decodeDirection(b []byte) interface{} { + switch b[0] { + case 0: + return "ingress" + case 1: + return "egress" + } + return strconv.FormatUint(uint64(b[0]), 10) +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-forwarding-status +func decodeFwdStatus(b []byte) interface{} { + switch b[0] >> 6 { + case 0: + return "unknown" + case 1: + return "forwarded" + case 2: + return "dropped" + case 3: + return "consumed" + } + return "invalid" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-forwarding-status +func decodeFwdReason(b []byte) interface{} { + switch b[0] { + // unknown + case 0: + return "unknown" + // forwarded + case 64: + return "unknown" + case 65: + return "fragmented" + case 66: + return "not fragmented" + // dropped + case 128: + return "unknown" + case 129: + return "ACL deny" + case 130: + return "ACL drop" + case 131: + return "unroutable" + case 132: + return "adjacency" + case 133: + return "fragmentation and DF set" + case 134: + return "bad header checksum" + case 135: + return "bad total length" + case 136: + return "bad header length" + case 137: + return "bad TTL" + case 138: + return "policer" + case 139: + return "WRED" + case 140: + return "RPF" + case 141: + return "for us" + case 142: + return "bad output interface" + case 143: + return "hardware" + // consumed + case 192: + return "unknown" + case 193: + return "terminate punt adjacency" + case 194: + return "terminate incomplete adjacency" + case 195: + return "terminate for us" + case 14: + return "" + } + return "invalid" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-firewall-event +func decodeFWEvent(b []byte) interface{} { + switch b[0] { + case 0: + return "ignore" + case 1: + return "flow created" + case 2: + return "flow deleted" + case 3: + return "flow denied" + case 4: + return "flow alert" + case 5: + return "flow update" + } + return strconv.FormatUint(uint64(b[0]), 10) +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-flow-end-reason +func decodeFlowEndReason(b []byte) interface{} { + switch b[0] { + case 0: + return "reserved" + case 1: + return "idle timeout" + case 2: + return "active timeout" + case 3: + return "end of flow" + case 4: + return "forced end" + case 5: + return "lack of resources" + } + return "unassigned" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-biflow-direction +func decodeBiflowDirection(b []byte) interface{} { + switch b[0] { + case 0: + return "arbitrary" + case 1: + return "initiator" + case 2: + return "reverse initiator" + case 3: + return "perimeter" + } + return "unassigned" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-observation-point-type +func decodeOpsPointType(b []byte) interface{} { + switch b[0] { + case 0: + return "invalid" + case 1: + return "physical port" + case 2: + return "port channel" + case 3: + return "vlan" + } + return "unassigned" +} + +func decodeAnonStabilityClass(b []byte) interface{} { + switch b[1] & 0x03 { + case 1: + return "session" + case 2: + return "exporter-collector" + case 3: + return "stable" + } + return "undefined" +} + +func decodeAnonFlags(b []byte) interface{} { + var result []string + if b[0]&(1<<2) != 0 { + result = append(result, "PmA") + } + + if b[0]&(1<<3) != 0 { + result = append(result, "LOR") + } + + return strings.Join(result, ",") +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-anonymization-technique +func decodeAnonTechnique(b []byte) interface{} { + tech := binary.BigEndian.Uint16(b) + switch tech { + case 0: + return "undefined" + case 1: + return "none" + case 2: + return "precision degradation" + case 3: + return "binning" + case 4: + return "enumeration" + case 5: + return "permutation" + case 6: + return "structure permutation" + case 7: + return "reverse truncation" + case 8: + return "noise" + case 9: + return "offset" + } + return "unassigned" +} + +func decodeTechnology(b []byte) interface{} { + switch string(b) { + case "yes", "y", "1": + return "yes" + case "no", "n", "2": + return "no" + case "unassigned", "u", "0": + return "unassigned" + } + switch b[0] { + case 0: + return "unassigned" + case 1: + return "yes" + case 2: + return "no" + } + return "undefined" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-nat-type +func decodeIPNatType(b []byte) interface{} { + tech := binary.BigEndian.Uint16(b) + switch tech { + case 0: + return "unknown" + case 1: + return "NAT44" + case 2: + return "NAT64" + case 3: + return "NAT46" + case 4: + return "IPv4 no NAT" + case 5: + return "NAT66" + case 6: + return "IPv6 no NAT" + } + return "unassigned" +} + +// https://www.iana.org/assignments/psamp-parameters/psamp-parameters.xhtml +func decodeSelectorAlgorithm(b []byte) interface{} { + tech := binary.BigEndian.Uint16(b) + switch tech { + case 0: + return "reserved" + case 1: + return "systematic count-based sampling" + case 2: + return "systematic time-based sampling" + case 3: + return "random n-out-of-N sampling" + case 4: + return "uniform probabilistic sampling" + case 5: + return "property match filtering" + case 6: + return "hash based filtering using BOB" + case 7: + return "hash based filtering using IPSX" + case 8: + return "hash based filtering using CRC" + case 9: + return "flow-state dependent" + } + return "unassigned" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-value-distribution-method +func decodeValueDistMethod(b []byte) interface{} { + switch b[0] { + case 0: + return "unspecified" + case 1: + return "start interval" + case 2: + return "end interval" + case 3: + return "mid interval" + case 4: + return "simple uniform distribution" + case 5: + return "proportional uniform distribution" + case 6: + return "simulated process" + case 7: + return "direct" + } + return "unassigned" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-data-link-frame-type +func decodeDataLinkFrameType(b []byte) interface{} { + switch binary.BigEndian.Uint16(b) { + case 0x0001: + return "IEEE802.3 ethernet" + case 0x0002: + return "IEEE802.11 MAC" + } + return "unassigned" +} + +// https://www.iana.org/assignments/ipfix/ipfix.xhtml#ipfix-mib-capture-time-semantics +func decodeCaptureTimeSemantics(b []byte) interface{} { + switch b[0] { + case 0: + return "undefined" + case 1: + return "begin" + case 2: + return "end" + case 3: + return "export" + case 4: + return "average" + } + return "unassigned" +} diff --git a/plugins/inputs/netflow/type_conversion_test.go b/plugins/inputs/netflow/type_conversion_test.go new file mode 100644 index 000000000..d54ef5467 --- /dev/null +++ b/plugins/inputs/netflow/type_conversion_test.go @@ -0,0 +1,442 @@ +package netflow + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDecodeInt32(t *testing.T) { + buf := []byte{0x82, 0xad, 0x80, 0x86} + out, ok := decodeInt32(buf).(int64) + require.True(t, ok) + require.Equal(t, int64(-2102558586), out) +} + +func TestDecodeUint(t *testing.T) { + tests := []struct { + name string + in []byte + expected uint64 + }{ + { + name: "uint8", + in: []byte{0x42}, + expected: 66, + }, + { + name: "uint16", + in: []byte{0x0A, 0x42}, + expected: 2626, + }, + { + name: "uint32", + in: []byte{0x82, 0xad, 0x80, 0x86}, + expected: 2192408710, + }, + { + name: "uint64", + in: []byte{0x00, 0x00, 0x23, 0x42, 0x8f, 0xad, 0x80, 0x86}, + expected: 38768785326214, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out, ok := decodeUint(tt.in).(uint64) + require.True(t, ok) + require.Equal(t, tt.expected, out) + }) + } +} + +func TestDecodeUintInvalid(t *testing.T) { + require.Panics(t, func() { decodeUint([]byte{0x00, 0x00, 0x00}) }) +} + +func TestDecodeFloat64(t *testing.T) { + buf := []byte{0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2e, 0xea} + out, ok := decodeFloat64(buf).(float64) + require.True(t, ok) + require.Equal(t, float64(3.14159265359), out) +} + +func TestDecodeBool(t *testing.T) { + tests := []struct { + name string + in []byte + expected interface{} + }{ + { + name: "zero", + in: []byte{0x00}, + expected: uint8(0), + }, + { + name: "true", + in: []byte{0x01}, + expected: true, + }, + { + name: "false", + in: []byte{0x02}, + expected: false, + }, + { + name: "other", + in: []byte{0x23}, + expected: uint8(35), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := decodeBool(tt.in) + require.Equal(t, tt.expected, out) + }) + } +} + +func TestDecodeHex(t *testing.T) { + buf := []byte{0x40, 0x09, 0x21, 0xfb, 0x54, 0x44, 0x2e, 0xea} + out, ok := decodeHex(buf).(string) + require.True(t, ok) + require.Equal(t, "0x400921fb54442eea", out) +} + +func TestDecodeString(t *testing.T) { + buf := []byte{0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x74, 0x65, 0x6c, 0x65, 0x67, 0x72, 0x61, 0x66} + out, ok := decodeString(buf).(string) + require.True(t, ok) + require.Equal(t, "hello telegraf", out) +} + +func TestDecodeMAC(t *testing.T) { + buf := []byte{0x2c, 0xf0, 0x5d, 0xe9, 0x04, 0x42} + out, ok := decodeMAC(buf).(string) + require.True(t, ok) + require.Equal(t, "2c:f0:5d:e9:04:42", out) +} + +func TestDecodeIP(t *testing.T) { + tests := []struct { + name string + in []byte + expected string + }{ + { + name: "localhost IPv4", + in: []byte{0x7f, 0x00, 0x00, 0x01}, + expected: "127.0.0.1", + }, + { + name: "unrouted IPv4", + in: []byte{0xc0, 0xa8, 0x04, 0x42}, + expected: "192.168.4.66", + }, + { + name: "localhost IPv6", + in: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, + expected: "::1", + }, + { + name: "local network IPv6", + in: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xfe, 0x80, 0xd6, 0x8e, 0x07, 0x7f, 0x59, 0x5a, 0x23, 0xf1}, + expected: "::fe80:d68e:77f:595a:23f1", + }, + { + name: "google.com IPv6", + in: []byte{0x00, 0x00, 0x00, 0x00, 0x2a, 0x00, 0x14, 0x50, 0x40, 0x01, 0x08, 0x11, 0x00, 0x00, 0x20, 0x0e}, + expected: "::2a00:1450:4001:811:0:200e", + }, + { + name: "stripped in between IPv6", + in: []byte{0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, 0x50, 0x40, 0x01, 0x08, 0x11, 0x00, 0x01, 0x20, 0x0e}, + expected: "2a00::1450:4001:811:1:200e", + }, + { + name: "IPv6 not enough bytes", + in: []byte{0x00, 0x00, 0x00, 0xff, 0x00, 0x01}, + expected: "?000000ff0001", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out, ok := decodeIP(tt.in).(string) + require.True(t, ok) + require.Equal(t, tt.expected, out) + }) + } +} + +func TestDecodeIPFromUint32(t *testing.T) { + in := uint32(0x7f000001) + out, ok := decodeIPFromUint32(in).(string) + require.True(t, ok) + require.Equal(t, "127.0.0.1", out) +} + +func TestDecodeLayer4ProtocolNumber(t *testing.T) { + require.NoError(t, initL4ProtoMapping()) + + tests := []struct { + name string + in []byte + expected string + }{ + { + name: "ICMP 1", + in: []byte{0x01}, + expected: "icmp", + }, + { + name: "IPv4 4", + in: []byte{0x04}, + expected: "ipv4", + }, + { + name: "IPv6 41", + in: []byte{0x29}, + expected: "ipv6", + }, + { + name: "L2TP 115", + in: []byte{0x73}, + expected: "l2tp", + }, + { + name: "PTP 123", + in: []byte{0x7b}, + expected: "ptp", + }, + { + name: "unassigned 201", + in: []byte{0xc9}, + expected: "201", + }, + { + name: "experimental 254", + in: []byte{0xfe}, + expected: "experimental", + }, + { + name: "Reserved 255", + in: []byte{0xff}, + expected: "reserved", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out, ok := decodeL4Proto(tt.in).(string) + require.True(t, ok) + require.Equal(t, tt.expected, out) + }) + } +} + +func TestDecodeIPv4Options(t *testing.T) { + require.NoError(t, initIPv4OptionMapping()) + + tests := []struct { + name string + bits []int + expected string + }{ + { + name: "none", + bits: []int{}, + expected: "", + }, + { + name: "all", + bits: []int{ + 0, 1, 2, 3, 4, 5, 6, 7, + 8, 9, 10, 11, 12, 13, 14, 15, + 16, 17, 18, 19, 20, 21, 22, 23, + 24, 25, 26, 27, 28, 29, 30, 31, + }, + expected: "EOOL,NOP,SEC,LSR,TS,E-SEC,CIPSO,RR,SID,SSR,ZSU,MTUP," + + "MTUR,FINN,VISA,ENCODE,IMITD,EIP,TR,ADDEXT,RTRALT,SDB," + + "UA22,DPS,UMP,QS,UA26,UA27,UA28,UA29,EXP,UA31", + }, + { + name: "EOOL", + bits: []int{0}, + expected: "EOOL", + }, + { + name: "SSR", + bits: []int{9}, + expected: "SSR", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var options uint32 + for _, bit := range tt.bits { + options |= 1 << bit + } + in := make([]byte, 4) + binary.BigEndian.PutUint32(in, options) + + out, ok := decodeIPv4Options(in).(string) + require.True(t, ok) + require.Equal(t, tt.expected, out) + }) + } +} + +func TestDecodeTCPFlags(t *testing.T) { + tests := []struct { + name string + bits []int + expected string + ipfix bool + }{ + { + name: "none", + bits: []int{}, + expected: "........", + }, + { + name: "none IPFIX", + bits: []int{}, + expected: "................", + ipfix: true, + }, + { + name: "all", + bits: []int{0, 1, 2, 3, 4, 5, 6, 7}, + expected: "CEUAPRSF", + }, + { + name: "all IPFIX", + bits: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + expected: "********CEUAPRSF", + ipfix: true, + }, + { + name: "SYN", + bits: []int{1}, + expected: "......S.", + }, + { + name: "SYN/ACK", + bits: []int{1, 4}, + expected: "...A..S.", + }, + { + name: "ACK", + bits: []int{4}, + expected: "...A....", + }, + { + name: "FIN", + bits: []int{0}, + expected: ".......F", + }, + { + name: "FIN/ACK", + bits: []int{0, 4}, + expected: "...A...F", + }, + { + name: "ACK IPFIX", + bits: []int{4}, + expected: "...........A....", + ipfix: true, + }, + { + name: "FIN IPFIX", + bits: []int{0}, + expected: "...............F", + ipfix: true, + }, + { + name: "ECN Nounce Sum IPFIX", + bits: []int{8}, + expected: ".......*........", + ipfix: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var in []byte + + if tt.ipfix { + var options uint16 + for _, bit := range tt.bits { + options |= 1 << bit + } + in = make([]byte, 2) + binary.BigEndian.PutUint16(in, options) + } else { + var options uint8 + for _, bit := range tt.bits { + options |= 1 << bit + } + in = []byte{options} + } + out, ok := decodeTCPFlags(in).(string) + require.True(t, ok) + require.Equal(t, tt.expected, out) + }) + } +} + +func TestDecodeFragmentFlags(t *testing.T) { + tests := []struct { + name string + bits []int + expected string + }{ + { + name: "none", + bits: []int{}, + expected: "........", + }, + { + name: "all", + bits: []int{0, 1, 2, 3, 4, 5, 6, 7}, + expected: "RDM*****", + }, + { + name: "RS", + bits: []int{7}, + expected: "R.......", + }, + { + name: "DF", + bits: []int{6}, + expected: ".D......", + }, + { + name: "MF", + bits: []int{5}, + expected: "..M.....", + }, + { + name: "Bit 7 (LSB)", + bits: []int{0}, + expected: ".......*", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var flags uint8 + for _, bit := range tt.bits { + flags |= 1 << bit + } + in := []byte{flags} + out, ok := decodeFragmentFlags(in).(string) + require.True(t, ok) + require.Equal(t, tt.expected, out) + }) + } +}