feat(inputs.procstat): Obtain process information through supervisor (#13417)

This commit is contained in:
chenbt 2023-11-14 07:11:31 +08:00 committed by GitHub
parent 59f53c0302
commit 2c5fbbc2a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 326 additions and 53 deletions

View File

@ -12,6 +12,7 @@ Processes can be selected for monitoring using one of several methods:
- user
- systemd_unit
- cgroup
- supervisor_unit
- win_service
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
@ -41,6 +42,8 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# include_systemd_children = false
## CGroup name or path, supports globs
# cgroup = "systemd/system.slice/nginx.service"
## Names of the Supervisor services to monitor (as managed by supervisorctl)
# supervisor_units = ["webserver", "proxy"]
## Windows service name
# win_service = ""
@ -78,6 +81,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
Preliminary support for Windows has been added, however you may prefer using
the `win_perf_counters` input plugin as a more mature alternative.
### Darwin specifics
If you use this plugin with `supervisor_units` *and* `pattern` on Darwin, you
**have to** use the `pgrep` finder as the underlying library relies on `pgrep`.
### Permissions
Some files or directories may require elevated permissions. As such a user may
@ -109,6 +117,7 @@ Below are an example set of tags and fields:
- systemd_unit (when defined)
- cgroup (when defined)
- cgroup_full (when cgroup or systemd_unit is used with glob)
- supervisor_unit (when defined)
- win_service (when defined)
- fields:
- child_major_faults (int)
@ -179,6 +188,7 @@ Below are an example set of tags and fields:
- user
- systemd_unit
- cgroup
- supervisor_unit
- win_service
- result
- fields:

View File

@ -80,6 +80,41 @@ func (pg *NativeFinder) FullPattern(pattern string) ([]PID, error) {
return pids, err
}
// ChildPattern returns the PIDs of the direct children of every process whose
// command line matches the regular expression given in pattern. The matching
// (parent) processes themselves are not part of the result.
//
// An error is returned if the pattern does not compile, if the process list
// cannot be read, or if the children of a matching process cannot be queried.
//
// NOTE(review): the supervisor lookup in this plugin assigns a PID string to
// the pattern before calling ChildPattern. Interpreted as a regexp, that
// selects processes whose command line contains those digits rather than the
// process with that PID -- confirm the intended semantics.
func (pg *NativeFinder) ChildPattern(pattern string) ([]PID, error) {
	regxPattern, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("compiling regexp failed: %w", err)
	}

	procs, err := process.Processes()
	if err != nil {
		return nil, fmt.Errorf("getting processes failed: %w", err)
	}

	var pids []PID
	for _, p := range procs {
		// Processes whose command line cannot be read are skipped instead of
		// aborting the whole scan.
		cmd, err := p.Cmdline()
		if err != nil || !regxPattern.MatchString(cmd) {
			continue
		}

		// p already is the process handle, no need to look it up a second time.
		children, err := p.Children()
		if err != nil {
			return nil, fmt.Errorf("unable to get children of process %d: %w", p.Pid, err)
		}

		for _, child := range children {
			pids = append(pids, PID(child.Pid))
		}
	}

	// All errors are handled above, so report success explicitly.
	return pids, nil
}
func (pg *NativeFinder) FastProcessList() ([]*process.Process, error) {
pids, err := process.Pids()
if err != nil {

View File

@ -1,29 +1,62 @@
package procstat
import (
"context"
"os"
"os/exec"
"runtime"
"testing"
"github.com/stretchr/testify/require"
)
func BenchmarkPattern(b *testing.B) {
f, err := NewNativeFinder()
finder, err := NewNativeFinder()
require.NoError(b, err)
for n := 0; n < b.N; n++ {
_, err := f.Pattern(".*")
if err != nil {
panic(err)
}
_, err = finder.Pattern(".*")
require.NoError(b, err)
}
}
func BenchmarkFullPattern(b *testing.B) {
f, err := NewNativeFinder()
finder, err := NewNativeFinder()
require.NoError(b, err)
for n := 0; n < b.N; n++ {
_, err := f.FullPattern(".*")
if err != nil {
panic(err)
}
_, err := finder.FullPattern(".*")
require.NoError(b, err)
}
}
// TestChildPattern spawns two child processes of the test binary and checks
// that the native finder reports exactly those two PIDs when searching for
// the children of processes matching the test binary's executable path.
func TestChildPattern(t *testing.T) {
	if runtime.GOOS == "windows" || runtime.GOOS == "darwin" {
		t.Skip("Skipping test on unsupported platform")
	}

	// Get our own process name
	parentName, err := os.Executable()
	require.NoError(t, err)

	// Spawn two child processes and get their PIDs
	expected := make([]PID, 0, 2)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// First process
	cmd1 := exec.CommandContext(ctx, "/bin/sh")
	require.NoError(t, cmd1.Start(), "starting first command failed")
	// Reap the child once the test is done. Cleanup functions run after the
	// deferred cancel, so the process has been signalled by then. The exit
	// error is irrelevant for this test.
	t.Cleanup(func() { _ = cmd1.Wait() })
	expected = append(expected, PID(cmd1.Process.Pid))

	// Second process
	cmd2 := exec.CommandContext(ctx, "/bin/sh")
	require.NoError(t, cmd2.Start(), "starting second command failed")
	t.Cleanup(func() { _ = cmd2.Wait() })
	expected = append(expected, PID(cmd2.Process.Pid))

	// Use the plugin to find the children
	finder, err := NewNativeFinder()
	require.NoError(t, err)

	childs, err := finder.ChildPattern(parentName)
	require.NoError(t, err)
	require.ElementsMatch(t, expected, childs)
}

View File

@ -1,7 +1,6 @@
package procstat
import (
"fmt"
"os/user"
"testing"
@ -16,7 +15,6 @@ func TestGather_RealPatternIntegration(t *testing.T) {
require.NoError(t, err)
pids, err := pg.Pattern(`procstat`)
require.NoError(t, err)
fmt.Println(pids)
require.NotEmpty(t, pids)
}
@ -26,9 +24,9 @@ func TestGather_RealFullPatternIntegration(t *testing.T) {
}
pg, err := NewNativeFinder()
require.NoError(t, err)
pids, err := pg.FullPattern(`%procstat%`)
require.NoError(t, err)
fmt.Println(pids)
require.NotEmpty(t, pids)
}
@ -42,6 +40,5 @@ func TestGather_RealUserIntegration(t *testing.T) {
require.NoError(t, err)
pids, err := pg.UID(currentUser.Username)
require.NoError(t, err)
fmt.Println(pids)
require.NotEmpty(t, pids)
}

View File

@ -53,6 +53,32 @@ func (pg *Pgrep) FullPattern(pattern string) ([]PID, error) {
return find(pg.path, args)
}
// ChildPattern returns the given parent PID followed by the PIDs of its
// children as printed by `pgrep -P <pattern>`.
//
// Despite the parameter name, pattern has to be the decimal PID of the parent
// process; any other value yields an error without pgrep being executed. On
// error no PIDs are returned.
func (pg *Pgrep) ChildPattern(pattern string) ([]PID, error) {
	// Validate the input first so that pgrep is never spawned with an
	// argument that is not a PID.
	parent, err := strconv.ParseInt(pattern, 10, 32)
	if err != nil {
		return nil, err
	}

	out, err := run(pg.path, []string{"-P", pattern})
	if err != nil {
		return nil, err
	}

	fields := strings.Fields(out)
	pids := make([]PID, 0, len(fields)+1)

	// The parent itself is always the first element of the result.
	pids = append(pids, PID(parent))
	for _, field := range fields {
		pid, err := strconv.ParseInt(field, 10, 32)
		if err != nil {
			// Do not hand out a partially filled list alongside an error.
			return nil, err
		}
		pids = append(pids, PID(pid))
	}
	return pids, nil
}
func find(path string, args []string) ([]PID, error) {
out, err := run(path, args)
if err != nil {

View File

@ -37,6 +37,7 @@ type PIDFinder interface {
Pattern(pattern string) ([]PID, error)
UID(user string) ([]PID, error)
FullPattern(path string) ([]PID, error)
ChildPattern(path string) ([]PID, error)
}
type Proc struct {

View File

@ -4,6 +4,7 @@ package procstat
import (
"bytes"
_ "embed"
"errors"
"fmt"
"os"
"os/exec"
@ -22,11 +23,6 @@ import (
//go:embed sample.conf
var sampleConfig string
var (
defaultPIDFinder = NewPgrep
defaultProcess = NewProc
)
type PID int32
type Procstat struct {
@ -38,9 +34,10 @@ type Procstat struct {
CmdLineTag bool `toml:"cmdline_tag"`
ProcessName string
User string
SystemdUnit string `toml:"systemd_unit"`
IncludeSystemdChildren bool `toml:"include_systemd_children"`
CGroup string `toml:"cgroup"`
SystemdUnits string `toml:"systemd_units"`
SupervisorUnit []string `toml:"supervisor_unit"`
IncludeSystemdChildren bool `toml:"include_systemd_children"`
CGroup string `toml:"cgroup"`
PidTag bool
WinService string `toml:"win_service"`
Mode string
@ -64,28 +61,43 @@ func (*Procstat) SampleConfig() string {
return sampleConfig
}
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
if p.createPIDFinder == nil {
switch p.PidFinder {
case "native":
p.createPIDFinder = NewNativeFinder
case "pgrep":
p.createPIDFinder = NewPgrep
default:
p.PidFinder = "pgrep"
p.createPIDFinder = defaultPIDFinder
}
}
if p.createProcess == nil {
p.createProcess = defaultProcess
// Init validates the configuration and selects the PID finder implementation.
//
// An empty pid_finder setting falls back to "pgrep". Any value other than
// "native" or "pgrep" is rejected. On Darwin, combining supervisor units with a
// pattern is rejected for the "native" finder because the children lookup of
// the underlying library relies on pgrep there.
func (p *Procstat) Init() error {
	if strings.ToLower(p.Mode) == "solaris" {
		p.solarisMode = true
	}

	switch p.PidFinder {
	case "", "pgrep":
		// Normalize the unset case so that later users of the setting (e.g.
		// the "pid_finder" tag) see the effective finder name.
		p.PidFinder = "pgrep"
		p.createPIDFinder = NewPgrep
	case "native":
		p.createPIDFinder = NewNativeFinder
	default:
		return fmt.Errorf("unknown pid_finder %q", p.PidFinder)
	}

	// gopsutil relies on pgrep when looking up children on darwin
	// see https://github.com/shirou/gopsutil/blob/v3.23.10/process/process_darwin.go#L235
	requiresChildren := len(p.SupervisorUnit) > 0 && p.Pattern != ""
	if requiresChildren && p.PidFinder == "native" && runtime.GOOS == "darwin" {
		return errors.New("configuration requires the 'pgrep' finder on your OS")
	}

	return nil
}
func (p *Procstat) Gather(acc telegraf.Accumulator) error {
pidCount := 0
now := time.Now()
newProcs := make(map[PID]Process, len(p.procs))
tags := make(map[string]string)
pidTags := p.findPids()
for _, pidTag := range pidTags {
if len(pidTag.PIDS) < 1 && len(p.SupervisorUnit) > 0 {
continue
}
pids := pidTag.PIDS
err := pidTag.Err
pidCount += len(pids)
@ -120,6 +132,9 @@ func (p *Procstat) Gather(acc telegraf.Accumulator) error {
tags["pid_finder"] = p.PidFinder
tags["result"] = "success"
if len(p.SupervisorUnit) > 0 {
tags["supervisor_unit"] = strings.Join(p.SupervisorUnit, ";")
}
acc.AddFields("procstat_lookup", fields, tags, now)
return nil
@ -338,7 +353,51 @@ func (p *Procstat) getPIDFinder() (PIDFinder, error) {
func (p *Procstat) findPids() []PidsTags {
var pidTags []PidsTags
if p.SystemdUnit != "" {
if len(p.SupervisorUnit) > 0 {
groups, groupsTags, err := p.supervisorPIDs()
if err != nil {
pidTags = append(pidTags, PidsTags{nil, nil, err})
return pidTags
}
// According to the PID, find the system process number and use pgrep to filter to get the number of child processes
for _, group := range groups {
f, err := p.getPIDFinder()
if err != nil {
pidTags = append(pidTags, PidsTags{nil, nil, err})
return pidTags
}
p.Pattern = groupsTags[group]["pid"]
if p.Pattern == "" {
pidTags = append(pidTags, PidsTags{nil, groupsTags[group], err})
return pidTags
}
pids, tags, err := p.SimpleFindPids(f)
if err != nil {
pidTags = append(pidTags, PidsTags{nil, nil, err})
return pidTags
}
// Handle situations where the PID does not exist
if len(pids) == 0 {
pidTags = append(pidTags, PidsTags{nil, groupsTags[group], err})
continue
}
stats := groupsTags[group]
// Merge tags map
for k, v := range stats {
_, ok := tags[k]
if !ok {
tags[k] = v
}
}
// Remove duplicate pid tags
delete(tags, "pid")
pidTags = append(pidTags, PidsTags{pids, tags, err})
}
return pidTags
} else if p.SystemdUnits != "" {
groups := p.systemdUnitPIDs()
return groups
} else if p.CGroup != "" {
@ -369,6 +428,9 @@ func (p *Procstat) SimpleFindPids(f PIDFinder) ([]PID, map[string]string, error)
} else if p.Exe != "" {
pids, err = f.Pattern(p.Exe)
tags = map[string]string{"exe": p.Exe}
} else if len(p.SupervisorUnit) > 0 && p.Pattern != "" {
pids, err = f.ChildPattern(p.Pattern)
tags = map[string]string{"pattern": p.Pattern, "parent_pid": p.Pattern}
} else if p.Pattern != "" {
pids, err = f.FullPattern(p.Pattern)
tags = map[string]string{"pattern": p.Pattern}
@ -388,22 +450,65 @@ func (p *Procstat) SimpleFindPids(f PIDFinder) ([]PID, map[string]string, error)
// execCommand is so tests can mock out exec.Command usage.
var execCommand = exec.Command
// supervisorPIDs runs `supervisorctl status` for the configured units and
// parses the output, one status line per unit.
//
// It returns the configured unit names, a map from the unit name printed by
// supervisorctl to the tags derived from its status line, and an error if the
// command failed with anything other than exit status 3.
//
// Each tag map carries "supervisor_unit" and "status". Depending on the status
// it additionally holds "error" (FATAL, EXITED, BACKOFF, STOPPING) or "pid"
// and "uptimes" (RUNNING).
func (p *Procstat) supervisorPIDs() ([]string, map[string]map[string]string, error) {
	out, err := execCommand("supervisorctl", "status", strings.Join(p.SupervisorUnit, " ")).Output()
	if err != nil {
		// Exit status 3 is tolerated and the output is parsed nevertheless.
		// NOTE(review): presumably supervisorctl uses it to signal that at
		// least one unit is not running -- confirm against its documentation.
		var exitErr *exec.ExitError
		if !errors.As(err, &exitErr) || exitErr.ExitCode() != 3 {
			return nil, nil, err
		}
	}

	// Get the PID, running status, running time and boot time of the main process:
	// pid 11779, uptime 17:41:16
	// Exited too quickly (process log may have details)
	mainPids := make(map[string]map[string]string)
	for _, line := range strings.Split(string(out), "\n") {
		kv := strings.Fields(line)
		if len(kv) < 2 {
			// Empty line or not a "<name> <status> ..." line
			continue
		}

		name, status := kv[0], kv[1]
		statusMap := map[string]string{
			"supervisor_unit": name,
			"status":          status,
		}

		switch status {
		case "FATAL", "EXITED", "BACKOFF", "STOPPING":
			// Everything after the status is the human-readable reason
			statusMap["error"] = strings.Join(kv[2:], " ")
		case "RUNNING":
			// Expected layout: <name> RUNNING pid <pid>, uptime <uptime>
			// Guard the indices so a shortened line cannot cause a panic.
			if len(kv) > 3 {
				statusMap["pid"] = strings.ReplaceAll(kv[3], ",", "")
			}
			if len(kv) > 5 {
				// The uptime may span several fields, e.g. "1 day, 17:41:16"
				statusMap["uptimes"] = strings.Join(kv[5:], " ")
			}
		case "STOPPED", "UNKNOWN", "STARTING":
			// No additional info
		}
		mainPids[name] = statusMap
	}

	return p.SupervisorUnit, mainPids, nil
}
func (p *Procstat) systemdUnitPIDs() []PidsTags {
if p.IncludeSystemdChildren {
p.CGroup = fmt.Sprintf("systemd/system.slice/%s", p.SystemdUnit)
p.CGroup = fmt.Sprintf("systemd/system.slice/%s", p.SystemdUnits)
return p.cgroupPIDs()
}
var pidTags []PidsTags
pids, err := p.simpleSystemdUnitPIDs()
tags := map[string]string{"systemd_unit": p.SystemdUnit}
tags := map[string]string{"systemd_unit": p.SystemdUnits}
pidTags = append(pidTags, PidsTags{pids, tags, err})
return pidTags
}
func (p *Procstat) simpleSystemdUnitPIDs() ([]PID, error) {
out, err := execCommand("systemctl", "show", p.SystemdUnit).Output()
out, err := execCommand("systemctl", "show", p.SystemdUnits).Output()
if err != nil {
return nil, err
}
@ -503,16 +608,8 @@ func (p *Procstat) winServicePIDs() ([]PID, error) {
return pids, nil
}
func (p *Procstat) Init() error {
if strings.ToLower(p.Mode) == "solaris" {
p.solarisMode = true
}
return nil
}
func init() {
inputs.Add("procstat", func() telegraf.Input {
return &Procstat{}
return &Procstat{createProcess: NewProc}
})
}

View File

@ -54,6 +54,20 @@ ExecMainPID=11408
os.Exit(0)
}
if cmdline == "supervisorctl status TestGather_supervisorUnitPIDs" {
fmt.Printf(`TestGather_supervisorUnitPIDs RUNNING pid 7311, uptime 0:00:19
`)
//nolint:revive // error code is important for this "test"
os.Exit(0)
}
if cmdline == "supervisorctl status TestGather_STARTINGsupervisorUnitPIDs TestGather_FATALsupervisorUnitPIDs" {
fmt.Printf(`TestGather_FATALsupervisorUnitPIDs FATAL Exited too quickly (process log may have details)
TestGather_STARTINGsupervisorUnitPIDs STARTING`)
//nolint:revive // error code is important for this "test"
os.Exit(0)
}
fmt.Printf("command not found\n")
//nolint:revive // error code is important for this "test"
os.Exit(1)
@ -93,6 +107,11 @@ func (pg *testPgrep) FullPattern(_ string) ([]PID, error) {
return pg.pids, pg.err
}
// ChildPattern ignores the pattern and always reports the same three PIDs
// together with the error configured on the mock.
func (pg *testPgrep) ChildPattern(_ string) ([]PID, error) {
	return []PID{7311, 8111, 8112}, pg.err
}
type testProc struct {
pid PID
tags map[string]string
@ -180,6 +199,19 @@ func (p *testProc) Status() ([]string, error) {
var pid = PID(42)
var exe = "foo"
// TestInitRequiresChildDarwin checks that Init rejects the native finder on
// Darwin when both a pattern and supervisor units are configured.
func TestInitRequiresChildDarwin(t *testing.T) {
	if runtime.GOOS != "darwin" {
		t.Skip("Skipping test on non-darwin platform")
	}

	plugin := &Procstat{
		Pattern:        "somepattern",
		SupervisorUnit: []string{"a_unit"},
		PidFinder:      "native",
	}

	err := plugin.Init()
	require.ErrorContains(t, err, "requires the 'pgrep' finder")
}
func TestGather_CreateProcessErrorOk(t *testing.T) {
var acc testutil.Accumulator
@ -374,7 +406,7 @@ func TestGather_PercentSecondPass(t *testing.T) {
func TestGather_systemdUnitPIDs(t *testing.T) {
p := Procstat{
createPIDFinder: pidFinder([]PID{}),
SystemdUnit: "TestGather_systemdUnitPIDs",
SystemdUnits: "TestGather_systemdUnitPIDs",
}
pidsTags := p.findPids()
for _, pidsTag := range pidsTags {
@ -414,11 +446,11 @@ func TestGather_cgroupPIDs(t *testing.T) {
func TestProcstatLookupMetric(t *testing.T) {
p := Procstat{
createPIDFinder: pidFinder([]PID{543}),
createProcess: NewProc,
Exe: "-Gsys",
}
var acc testutil.Accumulator
err := acc.GatherError(p.Gather)
require.NoError(t, err)
require.NoError(t, acc.GatherError(p.Gather))
require.Len(t, acc.Metrics, len(p.procs)+1)
}
@ -438,3 +470,43 @@ func TestGather_SameTimestamps(t *testing.T) {
require.Equal(t, procstat.Time, procstatLookup.Time)
}
// TestGather_supervisorUnitPIDs checks that every result found for a running
// supervisor unit carries the PIDs reported by the finder and the unit name
// as tag.
func TestGather_supervisorUnitPIDs(t *testing.T) {
	plugin := Procstat{
		createPIDFinder: pidFinder([]PID{}),
		SupervisorUnit:  []string{"TestGather_supervisorUnitPIDs"},
	}

	for _, result := range plugin.findPids() {
		require.NoError(t, result.Err)
		require.Equal(t, []PID{7311, 8111, 8112}, result.PIDS)
		require.Equal(t, "TestGather_supervisorUnitPIDs", result.Tags["supervisor_unit"])
	}
}
// TestGather_MoresupervisorUnitPIDs checks the results for supervisor units
// that are not running: no PIDs are expected and the tags have to reflect the
// status (and, for FATAL, the reason) printed by supervisorctl.
func TestGather_MoresupervisorUnitPIDs(t *testing.T) {
	plugin := Procstat{
		createPIDFinder: pidFinder([]PID{}),
		Pattern:         "7311",
		SupervisorUnit:  []string{"TestGather_STARTINGsupervisorUnitPIDs", "TestGather_FATALsupervisorUnitPIDs"},
	}

	knownUnits := []string{"TestGather_STARTINGsupervisorUnitPIDs", "TestGather_FATALsupervisorUnitPIDs"}
	for _, result := range plugin.findPids() {
		require.Empty(t, result.PIDS)

		unit := result.Tags["supervisor_unit"]
		require.Contains(t, knownUnits, unit)

		// The expectations depend on which of the two units the result is for
		switch unit {
		case "TestGather_STARTINGsupervisorUnitPIDs":
			require.Equal(t, "STARTING", result.Tags["status"])
			require.NoError(t, result.Err)
		case "TestGather_FATALsupervisorUnitPIDs":
			require.Equal(t, "FATAL", result.Tags["status"])
			require.NoError(t, result.Err)
			require.Equal(t, "Exited too quickly (process log may have details)", result.Tags["error"])
		}
	}
}

View File

@ -13,6 +13,8 @@
# include_systemd_children = false
## CGroup name or path, supports globs
# cgroup = "systemd/system.slice/nginx.service"
## Names of the Supervisor services to monitor (as managed by supervisorctl)
# supervisor_units = ["webserver", "proxy"]
## Windows service name
# win_service = ""