Adds snappy support for http_listener_v2 (#8966)
This commit is contained in:
parent
1eb47e245c
commit
cc6c51cf16
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/snappy"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
|
|
@ -247,28 +248,50 @@ func (h *HTTPListenerV2) serveWrite(res http.ResponseWriter, req *http.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListenerV2) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
|
func (h *HTTPListenerV2) collectBody(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
|
||||||
body := req.Body
|
encoding := req.Header.Get("Content-Encoding")
|
||||||
|
|
||||||
// Handle gzip request bodies
|
switch encoding {
|
||||||
if req.Header.Get("Content-Encoding") == "gzip" {
|
case "gzip":
|
||||||
var err error
|
r, err := gzip.NewReader(req.Body)
|
||||||
body, err = gzip.NewReader(req.Body)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.Log.Debug(err.Error())
|
h.Log.Debug(err.Error())
|
||||||
badRequest(res)
|
badRequest(res)
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
defer body.Close()
|
defer r.Close()
|
||||||
|
maxReader := http.MaxBytesReader(res, r, h.MaxBodySize.Size)
|
||||||
|
bytes, err := ioutil.ReadAll(maxReader)
|
||||||
|
if err != nil {
|
||||||
|
tooLarge(res)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return bytes, true
|
||||||
|
case "snappy":
|
||||||
|
defer req.Body.Close()
|
||||||
|
bytes, err := ioutil.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
h.Log.Debug(err.Error())
|
||||||
|
badRequest(res)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
// snappy block format is only supported by decode/encode not snappy reader/writer
|
||||||
|
bytes, err = snappy.Decode(nil, bytes)
|
||||||
|
if err != nil {
|
||||||
|
h.Log.Debug(err.Error())
|
||||||
|
badRequest(res)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return bytes, true
|
||||||
|
default:
|
||||||
|
defer req.Body.Close()
|
||||||
|
bytes, err := ioutil.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
h.Log.Debug(err.Error())
|
||||||
|
badRequest(res)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
return bytes, true
|
||||||
}
|
}
|
||||||
|
|
||||||
body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
|
|
||||||
bytes, err := ioutil.ReadAll(body)
|
|
||||||
if err != nil {
|
|
||||||
tooLarge(res)
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
return bytes, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HTTPListenerV2) collectQuery(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
|
func (h *HTTPListenerV2) collectQuery(res http.ResponseWriter, req *http.Request) ([]byte, bool) {
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/snappy"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
|
@ -327,6 +328,44 @@ func TestWriteHTTPGzippedData(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// test that writing snappy data works
|
||||||
|
func TestWriteHTTPSnappyData(t *testing.T) {
|
||||||
|
listener := newTestHTTPListenerV2()
|
||||||
|
|
||||||
|
acc := &testutil.Accumulator{}
|
||||||
|
require.NoError(t, listener.Start(acc))
|
||||||
|
defer listener.Stop()
|
||||||
|
|
||||||
|
testData := "cpu_load_short,host=server01 value=12.0 1422568543702900257\n"
|
||||||
|
encodedData := snappy.Encode(nil, []byte(testData))
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", createURL(listener, "http", "/write", ""), bytes.NewBuffer(encodedData))
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set("Content-Encoding", "snappy")
|
||||||
|
|
||||||
|
client := &http.Client{}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Log("Test client request failed. Error: ", err)
|
||||||
|
}
|
||||||
|
err = resp.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Log("Test client close failed. Error: ", err)
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.EqualValues(t, 204, resp.StatusCode)
|
||||||
|
|
||||||
|
hostTags := []string{"server01"}
|
||||||
|
acc.Wait(1)
|
||||||
|
|
||||||
|
for _, hostTag := range hostTags {
|
||||||
|
acc.AssertContainsTaggedFields(t, "cpu_load_short",
|
||||||
|
map[string]interface{}{"value": float64(12)},
|
||||||
|
map[string]string{"host": hostTag},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// writes 25,000 metrics to the listener with 10 different writers
|
// writes 25,000 metrics to the listener with 10 different writers
|
||||||
func TestWriteHTTPHighTraffic(t *testing.T) {
|
func TestWriteHTTPHighTraffic(t *testing.T) {
|
||||||
if runtime.GOOS == "darwin" {
|
if runtime.GOOS == "darwin" {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue