fix(inputs.phpfpm): Add timeout for fcgi (#15036)
This commit is contained in:
parent
21f355a59c
commit
4c1aa59574
|
|
@ -6,10 +6,11 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Create an fcgi client
|
// Create an fcgi client
|
||||||
func newFcgiClient(h string, args ...interface{}) (*conn, error) {
|
func newFcgiClient(timeout time.Duration, h string, args ...interface{}) (*conn, error) {
|
||||||
var con net.Conn
|
var con net.Conn
|
||||||
if len(args) != 1 {
|
if len(args) != 1 {
|
||||||
return nil, errors.New("fcgi: not enough params")
|
return nil, errors.New("fcgi: not enough params")
|
||||||
|
|
@ -19,13 +20,24 @@ func newFcgiClient(h string, args ...interface{}) (*conn, error) {
|
||||||
switch args[0].(type) {
|
switch args[0].(type) {
|
||||||
case int:
|
case int:
|
||||||
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
|
addr := h + ":" + strconv.FormatInt(int64(args[0].(int)), 10)
|
||||||
con, err = net.Dial("tcp", addr)
|
if timeout == 0 {
|
||||||
|
con, err = net.Dial("tcp", addr)
|
||||||
|
} else {
|
||||||
|
con, err = net.DialTimeout("tcp", addr, timeout)
|
||||||
|
}
|
||||||
case string:
|
case string:
|
||||||
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
|
laddr := net.UnixAddr{Name: args[0].(string), Net: h}
|
||||||
con, err = net.DialUnix(h, nil, &laddr)
|
con, err = net.DialUnix(h, nil, &laddr)
|
||||||
default:
|
default:
|
||||||
err = errors.New("fcgi: we only accept int (port) or string (socket) params")
|
err = errors.New("fcgi: we only accept int (port) or string (socket) params")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if timeout != 0 {
|
||||||
|
if err := con.SetDeadline(time.Now().Add(timeout)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fcgi := &conn{
|
fcgi := &conn{
|
||||||
rwc: con,
|
rwc: con,
|
||||||
}
|
}
|
||||||
|
|
@ -66,6 +78,12 @@ READ_LOOP:
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
if err1 != nil && strings.Contains(err1.Error(), "i/o timeout") {
|
||||||
|
if !errors.Is(err1, io.EOF) {
|
||||||
|
err = err1
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case rec.h.Type == typeStdout:
|
case rec.h.Type == typeStdout:
|
||||||
|
|
|
||||||
|
|
@ -159,7 +159,7 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
fcgiIP := socketAddr[0]
|
fcgiIP := socketAddr[0]
|
||||||
fcgiPort, _ := strconv.Atoi(socketAddr[1])
|
fcgiPort, _ := strconv.Atoi(socketAddr[1])
|
||||||
fcgi, err = newFcgiClient(fcgiIP, fcgiPort)
|
fcgi, err = newFcgiClient(time.Duration(p.Timeout), fcgiIP, fcgiPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -173,7 +173,7 @@ func (p *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||||
if statusPath == "" {
|
if statusPath == "" {
|
||||||
statusPath = "status"
|
statusPath = "status"
|
||||||
}
|
}
|
||||||
fcgi, err = newFcgiClient("unix", socketPath)
|
fcgi, err = newFcgiClient(time.Duration(p.Timeout), "unix", socketPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/metric"
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/plugins/common/shim"
|
"github.com/influxdata/telegraf/plugins/common/shim"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
|
|
@ -150,6 +151,42 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
|
||||||
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
|
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPhpFpmTimeout_From_Fcgi(t *testing.T) {
|
||||||
|
// Let OS find an available port
|
||||||
|
tcp, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err, "Cannot initialize test server")
|
||||||
|
defer tcp.Close()
|
||||||
|
|
||||||
|
const timeout = 200 * time.Millisecond
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
conn, err := tcp.Accept()
|
||||||
|
if err != nil {
|
||||||
|
return // ignore the returned error as we cannot do anything about it anyway
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
// Sleep longer than the timeout
|
||||||
|
time.Sleep(2 * timeout)
|
||||||
|
}()
|
||||||
|
|
||||||
|
//Now we tested again above server
|
||||||
|
r := &phpfpm{
|
||||||
|
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
|
||||||
|
Timeout: config.Duration(timeout),
|
||||||
|
Log: &testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, r.Init())
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.Error(t, acc.GatherError(r.Gather))
|
||||||
|
|
||||||
|
require.Empty(t, acc.GetTelegrafMetrics())
|
||||||
|
require.GreaterOrEqual(t, time.Since(start), timeout)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
|
func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
|
||||||
// Create a socket in /tmp because we always have write permission and if the
|
// Create a socket in /tmp because we always have write permission and if the
|
||||||
// removing of socket fail when system restart /tmp is clear so
|
// removing of socket fail when system restart /tmp is clear so
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue