feat(inputs.amqp_consumer): Determine content encoding automatically (#11860)

This commit is contained in:
Sebastian Spaink 2022-09-26 14:28:32 -05:00 committed by GitHub
parent a2baab3d36
commit d982ed9a45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 4 deletions

View File

@ -82,6 +82,32 @@ func NewContentEncoder(encoding string) (ContentEncoder, error) {
}
}
type AutoDecoder struct {
encoding string
gzip *GzipDecoder
identity *IdentityDecoder
}
func (a *AutoDecoder) SetEnconding(encoding string) {
a.encoding = encoding
}
func (a *AutoDecoder) Decode(data []byte) ([]byte, error) {
if a.encoding == "gzip" {
return a.gzip.Decode(data)
}
return a.identity.Decode(data)
}
func NewAutoContentDecoder() (*AutoDecoder, error) {
var a AutoDecoder
var err error
a.identity = NewIdentityDecoder()
a.gzip, err = NewGzipDecoder()
return &a, err
}
// NewContentDecoder returns a ContentDecoder for the encoding type.
func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
@ -91,6 +117,8 @@ func NewContentDecoder(encoding string) (ContentDecoder, error) {
return NewZlibDecoder()
case "identity", "":
return NewIdentityDecoder(), nil
case "auto":
return NewAutoContentDecoder()
default:
return nil, errors.New("invalid value for content_encoding")
}
@ -171,6 +199,7 @@ func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {
// ContentDecoder removes a wrapper encoding from byte buffers.
type ContentDecoder interface {
SetEnconding(string)
Decode([]byte) ([]byte, error)
}
@ -187,6 +216,8 @@ func NewGzipDecoder() (*GzipDecoder, error) {
}, nil
}
func (*GzipDecoder) SetEnconding(string) {}
func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
d.reader.Reset(bytes.NewBuffer(data))
d.buf.Reset()
@ -212,6 +243,8 @@ func NewZlibDecoder() (*ZlibDecoder, error) {
}, nil
}
func (*ZlibDecoder) SetEnconding(string) {}
func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) {
d.buf.Reset()
@ -238,6 +271,8 @@ func NewIdentityDecoder() *IdentityDecoder {
return &IdentityDecoder{}
}
func (*IdentityDecoder) SetEnconding(string) {}
func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
return data, nil
}

View File

@ -82,8 +82,11 @@ For an introduction to AMQP see:
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
## Content encoding for message payloads, can be set to
## "gzip", "identity" or "auto"
## - Use "gzip" to decode gzip
## - Use "identity" to apply no encoding
## - Use "auto" determine the encoding using the ContentEncoding header
# content_encoding = "identity"
## Data format to consume.
@ -92,3 +95,11 @@ For an introduction to AMQP see:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```
## Metrics
TODO
## Example Output
TODO

View File

@ -395,6 +395,7 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
}
}
a.decoder.SetEnconding(d.ContentEncoding)
body, err := a.decoder.Decode(d.Body)
if err != nil {
onError()

View File

@ -0,0 +1,51 @@
package amqp_consumer
import (
"testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
)
func TestAutoEncoding(t *testing.T) {
enc, err := internal.NewGzipEncoder()
require.NoError(t, err)
payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`))
require.NoError(t, err)
var a AMQPConsumer
parser := &influx.Parser{}
require.NoError(t, parser.Init())
a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery)
a.parser = parser
a.decoder, err = internal.NewContentDecoder("auto")
require.NoError(t, err)
acc := &testutil.Accumulator{}
d := amqp091.Delivery{
ContentEncoding: "gzip",
Body: payload,
}
err = a.onMessage(acc, d)
require.NoError(t, err)
acc.AssertContainsFields(t, "measurementName", map[string]interface{}{"fieldKey": "gzip"})
encIdentity := internal.NewIdentityEncoder()
require.NoError(t, err)
payload, err = encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`))
require.NoError(t, err)
d = amqp091.Delivery{
ContentEncoding: "not_gzip",
Body: payload,
}
err = a.onMessage(acc, d)
require.NoError(t, err)
acc.AssertContainsFields(t, "measurementName2", map[string]interface{}{"fieldKey": "identity"})
}

View File

@ -63,8 +63,11 @@
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
## Content encoding for message payloads, can be set to
## "gzip", "identity" or "auto"
## - Use "gzip" to decode gzip
## - Use "identity" to apply no encoding
## - Use "auto" determine the encoding using the ContentEncoding header
# content_encoding = "identity"
## Data format to consume.