feat(http_listener_v2): allows multiple paths and add path_tag (#9529)
This commit is contained in:
parent
a48e11d0d1
commit
348c18db75
|
|
@ -19,7 +19,14 @@ This is a sample configuration for the plugin.
|
||||||
service_address = ":8080"
|
service_address = ":8080"
|
||||||
|
|
||||||
## Path to listen to.
|
## Path to listen to.
|
||||||
# path = "/telegraf"
|
## This option is deprecated and only available for backward-compatibility. Please use paths instead.
|
||||||
|
# path = ""
|
||||||
|
|
||||||
|
## Paths to listen to.
|
||||||
|
# paths = ["/telegraf"]
|
||||||
|
|
||||||
|
## Save path as http_listener_v2_path tag if set to true
|
||||||
|
# path_tag = false
|
||||||
|
|
||||||
## HTTP methods to accept.
|
## HTTP methods to accept.
|
||||||
# methods = ["POST", "PUT"]
|
# methods = ["POST", "PUT"]
|
||||||
|
|
@ -59,7 +66,7 @@ This is a sample configuration for the plugin.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
|
||||||
data_format = "json"
|
data_format = "influx"
|
||||||
```
|
```
|
||||||
|
|
||||||
### Metrics:
|
### Metrics:
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal/choice"
|
||||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
|
|
@ -26,8 +27,9 @@ import (
|
||||||
const defaultMaxBodySize = 500 * 1024 * 1024
|
const defaultMaxBodySize = 500 * 1024 * 1024
|
||||||
|
|
||||||
const (
|
const (
|
||||||
body = "body"
|
body = "body"
|
||||||
query = "query"
|
query = "query"
|
||||||
|
pathTag = "http_listener_v2_path"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TimeFunc provides a timestamp for the metrics
|
// TimeFunc provides a timestamp for the metrics
|
||||||
|
|
@ -37,6 +39,8 @@ type TimeFunc func() time.Time
|
||||||
type HTTPListenerV2 struct {
|
type HTTPListenerV2 struct {
|
||||||
ServiceAddress string `toml:"service_address"`
|
ServiceAddress string `toml:"service_address"`
|
||||||
Path string `toml:"path"`
|
Path string `toml:"path"`
|
||||||
|
Paths []string `toml:"paths"`
|
||||||
|
PathTag bool `toml:"path_tag"`
|
||||||
Methods []string `toml:"methods"`
|
Methods []string `toml:"methods"`
|
||||||
DataSource string `toml:"data_source"`
|
DataSource string `toml:"data_source"`
|
||||||
ReadTimeout config.Duration `toml:"read_timeout"`
|
ReadTimeout config.Duration `toml:"read_timeout"`
|
||||||
|
|
@ -64,7 +68,14 @@ const sampleConfig = `
|
||||||
service_address = ":8080"
|
service_address = ":8080"
|
||||||
|
|
||||||
## Path to listen to.
|
## Path to listen to.
|
||||||
# path = "/telegraf"
|
## This option is deprecated and only available for backward-compatibility. Please use paths instead.
|
||||||
|
# path = ""
|
||||||
|
|
||||||
|
## Paths to listen to.
|
||||||
|
# paths = ["/telegraf"]
|
||||||
|
|
||||||
|
## Save path as http_listener_v2_path tag if set to true
|
||||||
|
# path_tag = false
|
||||||
|
|
||||||
## HTTP methods to accept.
|
## HTTP methods to accept.
|
||||||
# methods = ["POST", "PUT"]
|
# methods = ["POST", "PUT"]
|
||||||
|
|
@ -75,7 +86,7 @@ const sampleConfig = `
|
||||||
# write_timeout = "10s"
|
# write_timeout = "10s"
|
||||||
|
|
||||||
## Maximum allowed http request body size in bytes.
|
## Maximum allowed http request body size in bytes.
|
||||||
## 0 means to use the default of 524,288,00 bytes (500 mebibytes)
|
## 0 means to use the default of 524,288,000 bytes (500 mebibytes)
|
||||||
# max_body_size = "500MB"
|
# max_body_size = "500MB"
|
||||||
|
|
||||||
## Part of the request to consume. Available options are "body" and
|
## Part of the request to consume. Available options are "body" and
|
||||||
|
|
@ -136,6 +147,11 @@ func (h *HTTPListenerV2) Start(acc telegraf.Accumulator) error {
|
||||||
h.WriteTimeout = config.Duration(time.Second * 10)
|
h.WriteTimeout = config.Duration(time.Second * 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Append h.Path to h.Paths
|
||||||
|
if h.Path != "" && !choice.Contains(h.Path, h.Paths) {
|
||||||
|
h.Paths = append(h.Paths, h.Path)
|
||||||
|
}
|
||||||
|
|
||||||
h.acc = acc
|
h.acc = acc
|
||||||
|
|
||||||
tlsConf, err := h.ServerConfig.TLSConfig()
|
tlsConf, err := h.ServerConfig.TLSConfig()
|
||||||
|
|
@ -189,7 +205,7 @@ func (h *HTTPListenerV2) Stop() {
|
||||||
func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
func (h *HTTPListenerV2) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||||
handler := h.serveWrite
|
handler := h.serveWrite
|
||||||
|
|
||||||
if req.URL.Path != h.Path {
|
if !choice.Contains(req.URL.Path, h.Paths) {
|
||||||
handler = http.NotFound
|
handler = http.NotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -251,6 +267,10 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if h.PathTag {
|
||||||
|
m.AddTag(pathTag, req.URL.Path)
|
||||||
|
}
|
||||||
|
|
||||||
h.acc.AddMetric(m)
|
h.acc.AddMetric(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -370,7 +390,7 @@ func init() {
|
||||||
return &HTTPListenerV2{
|
return &HTTPListenerV2{
|
||||||
ServiceAddress: ":8080",
|
ServiceAddress: ":8080",
|
||||||
TimeFunc: time.Now,
|
TimeFunc: time.Now,
|
||||||
Path: "/telegraf",
|
Paths: []string{"/telegraf"},
|
||||||
Methods: []string{"POST", "PUT"},
|
Methods: []string{"POST", "PUT"},
|
||||||
DataSource: body,
|
DataSource: body,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -230,6 +230,62 @@ func TestWriteHTTP(t *testing.T) {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// http listener should add request path as configured path_tag
|
||||||
|
func TestWriteHTTPWithPathTag(t *testing.T) {
|
||||||
|
listener := newTestHTTPListenerV2()
|
||||||
|
listener.PathTag = true
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
require.NoError(t, listener.Start(acc))
|
||||||
|
defer listener.Stop()
|
||||||
|
|
||||||
|
// post single message to listener
|
||||||
|
resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, resp.Body.Close())
|
||||||
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
map[string]string{"host": "server01", "http_listener_v2_path": "/write"},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// http listener should add request path as configured path_tag (trimming it before)
|
||||||
|
func TestWriteHTTPWithMultiplePaths(t *testing.T) {
|
||||||
|
listener := newTestHTTPListenerV2()
|
||||||
|
listener.Paths = []string{"/alternative_write"}
|
||||||
|
listener.PathTag = true
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
require.NoError(t, listener.Start(acc))
|
||||||
|
defer listener.Stop()
|
||||||
|
|
||||||
|
// post single message to /write
|
||||||
|
resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, resp.Body.Close())
|
||||||
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
|
// post single message to /alternative_write
|
||||||
|
resp, err = http.Post(createURL(listener, "http", "/alternative_write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline)))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, resp.Body.Close())
|
||||||
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
|
acc.Wait(1)
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
map[string]string{"host": "server01", "http_listener_v2_path": "/write"},
|
||||||
|
)
|
||||||
|
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
map[string]string{"host": "server01", "http_listener_v2_path": "/alternative_write"},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// http listener should add a newline at the end of the buffer if it's not there
|
// http listener should add a newline at the end of the buffer if it's not there
|
||||||
func TestWriteHTTPNoNewline(t *testing.T) {
|
func TestWriteHTTPNoNewline(t *testing.T) {
|
||||||
listener := newTestHTTPListenerV2()
|
listener := newTestHTTPListenerV2()
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,8 @@ Converts prometheus remote write samples directly into Telegraf metrics. It can
|
||||||
## Address and port to host HTTP listener on
|
## Address and port to host HTTP listener on
|
||||||
service_address = ":1234"
|
service_address = ":1234"
|
||||||
|
|
||||||
## Path to listen to.
|
## Paths to listen to.
|
||||||
path = "/receive"
|
paths = ["/receive"]
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
data_format = "prometheusremotewrite"
|
data_format = "prometheusremotewrite"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue