feat(inputs.procstat): Allow multiple selection criteria (#14948)

Co-authored-by: Joshua Powers <powersj@fastmail.com>
This commit is contained in:
Sven Rebhan 2024-04-19 06:12:37 -04:00 committed by GitHub
parent fa0dbba658
commit 2acae45d09
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 677 additions and 33 deletions

View File

@ -77,6 +77,34 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## the native finder performs the search directly in a manner dependent on the
## platform. Default is 'pgrep'
# pid_finder = "pgrep"
## New-style filtering configuration (multiple filter sections are allowed)
# [[inputs.procstat.filter]]
# ## Name of the filter added as 'filter' tag
# name = "shell"
#
# ## Service filters, only one is allowed
# ## Systemd unit names (wildcards are supported)
# # systemd_units = []
# ## CGroup name or path (wildcards are supported)
# # cgroups = []
# ## Supervisor service names as managed by supervisorctl
# # supervisor_units = []
# ## Windows service names
# # win_services = []
#
# ## Process filters, multiple are allowed
# ## Regular expressions to use for matching against the full command
# # patterns = ['.*']
# ## List of users owning the process (wildcards are supported)
# # users = ['*']
# ## List of executable paths of the process (wildcards are supported)
# # executables = ['*']
# ## List of process names (wildcards are supported)
# # process_names = ['*']
# ## Recursion depth for determining children of the matched processes
# ## A negative value means all children with infinite depth
# # recursion_depth = 0
```
### Windows support

View File

@ -0,0 +1,226 @@
package procstat
import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"github.com/influxdata/telegraf/filter"
"github.com/shirou/gopsutil/v3/process"
)
// Filter is one new-style selection section of the procstat configuration.
// A filter optionally narrows the candidate processes to those belonging to
// a single kind of service and then applies the process-level criteria to
// every candidate.
type Filter struct {
// Name identifies the filter; Init rejects an empty name.
Name string `toml:"name"`
// Service-level selectors. Init returns an error if more than one of the
// following five options is non-empty.
PidFiles []string `toml:"pid_files"`
SystemdUnits []string `toml:"systemd_units"`
SupervisorUnits []string `toml:"supervisor_units"`
WinService []string `toml:"win_services"`
CGroups []string `toml:"cgroups"`
// Process-level criteria; Patterns are compiled as regular expressions,
// the other lists are compiled with filter.Compile in Init.
Patterns []string `toml:"patterns"`
Users []string `toml:"users"`
Executables []string `toml:"executables"`
ProcessNames []string `toml:"process_names"`
// RecursionDepth is the number of child generations to add to the matched
// processes; a negative value resolves children until none are left.
RecursionDepth int `toml:"recursion_depth"`
// State prepared by Init from the options above.
// filterSupervisorUnit is SupervisorUnits joined by spaces and trimmed.
filterSupervisorUnit string
// filterCmds holds the compiled Patterns.
filterCmds []*regexp.Regexp
filterUser filter.Filter
filterExecutable filter.Filter
filterProcessName filter.Filter
}
// Init validates the filter settings and prepares the matchers used by
// ApplyFilter. It fails if the filter has no name, if more than one service
// selector is configured, or if a pattern or match list cannot be compiled.
func (f *Filter) Init() error {
	if f.Name == "" {
		return errors.New("filter must be named")
	}

	// At most one service selector may be active; collect the configured
	// ones in a fixed order so the error message is stable.
	selectors := []struct {
		option string
		count  int
	}{
		{"pid_files", len(f.PidFiles)},
		{"cgroups", len(f.CGroups)},
		{"systemd_units", len(f.SystemdUnits)},
		{"supervisor_units", len(f.SupervisorUnits)},
		{"win_services", len(f.WinService)},
	}
	var active []string
	for _, s := range selectors {
		if s.count > 0 {
			active = append(active, s.option)
		}
	}
	if len(active) > 1 {
		return fmt.Errorf("cannot select multiple services %q", strings.Join(active, ", "))
	}

	// Compile the command-line patterns
	f.filterCmds = make([]*regexp.Regexp, 0, len(f.Patterns))
	for _, pattern := range f.Patterns {
		compiled, err := regexp.Compile(pattern)
		if err != nil {
			return fmt.Errorf("compiling pattern %q of filter %q failed: %w", pattern, f.Name, err)
		}
		f.filterCmds = append(f.filterCmds, compiled)
	}

	// The supervisor lookup takes all units as one space-separated string
	joined := strings.Join(f.SupervisorUnits, " ")
	f.filterSupervisorUnit = strings.TrimSpace(joined)

	// Compile the remaining match lists
	var err error
	f.filterUser, err = filter.Compile(f.Users)
	if err != nil {
		return fmt.Errorf("compiling users filter for %q failed: %w", f.Name, err)
	}
	f.filterExecutable, err = filter.Compile(f.Executables)
	if err != nil {
		return fmt.Errorf("compiling executables filter for %q failed: %w", f.Name, err)
	}
	f.filterProcessName, err = filter.Compile(f.ProcessNames)
	if err != nil {
		return fmt.Errorf("compiling process-names filter for %q failed: %w", f.Name, err)
	}

	return nil
}
// ApplyFilter determines the processes selected by this filter.
//
// The candidates are first taken from the configured service selector (or
// from all processes if none is configured), then reduced to the processes
// matching every configured process-level criterion, and finally extended by
// the children of the matched processes down to RecursionDepth generations.
// Children are returned in separate groups that carry the tags of the parent
// group plus a "parent_pid" tag.
//
// An error is returned if the service lookup or a child lookup fails.
func (f *Filter) ApplyFilter() ([]processGroup, error) {
	groups, err := f.selectServiceGroups()
	if err != nil {
		return nil, err
	}

	// Filter by additional properties such as users, patterns etc
	result := make([]processGroup, 0, len(groups))
	for _, g := range groups {
		var matched []*process.Process
		for _, p := range g.processes {
			if f.matchesProcess(p) {
				matched = append(matched, p)
			}
		}
		result = append(result, processGroup{processes: matched, tags: g.tags})
	}

	// Resolve children down to the requested depth; a negative depth walks
	// the tree until a generation without children is reached.
	previous := result
	for depth := 0; depth < f.RecursionDepth || f.RecursionDepth < 0; depth++ {
		children := make([]processGroup, 0, len(previous))
		for _, group := range previous {
			for _, p := range group.processes {
				c, err := getChildren(p)
				if err != nil {
					// getChildren already names the process in its error
					return nil, err
				}
				if len(c) == 0 {
					continue
				}

				// Copy the tags so each child group owns its own map
				tags := make(map[string]string, len(group.tags)+1)
				for k, v := range group.tags {
					tags[k] = v
				}
				tags["parent_pid"] = strconv.FormatInt(int64(p.Pid), 10)

				children = append(children, processGroup{
					processes: c,
					tags:      tags,
				})
			}
		}
		if len(children) == 0 {
			break
		}
		result = append(result, children...)
		previous = children
	}

	return result, nil
}

// selectServiceGroups returns the candidate processes on service level. If
// no service selector is configured, all processes form a single group.
func (f *Filter) selectServiceGroups() ([]processGroup, error) {
	switch {
	case len(f.PidFiles) > 0:
		return findByPidFiles(f.PidFiles)
	case len(f.CGroups) > 0:
		return findByCgroups(f.CGroups)
	case len(f.SystemdUnits) > 0:
		return findBySystemdUnits(f.SystemdUnits)
	case f.filterSupervisorUnit != "":
		return findBySupervisorUnits(f.filterSupervisorUnit)
	case len(f.WinService) > 0:
		return findByWindowsServices(f.WinService)
	}

	procs, err := process.Processes()
	if err != nil {
		return nil, err
	}
	return []processGroup{{processes: procs, tags: make(map[string]string)}}, nil
}

// matchesProcess reports whether p satisfies every configured process-level
// criterion. A property that cannot be read counts as a mismatch, as this
// happens if we lack permissions or the process no longer exists.
func (f *Filter) matchesProcess(p *process.Process) bool {
	// Users
	if f.filterUser != nil {
		if username, err := p.Username(); err != nil || !f.filterUser.Match(username) {
			return false
		}
	}

	// Executables
	if f.filterExecutable != nil {
		if exe, err := p.Exe(); err != nil || !f.filterExecutable.Match(exe) {
			return false
		}
	}

	// Process names
	if f.filterProcessName != nil {
		if name, err := p.Name(); err != nil || !f.filterProcessName.Match(name) {
			return false
		}
	}

	// Patterns, matching any of them is sufficient
	if len(f.filterCmds) > 0 {
		cmd, err := p.Cmdline()
		if err != nil {
			return false
		}
		for _, re := range f.filterCmds {
			if re.MatchString(cmd) {
				return true
			}
		}
		return false
	}

	return true
}
// getChildren returns the direct children of the given process. Lookup
// results that merely indicate "no children" are not reported as errors.
func getChildren(p *process.Process) ([]*process.Process, error) {
	children, err := p.Children()
	if err == nil {
		return children, nil
	}

	// These cases do not really mean an error but rather that there is no
	// match.
	noMatch := errors.Is(err, process.ErrorNoChildren) ||
		strings.Contains(err.Error(), "exit status 1")
	if noMatch {
		return children, nil
	}

	return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err)
}

View File

@ -3,8 +3,11 @@
package procstat
import (
"context"
"errors"
"fmt"
"github.com/coreos/go-systemd/v22/dbus"
"github.com/shirou/gopsutil/v3/process"
)
@ -31,3 +34,45 @@ func collectMemmap(proc Process, prefix string, fields map[string]any) {
fields[prefix+"memory_swap"] = memMap.Swap
}
}
// findBySystemdUnits looks up the main process of every systemd unit
// matching one of the given patterns. Each unit yields one group tagged with
// "systemd_unit". Units without a "MainPID" service property and units that
// currently have no main process are skipped.
func findBySystemdUnits(units []string) ([]processGroup, error) {
	ctx := context.Background()
	conn, err := dbus.NewSystemConnectionContext(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to systemd: %w", err)
	}
	defer conn.Close()

	// NOTE(review): the state list looks like unit-file states; confirm
	// that systemd accepts them for filtering the listed units.
	sdunits, err := conn.ListUnitsByPatternsContext(ctx, []string{"enabled", "disabled", "static"}, units)
	if err != nil {
		return nil, fmt.Errorf("failed to list units: %w", err)
	}

	groups := make([]processGroup, 0, len(sdunits))
	for _, u := range sdunits {
		prop, err := conn.GetUnitTypePropertyContext(ctx, u.Name, "Service", "MainPID")
		if err != nil {
			// This unit might not be a service or similar
			continue
		}
		raw := prop.Value.Value()
		pid, ok := raw.(uint32)
		if !ok {
			return nil, fmt.Errorf("failed to parse PID %v of unit %q: invalid type %T", raw, u.Name, raw)
		}
		if pid == 0 {
			// The unit has no main process at the moment (e.g. it is not
			// running), so there is nothing to collect for it.
			continue
		}
		p, err := process.NewProcess(int32(pid))
		if err != nil {
			return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", pid, u.Name, err)
		}
		groups = append(groups, processGroup{
			processes: []*process.Process{p},
			tags:      map[string]string{"systemd_unit": u.Name},
		})
	}

	return groups, nil
}
// findByWindowsServices is a stub in this file: it ignores its argument and
// always returns no groups and no error.
func findByWindowsServices(_ []string) ([]processGroup, error) {
return nil, nil
}

View File

@ -17,3 +17,11 @@ func queryPidWithWinServiceName(_ string) (uint32, error) {
}
// collectMemmap is a no-op in this file; it adds no fields.
func collectMemmap(Process, string, map[string]any) {}
// findBySystemdUnits is a stub in this file: it ignores its argument and
// always returns no groups and no error.
func findBySystemdUnits(_ []string) ([]processGroup, error) {
return nil, nil
}
// findByWindowsServices is a stub in this file: it ignores its argument and
// always returns no groups and no error.
func findByWindowsServices(_ []string) ([]processGroup, error) {
return nil, nil
}

View File

@ -4,6 +4,7 @@ package procstat
import (
"errors"
"fmt"
"unsafe"
"github.com/shirou/gopsutil/v3/process"
@ -55,3 +56,29 @@ func queryPidWithWinServiceName(winServiceName string) (uint32, error) {
}
// collectMemmap is a no-op in this file; it adds no fields.
func collectMemmap(Process, string, map[string]any) {}
// findBySystemdUnits is a stub in this file: it ignores its argument and
// always returns no groups and no error.
func findBySystemdUnits(_ []string) ([]processGroup, error) {
return nil, nil
}
// findByWindowsServices resolves each named Windows service to its process.
// Every service yields one group holding that single process and tagged with
// "win_service". The first service whose PID cannot be queried, or whose
// process cannot be found, aborts the lookup with an error.
func findByWindowsServices(services []string) ([]processGroup, error) {
	result := make([]processGroup, 0, len(services))
	for _, name := range services {
		servicePID, err := queryPidWithWinServiceName(name)
		if err != nil {
			return nil, fmt.Errorf("failed to query PID of service %q: %w", name, err)
		}

		proc, err := process.NewProcess(int32(servicePID))
		if err != nil {
			return nil, fmt.Errorf("failed to find process for PID %d of service %q: %w", servicePID, name, err)
		}

		group := processGroup{
			processes: []*process.Process{proc},
			tags:      map[string]string{"win_service": name},
		}
		result = append(result, group)
	}

	return result, nil
}

View File

@ -14,6 +14,8 @@ import (
"strings"
"time"
"github.com/shirou/gopsutil/v3/process"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
@ -46,12 +48,14 @@ type Procstat struct {
WinService string `toml:"win_service"`
Mode string `toml:"mode"`
TagWith []string `toml:"tag_with"`
Filter []Filter `toml:"filter"`
Log telegraf.Logger `toml:"-"`
solarisMode bool
finder PIDFinder
processes map[PID]Process
tagging map[string]bool
oldMode bool
createProcess func(PID) (Process, error)
}
@ -61,6 +65,11 @@ type PidsTags struct {
Tags map[string]string
}
// processGroup is a set of processes found by one lookup together with the
// tags shared by all of them.
type processGroup struct {
// processes are the members of the group.
processes []*process.Process
// tags are copied onto the processes of this group when they are first recorded.
tags map[string]string
}
// SampleConfig returns the package-level sampleConfig string.
func (*Procstat) SampleConfig() string {
return sampleConfig
}
@ -86,44 +95,64 @@ func (p *Procstat) Init() error {
p.tagging[tag] = true
}
// Keep the old settings for compatibility
for _, u := range p.SupervisorUnit {
if !choice.Contains(u, p.SupervisorUnits) {
p.SupervisorUnits = append(p.SupervisorUnits, u)
// Check if we got any new-style configuration options and determine
// operation mode.
p.oldMode = len(p.Filter) == 0
if p.oldMode {
// Keep the old settings for compatibility
for _, u := range p.SupervisorUnit {
if !choice.Contains(u, p.SupervisorUnits) {
p.SupervisorUnits = append(p.SupervisorUnits, u)
}
}
}
// Check filtering
switch {
case len(p.SupervisorUnits) > 0, p.SystemdUnit != "", p.WinService != "",
p.CGroup != "", p.PidFile != "", p.Exe != "", p.Pattern != "",
p.User != "":
// Do nothing as those are valid settings
default:
return errors.New("require filter option but none set")
}
// Check filtering
switch {
case len(p.SupervisorUnits) > 0, p.SystemdUnit != "", p.WinService != "",
p.CGroup != "", p.PidFile != "", p.Exe != "", p.Pattern != "",
p.User != "":
// Do nothing as those are valid settings
default:
return errors.New("require filter option but none set")
}
// Instantiate the finder
switch p.PidFinder {
case "", "pgrep":
p.PidFinder = "pgrep"
finder, err := newPgrepFinder()
if err != nil {
return fmt.Errorf("creating pgrep finder failed: %w", err)
// Instantiate the finder
switch p.PidFinder {
case "", "pgrep":
p.PidFinder = "pgrep"
finder, err := newPgrepFinder()
if err != nil {
return fmt.Errorf("creating pgrep finder failed: %w", err)
}
p.finder = finder
case "native":
// gopsutil relies on pgrep when looking up children on darwin
// see https://github.com/shirou/gopsutil/blob/v3.23.10/process/process_darwin.go#L235
requiresChildren := len(p.SupervisorUnits) > 0 && p.Pattern != ""
if requiresChildren && runtime.GOOS == "darwin" {
return errors.New("configuration requires 'pgrep' finder on your OS")
}
p.finder = &NativeFinder{}
case "test":
p.Log.Warn("running in test mode")
default:
return fmt.Errorf("unknown pid_finder %q", p.PidFinder)
}
p.finder = finder
case "native":
// gopsutil relies on pgrep when looking up children on darwin
// see https://github.com/shirou/gopsutil/blob/v3.23.10/process/process_darwin.go#L235
requiresChildren := len(p.SupervisorUnits) > 0 && p.Pattern != ""
if requiresChildren && runtime.GOOS == "darwin" {
return errors.New("configuration requires 'pgrep' finder on your OS")
} else {
// Check for mixed mode
switch {
case p.PidFile != "", p.Exe != "", p.Pattern != "", p.User != "",
p.SystemdUnit != "", len(p.SupervisorUnit) > 0,
len(p.SupervisorUnits) > 0, p.CGroup != "", p.WinService != "":
return errors.New("cannot operate in mixed mode with filters and old-style config")
}
// New-style operations
for i := range p.Filter {
if err := p.Filter[i].Init(); err != nil {
return fmt.Errorf("initializing filter %d failed: %w", i, err)
}
}
p.finder = &NativeFinder{}
case "test":
p.Log.Warn("running in test mode")
default:
return fmt.Errorf("unknown pid_finder %q", p.PidFinder)
}
// Initialize the running process cache
@ -133,6 +162,14 @@ func (p *Procstat) Init() error {
}
// Gather collects the metrics using the code path matching the operation
// mode of the plugin: the filter-based one if oldMode is unset, the legacy
// one otherwise.
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
	if !p.oldMode {
		return p.gatherNew(acc)
	}
	return p.gatherOld(acc)
}
func (p *Procstat) gatherOld(acc telegraf.Accumulator) error {
now := time.Now()
results, err := p.findPids()
if err != nil {
@ -226,6 +263,100 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
return nil
}
// gatherNew collects metrics for all configured filters.
//
// For every filter the matching processes are determined, a metric is added
// for each running process and a "procstat_lookup" metric summarizes the
// lookup. A failing filter produces a "lookup_error" lookup metric and an
// accumulator error, but does not stop the remaining filters. The function
// itself always returns nil.
func (p *Procstat) gatherNew(acc telegraf.Accumulator) error {
	now := time.Now()

	// PIDs found running by any filter during this gather cycle. The process
	// cache is shared between all filters, so it may only be pruned against
	// the combined result, otherwise each filter would evict the cached
	// instances of the others and break the delta-metrics.
	seen := make(map[PID]bool)

	for i := range p.Filter {
		f := &p.Filter[i]

		groups, err := f.ApplyFilter()
		if err != nil {
			// Add lookup error-metric
			acc.AddFields(
				"procstat_lookup",
				map[string]interface{}{
					"pid_count":   0,
					"running":     0,
					"result_code": 1,
				},
				map[string]string{
					"filter": f.Name,
					"result": "lookup_error",
				},
				now,
			)
			acc.AddError(fmt.Errorf("applying filter %q failed: %w", f.Name, err))
			continue
		}

		var count int
		running := make(map[PID]bool)
		for _, g := range groups {
			count += len(g.processes)
			for _, gp := range g.processes {
				// Skip over non-running processes
				if alive, err := gp.IsRunning(); err != nil || !alive {
					continue
				}

				// Use the cached processes as we need the existing instances
				// to compute delta-metrics (e.g. cpu-usage).
				pid := PID(gp.Pid)
				proc, found := p.processes[pid]
				if !found {
					// Assumption: if a process has no name, it probably does not exist
					if name, _ := gp.Name(); name == "" {
						continue
					}

					// We've found a process that was not recorded before so add it
					// to the list of processes
					tags := make(map[string]string, len(g.tags)+2)
					for k, v := range g.tags {
						tags[k] = v
					}
					// There is no process instance yet at this point, so the
					// name override has to go into the initial tags.
					if p.ProcessName != "" {
						tags["process_name"] = p.ProcessName
					}
					tags["filter"] = f.Name

					proc = &Proc{
						Process:     gp,
						hasCPUTimes: false,
						tags:        tags,
					}
					p.processes[pid] = proc
				}
				running[pid] = true
				seen[pid] = true
				m := proc.Metric(p.Prefix, p.tagging, p.solarisMode)
				m.SetTime(now)
				acc.AddMetric(m)
			}
		}

		// Add lookup statistics-metric
		acc.AddFields(
			"procstat_lookup",
			map[string]interface{}{
				"pid_count":   count,
				"running":     len(running),
				"result_code": 0,
			},
			map[string]string{
				"filter": f.Name,
				"result": "success",
			},
			now,
		)
	}

	// Cleanup processes that are not running anymore
	for pid := range p.processes {
		if !seen[pid] {
			delete(p.processes, pid)
		}
	}

	return nil
}
// Get matching PIDs and their initial tags
func (p *Procstat) findPids() ([]PidsTags, error) {
switch {

View File

@ -48,3 +48,31 @@
## the native finder performs the search directly in a manner dependent on the
## platform. Default is 'pgrep'
# pid_finder = "pgrep"
## New-style filtering configuration (multiple filter sections are allowed)
# [[inputs.procstat.filter]]
# ## Name of the filter added as 'filter' tag
# name = "shell"
#
# ## Service filters, only one is allowed
# ## Systemd unit names (wildcards are supported)
# # systemd_units = []
# ## CGroup name or path (wildcards are supported)
# # cgroups = []
# ## Supervisor service names as managed by supervisorctl
# # supervisor_units = []
# ## Windows service names
# # win_services = []
#
# ## Process filters, multiple are allowed
# ## Regular expressions to use for matching against the full command
# # patterns = ['.*']
# ## List of users owning the process (wildcards are supported)
# # users = ['*']
# ## List of executable paths of the process (wildcards are supported)
# # executables = ['*']
# ## List of process names (wildcards are supported)
# # process_names = ['*']
# ## Recursion depth for determining children of the matched processes
# ## A negative value means all children with infinite depth
# # recursion_depth = 0

View File

@ -0,0 +1,151 @@
package procstat
import (
"bytes"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/shirou/gopsutil/v3/process"
)
// findByPidFiles reads a PID from each of the given files and looks up the
// corresponding process. Every file yields one group holding that single
// process and tagged with "pidfile". The first file that cannot be read or
// parsed, or whose process cannot be found, aborts the lookup with an error.
func findByPidFiles(paths []string) ([]processGroup, error) {
	result := make([]processGroup, 0, len(paths))
	for _, pidfile := range paths {
		content, err := os.ReadFile(pidfile)
		if err != nil {
			return nil, fmt.Errorf("failed to read pidfile %q: %w", pidfile, err)
		}

		// The file is expected to hold nothing but the PID
		text := strings.TrimSpace(string(content))
		pid, err := strconv.ParseInt(text, 10, 32)
		if err != nil {
			return nil, fmt.Errorf("failed to parse PID in file %q: %w", pidfile, err)
		}

		proc, err := process.NewProcess(int32(pid))
		if err != nil {
			return nil, fmt.Errorf("failed to find process for PID %d of file %q: %w", pid, pidfile, err)
		}

		group := processGroup{
			processes: []*process.Process{proc},
			tags:      map[string]string{"pidfile": pidfile},
		}
		result = append(result, group)
	}

	return result, nil
}
// findByCgroups collects the processes listed in the "cgroup.procs" file of
// every directory matching one of the given cgroups. Relative names are
// resolved below "/sys/fs/cgroup" and glob patterns are expanded. Each
// matching directory yields one group tagged with the configured name
// ("cgroup") and the resolved directory ("cgroup_full").
//
// An error is returned if a pattern is malformed, a match is not an
// accessible directory, the process list cannot be read or parsed, or a
// listed process cannot be found.
func findByCgroups(cgroups []string) ([]processGroup, error) {
	groups := make([]processGroup, 0, len(cgroups))
	for _, cgroup := range cgroups {
		path := cgroup
		if !filepath.IsAbs(cgroup) {
			path = filepath.Join("/sys", "fs", "cgroup", cgroup)
		}

		files, err := filepath.Glob(path)
		if err != nil {
			return nil, fmt.Errorf("failed to determine files for cgroup %q: %w", cgroup, err)
		}

		for _, fpath := range files {
			if f, err := os.Stat(fpath); err != nil {
				return nil, fmt.Errorf("accessing %q failed: %w", fpath, err)
			} else if !f.IsDir() {
				return nil, fmt.Errorf("%q is not a directory", fpath)
			}

			fn := filepath.Join(fpath, "cgroup.procs")
			buf, err := os.ReadFile(fn)
			if err != nil {
				return nil, fmt.Errorf("failed to read %q: %w", fn, err)
			}

			// The file holds one PID per line
			lines := bytes.Split(buf, []byte{'\n'})
			procs := make([]*process.Process, 0, len(lines))
			for _, l := range lines {
				l := strings.TrimSpace(string(l))
				if len(l) == 0 {
					continue
				}
				pid, err := strconv.ParseInt(l, 10, 32)
				if err != nil {
					return nil, fmt.Errorf("failed to parse PID %q in file %q: %w", l, fn, err)
				}
				p, err := process.NewProcess(int32(pid))
				if err != nil {
					return nil, fmt.Errorf("failed to find process for PID %d of %q: %w", pid, fpath, err)
				}
				procs = append(procs, p)
			}

			groups = append(groups, processGroup{
				processes: procs,
				tags:      map[string]string{"cgroup": cgroup, "cgroup_full": fpath},
			})
		}
	}

	return groups, nil
}
// findBySupervisorUnits queries "supervisorctl status" for the given units
// (a space-separated list) and returns one group per reported unit. Every
// group is tagged with "supervisor_unit" and "status". Running units
// additionally carry "uptimes" and "parent_pid" tags and hold the children
// of the reported PID; failed units carry an "error" tag and hold no
// processes.
//
// An error is returned if supervisorctl fails, if the status line of a
// running unit cannot be parsed, or if the process of a running unit or its
// children cannot be determined.
func findBySupervisorUnits(units string) ([]processGroup, error) {
	buf, err := execCommand("supervisorctl", "status", units, " ").Output()
	if err != nil && !strings.Contains(err.Error(), "exit status 3") {
		// Exit 3 means at least one process is in one of the "STOPPED" states
		return nil, fmt.Errorf("failed to execute 'supervisorctl': %w", err)
	}
	lines := strings.Split(string(buf), "\n")

	// Get the PID, running status, running time and boot time of the main process:
	// pid 11779, uptime 17:41:16
	// Exited too quickly (process log may have details)
	groups := make([]processGroup, 0, len(lines))
	for _, line := range lines {
		if line == "" {
			continue
		}

		kv := strings.Fields(line)
		if len(kv) < 2 {
			// Not a key-value pair
			continue
		}
		name, status := kv[0], kv[1]
		tags := map[string]string{
			"supervisor_unit": name,
			"status":          status,
		}

		var procs []*process.Process
		switch status {
		case "FATAL", "EXITED", "BACKOFF", "STOPPING":
			tags["error"] = strings.Join(kv[2:], " ")
		case "RUNNING":
			// Expected fields: <name> RUNNING pid <pid>, uptime <time>
			if len(kv) < 6 {
				return nil, fmt.Errorf("unexpected status line %q for unit %q", line, name)
			}
			tags["uptimes"] = kv[5]
			rawpid := strings.ReplaceAll(kv[3], ",", "")
			grouppid, err := strconv.ParseInt(rawpid, 10, 32)
			if err != nil {
				return nil, fmt.Errorf("failed to parse group PID %q: %w", rawpid, err)
			}
			p, err := process.NewProcess(int32(grouppid))
			if err != nil {
				return nil, fmt.Errorf("failed to find process for PID %d of unit %q: %w", grouppid, name, err)
			}
			// Get all children of the supervisor unit
			procs, err = p.Children()
			if err != nil {
				return nil, fmt.Errorf("failed to get children for PID %d of unit %q: %w", grouppid, name, err)
			}
			tags["parent_pid"] = rawpid
		case "STOPPED", "UNKNOWN", "STARTING":
			// No additional info
		}

		groups = append(groups, processGroup{
			processes: procs,
			tags:      tags,
		})
	}

	return groups, nil
}