Fix data race in plugin output pubsub tests (#7782)

This commit is contained in:
Jakub Warczarek 2020-07-03 18:01:22 +02:00 committed by GitHub
parent df26b037cb
commit 31407141cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 7 deletions

View File

@ -3,8 +3,10 @@ package cloud_pubsub
import ( import (
"testing" "testing"
"cloud.google.com/go/pubsub"
"encoding/base64" "encoding/base64"
"cloud.google.com/go/pubsub"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
@ -73,7 +75,7 @@ func TestPubSub_WriteMultiple(t *testing.T) {
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyRawMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
} }
assert.Equalf(t, 1, topic.bundleCount, "unexpected bundle count") assert.Equalf(t, 1, topic.getBundleCount(), "unexpected bundle count")
} }
func TestPubSub_WriteOverCountThreshold(t *testing.T) { func TestPubSub_WriteOverCountThreshold(t *testing.T) {
@ -97,7 +99,7 @@ func TestPubSub_WriteOverCountThreshold(t *testing.T) {
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyRawMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
} }
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") assert.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count")
} }
func TestPubSub_WriteOverByteThreshold(t *testing.T) { func TestPubSub_WriteOverByteThreshold(t *testing.T) {
@ -120,7 +122,7 @@ func TestPubSub_WriteOverByteThreshold(t *testing.T) {
for _, testM := range testMetrics { for _, testM := range testMetrics {
verifyRawMetricPublished(t, testM.m, topic.published) verifyRawMetricPublished(t, testM.m, topic.published)
} }
assert.Equalf(t, 2, topic.bundleCount, "unexpected bundle count") assert.Equalf(t, 2, topic.getBundleCount(), "unexpected bundle count")
} }
func TestPubSub_WriteBase64Single(t *testing.T) { func TestPubSub_WriteBase64Single(t *testing.T) {

View File

@ -9,8 +9,9 @@ import (
"testing" "testing"
"time" "time"
"cloud.google.com/go/pubsub"
"encoding/base64" "encoding/base64"
"cloud.google.com/go/pubsub"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
@ -123,8 +124,7 @@ func (t *stubTopic) Publish(ctx context.Context, msg *pubsub.Message) publishRes
} }
bundled := &bundledMsg{msg, r} bundled := &bundledMsg{msg, r}
err := t.bundler.Add(bundled, len(msg.Data)) if err := t.bundler.Add(bundled, len(msg.Data)); err != nil {
if err != nil {
t.Fatalf("unexpected error while adding to bundle: %v", err) t.Fatalf("unexpected error while adding to bundle: %v", err)
} }
return r return r
@ -210,3 +210,9 @@ func (r *stubResult) Get(ctx context.Context) (string, error) {
return fmt.Sprintf("id-%s", r.metricIds[0]), nil return fmt.Sprintf("id-%s", r.metricIds[0]), nil
} }
} }
func (t *stubTopic) getBundleCount() int {
t.bLock.Lock()
defer t.bLock.Unlock()
return t.bundleCount
}