diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index a5b0fc449..ec960d729 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -188,6 +188,7 @@ following works: - github.com/karrick/godirwalk [BSD 2-Clause "Simplified" License](https://github.com/karrick/godirwalk/blob/master/LICENSE) - github.com/kballard/go-shellquote [MIT License](https://github.com/kballard/go-shellquote/blob/master/LICENSE) - github.com/klauspost/compress [BSD 3-Clause Clear License](https://github.com/klauspost/compress/blob/master/LICENSE) +- github.com/kolo/xmlrpc [MIT License](https://github.com/kolo/xmlrpc/blob/master/LICENSE) - github.com/kylelemons/godebug [Apache License 2.0](https://github.com/kylelemons/godebug/blob/master/LICENSE) - github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE) - github.com/magiconair/properties [BSD 2-Clause "Simplified" License](https://github.com/magiconair/properties/blob/main/LICENSE.md) diff --git a/go.mod b/go.mod index 331b07e09..f00fdd5f5 100644 --- a/go.mod +++ b/go.mod @@ -302,6 +302,7 @@ require ( github.com/juju/webbrowser v1.0.0 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/klauspost/compress v1.15.8 // indirect + github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b github.com/kr/fs v0.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect diff --git a/go.sum b/go.sum index e555b27c3..2cb4af6dc 100644 --- a/go.sum +++ b/go.sum @@ -1524,6 +1524,8 @@ github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPR github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.5/go.mod 
h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= +github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b h1:iNjcivnc6lhbvJA3LD622NPrUponluJrBWPIwGG/3Bg= +github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -3339,6 +3341,7 @@ k8s.io/cri-api v0.20.6/go.mod h1:ew44AjNXwyn1s0U4xCKGodU7J1HzBeZ1MpGrpa5r8Yc= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= +k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index a801e1ad0..2c99204be 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -187,6 +187,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/sqlserver" _ "github.com/influxdata/telegraf/plugins/inputs/stackdriver" _ "github.com/influxdata/telegraf/plugins/inputs/statsd" + _ "github.com/influxdata/telegraf/plugins/inputs/supervisor" _ "github.com/influxdata/telegraf/plugins/inputs/suricata" _ "github.com/influxdata/telegraf/plugins/inputs/swap" _ "github.com/influxdata/telegraf/plugins/inputs/synproxy" diff --git a/plugins/inputs/supervisor/README.md b/plugins/inputs/supervisor/README.md new file 
mode 100644 index 000000000..5ab9d3091 --- /dev/null +++ b/plugins/inputs/supervisor/README.md @@ -0,0 +1,101 @@ +# Supervisor Input Plugin + +This plugin gathers information about processes that are +running under supervisor using XML-RPC API. + +Minimum tested version of supervisor: 3.3.2 + +## Supervisor configuration + +This plugin needs an HTTP server to be enabled in supervisor, +also it's recommended to enable basic authentication on the +HTTP server. When using basic authentication make sure to +include the username and password in the plugin's url setting. +Here is an example of the `inet_http_server` section in supervisor's +config that will work with default plugin configuration: + +```ini +[inet_http_server] +port = 127.0.0.1:9001 +username = user +password = pass +``` + +## Configuration + +```toml +[[inputs.supervisor]] + ## Url of supervisor's XML-RPC endpoint. If basic auth is enabled in supervisor's http server, + ## then you have to add credentials to the url (ex. http://login:pass@localhost:9001/RPC2) + # url="http://localhost:9001/RPC2" + ## With settings below you can manage gathering additional information about processes + ## If both of them are empty, then all additional information will be collected. + ## Currently supported additional metrics are: pid, rc + # metrics_include = [] + # metrics_exclude = ["pid", "rc"] +``` + +### Optional metrics + +You can control gathering of some supervisor's metrics (processes PIDs +and exit codes) by setting metrics_include and metrics_exclude parameters +in configuration file. + +### Server tag + +Server tag is used to identify metrics source server. You have an option +to use host:port pair of supervisor's http endpoint by default or you +can use supervisor's identification string, which is set in supervisor's +configuration file. 
+ +## Metrics + +- supervisor_processes + - Tags: + - source (Hostname or IP address of supervisor's instance) + - port (Port number of supervisor's HTTP server) + - id (Supervisor's identification string) + - process (Process name) + - group (Process group) + - Fields: + - state (int, see reference) + - uptime (int, seconds) + - pid (int, optional) + - exitCode (int, optional) + +- supervisor_instance + - Tags: + - source (Hostname or IP address of supervisor's instance) + - port (Port number of supervisor's HTTP server) + - id (Supervisor's identification string) + - Fields: + - state (int, see reference) + +### Supervisor process state field reference table + +|Statecode|Statename| Description | +|--------|----------|--------------------------------------------------------------------------------------------------------| +| 0 | STOPPED | The process has been stopped due to a stop request or has never been started. | +| 10 | STARTING | The process is starting due to a start request. | +| 20 | RUNNING | The process is running. | +| 30 | BACKOFF |The process entered the STARTING state but subsequently exited too quickly to move to the RUNNING state.| +| 40 | STOPPING | The process is stopping due to a stop request. | +| 100 | EXITED | The process exited from the RUNNING state (expectedly or unexpectedly). | +| 200 | FATAL | The process could not be started successfully. | +| 1000 | UNKNOWN | The process is in an unknown state (supervisord programming error). | + +### Supervisor instance state field reference + +|Statecode| Statename | Description | +|---------|------------|----------------------------------------------| +| 2 | FATAL | Supervisor has experienced a serious error. | +| 1 | RUNNING | Supervisor is working normally. | +| 0 | RESTARTING | Supervisor is in the process of restarting. 
| +| -1 | SHUTDOWN |Supervisor is in the process of shutting down.| + +## Example Output + +```shell +supervisor_processes,group=ExampleGroup,id=supervisor,port=9001,process=ExampleProcess,source=localhost state=20i,uptime=75958i 1659786637000000000 +supervisor_instance,id=supervisor,port=9001,source=localhost state=1i 1659786637000000000 +``` diff --git a/plugins/inputs/supervisor/sample.conf b/plugins/inputs/supervisor/sample.conf new file mode 100644 index 000000000..c2f1bdf9a --- /dev/null +++ b/plugins/inputs/supervisor/sample.conf @@ -0,0 +1,10 @@ +# Gathers information about processes that are running under supervisor using XML-RPC API +[[inputs.supervisor]] + ## Url of supervisor's XML-RPC endpoint. If basic auth is enabled in supervisor's http server, + ## then you have to add credentials to the url (ex. http://login:pass@localhost:9001/RPC2) + # url="http://localhost:9001/RPC2" + ## With settings below you can manage gathering additional information about processes + ## If both of them are empty, then all additional information will be collected. 
+ ## Currently supported additional metrics are: pid, rc + # metrics_include = [] + # metrics_exclude = ["pid", "rc"] \ No newline at end of file diff --git a/plugins/inputs/supervisor/supervisor.go b/plugins/inputs/supervisor/supervisor.go new file mode 100644 index 000000000..6f0cf41e0 --- /dev/null +++ b/plugins/inputs/supervisor/supervisor.go @@ -0,0 +1,180 @@ +package supervisor + +import ( + _ "embed" + "fmt" + "net/url" + "strings" + + "github.com/kolo/xmlrpc" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type Supervisor struct { + Server string `toml:"url"` + MetricsInc []string `toml:"metrics_include"` + MetricsExc []string `toml:"metrics_exclude"` + Log telegraf.Logger `toml:"-"` + + rpcClient *xmlrpc.Client + fieldFilter filter.Filter +} + +type processInfo struct { + Name string `xmlrpc:"name"` + Group string `xmlrpc:"group"` + Description string `xmlrpc:"description"` + Start int32 `xmlrpc:"start"` + Stop int32 `xmlrpc:"stop"` + Now int32 `xmlrpc:"now"` + State int16 `xmlrpc:"state"` + Statename string `xmlrpc:"statename"` + StdoutLogfile string `xmlrpc:"stdout_logfile"` + StderrLogfile string `xmlrpc:"stderr_logfile"` + SpawnErr string `xmlrpc:"spawnerr"` + ExitStatus int8 `xmlrpc:"exitstatus"` + Pid int32 `xmlrpc:"pid"` +} + +type supervisorInfo struct { + StateCode int8 `xmlrpc:"statecode"` + StateName string `xmlrpc:"statename"` + Ident string +} + +// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data. 
+//go:embed sample.conf +var sampleConfig string + +func (s *Supervisor) Description() string { + return "Gather info about processes state, that running under supervisor using its XML-RPC API" +} + +func (s *Supervisor) SampleConfig() string { + return sampleConfig +} + +func (s *Supervisor) Gather(acc telegraf.Accumulator) error { + // API call to get information about all running processes + var rawProcessData []processInfo + err := s.rpcClient.Call("supervisor.getAllProcessInfo", nil, &rawProcessData) + if err != nil { + return fmt.Errorf("failed to get processes info: %v", err) + } + + // API call to get information about instance status + var status supervisorInfo + err = s.rpcClient.Call("supervisor.getState", nil, &status) + if err != nil { + return fmt.Errorf("failed to get processes info: %v", err) + } + + // API call to get identification string + err = s.rpcClient.Call("supervisor.getIdentification", nil, &status.Ident) + if err != nil { + return fmt.Errorf("failed to get instance identification: %v", err) + } + + // Iterating through array of structs with processes info and adding fields to accumulator + for _, process := range rawProcessData { + processTags, processFields, err := s.parseProcessData(process, status) + if err != nil { + acc.AddError(err) + continue + } + acc.AddFields("supervisor_processes", processFields, processTags) + } + // Adding instance info fields to accumulator + instanceTags, instanceFields, err := s.parseInstanceData(status) + if err != nil { + return fmt.Errorf("failed to parse instance data: %v", err) + } + acc.AddFields("supervisor_instance", instanceFields, instanceTags) + return nil +} + +func (s *Supervisor) parseProcessData(pInfo processInfo, status supervisorInfo) (map[string]string, map[string]interface{}, error) { + tags := map[string]string{ + "process": pInfo.Name, + "group": pInfo.Group, + } + fields := map[string]interface{}{ + "uptime": pInfo.Now - pInfo.Start, + "state": pInfo.State, + } + if 
// beautifyServerString extracts the host and port parts from a server URL.
//
// It returns a two-element slice {host, port}. The user-info part of the URL
// (login:pass@) is never included. When the URL carries no port, or an empty
// one, the scheme default is substituted: "443" for https and "80" for
// everything else. A bracketed IPv6 literal is returned with its brackets.
// An error is returned when rawurl cannot be parsed.
func beautifyServerString(rawurl string) ([]string, error) {
	parsedURL, err := url.Parse(rawurl)
	if err != nil {
		// parsedURL is nil on failure, so bail out before touching it.
		return nil, err
	}

	host, port := parsedURL.Host, ""
	// Split on the last colon only. A host that ends in "]" is a bracketed
	// IPv6 literal without a port, whose colons belong to the address.
	if i := strings.LastIndex(host, ":"); i >= 0 && !strings.HasSuffix(host, "]") {
		host, port = host[:i], host[i+1:]
	}

	if port == "" {
		port = "80"
		if parsedURL.Scheme == "https" {
			port = "443"
		}
	}
	return []string{host, port}, nil
}
[]map[string]string{ + { + "process": "Process0", + "group": "ProcessGroup0", + "source": "example.org", + "port": "9001", + "id": "supervisor", + }, + { + "process": "Process1", + "group": "ProcessGroup1", + "source": "example.org", + "port": "9001", + "id": "supervisor", + }, + }, + expInstanceFields: map[string]interface{}{ + "state": int8(1), + }, + expInstancesTags: map[string]string{ + "source": "example.org", + "port": "9001", + "id": "supervisor", + }, + }, + } + for _, tC := range testCases { + t.Run(tC.desc, func(t *testing.T) { + s := &Supervisor{ + Server: "http://example.org:9001/RPC2", + MetricsInc: []string{}, + MetricsExc: []string{}, + } + status := supervisorInfo{ + StateCode: tC.supervisorData.StateCode, + StateName: tC.supervisorData.StateName, + Ident: tC.supervisorData.Ident, + } + err := s.Init() + if err != nil { + t.Errorf("failed to run Init function: %v", err) + } + for k, v := range tC.sampleProcInfo { + processTags, processFields, err := s.parseProcessData(v, status) + require.NoError(t, err) + require.Equal(t, tC.expProcessFields[k], processFields) + require.Equal(t, tC.expProcessTags[k], processTags) + } + instanceTags, instanceFields, err := s.parseInstanceData(status) + require.NoError(t, err) + require.Equal(t, tC.expInstancesTags, instanceTags) + require.Equal(t, tC.expInstanceFields, instanceFields) + }) + } +} + +func TestIntegration_BasicGathering(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + const supervisorPort = "9001" + supervisorConfig, err := filepath.Abs("testdata/supervisord.conf") + require.NoError(t, err, "Failed to get absolute path of supervisord config") + ctr := testutil.Container{ + Image: "niasar/supervisor:stretch-3.3", + ExposedPorts: []string{supervisorPort}, + BindMounts: map[string]string{ + "/etc/supervisor/supervisord.conf": supervisorConfig, + }, + WaitingFor: wait.ForAll( + wait.ForLog("supervisord started with pid"), + 
wait.ForListeningPort(nat.Port(supervisorPort)), + ), + } + err = ctr.Start() + require.NoError(t, err, "failed to start container") + defer func() { + require.NoError(t, ctr.Terminate(), "terminating container failed") + }() + s := &Supervisor{ + Server: "http://login:pass@" + testutil.GetLocalHost() + ":" + ctr.Ports[supervisorPort] + "/RPC2", + MetricsInc: []string{}, + MetricsExc: []string{}, + } + err = s.Init() + require.NoError(t, err, "failed to run Init function") + var acc testutil.Accumulator + err = acc.GatherError(s.Gather) + require.NoError(t, err) + require.Equal(t, acc.HasField("supervisor_processes", "uptime"), true) + require.Equal(t, acc.HasField("supervisor_processes", "state"), true) + require.Equal(t, acc.HasField("supervisor_processes", "pid"), true) + require.Equal(t, acc.HasField("supervisor_processes", "exitCode"), true) + require.Equal(t, acc.HasField("supervisor_instance", "state"), true) + require.Equal(t, acc.HasTag("supervisor_processes", "id"), true) + require.Equal(t, acc.HasTag("supervisor_processes", "source"), true) + require.Equal(t, acc.HasTag("supervisor_processes", "port"), true) + require.Equal(t, acc.HasTag("supervisor_instance", "id"), true) + require.Equal(t, acc.HasTag("supervisor_instance", "source"), true) + require.Equal(t, acc.HasTag("supervisor_instance", "port"), true) +} diff --git a/plugins/inputs/supervisor/testdata/supervisord.conf b/plugins/inputs/supervisor/testdata/supervisord.conf new file mode 100644 index 000000000..c09a351a6 --- /dev/null +++ b/plugins/inputs/supervisor/testdata/supervisord.conf @@ -0,0 +1,22 @@ +[inet_http_server] +port = 0.0.0.0:9001 +username = login +password = pass + +[supervisord] +logfile=/dev/stdout +pidfile=/var/run/supervisord.pid +logfile_maxbytes = 0 +nodaemon = true + +[rpcinterface:supervisor] +supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface + +[supervisorctl] +serverurl=http://localhost:9001 + +[program:sleep] +process_name = 
%(program_name)s-%(process_num)s +command=/bin/sleep infinity +numprocs=3 +autorestart=true \ No newline at end of file