chore: Fix linter findings for `revive:exported` in `plugins/inputs/h*` (#16050)
This commit is contained in:
parent
5c4ef13c66
commit
fc55d3ab7b
|
|
@ -24,25 +24,41 @@ import (
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
|
var (
|
||||||
|
typeNames = []string{"frontend", "backend", "server", "listener"}
|
||||||
|
fieldRenames = map[string]string{
|
||||||
|
"pxname": "proxy",
|
||||||
|
"svname": "sv",
|
||||||
|
"act": "active_servers",
|
||||||
|
"bck": "backup_servers",
|
||||||
|
"cli_abrt": "cli_abort",
|
||||||
|
"srv_abrt": "srv_abort",
|
||||||
|
"hrsp_1xx": "http_response.1xx",
|
||||||
|
"hrsp_2xx": "http_response.2xx",
|
||||||
|
"hrsp_3xx": "http_response.3xx",
|
||||||
|
"hrsp_4xx": "http_response.4xx",
|
||||||
|
"hrsp_5xx": "http_response.5xx",
|
||||||
|
"hrsp_other": "http_response.other",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// CSV format: https://cbonte.github.io/haproxy-dconv/1.5/configuration.html#9.1
|
// CSV format: https://cbonte.github.io/haproxy-dconv/1.5/configuration.html#9.1
|
||||||
|
|
||||||
type haproxy struct {
|
type HAProxy struct {
|
||||||
Servers []string
|
Servers []string `toml:"servers"`
|
||||||
KeepFieldNames bool
|
KeepFieldNames bool `toml:"keep_field_names"`
|
||||||
Username string
|
Username string `toml:"username"`
|
||||||
Password string
|
Password string `toml:"password"`
|
||||||
tls.ClientConfig
|
tls.ClientConfig
|
||||||
|
|
||||||
client *http.Client
|
client *http.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*haproxy) SampleConfig() string {
|
func (*HAProxy) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reads stats from all configured servers accumulates stats.
|
func (h *HAProxy) Gather(acc telegraf.Accumulator) error {
|
||||||
// Returns one of the errors encountered while gather stats (if any).
|
|
||||||
func (h *haproxy) Gather(acc telegraf.Accumulator) error {
|
|
||||||
if len(h.Servers) == 0 {
|
if len(h.Servers) == 0 {
|
||||||
return h.gatherServer("http://127.0.0.1:1936/haproxy?stats", acc)
|
return h.gatherServer("http://127.0.0.1:1936/haproxy?stats", acc)
|
||||||
}
|
}
|
||||||
|
|
@ -85,7 +101,7 @@ func (h *haproxy) Gather(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {
|
func (h *HAProxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {
|
||||||
var network, address string
|
var network, address string
|
||||||
if strings.HasPrefix(addr, "tcp://") {
|
if strings.HasPrefix(addr, "tcp://") {
|
||||||
network = "tcp"
|
network = "tcp"
|
||||||
|
|
@ -108,7 +124,7 @@ func (h *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) erro
|
||||||
return h.importCsvResult(c, acc, address)
|
return h.importCsvResult(c, acc, address)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *haproxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
func (h *HAProxy) gatherServer(addr string, acc telegraf.Accumulator) error {
|
||||||
if !strings.HasPrefix(addr, "http") {
|
if !strings.HasPrefix(addr, "http") {
|
||||||
return h.gatherServerSocket(addr, acc)
|
return h.gatherServerSocket(addr, acc)
|
||||||
}
|
}
|
||||||
|
|
@ -179,23 +195,7 @@ func getSocketAddr(sock string) string {
|
||||||
return socketAddr[0]
|
return socketAddr[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
var typeNames = []string{"frontend", "backend", "server", "listener"}
|
func (h *HAProxy) importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error {
|
||||||
var fieldRenames = map[string]string{
|
|
||||||
"pxname": "proxy",
|
|
||||||
"svname": "sv",
|
|
||||||
"act": "active_servers",
|
|
||||||
"bck": "backup_servers",
|
|
||||||
"cli_abrt": "cli_abort",
|
|
||||||
"srv_abrt": "srv_abort",
|
|
||||||
"hrsp_1xx": "http_response.1xx",
|
|
||||||
"hrsp_2xx": "http_response.2xx",
|
|
||||||
"hrsp_3xx": "http_response.3xx",
|
|
||||||
"hrsp_4xx": "http_response.4xx",
|
|
||||||
"hrsp_5xx": "http_response.5xx",
|
|
||||||
"hrsp_other": "http_response.other",
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *haproxy) importCsvResult(r io.Reader, acc telegraf.Accumulator, host string) error {
|
|
||||||
csvr := csv.NewReader(r)
|
csvr := csv.NewReader(r)
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
|
@ -278,6 +278,6 @@ func (h *haproxy) importCsvResult(r io.Reader, acc telegraf.Accumulator, host st
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("haproxy", func() telegraf.Input {
|
inputs.Add("haproxy", func() telegraf.Input {
|
||||||
return &haproxy{}
|
return &HAProxy{}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -66,7 +66,7 @@ func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
// Now we tested again above server, with our authentication data
|
// Now we tested again above server, with our authentication data
|
||||||
r := &haproxy{
|
r := &HAProxy{
|
||||||
Servers: []string{strings.Replace(ts.URL, "http://", "http://user:password@", 1)},
|
Servers: []string{strings.Replace(ts.URL, "http://", "http://user:password@", 1)},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -82,11 +82,11 @@ func TestHaproxyGeneratesMetricsWithAuthentication(t *testing.T) {
|
||||||
"type": "server",
|
"type": "server",
|
||||||
}
|
}
|
||||||
|
|
||||||
fields := HaproxyGetFieldValues()
|
fields := haproxyGetFieldValues()
|
||||||
acc.AssertContainsTaggedFields(t, "haproxy", fields, tags)
|
acc.AssertContainsTaggedFields(t, "haproxy", fields, tags)
|
||||||
|
|
||||||
// Here, we should get error because we don't pass authentication data
|
// Here, we should get error because we don't pass authentication data
|
||||||
r = &haproxy{
|
r = &HAProxy{
|
||||||
Servers: []string{ts.URL},
|
Servers: []string{ts.URL},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -101,7 +101,7 @@ func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) {
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
r := &haproxy{
|
r := &HAProxy{
|
||||||
Servers: []string{ts.URL},
|
Servers: []string{ts.URL},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -116,7 +116,7 @@ func TestHaproxyGeneratesMetricsWithoutAuthentication(t *testing.T) {
|
||||||
"type": "server",
|
"type": "server",
|
||||||
}
|
}
|
||||||
|
|
||||||
fields := HaproxyGetFieldValues()
|
fields := haproxyGetFieldValues()
|
||||||
acc.AssertContainsTaggedFields(t, "haproxy", fields, tags)
|
acc.AssertContainsTaggedFields(t, "haproxy", fields, tags)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -143,7 +143,7 @@ func TestHaproxyGeneratesMetricsUsingSocket(t *testing.T) {
|
||||||
go s.serverSocket(sock)
|
go s.serverSocket(sock)
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &haproxy{
|
r := &HAProxy{
|
||||||
Servers: []string{_globmask},
|
Servers: []string{_globmask},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -152,7 +152,7 @@ func TestHaproxyGeneratesMetricsUsingSocket(t *testing.T) {
|
||||||
err := r.Gather(&acc)
|
err := r.Gather(&acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
fields := HaproxyGetFieldValues()
|
fields := haproxyGetFieldValues()
|
||||||
|
|
||||||
for _, sock := range sockets {
|
for _, sock := range sockets {
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
|
|
@ -182,14 +182,14 @@ func TestHaproxyGeneratesMetricsUsingTcp(t *testing.T) {
|
||||||
s := statServer{}
|
s := statServer{}
|
||||||
go s.serverSocket(l)
|
go s.serverSocket(l)
|
||||||
|
|
||||||
r := &haproxy{
|
r := &HAProxy{
|
||||||
Servers: []string{"tcp://" + l.Addr().String()},
|
Servers: []string{"tcp://" + l.Addr().String()},
|
||||||
}
|
}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
require.NoError(t, r.Gather(&acc))
|
require.NoError(t, r.Gather(&acc))
|
||||||
|
|
||||||
fields := HaproxyGetFieldValues()
|
fields := haproxyGetFieldValues()
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"server": l.Addr().String(),
|
"server": l.Addr().String(),
|
||||||
|
|
@ -206,7 +206,7 @@ func TestHaproxyGeneratesMetricsUsingTcp(t *testing.T) {
|
||||||
// When not passing server config, we default to localhost
|
// When not passing server config, we default to localhost
|
||||||
// We just want to make sure we did request stat from localhost
|
// We just want to make sure we did request stat from localhost
|
||||||
func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
|
func TestHaproxyDefaultGetFromLocalhost(t *testing.T) {
|
||||||
r := &haproxy{}
|
r := &HAProxy{}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
|
@ -222,7 +222,7 @@ func TestHaproxyKeepFieldNames(t *testing.T) {
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
r := &haproxy{
|
r := &HAProxy{
|
||||||
Servers: []string{ts.URL},
|
Servers: []string{ts.URL},
|
||||||
KeepFieldNames: true,
|
KeepFieldNames: true,
|
||||||
}
|
}
|
||||||
|
|
@ -238,7 +238,7 @@ func TestHaproxyKeepFieldNames(t *testing.T) {
|
||||||
"type": "server",
|
"type": "server",
|
||||||
}
|
}
|
||||||
|
|
||||||
fields := HaproxyGetFieldValues()
|
fields := haproxyGetFieldValues()
|
||||||
fields["act"] = fields["active_servers"]
|
fields["act"] = fields["active_servers"]
|
||||||
delete(fields, "active_servers")
|
delete(fields, "active_servers")
|
||||||
fields["bck"] = fields["backup_servers"]
|
fields["bck"] = fields["backup_servers"]
|
||||||
|
|
@ -273,7 +273,7 @@ func mustReadSampleOutput() []byte {
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
|
||||||
func HaproxyGetFieldValues() map[string]interface{} {
|
func haproxyGetFieldValues() map[string]interface{} {
|
||||||
fields := map[string]interface{}{
|
fields := map[string]interface{}{
|
||||||
"active_servers": uint64(1),
|
"active_servers": uint64(1),
|
||||||
"backup_servers": uint64(0),
|
"backup_servers": uint64(0),
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Disk contains disk data gathered from hddtemp
|
||||||
type Disk struct {
|
type Disk struct {
|
||||||
DeviceName string
|
DeviceName string
|
||||||
Model string
|
Model string
|
||||||
|
|
@ -16,13 +17,14 @@ type Disk struct {
|
||||||
Status string
|
Status string
|
||||||
}
|
}
|
||||||
|
|
||||||
type hddtemp struct {
|
type hddtemp struct{}
|
||||||
}
|
|
||||||
|
|
||||||
|
// New creates hddtemp
|
||||||
func New() *hddtemp {
|
func New() *hddtemp {
|
||||||
return &hddtemp{}
|
return &hddtemp{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetch gathers disks data from hddtemp daemon.
|
||||||
func (h *hddtemp) Fetch(address string) ([]Disk, error) {
|
func (h *hddtemp) Fetch(address string) ([]Disk, error) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
|
|
|
||||||
|
|
@ -16,12 +16,13 @@ var sampleConfig string
|
||||||
const defaultAddress = "127.0.0.1:7634"
|
const defaultAddress = "127.0.0.1:7634"
|
||||||
|
|
||||||
type HDDTemp struct {
|
type HDDTemp struct {
|
||||||
Address string
|
Address string `toml:"address"`
|
||||||
Devices []string
|
Devices []string `toml:"devices"`
|
||||||
fetcher Fetcher
|
|
||||||
|
fetcher fetcher
|
||||||
}
|
}
|
||||||
|
|
||||||
type Fetcher interface {
|
type fetcher interface {
|
||||||
Fetch(address string) ([]gohddtemp.Disk, error)
|
Fetch(address string) ([]gohddtemp.Disk, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ func (h *mockFetcher) Fetch(_ string) ([]hddtemp.Disk, error) {
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockFetcher() *mockFetcher {
|
func newMockFetcher() *mockFetcher {
|
||||||
return &mockFetcher{}
|
return &mockFetcher{}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -82,12 +82,14 @@ func (h *HTTP) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *HTTP) SetParserFunc(fn telegraf.ParserFunc) {
|
||||||
|
h.parserFunc = fn
|
||||||
|
}
|
||||||
|
|
||||||
func (h *HTTP) Start(_ telegraf.Accumulator) error {
|
func (h *HTTP) Start(_ telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gather takes in an accumulator and adds the metrics that the Input
|
|
||||||
// gathers. This is called every "interval"
|
|
||||||
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
|
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, u := range h.URLs {
|
for _, u := range h.URLs {
|
||||||
|
|
@ -111,11 +113,6 @@ func (h *HTTP) Stop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetParserFunc takes the data_format from the config and finds the right parser for that format
|
|
||||||
func (h *HTTP) SetParserFunc(fn telegraf.ParserFunc) {
|
|
||||||
h.parserFunc = fn
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gathers data from a particular URL
|
// Gathers data from a particular URL
|
||||||
// Parameters:
|
// Parameters:
|
||||||
//
|
//
|
||||||
|
|
|
||||||
|
|
@ -317,7 +317,7 @@ func TestBodyAndContentEncoding(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
type testHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
func TestOAuthClientCredentialsGrant(t *testing.T) {
|
func TestOAuthClientCredentialsGrant(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.NotFoundHandler())
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
|
@ -331,8 +331,8 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
plugin *httpplugin.HTTP
|
plugin *httpplugin.HTTP
|
||||||
tokenHandler TestHandlerFunc
|
tokenHandler testHandlerFunc
|
||||||
handler TestHandlerFunc
|
handler testHandlerFunc
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "no credentials",
|
name: "no credentials",
|
||||||
|
|
|
||||||
|
|
@ -36,21 +36,16 @@ var sampleConfig string
|
||||||
|
|
||||||
var once sync.Once
|
var once sync.Once
|
||||||
|
|
||||||
// defaultMaxBodySize is the default maximum request body size, in bytes.
|
|
||||||
// if the request body is over this size, we will return an HTTP 413 error.
|
|
||||||
// 500 MB
|
|
||||||
const defaultMaxBodySize = 500 * 1024 * 1024
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
body = "body"
|
// defaultMaxBodySize is the default maximum request body size, in bytes.
|
||||||
query = "query"
|
// if the request body is over this size, we will return an HTTP 413 error.
|
||||||
pathTag = "http_listener_v2_path"
|
// 500 MB
|
||||||
|
defaultMaxBodySize = 500 * 1024 * 1024
|
||||||
|
body = "body"
|
||||||
|
query = "query"
|
||||||
|
pathTag = "http_listener_v2_path"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TimeFunc provides a timestamp for the metrics
|
|
||||||
type TimeFunc func() time.Time
|
|
||||||
|
|
||||||
// HTTPListenerV2 is an input plugin that collects external metrics sent via HTTP
|
|
||||||
type HTTPListenerV2 struct {
|
type HTTPListenerV2 struct {
|
||||||
ServiceAddress string `toml:"service_address"`
|
ServiceAddress string `toml:"service_address"`
|
||||||
SocketMode string `toml:"socket_mode"`
|
SocketMode string `toml:"socket_mode"`
|
||||||
|
|
@ -72,7 +67,7 @@ type HTTPListenerV2 struct {
|
||||||
common_tls.ServerConfig
|
common_tls.ServerConfig
|
||||||
tlsConf *tls.Config
|
tlsConf *tls.Config
|
||||||
|
|
||||||
TimeFunc
|
timeFunc
|
||||||
Log telegraf.Logger
|
Log telegraf.Logger
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
@ -85,11 +80,36 @@ type HTTPListenerV2 struct {
|
||||||
acc telegraf.Accumulator
|
acc telegraf.Accumulator
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// timeFunc provides a timestamp for the metrics
|
||||||
|
type timeFunc func() time.Time
|
||||||
|
|
||||||
func (*HTTPListenerV2) SampleConfig() string {
|
func (*HTTPListenerV2) SampleConfig() string {
|
||||||
return sampleConfig
|
return sampleConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListenerV2) Gather(_ telegraf.Accumulator) error {
|
func (h *HTTPListenerV2) Init() error {
|
||||||
|
tlsConf, err := h.ServerConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
protoRegex := regexp.MustCompile(`\w://`)
|
||||||
|
if !protoRegex.MatchString(h.ServiceAddress) {
|
||||||
|
h.ServiceAddress = "tcp://" + h.ServiceAddress
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(h.ServiceAddress)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parsing address failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.url = u
|
||||||
|
h.tlsConf = tlsConf
|
||||||
|
|
||||||
|
if h.SuccessCode == 0 {
|
||||||
|
h.SuccessCode = http.StatusNoContent
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -97,7 +117,6 @@ func (h *HTTPListenerV2) SetParser(parser telegraf.Parser) {
|
||||||
h.Parser = parser
|
h.Parser = parser
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the http listener service.
|
|
||||||
func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
|
func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
|
||||||
u := h.url
|
u := h.url
|
||||||
address := u.Host
|
address := u.Host
|
||||||
|
|
@ -178,17 +197,10 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListenerV2) createHTTPServer() *http.Server {
|
func (h *HTTPListenerV2) Gather(_ telegraf.Accumulator) error {
|
||||||
return &http.Server{
|
return nil
|
||||||
Addr: h.ServiceAddress,
|
|
||||||
Handler: h,
|
|
||||||
ReadTimeout: time.Duration(h.ReadTimeout),
|
|
||||||
WriteTimeout: time.Duration(h.WriteTimeout),
|
|
||||||
TLSConfig: h.tlsConf,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop cleans up all resources
|
|
||||||
func (h *HTTPListenerV2) Stop() {
|
func (h *HTTPListenerV2) Stop() {
|
||||||
if h.listener != nil {
|
if h.listener != nil {
|
||||||
h.listener.Close()
|
h.listener.Close()
|
||||||
|
|
@ -196,32 +208,7 @@ func (h *HTTPListenerV2) Stop() {
|
||||||
h.wg.Wait()
|
h.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListenerV2) Init() error {
|
// ServeHTTP implements [http.Handler]
|
||||||
tlsConf, err := h.ServerConfig.TLSConfig()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
protoRegex := regexp.MustCompile(`\w://`)
|
|
||||||
if !protoRegex.MatchString(h.ServiceAddress) {
|
|
||||||
h.ServiceAddress = "tcp://" + h.ServiceAddress
|
|
||||||
}
|
|
||||||
|
|
||||||
u, err := url.Parse(h.ServiceAddress)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("parsing address failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
h.url = u
|
|
||||||
h.tlsConf = tlsConf
|
|
||||||
|
|
||||||
if h.SuccessCode == 0 {
|
|
||||||
h.SuccessCode = http.StatusNoContent
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||||
handler := h.serveWrite
|
handler := h.serveWrite
|
||||||
|
|
||||||
|
|
@ -236,6 +223,16 @@ func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||||
h.authenticateIfSet(handler, res, req)
|
h.authenticateIfSet(handler, res, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *HTTPListenerV2) createHTTPServer() *http.Server {
|
||||||
|
return &http.Server{
|
||||||
|
Addr: h.ServiceAddress,
|
||||||
|
Handler: h,
|
||||||
|
ReadTimeout: time.Duration(h.ReadTimeout),
|
||||||
|
WriteTimeout: time.Duration(h.WriteTimeout),
|
||||||
|
TLSConfig: h.tlsConf,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) {
|
func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request) {
|
||||||
select {
|
select {
|
||||||
case <-h.close:
|
case <-h.close:
|
||||||
|
|
@ -426,7 +423,7 @@ func init() {
|
||||||
inputs.Add("http_listener_v2", func() telegraf.Input {
|
inputs.Add("http_listener_v2", func() telegraf.Input {
|
||||||
return &HTTPListenerV2{
|
return &HTTPListenerV2{
|
||||||
ServiceAddress: ":8080",
|
ServiceAddress: ":8080",
|
||||||
TimeFunc: time.Now,
|
timeFunc: time.Now,
|
||||||
Paths: []string{"/telegraf"},
|
Paths: []string{"/telegraf"},
|
||||||
Methods: []string{"POST", "PUT"},
|
Methods: []string{"POST", "PUT"},
|
||||||
DataSource: body,
|
DataSource: body,
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ func newTestHTTPListenerV2() (*HTTPListenerV2, error) {
|
||||||
Path: "/write",
|
Path: "/write",
|
||||||
Methods: []string{"POST"},
|
Methods: []string{"POST"},
|
||||||
Parser: parser,
|
Parser: parser,
|
||||||
TimeFunc: time.Now,
|
timeFunc: time.Now,
|
||||||
MaxBodySize: config.Size(70000),
|
MaxBodySize: config.Size(70000),
|
||||||
DataSource: "body",
|
DataSource: "body",
|
||||||
close: make(chan struct{}),
|
close: make(chan struct{}),
|
||||||
|
|
@ -92,7 +92,7 @@ func newTestHTTPSListenerV2() (*HTTPListenerV2, error) {
|
||||||
Methods: []string{"POST"},
|
Methods: []string{"POST"},
|
||||||
Parser: parser,
|
Parser: parser,
|
||||||
ServerConfig: *pki.TLSServerConfig(),
|
ServerConfig: *pki.TLSServerConfig(),
|
||||||
TimeFunc: time.Now,
|
timeFunc: time.Now,
|
||||||
close: make(chan struct{}),
|
close: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -135,7 +135,7 @@ func TestInvalidListenerConfig(t *testing.T) {
|
||||||
Path: "/write",
|
Path: "/write",
|
||||||
Methods: []string{"POST"},
|
Methods: []string{"POST"},
|
||||||
Parser: parser,
|
Parser: parser,
|
||||||
TimeFunc: time.Now,
|
timeFunc: time.Now,
|
||||||
MaxBodySize: config.Size(70000),
|
MaxBodySize: config.Size(70000),
|
||||||
DataSource: "body",
|
DataSource: "body",
|
||||||
close: make(chan struct{}),
|
close: make(chan struct{}),
|
||||||
|
|
@ -373,7 +373,7 @@ func TestWriteHTTPExactMaxBodySize(t *testing.T) {
|
||||||
Methods: []string{"POST"},
|
Methods: []string{"POST"},
|
||||||
Parser: parser,
|
Parser: parser,
|
||||||
MaxBodySize: config.Size(len(hugeMetric)),
|
MaxBodySize: config.Size(len(hugeMetric)),
|
||||||
TimeFunc: time.Now,
|
timeFunc: time.Now,
|
||||||
close: make(chan struct{}),
|
close: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -399,7 +399,7 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
|
||||||
Methods: []string{"POST"},
|
Methods: []string{"POST"},
|
||||||
Parser: parser,
|
Parser: parser,
|
||||||
MaxBodySize: config.Size(4096),
|
MaxBodySize: config.Size(4096),
|
||||||
TimeFunc: time.Now,
|
timeFunc: time.Now,
|
||||||
close: make(chan struct{}),
|
close: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,6 @@ const (
|
||||||
defaultResponseBodyMaxSize = 32 * 1024 * 1024
|
defaultResponseBodyMaxSize = 32 * 1024 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
// HTTPResponse struct
|
|
||||||
type HTTPResponse struct {
|
type HTTPResponse struct {
|
||||||
Address string `toml:"address" deprecated:"1.12.0;1.35.0;use 'urls' instead"`
|
Address string `toml:"address" deprecated:"1.12.0;1.35.0;use 'urls' instead"`
|
||||||
URLs []string `toml:"urls"`
|
URLs []string `toml:"urls"`
|
||||||
|
|
@ -73,9 +72,83 @@ type client struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpClient interface {
|
type httpClient interface {
|
||||||
|
// Do implements [http.Client]
|
||||||
Do(req *http.Request) (*http.Response, error)
|
Do(req *http.Request) (*http.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (*HTTPResponse) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HTTPResponse) Init() error {
|
||||||
|
// Compile the body regex if it exists
|
||||||
|
if h.ResponseStringMatch != "" {
|
||||||
|
var err error
|
||||||
|
h.compiledStringMatch, err = regexp.Compile(h.ResponseStringMatch)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to compile regular expression %q: %w", h.ResponseStringMatch, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set default values
|
||||||
|
if h.ResponseTimeout < config.Duration(time.Second) {
|
||||||
|
h.ResponseTimeout = config.Duration(time.Second * 5)
|
||||||
|
}
|
||||||
|
if h.Method == "" {
|
||||||
|
h.Method = "GET"
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(h.URLs) == 0 {
|
||||||
|
if h.Address == "" {
|
||||||
|
h.URLs = []string{"http://localhost"}
|
||||||
|
} else {
|
||||||
|
h.URLs = []string{h.Address}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
h.clients = make([]client, 0, len(h.URLs))
|
||||||
|
for _, u := range h.URLs {
|
||||||
|
addr, err := url.Parse(u)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("%q is not a valid address: %w", u, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if addr.Scheme != "http" && addr.Scheme != "https" {
|
||||||
|
return fmt.Errorf("%q is not a valid address: only http and https types are supported", u)
|
||||||
|
}
|
||||||
|
|
||||||
|
cl, err := h.createHTTPClient(*addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
h.clients = append(h.clients, client{httpClient: cl, address: u})
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gather gets all metric fields and tags and returns any errors it encounters
|
||||||
|
func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
|
||||||
|
for _, c := range h.clients {
|
||||||
|
// Prepare data
|
||||||
|
var fields map[string]interface{}
|
||||||
|
var tags map[string]string
|
||||||
|
|
||||||
|
// Gather data
|
||||||
|
fields, tags, err := h.httpGather(c)
|
||||||
|
if err != nil {
|
||||||
|
acc.AddError(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add metrics
|
||||||
|
acc.AddFields("http_response", fields, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Set the proxy. A configured proxy overwrites the system-wide proxy.
|
// Set the proxy. A configured proxy overwrites the system-wide proxy.
|
||||||
func getProxyFunc(httpProxy string) func(*http.Request) (*url.URL, error) {
|
func getProxyFunc(httpProxy string) func(*http.Request) (*url.URL, error) {
|
||||||
if httpProxy == "" {
|
if httpProxy == "" {
|
||||||
|
|
@ -384,79 +457,6 @@ func (h *HTTPResponse) setBodyReadError(errorMsg string, bodyBytes []byte, field
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*HTTPResponse) SampleConfig() string {
|
|
||||||
return sampleConfig
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *HTTPResponse) Init() error {
|
|
||||||
// Compile the body regex if it exists
|
|
||||||
if h.ResponseStringMatch != "" {
|
|
||||||
var err error
|
|
||||||
h.compiledStringMatch, err = regexp.Compile(h.ResponseStringMatch)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to compile regular expression %q: %w", h.ResponseStringMatch, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set default values
|
|
||||||
if h.ResponseTimeout < config.Duration(time.Second) {
|
|
||||||
h.ResponseTimeout = config.Duration(time.Second * 5)
|
|
||||||
}
|
|
||||||
if h.Method == "" {
|
|
||||||
h.Method = "GET"
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(h.URLs) == 0 {
|
|
||||||
if h.Address == "" {
|
|
||||||
h.URLs = []string{"http://localhost"}
|
|
||||||
} else {
|
|
||||||
h.URLs = []string{h.Address}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
h.clients = make([]client, 0, len(h.URLs))
|
|
||||||
for _, u := range h.URLs {
|
|
||||||
addr, err := url.Parse(u)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%q is not a valid address: %w", u, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if addr.Scheme != "http" && addr.Scheme != "https" {
|
|
||||||
return fmt.Errorf("%q is not a valid address: only http and https types are supported", u)
|
|
||||||
}
|
|
||||||
|
|
||||||
cl, err := h.createHTTPClient(*addr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
h.clients = append(h.clients, client{httpClient: cl, address: u})
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Gather gets all metric fields and tags and returns any errors it encounters
|
|
||||||
func (h *HTTPResponse) Gather(acc telegraf.Accumulator) error {
|
|
||||||
for _, c := range h.clients {
|
|
||||||
// Prepare data
|
|
||||||
var fields map[string]interface{}
|
|
||||||
var tags map[string]string
|
|
||||||
|
|
||||||
// Gather data
|
|
||||||
fields, tags, err := h.httpGather(c)
|
|
||||||
if err != nil {
|
|
||||||
acc.AddError(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add metrics
|
|
||||||
acc.AddFields("http_response", fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *HTTPResponse) setRequestAuth(request *http.Request) error {
|
func (h *HTTPResponse) setRequestAuth(request *http.Request) error {
|
||||||
if h.Username.Empty() || h.Password.Empty() {
|
if h.Username.Empty() || h.Password.Empty() {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -20,19 +20,6 @@ import (
|
||||||
//go:embed sample.conf
|
//go:embed sample.conf
|
||||||
var sampleConfig string
|
var sampleConfig string
|
||||||
|
|
||||||
const (
|
|
||||||
// path to root huge page control directory
|
|
||||||
rootHugepagePath = "/sys/kernel/mm/hugepages"
|
|
||||||
// path where per NUMA node statistics are kept
|
|
||||||
numaNodePath = "/sys/devices/system/node"
|
|
||||||
// path to the meminfo file
|
|
||||||
meminfoPath = "/proc/meminfo"
|
|
||||||
|
|
||||||
rootHugepages = "root"
|
|
||||||
perNodeHugepages = "per_node"
|
|
||||||
meminfoHugepages = "meminfo"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
newlineByte = []byte("\n")
|
newlineByte = []byte("\n")
|
||||||
colonByte = []byte(":")
|
colonByte = []byte(":")
|
||||||
|
|
@ -65,6 +52,19 @@ var (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// path to root huge page control directory
|
||||||
|
rootHugepagePath = "/sys/kernel/mm/hugepages"
|
||||||
|
// path where per NUMA node statistics are kept
|
||||||
|
numaNodePath = "/sys/devices/system/node"
|
||||||
|
// path to the meminfo file
|
||||||
|
meminfoPath = "/proc/meminfo"
|
||||||
|
|
||||||
|
rootHugepages = "root"
|
||||||
|
perNodeHugepages = "per_node"
|
||||||
|
meminfoHugepages = "meminfo"
|
||||||
|
)
|
||||||
|
|
||||||
type Hugepages struct {
|
type Hugepages struct {
|
||||||
Types []string `toml:"types"`
|
Types []string `toml:"types"`
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,11 +17,15 @@ type Hugepages struct {
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (*Hugepages) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Hugepages) Init() error {
|
func (h *Hugepages) Init() error {
|
||||||
h.Log.Warn("current platform is not supported")
|
h.Log.Warn("current platform is not supported")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func (*Hugepages) SampleConfig() string { return sampleConfig }
|
|
||||||
func (*Hugepages) Gather(_ telegraf.Accumulator) error { return nil }
|
func (*Hugepages) Gather(_ telegraf.Accumulator) error { return nil }
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue