diff --git a/plugins/outputs/http/README.md b/plugins/outputs/http/README.md index d90192b70..909779262 100644 --- a/plugins/outputs/http/README.md +++ b/plugins/outputs/http/README.md @@ -1,7 +1,8 @@ # HTTP Output Plugin This plugin sends metrics in a HTTP message encoded using one of the output -data formats. For data_formats that support batching, metrics are sent in batch format. +data formats. For data_formats that support batching, metrics are sent in +batch format by default. ### Configuration: @@ -49,6 +50,11 @@ data formats. For data_formats that support batching, metrics are sent in batch ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md # data_format = "influx" + ## Use batch serialization format (default) instead of line based format. + ## Batch format is more efficient and should be used unless line based + ## format is really needed. + # use_batch_format = true + ## HTTP Content-Encoding for write request body, can be set to "gzip" to ## compress body or "identity" to apply no encoding. # content_encoding = "identity" diff --git a/plugins/outputs/http/http.go b/plugins/outputs/http/http.go index c94052ea9..b866c6021 100644 --- a/plugins/outputs/http/http.go +++ b/plugins/outputs/http/http.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "strings" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -64,6 +63,11 @@ var sampleConfig = ` ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md # data_format = "influx" + ## Use batch serialization format (default) instead of line based format. + ## Batch format is more efficient and should be used unless line based + ## format is really needed. + # use_batch_format = true + ## HTTP Content-Encoding for write request body, can be set to "gzip" to ## compress body or "identity" to apply no encoding. # content_encoding = "identity" @@ -80,9 +84,9 @@ var sampleConfig = ` ` const ( - defaultClientTimeout = 5 * time.Second - defaultContentType = "text/plain; charset=utf-8" - defaultMethod = http.MethodPost + defaultContentType = "text/plain; charset=utf-8" + defaultMethod = http.MethodPost + defaultUseBatchFormat = true ) type HTTP struct { @@ -92,6 +96,7 @@ type HTTP struct { Password string `toml:"password"` Headers map[string]string `toml:"headers"` ContentEncoding string `toml:"content_encoding"` + UseBatchFormat bool `toml:"use_batch_format"` httpconfig.HTTPClientConfig Log telegraf.Logger `toml:"-"` @@ -136,12 +141,30 @@ func (h *HTTP) SampleConfig() string { } func (h *HTTP) Write(metrics []telegraf.Metric) error { - reqBody, err := h.serializer.SerializeBatch(metrics) - if err != nil { - return err + var reqBody []byte + + if h.UseBatchFormat { + var err error + reqBody, err = h.serializer.SerializeBatch(metrics) + if err != nil { + return err + } + + return h.write(reqBody) } - return h.write(reqBody) + for _, metric := range metrics { + var err error + reqBody, err = h.serializer.Serialize(metric) + if err != nil { + return err + } + + if err := h.write(reqBody); err != nil { + return err + } + } + return nil } func (h *HTTP) write(reqBody []byte) error { @@ -205,8 +228,9 @@ func (h *HTTP) write(reqBody []byte) error { func init() { outputs.Add("http", func() telegraf.Output { return &HTTP{ - Method: defaultMethod, - URL: defaultURL, + Method: defaultMethod, + URL: defaultURL, + UseBatchFormat: defaultUseBatchFormat, } }) } diff --git a/plugins/outputs/http/http_test.go b/plugins/outputs/http/http_test.go index d6803eed3..a5fc49b84 100644 --- a/plugins/outputs/http/http_test.go +++ b/plugins/outputs/http/http_test.go @@ -15,7 +15,9 @@ import ( "github.com/influxdata/telegraf/metric" httpconfig "github.com/influxdata/telegraf/plugins/common/http" oauth "github.com/influxdata/telegraf/plugins/common/oauth" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/influx" + "github.com/influxdata/telegraf/plugins/serializers/json" "github.com/stretchr/testify/require" ) @@ -32,6 +34,15 @@ func getMetric() telegraf.Metric { return m } +func getMetrics(n int) []telegraf.Metric { + m := make([]telegraf.Metric, n) + for n > 0 { + n-- + m[n] = getMetric() + } + return m +} + func TestInvalidMethod(t *testing.T) { plugin := &HTTP{ URL: "", @@ -455,3 +466,51 @@ func TestDefaultUserAgent(t *testing.T) { require.NoError(t, err) }) } + +func TestBatchedUnbatched(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + client := &HTTP{ + URL: u.String(), + Method: defaultMethod, + } + + var s = map[string]serializers.Serializer{ + "influx": influx.NewSerializer(), + "json": func(s serializers.Serializer, err error) serializers.Serializer { + require.NoError(t, err) + return s + }(json.NewSerializer(time.Second, "")), + } + + for name, serializer := range s { + var requests int + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requests++ + w.WriteHeader(http.StatusOK) + }) + + t.Run(name, func(t *testing.T) { + for _, mode := range [...]bool{false, true} { + requests = 0 + client.UseBatchFormat = mode + client.SetSerializer(serializer) + + err = client.Connect() + require.NoError(t, err) + err = client.Write(getMetrics(3)) + require.NoError(t, err) + + if client.UseBatchFormat { + require.Equal(t, requests, 1, "batched") + } else { + require.Equal(t, requests, 3, "unbatched") + } + } + }) + } +}