feat: add compression to Datadog Output (#9963)

This commit is contained in:
Jeremy Yang 2022-01-07 08:38:19 -08:00 committed by GitHub
parent a26f53ec48
commit 1e04157c52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 134 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
"compress/zlib"
"errors"
"io"
)
@ -72,6 +73,8 @@ func NewContentEncoder(encoding string) (ContentEncoder, error) {
switch encoding {
case "gzip":
return NewGzipEncoder()
case "zlib":
return NewZlibEncoder()
case "identity", "":
return NewIdentityEncoder(), nil
default:
@ -84,6 +87,8 @@ func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
case "gzip":
return NewGzipDecoder()
case "zlib":
return NewZlibDecoder()
case "identity", "":
return NewIdentityDecoder(), nil
default:
@ -125,6 +130,34 @@ func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
return e.buf.Bytes(), nil
}
type ZlibEncoder struct {
writer *zlib.Writer
buf *bytes.Buffer
}
func NewZlibEncoder() (*ZlibEncoder, error) {
var buf bytes.Buffer
return &ZlibEncoder{
writer: zlib.NewWriter(&buf),
buf: &buf,
}, nil
}
func (e *ZlibEncoder) Encode(data []byte) ([]byte, error) {
e.buf.Reset()
e.writer.Reset(e.buf)
_, err := e.writer.Write(data)
if err != nil {
return nil, err
}
err = e.writer.Close()
if err != nil {
return nil, err
}
return e.buf.Bytes(), nil
}
// IdentityEncoder is a null encoder that applies no transformation.
type IdentityEncoder struct{}
@ -169,6 +202,35 @@ func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
return d.buf.Bytes(), nil
}
type ZlibDecoder struct {
buf *bytes.Buffer
}
func NewZlibDecoder() (*ZlibDecoder, error) {
return &ZlibDecoder{
buf: new(bytes.Buffer),
}, nil
}
func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) {
d.buf.Reset()
b := bytes.NewBuffer(data)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
_, err = io.Copy(d.buf, r)
if err != nil && err != io.EOF {
return nil, err
}
err = r.Close()
if err != nil {
return nil, err
}
return d.buf.Bytes(), nil
}
// IdentityDecoder is a null decoder that returns the input.
type IdentityDecoder struct{}

View File

@ -46,6 +46,21 @@ func TestGzipReuse(t *testing.T) {
require.Equal(t, "doody", string(actual))
}
func TestZlibEncodeDecode(t *testing.T) {
enc, err := NewZlibEncoder()
require.NoError(t, err)
dec, err := NewZlibDecoder()
require.NoError(t, err)
payload, err := enc.Encode([]byte("howdy"))
require.NoError(t, err)
actual, err := dec.Decode(payload)
require.NoError(t, err)
require.Equal(t, "howdy", string(actual))
}
func TestIdentityEncodeDecode(t *testing.T) {
enc := NewIdentityEncoder()
dec := NewIdentityDecoder()

View File

@ -18,6 +18,10 @@ This plugin writes to the [Datadog Metrics API][metrics] and requires an
## Set http_proxy (telegraf uses the system wide proxy settings if it isn't set)
# http_proxy_url = "http://localhost:8888"
## Override the default (none) compression used to send data.
## Supports: "zlib", "none"
# compression = "none"
```
## Metrics

View File

@ -12,15 +12,17 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/outputs"
)
type Datadog struct {
Apikey string `toml:"apikey"`
Timeout config.Duration `toml:"timeout"`
URL string `toml:"url"`
Log telegraf.Logger `toml:"-"`
Apikey string `toml:"apikey"`
Timeout config.Duration `toml:"timeout"`
URL string `toml:"url"`
Compression string `toml:"compression"`
Log telegraf.Logger `toml:"-"`
client *http.Client
proxy.HTTPProxy
@ -38,6 +40,10 @@ var sampleConfig = `
## Set http_proxy (telegraf uses the system wide proxy settings if it isn't set)
# http_proxy_url = "http://localhost:8888"
## Override the default (none) compression used to send data.
## Supports: "zlib", "none"
# compression = "none"
`
type TimeSeries struct {
@ -122,7 +128,30 @@ func (d *Datadog) Write(metrics []telegraf.Metric) error {
if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s", err.Error())
}
req, err := http.NewRequest("POST", d.authenticatedURL(), bytes.NewBuffer(tsBytes))
var req *http.Request
c := strings.ToLower(d.Compression)
switch c {
case "zlib":
encoder, err := internal.NewContentEncoder(c)
if err != nil {
return err
}
buf, err := encoder.Encode(tsBytes)
if err != nil {
return err
}
req, err = http.NewRequest("POST", d.authenticatedURL(), bytes.NewBuffer(buf))
if err != nil {
return err
}
req.Header.Set("Content-Encoding", "deflate")
case "none":
fallthrough
default:
req, err = http.NewRequest("POST", d.authenticatedURL(), bytes.NewBuffer(tsBytes))
}
if err != nil {
return fmt.Errorf("unable to create http.Request, %s", strings.Replace(err.Error(), d.Apikey, redactedAPIKey, -1))
}
@ -219,7 +248,8 @@ func (d *Datadog) Close() error {
func init() {
outputs.Add("datadog", func() telegraf.Output {
return &Datadog{
URL: datadogAPI,
URL: datadogAPI,
Compression: "none",
}
})
}

View File

@ -49,6 +49,23 @@ func TestUriOverride(t *testing.T) {
require.NoError(t, err)
}
func TestCompressionOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
//nolint:errcheck,revive // Ignore the returned error as the test will fail anyway
json.NewEncoder(w).Encode(`{"status":"ok"}`)
}))
defer ts.Close()
d := NewDatadog(ts.URL)
d.Apikey = "123456"
d.Compression = "zlib"
err := d.Connect()
require.NoError(t, err)
err = d.Write(testutil.MockMetrics())
require.NoError(t, err)
}
func TestBadStatusCode(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)