feat(inputs.netflow): Add sFlow decoder (#13047)
This commit is contained in:
parent
0a491a7bf3
commit
6c49584355
|
|
@ -157,6 +157,7 @@ following works:
|
|||
- github.com/google/go-github [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-github/blob/master/LICENSE)
|
||||
- github.com/google/go-querystring [BSD 3-Clause "New" or "Revised" License](https://github.com/google/go-querystring/blob/master/LICENSE)
|
||||
- github.com/google/gofuzz [Apache License 2.0](https://github.com/google/gofuzz/blob/master/LICENSE)
|
||||
- github.com/google/gopacket [BSD 3-Clause "New" or "Revised" License](https://github.com/google/gopacket/blob/master/LICENSE)
|
||||
- github.com/google/s2a-go [Apache License 2.0](https://github.com/google/s2a-go/blob/main/LICENSE.md)
|
||||
- github.com/google/uuid [BSD 3-Clause "New" or "Revised" License](https://github.com/google/uuid/blob/master/LICENSE)
|
||||
- github.com/googleapis/enterprise-certificate-proxy [Apache License 2.0](https://github.com/googleapis/enterprise-certificate-proxy/blob/main/LICENSE)
|
||||
|
|
|
|||
1
go.mod
1
go.mod
|
|
@ -87,6 +87,7 @@ require (
|
|||
github.com/google/gnxi v0.0.0-20221016143401-2aeceb5a2901
|
||||
github.com/google/go-cmp v0.5.9
|
||||
github.com/google/go-github/v32 v32.1.0
|
||||
github.com/google/gopacket v1.1.19
|
||||
github.com/google/licensecheck v0.3.1
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/gopcua/opcua v0.3.7
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -727,6 +727,8 @@ github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17
|
|||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
|
||||
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
|
||||
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
|
||||
github.com/google/licensecheck v0.3.1 h1:QoxgoDkaeC4nFrtGN1jV7IPmDCHFNIVh54e5hSt6sPs=
|
||||
github.com/google/licensecheck v0.3.1/go.mod h1:ORkR35t/JjW+emNKtfJDII0zlciG9JgbT7SmsohlHmY=
|
||||
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
```toml @sample.conf
|
||||
# Netflow v5, Netflow v9 and IPFIX collector
|
||||
[[inputs.netflow]]
|
||||
## Address to listen for netflow/ipfix packets.
|
||||
## Address to listen for netflow,ipfix or sflow packets.
|
||||
## example: service_address = "udp://:2055"
|
||||
## service_address = "udp4://:2055"
|
||||
## service_address = "udp6://:2055"
|
||||
|
|
@ -53,9 +53,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
|
||||
## Protocol version to use for decoding.
|
||||
## Available options are
|
||||
## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9)
|
||||
## "netflow v5" -- Netflow v5 protocol
|
||||
## "netflow v9" -- Netflow v9 protocol (also works for IPFIX)
|
||||
## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9)
|
||||
## "sflow v5" -- sFlow v5 protocol
|
||||
# protocol = "ipfix"
|
||||
|
||||
## Dump incoming packets to the log
|
||||
|
|
@ -92,6 +93,16 @@ following information
|
|||
The specific fields vary for the different protocol versions, here are some
|
||||
examples
|
||||
|
||||
### IPFIX
|
||||
|
||||
```text
|
||||
netflow,source=127.0.0.1,version=IPFIX protocol="tcp",vlan_src=0u,src_tos="0x00",flow_end_ms=1666345513807u,src="192.168.119.100",dst="44.233.90.52",src_port=51008u,total_bytes_exported=0u,flow_end_reason="end of flow",flow_start_ms=1666345513807u,in_total_bytes=52u,in_total_packets=1u,dst_port=443u
|
||||
netflow,source=127.0.0.1,version=IPFIX src_tos="0x00",src_port=54330u,rev_total_bytes_exported=0u,last_switched=9u,vlan_src=0u,flow_start_ms=1666345513807u,in_total_packets=1u,flow_end_reason="end of flow",flow_end_ms=1666345513816u,in_total_bytes=40u,dst_port=443u,src="192.168.119.100",dst="104.17.240.92",total_bytes_exported=0u,protocol="tcp"
|
||||
netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345513807u,flow_end_ms=1666345513977u,src="192.168.119.100",dst_port=443u,total_bytes_exported=0u,last_switched=170u,src_tos="0x00",in_total_bytes=40u,dst="44.233.90.52",src_port=51024u,protocol="tcp",flow_end_reason="end of flow",in_total_packets=1u,rev_total_bytes_exported=0u,vlan_src=0u
|
||||
netflow,source=127.0.0.1,version=IPFIX src_port=58246u,total_bytes_exported=1u,flow_start_ms=1666345513806u,flow_end_ms=1666345513806u,in_total_bytes=156u,src="192.168.119.100",rev_total_bytes_exported=0u,last_switched=0u,flow_end_reason="forced end",dst="192.168.119.17",dst_port=53u,protocol="udp",in_total_packets=2u,vlan_src=0u,src_tos="0x00"
|
||||
netflow,source=127.0.0.1,version=IPFIX protocol="udp",vlan_src=0u,src_port=58879u,dst_port=53u,flow_end_ms=1666345513832u,src_tos="0x00",src="192.168.119.100",total_bytes_exported=1u,rev_total_bytes_exported=0u,flow_end_reason="forced end",last_switched=33u,in_total_bytes=221u,in_total_packets=2u,flow_start_ms=1666345513799u,dst="192.168.119.17"
|
||||
```
|
||||
|
||||
### Netflow v5
|
||||
|
||||
```text
|
||||
|
|
@ -118,12 +129,11 @@ netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",
|
|||
netflow,source=127.0.0.1,version=NetFlowV9 protocol="tcp",src="192.168.119.100",src_port=49398u,dst="140.82.114.26",dst_port=443u,in_bytes=697u,in_packets=4u,flow_start_ms=1666350481030u,flow_end_ms=1666350481362u,tcp_flags="...PA...",engine_type="17",engine_id="0x01",icmp_type=0u,icmp_code=0u,fwd_status="unknown",fwd_reason="unknown",src_tos="0x00"
|
||||
```
|
||||
|
||||
### IPFIX
|
||||
### sFlow v5
|
||||
|
||||
```text
|
||||
netflow,source=127.0.0.1,version=IPFIX protocol="tcp",vlan_src=0u,src_tos="0x00",flow_end_ms=1666345513807u,src="192.168.119.100",dst="44.233.90.52",src_port=51008u,total_bytes_exported=0u,flow_end_reason="end of flow",flow_start_ms=1666345513807u,in_total_bytes=52u,in_total_packets=1u,dst_port=443u
|
||||
netflow,source=127.0.0.1,version=IPFIX src_tos="0x00",src_port=54330u,rev_total_bytes_exported=0u,last_switched=9u,vlan_src=0u,flow_start_ms=1666345513807u,in_total_packets=1u,flow_end_reason="end of flow",flow_end_ms=1666345513816u,in_total_bytes=40u,dst_port=443u,src="192.168.119.100",dst="104.17.240.92",total_bytes_exported=0u,protocol="tcp"
|
||||
netflow,source=127.0.0.1,version=IPFIX flow_start_ms=1666345513807u,flow_end_ms=1666345513977u,src="192.168.119.100",dst_port=443u,total_bytes_exported=0u,last_switched=170u,src_tos="0x00",in_total_bytes=40u,dst="44.233.90.52",src_port=51024u,protocol="tcp",flow_end_reason="end of flow",in_total_packets=1u,rev_total_bytes_exported=0u,vlan_src=0u
|
||||
netflow,source=127.0.0.1,version=IPFIX src_port=58246u,total_bytes_exported=1u,flow_start_ms=1666345513806u,flow_end_ms=1666345513806u,in_total_bytes=156u,src="192.168.119.100",rev_total_bytes_exported=0u,last_switched=0u,flow_end_reason="forced end",dst="192.168.119.17",dst_port=53u,protocol="udp",in_total_packets=2u,vlan_src=0u,src_tos="0x00"
|
||||
netflow,source=127.0.0.1,version=IPFIX protocol="udp",vlan_src=0u,src_port=58879u,dst_port=53u,flow_end_ms=1666345513832u,src_tos="0x00",src="192.168.119.100",total_bytes_exported=1u,rev_total_bytes_exported=0u,flow_end_reason="forced end",last_switched=33u,in_total_bytes=221u,in_total_packets=2u,flow_start_ms=1666345513799u,dst="192.168.119.17"
|
||||
netflow,source=127.0.0.1,version=sFlowV5 out_errors=0i,out_bytes=3946i,status="up",in_unknown_protocol=4294967295i,out_unicast_packets_total=29i,agent_subid=100000i,interface_type=6i,in_unicast_packets_total=28i,out_dropped_packets=0i,in_bytes=3910i,in_broadcast_packets_total=4294967295i,ip_version="IPv4",agent_ip="192.168.119.184",in_snmp=3i,in_errors=0i,promiscuous=0i,interface=3i,in_mcast_packets_total=4294967295i,in_dropped_packets=0i,sys_uptime=12414i,seq_number=2i,speed=1000000000i,out_mcast_packets_total=4294967295i,out_broadcast_packets_total=4294967295i 12414000000
|
||||
netflow,source=127.0.0.1,version=sFlowV5 sys_uptime=17214i,agent_ip="192.168.119.184",agent_subid=100000i,seq_number=2i,in_phy_interface=1i,ip_version="IPv4" 17214000000
|
||||
netflow,source=127.0.0.1,version=sFlowV5 in_errors=0i,out_unicast_packets_total=36i,interface=3i,in_broadcast_packets_total=4294967295i,ip_version="IPv4",speed=1000000000i,out_bytes=4408i,out_mcast_packets_total=4294967295i,status="up",in_snmp=3i,in_mcast_packets_total=4294967295i,out_broadcast_packets_total=4294967295i,promiscuous=0i,in_bytes=5568i,out_dropped_packets=0i,sys_uptime=22014i,agent_subid=100000i,in_unknown_protocol=4294967295i,interface_type=6i,in_dropped_packets=0i,in_unicast_packets_total=37i,out_errors=0i,agent_ip="192.168.119.184",seq_number=3i 22014000000
|
||||
|
||||
```
|
||||
|
|
|
|||
|
|
@ -59,8 +59,10 @@ func (n *NetFlow) Init() error {
|
|||
n.decoder = &netflowDecoder{Log: n.Log}
|
||||
case "netflow v5":
|
||||
n.decoder = &netflowv5Decoder{}
|
||||
case "sflow", "sflow v5":
|
||||
n.decoder = &sflowv5Decoder{Log: n.Log}
|
||||
default:
|
||||
return fmt.Errorf("invalid protocol %q, only supports 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol)
|
||||
return fmt.Errorf("invalid protocol %q, only supports 'sflow', 'netflow v5', 'netflow v9' and 'ipfix'", n.Protocol)
|
||||
}
|
||||
return n.decoder.Init()
|
||||
}
|
||||
|
|
@ -123,7 +125,8 @@ func (n *NetFlow) read(acc telegraf.Accumulator) {
|
|||
}
|
||||
metrics, err := n.decoder.Decode(src.IP, buf[:count])
|
||||
if err != nil {
|
||||
acc.AddError(err)
|
||||
errWithData := fmt.Errorf("%w; raw data: %s", err, hex.EncodeToString(buf[:count]))
|
||||
acc.AddError(errWithData)
|
||||
continue
|
||||
}
|
||||
for _, m := range metrics {
|
||||
|
|
|
|||
|
|
@ -239,7 +239,7 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{
|
|||
194: {{"mpls_payload_len", decodeUint}}, // mplsPayloadLength
|
||||
195: {{"dscp", decodeUint}}, // ipDiffServCodePoint
|
||||
196: {{"precedence", decodeUint}}, // ipPrecedence
|
||||
197: {{"fragement_flags", decodeFragmentFlags}}, // fragmentFlags
|
||||
197: {{"fragment_flags", decodeFragmentFlags}}, // fragmentFlags
|
||||
198: {{"bytes_sqr_sum", decodeUint}}, // octetDeltaSumOfSquares
|
||||
199: {{"bytes_sqr_sum_total", decodeUint}}, // octetTotalSumOfSquares
|
||||
200: {{"mpls_top_label_ttl", decodeUint}}, // mplsTopLabelTTL
|
||||
|
|
@ -256,10 +256,10 @@ var fieldMappingsIPFIX = map[uint16][]fieldMapping{
|
|||
211: {{"collector", decodeIP}}, // collectorIPv4Address
|
||||
212: {{"collector", decodeIP}}, // collectorIPv6Address
|
||||
213: {{"export_interface", decodeUint}}, // exportInterface
|
||||
214: {{"export_proto_version", decodeUint}}, //exportProtocolVersion
|
||||
215: {{"export_transport_proto", decodeUint}}, //exportTransportProtocol
|
||||
216: {{"collector_transport_port", decodeUint}}, //collectorTransportPort
|
||||
217: {{"exporter_transport_port", decodeUint}}, //exporterTransportPort
|
||||
214: {{"export_proto_version", decodeUint}}, // exportProtocolVersion
|
||||
215: {{"export_transport_proto", decodeUint}}, // exportTransportProtocol
|
||||
216: {{"collector_transport_port", decodeUint}}, // collectorTransportPort
|
||||
217: {{"exporter_transport_port", decodeUint}}, // exporterTransportPort
|
||||
218: {{"tcp_syn_total", decodeUint}}, // tcpSynTotalCount
|
||||
219: {{"tcp_fin_total", decodeUint}}, // tcpFinTotalCount
|
||||
220: {{"tcp_rst_total", decodeUint}}, // tcpRstTotalCount
|
||||
|
|
|
|||
|
|
@ -6,9 +6,10 @@ import (
|
|||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/netsampler/goflow2/decoders/netflowlegacy"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/netsampler/goflow2/decoders/netflowlegacy"
|
||||
)
|
||||
|
||||
// Decoder structure
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
# Netflow v5, Netflow v9 and IPFIX collector
|
||||
[[inputs.netflow]]
|
||||
## Address to listen for netflow/ipfix packets.
|
||||
## Address to listen for netflow,ipfix or sflow packets.
|
||||
## example: service_address = "udp://:2055"
|
||||
## service_address = "udp4://:2055"
|
||||
## service_address = "udp6://:2055"
|
||||
|
|
@ -13,9 +13,10 @@
|
|||
|
||||
## Protocol version to use for decoding.
|
||||
## Available options are
|
||||
## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9)
|
||||
## "netflow v5" -- Netflow v5 protocol
|
||||
## "netflow v9" -- Netflow v9 protocol (also works for IPFIX)
|
||||
## "ipfix" -- IPFIX / Netflow v10 protocol (also works for Netflow v9)
|
||||
## "sflow v5" -- sFlow v5 protocol
|
||||
# protocol = "ipfix"
|
||||
|
||||
## Dump incoming packets to the log
|
||||
|
|
|
|||
|
|
@ -0,0 +1,421 @@
|
|||
package netflow
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/netsampler/goflow2/decoders/sflow"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
)
|
||||
|
||||
// Decoder structure
|
||||
type sflowv5Decoder struct {
|
||||
Log telegraf.Logger
|
||||
|
||||
warnedCounterRaw map[uint32]bool
|
||||
warnedFlowRaw map[int64]bool
|
||||
}
|
||||
|
||||
func (d *sflowv5Decoder) Init() error {
|
||||
if err := initL4ProtoMapping(); err != nil {
|
||||
return fmt.Errorf("initializing layer 4 protocol mapping failed: %w", err)
|
||||
}
|
||||
d.warnedCounterRaw = make(map[uint32]bool)
|
||||
d.warnedFlowRaw = make(map[int64]bool)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric, error) {
|
||||
src := srcIP.String()
|
||||
|
||||
// Decode the message
|
||||
buf := bytes.NewBuffer(payload)
|
||||
packet, err := sflow.DecodeMessage(buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Extract metrics
|
||||
msg, ok := packet.(sflow.Packet)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected message type %T", packet)
|
||||
}
|
||||
|
||||
t := time.Unix(0, int64(msg.Uptime)*int64(time.Millisecond))
|
||||
metrics := make([]telegraf.Metric, 0, len(msg.Samples))
|
||||
for _, s := range msg.Samples {
|
||||
tags := map[string]string{
|
||||
"source": src,
|
||||
"version": "sFlowV5",
|
||||
}
|
||||
|
||||
switch sample := s.(type) {
|
||||
case sflow.FlowSample:
|
||||
fields := map[string]interface{}{
|
||||
"ip_version": decodeSflowIPVersion(msg.IPVersion),
|
||||
"sys_uptime": msg.Uptime,
|
||||
"agent_ip": decodeIP(msg.AgentIP),
|
||||
"agent_subid": msg.SubAgentId,
|
||||
"seq_number": sample.Header.SampleSequenceNumber,
|
||||
"sampling_interval": sample.SamplingRate,
|
||||
"in_total_packets": sample.SamplePool,
|
||||
"sampling_drops": sample.Drops,
|
||||
"in_snmp": sample.Input,
|
||||
}
|
||||
if sample.Output>>31 == 0 {
|
||||
fields["out_snmp"] = sample.Output & 0x7fffffff
|
||||
}
|
||||
// Decode the source information
|
||||
if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" {
|
||||
fields[name] = sample.Header.SourceIdValue
|
||||
}
|
||||
// Decode the sampling direction
|
||||
if sample.Header.SourceIdValue == sample.Input {
|
||||
fields["direction"] = "ingress"
|
||||
} else {
|
||||
fields["direction"] = "egress"
|
||||
}
|
||||
recordFields, err := d.decodeFlowRecords(sample.Records)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k, v := range recordFields {
|
||||
fields[k] = v
|
||||
}
|
||||
metrics = append(metrics, metric.New("netflow", tags, fields, t))
|
||||
case sflow.ExpandedFlowSample:
|
||||
fields := map[string]interface{}{
|
||||
"ip_version": decodeSflowIPVersion(msg.IPVersion),
|
||||
"sys_uptime": msg.Uptime,
|
||||
"agent_ip": decodeIP(msg.AgentIP),
|
||||
"agent_subid": msg.SubAgentId,
|
||||
"seq_number": sample.Header.SampleSequenceNumber,
|
||||
"sampling_interval": sample.SamplingRate,
|
||||
"in_total_packets": sample.SamplePool,
|
||||
"sampling_drops": sample.Drops,
|
||||
"in_snmp": sample.InputIfValue,
|
||||
}
|
||||
if sample.OutputIfFormat == 0 {
|
||||
fields["out_snmp"] = sample.OutputIfValue
|
||||
}
|
||||
// Decode the source information
|
||||
if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" {
|
||||
fields[name] = sample.Header.SourceIdValue
|
||||
}
|
||||
// Decode the sampling direction
|
||||
if sample.Header.SourceIdValue == sample.InputIfValue {
|
||||
fields["direction"] = "ingress"
|
||||
} else {
|
||||
fields["direction"] = "egress"
|
||||
}
|
||||
recordFields, err := d.decodeFlowRecords(sample.Records)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k, v := range recordFields {
|
||||
fields[k] = v
|
||||
}
|
||||
metrics = append(metrics, metric.New("netflow", tags, fields, t))
|
||||
case sflow.CounterSample:
|
||||
fields := map[string]interface{}{
|
||||
"ip_version": decodeSflowIPVersion(msg.IPVersion),
|
||||
"sys_uptime": msg.Uptime,
|
||||
"agent_ip": decodeIP(msg.AgentIP),
|
||||
"agent_subid": msg.SubAgentId,
|
||||
"seq_number": sample.Header.SampleSequenceNumber,
|
||||
}
|
||||
// Decode the source information
|
||||
if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" {
|
||||
fields[name] = sample.Header.SourceIdValue
|
||||
}
|
||||
recordFields, err := d.decodeCounterRecords(sample.Records)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k, v := range recordFields {
|
||||
fields[k] = v
|
||||
}
|
||||
metrics = append(metrics, metric.New("netflow", tags, fields, t))
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown record type %T", s)
|
||||
}
|
||||
}
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[string]interface{}, error) {
|
||||
fields := make(map[string]interface{})
|
||||
for _, r := range records {
|
||||
if r.Data == nil {
|
||||
continue
|
||||
}
|
||||
switch record := r.Data.(type) {
|
||||
case sflow.SampledHeader:
|
||||
fields["l2_protocol"] = decodeSflowHeaderProtocol(record.Protocol)
|
||||
fields["l2_bytes"] = record.FrameLength
|
||||
pktfields, err := d.decodeRawHeaderSample(&record)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k, v := range pktfields {
|
||||
fields[k] = v
|
||||
}
|
||||
case sflow.SampledEthernet:
|
||||
fields["eth_total_len"] = record.Length
|
||||
fields["in_src_mac"] = decodeMAC(record.SrcMac)
|
||||
fields["out_dst_mac"] = decodeMAC(record.DstMac)
|
||||
fields["datalink_frame_type"] = layers.EthernetType(record.EthType & 0x0000ffff).String()
|
||||
case sflow.SampledIPv4:
|
||||
fields["ipv4_total_len"] = record.Base.Length
|
||||
fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff))
|
||||
fields["src"] = decodeIP(record.Base.SrcIP)
|
||||
fields["dst"] = decodeIP(record.Base.DstIP)
|
||||
fields["src_port"] = record.Base.SrcPort
|
||||
fields["dst_port"] = record.Base.DstPort
|
||||
fields["src_tos"] = record.Tos
|
||||
fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)})
|
||||
case sflow.SampledIPv6:
|
||||
fields["ipv6_total_len"] = record.Base.Length
|
||||
fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff))
|
||||
fields["src"] = decodeIP(record.Base.SrcIP)
|
||||
fields["dst"] = decodeIP(record.Base.DstIP)
|
||||
fields["src_port"] = record.Base.SrcPort
|
||||
fields["dst_port"] = record.Base.DstPort
|
||||
fields["tcp_flags"] = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)})
|
||||
case sflow.ExtendedSwitch:
|
||||
fields["vlan_src"] = record.SrcVlan
|
||||
fields["vlan_src_priority"] = record.SrcPriority
|
||||
fields["vlan_dst"] = record.DstVlan
|
||||
fields["vlan_dst_priority"] = record.DstPriority
|
||||
case sflow.ExtendedRouter:
|
||||
fields["next_hop"] = decodeIP(record.NextHop)
|
||||
fields["src_mask"] = record.SrcMaskLen
|
||||
fields["dst_mask"] = record.DstMaskLen
|
||||
case sflow.ExtendedGateway:
|
||||
fields["next_hop"] = decodeIP(record.NextHop)
|
||||
fields["bgp_src_as"] = record.SrcAS
|
||||
fields["bgp_dst_as"] = record.ASDestinations
|
||||
fields["bgp_next_hop"] = decodeIP(record.NextHop)
|
||||
fields["bgp_prev_as"] = record.SrcPeerAS
|
||||
if len(record.ASPath) > 0 {
|
||||
fields["bgp_next_as"] = record.ASPath[0]
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unhandled flow record type %T", r.Data)
|
||||
}
|
||||
}
|
||||
return fields, nil
|
||||
}
|
||||
|
||||
func (d *sflowv5Decoder) decodeRawHeaderSample(record *sflow.SampledHeader) (map[string]interface{}, error) {
|
||||
var packet gopacket.Packet
|
||||
switch record.Protocol {
|
||||
case 1: // ETHERNET-ISO8023
|
||||
packet = gopacket.NewPacket(record.HeaderData, layers.LayerTypeEthernet, gopacket.Default)
|
||||
case 2: // ISO88024-TOKENBUS
|
||||
fallthrough
|
||||
case 3: // ISO88025-TOKENRING
|
||||
fallthrough
|
||||
case 4: // FDDI
|
||||
fallthrough
|
||||
case 5: // FRAME-RELAY
|
||||
fallthrough
|
||||
case 6: // X25
|
||||
fallthrough
|
||||
case 7: // PPP
|
||||
fallthrough
|
||||
case 8: // SMDS
|
||||
fallthrough
|
||||
case 9: // AAL5
|
||||
fallthrough
|
||||
case 10: // AAL5-IP
|
||||
fallthrough
|
||||
case 11: // IPv4
|
||||
fallthrough
|
||||
case 12: // IPv6
|
||||
fallthrough
|
||||
case 13: // MPLS
|
||||
fallthrough
|
||||
default:
|
||||
return nil, fmt.Errorf("unhandled protocol %d", record.Protocol)
|
||||
}
|
||||
|
||||
fields := make(map[string]interface{})
|
||||
for _, pkt := range packet.Layers() {
|
||||
switch l := pkt.(type) {
|
||||
case *layers.Ethernet:
|
||||
fields["in_src_mac"] = l.SrcMAC
|
||||
fields["out_dst_mac"] = l.DstMAC
|
||||
fields["datalink_frame_type"] = l.EthernetType.String()
|
||||
if l.Length > 0 {
|
||||
fields["eth_header_len"] = l.Length
|
||||
}
|
||||
case *layers.Dot1Q:
|
||||
fields["vlan_id"] = l.VLANIdentifier
|
||||
fields["vlan_priority"] = l.Priority
|
||||
fields["vlan_drop_eligible"] = l.DropEligible
|
||||
case *layers.IPv4:
|
||||
fields["ip_version"] = l.Version
|
||||
fields["ipv4_inet_header_len"] = l.IHL
|
||||
fields["src_tos"] = l.TOS
|
||||
fields["ipv4_total_len"] = l.Length
|
||||
fields["ipv4_id"] = l.Id // ?
|
||||
fields["ttl"] = l.TTL
|
||||
fields["protocol"] = mapL4Proto(uint8(l.Protocol))
|
||||
fields["src"] = l.SrcIP.String()
|
||||
fields["dst"] = l.DstIP.String()
|
||||
|
||||
flags := []byte("........")
|
||||
switch {
|
||||
case l.Flags&layers.IPv4EvilBit > 0:
|
||||
flags[7] = byte('E')
|
||||
case l.Flags&layers.IPv4DontFragment > 0:
|
||||
flags[6] = byte('D')
|
||||
case l.Flags&layers.IPv4MoreFragments > 0:
|
||||
flags[5] = byte('M')
|
||||
}
|
||||
fields["fragment_flags"] = string(flags)
|
||||
fields["fragment_offset"] = l.FragOffset
|
||||
fields["ip_total_len"] = l.Length
|
||||
case *layers.IPv6:
|
||||
fields["ip_version"] = l.Version
|
||||
fields["ipv6_total_len"] = l.Length
|
||||
fields["ttl"] = l.HopLimit
|
||||
fields["protocol"] = mapL4Proto(uint8(l.NextHeader))
|
||||
fields["src"] = l.SrcIP.String()
|
||||
fields["dst"] = l.DstIP.String()
|
||||
fields["ip_total_len"] = l.Length
|
||||
case *layers.TCP:
|
||||
fields["src_port"] = l.SrcPort
|
||||
fields["dst_port"] = l.DstPort
|
||||
fields["tcp_seq_number"] = l.Seq
|
||||
fields["tcp_ack_number"] = l.Ack
|
||||
fields["tcp_window_size"] = l.Window
|
||||
fields["tcp_urgent_ptr"] = l.Urgent
|
||||
flags := []byte("........")
|
||||
switch {
|
||||
case l.FIN:
|
||||
flags[7] = byte('F')
|
||||
case l.SYN:
|
||||
flags[6] = byte('S')
|
||||
case l.RST:
|
||||
flags[5] = byte('R')
|
||||
case l.PSH:
|
||||
flags[4] = byte('P')
|
||||
case l.ACK:
|
||||
flags[3] = byte('A')
|
||||
case l.URG:
|
||||
flags[2] = byte('U')
|
||||
case l.ECE:
|
||||
flags[1] = byte('E')
|
||||
case l.CWR:
|
||||
flags[0] = byte('C')
|
||||
}
|
||||
fields["tcp_flags"] = string(flags)
|
||||
case *layers.UDP:
|
||||
fields["src_port"] = l.SrcPort
|
||||
fields["dst_port"] = l.DstPort
|
||||
fields["ip_total_len"] = l.Length
|
||||
case *gopacket.Payload:
|
||||
// Ignore the payload
|
||||
default:
|
||||
ltype := int64(pkt.LayerType())
|
||||
if !d.warnedFlowRaw[ltype] {
|
||||
contents := hex.EncodeToString(pkt.LayerContents())
|
||||
payload := hex.EncodeToString(pkt.LayerPayload())
|
||||
d.Log.Warnf("Unknown flow raw flow message %s (%d):", pkt.LayerType().String(), pkt.LayerType())
|
||||
d.Log.Warnf(" contents: %s", contents)
|
||||
d.Log.Warnf(" payload: %s", payload)
|
||||
|
||||
d.Log.Warn("This message is only printed once.")
|
||||
}
|
||||
d.warnedFlowRaw[ltype] = true
|
||||
}
|
||||
}
|
||||
return fields, nil
|
||||
}
|
||||
|
||||
func (d *sflowv5Decoder) decodeCounterRecords(records []sflow.CounterRecord) (map[string]interface{}, error) {
|
||||
for _, r := range records {
|
||||
if r.Data == nil {
|
||||
continue
|
||||
}
|
||||
switch record := r.Data.(type) {
|
||||
case sflow.IfCounters:
|
||||
fields := map[string]interface{}{
|
||||
"interface": record.IfIndex,
|
||||
"interface_type": record.IfType,
|
||||
"speed": record.IfSpeed,
|
||||
"in_bytes": record.IfInOctets,
|
||||
"in_unicast_packets_total": record.IfInUcastPkts,
|
||||
"in_mcast_packets_total": record.IfInMulticastPkts,
|
||||
"in_broadcast_packets_total": record.IfInBroadcastPkts,
|
||||
"in_dropped_packets": record.IfInDiscards,
|
||||
"in_errors": record.IfInErrors,
|
||||
"in_unknown_protocol": record.IfInUnknownProtos,
|
||||
"out_bytes": record.IfOutOctets,
|
||||
"out_unicast_packets_total": record.IfOutUcastPkts,
|
||||
"out_mcast_packets_total": record.IfOutMulticastPkts,
|
||||
"out_broadcast_packets_total": record.IfOutBroadcastPkts,
|
||||
"out_dropped_packets": record.IfOutDiscards,
|
||||
"out_errors": record.IfOutErrors,
|
||||
"promiscuous": record.IfPromiscuousMode,
|
||||
}
|
||||
if record.IfStatus == 0 {
|
||||
fields["status"] = "down"
|
||||
} else {
|
||||
fields["status"] = "up"
|
||||
}
|
||||
return fields, nil
|
||||
case sflow.EthernetCounters:
|
||||
fields := map[string]interface{}{
|
||||
"type": "IEEE 802.3",
|
||||
"collision_frames_single": record.Dot3StatsSingleCollisionFrames,
|
||||
"collision_frames_multi": record.Dot3StatsMultipleCollisionFrames,
|
||||
"collisions_late": record.Dot3StatsLateCollisions,
|
||||
"collisions_excessive": record.Dot3StatsExcessiveCollisions,
|
||||
"deferred": record.Dot3StatsDeferredTransmissions,
|
||||
"errors_alignment": record.Dot3StatsAlignmentErrors,
|
||||
"errors_fcs": record.Dot3StatsFCSErrors,
|
||||
"errors_sqetest": record.Dot3StatsSQETestErrors,
|
||||
"errors_internal_mac_tx": record.Dot3StatsInternalMacTransmitErrors,
|
||||
"errors_internal_mac_rx": record.Dot3StatsInternalMacReceiveErrors,
|
||||
"errors_carrier_sense": record.Dot3StatsCarrierSenseErrors,
|
||||
"errors_frame_too_long": record.Dot3StatsFrameTooLongs,
|
||||
"errors_symbols": record.Dot3StatsSymbolErrors,
|
||||
}
|
||||
return fields, nil
|
||||
case *sflow.FlowRecordRaw:
|
||||
switch r.Header.DataFormat {
|
||||
case 1005:
|
||||
// Openflow port-name
|
||||
if len(record.Data) < 4 {
|
||||
return nil, fmt.Errorf("invalid data for raw counter %+v", r)
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"port_name": string(record.Data[4:]),
|
||||
}
|
||||
return fields, nil
|
||||
default:
|
||||
if !d.warnedCounterRaw[r.Header.DataFormat] {
|
||||
data := hex.EncodeToString(record.Data)
|
||||
d.Log.Warnf("Unknown counter raw flow message %d: %s", r.Header.DataFormat, data)
|
||||
d.Log.Warn("This message is only printed once.")
|
||||
}
|
||||
d.warnedCounterRaw[r.Header.DataFormat] = true
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("unhandled counter record type %T", r.Data)
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
netflow,source=127.0.0.1,version=sFlowV5 sys_uptime=12414u,agent_ip="192.168.119.184",agent_subid=100000u,seq_number=2u,in_snmp=3u,port_name="eno1",ip_version="IPv4" 12414000000
|
||||
Binary file not shown.
|
|
@ -0,0 +1,3 @@
|
|||
[[inputs.netflow]]
|
||||
service_address = "udp://127.0.0.1:0"
|
||||
protocol = "sflow v5"
|
||||
|
|
@ -636,3 +636,59 @@ func decodeCaptureTimeSemantics(b []byte) interface{} {
|
|||
}
|
||||
return "unassigned"
|
||||
}
|
||||
|
||||
func decodeSflowIPVersion(v uint32) string {
|
||||
switch v {
|
||||
case 0:
|
||||
return "unknown"
|
||||
case 1:
|
||||
return "IPv4"
|
||||
case 2:
|
||||
return "IPv6"
|
||||
}
|
||||
return strconv.FormatUint(uint64(v), 10)
|
||||
}
|
||||
|
||||
func decodeSflowSourceInterface(t uint32) string {
|
||||
switch t {
|
||||
case 0:
|
||||
return "in_snmp"
|
||||
case 1:
|
||||
return "in_vlan_id"
|
||||
case 2:
|
||||
return "in_phy_interface"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func decodeSflowHeaderProtocol(t uint32) string {
|
||||
switch t {
|
||||
case 1:
|
||||
return "ETHERNET-ISO8023"
|
||||
case 2:
|
||||
return "ISO88024-TOKENBUS"
|
||||
case 3:
|
||||
return "ISO88025-TOKENRING"
|
||||
case 4:
|
||||
return "FDDI"
|
||||
case 5:
|
||||
return "FRAME-RELAY"
|
||||
case 6:
|
||||
return "X25"
|
||||
case 7:
|
||||
return "PPP"
|
||||
case 8:
|
||||
return "SMDS"
|
||||
case 9:
|
||||
return "AAL5"
|
||||
case 10:
|
||||
return "AAL5-IP"
|
||||
case 11:
|
||||
return "IPv4"
|
||||
case 12:
|
||||
return "IPv6"
|
||||
case 13:
|
||||
return "MPLS"
|
||||
}
|
||||
return "unassigned"
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue