feat(inputs.netflow): Add support for sFlow drop notification packets (#15396)
This commit is contained in:
parent
f11ead9980
commit
afac9eb7a8
2
go.mod
2
go.mod
|
|
@ -147,7 +147,7 @@ require (
|
|||
github.com/multiplay/go-ts3 v1.1.0
|
||||
github.com/nats-io/nats-server/v2 v2.10.17
|
||||
github.com/nats-io/nats.go v1.36.0
|
||||
github.com/netsampler/goflow2/v2 v2.1.3
|
||||
github.com/netsampler/goflow2/v2 v2.1.5
|
||||
github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1
|
||||
github.com/nsqio/go-nsq v1.1.0
|
||||
github.com/nwaples/tacplus v0.0.3
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -1983,8 +1983,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh
|
|||
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/ncw/swift/v2 v2.0.2 h1:jx282pcAKFhmoZBSdMcCRFn9VWkoBIRsCpe+yZq7vEk=
|
||||
github.com/ncw/swift/v2 v2.0.2/go.mod h1:z0A9RVdYPjNjXVo2pDOPxZ4eu3oarO1P91fTItcb+Kg=
|
||||
github.com/netsampler/goflow2/v2 v2.1.3 h1:glfeG2hIzFlAmEz236nZIAWi848AB+p1pJnuQqiyLkI=
|
||||
github.com/netsampler/goflow2/v2 v2.1.3/go.mod h1:94ZaxfHuUwG6KviCxMWuZIvz0UGu657StE/g83hlS8A=
|
||||
github.com/netsampler/goflow2/v2 v2.1.5 h1:xW9xkBBNmSWaDjC5VsV7wK556pJB8dB9FsuthmcXKDA=
|
||||
github.com/netsampler/goflow2/v2 v2.1.5/go.mod h1:DnkDq99+sHUMUkR8PaN5Z4hLZALyrQObVhtz8zPSj8g=
|
||||
github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 h1:6OX5VXMuj2salqNBc41eXKz6K+nV6OB/hhlGnAKCbwU=
|
||||
github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
|
|
|
|||
|
|
@ -159,6 +159,43 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
|
|||
fields[k] = v
|
||||
}
|
||||
metrics = append(metrics, metric.New("netflow", tags, fields, t))
|
||||
case sflow.DropSample:
|
||||
fields := map[string]interface{}{
|
||||
"ip_version": decodeSflowIPVersion(msg.IPVersion),
|
||||
"sys_uptime": msg.Uptime,
|
||||
"agent_subid": msg.SubAgentId,
|
||||
"seq_number": sample.Header.SampleSequenceNumber,
|
||||
"sampling_drops": sample.Drops,
|
||||
"in_snmp": sample.Input,
|
||||
"out_snmp": sample.Output,
|
||||
"reason": sample.Reason,
|
||||
}
|
||||
|
||||
var err error
|
||||
fields["agent_ip"], err = decodeIP(msg.AgentIP)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err)
|
||||
}
|
||||
|
||||
// Decode the source information
|
||||
if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" {
|
||||
fields[name] = sample.Header.SourceIdValue
|
||||
}
|
||||
// Decode the sampling direction
|
||||
if sample.Header.SourceIdValue == sample.Input {
|
||||
fields["direction"] = "ingress"
|
||||
} else {
|
||||
fields["direction"] = "egress"
|
||||
}
|
||||
recordFields, err := d.decodeFlowRecords(sample.Records)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for k, v := range recordFields {
|
||||
fields[k] = v
|
||||
}
|
||||
metrics = append(metrics, metric.New("netflow", tags, fields, t))
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown record type %T", s)
|
||||
}
|
||||
|
|
@ -249,9 +286,6 @@ func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[stri
|
|||
case sflow.ExtendedGateway:
|
||||
var err error
|
||||
fields["next_hop_ip_version"] = record.NextHopIPVersion
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err)
|
||||
}
|
||||
fields["next_hop"], err = decodeIP(record.NextHop)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err)
|
||||
|
|
@ -276,6 +310,21 @@ func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[stri
|
|||
}
|
||||
fields["communities"] = strings.Join(parts, ",")
|
||||
fields["local_pref"] = record.LocalPref
|
||||
case sflow.EgressQueue:
|
||||
fields["out_queue"] = record.Queue
|
||||
case sflow.ExtendedACL:
|
||||
fields["acl_id"] = record.Number
|
||||
fields["acl_name"] = record.Name
|
||||
switch record.Direction {
|
||||
case 1:
|
||||
fields["direction"] = "ingress"
|
||||
case 2:
|
||||
fields["direction"] = "egress"
|
||||
default:
|
||||
fields["direction"] = "unknown"
|
||||
}
|
||||
case sflow.ExtendedFunction:
|
||||
fields["function"] = record.Symbol
|
||||
default:
|
||||
return nil, fmt.Errorf("unhandled flow record type %T", r.Data)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
netflow,source=127.0.0.1,version=sFlowV5 sys_uptime=12414u,agent_ip="192.168.119.184",agent_subid=100000u,seq_number=2u,direction="ingress",in_snmp=1u,out_snmp=2u,out_queue=42u,reason=1u,sampling_drops=256u,ip_version="IPv4" 12414000000
|
||||
Binary file not shown.
|
|
@ -0,0 +1,3 @@
|
|||
[[inputs.netflow]]
|
||||
service_address = "udp://127.0.0.1:0"
|
||||
protocol = "sflow v5"
|
||||
Loading…
Reference in New Issue