fix(inputs.phpfpm): Continue despite erroneous sockets (#14852)
This commit is contained in:
parent
7aed6d2c23
commit
c475771993
|
|
@ -73,17 +73,17 @@ type JSONMetrics struct {
|
||||||
} `json:"processes"`
|
} `json:"processes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type metric map[string]int64
|
type metricStat map[string]int64
|
||||||
type poolStat map[string]metric
|
type poolStat map[string]metricStat
|
||||||
|
|
||||||
type phpfpm struct {
|
type phpfpm struct {
|
||||||
Format string `toml:"format"`
|
Format string `toml:"format"`
|
||||||
Timeout config.Duration `toml:"timeout"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
Urls []string `toml:"urls"`
|
Urls []string `toml:"urls"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
||||||
client *http.Client
|
client *http.Client
|
||||||
Log telegraf.Logger
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*phpfpm) SampleConfig() string {
|
func (*phpfpm) SampleConfig() string {
|
||||||
|
|
@ -91,6 +91,10 @@ func (*phpfpm) SampleConfig() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *phpfpm) Init() error {
|
func (p *phpfpm) Init() error {
|
||||||
|
if len(p.Urls) == 0 {
|
||||||
|
p.Urls = []string{"http://127.0.0.1/status"}
|
||||||
|
}
|
||||||
|
|
||||||
tlsCfg, err := p.ClientConfig.TLSConfig()
|
tlsCfg, err := p.ClientConfig.TLSConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -117,18 +121,8 @@ func (p *phpfpm) Init() error {
|
||||||
// Reads stats from all configured servers accumulates stats.
|
// Reads stats from all configured servers accumulates stats.
|
||||||
// Returns one of the errors encountered while gather stats (if any).
|
// Returns one of the errors encountered while gather stats (if any).
|
||||||
func (p *phpfpm) Gather(acc telegraf.Accumulator) error {
|
func (p *phpfpm) Gather(acc telegraf.Accumulator) error {
|
||||||
if len(p.Urls) == 0 {
|
|
||||||
return p.gatherServer("http://127.0.0.1/status", acc)
|
|
||||||
}
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
for _, serv := range expandUrls(acc, p.Urls) {
|
||||||
urls, err := expandUrls(p.Urls)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, serv := range urls {
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(serv string) {
|
go func(serv string) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
@ -259,7 +253,7 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) {
|
||||||
// We start to gather data for a new pool here
|
// We start to gather data for a new pool here
|
||||||
if fieldName == PfPool {
|
if fieldName == PfPool {
|
||||||
currentPool = strings.Trim(keyvalue[1], " ")
|
currentPool = strings.Trim(keyvalue[1], " ")
|
||||||
stats[currentPool] = make(metric)
|
stats[currentPool] = make(metricStat)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -347,7 +341,7 @@ func (p *phpfpm) parseJSON(r io.Reader, acc telegraf.Accumulator, addr string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func expandUrls(urls []string) ([]string, error) {
|
func expandUrls(acc telegraf.Accumulator, urls []string) []string {
|
||||||
addrs := make([]string, 0, len(urls))
|
addrs := make([]string, 0, len(urls))
|
||||||
for _, address := range urls {
|
for _, address := range urls {
|
||||||
if isNetworkURL(address) {
|
if isNetworkURL(address) {
|
||||||
|
|
@ -356,11 +350,12 @@ func expandUrls(urls []string) ([]string, error) {
|
||||||
}
|
}
|
||||||
paths, err := globUnixSocket(address)
|
paths, err := globUnixSocket(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
acc.AddError(err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
addrs = append(addrs, paths...)
|
addrs = append(addrs, paths...)
|
||||||
}
|
}
|
||||||
return addrs, nil
|
return addrs
|
||||||
}
|
}
|
||||||
|
|
||||||
func globUnixSocket(address string) ([]string, error) {
|
func globUnixSocket(address string) ([]string, error) {
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,12 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/common/shim"
|
"github.com/influxdata/telegraf/plugins/common/shim"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
@ -49,6 +52,7 @@ func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) {
|
||||||
url := ts.URL + "?test=ok"
|
url := ts.URL + "?test=ok"
|
||||||
r := &phpfpm{
|
r := &phpfpm{
|
||||||
Urls: []string{url},
|
Urls: []string{url},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
@ -96,7 +100,7 @@ func TestPhpFpmGeneratesJSONMetrics_From_Http(t *testing.T) {
|
||||||
input := &phpfpm{
|
input := &phpfpm{
|
||||||
Urls: []string{server.URL + "?full&json"},
|
Urls: []string{server.URL + "?full&json"},
|
||||||
Format: "json",
|
Format: "json",
|
||||||
Log: testutil.Logger{},
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
require.NoError(t, input.Init())
|
require.NoError(t, input.Init())
|
||||||
|
|
||||||
|
|
@ -117,8 +121,8 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
|
||||||
//Now we tested again above server
|
//Now we tested again above server
|
||||||
r := &phpfpm{
|
r := &phpfpm{
|
||||||
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
|
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
@ -161,12 +165,11 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
|
||||||
|
|
||||||
r := &phpfpm{
|
r := &phpfpm{
|
||||||
Urls: []string{tcp.Addr().String()},
|
Urls: []string{tcp.Addr().String()},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
require.NoError(t, acc.GatherError(r.Gather))
|
require.NoError(t, acc.GatherError(r.Gather))
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
|
|
@ -214,14 +217,12 @@ func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {
|
||||||
|
|
||||||
r := &phpfpm{
|
r := &phpfpm{
|
||||||
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
|
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
var acc1, acc2 testutil.Accumulator
|
var acc1, acc2 testutil.Accumulator
|
||||||
|
|
||||||
require.NoError(t, acc1.GatherError(r.Gather))
|
require.NoError(t, acc1.GatherError(r.Gather))
|
||||||
|
|
||||||
require.NoError(t, acc2.GatherError(r.Gather))
|
require.NoError(t, acc2.GatherError(r.Gather))
|
||||||
|
|
||||||
tags1 := map[string]string{
|
tags1 := map[string]string{
|
||||||
|
|
@ -267,12 +268,11 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
|
||||||
|
|
||||||
r := &phpfpm{
|
r := &phpfpm{
|
||||||
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
|
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
require.NoError(t, acc.GatherError(r.Gather))
|
require.NoError(t, acc.GatherError(r.Gather))
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
|
|
@ -300,15 +300,14 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
|
||||||
// When not passing server config, we default to localhost
|
// When not passing server config, we default to localhost
|
||||||
// We just want to make sure we did request stat from localhost
|
// We just want to make sure we did request stat from localhost
|
||||||
func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
|
func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
|
||||||
r := &phpfpm{Urls: []string{"http://bad.localhost:62001/status"}}
|
r := &phpfpm{
|
||||||
|
Urls: []string{"http://bad.localhost:62001/status"},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
|
}
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
require.ErrorContains(t, acc.GatherError(r.Gather), "/status")
|
||||||
err := acc.GatherError(r.Gather)
|
|
||||||
require.Error(t, err)
|
|
||||||
require.Contains(t, err.Error(), "/status")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t *testing.T) {
|
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t *testing.T) {
|
||||||
|
|
@ -318,30 +317,25 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t
|
||||||
|
|
||||||
r := &phpfpm{
|
r := &phpfpm{
|
||||||
Urls: []string{"http://aninvalidone"},
|
Urls: []string{"http://aninvalidone"},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
err := acc.GatherError(r.Gather)
|
err := acc.GatherError(r.Gather)
|
||||||
require.Error(t, err)
|
require.ErrorContains(t, err, `unable to connect to phpfpm status page 'http://aninvalidone'`)
|
||||||
require.Contains(t, err.Error(), `unable to connect to phpfpm status page 'http://aninvalidone'`)
|
require.ErrorContains(t, err, `lookup aninvalidone`)
|
||||||
require.Contains(t, err.Error(), `lookup aninvalidone`)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) {
|
func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) {
|
||||||
r := &phpfpm{
|
r := &phpfpm{
|
||||||
Urls: []string{"/tmp/invalid.sock"},
|
Urls: []string{"/tmp/invalid.sock"},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, r.Init())
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
require.ErrorContains(t, acc.GatherError(r.Gather), `socket doesn't exist "/tmp/invalid.sock"`)
|
||||||
err := acc.GatherError(r.Gather)
|
|
||||||
require.Error(t, err)
|
|
||||||
require.Equal(t, `socket doesn't exist "/tmp/invalid.sock"`, err.Error())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const outputSample = `
|
const outputSample = `
|
||||||
|
|
@ -389,3 +383,48 @@ func TestPhpFpmParseJSON_Log_Error_Without_Panic_When_When_JSON_Is_Invalid(t *te
|
||||||
require.NotPanics(t, func() { p.parseJSON(bytes.NewReader(invalidJSON), &testutil.NopAccumulator{}, "") })
|
require.NotPanics(t, func() { p.parseJSON(bytes.NewReader(invalidJSON), &testutil.NopAccumulator{}, "") })
|
||||||
require.Contains(t, logOutput.String(), "E! Unable to decode JSON response: invalid character 'X' looking for beginning of value")
|
require.Contains(t, logOutput.String(), "E! Unable to decode JSON response: invalid character 'X' looking for beginning of value")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGatherDespiteUnavailable(t *testing.T) {
|
||||||
|
// Let OS find an available port
|
||||||
|
tcp, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err, "Cannot initialize test server")
|
||||||
|
defer tcp.Close()
|
||||||
|
|
||||||
|
s := statServer{}
|
||||||
|
go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway
|
||||||
|
|
||||||
|
//Now we tested again above server
|
||||||
|
r := &phpfpm{
|
||||||
|
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status", "/lala"},
|
||||||
|
Log: &testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"phpfpm",
|
||||||
|
map[string]string{
|
||||||
|
"pool": "www",
|
||||||
|
"url": r.Urls[0],
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"start_since": int64(1991),
|
||||||
|
"accepted_conn": int64(3),
|
||||||
|
"listen_queue": int64(1),
|
||||||
|
"max_listen_queue": int64(0),
|
||||||
|
"listen_queue_len": int64(0),
|
||||||
|
"idle_processes": int64(1),
|
||||||
|
"active_processes": int64(1),
|
||||||
|
"total_processes": int64(2),
|
||||||
|
"max_active_processes": int64(1),
|
||||||
|
"max_children_reached": int64(2),
|
||||||
|
"slow_requests": int64(1),
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.ErrorContains(t, acc.GatherError(r.Gather), "socket doesn't exist")
|
||||||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue