From 731fb3596d17b7a96c6998d0ff958965365edf25 Mon Sep 17 00:00:00 2001 From: Samantha Wang Date: Tue, 28 Jul 2020 11:49:11 -0700 Subject: [PATCH 1/5] add external plugins info --- CONTRIBUTING.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d68d726dc..11c30d785 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -14,9 +14,11 @@ 1. Open a new [pull request][]. #### Contributing an External Plugin *(experimental)* -Input plugins written for internal Telegraf can be run as externally-compiled plugins through the [Execd Input Plugin](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/execd) without having to change the plugin code. +Input, output, and processor plugins written for internal Telegraf can be run as externally-compiled plugins through the [Execd Input](plugins/inputs/execd), [Execd Output](/plugins/inputs/execd), and [Execd Processor](plugins/processors/execd) Plugins without having to change the plugin code. -Follow the guidelines of how to integrate your plugin with the [Execd Go Shim](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/execd/shim) to easily compile it as a separate app and run it from the inputs.execd plugin. +Follow the guidelines of how to integrate your plugin with the [Execd Go Shim](/plugins/common/shim) to easily compile it as a separate app and run it with the respective `execd` plugin. + +Check out some guidelines on how to build and set up your external plugins to run with `execd`. #### Security Vulnerability Reporting InfluxData takes security and our users' trust very seriously. If you believe you have found a security issue in any of our From 2344a01e739ddb5f849d26a42e6e74acfb298ee6 Mon Sep 17 00:00:00 2001 From: Samantha Wang Date: Tue, 28 Jul 2020 11:54:44 -0700 Subject: [PATCH 2/5] add external plugins in docs --- docs/EXTERNAL_PLUGINS.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 docs/EXTERNAL_PLUGINS.md diff --git a/docs/EXTERNAL_PLUGINS.md b/docs/EXTERNAL_PLUGINS.md new file mode 100644 index 000000000..ca13e8563 --- /dev/null +++ b/docs/EXTERNAL_PLUGINS.md @@ -0,0 +1,31 @@ +### External Plugins + +External plugins are external programs that are built outside of Telegraf that +can run through an `execd` plugin. These external plugins allow for more flexibility +compared to internal Telegraf plugins. + +- External plugins can be written in any language (internal Telegraf plugins can only written in Go) +- External plugins can access to libraries not written in Go +- Utilize licensed software that isn't available to the open source community +- Can include large dependencies that would otherwise bloat Telegraf + +### External Plugin Guidelines +The guidelines of writing external plugins would follow those for our general [input](docs/INPUTS.md), +[output](docs/OUTPUTS.md), [processor](docs/PROCESSORS.md), and [aggregator](docs/AGGREGATOR.md) plugins. +Please reference the documentation on how to create these plugins written in Go. + + +## Execd Go Shim +For Go plugins, there is a [Execd Go Shim](plugins/common/shim) that will make it trivial to extract an internal input, processor, or output plugin from the main Telegraf repo out to a stand-alone repo. This shim This allows anyone to build and run it as a separate app using one of the `execd`plugins: +- [inputs.execd](/plugins/inputs/execd) +- [processors.execd](/plugins/processors/execd) +- [outputs.execd](/plugins/outputs/execd) + +Follow the [Steps to externalize a plugin](plugins/common/shim#steps-to-externalize-a-plugin) and [Steps to build and run your plugin](plugins/common/shim#steps-to-build-and-run-your-plugin) to properly with the Execd Go Shim + +#### Step-by-Step guidelines +To-be-added + + + + From 72aa771891c7e09be08295c51294771d5b5509f1 Mon Sep 17 00:00:00 2001 From: Samantha Wang Date: Tue, 28 Jul 2020 12:13:26 -0700 Subject: [PATCH 3/5] edit docs --- CONTRIBUTING.md | 2 +- docs/EXTERNAL_PLUGINS.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 11c30d785..0fcca4306 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -18,7 +18,7 @@ Input, output, and processor plugins written for internal Telegraf can be run as Follow the guidelines of how to integrate your plugin with the [Execd Go Shim](/plugins/common/shim) to easily compile it as a separate app and run it with the respective `execd` plugin. -Check out some guidelines on how to build and set up your external plugins to run with `execd`. +Check out our [guidelines](docs/EXTERNAL_PLUGINS.md#external-plugin-guidelines) on how to build and set up your external plugins to run with `execd`. #### Security Vulnerability Reporting InfluxData takes security and our users' trust very seriously. If you believe you have found a security issue in any of our diff --git a/docs/EXTERNAL_PLUGINS.md b/docs/EXTERNAL_PLUGINS.md index ca13e8563..28ea924bb 100644 --- a/docs/EXTERNAL_PLUGINS.md +++ b/docs/EXTERNAL_PLUGINS.md @@ -15,7 +15,7 @@ The guidelines of writing external plugins would follow those for our general [i Please reference the documentation on how to create these plugins written in Go. -## Execd Go Shim +#### Execd Go Shim For Go plugins, there is a [Execd Go Shim](plugins/common/shim) that will make it trivial to extract an internal input, processor, or output plugin from the main Telegraf repo out to a stand-alone repo. This shim This allows anyone to build and run it as a separate app using one of the `execd`plugins: - [inputs.execd](/plugins/inputs/execd) - [processors.execd](/plugins/processors/execd) From be90a96fedf27efca04e6a9c2e9e5c75a1cc72f0 Mon Sep 17 00:00:00 2001 From: Samantha Wang Date: Thu, 15 Oct 2020 12:26:26 -0700 Subject: [PATCH 4/5] add netflow plugin --- plugins/inputs/netflow/README.md | 82 ++++++++++ plugins/inputs/netflow/netflow.go | 216 +++++++++++++++++++++++++ plugins/inputs/netflow/netflow_test.go | 92 +++++++++++ 3 files changed, 390 insertions(+) create mode 100644 plugins/inputs/netflow/README.md create mode 100644 plugins/inputs/netflow/netflow.go create mode 100644 plugins/inputs/netflow/netflow_test.go diff --git a/plugins/inputs/netflow/README.md b/plugins/inputs/netflow/README.md new file mode 100644 index 000000000..29ae32755 --- /dev/null +++ b/plugins/inputs/netflow/README.md @@ -0,0 +1,82 @@ +# SFlow Input Plugin + +The Netflow Input Plugin provides support for acting as an Netflow V9/V10 collector in accordance with the specification from [IETF](https://tools.ietf.org/html/rfc7011). + + +# Configuration +The following configuration options are availabe: + +| Name | Description +|---|---| +| service_address| URL to listen on expressed as UDP (IPv4 or 6) OP address and port number +| | Example: ```service_address = "udp://:2055"``` +| read_buffer_size | Maximum socket buffer size (in bytes when no unit specified). Once the buffer fills up, metrics will start dropping. Defaults to the OS default. +||Example = ```read_buffer_size"64KiB"``` | +| dns_multi_name_processor | An optional regexp and template to use to transform a DNS resolve name. Particularily useful when DNS resolves an IP address to more than one name, and they alternative in order when queried. Using this processor command it is possible to tranform the name into something common irrespect of which entry is first - if the names conform to a regular naming schema. Note TOML [escape sequences](https://github.com/toml-lang/toml) may be required. +||Example: ````s/(.*)(?:-net[0-9])/$1```` will strip ```-net``` from the host name thereby converting, as an example, ```hostx-net1``` and ```hostx-net2``` both to ```hostx``` +|dns_fqdn_resolve|Determines whether IP addresses should be resolved to Host names. +||Example: ```dns_fqdn_resolve = true``` +|dns_fqdn_cache_ttl|The time to live for entries in the DNS name cache expressed in seconds. Default is 0 which is infinite +||Example: ```dns_fwdn_cache_ttl = 3600``` + +## Configuration: + +This is a sample configuration for the plugin. + +```toml +[[inputs.netflow]] + ## URL to listen on + # service_address = "udp://:2055" + # service_address = "udp4://:2055" + # service_address = "udp6://:2055" + + ## Maximum socket buffer size (in bytes when no unit specified). + ## For stream sockets, once the buffer fills up, the sender will start backing up. + ## For datagram sockets, once the buffer fills up, metrics will start dropping. + ## Defaults to the OS default. + # read_buffer_size = "64KiB" + + # Whether IP addresses should be resolved to host names + # dns_fqdn_resolve = true + + # How long should resolved IP->Hostnames be cached (in seconds) + # dns_fqdn_cache_ttl = 3600 + + # Optional processing instructions for transforming DNS resolve host names + # dns_multi_name_processor = "s/(.*)(?:-net[0-9])/$1" +``` + +## DNS Name and SNMP Interface name resolution and caching + +Raw Netflow packets, and their sample data, communicate IP addresses which are not very useful to humans. + +The Netflow plugin can be configured to attempt to resolve IP addresses to host names via DNS. + +The resolved names, or in the case of a resolution error the ip/id will be used as 'the' name, are configurably cached for a period of time to avoid continual lookups. + +| Source IP Tag | Resolved Host Tag +|---|---| +|agentAddress|agentHost| +|sourceIPv4Address|sourceIPv4Host| +|destinationIPv4Address|sourceIPv4Host| +|sourceIPv6Address|sourceIPv6Host| +|destinationIPv6Address|destinationIPv6Host| +|exporterIPv4Address|exporterIPv4Host| +|exporterIPv6Address|exporterIPv6Host| + + +### Multipe DNS Name resolution & processing + +In some cases DNS servers may maintain multiple entries for the same IP address in support of load balancing. In this setup the same IP address may be resolved to multiple DNS names, via a single DNS query, and it is likely the order of those DNS names will change over time. + +In order to provide some stability to the names recorded against flow records, it is possible to provide a regular expression and template transformation that should be capable of converting multiple names to a single common name where a mathodical naming scheme has been used. + +Example: ````s/(.*)(?:-net[0-9])/$1```` will strip ```-net``` from the host name thereby converting, as an example, ```hostx-net1``` and ```hostx-net2``` both to ```hostx``` + +# Schema + +The parsing of Netflow packets is handled by the Netflow Parser and the schema is described [here](../../parsers/network_flow/netflow/README.md). + +At a high level, individual Flow Samples within the V10 Flow Packet are translated to individual Metric objects. + + diff --git a/plugins/inputs/netflow/netflow.go b/plugins/inputs/netflow/netflow.go new file mode 100644 index 000000000..4a0018943 --- /dev/null +++ b/plugins/inputs/netflow/netflow.go @@ -0,0 +1,216 @@ +// Package sflow contains a Telegraf input plugin that listens for SFLow V5 network flow sample monitoring packets, parses them to extract flow +// samples which it turns into Metrics for output +package sflow + +import ( + "fmt" + "io" + "log" + "net" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/network_flow" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/network_flow/netflow" +) + +type setReadBufferer interface { + SetReadBuffer(bytes int) error +} + +type packetListener struct { + net.PacketConn + *Listener + network_flow.Resolver +} + +func (psl *packetListener) listen() { + buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet + for { + n, a, err := psl.ReadFrom(buf) + if err != nil { + if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + psl.AddError(err) + } + break + } + psl.process(a, buf[:n]) + } +} + +func (psl *packetListener) process(addr net.Addr, buf []byte) { + fmt.Println("netflow received len(buf)", len(buf)) + metrics, err := psl.Parse(buf) + if err != nil { + psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err)) + + } + fmt.Println("netflow resulted in len(metrisc), err", len(metrics), err) + + for _, m := range metrics { + if h, _, e := net.SplitHostPort(addr.String()); e == nil { + m.AddTag("agentAddress", h) + } + psl.Resolver.Resolve(m, func(resolvedM telegraf.Metric) { + psl.AddMetric(resolvedM) + }) + } +} + +// Listener configuration structure +type Listener struct { + ServiceAddress string `toml:"service_address"` + ReadBufferSize internal.Size `toml:"read_buffer_size"` + + SNMPCommunity string `toml:"snmp_community"` + SNMPIfaceResolve bool `toml:"snmp_iface_resolve"` + SNMPIfaceCacheTTL int `toml:"snmp_iface_cache_ttl"` + + DNSFQDNResolve bool `toml:"dns_fqdn_resolve"` + DNSFQDNCacheTTL int `toml:"dns_fqdn_cache_ttl"` + DNSMultiNameProcessor string `toml:"dns_multi_name_processor"` + + nameResolver network_flow.Resolver + parsers.Parser + telegraf.Accumulator + io.Closer +} + +// Description answers a description of this input plugin +func (sl *Listener) Description() string { + return "Netflow v9/v10 Protocol Listener" +} + +// SampleConfig answers a sample configuration +func (sl *Listener) SampleConfig() string { + return ` + ## URL to listen on + # service_address = "udp://:2055" + # service_address = "udp4://:2055" + # service_address = "udp6://:2055" + + ## Maximum socket buffer size (in bytes when no unit specified). + ## For stream sockets, once the buffer fills up, the sender will start backing up. + ## For datagram sockets, once the buffer fills up, metrics will start dropping. + ## Defaults to the OS default. + # read_buffer_size = "64KiB" + + # Whether IP addresses should be resolved to host names + # dns_fqdn_resolve = true + + # How long should resolved IP->Hostnames be cached (in seconds) + # dns_fqdn_cache_ttl = 3600 + + # Optional processing instructions for transforming DNS resolve host names + # dns_multi_name_processor = "s/(.*)(?:-net[0-9])/$1" + + # Whether Interface Indexes should be resolved to Interface Names via SNMP + # snmp_iface_resolve = true + + # SNMP Community string to use when resolving Interface Names + # snmp_community = "public" + + # How long should resolved Iface Index->Iface Name be cached (in seconds) + # snmp_iface_cache_ttl = 3600 + ` +} + +// Gather is a NOP for sFlow as it receives, asynchronously, sFlow network packets +func (sl *Listener) Gather(_ telegraf.Accumulator) error { + return nil +} + +// Start starts this sFlow listener listening on the configured network for sFlow packets +func (sl *Listener) Start(acc telegraf.Accumulator) error { + + dnsToResolve := map[string]string{ + "agentAddress": "agentHost", + "sourceIPv4Address": "sourceIPv4Host", + "destinationIPv4Address": "sourceIPv4Host", + "sourceIPv6Address": "sourceIPv6Host", + "destinationIPv6Address": "destinationIPv6Host", + "exporterIPv4Address": "exporterIPv4Host", + "exporterIPv6Address": "exporterIPv6Host", + } + + sl.Accumulator = acc + sl.nameResolver = network_flow.NewAsyncResolver(sl.DNSFQDNResolve, time.Duration(sl.DNSFQDNCacheTTL)*time.Second, sl.DNSMultiNameProcessor, sl.SNMPIfaceResolve, time.Duration(sl.SNMPIfaceCacheTTL)*time.Second, sl.SNMPCommunity, "netflow", dnsToResolve) + sl.nameResolver.Start() + + parser, err := netflow.NewParser("netflow", make(map[string]string)) + if err != nil { + return err + } + sl.Parser = parser + + spl := strings.SplitN(sl.ServiceAddress, "://", 2) + if len(spl) != 2 { + return fmt.Errorf("invalid service address: %s", sl.ServiceAddress) + } + + protocol := spl[0] + addr := spl[1] + + pc, err := newUDPListener(protocol, addr) + if err != nil { + return err + } + if sl.ReadBufferSize.Size > 0 { + if srb, ok := pc.(setReadBufferer); ok { + srb.SetReadBuffer(int(sl.ReadBufferSize.Size)) + } else { + log.Printf("W! Unable to set read buffer on a %s socket", protocol) + } + } + + log.Printf("I! [inputs.netflow] Listening on %s://%s", protocol, pc.LocalAddr()) + + psl := &packetListener{ + PacketConn: pc, + Listener: sl, + Resolver: sl.nameResolver, + } + + sl.Closer = psl + go psl.listen() + + return nil +} + +// Stop this Listener +func (sl *Listener) Stop() { + if sl.Closer != nil { + sl.Close() + sl.Closer = nil + } + sl.nameResolver.Stop() +} + +// newListener constructs a new vanilla, unconfigured, listener and returns it +func newListener() *Listener { + p, _ := netflow.NewParser("netflow", make(map[string]string)) + return &Listener{Parser: p} +} + +// newUDPListener answers a net.PacketConn for the expected UDP network and address passed in +func newUDPListener(network string, address string) (net.PacketConn, error) { + switch network { + case "udp", "udp4", "udp6": + addr, err := net.ResolveUDPAddr(network, address) + if err != nil { + return nil, err + } + return net.ListenUDP(network, addr) + default: + return nil, fmt.Errorf("unsupported network type %s", network) + } +} + +// init registers this SFflow input plug in with the Telegraf framework +func init() { + inputs.Add("netflow", func() telegraf.Input { return newListener() }) +} diff --git a/plugins/inputs/netflow/netflow_test.go b/plugins/inputs/netflow/netflow_test.go new file mode 100644 index 000000000..40d1bfc64 --- /dev/null +++ b/plugins/inputs/netflow/netflow_test.go @@ -0,0 +1,92 @@ +package sflow + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" + "log" + "net" + "os" + "testing" + + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/influxdata/wlog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// testEmptyLog is a helper function to ensure no data is written to log. +// Should be called at the start of the test, and returns a function which should run at the end. +func testEmptyLog(t *testing.T) func() { + buf := bytes.NewBuffer(nil) + log.SetOutput(wlog.NewWriter(buf)) + + level := wlog.WARN + wlog.SetLevel(level) + + return func() { + log.SetOutput(os.Stderr) + + for { + line, err := buf.ReadBytes('\n') + if err != nil { + assert.Equal(t, io.EOF, err) + break + } + assert.Empty(t, string(line), "log not empty") + } + } +} + +func TestNetflowDescription(t *testing.T) { + sl := newListener() + assert.NotEmpty(t, sl.Description()) +} + +func TestNetflowSampleConfig(t *testing.T) { + sl := newListener() + assert.NotEmpty(t, sl.SampleConfig()) +} + +func TestNetflowGather(t *testing.T) { + sl := newListener() + assert.Nil(t, sl.Gather(nil)) +} + +func TestNetflowToMetrics(t *testing.T) { + defer testEmptyLog(t)() + + sl := newListener() + sl.ServiceAddress = "udp://127.0.0.1:0" + sl.ReadBufferSize = internal.Size{Size: 1024} + sl.DNSFQDNResolve = false + + acc := &testutil.Accumulator{} + err := sl.Start(acc) + require.NoError(t, err) + defer sl.Stop() + + client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String()) + require.NoError(t, err) + + template257And258 := []byte("00090004000071d45dc583690000000000000041000000840101000f00010004000200040004000100050001000600010007000200080004000a0004000b0002000c0004000e0004001000040011000400150004001600040102000f000100040002000400040001000500010006000100070002000a0004000b0002000e000400100004001100040015000400160004001b0010001c00100001001801030004000800010004002a000400290004000001030010000000000000000100000000") + dataAgainst257And258 := []byte("00090004000071d45dc583690000000100000041010100340000004800000001110000e115ac10ec0100000000e115ac10ecff000000000000000000000000000000000000000004") + expected := "[netflow map[agentAddress:127.0.0.1 bgpDestinationAsNumber:0 bgpSourceAsNumber:0 destinationIPv4Address:172.16.236.255 destinationTransportPort:57621 destinationTransportSvc:57621 egressInterface:0 ingressInterface:0 ipClassOfService:0 protocolIdentifier:17 sourceID:65 sourceIPv4Address:172.16.236.1 sourceTransportPort:57621 sourceTransportSvc:57621 tcpControlBits:0] map[flowEndSysUpTime:0 flowStartSysUpTime:0 octetDeltaCount:72 packetDeltaCount:1]]" + + packetBytes := make([]byte, hex.DecodedLen(len(template257And258))) + _, err = hex.Decode(packetBytes, template257And258) + client.Write(packetBytes) + + packetBytes = make([]byte, hex.DecodedLen(len(dataAgainst257And258))) + _, err = hex.Decode(packetBytes, dataAgainst257And258) + client.Write(packetBytes) + + acc.Wait(1) + acc.Lock() + actual := fmt.Sprintf(("%s"), acc.Metrics) + acc.Unlock() + + assert.Equal(t, expected, actual) +} From 61c31e73af09a411ec4fd6979a2e428ef561ec1a Mon Sep 17 00:00:00 2001 From: Samantha Wang Date: Thu, 15 Oct 2020 12:27:54 -0700 Subject: [PATCH 5/5] Revert "add netflow plugin" This reverts commit be90a96fedf27efca04e6a9c2e9e5c75a1cc72f0. --- plugins/inputs/netflow/README.md | 82 ---------- plugins/inputs/netflow/netflow.go | 216 ------------------------- plugins/inputs/netflow/netflow_test.go | 92 ----------- 3 files changed, 390 deletions(-) delete mode 100644 plugins/inputs/netflow/README.md delete mode 100644 plugins/inputs/netflow/netflow.go delete mode 100644 plugins/inputs/netflow/netflow_test.go diff --git a/plugins/inputs/netflow/README.md b/plugins/inputs/netflow/README.md deleted file mode 100644 index 29ae32755..000000000 --- a/plugins/inputs/netflow/README.md +++ /dev/null @@ -1,82 +0,0 @@ -# SFlow Input Plugin - -The Netflow Input Plugin provides support for acting as an Netflow V9/V10 collector in accordance with the specification from [IETF](https://tools.ietf.org/html/rfc7011). - - -# Configuration -The following configuration options are availabe: - -| Name | Description -|---|---| -| service_address| URL to listen on expressed as UDP (IPv4 or 6) OP address and port number -| | Example: ```service_address = "udp://:2055"``` -| read_buffer_size | Maximum socket buffer size (in bytes when no unit specified). Once the buffer fills up, metrics will start dropping. Defaults to the OS default. -||Example = ```read_buffer_size"64KiB"``` | -| dns_multi_name_processor | An optional regexp and template to use to transform a DNS resolve name. Particularily useful when DNS resolves an IP address to more than one name, and they alternative in order when queried. Using this processor command it is possible to tranform the name into something common irrespect of which entry is first - if the names conform to a regular naming schema. Note TOML [escape sequences](https://github.com/toml-lang/toml) may be required. -||Example: ````s/(.*)(?:-net[0-9])/$1```` will strip ```-net``` from the host name thereby converting, as an example, ```hostx-net1``` and ```hostx-net2``` both to ```hostx``` -|dns_fqdn_resolve|Determines whether IP addresses should be resolved to Host names. -||Example: ```dns_fqdn_resolve = true``` -|dns_fqdn_cache_ttl|The time to live for entries in the DNS name cache expressed in seconds. Default is 0 which is infinite -||Example: ```dns_fwdn_cache_ttl = 3600``` - -## Configuration: - -This is a sample configuration for the plugin. - -```toml -[[inputs.netflow]] - ## URL to listen on - # service_address = "udp://:2055" - # service_address = "udp4://:2055" - # service_address = "udp6://:2055" - - ## Maximum socket buffer size (in bytes when no unit specified). - ## For stream sockets, once the buffer fills up, the sender will start backing up. - ## For datagram sockets, once the buffer fills up, metrics will start dropping. - ## Defaults to the OS default. - # read_buffer_size = "64KiB" - - # Whether IP addresses should be resolved to host names - # dns_fqdn_resolve = true - - # How long should resolved IP->Hostnames be cached (in seconds) - # dns_fqdn_cache_ttl = 3600 - - # Optional processing instructions for transforming DNS resolve host names - # dns_multi_name_processor = "s/(.*)(?:-net[0-9])/$1" -``` - -## DNS Name and SNMP Interface name resolution and caching - -Raw Netflow packets, and their sample data, communicate IP addresses which are not very useful to humans. - -The Netflow plugin can be configured to attempt to resolve IP addresses to host names via DNS. - -The resolved names, or in the case of a resolution error the ip/id will be used as 'the' name, are configurably cached for a period of time to avoid continual lookups. - -| Source IP Tag | Resolved Host Tag -|---|---| -|agentAddress|agentHost| -|sourceIPv4Address|sourceIPv4Host| -|destinationIPv4Address|sourceIPv4Host| -|sourceIPv6Address|sourceIPv6Host| -|destinationIPv6Address|destinationIPv6Host| -|exporterIPv4Address|exporterIPv4Host| -|exporterIPv6Address|exporterIPv6Host| - - -### Multipe DNS Name resolution & processing - -In some cases DNS servers may maintain multiple entries for the same IP address in support of load balancing. In this setup the same IP address may be resolved to multiple DNS names, via a single DNS query, and it is likely the order of those DNS names will change over time. - -In order to provide some stability to the names recorded against flow records, it is possible to provide a regular expression and template transformation that should be capable of converting multiple names to a single common name where a mathodical naming scheme has been used. - -Example: ````s/(.*)(?:-net[0-9])/$1```` will strip ```-net``` from the host name thereby converting, as an example, ```hostx-net1``` and ```hostx-net2``` both to ```hostx``` - -# Schema - -The parsing of Netflow packets is handled by the Netflow Parser and the schema is described [here](../../parsers/network_flow/netflow/README.md). - -At a high level, individual Flow Samples within the V10 Flow Packet are translated to individual Metric objects. - - diff --git a/plugins/inputs/netflow/netflow.go b/plugins/inputs/netflow/netflow.go deleted file mode 100644 index 4a0018943..000000000 --- a/plugins/inputs/netflow/netflow.go +++ /dev/null @@ -1,216 +0,0 @@ -// Package sflow contains a Telegraf input plugin that listens for SFLow V5 network flow sample monitoring packets, parses them to extract flow -// samples which it turns into Metrics for output -package sflow - -import ( - "fmt" - "io" - "log" - "net" - "strings" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/inputs/network_flow" - "github.com/influxdata/telegraf/plugins/parsers" - "github.com/influxdata/telegraf/plugins/parsers/network_flow/netflow" -) - -type setReadBufferer interface { - SetReadBuffer(bytes int) error -} - -type packetListener struct { - net.PacketConn - *Listener - network_flow.Resolver -} - -func (psl *packetListener) listen() { - buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet - for { - n, a, err := psl.ReadFrom(buf) - if err != nil { - if !strings.HasSuffix(err.Error(), ": use of closed network connection") { - psl.AddError(err) - } - break - } - psl.process(a, buf[:n]) - } -} - -func (psl *packetListener) process(addr net.Addr, buf []byte) { - fmt.Println("netflow received len(buf)", len(buf)) - metrics, err := psl.Parse(buf) - if err != nil { - psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err)) - - } - fmt.Println("netflow resulted in len(metrisc), err", len(metrics), err) - - for _, m := range metrics { - if h, _, e := net.SplitHostPort(addr.String()); e == nil { - m.AddTag("agentAddress", h) - } - psl.Resolver.Resolve(m, func(resolvedM telegraf.Metric) { - psl.AddMetric(resolvedM) - }) - } -} - -// Listener configuration structure -type Listener struct { - ServiceAddress string `toml:"service_address"` - ReadBufferSize internal.Size `toml:"read_buffer_size"` - - SNMPCommunity string `toml:"snmp_community"` - SNMPIfaceResolve bool `toml:"snmp_iface_resolve"` - SNMPIfaceCacheTTL int `toml:"snmp_iface_cache_ttl"` - - DNSFQDNResolve bool `toml:"dns_fqdn_resolve"` - DNSFQDNCacheTTL int `toml:"dns_fqdn_cache_ttl"` - DNSMultiNameProcessor string `toml:"dns_multi_name_processor"` - - nameResolver network_flow.Resolver - parsers.Parser - telegraf.Accumulator - io.Closer -} - -// Description answers a description of this input plugin -func (sl *Listener) Description() string { - return "Netflow v9/v10 Protocol Listener" -} - -// SampleConfig answers a sample configuration -func (sl *Listener) SampleConfig() string { - return ` - ## URL to listen on - # service_address = "udp://:2055" - # service_address = "udp4://:2055" - # service_address = "udp6://:2055" - - ## Maximum socket buffer size (in bytes when no unit specified). - ## For stream sockets, once the buffer fills up, the sender will start backing up. - ## For datagram sockets, once the buffer fills up, metrics will start dropping. - ## Defaults to the OS default. - # read_buffer_size = "64KiB" - - # Whether IP addresses should be resolved to host names - # dns_fqdn_resolve = true - - # How long should resolved IP->Hostnames be cached (in seconds) - # dns_fqdn_cache_ttl = 3600 - - # Optional processing instructions for transforming DNS resolve host names - # dns_multi_name_processor = "s/(.*)(?:-net[0-9])/$1" - - # Whether Interface Indexes should be resolved to Interface Names via SNMP - # snmp_iface_resolve = true - - # SNMP Community string to use when resolving Interface Names - # snmp_community = "public" - - # How long should resolved Iface Index->Iface Name be cached (in seconds) - # snmp_iface_cache_ttl = 3600 - ` -} - -// Gather is a NOP for sFlow as it receives, asynchronously, sFlow network packets -func (sl *Listener) Gather(_ telegraf.Accumulator) error { - return nil -} - -// Start starts this sFlow listener listening on the configured network for sFlow packets -func (sl *Listener) Start(acc telegraf.Accumulator) error { - - dnsToResolve := map[string]string{ - "agentAddress": "agentHost", - "sourceIPv4Address": "sourceIPv4Host", - "destinationIPv4Address": "sourceIPv4Host", - "sourceIPv6Address": "sourceIPv6Host", - "destinationIPv6Address": "destinationIPv6Host", - "exporterIPv4Address": "exporterIPv4Host", - "exporterIPv6Address": "exporterIPv6Host", - } - - sl.Accumulator = acc - sl.nameResolver = network_flow.NewAsyncResolver(sl.DNSFQDNResolve, time.Duration(sl.DNSFQDNCacheTTL)*time.Second, sl.DNSMultiNameProcessor, sl.SNMPIfaceResolve, time.Duration(sl.SNMPIfaceCacheTTL)*time.Second, sl.SNMPCommunity, "netflow", dnsToResolve) - sl.nameResolver.Start() - - parser, err := netflow.NewParser("netflow", make(map[string]string)) - if err != nil { - return err - } - sl.Parser = parser - - spl := strings.SplitN(sl.ServiceAddress, "://", 2) - if len(spl) != 2 { - return fmt.Errorf("invalid service address: %s", sl.ServiceAddress) - } - - protocol := spl[0] - addr := spl[1] - - pc, err := newUDPListener(protocol, addr) - if err != nil { - return err - } - if sl.ReadBufferSize.Size > 0 { - if srb, ok := pc.(setReadBufferer); ok { - srb.SetReadBuffer(int(sl.ReadBufferSize.Size)) - } else { - log.Printf("W! Unable to set read buffer on a %s socket", protocol) - } - } - - log.Printf("I! [inputs.netflow] Listening on %s://%s", protocol, pc.LocalAddr()) - - psl := &packetListener{ - PacketConn: pc, - Listener: sl, - Resolver: sl.nameResolver, - } - - sl.Closer = psl - go psl.listen() - - return nil -} - -// Stop this Listener -func (sl *Listener) Stop() { - if sl.Closer != nil { - sl.Close() - sl.Closer = nil - } - sl.nameResolver.Stop() -} - -// newListener constructs a new vanilla, unconfigured, listener and returns it -func newListener() *Listener { - p, _ := netflow.NewParser("netflow", make(map[string]string)) - return &Listener{Parser: p} -} - -// newUDPListener answers a net.PacketConn for the expected UDP network and address passed in -func newUDPListener(network string, address string) (net.PacketConn, error) { - switch network { - case "udp", "udp4", "udp6": - addr, err := net.ResolveUDPAddr(network, address) - if err != nil { - return nil, err - } - return net.ListenUDP(network, addr) - default: - return nil, fmt.Errorf("unsupported network type %s", network) - } -} - -// init registers this SFflow input plug in with the Telegraf framework -func init() { - inputs.Add("netflow", func() telegraf.Input { return newListener() }) -} diff --git a/plugins/inputs/netflow/netflow_test.go b/plugins/inputs/netflow/netflow_test.go deleted file mode 100644 index 40d1bfc64..000000000 --- a/plugins/inputs/netflow/netflow_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package sflow - -import ( - "bytes" - "encoding/hex" - "fmt" - "io" - "log" - "net" - "os" - "testing" - - "github.com/influxdata/telegraf/internal" - "github.com/influxdata/telegraf/testutil" - "github.com/influxdata/wlog" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// testEmptyLog is a helper function to ensure no data is written to log. -// Should be called at the start of the test, and returns a function which should run at the end. -func testEmptyLog(t *testing.T) func() { - buf := bytes.NewBuffer(nil) - log.SetOutput(wlog.NewWriter(buf)) - - level := wlog.WARN - wlog.SetLevel(level) - - return func() { - log.SetOutput(os.Stderr) - - for { - line, err := buf.ReadBytes('\n') - if err != nil { - assert.Equal(t, io.EOF, err) - break - } - assert.Empty(t, string(line), "log not empty") - } - } -} - -func TestNetflowDescription(t *testing.T) { - sl := newListener() - assert.NotEmpty(t, sl.Description()) -} - -func TestNetflowSampleConfig(t *testing.T) { - sl := newListener() - assert.NotEmpty(t, sl.SampleConfig()) -} - -func TestNetflowGather(t *testing.T) { - sl := newListener() - assert.Nil(t, sl.Gather(nil)) -} - -func TestNetflowToMetrics(t *testing.T) { - defer testEmptyLog(t)() - - sl := newListener() - sl.ServiceAddress = "udp://127.0.0.1:0" - sl.ReadBufferSize = internal.Size{Size: 1024} - sl.DNSFQDNResolve = false - - acc := &testutil.Accumulator{} - err := sl.Start(acc) - require.NoError(t, err) - defer sl.Stop() - - client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String()) - require.NoError(t, err) - - template257And258 := []byte("00090004000071d45dc583690000000000000041000000840101000f00010004000200040004000100050001000600010007000200080004000a0004000b0002000c0004000e0004001000040011000400150004001600040102000f000100040002000400040001000500010006000100070002000a0004000b0002000e000400100004001100040015000400160004001b0010001c00100001001801030004000800010004002a000400290004000001030010000000000000000100000000") - dataAgainst257And258 := []byte("00090004000071d45dc583690000000100000041010100340000004800000001110000e115ac10ec0100000000e115ac10ecff000000000000000000000000000000000000000004") - expected := "[netflow map[agentAddress:127.0.0.1 bgpDestinationAsNumber:0 bgpSourceAsNumber:0 destinationIPv4Address:172.16.236.255 destinationTransportPort:57621 destinationTransportSvc:57621 egressInterface:0 ingressInterface:0 ipClassOfService:0 protocolIdentifier:17 sourceID:65 sourceIPv4Address:172.16.236.1 sourceTransportPort:57621 sourceTransportSvc:57621 tcpControlBits:0] map[flowEndSysUpTime:0 flowStartSysUpTime:0 octetDeltaCount:72 packetDeltaCount:1]]" - - packetBytes := make([]byte, hex.DecodedLen(len(template257And258))) - _, err = hex.Decode(packetBytes, template257And258) - client.Write(packetBytes) - - packetBytes = make([]byte, hex.DecodedLen(len(dataAgainst257And258))) - _, err = hex.Decode(packetBytes, dataAgainst257And258) - client.Write(packetBytes) - - acc.Wait(1) - acc.Lock() - actual := fmt.Sprintf(("%s"), acc.Metrics) - acc.Unlock() - - assert.Equal(t, expected, actual) -}