chore(internal.gzip): cleanup CompressWithGzip (#12587)

This commit is contained in:
Sven Rebhan 2023-02-01 19:20:11 +01:00 committed by GitHub
parent e1db44c3b2
commit e6de0cc9c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 79 additions and 66 deletions

View File

@ -194,30 +194,39 @@ func (r *ReadWaitCloser) Close() error {
return err return err
} }
// CompressWithGzip takes an io.Reader as input and pipes // CompressWithGzip takes an io.Reader as input and pipes it through a
// it through a gzip.Writer returning an io.Reader containing // gzip.Writer returning an io.Reader containing the gzipped data.
// the gzipped data. // Errors occurring during compression are returned to the instance reading
// An error is returned if passing data to the gzip.Writer fails // from the returned reader via through the corresponding read call
func CompressWithGzip(data io.Reader) (io.ReadCloser, error) { // (e.g. io.Copy or io.ReadAll).
func CompressWithGzip(data io.Reader) io.ReadCloser {
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
gzipWriter := gzip.NewWriter(pipeWriter) gzipWriter := gzip.NewWriter(pipeWriter)
rc := &ReadWaitCloser{ // Start copying from the uncompressed reader to the output reader
pipeReader: pipeReader, // in the background until the input reader is closed (or errors out).
}
rc.wg.Add(1)
var err error
go func() { go func() {
_, err = io.Copy(gzipWriter, data) // This copy will block until "data" reached EOF or an error occurs
gzipWriter.Close() _, err := io.Copy(gzipWriter, data)
// subsequent reads from the read half of the pipe will
// return no bytes and the error err, or EOF if err is nil. // Close the compression writer and make sure we do not overwrite
pipeWriter.CloseWithError(err) // the copy error if any.
rc.wg.Done() gzipErr := gzipWriter.Close()
if err == nil {
err = gzipErr
}
// Subsequent reads from the output reader (connected to "pipeWriter"
// via pipe) will return the copy (or closing) error if any to the
// instance reading from the reader returned by the CompressWithGzip
// function. If "err" is nil, the below function will correctly report
// io.EOF.
_ = pipeWriter.CloseWithError(err)
}() }()
return pipeReader, err // Return a reader which then can be read by the caller to collect the
// compressed stream.
return pipeReader
} }
// ParseTimestamp parses a Time according to the standard Telegraf options. // ParseTimestamp parses a Time according to the standard Telegraf options.

View File

@ -174,9 +174,7 @@ func TestCompressWithGzip(t *testing.T) {
testData := "the quick brown fox jumps over the lazy dog" testData := "the quick brown fox jumps over the lazy dog"
inputBuffer := bytes.NewBuffer([]byte(testData)) inputBuffer := bytes.NewBuffer([]byte(testData))
outputBuffer, err := CompressWithGzip(inputBuffer) outputBuffer := CompressWithGzip(inputBuffer)
require.NoError(t, err)
gzipReader, err := gzip.NewReader(outputBuffer) gzipReader, err := gzip.NewReader(outputBuffer)
require.NoError(t, err) require.NoError(t, err)
defer gzipReader.Close() defer gzipReader.Close()
@ -188,37 +186,71 @@ func TestCompressWithGzip(t *testing.T) {
} }
type mockReader struct { type mockReader struct {
readN uint64 // record the number of calls to Read err error
ncalls uint64 // record the number of calls to Read
msg []byte
} }
func (r *mockReader) Read(p []byte) (n int, err error) { func (r *mockReader) Read(p []byte) (n int, err error) {
r.readN++ r.ncalls++
return rand.Read(p)
if len(r.msg) > 0 {
n, err = copy(p, r.msg), io.EOF
} else {
n, err = rand.Read(p)
}
if r.err == nil {
return n, err
}
return n, r.err
} }
func TestCompressWithGzipEarlyClose(t *testing.T) { func TestCompressWithGzipEarlyClose(t *testing.T) {
mr := &mockReader{} mr := &mockReader{}
rc, err := CompressWithGzip(mr) rc := CompressWithGzip(mr)
require.NoError(t, err)
n, err := io.CopyN(io.Discard, rc, 10000) n, err := io.CopyN(io.Discard, rc, 10000)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(10000), n) require.Equal(t, int64(10000), n)
r1 := mr.readN r1 := mr.ncalls
err = rc.Close() require.NoError(t, rc.Close())
require.NoError(t, err)
n, err = io.CopyN(io.Discard, rc, 10000) n, err = io.CopyN(io.Discard, rc, 10000)
require.Error(t, io.EOF, err) require.ErrorIs(t, err, io.ErrClosedPipe)
require.Equal(t, int64(0), n) require.Equal(t, int64(0), n)
r2 := mr.readN r2 := mr.ncalls
// no more read to the source after closing // no more read to the source after closing
require.Equal(t, r1, r2) require.Equal(t, r1, r2)
} }
func TestCompressWithGzipErrorPropagationCopy(t *testing.T) {
errs := []error{io.ErrClosedPipe, io.ErrNoProgress, io.ErrUnexpectedEOF}
for _, expected := range errs {
r := &mockReader{msg: []byte("this is a test"), err: expected}
rc := CompressWithGzip(r)
n, err := io.Copy(io.Discard, rc)
require.Greater(t, n, int64(0))
require.ErrorIs(t, err, expected)
require.NoError(t, rc.Close())
}
}
func TestCompressWithGzipErrorPropagationReadAll(t *testing.T) {
errs := []error{io.ErrClosedPipe, io.ErrNoProgress, io.ErrUnexpectedEOF}
for _, expected := range errs {
r := &mockReader{msg: []byte("this is a test"), err: expected}
rc := CompressWithGzip(r)
buf, err := io.ReadAll(rc)
require.NotEmpty(t, buf)
require.ErrorIs(t, err, expected)
require.NoError(t, rc.Close())
}
}
func TestAlignDuration(t *testing.T) { func TestAlignDuration(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -2,7 +2,6 @@
package http package http
import ( import (
"bytes"
"context" "context"
_ "embed" _ "embed"
"fmt" "fmt"
@ -214,15 +213,7 @@ func makeRequestBodyReader(contentEncoding, body string) (io.Reader, error) {
var reader io.Reader = strings.NewReader(body) var reader io.Reader = strings.NewReader(body)
if contentEncoding == "gzip" { if contentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reader) return internal.CompressWithGzip(reader), nil
if err != nil {
return nil, err
}
data, err := io.ReadAll(rc)
if err != nil {
return nil, err
}
return bytes.NewReader(data), nil
} }
return reader, nil return reader, nil

View File

@ -136,10 +136,7 @@ func (h *HTTP) writeMetric(reqBody []byte) error {
var err error var err error
if h.ContentEncoding == "gzip" { if h.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reqBodyBuffer) rc := internal.CompressWithGzip(reqBodyBuffer)
if err != nil {
return err
}
defer rc.Close() defer rc.Close()
reqBodyBuffer = rc reqBodyBuffer = rc
} }

View File

@ -485,12 +485,7 @@ func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser
reader := influx.NewReader(metrics, c.config.Serializer) reader := influx.NewReader(metrics, c.config.Serializer)
if c.config.ContentEncoding == "gzip" { if c.config.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reader) return internal.CompressWithGzip(reader), nil
if err != nil {
return nil, err
}
return rc, nil
} }
return io.NopCloser(reader), nil return io.NopCloser(reader), nil

View File

@ -391,12 +391,7 @@ func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser
reader := influx.NewReader(metrics, c.serializer) reader := influx.NewReader(metrics, c.serializer)
if c.ContentEncoding == "gzip" { if c.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reader) return internal.CompressWithGzip(reader), nil
if err != nil {
return nil, err
}
return rc, nil
} }
return io.NopCloser(reader), nil return io.NopCloser(reader), nil

View File

@ -143,10 +143,7 @@ func (l *Loki) writeMetrics(s Streams) error {
var reqBodyBuffer io.Reader = bytes.NewBuffer(bs) var reqBodyBuffer io.Reader = bytes.NewBuffer(bs)
if l.GZipRequest { if l.GZipRequest {
rc, err := internal.CompressWithGzip(reqBodyBuffer) rc := internal.CompressWithGzip(reqBodyBuffer)
if err != nil {
return err
}
defer rc.Close() defer rc.Close()
reqBodyBuffer = rc reqBodyBuffer = rc
} }

View File

@ -213,10 +213,7 @@ func (s *Sensu) writeMetrics(reqBody []byte) error {
method := http.MethodPost method := http.MethodPost
if s.ContentEncoding == "gzip" { if s.ContentEncoding == "gzip" {
rc, err := internal.CompressWithGzip(reqBodyBuffer) rc := internal.CompressWithGzip(reqBodyBuffer)
if err != nil {
return err
}
defer rc.Close() defer rc.Close()
reqBodyBuffer = rc reqBodyBuffer = rc
} }