feat: Add use_batch_format for HTTP output plugin (#8184)

This commit is contained in:
Heiko Schlittermann 2021-10-29 16:05:28 +02:00 committed by GitHub
parent 7d6672c53a
commit 8552c1187a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 11 deletions

View File

@ -1,7 +1,8 @@
# HTTP Output Plugin
This plugin sends metrics in a HTTP message encoded using one of the output
data formats. For data_formats that support batching, metrics are sent in batch format.
data formats. For data_formats that support batching, metrics are sent in
batch format by default.
### Configuration:
@ -49,6 +50,11 @@ data formats. For data_formats that support batching, metrics are sent in batch
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"
## Use batch serialization format (default) instead of line based format.
## Batch format is more efficient and should be used unless line based
## format is really needed.
# use_batch_format = true
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"

View File

@ -8,7 +8,6 @@ import (
"io"
"net/http"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@ -64,6 +63,11 @@ var sampleConfig = `
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"
## Use batch serialization format (default) instead of line based format.
## Batch format is more efficient and should be used unless line based
## format is really needed.
# use_batch_format = true
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
@ -80,9 +84,9 @@ var sampleConfig = `
`
const (
defaultClientTimeout = 5 * time.Second
defaultContentType = "text/plain; charset=utf-8"
defaultMethod = http.MethodPost
defaultContentType = "text/plain; charset=utf-8"
defaultMethod = http.MethodPost
defaultUseBatchFormat = true
)
type HTTP struct {
@ -92,6 +96,7 @@ type HTTP struct {
Password string `toml:"password"`
Headers map[string]string `toml:"headers"`
ContentEncoding string `toml:"content_encoding"`
UseBatchFormat bool `toml:"use_batch_format"`
httpconfig.HTTPClientConfig
Log telegraf.Logger `toml:"-"`
@ -136,12 +141,30 @@ func (h *HTTP) SampleConfig() string {
}
func (h *HTTP) Write(metrics []telegraf.Metric) error {
reqBody, err := h.serializer.SerializeBatch(metrics)
if err != nil {
return err
var reqBody []byte
if h.UseBatchFormat {
var err error
reqBody, err = h.serializer.SerializeBatch(metrics)
if err != nil {
return err
}
return h.write(reqBody)
}
return h.write(reqBody)
for _, metric := range metrics {
var err error
reqBody, err = h.serializer.Serialize(metric)
if err != nil {
return err
}
if err := h.write(reqBody); err != nil {
return err
}
}
return nil
}
func (h *HTTP) write(reqBody []byte) error {
@ -205,8 +228,9 @@ func (h *HTTP) write(reqBody []byte) error {
func init() {
outputs.Add("http", func() telegraf.Output {
return &HTTP{
Method: defaultMethod,
URL: defaultURL,
Method: defaultMethod,
URL: defaultURL,
UseBatchFormat: defaultUseBatchFormat,
}
})
}

View File

@ -15,7 +15,9 @@ import (
"github.com/influxdata/telegraf/metric"
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
oauth "github.com/influxdata/telegraf/plugins/common/oauth"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/stretchr/testify/require"
)
@ -32,6 +34,15 @@ func getMetric() telegraf.Metric {
return m
}
func getMetrics(n int) []telegraf.Metric {
m := make([]telegraf.Metric, n)
for n > 0 {
n--
m[n] = getMetric()
}
return m
}
func TestInvalidMethod(t *testing.T) {
plugin := &HTTP{
URL: "",
@ -455,3 +466,51 @@ func TestDefaultUserAgent(t *testing.T) {
require.NoError(t, err)
})
}
func TestBatchedUnbatched(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
client := &HTTP{
URL: u.String(),
Method: defaultMethod,
}
var s = map[string]serializers.Serializer{
"influx": influx.NewSerializer(),
"json": func(s serializers.Serializer, err error) serializers.Serializer {
require.NoError(t, err)
return s
}(json.NewSerializer(time.Second, "")),
}
for name, serializer := range s {
var requests int
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requests++
w.WriteHeader(http.StatusOK)
})
t.Run(name, func(t *testing.T) {
for _, mode := range [...]bool{false, true} {
requests = 0
client.UseBatchFormat = mode
client.SetSerializer(serializer)
err = client.Connect()
require.NoError(t, err)
err = client.Write(getMetrics(3))
require.NoError(t, err)
if client.UseBatchFormat {
require.Equal(t, requests, 1, "batched")
} else {
require.Equal(t, requests, 3, "unbatched")
}
}
})
}
}