feat(internal): Add additional faster compression options (#13316)

This commit is contained in:
Zeyad Kenawi 2023-06-09 16:28:14 +03:00 committed by GitHub
parent dada11e228
commit cba7369903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 339 additions and 31 deletions

View File

@ -222,6 +222,7 @@ following works:
- github.com/kballard/go-shellquote [MIT License](https://github.com/kballard/go-shellquote/blob/master/LICENSE) - github.com/kballard/go-shellquote [MIT License](https://github.com/kballard/go-shellquote/blob/master/LICENSE)
- github.com/klauspost/compress [BSD 3-Clause Clear License](https://github.com/klauspost/compress/blob/master/LICENSE) - github.com/klauspost/compress [BSD 3-Clause Clear License](https://github.com/klauspost/compress/blob/master/LICENSE)
- github.com/klauspost/cpuid [MIT License](https://github.com/klauspost/cpuid/blob/master/LICENSE) - github.com/klauspost/cpuid [MIT License](https://github.com/klauspost/cpuid/blob/master/LICENSE)
- github.com/klauspost/pgzip [MIT License](https://github.com/klauspost/pgzip/blob/master/LICENSE)
- github.com/kolo/xmlrpc [MIT License](https://github.com/kolo/xmlrpc/blob/master/LICENSE) - github.com/kolo/xmlrpc [MIT License](https://github.com/kolo/xmlrpc/blob/master/LICENSE)
- github.com/kylelemons/godebug [Apache License 2.0](https://github.com/kylelemons/godebug/blob/master/LICENSE) - github.com/kylelemons/godebug [Apache License 2.0](https://github.com/kylelemons/godebug/blob/master/LICENSE)
- github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE) - github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE)

3
go.mod
View File

@ -120,6 +120,8 @@ require (
github.com/kardianos/service v1.2.2 github.com/kardianos/service v1.2.2
github.com/karrick/godirwalk v1.16.2 github.com/karrick/godirwalk v1.16.2
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.16.5
github.com/klauspost/pgzip v1.2.6
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b
github.com/linkedin/goavro/v2 v2.12.0 github.com/linkedin/goavro/v2 v2.12.0
github.com/logzio/azure-monitor-metrics-receiver v1.0.0 github.com/logzio/azure-monitor-metrics-receiver v1.0.0
@ -352,7 +354,6 @@ require (
github.com/juju/webbrowser v1.0.0 // indirect github.com/juju/webbrowser v1.0.0 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/kr/fs v0.1.0 // indirect github.com/kr/fs v0.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect github.com/kylelemons/godebug v1.1.0 // indirect

2
go.sum
View File

@ -1015,6 +1015,8 @@ github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZX
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.4/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.2.5/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/klauspost/pgzip v1.2.6 h1:8RXeL5crjEUFnR2/Sn6GJNWtSQ3Dk8pq4CL3jvdDyjU=
github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b h1:udzkj9S/zlT5X367kqJis0QP7YMxobob6zhzq6Yre00=
github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM= github.com/kolo/xmlrpc v0.0.0-20220921171641-a4b6fa1dd06b/go.mod h1:pcaDhQK0/NJZEvtCO0qQPPropqV0sJOJ6YW7X+9kRwM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=

View File

@ -3,15 +3,31 @@ package internal
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"compress/gzip"
"compress/zlib"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zlib"
"github.com/klauspost/pgzip"
) )
const DefaultMaxDecompressionSize = 500 * 1024 * 1024 //500MB const DefaultMaxDecompressionSize = 500 * 1024 * 1024 //500MB
// EncodingOption is a functional option that changes an encoder's settings
// away from the standard configuration.
type EncodingOption func(*encoderConfig)

// encoderConfig collects the settings that an EncodingOption may modify.
type encoderConfig struct {
	// level is the compression level requested for the encoder.
	level int
}

// EncoderCompressionLevel returns an EncodingOption that stores level as the
// compression level in the encoder configuration it is applied to.
func EncoderCompressionLevel(level int) EncodingOption {
	setLevel := func(c *encoderConfig) {
		c.level = level
	}
	return setLevel
}
// NewStreamContentDecoder returns a reader that will decode the stream // NewStreamContentDecoder returns a reader that will decode the stream
// according to the encoding type. // according to the encoding type.
func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) { func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
@ -28,7 +44,7 @@ func NewStreamContentDecoder(encoding string, r io.Reader) (io.Reader, error) {
// GzipReader is similar to gzip.Reader but reads only a single gzip stream per read. // GzipReader is similar to gzip.Reader but reads only a single gzip stream per read.
type GzipReader struct { type GzipReader struct {
r io.Reader r io.Reader
z *gzip.Reader z *pgzip.Reader
endOfStream bool endOfStream bool
} }
@ -38,7 +54,7 @@ func NewGzipReader(r io.Reader) (io.Reader, error) {
br := bufio.NewReader(r) br := bufio.NewReader(r)
// Reads the first gzip stream header. // Reads the first gzip stream header.
z, err := gzip.NewReader(br) z, err := pgzip.NewReader(br)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -72,12 +88,12 @@ func (r *GzipReader) Read(b []byte) (int, error) {
} }
// NewContentEncoder returns a ContentEncoder for the encoding type. // NewContentEncoder returns a ContentEncoder for the encoding type.
func NewContentEncoder(encoding string) (ContentEncoder, error) { func NewContentEncoder(encoding string, options ...EncodingOption) (ContentEncoder, error) {
switch encoding { switch encoding {
case "gzip": case "gzip":
return NewGzipEncoder(), nil return NewGzipEncoder(options...)
case "zlib": case "zlib":
return NewZlibEncoder(), nil return NewZlibEncoder(options...)
case "identity", "": case "identity", "":
return NewIdentityEncoder(), nil return NewIdentityEncoder(), nil
default: default:
@ -133,19 +149,42 @@ type ContentEncoder interface {
// GzipEncoder compresses the buffer using gzip at the default level. // GzipEncoder compresses the buffer using gzip at the default level.
type GzipEncoder struct { type GzipEncoder struct {
writer *gzip.Writer pwriter *pgzip.Writer
buf *bytes.Buffer writer *gzip.Writer
buf *bytes.Buffer
} }
func NewGzipEncoder() *GzipEncoder { func NewGzipEncoder(options ...EncodingOption) (*GzipEncoder, error) {
var buf bytes.Buffer cfg := encoderConfig{level: pgzip.DefaultCompression}
return &GzipEncoder{ for _, o := range options {
writer: gzip.NewWriter(&buf), o(&cfg)
buf: &buf,
} }
var buf bytes.Buffer
pw, err := pgzip.NewWriterLevel(&buf, cfg.level)
if err != nil {
return nil, err
}
w, err := gzip.NewWriterLevel(&buf, cfg.level)
return &GzipEncoder{
pwriter: pw,
writer: w,
buf: &buf,
}, err
} }
func (e *GzipEncoder) Encode(data []byte) ([]byte, error) { func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
// Parallel Gzip is only faster for larger data chunks. According to the
// project's documentation the trade-off size is at about 1MB, so we switch
// to parallel Gzip if the data is larger and run the built-in version
// otherwise.
if len(data) > 1024*1024 {
return e.encodeBig(data)
}
return e.encodeSmall(data)
}
func (e *GzipEncoder) encodeSmall(data []byte) ([]byte, error) {
e.buf.Reset() e.buf.Reset()
e.writer.Reset(e.buf) e.writer.Reset(e.buf)
@ -160,17 +199,38 @@ func (e *GzipEncoder) Encode(data []byte) ([]byte, error) {
return e.buf.Bytes(), nil return e.buf.Bytes(), nil
} }
// encodeBig compresses data with the parallel gzip writer and returns the
// compressed bytes.
//
// The returned slice is backed by the encoder's internal buffer, which is
// reset at the start of every call, so it is only valid until the encoder is
// used again.
func (e *GzipEncoder) encodeBig(data []byte) ([]byte, error) {
	// Start from an empty buffer and point the reused writer at it.
	e.buf.Reset()
	e.pwriter.Reset(e.buf)

	if _, err := e.pwriter.Write(data); err != nil {
		return nil, err
	}
	// Closing the writer completes the gzip stream in the buffer.
	if err := e.pwriter.Close(); err != nil {
		return nil, err
	}

	return e.buf.Bytes(), nil
}
type ZlibEncoder struct { type ZlibEncoder struct {
writer *zlib.Writer writer *zlib.Writer
buf *bytes.Buffer buf *bytes.Buffer
} }
func NewZlibEncoder() *ZlibEncoder { func NewZlibEncoder(options ...EncodingOption) (*ZlibEncoder, error) {
var buf bytes.Buffer cfg := encoderConfig{level: zlib.DefaultCompression}
return &ZlibEncoder{ for _, o := range options {
writer: zlib.NewWriter(&buf), o(&cfg)
buf: &buf,
} }
var buf bytes.Buffer
w, err := zlib.NewWriterLevel(&buf, cfg.level)
return &ZlibEncoder{
writer: w,
buf: &buf,
}, err
} }
func (e *ZlibEncoder) Encode(data []byte) ([]byte, error) { func (e *ZlibEncoder) Encode(data []byte) ([]byte, error) {
@ -207,20 +267,33 @@ type ContentDecoder interface {
// GzipDecoder decompresses buffers with gzip compression. // GzipDecoder decompresses buffers with gzip compression.
type GzipDecoder struct { type GzipDecoder struct {
reader *gzip.Reader preader *pgzip.Reader
buf *bytes.Buffer reader *gzip.Reader
buf *bytes.Buffer
} }
func NewGzipDecoder() *GzipDecoder { func NewGzipDecoder() *GzipDecoder {
return &GzipDecoder{ return &GzipDecoder{
reader: new(gzip.Reader), preader: new(pgzip.Reader),
buf: new(bytes.Buffer), reader: new(gzip.Reader),
buf: new(bytes.Buffer),
} }
} }
func (*GzipDecoder) SetEncoding(string) {} func (*GzipDecoder) SetEncoding(string) {}
func (d *GzipDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) { func (d *GzipDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, error) {
// Parallel Gzip is only faster for larger data chunks. According to the
// project's documentation the trade-off size is at about 1MB, so we switch
// to parallel Gzip if the data is larger and run the built-in version
// otherwise.
if len(data) > 1024*1024 {
return d.decodeBig(data, maxDecompressionSize)
}
return d.decodeSmall(data, maxDecompressionSize)
}
func (d *GzipDecoder) decodeSmall(data []byte, maxDecompressionSize int64) ([]byte, error) {
err := d.reader.Reset(bytes.NewBuffer(data)) err := d.reader.Reset(bytes.NewBuffer(data))
if err != nil { if err != nil {
return nil, err return nil, err
@ -241,6 +314,27 @@ func (d *GzipDecoder) Decode(data []byte, maxDecompressionSize int64) ([]byte, e
return d.buf.Bytes(), nil return d.buf.Bytes(), nil
} }
// decodeBig decompresses data with the parallel gzip reader.
//
// At most maxDecompressionSize bytes are copied out of the stream. If the copy
// reaches that limit the input is rejected, so the decoded data must be
// strictly smaller than maxDecompressionSize. The parallel reader is closed on
// every path after a successful Reset, including the failure paths, so its
// read-ahead does not stay active until the decoder is used again.
//
// The returned slice is backed by the decoder's internal buffer and is only
// valid until the next call that resets that buffer.
func (d *GzipDecoder) decodeBig(data []byte, maxDecompressionSize int64) ([]byte, error) {
	if err := d.preader.Reset(bytes.NewBuffer(data)); err != nil {
		return nil, err
	}
	d.buf.Reset()

	n, err := io.CopyN(d.buf, d.preader, maxDecompressionSize)
	if err != nil && !errors.Is(err, io.EOF) {
		// Best-effort cleanup; the copy error is the one worth reporting.
		_ = d.preader.Close()
		return nil, err
	}
	if n == maxDecompressionSize {
		// Best-effort cleanup; the size violation is the one worth reporting.
		_ = d.preader.Close()
		return nil, fmt.Errorf("size of decoded data exceeds allowed size %d", maxDecompressionSize)
	}

	if err := d.preader.Close(); err != nil {
		return nil, err
	}
	return d.buf.Bytes(), nil
}
type ZlibDecoder struct { type ZlibDecoder struct {
buf *bytes.Buffer buf *bytes.Buffer
} }

View File

@ -3,6 +3,7 @@ package internal
import ( import (
"bytes" "bytes"
"io" "io"
"strings"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -11,7 +12,8 @@ import (
const maxDecompressionSize = 1024 const maxDecompressionSize = 1024
func TestGzipEncodeDecode(t *testing.T) { func TestGzipEncodeDecode(t *testing.T) {
enc := NewGzipEncoder() enc, err := NewGzipEncoder()
require.NoError(t, err)
dec := NewGzipDecoder() dec := NewGzipDecoder()
payload, err := enc.Encode([]byte("howdy")) payload, err := enc.Encode([]byte("howdy"))
@ -24,7 +26,8 @@ func TestGzipEncodeDecode(t *testing.T) {
} }
func TestGzipReuse(t *testing.T) { func TestGzipReuse(t *testing.T) {
enc := NewGzipEncoder() enc, err := NewGzipEncoder()
require.NoError(t, err)
dec := NewGzipDecoder() dec := NewGzipDecoder()
payload, err := enc.Encode([]byte("howdy")) payload, err := enc.Encode([]byte("howdy"))
@ -45,7 +48,8 @@ func TestGzipReuse(t *testing.T) {
} }
func TestZlibEncodeDecode(t *testing.T) { func TestZlibEncodeDecode(t *testing.T) {
enc := NewZlibEncoder() enc, err := NewZlibEncoder()
require.NoError(t, err)
dec := NewZlibDecoder() dec := NewZlibDecoder()
payload, err := enc.Encode([]byte("howdy")) payload, err := enc.Encode([]byte("howdy"))
@ -58,7 +62,8 @@ func TestZlibEncodeDecode(t *testing.T) {
} }
func TestZlibEncodeDecodeWithTooLargeMessage(t *testing.T) { func TestZlibEncodeDecodeWithTooLargeMessage(t *testing.T) {
enc := NewZlibEncoder() enc, err := NewZlibEncoder()
require.NoError(t, err)
dec := NewZlibDecoder() dec := NewZlibDecoder()
payload, err := enc.Encode([]byte("howdy")) payload, err := enc.Encode([]byte("howdy"))
@ -97,7 +102,8 @@ func TestStreamIdentityDecode(t *testing.T) {
} }
func TestStreamGzipDecode(t *testing.T) { func TestStreamGzipDecode(t *testing.T) {
enc := NewGzipEncoder() enc, err := NewGzipEncoder()
require.NoError(t, err)
written, err := enc.Encode([]byte("howdy")) written, err := enc.Encode([]byte("howdy"))
require.NoError(t, err) require.NoError(t, err)
@ -113,3 +119,205 @@ func TestStreamGzipDecode(t *testing.T) {
require.Equal(t, []byte("howdy"), b[:n]) require.Equal(t, []byte("howdy"), b[:n])
} }
// BenchmarkGzipEncode measures gzip encoding of a 1 KiB repetitive payload.
// A single encode/decode round trip is verified before timing starts.
func BenchmarkGzipEncode(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 64))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewGzipEncoder()
	require.NoError(b, err)
	dec := NewGzipDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the setup and the sanity check out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_, err := enc.Encode(data)
		require.NoError(b, err)
	}
}
// BenchmarkGzipDecode measures gzip decoding of a compressed 1 KiB repetitive
// payload. A single encode/decode round trip is verified before timing starts.
func BenchmarkGzipDecode(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 64))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewGzipEncoder()
	require.NoError(b, err)
	dec := NewGzipDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the encoding setup and the sanity check out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_, err = dec.Decode(payload, dataLen)
		require.NoError(b, err)
	}
}
// BenchmarkGzipEncodeDecode measures a full gzip encode/decode round trip of a
// 1 KiB repetitive payload. One round trip is verified before timing starts.
func BenchmarkGzipEncodeDecode(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 64))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewGzipEncoder()
	require.NoError(b, err)
	dec := NewGzipDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the setup and the sanity check out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		payload, err := enc.Encode(data)
		require.NoError(b, err)

		_, err = dec.Decode(payload, dataLen)
		require.NoError(b, err)
	}
}
// BenchmarkGzipEncodeBig measures gzip encoding of a 16 MiB repetitive
// payload, which is above the 1 MiB threshold at which the encoder switches to
// the parallel writer. One round trip is verified before timing starts.
func BenchmarkGzipEncodeBig(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 1024*1024))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewGzipEncoder()
	require.NoError(b, err)
	dec := NewGzipDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// The setup above works on the full 16 MiB payload and costs about as much
	// as a timed iteration, so it must not count towards the measurement.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_, err := enc.Encode(data)
		require.NoError(b, err)
	}
}
// BenchmarkGzipDecodeBig measures gzip decoding of a compressed 16 MiB
// repetitive payload. One round trip is verified before timing starts.
//
// NOTE(review): the decoder picks the parallel reader from the length of the
// compressed input. This input is highly repetitive, so the payload is
// presumably well below the 1 MiB threshold and the parallel path may not be
// the one measured here; confirm.
func BenchmarkGzipDecodeBig(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 1024*1024))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewGzipEncoder()
	require.NoError(b, err)
	dec := NewGzipDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Compressing 16 MiB during setup must not count towards decode timing.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_, err = dec.Decode(payload, dataLen)
		require.NoError(b, err)
	}
}
// BenchmarkGzipEncodeDecodeBig measures a full gzip encode/decode round trip
// of a 16 MiB repetitive payload. One round trip is verified before timing
// starts.
//
// NOTE(review): the decoder picks the parallel reader from the length of the
// compressed input, which is presumably far below the 1 MiB threshold for this
// repetitive data; confirm which decode path is measured.
func BenchmarkGzipEncodeDecodeBig(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 1024*1024))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewGzipEncoder()
	require.NoError(b, err)
	dec := NewGzipDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the setup round trip and the comparison out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		payload, err := enc.Encode(data)
		require.NoError(b, err)

		_, err = dec.Decode(payload, dataLen)
		require.NoError(b, err)
	}
}
// BenchmarkZlibEncode measures zlib encoding of a 1 KiB repetitive payload.
// A single encode/decode round trip is verified before timing starts.
func BenchmarkZlibEncode(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 64))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewZlibEncoder()
	require.NoError(b, err)
	dec := NewZlibDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the setup and the sanity check out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_, err := enc.Encode(data)
		require.NoError(b, err)
	}
}
// BenchmarkZlibDecode measures zlib decoding of a compressed 1 KiB repetitive
// payload. A single encode/decode round trip is verified before timing starts.
func BenchmarkZlibDecode(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 64))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewZlibEncoder()
	require.NoError(b, err)
	dec := NewZlibDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the encoding setup and the sanity check out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		_, err = dec.Decode(payload, dataLen)
		require.NoError(b, err)
	}
}
// BenchmarkZlibEncodeDecode measures a full zlib encode/decode round trip of a
// 1 KiB repetitive payload. One round trip is verified before timing starts.
func BenchmarkZlibEncodeDecode(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 64))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc, err := NewZlibEncoder()
	require.NoError(b, err)
	dec := NewZlibDecoder()
	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the setup and the sanity check out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		payload, err := enc.Encode(data)
		require.NoError(b, err)

		_, err = dec.Decode(payload, dataLen)
		require.NoError(b, err)
	}
}
// BenchmarkIdentityEncodeDecode measures an encode/decode round trip through
// the identity encoder and decoder on a 1 KiB payload, for comparison with the
// compressing codecs. One round trip is verified before timing starts.
func BenchmarkIdentityEncodeDecode(b *testing.B) {
	data := []byte(strings.Repeat("-howdy stranger-", 64))
	// The decode limit is set one byte above the raw size.
	dataLen := int64(len(data)) + 1

	enc := NewIdentityEncoder()
	dec := NewIdentityDecoder()

	payload, err := enc.Encode(data)
	require.NoError(b, err)
	actual, err := dec.Decode(payload, dataLen)
	require.NoError(b, err)
	require.Equal(b, data, actual)

	// Keep the setup and the sanity check out of the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		payload, err := enc.Encode(data)
		require.NoError(b, err)

		_, err = dec.Decode(payload, dataLen)
		require.NoError(b, err)
	}
}

View File

@ -13,7 +13,8 @@ import (
) )
func TestAutoEncoding(t *testing.T) { func TestAutoEncoding(t *testing.T) {
enc := internal.NewGzipEncoder() enc, err := internal.NewGzipEncoder()
require.NoError(t, err)
payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`)) payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`))
require.NoError(t, err) require.NoError(t, err)

View File

@ -143,7 +143,8 @@ func TestRunGzipDecode(t *testing.T) {
require.NotNil(t, ps.sub) require.NotNil(t, ps.sub)
testTracker := &testTracker{} testTracker := &testTracker{}
enc := internal.NewGzipEncoder() enc, err := internal.NewGzipEncoder()
require.NoError(t, err)
gzippedMsg, err := enc.Encode([]byte(msgInflux)) gzippedMsg, err := enc.Encode([]byte(msgInflux))
require.NoError(t, err) require.NoError(t, err)
msg := &testMsg{ msg := &testMsg{