Add nginx_sts input plugin (#7205)

This commit is contained in:
Zinovii Dmytriv 2020-07-01 09:29:44 +03:00 committed by GitHub
parent 0b71d2f943
commit 0830b2f8af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 802 additions and 0 deletions

View File

@ -108,6 +108,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_plus"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_plus_api"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_sts"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_upstream_check"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx_vts"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"

View File

@ -0,0 +1,99 @@
# Telegraf Plugin: nginx_sts
This plugin gathers Nginx status using external virtual host traffic status module - https://github.com/vozlt/nginx-module-sts. This is an Nginx module that provides access to stream host status information. It contains the current status such as servers, upstreams, caches. This is similar to the live activity monitoring of Nginx plus.
For module configuration details please see its [documentation](https://github.com/vozlt/nginx-module-sts#synopsis).
### Configuration:
```
# Read nginx status information using nginx-module-sts module
[[inputs.nginx_sts]]
## An array of Nginx status URIs to gather stats.
urls = ["http://localhost/stream_traffic_status/status/format/json"]
```
### Measurements & Fields:
- nginx_sts_connections
- active
- reading
- writing
- waiting
- accepted
- handled
- requests
- nginx_sts_server, nginx_sts_filter
- connects
- in_bytes
- out_bytes
- response_1xx_count
- response_2xx_count
- response_3xx_count
- response_4xx_count
- response_5xx_count
- session_msec_counter
- session_msec
- nginx_sts_upstream
- connects
- in_bytes
- out_bytes
- response_1xx_count
- response_2xx_count
- response_3xx_count
- response_4xx_count
- response_5xx_count
- session_msec_counter
- session_msec
- upstream_session_msec_counter
- upstream_session_msec
- upstream_connect_msec_counter
- upstream_connect_msec
- upstream_firstbyte_msec_counter
- upstream_firstbyte_msec
- weight
- max_fails
- fail_timeout
- backup
- down
### Tags:
- nginx_sts_connections
- source
- port
- nginx_sts_server
- source
- port
- zone
- nginx_sts_filter
- source
- port
- filter_name
- filter_key
- nginx_sts_upstream
- source
- port
- upstream
- upstream_address
### Example Output:
Using this configuration:
```
[[inputs.nginx_sts]]
## An array of Nginx status URIs to gather stats.
urls = ["http://localhost/stream_traffic_status/status/format/json"]
```
When run with:
```
./telegraf -config telegraf.conf -input-filter nginx_sts -test
```
It produces:
```
nginx_sts_upstream,host=localhost,port=80,source=127.0.0.1,upstream=backend_cluster,upstream_address=1.2.3.4:8080 upstream_connect_msec_counter=0i,out_bytes=0i,down=false,connects=0i,session_msec=0i,upstream_session_msec=0i,upstream_session_msec_counter=0i,upstream_connect_msec=0i,upstream_firstbyte_msec_counter=0i,response_3xx_count=0i,session_msec_counter=0i,weight=1i,max_fails=1i,backup=false,upstream_firstbyte_msec=0i,in_bytes=0i,response_1xx_count=0i,response_2xx_count=0i,response_4xx_count=0i,response_5xx_count=0i,fail_timeout=10i 1584699180000000000
nginx_sts_upstream,host=localhost,port=80,source=127.0.0.1,upstream=backend_cluster,upstream_address=9.8.7.6:8080 upstream_firstbyte_msec_counter=0i,response_2xx_count=0i,down=false,upstream_session_msec_counter=0i,out_bytes=0i,response_5xx_count=0i,weight=1i,max_fails=1i,fail_timeout=10i,connects=0i,session_msec_counter=0i,upstream_session_msec=0i,in_bytes=0i,response_1xx_count=0i,response_3xx_count=0i,response_4xx_count=0i,session_msec=0i,upstream_connect_msec=0i,upstream_connect_msec_counter=0i,upstream_firstbyte_msec=0i,backup=false 1584699180000000000
nginx_sts_server,host=localhost,port=80,source=127.0.0.1,zone=* response_2xx_count=0i,response_4xx_count=0i,response_5xx_count=0i,session_msec_counter=0i,in_bytes=0i,out_bytes=0i,session_msec=0i,response_1xx_count=0i,response_3xx_count=0i,connects=0i 1584699180000000000
nginx_sts_connections,host=localhost,port=80,source=127.0.0.1 waiting=1i,accepted=146i,handled=146i,requests=13421i,active=3i,reading=0i,writing=2i 1584699180000000000
```

View File

@ -0,0 +1,304 @@
package nginx_sts
import (
"bufio"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
type NginxSTS struct {
Urls []string `toml:"urls"`
ResponseTimeout internal.Duration `toml:"response_timeout"`
tls.ClientConfig
client *http.Client
}
var sampleConfig = `
## An array of ngx_http_status_module or status URI to gather stats.
urls = ["http://localhost/status"]
## HTTP response timeout (default: 5s)
response_timeout = "5s"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (n *NginxSTS) SampleConfig() string {
return sampleConfig
}
func (n *NginxSTS) Description() string {
return "Read Nginx virtual host traffic status module information (nginx-module-sts)"
}
func (n *NginxSTS) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
// Create an HTTP client that is re-used for each
// collection interval
if n.client == nil {
client, err := n.createHTTPClient()
if err != nil {
return err
}
n.client = client
}
for _, u := range n.Urls {
addr, err := url.Parse(u)
if err != nil {
acc.AddError(fmt.Errorf("Unable to parse address '%s': %s", u, err))
continue
}
wg.Add(1)
go func(addr *url.URL) {
defer wg.Done()
acc.AddError(n.gatherURL(addr, acc))
}(addr)
}
wg.Wait()
return nil
}
func (n *NginxSTS) createHTTPClient() (*http.Client, error) {
if n.ResponseTimeout.Duration < time.Second {
n.ResponseTimeout.Duration = time.Second * 5
}
tlsConfig, err := n.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
Timeout: n.ResponseTimeout.Duration,
}
return client, nil
}
func (n *NginxSTS) gatherURL(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := n.client.Get(addr.String())
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status)
}
contentType := strings.Split(resp.Header.Get("Content-Type"), ";")[0]
switch contentType {
case "application/json":
return gatherStatusURL(bufio.NewReader(resp.Body), getTags(addr), acc)
default:
return fmt.Errorf("%s returned unexpected content type %s", addr.String(), contentType)
}
}
type NginxSTSResponse struct {
Connections struct {
Active uint64 `json:"active"`
Reading uint64 `json:"reading"`
Writing uint64 `json:"writing"`
Waiting uint64 `json:"waiting"`
Accepted uint64 `json:"accepted"`
Handled uint64 `json:"handled"`
Requests uint64 `json:"requests"`
} `json:"connections"`
Hostname string `json:"hostName"`
StreamFilterZones map[string]map[string]Server `json:"streamFilterZones"`
StreamServerZones map[string]Server `json:"streamServerZones"`
StreamUpstreamZones map[string][]Upstream `json:"streamUpstreamZones"`
}
type Server struct {
ConnectCounter uint64 `json:"connectCounter"`
InBytes uint64 `json:"inBytes"`
OutBytes uint64 `json:"outBytes"`
SessionMsecCounter uint64 `json:"sessionMsecCounter"`
SessionMsec uint64 `json:"sessionMsec"`
Responses struct {
OneXx uint64 `json:"1xx"`
TwoXx uint64 `json:"2xx"`
ThreeXx uint64 `json:"3xx"`
FourXx uint64 `json:"4xx"`
FiveXx uint64 `json:"5xx"`
} `json:"responses"`
}
type Upstream struct {
Server string `json:"server"`
ConnectCounter uint64 `json:"connectCounter"`
InBytes uint64 `json:"inBytes"`
OutBytes uint64 `json:"outBytes"`
Responses struct {
OneXx uint64 `json:"1xx"`
TwoXx uint64 `json:"2xx"`
ThreeXx uint64 `json:"3xx"`
FourXx uint64 `json:"4xx"`
FiveXx uint64 `json:"5xx"`
} `json:"responses"`
SessionMsecCounter uint64 `json:"sessionMsecCounter"`
SessionMsec uint64 `json:"sessionMsec"`
USessionMsecCounter uint64 `json:"uSessionMsecCounter"`
USessionMsec uint64 `json:"uSessionMsec"`
UConnectMsecCounter uint64 `json:"uConnectMsecCounter"`
UConnectMsec uint64 `json:"uConnectMsec"`
UFirstByteMsecCounter uint64 `json:"uFirstByteMsecCounter"`
UFirstByteMsec uint64 `json:"uFirstByteMsec"`
Weight uint64 `json:"weight"`
MaxFails uint64 `json:"maxFails"`
FailTimeout uint64 `json:"failTimeout"`
Backup bool `json:"backup"`
Down bool `json:"down"`
}
func gatherStatusURL(r *bufio.Reader, tags map[string]string, acc telegraf.Accumulator) error {
dec := json.NewDecoder(r)
status := &NginxSTSResponse{}
if err := dec.Decode(status); err != nil {
return fmt.Errorf("Error while decoding JSON response")
}
acc.AddFields("nginx_sts_connections", map[string]interface{}{
"active": status.Connections.Active,
"reading": status.Connections.Reading,
"writing": status.Connections.Writing,
"waiting": status.Connections.Waiting,
"accepted": status.Connections.Accepted,
"handled": status.Connections.Handled,
"requests": status.Connections.Requests,
}, tags)
for zoneName, zone := range status.StreamServerZones {
zoneTags := map[string]string{}
for k, v := range tags {
zoneTags[k] = v
}
zoneTags["zone"] = zoneName
acc.AddFields("nginx_sts_server", map[string]interface{}{
"connects": zone.ConnectCounter,
"in_bytes": zone.InBytes,
"out_bytes": zone.OutBytes,
"session_msec_counter": zone.SessionMsecCounter,
"session_msec": zone.SessionMsec,
"response_1xx_count": zone.Responses.OneXx,
"response_2xx_count": zone.Responses.TwoXx,
"response_3xx_count": zone.Responses.ThreeXx,
"response_4xx_count": zone.Responses.FourXx,
"response_5xx_count": zone.Responses.FiveXx,
}, zoneTags)
}
for filterName, filters := range status.StreamFilterZones {
for filterKey, upstream := range filters {
filterTags := map[string]string{}
for k, v := range tags {
filterTags[k] = v
}
filterTags["filter_key"] = filterKey
filterTags["filter_name"] = filterName
acc.AddFields("nginx_sts_filter", map[string]interface{}{
"connects": upstream.ConnectCounter,
"in_bytes": upstream.InBytes,
"out_bytes": upstream.OutBytes,
"session_msec_counter": upstream.SessionMsecCounter,
"session_msec": upstream.SessionMsec,
"response_1xx_count": upstream.Responses.OneXx,
"response_2xx_count": upstream.Responses.TwoXx,
"response_3xx_count": upstream.Responses.ThreeXx,
"response_4xx_count": upstream.Responses.FourXx,
"response_5xx_count": upstream.Responses.FiveXx,
}, filterTags)
}
}
for upstreamName, upstreams := range status.StreamUpstreamZones {
for _, upstream := range upstreams {
upstreamServerTags := map[string]string{}
for k, v := range tags {
upstreamServerTags[k] = v
}
upstreamServerTags["upstream"] = upstreamName
upstreamServerTags["upstream_address"] = upstream.Server
acc.AddFields("nginx_sts_upstream", map[string]interface{}{
"connects": upstream.ConnectCounter,
"session_msec": upstream.SessionMsec,
"session_msec_counter": upstream.SessionMsecCounter,
"upstream_session_msec": upstream.USessionMsec,
"upstream_session_msec_counter": upstream.USessionMsecCounter,
"upstream_connect_msec": upstream.UConnectMsec,
"upstream_connect_msec_counter": upstream.UConnectMsecCounter,
"upstream_firstbyte_msec": upstream.UFirstByteMsec,
"upstream_firstbyte_msec_counter": upstream.UFirstByteMsecCounter,
"in_bytes": upstream.InBytes,
"out_bytes": upstream.OutBytes,
"response_1xx_count": upstream.Responses.OneXx,
"response_2xx_count": upstream.Responses.TwoXx,
"response_3xx_count": upstream.Responses.ThreeXx,
"response_4xx_count": upstream.Responses.FourXx,
"response_5xx_count": upstream.Responses.FiveXx,
"weight": upstream.Weight,
"max_fails": upstream.MaxFails,
"fail_timeout": upstream.FailTimeout,
"backup": upstream.Backup,
"down": upstream.Down,
}, upstreamServerTags)
}
}
return nil
}
// Get tag(s) for the nginx plugin
func getTags(addr *url.URL) map[string]string {
h := addr.Host
host, port, err := net.SplitHostPort(h)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}
return map[string]string{"source": host, "port": port}
}
func init() {
inputs.Add("nginx_sts", func() telegraf.Input {
return &NginxSTS{}
})
}

View File

@ -0,0 +1,398 @@
package nginx_sts
import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
const sampleStatusResponse = `
{
"hostName": "test.example.com",
"nginxVersion": "1.12.2",
"loadMsec": 1518180328331,
"nowMsec": 1518256058416,
"connections": {
"active": 111,
"reading": 222,
"writing": 333,
"waiting": 444,
"accepted": 555,
"handled": 666,
"requests": 777
},
"streamServerZones": {
"example.com": {
"connectCounter": 1415887,
"inBytes": 1296356607,
"outBytes": 4404939605,
"responses": {
"1xx": 100,
"2xx": 200,
"3xx": 300,
"4xx": 400,
"5xx": 500
},
"sessionMsecCounter": 13,
"sessionMsec": 14
},
"other.example.com": {
"connectCounter": 505,
"inBytes": 171388,
"outBytes": 1273382,
"responses": {
"1xx": 101,
"2xx": 201,
"3xx": 301,
"4xx": 401,
"5xx": 501
},
"sessionMsecCounter": 12,
"sessionMsec": 15
}
},
"streamFilterZones": {
"country": {
"FI": {
"connectCounter": 60,
"inBytes": 2570,
"outBytes": 53597,
"responses": {
"1xx": 106,
"2xx": 206,
"3xx": 306,
"4xx": 406,
"5xx": 506
},
"sessionMsecCounter": 12,
"sessionMsec": 15
}
}
},
"streamUpstreamZones": {
"backend_cluster": [
{
"server": "127.0.0.1:6000",
"connectCounter": 2103849,
"inBytes": 1774680141,
"outBytes": 11727669190,
"responses": {
"1xx": 103,
"2xx": 203,
"3xx": 303,
"4xx": 403,
"5xx": 503
},
"sessionMsecCounter": 31,
"sessionMsec": 131,
"uSessionMsecCounter": 32,
"uSessionMsec": 132,
"uConnectMsecCounter": 33,
"uConnectMsec": 130,
"uFirstByteMsecCounter": 34,
"uFirstByteMsec": 129,
"weight": 32,
"maxFails": 33,
"failTimeout": 34,
"backup": false,
"down": false
}
],
"::nogroups": [
{
"server": "127.0.0.1:4433",
"connectCounter": 8,
"inBytes": 5013,
"outBytes": 487585,
"responses": {
"1xx": 104,
"2xx": 204,
"3xx": 304,
"4xx": 404,
"5xx": 504
},
"sessionMsecCounter": 31,
"sessionMsec": 131,
"uSessionMsecCounter": 32,
"uSessionMsec": 132,
"uConnectMsecCounter": 33,
"uConnectMsec": 130,
"uFirstByteMsecCounter": 34,
"uFirstByteMsec": 129,
"weight": 36,
"maxFails": 37,
"failTimeout": 38,
"backup": true,
"down": false
},
{
"server": "127.0.0.1:8080",
"connectCounter": 7,
"inBytes": 2926,
"outBytes": 3846638,
"responses": {
"1xx": 105,
"2xx": 205,
"3xx": 305,
"4xx": 405,
"5xx": 505
},
"sessionMsecCounter": 31,
"sessionMsec": 131,
"uSessionMsecCounter": 32,
"uSessionMsec": 132,
"uConnectMsecCounter": 33,
"uConnectMsec": 130,
"uFirstByteMsecCounter": 34,
"uFirstByteMsec": 129,
"weight": 41,
"maxFails": 42,
"failTimeout": 43,
"backup": true,
"down": true
}
]
}
}
`
func TestNginxPlusGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string
if r.URL.Path == "/status" {
rsp = sampleStatusResponse
w.Header()["Content-Type"] = []string{"application/json"}
} else {
panic("Cannot handle request")
}
fmt.Fprintln(w, rsp)
}))
defer ts.Close()
n := &NginxSTS{
Urls: []string{fmt.Sprintf("%s/status", ts.URL)},
}
var acc testutil.Accumulator
err := n.Gather(&acc)
require.NoError(t, err)
addr, err := url.Parse(ts.URL)
if err != nil {
panic(err)
}
host, port, err := net.SplitHostPort(addr.Host)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}
acc.AssertContainsTaggedFields(
t,
"nginx_sts_connections",
map[string]interface{}{
"accepted": uint64(555),
"active": uint64(111),
"handled": uint64(666),
"reading": uint64(222),
"requests": uint64(777),
"waiting": uint64(444),
"writing": uint64(333),
},
map[string]string{
"source": host,
"port": port,
})
acc.AssertContainsTaggedFields(
t,
"nginx_sts_server",
map[string]interface{}{
"connects": uint64(1415887),
"in_bytes": uint64(1296356607),
"out_bytes": uint64(4404939605),
"session_msec_counter": uint64(13),
"session_msec": uint64(14),
"response_1xx_count": uint64(100),
"response_2xx_count": uint64(200),
"response_3xx_count": uint64(300),
"response_4xx_count": uint64(400),
"response_5xx_count": uint64(500),
},
map[string]string{
"source": host,
"port": port,
"zone": "example.com",
})
acc.AssertContainsTaggedFields(
t,
"nginx_sts_filter",
map[string]interface{}{
"connects": uint64(60),
"in_bytes": uint64(2570),
"out_bytes": uint64(53597),
"session_msec_counter": uint64(12),
"session_msec": uint64(15),
"response_1xx_count": uint64(106),
"response_2xx_count": uint64(206),
"response_3xx_count": uint64(306),
"response_4xx_count": uint64(406),
"response_5xx_count": uint64(506),
},
map[string]string{
"source": host,
"port": port,
"filter_key": "FI",
"filter_name": "country",
})
acc.AssertContainsTaggedFields(
t,
"nginx_sts_server",
map[string]interface{}{
"connects": uint64(505),
"in_bytes": uint64(171388),
"out_bytes": uint64(1273382),
"session_msec_counter": uint64(12),
"session_msec": uint64(15),
"response_1xx_count": uint64(101),
"response_2xx_count": uint64(201),
"response_3xx_count": uint64(301),
"response_4xx_count": uint64(401),
"response_5xx_count": uint64(501),
},
map[string]string{
"source": host,
"port": port,
"zone": "other.example.com",
})
acc.AssertContainsTaggedFields(
t,
"nginx_sts_upstream",
map[string]interface{}{
"connects": uint64(2103849),
"in_bytes": uint64(1774680141),
"out_bytes": uint64(11727669190),
"response_1xx_count": uint64(103),
"response_2xx_count": uint64(203),
"response_3xx_count": uint64(303),
"response_4xx_count": uint64(403),
"response_5xx_count": uint64(503),
"session_msec_counter": uint64(31),
"session_msec": uint64(131),
"upstream_session_msec_counter": uint64(32),
"upstream_session_msec": uint64(132),
"upstream_connect_msec_counter": uint64(33),
"upstream_connect_msec": uint64(130),
"upstream_firstbyte_msec_counter": uint64(34),
"upstream_firstbyte_msec": uint64(129),
"weight": uint64(32),
"max_fails": uint64(33),
"fail_timeout": uint64(34),
"backup": bool(false),
"down": bool(false),
},
map[string]string{
"source": host,
"port": port,
"upstream": "backend_cluster",
"upstream_address": "127.0.0.1:6000",
})
acc.AssertContainsTaggedFields(
t,
"nginx_sts_upstream",
map[string]interface{}{
"connects": uint64(8),
"in_bytes": uint64(5013),
"out_bytes": uint64(487585),
"response_1xx_count": uint64(104),
"response_2xx_count": uint64(204),
"response_3xx_count": uint64(304),
"response_4xx_count": uint64(404),
"response_5xx_count": uint64(504),
"session_msec_counter": uint64(31),
"session_msec": uint64(131),
"upstream_session_msec_counter": uint64(32),
"upstream_session_msec": uint64(132),
"upstream_connect_msec_counter": uint64(33),
"upstream_connect_msec": uint64(130),
"upstream_firstbyte_msec_counter": uint64(34),
"upstream_firstbyte_msec": uint64(129),
"weight": uint64(36),
"max_fails": uint64(37),
"fail_timeout": uint64(38),
"backup": bool(true),
"down": bool(false),
},
map[string]string{
"source": host,
"port": port,
"upstream": "::nogroups",
"upstream_address": "127.0.0.1:4433",
})
acc.AssertContainsTaggedFields(
t,
"nginx_sts_upstream",
map[string]interface{}{
"connects": uint64(7),
"in_bytes": uint64(2926),
"out_bytes": uint64(3846638),
"response_1xx_count": uint64(105),
"response_2xx_count": uint64(205),
"response_3xx_count": uint64(305),
"response_4xx_count": uint64(405),
"response_5xx_count": uint64(505),
"session_msec_counter": uint64(31),
"session_msec": uint64(131),
"upstream_session_msec_counter": uint64(32),
"upstream_session_msec": uint64(132),
"upstream_connect_msec_counter": uint64(33),
"upstream_connect_msec": uint64(130),
"upstream_firstbyte_msec_counter": uint64(34),
"upstream_firstbyte_msec": uint64(129),
"weight": uint64(41),
"max_fails": uint64(42),
"fail_timeout": uint64(43),
"backup": bool(true),
"down": bool(true),
},
map[string]string{
"source": host,
"port": port,
"upstream": "::nogroups",
"upstream_address": "127.0.0.1:8080",
})
}