From 1e04157c52e87372f5c4db408ee5f534627facc7 Mon Sep 17 00:00:00 2001 From: Jeremy Yang Date: Fri, 7 Jan 2022 08:38:19 -0800 Subject: [PATCH] feat: add compression to Datadog Output (#9963) --- internal/content_coding.go | 62 +++++++++++++++++++++++++ internal/content_coding_test.go | 15 ++++++ plugins/outputs/datadog/README.md | 4 ++ plugins/outputs/datadog/datadog.go | 42 ++++++++++++++--- plugins/outputs/datadog/datadog_test.go | 17 +++++++ 5 files changed, 134 insertions(+), 6 deletions(-) diff --git a/internal/content_coding.go b/internal/content_coding.go index b1a30bde1..df572ecb0 100644 --- a/internal/content_coding.go +++ b/internal/content_coding.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "compress/gzip" + "compress/zlib" "errors" "io" ) @@ -72,6 +73,8 @@ func NewContentEncoder(encoding string) (ContentEncoder, error) { switch encoding { case "gzip": return NewGzipEncoder() + case "zlib": + return NewZlibEncoder() case "identity", "": return NewIdentityEncoder(), nil default: @@ -84,6 +87,8 @@ func NewContentDecoder(encoding string) (ContentDecoder, error) { switch encoding { case "gzip": return NewGzipDecoder() + case "zlib": + return NewZlibDecoder() case "identity", "": return NewIdentityDecoder(), nil default: @@ -125,6 +130,34 @@ func (e *GzipEncoder) Encode(data []byte) ([]byte, error) { return e.buf.Bytes(), nil } +type ZlibEncoder struct { + writer *zlib.Writer + buf *bytes.Buffer +} + +func NewZlibEncoder() (*ZlibEncoder, error) { + var buf bytes.Buffer + return &ZlibEncoder{ + writer: zlib.NewWriter(&buf), + buf: &buf, + }, nil +} + +func (e *ZlibEncoder) Encode(data []byte) ([]byte, error) { + e.buf.Reset() + e.writer.Reset(e.buf) + + _, err := e.writer.Write(data) + if err != nil { + return nil, err + } + err = e.writer.Close() + if err != nil { + return nil, err + } + return e.buf.Bytes(), nil +} + // IdentityEncoder is a null encoder that applies no transformation. type IdentityEncoder struct{} @@ -169,6 +202,35 @@ func (d *GzipDecoder) Decode(data []byte) ([]byte, error) { return d.buf.Bytes(), nil } +type ZlibDecoder struct { + buf *bytes.Buffer +} + +func NewZlibDecoder() (*ZlibDecoder, error) { + return &ZlibDecoder{ + buf: new(bytes.Buffer), + }, nil +} + +func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) { + d.buf.Reset() + + b := bytes.NewBuffer(data) + r, err := zlib.NewReader(b) + if err != nil { + return nil, err + } + _, err = io.Copy(d.buf, r) + if err != nil && err != io.EOF { + return nil, err + } + err = r.Close() + if err != nil { + return nil, err + } + return d.buf.Bytes(), nil +} + // IdentityDecoder is a null decoder that returns the input. type IdentityDecoder struct{} diff --git a/internal/content_coding_test.go b/internal/content_coding_test.go index 06235a638..72e4694f9 100644 --- a/internal/content_coding_test.go +++ b/internal/content_coding_test.go @@ -46,6 +46,21 @@ func TestGzipReuse(t *testing.T) { require.Equal(t, "doody", string(actual)) } +func TestZlibEncodeDecode(t *testing.T) { + enc, err := NewZlibEncoder() + require.NoError(t, err) + dec, err := NewZlibDecoder() + require.NoError(t, err) + + payload, err := enc.Encode([]byte("howdy")) + require.NoError(t, err) + + actual, err := dec.Decode(payload) + require.NoError(t, err) + + require.Equal(t, "howdy", string(actual)) +} + func TestIdentityEncodeDecode(t *testing.T) { enc := NewIdentityEncoder() dec := NewIdentityDecoder() diff --git a/plugins/outputs/datadog/README.md b/plugins/outputs/datadog/README.md index dc709449b..2414ccfac 100644 --- a/plugins/outputs/datadog/README.md +++ b/plugins/outputs/datadog/README.md @@ -18,6 +18,10 @@ This plugin writes to the [Datadog Metrics API][metrics] and requires an ## Set http_proxy (telegraf uses the system wide proxy settings if it isn't set) # http_proxy_url = "http://localhost:8888" + + ## Override the default (none) compression used to send data. + ## Supports: "zlib", "none" + # compression = "none" ``` ## Metrics diff --git a/plugins/outputs/datadog/datadog.go b/plugins/outputs/datadog/datadog.go index 6c89ab1e3..ecc707cb9 100644 --- a/plugins/outputs/datadog/datadog.go +++ b/plugins/outputs/datadog/datadog.go @@ -12,15 +12,17 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/plugins/outputs" ) type Datadog struct { - Apikey string `toml:"apikey"` - Timeout config.Duration `toml:"timeout"` - URL string `toml:"url"` - Log telegraf.Logger `toml:"-"` + Apikey string `toml:"apikey"` + Timeout config.Duration `toml:"timeout"` + URL string `toml:"url"` + Compression string `toml:"compression"` + Log telegraf.Logger `toml:"-"` client *http.Client proxy.HTTPProxy @@ -38,6 +40,10 @@ var sampleConfig = ` ## Set http_proxy (telegraf uses the system wide proxy settings if it isn't set) # http_proxy_url = "http://localhost:8888" + + ## Override the default (none) compression used to send data. + ## Supports: "zlib", "none" + # compression = "none" ` type TimeSeries struct { @@ -122,7 +128,30 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error { if err != nil { return fmt.Errorf("unable to marshal TimeSeries, %s", err.Error()) } - req, err := http.NewRequest("POST", d.authenticatedURL(), bytes.NewBuffer(tsBytes)) + + var req *http.Request + c := strings.ToLower(d.Compression) + switch c { + case "zlib": + encoder, err := internal.NewContentEncoder(c) + if err != nil { + return err + } + buf, err := encoder.Encode(tsBytes) + if err != nil { + return err + } + req, err = http.NewRequest("POST", d.authenticatedURL(), bytes.NewBuffer(buf)) + if err != nil { + return err + } + req.Header.Set("Content-Encoding", "deflate") + case "none": + fallthrough + default: + req, err = http.NewRequest("POST", d.authenticatedURL(), bytes.NewBuffer(tsBytes)) + } + if err != nil { return fmt.Errorf("unable to create http.Request, %s", strings.Replace(err.Error(), d.Apikey, redactedAPIKey, -1)) } @@ -219,7 +248,8 @@ func (d *Datadog) Close() error { func init() { outputs.Add("datadog", func() telegraf.Output { return &Datadog{ - URL: datadogAPI, + URL: datadogAPI, + Compression: "none", } }) } diff --git a/plugins/outputs/datadog/datadog_test.go b/plugins/outputs/datadog/datadog_test.go index 4c149bf60..b2bd4352c 100644 --- a/plugins/outputs/datadog/datadog_test.go +++ b/plugins/outputs/datadog/datadog_test.go @@ -49,6 +49,23 @@ func TestUriOverride(t *testing.T) { require.NoError(t, err) } +func TestCompressionOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + //nolint:errcheck,revive // Ignore the returned error as the test will fail anyway + json.NewEncoder(w).Encode(`{"status":"ok"}`) + })) + defer ts.Close() + + d := NewDatadog(ts.URL) + d.Apikey = "123456" + d.Compression = "zlib" + err := d.Connect() + require.NoError(t, err) + err = d.Write(testutil.MockMetrics()) + require.NoError(t, err) +} + func TestBadStatusCode(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError)