feat(inputs.procstat): Add ability to collect per-process socket statistics (#15423)

This commit is contained in:
Sven Rebhan 2024-07-17 20:44:41 +02:00 committed by GitHub
parent 7245ea96bd
commit fd8cbbf662
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 714 additions and 40 deletions

2
go.mod
View File

@ -468,7 +468,7 @@ require (
github.com/twmb/murmur3 v1.1.7 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/vishvananda/netlink v1.2.1-beta.2 // indirect
github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21
github.com/vishvananda/netns v0.0.4
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect

6
go.sum
View File

@ -2380,8 +2380,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241 h1:3r4OPQ/jPYQA0C7i149kevHLGSG4JZtrQv2986fXSCo=
github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241/go.mod h1:aGkV5xHz9sBkAckp2hez7khfehKp4YvyBwAmVdVEulg=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netlink v1.2.1-beta.2 h1:Llsql0lnQEbHj0I1OuKyp8otXp0r3q0mPkuhwHfStVs=
github.com/vishvananda/netlink v1.2.1-beta.2/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho=
github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21 h1:tcHUxOT8j/R+0S+A1j8D2InqguXFNxAiij+8QFOlX7Y=
github.com/vishvananda/netlink v1.2.1-beta.2.0.20240524165444-4d4ba1473f21/go.mod h1:whJevzBpTrid75eZy99s3DqCmy05NfibNaF2Ol5Ox5A=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
github.com/vishvananda/netns v0.0.0-20200728191858-db3c7e526aae/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
@ -2816,7 +2816,6 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -2894,6 +2893,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

View File

@ -64,17 +64,40 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## result in a large number of series, especially with short-lived processes,
## creating high cardinality at the output.
## Available options are:
## cmdline -- full commandline
## pid -- ID of the process
## ppid -- ID of the process' parent
## status -- state of the process
## user -- username owning the process
## cmdline -- full commandline
## pid -- ID of the process
## ppid -- ID of the process' parent
## status -- state of the process
## user -- username owning the process
## socket only options:
## protocol -- protocol type of the process socket
## state -- state of the process socket
## src -- source address of the process socket (non-unix sockets)
## src_port -- source port of the process socket (non-unix sockets)
## dest -- destination address of the process socket (non-unix sockets)
## dest_port -- destination port of the process socket (non-unix sockets)
## name -- name of the process socket (unix sockets only)
# tag_with = []
## Properties to collect
## Available options are "cpu", "limits", "memory", "mmap"
## Available options are
## cpu -- CPU usage statistics
## limits -- set resource limits
## memory -- memory usage statistics
## mmap -- mapped memory usage statistics (caution: can cause high load)
## sockets -- socket statistics for protocols in 'socket_protocols'
# properties = ["cpu", "limits", "memory", "mmap"]
## Protocol filter for the sockets property
## Available options are
## all -- all of the protocols below
## tcp4 -- TCP socket statistics for IPv4
## tcp6 -- TCP socket statistics for IPv6
## udp4 -- UDP socket statistics for IPv4
## udp6 -- UDP socket statistics for IPv6
## unix -- Unix socket statistics
# socket_protocols = ["all"]
## Method to use when finding process IDs. Can be one of 'pgrep', or
## 'native'. The pgrep finder calls the pgrep executable in the PATH while
## the native finder performs the search directly in a manor dependent on the
@ -141,8 +164,8 @@ Below are an example set of tags and fields:
- procstat
- tags:
- pid (when `pid_tag` is true)
- cmdline (when 'cmdline_tag' is true)
- pid (if requested)
- cmdline (if requested)
- process_name
- pidfile (when defined)
- exe (when defined)
@ -231,6 +254,36 @@ Below are an example set of tags and fields:
- pid_count (int)
- running (int)
- result_code (int, success = 0, lookup_error = 1)
- procstat_socket (if configured, Linux only)
- tags:
- pid (if requested)
- protocol (if requested)
- cmdline (if requested)
- process_name
- pidfile (when defined)
- exe (when defined)
- pattern (when defined)
- user (when selected)
- systemd_unit (when defined)
- cgroup (when defined)
- cgroup_full (when cgroup or systemd_unit is used with glob)
- supervisor_unit (when defined)
- win_service (when defined)
- fields:
- protocol
- state
- pid
- src
- src_port (tcp and udp sockets only)
- dest (tcp and udp sockets only)
- dest_port (tcp and udp sockets only)
- bytes_received (tcp sockets only)
- bytes_sent (tcp sockets only)
- lost (tcp sockets only)
- retransmits (tcp sockets only)
- rx_queue
- tx_queue
- inode (unix sockets only)
*NOTE: Resource limit > 2147483647 will be reported as 2147483647.*
@ -239,4 +292,5 @@ Below are an example set of tags and fields:
```text
procstat_lookup,host=prash-laptop,pattern=influxd,pid_finder=pgrep,result=success pid_count=1i,running=1i,result_code=0i 1582089700000000000
procstat,host=prash-laptop,pattern=influxd,process_name=influxd,user=root involuntary_context_switches=151496i,child_minor_faults=1061i,child_major_faults=8i,cpu_time_user=2564.81,pid=32025i,major_faults=8609i,created_at=1580107536000000000i,voluntary_context_switches=1058996i,cpu_time_system=616.98,memory_swap=0i,memory_locked=0i,memory_usage=1.7797634601593018,num_threads=18i,cpu_time_iowait=0,memory_rss=148643840i,memory_vms=1435688960i,memory_data=0i,memory_stack=0i,minor_faults=1856550i 1582089700000000000
procstat_socket,host=prash-laptop,process_name=browser,protocol=tcp4 bytes_received=826987i,bytes_sent=32869i,dest="192.168.0.2",dest_port=443i,lost=0i,pid=32025i,retransmits=0i,rx_queue=0i,src="192.168.0.1",src_port=52106i,state="established",tx_queue=0i 1582089700000000000
```

View File

@ -7,11 +7,16 @@ import (
"errors"
"fmt"
"os"
"github.com/prometheus/procfs"
"strconv"
"strings"
"syscall"
"github.com/coreos/go-systemd/v22/dbus"
"github.com/prometheus/procfs"
"github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
)
func processName(p *process.Process) (string, error) {
@ -103,3 +108,280 @@ func collectTotalReadWrite(proc Process) (r, w uint64, err error) {
return stat.RChar, stat.WChar, nil
}
/* Socket statistics functions */
func socketStateName(s uint8) string {
switch s {
case unix.BPF_TCP_ESTABLISHED:
return "established"
case unix.BPF_TCP_SYN_SENT:
return "syn-sent"
case unix.BPF_TCP_SYN_RECV:
return "syn-recv"
case unix.BPF_TCP_FIN_WAIT1:
return "fin-wait1"
case unix.BPF_TCP_FIN_WAIT2:
return "fin-wait2"
case unix.BPF_TCP_TIME_WAIT:
return "time-wait"
case unix.BPF_TCP_CLOSE:
return "closed"
case unix.BPF_TCP_CLOSE_WAIT:
return "close-wait"
case unix.BPF_TCP_LAST_ACK:
return "last-ack"
case unix.BPF_TCP_LISTEN:
return "listen"
case unix.BPF_TCP_CLOSING:
return "closing"
case unix.BPF_TCP_NEW_SYN_RECV:
return "sync-recv"
}
return "unknown"
}
func socketTypeName(t uint8) string {
switch t {
case syscall.SOCK_STREAM:
return "stream"
case syscall.SOCK_DGRAM:
return "dgram"
case syscall.SOCK_RAW:
return "raw"
case syscall.SOCK_RDM:
return "rdm"
case syscall.SOCK_SEQPACKET:
return "seqpacket"
case syscall.SOCK_DCCP:
return "dccp"
case syscall.SOCK_PACKET:
return "packet"
}
return "unknown"
}
func mapFdToInode(pid int32, fd uint32) (uint32, error) {
root := os.Getenv("HOST_PROC")
if root == "" {
root = "/proc"
}
fn := fmt.Sprintf("%s/%d/fd/%d", root, pid, fd)
link, err := os.Readlink(fn)
if err != nil {
return 0, fmt.Errorf("reading link failed: %w", err)
}
target := strings.TrimPrefix(link, "socket:[")
target = strings.TrimSuffix(target, "]")
inode, err := strconv.ParseUint(target, 10, 32)
if err != nil {
return 0, fmt.Errorf("parsing link %q: %w", link, err)
}
return uint32(inode), nil
}
func statsTCP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
// For TCP we need the inode for each connection to relate the connection
// statistics to the actual process socket. Therefore, map the
// file-descriptors to inodes using the /proc/<pid>/fd entries.
inodes := make(map[uint32]net.ConnectionStat, len(conns))
for _, c := range conns {
inode, err := mapFdToInode(c.Pid, c.Fd)
if err != nil {
return nil, fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err)
}
inodes[inode] = c
}
// Get the TCP socket statistics from the netlink socket.
responses, err := netlink.SocketDiagTCPInfo(family)
if err != nil {
return nil, fmt.Errorf("connecting to diag socket failed: %w", err)
}
// Filter the responses via the inodes belonging to the process
fieldslist := make([]map[string]interface{}, 0)
for _, r := range responses {
c, found := inodes[r.InetDiagMsg.INode]
if !found {
// The inode does not belong to the process.
continue
}
var proto string
switch r.InetDiagMsg.Family {
case syscall.AF_INET:
proto = "tcp4"
case syscall.AF_INET6:
proto = "tcp6"
default:
continue
}
fields := map[string]interface{}{
"protocol": proto,
"state": socketStateName(r.InetDiagMsg.State),
"pid": c.Pid,
"src": r.InetDiagMsg.ID.Source.String(),
"src_port": r.InetDiagMsg.ID.SourcePort,
"dest": r.InetDiagMsg.ID.Destination.String(),
"dest_port": r.InetDiagMsg.ID.DestinationPort,
"bytes_received": r.TCPInfo.Bytes_received,
"bytes_sent": r.TCPInfo.Bytes_sent,
"lost": r.TCPInfo.Lost,
"retransmits": r.TCPInfo.Retransmits,
"rx_queue": r.InetDiagMsg.RQueue,
"tx_queue": r.InetDiagMsg.WQueue,
}
fieldslist = append(fieldslist, fields)
}
return fieldslist, nil
}
func statsUDP(conns []net.ConnectionStat, family uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
// For UDP we need the inode for each connection to relate the connection
// statistics to the actual process socket. Therefore, map the
// file-descriptors to inodes using the /proc/<pid>/fd entries.
inodes := make(map[uint32]net.ConnectionStat, len(conns))
for _, c := range conns {
inode, err := mapFdToInode(c.Pid, c.Fd)
if err != nil {
return nil, fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err)
}
inodes[inode] = c
}
// Get the UDP socket statistics from the netlink socket.
responses, err := netlink.SocketDiagUDPInfo(family)
if err != nil {
return nil, fmt.Errorf("connecting to diag socket failed: %w", err)
}
// Filter the responses via the inodes belonging to the process
fieldslist := make([]map[string]interface{}, 0)
for _, r := range responses {
c, found := inodes[r.InetDiagMsg.INode]
if !found {
// The inode does not belong to the process.
continue
}
var proto string
switch r.InetDiagMsg.Family {
case syscall.AF_INET:
proto = "udp4"
case syscall.AF_INET6:
proto = "udp6"
default:
continue
}
fields := map[string]interface{}{
"protocol": proto,
"state": socketStateName(r.InetDiagMsg.State),
"pid": c.Pid,
"src": r.InetDiagMsg.ID.Source.String(),
"src_port": r.InetDiagMsg.ID.SourcePort,
"dest": r.InetDiagMsg.ID.Destination.String(),
"dest_port": r.InetDiagMsg.ID.DestinationPort,
"rx_queue": r.InetDiagMsg.RQueue,
"tx_queue": r.InetDiagMsg.WQueue,
}
fieldslist = append(fieldslist, fields)
}
return fieldslist, nil
}
func statsUnix(conns []net.ConnectionStat) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
// We need to read the inode for each connection to relate the connection
// statistics to the actual process socket. Therefore, map the
// file-descriptors to inodes using the /proc/<pid>/fd entries.
inodes := make(map[uint32]net.ConnectionStat, len(conns))
for _, c := range conns {
inode, err := mapFdToInode(c.Pid, c.Fd)
if err != nil {
return nil, fmt.Errorf("mapping fd %d of pid %d failed: %w", c.Fd, c.Pid, err)
}
inodes[inode] = c
}
// Get the UDP socket statistics from the netlink socket.
responses, err := netlink.UnixSocketDiagInfo()
if err != nil {
return nil, fmt.Errorf("connecting to diag socket failed: %w", err)
}
// Filter the responses via the inodes belonging to the process
fieldslist := make([]map[string]interface{}, 0)
for _, r := range responses {
// Check if the inode belongs to the process and skip otherwise
c, found := inodes[r.DiagMsg.INode]
if !found {
continue
}
name := c.Laddr.IP
if name == "" {
name = fmt.Sprintf("inode-%d", r.DiagMsg.INode)
}
fields := map[string]interface{}{
"protocol": "unix",
"type": "stream",
"state": socketStateName(r.DiagMsg.State),
"pid": c.Pid,
"name": name,
"rx_queue": r.Queue.RQueue,
"tx_queue": r.Queue.WQueue,
"inode": r.DiagMsg.INode,
}
if r.Peer != nil {
fields["peer"] = *r.Peer
}
fieldslist = append(fieldslist, fields)
}
// Diagnosis only works for stream sockets, so add all non-stream sockets
// of the process without further data
for inode, c := range inodes {
if c.Type == syscall.SOCK_STREAM {
continue
}
name := c.Laddr.IP
if name == "" {
name = fmt.Sprintf("inode-%d", inode)
}
fields := map[string]interface{}{
"protocol": "unix",
"type": socketTypeName(uint8(c.Type)),
"state": "close",
"pid": c.Pid,
"name": name,
"rx_queue": uint32(0),
"tx_queue": uint32(0),
"inode": inode,
}
fieldslist = append(fieldslist, fields)
}
return fieldslist, nil
}

View File

@ -4,7 +4,9 @@ package procstat
import (
"errors"
"syscall"
"github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process"
)
@ -12,20 +14,90 @@ func processName(p *process.Process) (string, error) {
return p.Exe()
}
func queryPidWithWinServiceName(_ string) (uint32, error) {
func queryPidWithWinServiceName(string) (uint32, error) {
return 0, errors.New("os not supporting win_service option")
}
func collectMemmap(Process, string, map[string]any) {}
func findBySystemdUnits(_ []string) ([]processGroup, error) {
func findBySystemdUnits([]string) ([]processGroup, error) {
return nil, nil
}
func findByWindowsServices(_ []string) ([]processGroup, error) {
func findByWindowsServices([]string) ([]processGroup, error) {
return nil, nil
}
func collectTotalReadWrite(_ Process) (r, w uint64, err error) {
func collectTotalReadWrite(Process) (r, w uint64, err error) {
return 0, 0, errors.ErrUnsupported
}
func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
// Filter the responses via the inodes belonging to the process
fieldslist := make([]map[string]interface{}, 0, len(conns))
for _, c := range conns {
var proto string
switch c.Family {
case syscall.AF_INET:
proto = "tcp4"
case syscall.AF_INET6:
proto = "tcp6"
default:
continue
}
fields := map[string]interface{}{
"protocol": proto,
"state": c.Status,
"pid": c.Pid,
"src": c.Laddr.IP,
"src_port": c.Laddr.Port,
"dest": c.Raddr.IP,
"dest_port": c.Raddr.Port,
}
fieldslist = append(fieldslist, fields)
}
return fieldslist, nil
}
func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
// Filter the responses via the inodes belonging to the process
fieldslist := make([]map[string]interface{}, 0, len(conns))
for _, c := range conns {
var proto string
switch c.Family {
case syscall.AF_INET:
proto = "udp4"
case syscall.AF_INET6:
proto = "udp6"
default:
continue
}
fields := map[string]interface{}{
"protocol": proto,
"state": c.Status,
"pid": c.Pid,
"src": c.Laddr.IP,
"src_port": c.Laddr.Port,
"dest": c.Raddr.IP,
"dest_port": c.Raddr.Port,
}
fieldslist = append(fieldslist, fields)
}
return fieldslist, nil
}
func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) {
return nil, errors.ErrUnsupported
}

View File

@ -5,8 +5,10 @@ package procstat
import (
"errors"
"fmt"
"syscall"
"unsafe"
"github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process"
"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc/mgr"
@ -57,7 +59,7 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
func collectMemmap(Process, string, map[string]any) {}
func findBySystemdUnits(_ []string) ([]processGroup, error) {
func findBySystemdUnits([]string) ([]processGroup, error) {
return nil, nil
}
@ -83,6 +85,76 @@ func findByWindowsServices(services []string) ([]processGroup, error) {
return groups, nil
}
func collectTotalReadWrite(_ Process) (r, w uint64, err error) {
func collectTotalReadWrite(Process) (r, w uint64, err error) {
return 0, 0, errors.ErrUnsupported
}
func statsTCP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
// Filter the responses via the inodes belonging to the process
fieldslist := make([]map[string]interface{}, 0, len(conns))
for _, c := range conns {
var proto string
switch c.Family {
case syscall.AF_INET:
proto = "tcp4"
case syscall.AF_INET6:
proto = "tcp6"
default:
continue
}
fields := map[string]interface{}{
"protocol": proto,
"state": c.Status,
"pid": c.Pid,
"src": c.Laddr.IP,
"src_port": c.Laddr.Port,
"dest": c.Raddr.IP,
"dest_port": c.Raddr.Port,
}
fieldslist = append(fieldslist, fields)
}
return fieldslist, nil
}
func statsUDP(conns []net.ConnectionStat, _ uint8) ([]map[string]interface{}, error) {
if len(conns) == 0 {
return nil, nil
}
// Filter the responses via the inodes belonging to the process
fieldslist := make([]map[string]interface{}, 0, len(conns))
for _, c := range conns {
var proto string
switch c.Family {
case syscall.AF_INET:
proto = "udp4"
case syscall.AF_INET6:
proto = "udp6"
default:
continue
}
fields := map[string]interface{}{
"protocol": proto,
"state": c.Status,
"pid": c.Pid,
"src": c.Laddr.IP,
"src_port": c.Laddr.Port,
"dest": c.Raddr.IP,
"dest_port": c.Raddr.Port,
}
fieldslist = append(fieldslist, fields)
}
return fieldslist, nil
}
func statsUnix([]net.ConnectionStat) ([]map[string]interface{}, error) {
return nil, nil
}

View File

@ -2,10 +2,13 @@ package procstat
import (
"errors"
"fmt"
"runtime"
"strconv"
"syscall"
"time"
gopsnet "github.com/shirou/gopsutil/v3/net"
"github.com/shirou/gopsutil/v3/process"
"github.com/influxdata/telegraf"
@ -17,7 +20,7 @@ type Process interface {
Name() (string, error)
SetTag(string, string)
MemoryMaps(bool) (*[]process.MemoryMapsStat, error)
Metric(string, *collectionConfig) telegraf.Metric
Metrics(string, *collectionConfig, time.Time) ([]telegraf.Metric, error)
}
type PIDFinder interface {
@ -66,7 +69,7 @@ func (p *Proc) percent(_ time.Duration) (float64, error) {
}
// Add metrics a single Process
func (p *Proc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric {
func (p *Proc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
if prefix != "" {
prefix += "_"
}
@ -245,5 +248,133 @@ func (p *Proc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric {
}
}
return metric.New("procstat", p.tags, fields, time.Time{})
metrics := []telegraf.Metric{metric.New("procstat", p.tags, fields, t)}
// Collect the socket statistics if requested
if cfg.features["sockets"] {
for _, protocol := range cfg.socketProtos {
// Get the requested connections for the PID
var fieldlist []map[string]interface{}
switch protocol {
case "all":
conns, err := gopsnet.ConnectionsPid(protocol, p.Pid)
if err != nil {
return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid)
}
var connsTCPv4, connsTCPv6, connsUDPv4, connsUDPv6, connsUnix []gopsnet.ConnectionStat
for _, c := range conns {
switch {
case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_STREAM:
connsTCPv4 = append(connsTCPv4, c)
case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_STREAM:
connsTCPv6 = append(connsTCPv6, c)
case c.Family == syscall.AF_INET && c.Type == syscall.SOCK_DGRAM:
connsUDPv4 = append(connsUDPv4, c)
case c.Family == syscall.AF_INET6 && c.Type == syscall.SOCK_DGRAM:
connsUDPv6 = append(connsUDPv6, c)
case c.Family == syscall.AF_UNIX:
connsUnix = append(connsUnix, c)
}
}
fl, err := statsTCP(connsTCPv4, syscall.AF_INET)
if err != nil {
return metrics, fmt.Errorf("cannot get statistics for \"tcp4\" of PID %d", p.Pid)
}
fieldlist = append(fieldlist, fl...)
fl, err = statsTCP(connsTCPv6, syscall.AF_INET6)
if err != nil {
return metrics, fmt.Errorf("cannot get statistics for \"tcp6\" of PID %d", p.Pid)
}
fieldlist = append(fieldlist, fl...)
fl, err = statsUDP(connsUDPv4, syscall.AF_INET)
if err != nil {
return metrics, fmt.Errorf("cannot get statistics for \"udp4\" of PID %d", p.Pid)
}
fieldlist = append(fieldlist, fl...)
fl, err = statsUDP(connsUDPv6, syscall.AF_INET6)
if err != nil {
return metrics, fmt.Errorf("cannot get statistics for \"udp6\" of PID %d", p.Pid)
}
fieldlist = append(fieldlist, fl...)
fl, err = statsUnix(connsUnix)
if err != nil {
return metrics, fmt.Errorf("cannot get statistics for \"unix\" of PID %d", p.Pid)
}
fieldlist = append(fieldlist, fl...)
case "tcp4", "tcp6":
family := uint8(syscall.AF_INET)
if protocol == "tcp6" {
family = syscall.AF_INET6
}
conns, err := gopsnet.ConnectionsPid(protocol, p.Pid)
if err != nil {
return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid)
}
if fieldlist, err = statsTCP(conns, family); err != nil {
return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid)
}
case "udp4", "udp6":
family := uint8(syscall.AF_INET)
if protocol == "udp6" {
family = syscall.AF_INET6
}
conns, err := gopsnet.ConnectionsPid(protocol, p.Pid)
if err != nil {
return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid)
}
if fieldlist, err = statsUDP(conns, family); err != nil {
return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid)
}
case "unix":
conns, err := gopsnet.ConnectionsPid(protocol, p.Pid)
if err != nil {
return metrics, fmt.Errorf("cannot get connections for %q of PID %d", protocol, p.Pid)
}
if fieldlist, err = statsUnix(conns); err != nil {
return metrics, fmt.Errorf("cannot get statistics for %q of PID %d", protocol, p.Pid)
}
}
for _, fields := range fieldlist {
if cfg.tagging["protocol"] {
p.tags["protocol"] = fields["protocol"].(string)
delete(fields, "protocol")
}
if cfg.tagging["state"] {
p.tags["state"] = fields["state"].(string)
delete(fields, "state")
}
if cfg.tagging["src"] && fields["src"] != nil {
p.tags["src"] = fields["src"].(string)
delete(fields, "src")
}
if cfg.tagging["src_port"] && fields["src_port"] != nil {
port := uint64(fields["src_port"].(uint16))
p.tags["src_port"] = strconv.FormatUint(port, 10)
delete(fields, "src_port")
}
if cfg.tagging["dest"] && fields["dest"] != nil {
p.tags["dest"] = fields["dest"].(string)
delete(fields, "dest")
}
if cfg.tagging["dest_port"] && fields["dest_port"] != nil {
port := uint64(fields["dest_port"].(uint16))
p.tags["dest_port"] = strconv.FormatUint(port, 10)
delete(fields, "dest_port")
}
if cfg.tagging["name"] && fields["name"] != nil {
p.tags["name"] = fields["name"].(string)
delete(fields, "name")
}
metrics = append(metrics, metric.New("procstat_socket", p.tags, fields, t))
}
}
}
return metrics, nil
}

View File

@ -10,6 +10,7 @@ import (
"os/exec"
"path/filepath"
"runtime"
"slices"
"strconv"
"strings"
"time"
@ -30,9 +31,10 @@ var execCommand = exec.Command
type PID int32
type collectionConfig struct {
solarisMode bool
tagging map[string]bool
features map[string]bool
solarisMode bool
tagging map[string]bool
features map[string]bool
socketProtos []string
}
type Procstat struct {
@ -53,6 +55,7 @@ type Procstat struct {
WinService string `toml:"win_service"`
Mode string `toml:"mode"`
Properties []string `toml:"properties"`
SocketProtocols []string `toml:"socket_protocols"`
TagWith []string `toml:"tag_with"`
Filter []Filter `toml:"filter"`
Log telegraf.Logger `toml:"-"`
@ -96,6 +99,10 @@ func (p *Procstat) Init() error {
for _, tag := range p.TagWith {
switch tag {
case "cmdline", "pid", "ppid", "status", "user":
case "protocol", "state", "src", "src_port", "dest", "dest_port", "name": // socket only
if !slices.Contains(p.Properties, "sockets") {
return fmt.Errorf("socket tagging option %q specified without sockets enabled", tag)
}
default:
return fmt.Errorf("invalid 'tag_with' setting %q", tag)
}
@ -107,6 +114,27 @@ func (p *Procstat) Init() error {
for _, prop := range p.Properties {
switch prop {
case "cpu", "limits", "memory", "mmap":
case "sockets":
if len(p.SocketProtocols) == 0 {
p.SocketProtocols = []string{"all"}
}
protos := make(map[string]bool, len(p.SocketProtocols))
for _, proto := range p.SocketProtocols {
switch proto {
case "all":
if len(protos) > 0 || len(p.SocketProtocols) > 1 {
return errors.New("additional 'socket_protocol' settings besides 'all' are not allowed")
}
case "tcp4", "tcp6", "udp4", "udp6", "unix":
default:
return fmt.Errorf("invalid 'socket_protocol' setting %q", proto)
}
if protos[proto] {
return fmt.Errorf("duplicate %q in 'socket_protocol' setting", proto)
}
protos[proto] = true
p.cfg.socketProtos = append(p.cfg.socketProtos, proto)
}
default:
return fmt.Errorf("invalid 'properties' setting %q", prop)
}
@ -252,9 +280,15 @@ func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
p.processes[pid] = proc
}
running[pid] = true
m := proc.Metric(p.Prefix, &p.cfg)
m.SetTime(now)
acc.AddMetric(m)
metrics, err := proc.Metrics(p.Prefix, &p.cfg, now)
if err != nil {
// Continue after logging an error as there might still be
// metrics available
acc.AddError(err)
}
for _, m := range metrics {
acc.AddMetric(m)
}
}
}
@ -351,9 +385,15 @@ func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
p.processes[pid] = proc
}
running[pid] = true
m := proc.Metric(p.Prefix, &p.cfg)
m.SetTime(now)
acc.AddMetric(m)
metrics, err := proc.Metrics(p.Prefix, &p.cfg, now)
if err != nil {
// Continue after logging an error as there might still be
// metrics available
acc.AddError(err)
}
for _, m := range metrics {
acc.AddMetric(m)
}
}
}

View File

@ -142,7 +142,7 @@ func (p *testProc) MemoryMaps(bool) (*[]process.MemoryMapsStat, error) {
return &[]process.MemoryMapsStat{}, nil
}
func (p *testProc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric {
func (p *testProc) Metrics(prefix string, cfg *collectionConfig, t time.Time) ([]telegraf.Metric, error) {
if prefix != "" {
prefix += "_"
}
@ -212,7 +212,7 @@ func (p *testProc) Metric(prefix string, cfg *collectionConfig) telegraf.Metric
fields[prefix+"user"] = "testuser"
}
return metric.New("procstat", tags, fields, time.Time{})
return []telegraf.Metric{metric.New("procstat", tags, fields, t)}, nil
}
var pid = PID(42)

View File

@ -35,17 +35,40 @@
## result in a large number of series, especially with short-lived processes,
## creating high cardinality at the output.
## Available options are:
## cmdline -- full commandline
## pid -- ID of the process
## ppid -- ID of the process' parent
## status -- state of the process
## user -- username owning the process
## cmdline -- full commandline
## pid -- ID of the process
## ppid -- ID of the process' parent
## status -- state of the process
## user -- username owning the process
## socket only options:
## protocol -- protocol type of the process socket
## state -- state of the process socket
## src -- source address of the process socket (non-unix sockets)
## src_port -- source port of the process socket (non-unix sockets)
## dest -- destination address of the process socket (non-unix sockets)
## dest_port -- destination port of the process socket (non-unix sockets)
## name -- name of the process socket (unix sockets only)
# tag_with = []
## Properties to collect
## Available options are "cpu", "limits", "memory", "mmap"
## Available options are
## cpu -- CPU usage statistics
## limits -- set resource limits
## memory -- memory usage statistics
## mmap -- mapped memory usage statistics (caution: can cause high load)
## sockets -- socket statistics for protocols in 'socket_protocols'
# properties = ["cpu", "limits", "memory", "mmap"]
## Protocol filter for the sockets property
## Available options are
## all -- all of the protocols below
## tcp4 -- TCP socket statistics for IPv4
## tcp6 -- TCP socket statistics for IPv6
## udp4 -- UDP socket statistics for IPv4
## udp6 -- UDP socket statistics for IPv6
## unix -- Unix socket statistics
# socket_protocols = ["all"]
## Method to use when finding process IDs. Can be one of 'pgrep', or
## 'native'. The pgrep finder calls the pgrep executable in the PATH while
## the native finder performs the search directly in a manor dependent on the