feat: Supervisord input plugin (#9015)

This commit is contained in:
niasar 2022-08-08 21:25:26 +03:00 committed by GitHub
parent 401d2b2a28
commit b80d34a422
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 495 additions and 0 deletions

View File

@ -188,6 +188,7 @@ following works:
- github.com/karrick/godirwalk [BSD 2-Clause "Simplified" License](https://github.com/karrick/godirwalk/blob/master/LICENSE)
- github.com/kballard/go-shellquote [MIT License](https://github.com/kballard/go-shellquote/blob/master/LICENSE)
- github.com/klauspost/compress [BSD 3-Clause Clear License](https://github.com/klauspost/compress/blob/master/LICENSE)
- github.com/kolo/xmlrpc [MIT License](https://github.com/kolo/xmlrpc/blob/master/LICENSE)
- github.com/kylelemons/godebug [Apache License 2.0](https://github.com/kylelemons/godebug/blob/master/LICENSE)
- github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE)
- github.com/magiconair/properties [BSD 2-Clause "Simplified" License](https://github.com/magiconair/properties/blob/main/LICENSE.md)

1
go.mod
View File

@ -302,6 +302,7 @@ require (
github.com/juju/webbrowser v1.0.0 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/klauspost/compress v1.15.8 // indirect
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b
github.com/kr/fs v0.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect

3
go.sum
View File

@ -1524,6 +1524,8 @@ github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPR
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b h1:iNjcivnc6lhbvJA3LD622NPrUponluJrBWPIwGG/3Bg=
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@ -3339,6 +3341,7 @@ k8s.io/cri-api v0.20.6/go.mod h1:ew44AjNXwyn1s0U4xCKGodU7J1HzBeZ1MpGrpa5r8Yc=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=

View File

@ -187,6 +187,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/stackdriver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/supervisor"
_ "github.com/influxdata/telegraf/plugins/inputs/suricata"
_ "github.com/influxdata/telegraf/plugins/inputs/swap"
_ "github.com/influxdata/telegraf/plugins/inputs/synproxy"

View File

@ -0,0 +1,101 @@
# Supervisor Input Plugin
This plugin gathers information about processes that
running under supervisor using XML-RPC API.
Minimum tested version of supervisor: 3.3.2
## Supervisor configuration
This plugin needs an HTTP server to be enabled in supervisor,
also it's recommended to enable basic authentication on the
HTTP server. When using basic authentication make sure to
include the username and password in the plugin's url setting.
Here is an example of the `inet_http_server` section in supervisor's
config that will work with default plugin configuration:
```ini
[inet_http_server]
port = 127.0.0.1:9001
username = user
password = pass
```
## Configuration
```toml
[inputs.supervisor]
## Url of supervisor's XML-RPC endpoint if basic auth enabled in supervisor http server,
## than you have to add credentials to url (ex. http://login:pass@localhost:9001/RPC2)
# url="http://localhost:9001/RPC2"
## With settings below you can manage gathering additional information about processes
## If both of them empty, then all additional information will be collected.
## Currently supported supported additional metrics are: pid, rc
# metrics_include = []
# metrics_exclude = ["pid", "rc"]
```
### Optional metrics
You can control gathering of some supervisor's metrics (processes PIDs
and exit codes) by setting metrics_include and metrics_exclude parameters
in configuration file.
### Server tag
Server tag is used to identify metrics source server. You have an option
to use host:port pair of supervisor's http endpoint by default or you
can use supervisor's identification string, which is set in supervisor's
configuration file.
## Metrics
- supervisor_processes
- Tags:
- source (Hostname or IP address of supervisor's instance)
- port (Port number of supervisor's HTTP server)
- id (Supervisor's identification string)
- name (Process name)
- group (Process group)
- Fields:
- state (int, see reference)
- uptime (int, seconds)
- pid (int, optional)
- exitCode (int, optional)
- supervisor_instance
- Tags:
- source (Hostname or IP address of supervisor's instance)
- port (Port number of supervisor's HTTP server)
- id (Supervisor's identification string)
- Fields:
- state (int, see reference)
### Supervisor process state field reference table
|Statecode|Statename| Description |
|--------|----------|--------------------------------------------------------------------------------------------------------|
| 0 | STOPPED | The process has been stopped due to a stop request or has never been started. |
| 10 | STARTING | The process is starting due to a start request. |
| 20 | RUNNING | The process is running. |
| 30 | BACKOFF |The process entered the STARTING state but subsequently exited too quickly to move to the RUNNING state.|
| 40 | STOPPING | The process is stopping due to a stop request. |
| 100 | EXITED | The process exited from the RUNNING state (expectedly or unexpectedly). |
| 200 | FATAL | The process could not be started successfully. |
| 1000 | UNKNOWN | The process is in an unknown state (supervisord programming error). |
### Supervisor instance state field reference
|Statecode| Statename | Description |
|---------|------------|----------------------------------------------|
| 2 | FATAL | Supervisor has experienced a serious error. |
| 1 | RUNNING | Supervisor is working normally. |
| 0 | RESTARTING | Supervisor is in the process of restarting. |
| -1 | SHUTDOWN |Supervisor is in the process of shutting down.|
## Example Output
```shell
supervisor_processes,group=ExampleGroup,id=supervisor,port=9001,process=ExampleProcess,source=localhost state=20i,uptime=75958i 1659786637000000000
supervisor_instance,id=supervisor,port=9001,source=localhost state=1i 1659786637000000000
```

View File

@ -0,0 +1,10 @@
# Gathers information about processes that running under supervisor using XML-RPC API
[[inputs.supervisor]]
## Url of supervisor's XML-RPC endpoint if basic auth enabled in supervisor http server,
## than you have to add credentials to url (ex. http://login:pass@localhost:9001/RPC2)
# url="http://localhost:9001/RPC2"
## With settings below you can manage gathering additional information about processes
## If both of them empty, then all additional information will be collected.
## Currently supported supported additional metrics are: pid, rc
# metrics_include = []
# metrics_exclude = ["pid", "rc"]

View File

@ -0,0 +1,180 @@
package supervisor
import (
_ "embed"
"fmt"
"net/url"
"strings"
"github.com/kolo/xmlrpc"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/inputs"
)
type Supervisor struct {
Server string `toml:"url"`
MetricsInc []string `toml:"metrics_include"`
MetricsExc []string `toml:"metrics_exclude"`
Log telegraf.Logger `toml:"-"`
rpcClient *xmlrpc.Client
fieldFilter filter.Filter
}
type processInfo struct {
Name string `xmlrpc:"name"`
Group string `xmlrpc:"group"`
Description string `xmlrpc:"description"`
Start int32 `xmlrpc:"start"`
Stop int32 `xmlrpc:"stop"`
Now int32 `xmlrpc:"now"`
State int16 `xmlrpc:"state"`
Statename string `xmlrpc:"statename"`
StdoutLogfile string `xmlrpc:"stdout_logfile"`
StderrLogfile string `xmlrpc:"stderr_logfile"`
SpawnErr string `xmlrpc:"spawnerr"`
ExitStatus int8 `xmlrpc:"exitstatus"`
Pid int32 `xmlrpc:"pid"`
}
type supervisorInfo struct {
StateCode int8 `xmlrpc:"statecode"`
StateName string `xmlrpc:"statename"`
Ident string
}
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
//go:embed sample.conf
var sampleConfig string
func (s *Supervisor) Description() string {
return "Gather info about processes state, that running under supervisor using its XML-RPC API"
}
func (s *Supervisor) SampleConfig() string {
return sampleConfig
}
func (s *Supervisor) Gather(acc telegraf.Accumulator) error {
// API call to get information about all running processes
var rawProcessData []processInfo
err := s.rpcClient.Call("supervisor.getAllProcessInfo", nil, &rawProcessData)
if err != nil {
return fmt.Errorf("failed to get processes info: %v", err)
}
// API call to get information about instance status
var status supervisorInfo
err = s.rpcClient.Call("supervisor.getState", nil, &status)
if err != nil {
return fmt.Errorf("failed to get processes info: %v", err)
}
// API call to get identification string
err = s.rpcClient.Call("supervisor.getIdentification", nil, &status.Ident)
if err != nil {
return fmt.Errorf("failed to get instance identification: %v", err)
}
// Iterating through array of structs with processes info and adding fields to accumulator
for _, process := range rawProcessData {
processTags, processFields, err := s.parseProcessData(process, status)
if err != nil {
acc.AddError(err)
continue
}
acc.AddFields("supervisor_processes", processFields, processTags)
}
// Adding instance info fields to accumulator
instanceTags, instanceFields, err := s.parseInstanceData(status)
if err != nil {
return fmt.Errorf("failed to parse instance data: %v", err)
}
acc.AddFields("supervisor_instance", instanceFields, instanceTags)
return nil
}
func (s *Supervisor) parseProcessData(pInfo processInfo, status supervisorInfo) (map[string]string, map[string]interface{}, error) {
tags := map[string]string{
"process": pInfo.Name,
"group": pInfo.Group,
}
fields := map[string]interface{}{
"uptime": pInfo.Now - pInfo.Start,
"state": pInfo.State,
}
if s.fieldFilter.Match("pid") {
fields["pid"] = pInfo.Pid
}
if s.fieldFilter.Match("rc") {
fields["exitCode"] = pInfo.ExitStatus
}
splittedURL, err := beautifyServerString(s.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse server string: %v", err)
}
tags["id"] = status.Ident
tags["source"] = splittedURL[0]
tags["port"] = splittedURL[1]
return tags, fields, nil
}
// Parsing of supervisor instance data
func (s *Supervisor) parseInstanceData(status supervisorInfo) (map[string]string, map[string]interface{}, error) {
splittedURL, err := beautifyServerString(s.Server)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse server string: %v", err)
}
tags := map[string]string{}
tags["id"] = status.Ident
tags["source"] = splittedURL[0]
tags["port"] = splittedURL[1]
fields := map[string]interface{}{"state": status.StateCode}
return tags, fields, nil
}
func (s *Supervisor) Init() error {
// Using default server URL if none was specified in config
if s.Server == "" {
s.Server = "http://localhost:9001/RPC2"
}
var err error
// Initializing XML-RPC client
s.rpcClient, err = xmlrpc.NewClient(s.Server, nil)
if err != nil {
return fmt.Errorf("XML-RPC client initialization failed: %v", err)
}
// Setting filter for additional metrics
s.fieldFilter, err = filter.NewIncludeExcludeFilter(s.MetricsInc, s.MetricsExc)
if err != nil {
return fmt.Errorf("metrics filter setup failed: %v", err)
}
return nil
}
func init() {
inputs.Add("supervisor", func() telegraf.Input {
return &Supervisor{
MetricsExc: []string{"pid", "rc"},
}
})
}
// Function to get only address and port from URL
func beautifyServerString(rawurl string) ([]string, error) {
parsedURL, err := url.Parse(rawurl)
splittedURL := strings.Split(parsedURL.Host, ":")
if err != nil {
return nil, err
}
if len(splittedURL) < 2 {
if parsedURL.Scheme == "https" {
splittedURL[1] = "443"
} else {
splittedURL[1] = "80"
}
}
return splittedURL, nil
}

View File

@ -0,0 +1,176 @@
package supervisor
import (
"path/filepath"
"testing"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
)
func TestShort_SampleData(t *testing.T) {
testCases := []struct {
desc string
supervisorData supervisorInfo
sampleProcInfo []processInfo
expProcessFields []map[string]interface{}
expProcessTags []map[string]string
expInstanceFields map[string]interface{}
expInstancesTags map[string]string
}{
{
desc: "Case 1",
sampleProcInfo: []processInfo{
{
Name: "Process0",
Group: "ProcessGroup0",
Description: "pid 112 uptime 0:12:11",
Start: 1615632853,
Stop: 0,
Now: 1615632853 + 731,
State: 20,
Statename: "RUNNING",
StdoutLogfile: "/var/log/supervisor/process0-stdout.log",
StderrLogfile: "/var/log/supervisor/process0-stdout.log",
SpawnErr: "",
ExitStatus: 0,
Pid: 112,
},
{
Name: "Process1",
Group: "ProcessGroup1",
Description: "pid 113 uptime 0:12:11",
Start: 1615632853,
Stop: 0,
Now: 1615632853 + 731,
State: 20,
Statename: "RUNNING",
StdoutLogfile: "/var/log/supervisor/process1-stdout.log",
StderrLogfile: "/var/log/supervisor/process1-stderr.log",
SpawnErr: "",
ExitStatus: 0,
Pid: 113,
},
},
supervisorData: supervisorInfo{
StateCode: int8(1),
StateName: "RUNNING",
Ident: "supervisor",
},
expProcessFields: []map[string]interface{}{
{
"uptime": int32(731),
"state": int16(20),
"pid": int32(112),
"exitCode": int8(0),
},
{
"uptime": int32(731),
"state": int16(20),
"pid": int32(113),
"exitCode": int8(0),
},
},
expProcessTags: []map[string]string{
{
"process": "Process0",
"group": "ProcessGroup0",
"source": "example.org",
"port": "9001",
"id": "supervisor",
},
{
"process": "Process1",
"group": "ProcessGroup1",
"source": "example.org",
"port": "9001",
"id": "supervisor",
},
},
expInstanceFields: map[string]interface{}{
"state": int8(1),
},
expInstancesTags: map[string]string{
"source": "example.org",
"port": "9001",
"id": "supervisor",
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
s := &Supervisor{
Server: "http://example.org:9001/RPC2",
MetricsInc: []string{},
MetricsExc: []string{},
}
status := supervisorInfo{
StateCode: tC.supervisorData.StateCode,
StateName: tC.supervisorData.StateName,
Ident: tC.supervisorData.Ident,
}
err := s.Init()
if err != nil {
t.Errorf("failed to run Init function: %v", err)
}
for k, v := range tC.sampleProcInfo {
processTags, processFields, err := s.parseProcessData(v, status)
require.NoError(t, err)
require.Equal(t, tC.expProcessFields[k], processFields)
require.Equal(t, tC.expProcessTags[k], processTags)
}
instanceTags, instanceFields, err := s.parseInstanceData(status)
require.NoError(t, err)
require.Equal(t, tC.expInstancesTags, instanceTags)
require.Equal(t, tC.expInstanceFields, instanceFields)
})
}
}
func TestIntegration_BasicGathering(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
const supervisorPort = "9001"
supervisorConfig, err := filepath.Abs("testdata/supervisord.conf")
require.NoError(t, err, "Failed to get absolute path of supervisord config")
ctr := testutil.Container{
Image: "niasar/supervisor:stretch-3.3",
ExposedPorts: []string{supervisorPort},
BindMounts: map[string]string{
"/etc/supervisor/supervisord.conf": supervisorConfig,
},
WaitingFor: wait.ForAll(
wait.ForLog("supervisord started with pid"),
wait.ForListeningPort(nat.Port(supervisorPort)),
),
}
err = ctr.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, ctr.Terminate(), "terminating container failed")
}()
s := &Supervisor{
Server: "http://login:pass@" + testutil.GetLocalHost() + ":" + ctr.Ports[supervisorPort] + "/RPC2",
MetricsInc: []string{},
MetricsExc: []string{},
}
err = s.Init()
require.NoError(t, err, "failed to run Init function")
var acc testutil.Accumulator
err = acc.GatherError(s.Gather)
require.NoError(t, err)
require.Equal(t, acc.HasField("supervisor_processes", "uptime"), true)
require.Equal(t, acc.HasField("supervisor_processes", "state"), true)
require.Equal(t, acc.HasField("supervisor_processes", "pid"), true)
require.Equal(t, acc.HasField("supervisor_processes", "exitCode"), true)
require.Equal(t, acc.HasField("supervisor_instance", "state"), true)
require.Equal(t, acc.HasTag("supervisor_processes", "id"), true)
require.Equal(t, acc.HasTag("supervisor_processes", "source"), true)
require.Equal(t, acc.HasTag("supervisor_processes", "port"), true)
require.Equal(t, acc.HasTag("supervisor_instance", "id"), true)
require.Equal(t, acc.HasTag("supervisor_instance", "source"), true)
require.Equal(t, acc.HasTag("supervisor_instance", "port"), true)
}

View File

@ -0,0 +1,22 @@
[inet_http_server]
port = 0.0.0.0:9001
username = login
password = pass
[supervisord]
logfile=/dev/stdout
pidfile=/var/run/supervisord.pid
logfile_maxbytes = 0
nodaemon = true
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=http://localhost:9001
[program:sleep]
process_name = %(program_name)s-%(process_num)s
command=/bin/sleep infinity
numprocs=3
autorestart=true