feat(cloud_pubsub): Add support for gzip compression (#13094)

This commit is contained in:
Daniel Ayvar 2023-05-02 13:04:00 -05:00 committed by GitHub
parent 92b6d96486
commit 872d51714e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 341 additions and 111 deletions

View File

@ -101,6 +101,16 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## PubSub message data before parsing. Many GCP services that ## PubSub message data before parsing. Many GCP services that
## output JSON to Google PubSub base64-encode the JSON payload. ## output JSON to Google PubSub base64-encode the JSON payload.
# base64_data = false # base64_data = false
## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"
## If content encoding is not "identity", sets the maximum allowed size,
## in bytes, for a message payload when it's decompressed. Can be increased
## for larger payloads or reduced to protect against decompression bombs.
## Acceptable units are B, KiB, KB, MiB, MB...
# max_decompression_size = "500MB"
``` ```
### Multiple Subscriptions and Topics ### Multiple Subscriptions and Topics

View File

@ -49,7 +49,9 @@ type PubSub struct {
Base64Data bool `toml:"base64_data"` Base64Data bool `toml:"base64_data"`
Log telegraf.Logger ContentEncoding string `toml:"content_encoding"`
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
Log telegraf.Logger `toml:"-"`
sub subscription sub subscription
stubSub func() subscription stubSub func() subscription
@ -62,6 +64,7 @@ type PubSub struct {
undelivered map[telegraf.TrackingID]message undelivered map[telegraf.TrackingID]message
sem semaphore sem semaphore
decoder internal.ContentDecoder
} }
func (*PubSub) SampleConfig() string { func (*PubSub) SampleConfig() string {
@ -82,14 +85,6 @@ func (ps *PubSub) SetParser(parser parsers.Parser) {
// Two goroutines are started - one pulling for the subscription, one // Two goroutines are started - one pulling for the subscription, one
// receiving delivery notifications from the accumulator. // receiving delivery notifications from the accumulator.
func (ps *PubSub) Start(ac telegraf.Accumulator) error { func (ps *PubSub) Start(ac telegraf.Accumulator) error {
if ps.Subscription == "" {
return fmt.Errorf(`"subscription" is required`)
}
if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}
ps.sem = make(semaphore, ps.MaxUndeliveredMessages) ps.sem = make(semaphore, ps.MaxUndeliveredMessages)
ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages) ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages)
@ -176,21 +171,20 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen) return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen)
} }
var data []byte data, err := ps.decompressData(msg.Data())
if ps.Base64Data { if err != nil {
strData, err := base64.StdEncoding.DecodeString(string(msg.Data())) return fmt.Errorf("unable to decompress %s message: %w", ps.ContentEncoding, err)
if err != nil { }
return fmt.Errorf("unable to base64 decode message: %w", err)
} data, err = ps.decodeB64Data(data)
data = strData if err != nil {
} else { return fmt.Errorf("unable to decode base64 message: %w", err)
data = msg.Data()
} }
metrics, err := ps.parser.Parse(data) metrics, err := ps.parser.Parse(data)
if err != nil { if err != nil {
msg.Ack() msg.Ack()
return err return fmt.Errorf("unable to parse message: %w", err)
} }
if len(metrics) == 0 { if len(metrics) == 0 {
@ -217,6 +211,31 @@ func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
return nil return nil
} }
func (ps *PubSub) decompressData(data []byte) ([]byte, error) {
if ps.ContentEncoding == "identity" {
return data, nil
}
data, err := ps.decoder.Decode(data, int64(ps.MaxDecompressionSize))
if err != nil {
return nil, err
}
decompressedData := make([]byte, len(data))
copy(decompressedData, data)
data = decompressedData
return data, nil
}
func (ps *PubSub) decodeB64Data(data []byte) ([]byte, error) {
if ps.Base64Data {
return base64.StdEncoding.DecodeString(string(data))
}
return data, nil
}
func (ps *PubSub) waitForDelivery(parentCtx context.Context) { func (ps *PubSub) waitForDelivery(parentCtx context.Context) {
for { for {
select { select {
@ -286,6 +305,35 @@ func (ps *PubSub) getGCPSubscription(subID string) (subscription, error) {
return &gcpSubscription{s}, nil return &gcpSubscription{s}, nil
} }
func (ps *PubSub) Init() error {
if ps.Subscription == "" {
return fmt.Errorf(`"subscription" is required`)
}
if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}
switch ps.ContentEncoding {
case "", "identity":
ps.ContentEncoding = "identity"
case "gzip":
var err error
ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding)
}
if ps.MaxDecompressionSize <= 0 {
ps.MaxDecompressionSize = internal.DefaultMaxDecompressionSize
}
return nil
}
func init() { func init() {
inputs.Add("cloud_pubsub", func() telegraf.Input { inputs.Add("cloud_pubsub", func() telegraf.Input {
ps := &PubSub{ ps := &PubSub{

View File

@ -5,8 +5,10 @@ import (
"errors" "errors"
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -28,6 +30,8 @@ func TestRunParse(t *testing.T) {
} }
sub.receiver = testMessagesReceive(sub) sub.receiver = testMessagesReceive(sub)
decoder, _ := internal.NewContentDecoder("identity")
ps := &PubSub{ ps := &PubSub{
Log: testutil.Logger{}, Log: testutil.Logger{},
parser: testParser, parser: testParser,
@ -35,17 +39,15 @@ func TestRunParse(t *testing.T) {
Project: "projectIDontMatterForTests", Project: "projectIDontMatterForTests",
Subscription: subID, Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages, MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil { require.NoError(t, ps.Init())
t.Fatalf("test PubSub failed to start: %s", err) require.NoError(t, ps.Start(acc))
}
defer ps.Stop() defer ps.Stop()
if ps.sub == nil { require.NotNil(t, ps.sub)
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{} testTracker := &testTracker{}
msg := &testMsg{ msg := &testMsg{
@ -73,6 +75,8 @@ func TestRunBase64(t *testing.T) {
} }
sub.receiver = testMessagesReceive(sub) sub.receiver = testMessagesReceive(sub)
decoder, _ := internal.NewContentDecoder("identity")
ps := &PubSub{ ps := &PubSub{
Log: testutil.Logger{}, Log: testutil.Logger{},
parser: testParser, parser: testParser,
@ -81,17 +85,15 @@ func TestRunBase64(t *testing.T) {
Subscription: subID, Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages, MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
Base64Data: true, Base64Data: true,
decoder: decoder,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil { require.NoError(t, ps.Init())
t.Fatalf("test PubSub failed to start: %s", err) require.NoError(t, ps.Start(acc))
}
defer ps.Stop() defer ps.Stop()
if ps.sub == nil { require.NotNil(t, ps.sub)
t.Fatal("expected plugin subscription to be non-nil")
}
testTracker := &testTracker{} testTracker := &testTracker{}
msg := &testMsg{ msg := &testMsg{
@ -106,6 +108,55 @@ func TestRunBase64(t *testing.T) {
validateTestInfluxMetric(t, metric) validateTestInfluxMetric(t, metric)
} }
func TestRunGzipDecode(t *testing.T) {
subID := "sub-run-gzip"
testParser := &influx.Parser{}
require.NoError(t, testParser.Init())
sub := &stubSub{
id: subID,
messages: make(chan *testMsg, 100),
}
sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("gzip")
require.NoError(t, err)
ps := &PubSub{
Log: testutil.Logger{},
parser: testParser,
stubSub: func() subscription { return sub },
Project: "projectIDontMatterForTests",
Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
ContentEncoding: "gzip",
MaxDecompressionSize: internal.DefaultMaxDecompressionSize,
decoder: decoder,
}
acc := &testutil.Accumulator{}
require.NoError(t, ps.Init())
require.NoError(t, ps.Start(acc))
defer ps.Stop()
require.NotNil(t, ps.sub)
testTracker := &testTracker{}
enc := internal.NewGzipEncoder()
gzippedMsg, err := enc.Encode([]byte(msgInflux))
require.NoError(t, err)
msg := &testMsg{
value: string(gzippedMsg),
tracker: testTracker,
}
sub.messages <- msg
acc.Wait(1)
assert.Equal(t, acc.NFields(), 1)
metric := acc.Metrics[0]
validateTestInfluxMetric(t, metric)
}
func TestRunInvalidMessages(t *testing.T) { func TestRunInvalidMessages(t *testing.T) {
subID := "sub-invalid-messages" subID := "sub-invalid-messages"
@ -118,6 +169,8 @@ func TestRunInvalidMessages(t *testing.T) {
} }
sub.receiver = testMessagesReceive(sub) sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{ ps := &PubSub{
Log: testutil.Logger{}, Log: testutil.Logger{},
parser: testParser, parser: testParser,
@ -125,17 +178,16 @@ func TestRunInvalidMessages(t *testing.T) {
Project: "projectIDontMatterForTests", Project: "projectIDontMatterForTests",
Subscription: subID, Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages, MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
if err := ps.Start(acc); err != nil { require.NoError(t, ps.Init())
t.Fatalf("test PubSub failed to start: %s", err) require.NoError(t, ps.Start(acc))
}
defer ps.Stop() defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil") require.NotNil(t, ps.sub)
}
testTracker := &testTracker{} testTracker := &testTracker{}
msg := &testMsg{ msg := &testMsg{
@ -166,6 +218,8 @@ func TestRunOverlongMessages(t *testing.T) {
} }
sub.receiver = testMessagesReceive(sub) sub.receiver = testMessagesReceive(sub)
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{ ps := &PubSub{
Log: testutil.Logger{}, Log: testutil.Logger{},
parser: testParser, parser: testParser,
@ -173,17 +227,16 @@ func TestRunOverlongMessages(t *testing.T) {
Project: "projectIDontMatterForTests", Project: "projectIDontMatterForTests",
Subscription: subID, Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages, MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
// Add MaxMessageLen Param // Add MaxMessageLen Param
MaxMessageLen: 1, MaxMessageLen: 1,
} }
if err := ps.Start(acc); err != nil { require.NoError(t, ps.Init())
t.Fatalf("test PubSub failed to start: %s", err) require.NoError(t, ps.Start(acc))
}
defer ps.Stop() defer ps.Stop()
if ps.sub == nil {
t.Fatal("expected plugin subscription to be non-nil") require.NotNil(t, ps.sub)
}
testTracker := &testTracker{} testTracker := &testTracker{}
msg := &testMsg{ msg := &testMsg{
@ -215,6 +268,8 @@ func TestRunErrorInSubscriber(t *testing.T) {
fakeErrStr := "a fake error" fakeErrStr := "a fake error"
sub.receiver = testMessagesError(errors.New("a fake error")) sub.receiver = testMessagesError(errors.New("a fake error"))
decoder, err := internal.NewContentDecoder("identity")
require.NoError(t, err)
ps := &PubSub{ ps := &PubSub{
Log: testutil.Logger{}, Log: testutil.Logger{},
parser: testParser, parser: testParser,
@ -222,17 +277,16 @@ func TestRunErrorInSubscriber(t *testing.T) {
Project: "projectIDontMatterForTests", Project: "projectIDontMatterForTests",
Subscription: subID, Subscription: subID,
MaxUndeliveredMessages: defaultMaxUndeliveredMessages, MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
decoder: decoder,
RetryReceiveDelaySeconds: 1, RetryReceiveDelaySeconds: 1,
} }
if err := ps.Start(acc); err != nil { require.NoError(t, ps.Init())
t.Fatalf("test PubSub failed to start: %s", err) require.NoError(t, ps.Start(acc))
}
defer ps.Stop() defer ps.Stop()
if ps.sub == nil { require.NotNil(t, ps.sub)
t.Fatal("expected plugin subscription to be non-nil")
}
acc.WaitError(1) acc.WaitError(1)
require.Regexp(t, fakeErrStr, acc.Errors[0]) require.Regexp(t, fakeErrStr, acc.Errors[0])
} }

View File

@ -73,3 +73,13 @@
## PubSub message data before parsing. Many GCP services that ## PubSub message data before parsing. Many GCP services that
## output JSON to Google PubSub base64-encode the JSON payload. ## output JSON to Google PubSub base64-encode the JSON payload.
# base64_data = false # base64_data = false
## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"
## If content encoding is not "identity", sets the maximum allowed size,
## in bytes, for a message payload when it's decompressed. Can be increased
## for larger payloads or reduced to protect against decompression bombs.
## Acceptable units are B, KiB, KB, MiB, MB...
# max_decompression_size = "500MB"

View File

@ -24,6 +24,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Required. Name of PubSub topic to publish metrics to. ## Required. Name of PubSub topic to publish metrics to.
topic = "my-topic" topic = "my-topic"
## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"
## Required. Data format to consume. ## Required. Data format to consume.
## Each data format has its own unique set of configuration options. ## Each data format has its own unique set of configuration options.
## Read more about them here: ## Read more about them here:

View File

@ -35,6 +35,7 @@ type PubSub struct {
PublishNumGoroutines int `toml:"publish_num_go_routines"` PublishNumGoroutines int `toml:"publish_num_go_routines"`
PublishTimeout config.Duration `toml:"publish_timeout"` PublishTimeout config.Duration `toml:"publish_timeout"`
Base64Data bool `toml:"base64_data"` Base64Data bool `toml:"base64_data"`
ContentEncoding string `toml:"content_encoding"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
@ -45,6 +46,7 @@ type PubSub struct {
serializer serializers.Serializer serializer serializers.Serializer
publishResults []publishResult publishResults []publishResult
encoder internal.ContentEncoder
} }
func (*PubSub) SampleConfig() string { func (*PubSub) SampleConfig() string {
@ -56,17 +58,10 @@ func (ps *PubSub) SetSerializer(serializer serializers.Serializer) {
} }
func (ps *PubSub) Connect() error { func (ps *PubSub) Connect() error {
if ps.Topic == "" {
return fmt.Errorf(`"topic" is required`)
}
if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}
if ps.stubTopic == nil { if ps.stubTopic == nil {
return ps.initPubSubClient() return ps.initPubSubClient()
} }
return nil return nil
} }
@ -168,9 +163,11 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
return nil, err return nil, err
} }
if ps.Base64Data { b = ps.encodeB64Data(b)
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded) b, err = ps.compressData(b)
if err != nil {
return nil, fmt.Errorf("unable to compress message with %s: %w", ps.ContentEncoding, err)
} }
msg := &pubsub.Message{Data: b} msg := &pubsub.Message{Data: b}
@ -188,9 +185,12 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
continue continue
} }
if ps.Base64Data { b = ps.encodeB64Data(b)
encoded := base64.StdEncoding.EncodeToString(b)
b = []byte(encoded) b, err = ps.compressData(b)
if err != nil {
ps.Log.Errorf("unable to compress message with %s: %w", ps.ContentEncoding, err)
continue
} }
msg := &pubsub.Message{ msg := &pubsub.Message{
@ -205,6 +205,32 @@ func (ps *PubSub) toMessages(metrics []telegraf.Metric) ([]*pubsub.Message, erro
return msgs, nil return msgs, nil
} }
func (ps *PubSub) encodeB64Data(data []byte) []byte {
if ps.Base64Data {
encoded := base64.StdEncoding.EncodeToString(data)
data = []byte(encoded)
}
return data
}
func (ps *PubSub) compressData(data []byte) ([]byte, error) {
if ps.ContentEncoding == "identity" {
return data, nil
}
data, err := ps.encoder.Encode(data)
if err != nil {
return nil, err
}
compressedData := make([]byte, len(data))
copy(compressedData, data)
data = compressedData
return data, nil
}
func (ps *PubSub) waitForResults(ctx context.Context, cancel context.CancelFunc) error { func (ps *PubSub) waitForResults(ctx context.Context, cancel context.CancelFunc) error {
var pErr error var pErr error
var setErr sync.Once var setErr sync.Once
@ -230,6 +256,31 @@ func (ps *PubSub) waitForResults(ctx context.Context, cancel context.CancelFunc)
return pErr return pErr
} }
func (ps *PubSub) Init() error {
if ps.Topic == "" {
return fmt.Errorf(`"topic" is required`)
}
if ps.Project == "" {
return fmt.Errorf(`"project" is required`)
}
switch ps.ContentEncoding {
case "", "identity":
ps.ContentEncoding = "identity"
case "gzip":
var err error
ps.encoder, err = internal.NewContentEncoder(ps.ContentEncoding)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding)
}
return nil
}
func init() { func init() {
outputs.Add("cloud_pubsub", func() telegraf.Output { outputs.Add("cloud_pubsub", func() telegraf.Output {
return &PubSub{} return &PubSub{}

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -21,10 +22,7 @@ func TestPubSub_WriteSingle(t *testing.T) {
settings.CountThreshold = 1 settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics) ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics) require.NoError(t, ps.Write(metrics))
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyRawMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
@ -43,10 +41,7 @@ func TestPubSub_WriteWithAttribute(t *testing.T) {
"foo2": "bar2", "foo2": "bar2",
} }
err := ps.Write(metrics) require.NoError(t, ps.Write(metrics))
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics { for _, testM := range testMetrics {
msg := verifyRawMetricPublished(t, testM.m, topic.published) msg := verifyRawMetricPublished(t, testM.m, topic.published)
@ -65,10 +60,7 @@ func TestPubSub_WriteMultiple(t *testing.T) {
ps, topic, metrics := getTestResources(t, settings, testMetrics) ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics) require.NoError(t, ps.Write(metrics))
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyRawMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
@ -89,10 +81,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) {
ps, topic, metrics := getTestResources(t, settings, testMetrics) ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics) require.NoError(t, ps.Write(metrics))
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyRawMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
@ -112,10 +101,7 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) {
ps, topic, metrics := getTestResources(t, settings, testMetrics) ps, topic, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics) require.NoError(t, ps.Write(metrics))
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyRawMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
@ -133,14 +119,12 @@ func TestPubSub_WriteBase64Single(t *testing.T) {
settings.CountThreshold = 1 settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics) ps, topic, metrics := getTestResources(t, settings, testMetrics)
ps.Base64Data = true ps.Base64Data = true
topic.Base64Data = true
err := ps.Write(metrics) require.NoError(t, ps.Write(metrics))
if err != nil {
t.Fatalf("got unexpected error: %v", err)
}
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */) verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */, false /* gzipEncoded */)
} }
} }
@ -155,24 +139,64 @@ func TestPubSub_Error(t *testing.T) {
ps, _, metrics := getTestResources(t, settings, testMetrics) ps, _, metrics := getTestResources(t, settings, testMetrics)
err := ps.Write(metrics) err := ps.Write(metrics)
if err == nil { require.Error(t, err)
t.Fatalf("expected error") require.ErrorContains(t, err, errMockFail)
}
func TestPubSub_WriteGzipSingle(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error */},
{testutil.TestMetric("value_2", "test"), false},
} }
if err.Error() != errMockFail {
t.Fatalf("expected fake error, got %v", err) settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics)
topic.ContentEncoding = "gzip"
ps.ContentEncoding = "gzip"
var err error
ps.encoder, err = internal.NewContentEncoder(ps.ContentEncoding)
require.NoError(t, err)
require.NoError(t, ps.Write(metrics))
for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published, false /* base64encoded */, true /* Gzipencoded */)
}
}
func TestPubSub_WriteGzipAndBase64Single(t *testing.T) {
testMetrics := []testMetric{
{testutil.TestMetric("value_1", "test"), false /*return error */},
{testutil.TestMetric("value_2", "test"), false},
}
settings := pubsub.DefaultPublishSettings
settings.CountThreshold = 1
ps, topic, metrics := getTestResources(t, settings, testMetrics)
topic.ContentEncoding = "gzip"
topic.Base64Data = true
ps.ContentEncoding = "gzip"
ps.Base64Data = true
var err error
ps.encoder, err = internal.NewContentEncoder(ps.ContentEncoding)
require.NoError(t, err)
require.NoError(t, ps.Write(metrics))
for _, testM := range testMetrics {
verifyMetricPublished(t, testM.m, topic.published, true /* base64encoded */, true /* Gzipencoded */)
} }
} }
func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message { func verifyRawMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message) *pubsub.Message {
return verifyMetricPublished(t, m, published, false) return verifyMetricPublished(t, m, published, false, false)
} }
func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool) *pubsub.Message { func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string]*pubsub.Message, base64Encoded bool, gzipEncoded bool) *pubsub.Message {
p := influx.Parser{} p := influx.Parser{}
err := p.Init() require.NoError(t, p.Init())
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
}
v, _ := m.GetField("value") v, _ := m.GetField("value")
psMsg, ok := published[v.(string)] psMsg, ok := published[v.(string)]
if !ok { if !ok {
@ -180,8 +204,18 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string
} }
data := psMsg.Data data := psMsg.Data
if gzipEncoded {
decoder, _ := internal.NewContentDecoder("gzip")
var err error
data, err = decoder.Decode(data, internal.DefaultMaxDecompressionSize)
if err != nil {
t.Fatalf("Unable to decode expected gzip encoded message: %s", err)
}
}
if base64Encoded { if base64Encoded {
v, err := base64.StdEncoding.DecodeString(string(psMsg.Data)) v, err := base64.StdEncoding.DecodeString(string(data))
if err != nil { if err != nil {
t.Fatalf("Unable to decode expected base64-encoded message: %s", err) t.Fatalf("Unable to decode expected base64-encoded message: %s", err)
} }
@ -190,7 +224,7 @@ func verifyMetricPublished(t *testing.T, m telegraf.Metric, published map[string
parsed, err := p.Parse(data) parsed, err := p.Parse(data)
if err != nil { if err != nil {
t.Fatalf("could not parse influxdb metric from published message: %s", string(psMsg.Data)) t.Fatalf("could not parse influxdb metric from published message: %s", string(data))
} }
if len(parsed) > 1 { if len(parsed) > 1 {
t.Fatalf("expected only one influxdb metric per published message, got %d", len(published)) t.Fatalf("expected only one influxdb metric per published message, got %d", len(published))

View File

@ -7,6 +7,10 @@
## Required. Name of PubSub topic to publish metrics to. ## Required. Name of PubSub topic to publish metrics to.
topic = "my-topic" topic = "my-topic"
## Content encoding for message payloads, can be set to "gzip" or
## "identity" to apply no encoding.
# content_encoding = "identity"
## Required. Data format to consume. ## Required. Data format to consume.
## Each data format has its own unique set of configuration options. ## Each data format has its own unique set of configuration options.
## Read more about them here: ## Read more about them here:

View File

@ -16,6 +16,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
serializer "github.com/influxdata/telegraf/plugins/serializers/influx" serializer "github.com/influxdata/telegraf/plugins/serializers/influx"
@ -49,6 +50,9 @@ type (
ReturnErr map[string]bool ReturnErr map[string]bool
parsers.Parser parsers.Parser
*testing.T *testing.T
Base64Data bool
ContentEncoding string
MaxDecompressionSize int64
stopped bool stopped bool
pLock sync.Mutex pLock sync.Mutex
@ -68,9 +72,11 @@ func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []te
metrics := make([]telegraf.Metric, 0, len(testM)) metrics := make([]telegraf.Metric, 0, len(testM))
t := &stubTopic{ t := &stubTopic{
T: tT, T: tT,
ReturnErr: make(map[string]bool), ReturnErr: make(map[string]bool),
published: make(map[string]*pubsub.Message), published: make(map[string]*pubsub.Message),
ContentEncoding: "identity",
MaxDecompressionSize: internal.DefaultMaxDecompressionSize,
} }
for _, tm := range testM { for _, tm := range testM {
@ -89,7 +95,11 @@ func getTestResources(tT *testing.T, settings pubsub.PublishSettings, testM []te
PublishByteThreshold: settings.ByteThreshold, PublishByteThreshold: settings.ByteThreshold,
PublishNumGoroutines: settings.NumGoroutines, PublishNumGoroutines: settings.NumGoroutines,
PublishTimeout: config.Duration(settings.Timeout), PublishTimeout: config.Duration(settings.Timeout),
ContentEncoding: "identity",
} }
require.NoError(tT, ps.Init())
ps.encoder, _ = internal.NewContentEncoder(ps.ContentEncoding)
ps.SetSerializer(s) ps.SetSerializer(s)
return ps, t, metrics return ps, t, metrics
@ -185,17 +195,22 @@ func (t *stubTopic) parseIDs(msg *pubsub.Message) []string {
p := influx.Parser{} p := influx.Parser{}
err := p.Init() err := p.Init()
require.NoError(t, err) require.NoError(t, err)
metrics, err := p.Parse(msg.Data)
decoder, _ := internal.NewContentDecoder(t.ContentEncoding)
d, err := decoder.Decode(msg.Data, t.MaxDecompressionSize)
if err != nil { if err != nil {
// Just attempt to base64-decode first before returning error. t.Errorf("unable to decode message: %v", err)
d, err := base64.StdEncoding.DecodeString(string(msg.Data)) }
if t.Base64Data {
strData, err := base64.StdEncoding.DecodeString(string(d))
if err != nil { if err != nil {
t.Errorf("unable to base64-decode potential test message: %v", err) t.Errorf("unable to base64 decode message: %v", err)
}
metrics, err = p.Parse(d)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
} }
d = strData
}
metrics, err := p.Parse(d)
if err != nil {
t.Fatalf("unexpected parsing error: %v", err)
} }
ids := make([]string, 0, len(metrics)) ids := make([]string, 0, len(metrics))