diff --git a/go.mod b/go.mod index 9a6fe3a32..3235d9327 100644 --- a/go.mod +++ b/go.mod @@ -141,7 +141,7 @@ require ( github.com/multiplay/go-ts3 v1.1.0 github.com/nats-io/nats-server/v2 v2.10.9 github.com/nats-io/nats.go v1.32.0 - github.com/netsampler/goflow2 v1.3.6 + github.com/netsampler/goflow2/v2 v2.1.2 github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 github.com/nsqio/go-nsq v1.1.0 github.com/nwaples/tacplus v0.0.3 @@ -156,7 +156,7 @@ require ( github.com/peterbourgon/unixtransport v0.0.4 github.com/pion/dtls/v2 v2.2.8 github.com/prometheus-community/pro-bing v0.3.0 - github.com/prometheus/client_golang v1.17.0 + github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.5.0 github.com/prometheus/common v0.45.0 github.com/prometheus/procfs v0.12.0 @@ -223,17 +223,6 @@ require ( modernc.org/sqlite v1.28.0 ) -require ( - filippo.io/edwards25519 v1.1.0 // indirect - github.com/apache/arrow/go/v14 v14.0.2 // indirect - github.com/distribution/reference v0.5.0 // indirect - github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fxamacker/cbor/v2 v2.6.0 // indirect - github.com/moby/sys/user v0.1.0 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect -) - require ( cloud.google.com/go v0.112.0 // indirect cloud.google.com/go/compute v1.23.3 // indirect @@ -241,6 +230,7 @@ require ( cloud.google.com/go/iam v1.1.6 // indirect code.cloudfoundry.org/clock v1.0.0 // indirect dario.cat/mergo v1.0.0 // indirect + filippo.io/edwards25519 v1.1.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/Azure/azure-amqp-common-go/v4 v4.2.0 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect @@ -268,6 +258,7 @@ require ( github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/alecthomas/participle v0.4.1 // indirect github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect + github.com/apache/arrow/go/v14 v14.0.2 // indirect github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/awnumar/memcall v0.2.0 // indirect @@ -308,6 +299,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/devigned/tab v0.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/distribution/reference v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect @@ -316,7 +308,9 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/echlebek/timeproxy v1.0.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect + github.com/fxamacker/cbor/v2 v2.6.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/go-asn1-ber/asn1-ber v1.5.5 // indirect github.com/go-logr/logr v1.4.1 // indirect @@ -398,6 +392,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/moby/patternmatcher v0.6.0 // indirect github.com/moby/sys/sequential v0.5.0 // indirect + github.com/moby/sys/user v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -462,6 +457,8 @@ require ( go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector/consumer v0.84.0 // indirect go.opentelemetry.io/collector/semconv v0.87.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect go.opentelemetry.io/otel v1.23.0 // indirect go.opentelemetry.io/otel/metric v1.23.0 // indirect go.opentelemetry.io/otel/sdk v1.21.0 // indirect diff --git a/go.sum b/go.sum index 14a59bf34..31035f4af 100644 --- a/go.sum +++ b/go.sum @@ -1850,8 +1850,8 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/netsampler/goflow2 v1.3.6 h1:fZbHDcWPcG+nkg2wGHCv4VJ9MrG8iA16YmuYhrSAEdQ= -github.com/netsampler/goflow2 v1.3.6/go.mod h1:4UZsVGVAs//iMCptUHn3WNScztJeUhZH7kDW2+/vDdQ= +github.com/netsampler/goflow2/v2 v2.1.2 h1:jgzUC+xZ1B0T7iv1tyz+DFQKgWvwVIPFRdzc84XTX4g= +github.com/netsampler/goflow2/v2 v2.1.2/go.mod h1:mDkDLl+uSFLq7aRuQ113+ZAJN0HdzCx/Dgf0wCmr+Cc= github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 h1:6OX5VXMuj2salqNBc41eXKz6K+nV6OB/hhlGnAKCbwU= github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -1977,8 +1977,8 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= -github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= -github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= +github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/plugins/inputs/netflow/netflow_decoder.go b/plugins/inputs/netflow/netflow_decoder.go index 82e6e1777..0e35aa66f 100644 --- a/plugins/inputs/netflow/netflow_decoder.go +++ b/plugins/inputs/netflow/netflow_decoder.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/netsampler/goflow2/decoders/netflow" + "github.com/netsampler/goflow2/v2/decoders/netflow" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" @@ -533,7 +533,7 @@ type netflowDecoder struct { PENFiles []string Log telegraf.Logger - templates map[string]*netflow.BasicTemplateSystem + templates map[string]netflow.NetFlowTemplateSystem mappingsV9 map[uint16]fieldMapping mappingsIPFIX map[uint16]fieldMapping mappingsPEN map[string]fieldMapping @@ -557,11 +557,11 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric d.Unlock() // Decode the overall message + var msg9 netflow.NFv9Packet + var msg10 netflow.IPFIXPacket buf := bytes.NewBuffer(payload) - packet, err := netflow.DecodeMessage(buf, templates) - if err != nil { - var terr *netflow.ErrorTemplateNotFound - if errors.As(err, &terr) { + if err := netflow.DecodeMessageVersion(buf, templates, &msg9, &msg10); err != nil { + if errors.Is(err, netflow.ErrorTemplateNotFound) { d.Log.Warnf("%v; skipping packet", err) return nil, nil } @@ -569,8 +569,9 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric } // Extract metrics - switch msg := packet.(type) { - case netflow.NFv9Packet: + switch { + case msg9.Version == 9: + msg := msg9 for _, flowsets := range msg.FlowSets { switch fs := flowsets.(type) { case netflow.TemplateFlowSet: @@ -597,7 +598,8 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric } } } - case netflow.IPFIXPacket: + case msg10.Version == 10: + msg := msg10 for _, flowsets := range msg.FlowSets { switch fs := flowsets.(type) { case netflow.TemplateFlowSet: @@ -626,7 +628,7 @@ func (d *netflowDecoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric } } default: - return nil, fmt.Errorf("invalid message of type %T", packet) + return nil, errors.New("invalid message of type") } return metrics, nil @@ -640,7 +642,7 @@ func (d *netflowDecoder) Init() error { return fmt.Errorf("initializing IPv4 options mapping failed: %w", err) } - d.templates = make(map[string]*netflow.BasicTemplateSystem) + d.templates = make(map[string]netflow.NetFlowTemplateSystem) d.mappingsV9 = make(map[uint16]fieldMapping) d.mappingsIPFIX = make(map[uint16]fieldMapping) d.mappingsPEN = make(map[string]fieldMapping) diff --git a/plugins/inputs/netflow/netflow_test.go b/plugins/inputs/netflow/netflow_test.go index ae5db6f3b..98a58e734 100644 --- a/plugins/inputs/netflow/netflow_test.go +++ b/plugins/inputs/netflow/netflow_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/netsampler/goflow2/v2/decoders/netflow" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" @@ -153,7 +154,7 @@ func TestMissingTemplate(t *testing.T) { var found bool for _, w := range logger.Warnings() { - found = found || strings.Contains(w, "No info template 261 found for and domain id 231; skipping packet") + found = found || strings.Contains(w, netflow.ErrorTemplateNotFound.Error()+"; skipping packet") } require.True(t, found, "warning not found") } diff --git a/plugins/inputs/netflow/netflow_v5.go b/plugins/inputs/netflow/netflow_v5.go index a29bcb4ee..ee3e9d2c3 100644 --- a/plugins/inputs/netflow/netflow_v5.go +++ b/plugins/inputs/netflow/netflow_v5.go @@ -6,7 +6,7 @@ import ( "net" "time" - "github.com/netsampler/goflow2/decoders/netflowlegacy" + "github.com/netsampler/goflow2/v2/decoders/netflowlegacy" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" @@ -26,18 +26,13 @@ func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metr src := srcIP.String() // Decode the message + var msg netflowlegacy.PacketNetFlowV5 buf := bytes.NewBuffer(payload) - packet, err := netflowlegacy.DecodeMessage(buf) - if err != nil { + if err := netflowlegacy.DecodeMessageVersion(buf, &msg); err != nil { return nil, err } // Extract metrics - msg, ok := packet.(netflowlegacy.PacketNetFlowV5) - if !ok { - return nil, fmt.Errorf("unexpected message type %T", packet) - } - t := time.Unix(int64(msg.UnixSecs), int64(msg.UnixNSecs)) metrics := make([]telegraf.Metric, 0, len(msg.Records)) for _, record := range msg.Records { @@ -72,15 +67,15 @@ func (d *netflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metr if err != nil { return nil, fmt.Errorf("decoding 'engine_id' failed: %w", err) } - fields["src"], err = decodeIPFromUint32(record.SrcAddr) + fields["src"], err = decodeIPFromUint32(uint32(record.SrcAddr)) if err != nil { return nil, fmt.Errorf("decoding 'src' failed: %w", err) } - fields["dst"], err = decodeIPFromUint32(record.DstAddr) + fields["dst"], err = decodeIPFromUint32(uint32(record.DstAddr)) if err != nil { return nil, fmt.Errorf("decoding 'dst' failed: %w", err) } - fields["next_hop"], err = decodeIPFromUint32(record.NextHop) + fields["next_hop"], err = decodeIPFromUint32(uint32(record.NextHop)) if err != nil { return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err) } diff --git a/plugins/inputs/netflow/sflow_v5.go b/plugins/inputs/netflow/sflow_v5.go index 903abbb41..9bc166029 100644 --- a/plugins/inputs/netflow/sflow_v5.go +++ b/plugins/inputs/netflow/sflow_v5.go @@ -9,7 +9,7 @@ import ( "github.com/google/gopacket" "github.com/google/gopacket/layers" - "github.com/netsampler/goflow2/decoders/sflow" + "github.com/netsampler/goflow2/v2/decoders/sflow" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" @@ -38,18 +38,13 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric src := srcIP.String() // Decode the message + var msg sflow.Packet buf := bytes.NewBuffer(payload) - packet, err := sflow.DecodeMessage(buf) - if err != nil { + if err := sflow.DecodeMessageVersion(buf, &msg); err != nil { return nil, err } // Extract metrics - msg, ok := packet.(sflow.Packet) - if !ok { - return nil, fmt.Errorf("unexpected message type %T", packet) - } - metrics := make([]telegraf.Metric, 0, len(msg.Samples)) for _, s := range msg.Samples { tags := map[string]string{ @@ -70,6 +65,7 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric "in_snmp": sample.Input, } + var err error fields["agent_ip"], err = decodeIP(msg.AgentIP) if err != nil { return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err) @@ -107,6 +103,8 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric "sampling_drops": sample.Drops, "in_snmp": sample.InputIfValue, } + + var err error fields["agent_ip"], err = decodeIP(msg.AgentIP) if err != nil { return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err) @@ -140,6 +138,8 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric "agent_subid": msg.SubAgentId, "seq_number": sample.Header.SampleSequenceNumber, } + + var err error fields["agent_ip"], err = decodeIP(msg.AgentIP) if err != nil { return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err) @@ -196,38 +196,38 @@ func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[stri fields["datalink_frame_type"] = layers.EthernetType(record.EthType & 0x0000ffff).String() case sflow.SampledIPv4: var err error - fields["ipv4_total_len"] = record.Base.Length - fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff)) - fields["src"], err = decodeIP(record.Base.SrcIP) + fields["ipv4_total_len"] = record.Length + fields["protocol"] = mapL4Proto(uint8(record.Protocol & 0x000000ff)) + fields["src"], err = decodeIP(record.SrcIP) if err != nil { return nil, fmt.Errorf("decoding 'src' failed: %w", err) } - fields["dst"], err = decodeIP(record.Base.DstIP) + fields["dst"], err = decodeIP(record.DstIP) if err != nil { return nil, fmt.Errorf("decoding 'dst' failed: %w", err) } - fields["src_port"] = record.Base.SrcPort - fields["dst_port"] = record.Base.DstPort + fields["src_port"] = record.SrcPort + fields["dst_port"] = record.DstPort fields["src_tos"] = record.Tos - fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.TcpFlags & 0x000000ff)}) if err != nil { return nil, fmt.Errorf("decoding 'tcp_flags' failed: %w", err) } case sflow.SampledIPv6: var err error - fields["ipv6_total_len"] = record.Base.Length - fields["protocol"] = mapL4Proto(uint8(record.Base.Protocol & 0x000000ff)) - fields["src"], err = decodeIP(record.Base.SrcIP) + fields["ipv6_total_len"] = record.Length + fields["protocol"] = mapL4Proto(uint8(record.Protocol & 0x000000ff)) + fields["src"], err = decodeIP(record.SrcIP) if err != nil { return nil, fmt.Errorf("decoding 'src' failed: %w", err) } - fields["dst"], err = decodeIP(record.Base.DstIP) + fields["dst"], err = decodeIP(record.DstIP) if err != nil { return nil, fmt.Errorf("decoding 'dst' failed: %w", err) } - fields["src_port"] = record.Base.SrcPort - fields["dst_port"] = record.Base.DstPort - fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.Base.TcpFlags & 0x000000ff)}) + fields["src_port"] = record.SrcPort + fields["dst_port"] = record.DstPort + fields["tcp_flags"], err = decodeTCPFlags([]byte{byte(record.TcpFlags & 0x000000ff)}) if err != nil { return nil, fmt.Errorf("decoding 'tcp_flags' failed: %w", err) } @@ -445,7 +445,7 @@ func (d *sflowv5Decoder) decodeCounterRecords(records []sflow.CounterRecord) (ma "errors_symbols": record.Dot3StatsSymbolErrors, } return fields, nil - case *sflow.FlowRecordRaw: + case sflow.RawRecord: switch r.Header.DataFormat { case 1005: // Openflow port-name