fix: Handle compression level correctly for different algorithms (#13434)
This commit is contained in:
parent
a2125f0457
commit
a1c06429de
|
|
@ -14,15 +14,15 @@ import (
|
||||||
|
|
||||||
const DefaultMaxDecompressionSize = 500 * 1024 * 1024 //500MB
|
const DefaultMaxDecompressionSize = 500 * 1024 * 1024 //500MB
|
||||||
|
|
||||||
// EncodingOption provide methods to change the encoding from the standard
|
|
||||||
// configuration.
|
|
||||||
type EncodingOption func(*encoderConfig)
|
|
||||||
|
|
||||||
type encoderConfig struct {
|
type encoderConfig struct {
|
||||||
level int
|
level int
|
||||||
}
|
}
|
||||||
|
|
||||||
func EncoderCompressionLevel(level int) EncodingOption {
|
// EncodingOption provide methods to change the encoding from the standard
|
||||||
|
// configuration.
|
||||||
|
type EncodingOption func(*encoderConfig)
|
||||||
|
|
||||||
|
func WithCompressionLevel(level int) EncodingOption {
|
||||||
return func(cfg *encoderConfig) {
|
return func(cfg *encoderConfig) {
|
||||||
cfg.level = level
|
cfg.level = level
|
||||||
}
|
}
|
||||||
|
|
@ -95,7 +95,7 @@ func NewContentEncoder(encoding string, options ...EncodingOption) (ContentEncod
|
||||||
case "zlib":
|
case "zlib":
|
||||||
return NewZlibEncoder(options...)
|
return NewZlibEncoder(options...)
|
||||||
case "identity", "":
|
case "identity", "":
|
||||||
return NewIdentityEncoder(), nil
|
return NewIdentityEncoder(options...)
|
||||||
default:
|
default:
|
||||||
return nil, errors.New("invalid value for content_encoding")
|
return nil, errors.New("invalid value for content_encoding")
|
||||||
}
|
}
|
||||||
|
|
@ -155,16 +155,25 @@ type GzipEncoder struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGzipEncoder(options ...EncodingOption) (*GzipEncoder, error) {
|
func NewGzipEncoder(options ...EncodingOption) (*GzipEncoder, error) {
|
||||||
cfg := encoderConfig{level: pgzip.DefaultCompression}
|
cfg := encoderConfig{level: gzip.DefaultCompression}
|
||||||
for _, o := range options {
|
for _, o := range options {
|
||||||
o(&cfg)
|
o(&cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if the compression level is supported
|
||||||
|
switch cfg.level {
|
||||||
|
case gzip.NoCompression, gzip.DefaultCompression, gzip.BestSpeed, gzip.BestCompression:
|
||||||
|
// Do nothing as those are valid levels
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid compression level, only 0, 1 and 9 are supported")
|
||||||
|
}
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
pw, err := pgzip.NewWriterLevel(&buf, cfg.level)
|
pw, err := pgzip.NewWriterLevel(&buf, cfg.level)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := gzip.NewWriterLevel(&buf, cfg.level)
|
w, err := gzip.NewWriterLevel(&buf, cfg.level)
|
||||||
return &GzipEncoder{
|
return &GzipEncoder{
|
||||||
pwriter: pw,
|
pwriter: pw,
|
||||||
|
|
@ -225,6 +234,13 @@ func NewZlibEncoder(options ...EncodingOption) (*ZlibEncoder, error) {
|
||||||
o(&cfg)
|
o(&cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
switch cfg.level {
|
||||||
|
case zlib.NoCompression, zlib.DefaultCompression, zlib.BestSpeed, zlib.BestCompression:
|
||||||
|
// Do nothing as those are valid levels
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid compression level, only 0, 1 and 9 are supported")
|
||||||
|
}
|
||||||
|
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
w, err := zlib.NewWriterLevel(&buf, cfg.level)
|
w, err := zlib.NewWriterLevel(&buf, cfg.level)
|
||||||
return &ZlibEncoder{
|
return &ZlibEncoder{
|
||||||
|
|
@ -251,8 +267,12 @@ func (e *ZlibEncoder) Encode(data []byte) ([]byte, error) {
|
||||||
// IdentityEncoder is a null encoder that applies no transformation.
|
// IdentityEncoder is a null encoder that applies no transformation.
|
||||||
type IdentityEncoder struct{}
|
type IdentityEncoder struct{}
|
||||||
|
|
||||||
func NewIdentityEncoder() *IdentityEncoder {
|
func NewIdentityEncoder(options ...EncodingOption) (*IdentityEncoder, error) {
|
||||||
return &IdentityEncoder{}
|
if len(options) > 0 {
|
||||||
|
return nil, errors.New("identity encoder does not support options")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &IdentityEncoder{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
|
func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package internal
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
@ -74,7 +75,8 @@ func TestZlibEncodeDecodeWithTooLargeMessage(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIdentityEncodeDecode(t *testing.T) {
|
func TestIdentityEncodeDecode(t *testing.T) {
|
||||||
enc := NewIdentityEncoder()
|
enc, err := NewIdentityEncoder()
|
||||||
|
require.NoError(t, err)
|
||||||
dec := NewIdentityDecoder()
|
dec := NewIdentityDecoder()
|
||||||
|
|
||||||
payload, err := enc.Encode([]byte("howdy"))
|
payload, err := enc.Encode([]byte("howdy"))
|
||||||
|
|
@ -120,6 +122,66 @@ func TestStreamGzipDecode(t *testing.T) {
|
||||||
require.Equal(t, []byte("howdy"), b[:n])
|
require.Equal(t, []byte("howdy"), b[:n])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCompressionLevel(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
algorithm string
|
||||||
|
validLevels []int
|
||||||
|
errormsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
algorithm: "gzip",
|
||||||
|
validLevels: []int{0, 1, 9},
|
||||||
|
errormsg: "invalid compression level",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
algorithm: "zlib",
|
||||||
|
validLevels: []int{0, 1, 9},
|
||||||
|
errormsg: "invalid compression level",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
algorithm: "identity",
|
||||||
|
errormsg: "does not support options",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
// Check default i.e. without specifying level
|
||||||
|
t.Run(tt.algorithm+" default", func(t *testing.T) {
|
||||||
|
enc, err := NewContentEncoder(tt.algorithm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, enc)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Check invalid level
|
||||||
|
t.Run(tt.algorithm+" invalid", func(t *testing.T) {
|
||||||
|
_, err := NewContentEncoder(tt.algorithm, WithCompressionLevel(11))
|
||||||
|
require.ErrorContains(t, err, tt.errormsg)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Check known levels 0..9
|
||||||
|
for level := 0; level < 10; level++ {
|
||||||
|
name := fmt.Sprintf("%s level %d", tt.algorithm, level)
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
var valid bool
|
||||||
|
for _, l := range tt.validLevels {
|
||||||
|
if l == level {
|
||||||
|
valid = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enc, err := NewContentEncoder(tt.algorithm, WithCompressionLevel(level))
|
||||||
|
if valid {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, enc)
|
||||||
|
} else {
|
||||||
|
require.ErrorContains(t, err, tt.errormsg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkGzipEncode(b *testing.B) {
|
func BenchmarkGzipEncode(b *testing.B) {
|
||||||
data := []byte(strings.Repeat("-howdy stranger-", 64))
|
data := []byte(strings.Repeat("-howdy stranger-", 64))
|
||||||
dataLen := int64(len(data)) + 1
|
dataLen := int64(len(data)) + 1
|
||||||
|
|
@ -304,7 +366,8 @@ func BenchmarkIdentityEncodeDecode(b *testing.B) {
|
||||||
data := []byte(strings.Repeat("-howdy stranger-", 64))
|
data := []byte(strings.Repeat("-howdy stranger-", 64))
|
||||||
dataLen := int64(len(data)) + 1
|
dataLen := int64(len(data)) + 1
|
||||||
|
|
||||||
enc := NewIdentityEncoder()
|
enc, err := NewIdentityEncoder()
|
||||||
|
require.NoError(b, err)
|
||||||
dec := NewIdentityDecoder()
|
dec := NewIdentityDecoder()
|
||||||
|
|
||||||
payload, err := enc.Encode(data)
|
payload, err := enc.Encode(data)
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ func TestAutoEncoding(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
acc.AssertContainsFields(t, "measurementName", map[string]interface{}{"fieldKey": "gzip"})
|
acc.AssertContainsFields(t, "measurementName", map[string]interface{}{"fieldKey": "gzip"})
|
||||||
|
|
||||||
encIdentity := internal.NewIdentityEncoder()
|
encIdentity, err := internal.NewIdentityEncoder()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
payload, err = encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`))
|
payload, err = encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue