fix: error returned to OpenTelemetry client (#9797)

This commit is contained in:
Jacob Marble 2021-09-23 09:05:29 -07:00 committed by GitHub
parent ceae37d66e
commit fb088bd69c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 131 additions and 8 deletions

16
go.mod
View File

@ -61,7 +61,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.1.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.2.2 // indirect
github.com/aws/smithy-go v1.8.0
github.com/benbjohnson/clock v1.0.3
github.com/benbjohnson/clock v1.1.0
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmatcuk/doublestar/v3 v3.0.0
@ -271,6 +271,9 @@ require (
go.mongodb.org/mongo-driver v1.5.3
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.35.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0
go.opentelemetry.io/otel/metric v0.23.0
go.opentelemetry.io/otel/sdk/metric v0.23.0
go.starlark.net v0.0.0-20210406145628-7a1108eaa012
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
@ -329,6 +332,17 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)
require (
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
go.opentelemetry.io/otel v1.0.0-RC3 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.23.0 // indirect
go.opentelemetry.io/otel/internal/metric v0.23.0 // indirect
go.opentelemetry.io/otel/sdk v1.0.0-RC3 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.23.0 // indirect
go.opentelemetry.io/otel/trace v1.0.0-RC3 // indirect
go.opentelemetry.io/proto/otlp v0.9.0 // indirect
)
// replaced due to https://github.com/satori/go.uuid/issues/73
replace github.com/satori/go.uuid => github.com/gofrs/uuid v3.2.0+incompatible

27
go.sum
View File

@ -291,8 +291,8 @@ github.com/aws/smithy-go v1.0.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB
github.com/aws/smithy-go v1.3.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.8.0 h1:AEwwwXQZtUwP5Mz506FeXXrKBe0jA8gVM+1gEcSRooc=
github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
@ -333,6 +333,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEe
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@ -1639,7 +1641,27 @@ go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/collector/model v0.35.0 h1:NpKjghiqlei4ecwjOYOMhD6tj4gY8yiWHPJmbFs/ArI=
go.opentelemetry.io/collector/model v0.35.0/go.mod h1:+7YCSjJG+MqiIFjauzt7oM2qkqBsaJWh5hcsO4fwsAc=
go.opentelemetry.io/otel v1.0.0-RC3 h1:kvwiyEkiUT/JaadXzVLI/R1wDO934A7r3Bs2wEe6wqA=
go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.23.0 h1:vKIEsT6IJU0NYd+iZccjgCmk80zsa7dTiC2Bu7U1jz0=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.23.0/go.mod h1:pe9oOWRaZyapdajWCn64fnl76v3cmTEmNBgh7MkKvwE=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0 h1:JSsJID+KU3G8wxynfHIlWaefOvYngDjnrmtHOGb1sb0=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0/go.mod h1:aSP5oMNaAfOYq+sRydHANZ0vBYLyZR/3lR9pru9aPLk=
go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI=
go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY=
go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c=
go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ=
go.opentelemetry.io/otel/sdk v1.0.0-RC3 h1:iRMkET+EmJUn5mW0hJzygBraXRmrUwzbOtNvTCh/oKs=
go.opentelemetry.io/otel/sdk v1.0.0-RC3/go.mod h1:78H6hyg2fka0NYT9fqGuFLvly2yCxiBXDJAgLKo/2Us=
go.opentelemetry.io/otel/sdk/export/metric v0.23.0 h1:7NeoKPPx6NdZBVHLEp/LY5Lq85Ff1WNZnuJkuRy+azw=
go.opentelemetry.io/otel/sdk/export/metric v0.23.0/go.mod h1:SuMiREmKVRIwFKq73zvGTvwFpxb/ZAYkMfyqMoOtDqs=
go.opentelemetry.io/otel/sdk/metric v0.23.0 h1:xlZhPbiue1+jjSFEth94q9QCmX8Q24mOtue9IAmlVyI=
go.opentelemetry.io/otel/sdk/metric v0.23.0/go.mod h1:wa0sKK13eeIFW+0OFjcC3S1i7FTRRiLAXe1kjBVbhwg=
go.opentelemetry.io/otel/trace v1.0.0-RC3 h1:9F0ayEvlxv8BmNmPbU005WK7hC+7KbOazCPZjNa1yME=
go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.9.0 h1:C0g6TWmQYvjKRnljRULLWUVJGy8Uvu0NEL/5frY2/t4=
go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSsOPU9brfSHJg=
go.starlark.net v0.0.0-20210406145628-7a1108eaa012 h1:4RGobP/iq7S22H0Bb92OEt+M8/cfBQnW+T+a2MC0sQo=
go.starlark.net v0.0.0-20210406145628-7a1108eaa012/go.mod h1:t3mmBBPzAVvK0L0n1drDmrQsJ8FoIx4INCqVMTr/Zo0=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@ -1944,6 +1966,7 @@ golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

View File

@ -56,7 +56,7 @@ func newMetricsService(logger common.Logger, writer *writeToAccumulator, schema
func (s *metricsService) Export(ctx context.Context, req pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
err := s.converter.WriteMetrics(ctx, req, s.writer)
return otlpgrpc.MetricsResponse{}, err
return otlpgrpc.NewMetricsResponse(), err
}
type logsService struct {

View File

@ -24,6 +24,7 @@ type OpenTelemetry struct {
Log telegraf.Logger `toml:"-"`
listener net.Listener // overridden in tests
grpcServer *grpc.Server
wg sync.WaitGroup
@ -89,14 +90,16 @@ func (o *OpenTelemetry) Start(accumulator telegraf.Accumulator) error {
otlpgrpc.RegisterMetricsServer(o.grpcServer, ms)
otlpgrpc.RegisterLogsServer(o.grpcServer, newLogsService(logger, influxWriter))
listener, err := net.Listen("tcp", o.ServiceAddress)
if o.listener == nil {
o.listener, err = net.Listen("tcp", o.ServiceAddress)
if err != nil {
return err
}
}
o.wg.Add(1)
go func() {
if err := o.grpcServer.Serve(listener); err != nil {
if err := o.grpcServer.Serve(o.listener); err != nil {
accumulator.AddError(fmt.Errorf("failed to stop OpenTelemetry gRPC service: %w", err))
}
o.wg.Done()

View File

@ -0,0 +1,83 @@
package opentelemetry
import (
"context"
"net"
"testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
)
func TestOpenTelemetry(t *testing.T) {
mockListener := bufconn.Listen(1024 * 1024)
plugin := inputs.Inputs["opentelemetry"]().(*OpenTelemetry)
plugin.listener = mockListener
accumulator := new(testutil.Accumulator)
err := plugin.Start(accumulator)
require.NoError(t, err)
t.Cleanup(plugin.Stop)
metricExporter, err := otlpmetricgrpc.New(context.Background(),
otlpmetricgrpc.WithInsecure(),
otlpmetricgrpc.WithDialOption(
grpc.WithBlock(),
grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
return mockListener.Dial()
})),
)
require.NoError(t, err)
t.Cleanup(func() { _ = metricExporter.Shutdown(context.Background()) })
pusher := controller.New(
processor.New(
simple.NewWithExactDistribution(),
metricExporter,
),
controller.WithExporter(metricExporter),
)
err = pusher.Start(context.Background())
require.NoError(t, err)
t.Cleanup(func() { _ = pusher.Stop(context.Background()) })
global.SetMeterProvider(pusher.MeterProvider())
// write metrics
meter := global.Meter("library-name")
counter := metric.Must(meter).NewInt64Counter("measurement-counter")
meter.RecordBatch(context.Background(), nil, counter.Measurement(7))
err = pusher.Stop(context.Background())
require.NoError(t, err)
// Shutdown
plugin.Stop()
err = metricExporter.Shutdown(context.Background())
require.NoError(t, err)
// Check
assert.Empty(t, accumulator.Errors)
if assert.Len(t, accumulator.Metrics, 1) {
got := accumulator.Metrics[0]
assert.Equal(t, "measurement-counter", got.Measurement)
assert.Equal(t, telegraf.Counter, got.Type)
assert.Equal(t, "library-name", got.Tags["otel.library.name"])
}
}