diff --git a/go.mod b/go.mod index a569c672a..dc8b762d1 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.1.5 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.2.2 // indirect github.com/aws/smithy-go v1.8.0 - github.com/benbjohnson/clock v1.0.3 + github.com/benbjohnson/clock v1.1.0 github.com/beorn7/perks v1.0.1 // indirect github.com/bitly/go-hostpool v0.1.0 // indirect github.com/bmatcuk/doublestar/v3 v3.0.0 @@ -271,6 +271,9 @@ require ( go.mongodb.org/mongo-driver v1.5.3 go.opencensus.io v0.23.0 // indirect go.opentelemetry.io/collector/model v0.35.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0 + go.opentelemetry.io/otel/metric v0.23.0 + go.opentelemetry.io/otel/sdk/metric v0.23.0 go.starlark.net v0.0.0-20210406145628-7a1108eaa012 go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect @@ -329,6 +332,17 @@ require ( sigs.k8s.io/yaml v1.2.0 // indirect ) +require ( + github.com/cenkalti/backoff/v4 v4.1.1 // indirect + go.opentelemetry.io/otel v1.0.0-RC3 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.23.0 // indirect + go.opentelemetry.io/otel/internal/metric v0.23.0 // indirect + go.opentelemetry.io/otel/sdk v1.0.0-RC3 // indirect + go.opentelemetry.io/otel/sdk/export/metric v0.23.0 // indirect + go.opentelemetry.io/otel/trace v1.0.0-RC3 // indirect + go.opentelemetry.io/proto/otlp v0.9.0 // indirect +) + // replaced due to https://github.com/satori/go.uuid/issues/73 replace github.com/satori/go.uuid => github.com/gofrs/uuid v3.2.0+incompatible diff --git a/go.sum b/go.sum index 300b12d8d..4189b4157 100644 --- a/go.sum +++ b/go.sum @@ -291,8 +291,8 @@ github.com/aws/smithy-go v1.0.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB github.com/aws/smithy-go v1.3.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.8.0 h1:AEwwwXQZtUwP5Mz506FeXXrKBe0jA8gVM+1gEcSRooc= github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= -github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -333,6 +333,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEe github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= +github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= +github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -1639,7 +1641,27 @@ go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/collector/model v0.35.0 h1:NpKjghiqlei4ecwjOYOMhD6tj4gY8yiWHPJmbFs/ArI= go.opentelemetry.io/collector/model v0.35.0/go.mod h1:+7YCSjJG+MqiIFjauzt7oM2qkqBsaJWh5hcsO4fwsAc= +go.opentelemetry.io/otel v1.0.0-RC3 h1:kvwiyEkiUT/JaadXzVLI/R1wDO934A7r3Bs2wEe6wqA= +go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.23.0 h1:vKIEsT6IJU0NYd+iZccjgCmk80zsa7dTiC2Bu7U1jz0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.23.0/go.mod h1:pe9oOWRaZyapdajWCn64fnl76v3cmTEmNBgh7MkKvwE= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0 h1:JSsJID+KU3G8wxynfHIlWaefOvYngDjnrmtHOGb1sb0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.23.0/go.mod h1:aSP5oMNaAfOYq+sRydHANZ0vBYLyZR/3lR9pru9aPLk= +go.opentelemetry.io/otel/internal/metric v0.23.0 h1:mPfzm9Iqhw7G2nDBmUAjFTfPqLZPbOW2k7QI57ITbaI= +go.opentelemetry.io/otel/internal/metric v0.23.0/go.mod h1:z+RPiDJe30YnCrOhFGivwBS+DU1JU/PiLKkk4re2DNY= +go.opentelemetry.io/otel/metric v0.23.0 h1:mYCcDxi60P4T27/0jchIDFa1WHEfQeU3zH9UEMpnj2c= +go.opentelemetry.io/otel/metric v0.23.0/go.mod h1:G/Nn9InyNnIv7J6YVkQfpc0JCfKBNJaERBGw08nqmVQ= +go.opentelemetry.io/otel/sdk v1.0.0-RC3 h1:iRMkET+EmJUn5mW0hJzygBraXRmrUwzbOtNvTCh/oKs= +go.opentelemetry.io/otel/sdk v1.0.0-RC3/go.mod h1:78H6hyg2fka0NYT9fqGuFLvly2yCxiBXDJAgLKo/2Us= +go.opentelemetry.io/otel/sdk/export/metric v0.23.0 h1:7NeoKPPx6NdZBVHLEp/LY5Lq85Ff1WNZnuJkuRy+azw= +go.opentelemetry.io/otel/sdk/export/metric v0.23.0/go.mod h1:SuMiREmKVRIwFKq73zvGTvwFpxb/ZAYkMfyqMoOtDqs= +go.opentelemetry.io/otel/sdk/metric v0.23.0 h1:xlZhPbiue1+jjSFEth94q9QCmX8Q24mOtue9IAmlVyI= +go.opentelemetry.io/otel/sdk/metric v0.23.0/go.mod h1:wa0sKK13eeIFW+0OFjcC3S1i7FTRRiLAXe1kjBVbhwg= +go.opentelemetry.io/otel/trace v1.0.0-RC3 h1:9F0ayEvlxv8BmNmPbU005WK7hC+7KbOazCPZjNa1yME= +go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v0.9.0 h1:C0g6TWmQYvjKRnljRULLWUVJGy8Uvu0NEL/5frY2/t4= +go.opentelemetry.io/proto/otlp v0.9.0/go.mod h1:1vKfU9rv61e9EVGthD1zNvUbiwPcimSsOPU9brfSHJg= go.starlark.net v0.0.0-20210406145628-7a1108eaa012 h1:4RGobP/iq7S22H0Bb92OEt+M8/cfBQnW+T+a2MC0sQo= go.starlark.net v0.0.0-20210406145628-7a1108eaa012/go.mod h1:t3mmBBPzAVvK0L0n1drDmrQsJ8FoIx4INCqVMTr/Zo0= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -1944,6 +1966,7 @@ golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/plugins/inputs/opentelemetry/grpc_services.go b/plugins/inputs/opentelemetry/grpc_services.go index f5fa450fa..1c805e2a2 100644 --- a/plugins/inputs/opentelemetry/grpc_services.go +++ b/plugins/inputs/opentelemetry/grpc_services.go @@ -56,7 +56,7 @@ func newMetricsService(logger common.Logger, writer *writeToAccumulator, schema func (s *metricsService) Export(ctx context.Context, req pdata.Metrics) (otlpgrpc.MetricsResponse, error) { err := s.converter.WriteMetrics(ctx, req, s.writer) - return otlpgrpc.MetricsResponse{}, err + return otlpgrpc.NewMetricsResponse(), err } type logsService struct { diff --git a/plugins/inputs/opentelemetry/opentelemetry.go b/plugins/inputs/opentelemetry/opentelemetry.go index 2e6cbf9b8..85f32a769 100644 --- a/plugins/inputs/opentelemetry/opentelemetry.go +++ b/plugins/inputs/opentelemetry/opentelemetry.go @@ -24,6 +24,7 @@ type OpenTelemetry struct { Log telegraf.Logger `toml:"-"` + listener net.Listener // overridden in tests grpcServer *grpc.Server wg sync.WaitGroup @@ -89,14 +90,16 @@ func (o *OpenTelemetry) Start(accumulator telegraf.Accumulator) error { otlpgrpc.RegisterMetricsServer(o.grpcServer, ms) otlpgrpc.RegisterLogsServer(o.grpcServer, newLogsService(logger, influxWriter)) - listener, err := net.Listen("tcp", o.ServiceAddress) - if err != nil { - return err + if o.listener == nil { + o.listener, err = net.Listen("tcp", o.ServiceAddress) + if err != nil { + return err + } } o.wg.Add(1) go func() { - if err := o.grpcServer.Serve(listener); err != nil { + if err := o.grpcServer.Serve(o.listener); err != nil { accumulator.AddError(fmt.Errorf("failed to stop OpenTelemetry gRPC service: %w", err)) } o.wg.Done() diff --git a/plugins/inputs/opentelemetry/opentelemetry_test.go b/plugins/inputs/opentelemetry/opentelemetry_test.go new file mode 100644 index 000000000..2de35bb06 --- /dev/null +++ b/plugins/inputs/opentelemetry/opentelemetry_test.go @@ -0,0 +1,83 @@ +package opentelemetry + +import ( + "context" + "net" + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/global" + controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" + processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" +) + +func TestOpenTelemetry(t *testing.T) { + mockListener := bufconn.Listen(1024 * 1024) + plugin := inputs.Inputs["opentelemetry"]().(*OpenTelemetry) + plugin.listener = mockListener + accumulator := new(testutil.Accumulator) + + err := plugin.Start(accumulator) + require.NoError(t, err) + t.Cleanup(plugin.Stop) + + metricExporter, err := otlpmetricgrpc.New(context.Background(), + otlpmetricgrpc.WithInsecure(), + otlpmetricgrpc.WithDialOption( + grpc.WithBlock(), + grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) { + return mockListener.Dial() + })), + ) + require.NoError(t, err) + t.Cleanup(func() { _ = metricExporter.Shutdown(context.Background()) }) + + pusher := controller.New( + processor.New( + simple.NewWithExactDistribution(), + metricExporter, + ), + controller.WithExporter(metricExporter), + ) + + err = pusher.Start(context.Background()) + require.NoError(t, err) + t.Cleanup(func() { _ = pusher.Stop(context.Background()) }) + + global.SetMeterProvider(pusher.MeterProvider()) + + // write metrics + meter := global.Meter("library-name") + counter := metric.Must(meter).NewInt64Counter("measurement-counter") + meter.RecordBatch(context.Background(), nil, counter.Measurement(7)) + + err = pusher.Stop(context.Background()) + require.NoError(t, err) + + // Shutdown + + plugin.Stop() + + err = metricExporter.Shutdown(context.Background()) + require.NoError(t, err) + + // Check + + assert.Empty(t, accumulator.Errors) + + if assert.Len(t, accumulator.Metrics, 1) { + got := accumulator.Metrics[0] + assert.Equal(t, "measurement-counter", got.Measurement) + assert.Equal(t, telegraf.Counter, got.Type) + assert.Equal(t, "library-name", got.Tags["otel.library.name"]) + } +}