Revert "add netflow plugin"

This reverts commit be90a96fed.
Samantha Wang 2020-10-15 12:27:54 -07:00
parent be90a96fed
commit 61c31e73af
3 changed files with 0 additions and 390 deletions


@@ -1,82 +0,0 @@
# Netflow Input Plugin

The Netflow Input Plugin provides support for acting as a Netflow V9/V10 collector, in accordance with the specification from the [IETF](https://tools.ietf.org/html/rfc7011).

## Configuration Options

The following configuration options are available:

| Name | Description |
|---|---|
| service_address | URL to listen on, expressed as a UDP (IPv4 or IPv6) IP address and port number. |
| | Example: `service_address = "udp://:2055"` |
| read_buffer_size | Maximum socket buffer size (in bytes when no unit is specified). Once the buffer fills up, metrics will start dropping. Defaults to the OS default. |
| | Example: `read_buffer_size = "64KiB"` |
| dns_multi_name_processor | An optional regexp and template used to transform a DNS-resolved name. Particularly useful when DNS resolves an IP address to more than one name and the names alternate in order between queries. With this processor it is possible to transform the names into a single common name, irrespective of which entry comes first, provided the names conform to a regular naming scheme. Note that TOML [escape sequences](https://github.com/toml-lang/toml) may be required. |
| | Example: `s/(.*)(?:-net[0-9])/$1` will strip `-net<n>` from the host name, converting, for example, both `hostx-net1` and `hostx-net2` to `hostx`. |
| dns_fqdn_resolve | Determines whether IP addresses should be resolved to host names. |
| | Example: `dns_fqdn_resolve = true` |
| dns_fqdn_cache_ttl | The time to live for entries in the DNS name cache, expressed in seconds. The default is 0, which means entries never expire. |
| | Example: `dns_fqdn_cache_ttl = 3600` |
## Sample Configuration

This is a sample configuration for the plugin:
```toml
[[inputs.netflow]]
## URL to listen on
# service_address = "udp://:2055"
# service_address = "udp4://:2055"
# service_address = "udp6://:2055"
## Maximum socket buffer size (in bytes when no unit specified).
## For stream sockets, once the buffer fills up, the sender will start backing up.
## For datagram sockets, once the buffer fills up, metrics will start dropping.
## Defaults to the OS default.
# read_buffer_size = "64KiB"
# Whether IP addresses should be resolved to host names
# dns_fqdn_resolve = true
# How long should resolved IP->Hostnames be cached (in seconds)
# dns_fqdn_cache_ttl = 3600
# Optional processing instructions for transforming DNS resolve host names
# dns_multi_name_processor = "s/(.*)(?:-net[0-9])/$1"
```
## DNS Name and SNMP Interface Name Resolution and Caching

Raw Netflow packets, and the flow records they carry, contain IP addresses, which are not very meaningful to humans.
The Netflow plugin can be configured to resolve those IP addresses to host names via DNS.
The resolved name (or, when resolution fails, the IP/ID itself) is cached for a configurable period of time to avoid continual lookups; a minimal sketch of this approach is shown after the table below.

| Source IP Tag | Resolved Host Tag |
|---|---|
| agentAddress | agentHost |
| sourceIPv4Address | sourceIPv4Host |
| destinationIPv4Address | destinationIPv4Host |
| sourceIPv6Address | sourceIPv6Host |
| destinationIPv6Address | destinationIPv6Host |
| exporterIPv4Address | exporterIPv4Host |
| exporterIPv6Address | exporterIPv6Host |
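Below is a minimal, self-contained sketch of the resolve-and-cache approach described above. It is illustrative only: the names `cachedResolver`, `cacheEntry`, and `resolve` are hypothetical and are not part of the plugin (which uses its own internal resolver); the sketch simply performs a reverse DNS lookup, caches the answer for a TTL, and falls back to the IP itself when resolution fails.

```go
package main

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// cachedResolver is a hypothetical, simplified stand-in for the plugin's
// internal resolver: it reverse-resolves an IP once and caches the answer.
type cachedResolver struct {
	ttl     time.Duration // 0 means cache entries never expire
	mu      sync.Mutex
	entries map[string]cacheEntry
}

type cacheEntry struct {
	name    string
	expires time.Time
}

// resolve returns the cached host name for ip, doing a reverse DNS lookup on a
// cache miss. On lookup failure the IP itself is used as the name.
func (r *cachedResolver) resolve(ip string) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	if e, ok := r.entries[ip]; ok && (r.ttl == 0 || time.Now().Before(e.expires)) {
		return e.name
	}
	name := ip
	if names, err := net.LookupAddr(ip); err == nil && len(names) > 0 {
		name = names[0]
	}
	r.entries[ip] = cacheEntry{name: name, expires: time.Now().Add(r.ttl)}
	return name
}

func main() {
	r := &cachedResolver{ttl: time.Hour, entries: map[string]cacheEntry{}}
	fmt.Println(r.resolve("127.0.0.1")) // e.g. "localhost", depending on the system
}
```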
### Multiple DNS Name Resolution & Processing

In some cases DNS servers maintain multiple entries for the same IP address, for example in support of load balancing. In such a setup the same IP address may be resolved to multiple DNS names via a single DNS query, and the order of those names is likely to change over time.
To provide some stability to the names recorded against flow records, a regular expression and template transformation can be supplied that converts the multiple names to a single common name, provided a methodical naming scheme has been used.
Example: `s/(.*)(?:-net[0-9])/$1` will strip `-net<n>` from the host name, converting, for example, both `hostx-net1` and `hostx-net2` to `hostx`.
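The sketch below shows one way such an `s/<regexp>/<template>` instruction can be applied in Go. It is illustrative only: the helper `applyNameProcessor` is hypothetical and is not the plugin's own implementation, and it assumes the processor string has exactly the `s/.../...` shape shown above.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// applyNameProcessor applies an "s/<regexp>/<template>" style rewrite to a
// resolved host name, assuming the processor string has exactly that shape.
func applyNameProcessor(processor, name string) string {
	parts := strings.SplitN(strings.TrimPrefix(processor, "s/"), "/", 2)
	if len(parts) != 2 {
		return name // not in s/.../... form; leave the name untouched
	}
	re, err := regexp.Compile(parts[0])
	if err != nil {
		return name
	}
	return re.ReplaceAllString(name, parts[1])
}

func main() {
	p := "s/(.*)(?:-net[0-9])/$1"
	fmt.Println(applyNameProcessor(p, "hostx-net1")) // hostx
	fmt.Println(applyNameProcessor(p, "hostx-net2")) // hostx
}
```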
## Schema

The parsing of Netflow packets is handled by the Netflow Parser and the schema is described [here](../../parsers/network_flow/netflow/README.md).
At a high level, individual Flow Records within a V9/V10 flow packet are translated to individual Metric objects.
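For illustration, a single decoded flow from this plugin's unit test would look roughly like the following in InfluxDB line protocol. This is illustrative only: the tag set is abbreviated and the exact value formatting depends on the parser and output.

```text
netflow,agentAddress=127.0.0.1,sourceIPv4Address=172.16.236.1,destinationIPv4Address=172.16.236.255,sourceTransportPort=57621,destinationTransportPort=57621,protocolIdentifier=17 octetDeltaCount=72,packetDeltaCount=1,flowStartSysUpTime=0,flowEndSysUpTime=0
```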


@@ -1,216 +0,0 @@
// Package sflow contains a Telegraf input plugin that listens for Netflow V9/V10 network flow monitoring packets, parses them to extract flow
// records which it turns into Metrics for output
package sflow
import (
"fmt"
"io"
"log"
"net"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/inputs/network_flow"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/network_flow/netflow"
)
type setReadBufferer interface {
SetReadBuffer(bytes int) error
}
type packetListener struct {
net.PacketConn
*Listener
network_flow.Resolver
}
func (psl *packetListener) listen() {
buf := make([]byte, 64*1024) // 64 KiB - maximum size of an IP packet
for {
n, a, err := psl.ReadFrom(buf)
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
psl.AddError(err)
}
break
}
psl.process(a, buf[:n])
}
}
func (psl *packetListener) process(addr net.Addr, buf []byte) {
fmt.Println("netflow received len(buf)", len(buf))
metrics, err := psl.Parse(buf)
if err != nil {
psl.AddError(fmt.Errorf("unable to parse incoming packet: %s", err))
}
fmt.Println("netflow resulted in len(metrisc), err", len(metrics), err)
for _, m := range metrics {
if h, _, e := net.SplitHostPort(addr.String()); e == nil {
m.AddTag("agentAddress", h)
}
psl.Resolver.Resolve(m, func(resolvedM telegraf.Metric) {
psl.AddMetric(resolvedM)
})
}
}
// Listener configuration structure
type Listener struct {
ServiceAddress string `toml:"service_address"`
ReadBufferSize internal.Size `toml:"read_buffer_size"`
SNMPCommunity string `toml:"snmp_community"`
SNMPIfaceResolve bool `toml:"snmp_iface_resolve"`
SNMPIfaceCacheTTL int `toml:"snmp_iface_cache_ttl"`
DNSFQDNResolve bool `toml:"dns_fqdn_resolve"`
DNSFQDNCacheTTL int `toml:"dns_fqdn_cache_ttl"`
DNSMultiNameProcessor string `toml:"dns_multi_name_processor"`
nameResolver network_flow.Resolver
parsers.Parser
telegraf.Accumulator
io.Closer
}
// Description answers a description of this input plugin
func (sl *Listener) Description() string {
return "Netflow v9/v10 Protocol Listener"
}
// SampleConfig answers a sample configuration
func (sl *Listener) SampleConfig() string {
return `
## URL to listen on
# service_address = "udp://:2055"
# service_address = "udp4://:2055"
# service_address = "udp6://:2055"
## Maximum socket buffer size (in bytes when no unit specified).
## For stream sockets, once the buffer fills up, the sender will start backing up.
## For datagram sockets, once the buffer fills up, metrics will start dropping.
## Defaults to the OS default.
# read_buffer_size = "64KiB"
# Whether IP addresses should be resolved to host names
# dns_fqdn_resolve = true
# How long should resolved IP->Hostnames be cached (in seconds)
# dns_fqdn_cache_ttl = 3600
# Optional processing instructions for transforming DNS resolve host names
# dns_multi_name_processor = "s/(.*)(?:-net[0-9])/$1"
# Whether Interface Indexes should be resolved to Interface Names via SNMP
# snmp_iface_resolve = true
# SNMP Community string to use when resolving Interface Names
# snmp_community = "public"
# How long should resolved Iface Index->Iface Name be cached (in seconds)
# snmp_iface_cache_ttl = 3600
`
}
// Gather is a NOP for this plugin as it receives, asynchronously, Netflow network packets
func (sl *Listener) Gather(_ telegraf.Accumulator) error {
return nil
}
// Start starts this Netflow listener listening on the configured network for Netflow packets
func (sl *Listener) Start(acc telegraf.Accumulator) error {
dnsToResolve := map[string]string{
"agentAddress": "agentHost",
"sourceIPv4Address": "sourceIPv4Host",
"destinationIPv4Address": "sourceIPv4Host",
"sourceIPv6Address": "sourceIPv6Host",
"destinationIPv6Address": "destinationIPv6Host",
"exporterIPv4Address": "exporterIPv4Host",
"exporterIPv6Address": "exporterIPv6Host",
}
sl.Accumulator = acc
sl.nameResolver = network_flow.NewAsyncResolver(sl.DNSFQDNResolve, time.Duration(sl.DNSFQDNCacheTTL)*time.Second, sl.DNSMultiNameProcessor, sl.SNMPIfaceResolve, time.Duration(sl.SNMPIfaceCacheTTL)*time.Second, sl.SNMPCommunity, "netflow", dnsToResolve)
sl.nameResolver.Start()
parser, err := netflow.NewParser("netflow", make(map[string]string))
if err != nil {
return err
}
sl.Parser = parser
spl := strings.SplitN(sl.ServiceAddress, "://", 2)
if len(spl) != 2 {
return fmt.Errorf("invalid service address: %s", sl.ServiceAddress)
}
protocol := spl[0]
addr := spl[1]
pc, err := newUDPListener(protocol, addr)
if err != nil {
return err
}
if sl.ReadBufferSize.Size > 0 {
if srb, ok := pc.(setReadBufferer); ok {
srb.SetReadBuffer(int(sl.ReadBufferSize.Size))
} else {
log.Printf("W! Unable to set read buffer on a %s socket", protocol)
}
}
log.Printf("I! [inputs.netflow] Listening on %s://%s", protocol, pc.LocalAddr())
psl := &packetListener{
PacketConn: pc,
Listener: sl,
Resolver: sl.nameResolver,
}
sl.Closer = psl
go psl.listen()
return nil
}
// Stop this Listener
func (sl *Listener) Stop() {
if sl.Closer != nil {
sl.Close()
sl.Closer = nil
}
sl.nameResolver.Stop()
}
// newListener constructs a new vanilla, unconfigured, listener and returns it
func newListener() *Listener {
p, _ := netflow.NewParser("netflow", make(map[string]string))
return &Listener{Parser: p}
}
// newUDPListener answers a net.PacketConn for the expected UDP network and address passed in
func newUDPListener(network string, address string) (net.PacketConn, error) {
switch network {
case "udp", "udp4", "udp6":
addr, err := net.ResolveUDPAddr(network, address)
if err != nil {
return nil, err
}
return net.ListenUDP(network, addr)
default:
return nil, fmt.Errorf("unsupported network type %s", network)
}
}
// init registers this Netflow input plugin with the Telegraf framework
func init() {
inputs.Add("netflow", func() telegraf.Input { return newListener() })
}


@@ -1,92 +0,0 @@
package sflow
import (
"bytes"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"os"
"testing"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/testutil"
"github.com/influxdata/wlog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testEmptyLog is a helper function to ensure no data is written to log.
// Should be called at the start of the test, and returns a function which should run at the end.
func testEmptyLog(t *testing.T) func() {
buf := bytes.NewBuffer(nil)
log.SetOutput(wlog.NewWriter(buf))
level := wlog.WARN
wlog.SetLevel(level)
return func() {
log.SetOutput(os.Stderr)
for {
line, err := buf.ReadBytes('\n')
if err != nil {
assert.Equal(t, io.EOF, err)
break
}
assert.Empty(t, string(line), "log not empty")
}
}
}
func TestNetflowDescription(t *testing.T) {
sl := newListener()
assert.NotEmpty(t, sl.Description())
}
func TestNetflowSampleConfig(t *testing.T) {
sl := newListener()
assert.NotEmpty(t, sl.SampleConfig())
}
func TestNetflowGather(t *testing.T) {
sl := newListener()
assert.Nil(t, sl.Gather(nil))
}
func TestNetflowToMetrics(t *testing.T) {
defer testEmptyLog(t)()
sl := newListener()
sl.ServiceAddress = "udp://127.0.0.1:0"
sl.ReadBufferSize = internal.Size{Size: 1024}
sl.DNSFQDNResolve = false
acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)
defer sl.Stop()
client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
require.NoError(t, err)
template257And258 := []byte("00090004000071d45dc583690000000000000041000000840101000f00010004000200040004000100050001000600010007000200080004000a0004000b0002000c0004000e0004001000040011000400150004001600040102000f000100040002000400040001000500010006000100070002000a0004000b0002000e000400100004001100040015000400160004001b0010001c00100001001801030004000800010004002a000400290004000001030010000000000000000100000000")
dataAgainst257And258 := []byte("00090004000071d45dc583690000000100000041010100340000004800000001110000e115ac10ec0100000000e115ac10ecff000000000000000000000000000000000000000004")
expected := "[netflow map[agentAddress:127.0.0.1 bgpDestinationAsNumber:0 bgpSourceAsNumber:0 destinationIPv4Address:172.16.236.255 destinationTransportPort:57621 destinationTransportSvc:57621 egressInterface:0 ingressInterface:0 ipClassOfService:0 protocolIdentifier:17 sourceID:65 sourceIPv4Address:172.16.236.1 sourceTransportPort:57621 sourceTransportSvc:57621 tcpControlBits:0] map[flowEndSysUpTime:0 flowStartSysUpTime:0 octetDeltaCount:72 packetDeltaCount:1]]"
packetBytes := make([]byte, hex.DecodedLen(len(template257And258)))
_, err = hex.Decode(packetBytes, template257And258)
require.NoError(t, err)
_, err = client.Write(packetBytes)
require.NoError(t, err)
packetBytes = make([]byte, hex.DecodedLen(len(dataAgainst257And258)))
_, err = hex.Decode(packetBytes, dataAgainst257And258)
require.NoError(t, err)
_, err = client.Write(packetBytes)
require.NoError(t, err)
acc.Wait(1)
acc.Lock()
actual := fmt.Sprintf("%s", acc.Metrics)
acc.Unlock()
assert.Equal(t, expected, actual)
}