diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index b394c907f..d8ae39e4f 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -175,6 +175,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy" _ "github.com/influxdata/telegraf/plugins/inputs/snmp_trap" _ "github.com/influxdata/telegraf/plugins/inputs/socket_listener" + _ "github.com/influxdata/telegraf/plugins/inputs/socketstat" _ "github.com/influxdata/telegraf/plugins/inputs/solr" _ "github.com/influxdata/telegraf/plugins/inputs/sql" _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" diff --git a/plugins/inputs/socketstat/README.md b/plugins/inputs/socketstat/README.md new file mode 100644 index 000000000..833e861bf --- /dev/null +++ b/plugins/inputs/socketstat/README.md @@ -0,0 +1,55 @@ +# SocketStat plugin + +The socketstat plugin gathers indicators from established connections, using iproute2's `ss` command. + +The `ss` command does not require specific privileges. + +**WARNING: The output format will produce series with very high cardinality.** You should either store those by an engine which doesn't suffer from it, use a short retention policy or do appropriate filtering. + +## Configuration + +```toml +[[inputs.socketstat]] + ## ss can display information about tcp, udp, raw, unix, packet, dccp and sctp sockets + ## Specify here the types you want to gather + socket_types = [ "tcp", "udp" ] + ## The default timeout of 1s for ss execution can be overridden here: + # timeout = "1s" +``` + +## Measurements & Fields + +- socketstat + - state (string) (for tcp, dccp and sctp protocols) + - If ss provides it (it depends on the protocol and ss version): + - bytes_acked (integer, bytes) + - bytes_received (integer, bytes) + - segs_out (integer, count) + - segs_in (integer, count) + - data_segs_out (integer, count) + - data_segs_in (integer, count) + +## Tags + +- All measurements have the following tags: + - proto + - local_addr + - local_port + - remote_addr + - remote_port + +## Example Output + +### recent ss version (iproute2 4.3.0 here) + +```sh +./telegraf --config telegraf.conf --input-filter socketstat --test +> socketstat,host=ubuntu-xenial,local_addr=10.6.231.226,local_port=42716,proto=tcp,remote_addr=192.168.2.21,remote_port=80 bytes_acked=184i,bytes_received=2624519595i,recv_q=4344i,segs_in=1812580i,segs_out=661642i,send_q=0i,state="ESTAB" 1606457205000000000 +``` + +### older ss version (iproute2 3.12.0 here) + +```sh +./telegraf --config telegraf.conf --input-filter socketstat --test +> socketstat,host=ubuntu-trusty,local_addr=10.6.231.163,local_port=35890,proto=tcp,remote_addr=192.168.2.21,remote_port=80 recv_q=0i,send_q=0i,state="ESTAB" 1606456977000000000 +``` diff --git a/plugins/inputs/socketstat/socketstat.go b/plugins/inputs/socketstat/socketstat.go new file mode 100644 index 000000000..3140c6ad5 --- /dev/null +++ b/plugins/inputs/socketstat/socketstat.go @@ -0,0 +1,222 @@ +//go:build !windows +// +build !windows + +// iproute2 doesn't exist on Windows + +package socketstat + +import ( + "bufio" + "bytes" + "fmt" + "os/exec" + "regexp" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const measurement = "socketstat" + +// Socketstat is a telegraf plugin to gather indicators from established connections, using iproute2's `ss` command. +type Socketstat struct { + SocketProto []string `toml:"protocols"` + Timeout config.Duration `toml:"timeout"` + Log telegraf.Logger `toml:"-"` + + isNewConnection *regexp.Regexp + validValues *regexp.Regexp + cmdName string + lister socketLister +} + +type socketLister func(cmdName string, proto string, timeout config.Duration) (*bytes.Buffer, error) + +// Description returns a short description of the plugin +func (ss *Socketstat) Description() string { + return "Gather indicators from established connections, using iproute2's `ss` command." +} + +// SampleConfig returns sample configuration options +func (ss *Socketstat) SampleConfig() string { + return ` + ## ss can display information about tcp, udp, raw, unix, packet, dccp and sctp sockets + ## List of protocol types to collect + # protocols = [ "tcp", "udp" ] + ## The default timeout of 1s for ss execution can be overridden here: + # timeout = "1s" +` +} + +// Gather gathers indicators from established connections +func (ss *Socketstat) Gather(acc telegraf.Accumulator) error { + // best effort : we continue through the protocols even if an error is encountered, + // but we keep track of the last error. + for _, proto := range ss.SocketProto { + out, err := ss.lister(ss.cmdName, proto, ss.Timeout) + if err != nil { + acc.AddError(err) + continue + } + ss.parseAndGather(acc, out, proto) + } + return nil +} + +func socketList(cmdName string, proto string, timeout config.Duration) (*bytes.Buffer, error) { + // Run ss for the given protocol, return the output as bytes.Buffer + args := []string{"-in", "--" + proto} + cmd := exec.Command(cmdName, args...) + var out bytes.Buffer + cmd.Stdout = &out + err := internal.RunTimeout(cmd, time.Duration(timeout)) + if err != nil { + return &out, fmt.Errorf("error running ss -in --%s: %v", proto, err) + } + return &out, nil +} + +func (ss *Socketstat) parseAndGather(acc telegraf.Accumulator, data *bytes.Buffer, proto string) { + scanner := bufio.NewScanner(data) + tags := map[string]string{} + fields := make(map[string]interface{}) + + // ss output can have blank lines, and/or socket basic info lines and more advanced + // statistics lines, in turns. + // In all non-empty lines, we can have metrics, so we need to group those relevant to + // the same connection. + // To achieve this, we're using the flushData variable which indicates if we should add + // a new measurement or postpone it to a later line. + + // The first line is only headers + scanner.Scan() + + flushData := false + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + words := strings.Fields(line) + + if ss.isNewConnection.MatchString(line) { + // A line with starting whitespace means metrics about the current connection. + // We should never get 2 consecutive such lines. If we do, log a warning and in + // a best effort, extend the metrics from the 1st line with the metrics of the 2nd + // one, possibly overwriting. + for _, word := range words { + if !ss.validValues.MatchString(word) { + continue + } + // kv will have 2 fields because it matched the regexp + kv := strings.Split(word, ":") + v, err := strconv.ParseUint(kv[1], 10, 64) + if err != nil { + ss.Log.Infof("Couldn't parse metric %q: %v", word, err) + continue + } + fields[kv[0]] = v + } + if !flushData { + ss.Log.Warnf("Found orphaned metrics: %s", words) + ss.Log.Warn("Added them to the last known connection.") + } + acc.AddFields(measurement, fields, tags) + flushData = false + continue + } + // A line with no starting whitespace means we're going to parse a new connection. + // Flush what we gathered about the previous one, if any. + if flushData { + acc.AddFields(measurement, fields, tags) + } + + // Delegate the real parsing to getTagsAndState, which manages various + // formats depending on the protocol. + tags, fields = getTagsAndState(proto, words, ss.Log) + + // This line containted metrics, so record that. + flushData = true + } + if flushData { + acc.AddFields(measurement, fields, tags) + } +} + +func getTagsAndState(proto string, words []string, log telegraf.Logger) (map[string]string, map[string]interface{}) { + tags := map[string]string{ + "proto": proto, + } + fields := make(map[string]interface{}) + switch proto { + case "udp", "raw": + words = append([]string{"dummy"}, words...) + case "tcp", "dccp", "sctp": + fields["state"] = words[0] + } + switch proto { + case "tcp", "udp", "raw", "dccp", "sctp": + // Local and remote addresses are fields 3 and 4 + // Separate addresses and ports with the last ':' + localIndex := strings.LastIndex(words[3], ":") + remoteIndex := strings.LastIndex(words[4], ":") + tags["local_addr"] = words[3][:localIndex] + tags["local_port"] = words[3][localIndex+1:] + tags["remote_addr"] = words[4][:remoteIndex] + tags["remote_port"] = words[4][remoteIndex+1:] + case "unix", "packet": + fields["netid"] = words[0] + tags["local_addr"] = words[4] + tags["local_port"] = words[5] + tags["remote_addr"] = words[6] + tags["remote_port"] = words[7] + } + v, err := strconv.ParseUint(words[1], 10, 64) + if err != nil { + log.Warnf("Couldn't read recv_q in %q: %v", words, err) + } else { + fields["recv_q"] = v + } + v, err = strconv.ParseUint(words[2], 10, 64) + if err != nil { + log.Warnf("Couldn't read send_q in %q: %v", words, err) + } else { + fields["send_q"] = v + } + return tags, fields +} + +func (ss *Socketstat) Init() error { + if len(ss.SocketProto) == 0 { + ss.SocketProto = []string{"tcp", "udp"} + } + + // Initialize regexps to validate input data + validFields := "(bytes_acked|bytes_received|segs_out|segs_in|data_segs_in|data_segs_out)" + ss.validValues = regexp.MustCompile("^" + validFields + ":[0-9]+$") + ss.isNewConnection = regexp.MustCompile(`^\s+.*$`) + + ss.lister = socketList + + // Check that ss is installed, get its path. + // Do it last, because in test environments where `ss` might not be available, + // we still want the other Init() actions to be performed. + ssPath, err := exec.LookPath("ss") + if err != nil { + return err + } + ss.cmdName = ssPath + + return nil +} + +func init() { + inputs.Add("socketstat", func() telegraf.Input { + return &Socketstat{Timeout: config.Duration(time.Second)} + }) +} diff --git a/plugins/inputs/socketstat/socketstat_test.go b/plugins/inputs/socketstat/socketstat_test.go new file mode 100644 index 000000000..bd73051d7 --- /dev/null +++ b/plugins/inputs/socketstat/socketstat_test.go @@ -0,0 +1,126 @@ +//go:build !windows +// +build !windows + +package socketstat + +import ( + "bytes" + "errors" + "os" + "path/filepath" + "testing" + + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestSocketstat_Gather(t *testing.T) { + tests := []struct { + name string + proto []string + filename string + tags []map[string]string + fields [][]map[string]interface{} + err error + }{ + { + name: "tcp - no sockets => no results", + proto: []string{"tcp"}, + filename: "tcp_no_sockets.txt", + }, + { + name: "udp - no sockets => no results", + proto: []string{"udp"}, + filename: "udp_no_sockets.txt", + }, + { + name: "tcp sockets captured", + proto: []string{"tcp"}, + filename: "tcp_traffic.txt", + tags: []map[string]string{ + {"proto": "tcp", "local_addr": "192.168.1.21", "local_port": "6514", "remote_addr": "192.168.1.21", "remote_port": "443"}, + {"proto": "tcp", "local_addr": "192.168.122.1", "local_port": "55194", "remote_addr": "192.168.122.1", "remote_port": "6514"}, + {"proto": "tcp", "local_addr": "127.0.0.1", "local_port": "7778", "remote_addr": "127.0.0.1", "remote_port": "50378"}, + }, + fields: [][]map[string]interface{}{ + {map[string]interface{}{"state": "ESTAB", "bytes_acked": uint64(1126), "bytes_received": uint64(532644751), "segs_out": uint64(211249), "segs_in": uint64(211254), "data_segs_out": uint64(2), "data_segs_in": uint64(211251), "recv_q": uint64(0), "send_q": uint64(0)}}, + {map[string]interface{}{"state": "ESTAB", "bytes_acked": uint64(790782896), "bytes_received": uint64(1126), "segs_out": uint64(333361), "segs_in": uint64(333361), "data_segs_out": uint64(333358), "data_segs_in": uint64(2), "recv_q": uint64(0), "send_q": uint64(0)}}, + {map[string]interface{}{"state": "ESTAB", "bytes_acked": uint64(19983121), "bytes_received": uint64(266383), "segs_out": uint64(15431), "segs_in": uint64(17633), "data_segs_out": uint64(15119), "data_segs_in": uint64(5098), "recv_q": uint64(0), "send_q": uint64(0)}}, + }, + }, + { + name: "udp packets captured", + proto: []string{"udp"}, + filename: "udp_traffic.txt", + tags: []map[string]string{ + {"proto": "udp", "local_addr": "10.10.0.4", "local_port": "33149", "remote_addr": "10.10.0.5", "remote_port": "53"}, + {"proto": "udp", "local_addr": "10.10.0.4", "local_port": "54276", "remote_addr": "10.10.0.6", "remote_port": "53"}, + {"proto": "udp", "local_addr": "10.10.0.4", "local_port": "38312", "remote_addr": "10.10.0.7", "remote_port": "53"}, + }, + fields: [][]map[string]interface{}{ + {map[string]interface{}{"recv_q": uint64(0), "send_q": uint64(0)}}, + {map[string]interface{}{"recv_q": uint64(0), "send_q": uint64(0)}}, + {map[string]interface{}{"recv_q": uint64(0), "send_q": uint64(0)}}, + }, + }, + } + for i, tt := range tests { + octets, err := os.ReadFile(filepath.Join("testdata", tt.filename)) + require.NoError(t, err) + + t.Run(tt.name, func(t *testing.T) { + i++ + ss := &Socketstat{ + SocketProto: tt.proto, + } + acc := new(testutil.Accumulator) + + err := ss.Init() + if err != nil { + require.EqualError(t, err, "exec: \"ss\": executable file not found in $PATH") + } + ss.lister = func(cmdName string, proto string, timeout config.Duration) (*bytes.Buffer, error) { + return bytes.NewBuffer(octets), nil + } + + err = acc.GatherError(ss.Gather) + require.ErrorIs(t, err, tt.err) + if len(tt.proto) == 0 { + n := acc.NFields() + require.Equalf(t, 0, n, "%d: expected 0 values got %d", i, n) + return + } + if len(tt.tags) == 0 { + n := acc.NFields() + require.Equalf(t, 0, n, "%d: expected 0 values got %d", i, n) + return + } + n := 0 + for j, tags := range tt.tags { + for k, fields := range tt.fields[j] { + require.Greater(t, len(acc.Metrics), n) + m := acc.Metrics[n] + require.Equal(t, measurement, m.Measurement, "%d %d %d: expected measurement '%#v' got '%#v'\n", i, j, k, measurement, m.Measurement) + require.Equal(t, tags, m.Tags, "%d %d %d: expected tags\n%#v got\n%#v\n", i, j, k, tags, m.Tags) + require.Equal(t, fields, m.Fields, "%d %d %d: expected fields\n%#v got\n%#v\n", i, j, k, fields, m.Fields) + n++ + } + } + }) + } +} + +func TestSocketstat_Gather_listerError(t *testing.T) { + errorMessage := "error foobar" + errFoo := errors.New(errorMessage) + ss := &Socketstat{ + SocketProto: []string{"foobar"}, + } + ss.lister = func(cmdName string, proto string, timeout config.Duration) (*bytes.Buffer, error) { + return new(bytes.Buffer), errFoo + } + acc := new(testutil.Accumulator) + err := acc.GatherError(ss.Gather) + require.EqualError(t, err, errorMessage) +} diff --git a/plugins/inputs/socketstat/socketstat_windows.go b/plugins/inputs/socketstat/socketstat_windows.go new file mode 100644 index 000000000..4804257c9 --- /dev/null +++ b/plugins/inputs/socketstat/socketstat_windows.go @@ -0,0 +1,4 @@ +//go:build windows +// +build windows + +package socketstat diff --git a/plugins/inputs/socketstat/testdata/tcp_no_sockets.txt b/plugins/inputs/socketstat/testdata/tcp_no_sockets.txt new file mode 100644 index 000000000..c8fafec2a --- /dev/null +++ b/plugins/inputs/socketstat/testdata/tcp_no_sockets.txt @@ -0,0 +1 @@ +State Recv-Q Send-Q Local Address:Port Peer Address:Port diff --git a/plugins/inputs/socketstat/testdata/tcp_traffic.txt b/plugins/inputs/socketstat/testdata/tcp_traffic.txt new file mode 100644 index 000000000..eb4bb874e --- /dev/null +++ b/plugins/inputs/socketstat/testdata/tcp_traffic.txt @@ -0,0 +1,7 @@ +State Recv-Q Send-Q Local Address:Port Peer Address:Port +ESTAB 0 0 192.168.1.21:6514 192.168.1.21:443 + cubic wscale:7,7 rto:204 rtt:0.057/0.033 ato:40 mss:22976 cwnd:10 bytes_acked:1126 bytes_received:532644751 segs_out:211249 segs_in:211254 data_segs_out:2 data_segs_in:211251 send 32247.0Mbps lastsnd:299082764 lastrcv:5248 lastack:5252 rcv_rtt:3.532 rcv_space:186557 minrtt:0.047 +ESTAB 0 0 192.168.122.1:55194 192.168.122.1:6514 + cubic wscale:7,7 rto:204 rtt:0.034/0.01 ato:40 mss:65483 cwnd:10 bytes_acked:790782896 bytes_received:1126 segs_out:333361 segs_in:333361 data_segs_out:333358 data_segs_in:2 send 154077.6Mbps lastsnd:5248 lastrcv:443892492 lastack:5248 rcv_rtt:250 rcv_space:43690 minrtt:0.009 +ESTAB 0 0 127.0.0.1:7778 127.0.0.1:50378 + cubic wscale:7,7 rto:220 rtt:16.009/21.064 ato:44 mss:65483 cwnd:10 bytes_acked:19983121 bytes_received:266383 segs_out:15431 segs_in:17633 data_segs_out:15119 data_segs_in:5098 send 327.2Mbps lastsnd:9792 lastrcv:9840 lastack:9748 pacing_rate 654.4Mbps retrans:0/1 rcv_rtt:129800 rcv_space:44057 minrtt:0.043 diff --git a/plugins/inputs/socketstat/testdata/udp_no_sockets.txt b/plugins/inputs/socketstat/testdata/udp_no_sockets.txt new file mode 100644 index 000000000..0065bceb4 --- /dev/null +++ b/plugins/inputs/socketstat/testdata/udp_no_sockets.txt @@ -0,0 +1 @@ +Recv-Q Send-Q Local Address:Port Peer Address:Port diff --git a/plugins/inputs/socketstat/testdata/udp_traffic.txt b/plugins/inputs/socketstat/testdata/udp_traffic.txt new file mode 100644 index 000000000..e0ad7b2eb --- /dev/null +++ b/plugins/inputs/socketstat/testdata/udp_traffic.txt @@ -0,0 +1,4 @@ +Recv-Q Send-Q Local Address:Port Peer Address:Port +0 0 10.10.0.4:33149 10.10.0.5:53 +0 0 10.10.0.4:54276 10.10.0.6:53 +0 0 10.10.0.4:38312 10.10.0.7:53