diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 8e327ae3c..872e81efc 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -553,7 +553,8 @@ func TestKafkaRoundTripIntegration(t *testing.T) { MaxUndeliveredMessages: 1, } parser := &influx.Parser{} - parser.Init() + err = parser.Init() + require.NoError(t, err) input.SetParser(parser) err = input.Init() require.NoError(t, err) diff --git a/plugins/inputs/opcua/opcua.go b/plugins/inputs/opcua/opcua.go index 7fd9152f3..1f76f0455 100644 --- a/plugins/inputs/opcua/opcua.go +++ b/plugins/inputs/opcua/opcua.go @@ -3,12 +3,11 @@ package opcua import ( _ "embed" - "github.com/gopcua/opcua/ua" - "github.com/influxdata/telegraf/plugins/common/opcua" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/opcua" "github.com/influxdata/telegraf/plugins/common/opcua/input" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -21,7 +20,6 @@ type OpcUA struct { Log telegraf.Logger `toml:"-"` client *ReadClient - codes []ua.StatusCode } func (*OpcUA) SampleConfig() string { diff --git a/plugins/inputs/opentelemetry/grpc_services.go b/plugins/inputs/opentelemetry/grpc_services.go index 43f6ba2dd..6a676c270 100644 --- a/plugins/inputs/opentelemetry/grpc_services.go +++ b/plugins/inputs/opentelemetry/grpc_services.go @@ -4,12 +4,11 @@ import ( "context" "fmt" + "github.com/influxdata/influxdb-observability/common" + "github.com/influxdata/influxdb-observability/otel2influx" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" - - "github.com/influxdata/influxdb-observability/common" - "github.com/influxdata/influxdb-observability/otel2influx" ) type traceService struct { @@ -27,9 +26,9 @@ func newTraceService(logger common.Logger, writer *writeToAccumulator) *traceSer } } -func (s *traceService) Export(ctx context.Context, req ptraceotlp.Request) (ptraceotlp.Response, error) { +func (s *traceService) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { err := s.converter.WriteTraces(ctx, req.Traces(), s.writer) - return ptraceotlp.NewResponse(), err + return ptraceotlp.NewExportResponse(), err } type metricsService struct { @@ -60,9 +59,9 @@ func newMetricsService(logger common.Logger, writer *writeToAccumulator, schema }, nil } -func (s *metricsService) Export(ctx context.Context, req pmetricotlp.Request) (pmetricotlp.Response, error) { +func (s *metricsService) Export(ctx context.Context, req pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) { err := s.converter.WriteMetrics(ctx, req.Metrics(), s.writer) - return pmetricotlp.NewResponse(), err + return pmetricotlp.NewExportResponse(), err } type logsService struct { @@ -80,7 +79,7 @@ func newLogsService(logger common.Logger, writer *writeToAccumulator) *logsServi } } -func (s *logsService) Export(ctx context.Context, req plogotlp.Request) (plogotlp.Response, error) { +func (s *logsService) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) { err := s.converter.WriteLogs(ctx, req.Logs(), s.writer) - return plogotlp.NewResponse(), err + return plogotlp.NewExportResponse(), err } diff --git a/plugins/outputs/opentelemetry/opentelemetry.go b/plugins/outputs/opentelemetry/opentelemetry.go index e8a706c42..9d5c92bbe 100644 --- a/plugins/outputs/opentelemetry/opentelemetry.go +++ b/plugins/outputs/opentelemetry/opentelemetry.go @@ -13,9 +13,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - - // Blank import to allow gzip encoding - _ "google.golang.org/grpc/encoding/gzip" + _ "google.golang.org/grpc/encoding/gzip" // Blank import to allow gzip encoding "google.golang.org/grpc/metadata" "github.com/influxdata/telegraf" @@ -101,7 +99,7 @@ func (o *OpenTelemetry) Connect() error { return err } - metricsServiceClient := pmetricotlp.NewClient(grpcClientConn) + metricsServiceClient := pmetricotlp.NewGRPCClient(grpcClientConn) o.metricsConverter = metricsConverter o.grpcClientConn = grpcClientConn @@ -149,7 +147,7 @@ func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error { } } - md := pmetricotlp.NewRequestFromMetrics(batch.GetMetrics()) + md := pmetricotlp.NewExportRequestFromMetrics(batch.GetMetrics()) if md.Metrics().ResourceMetrics().Len() == 0 { return nil } diff --git a/plugins/outputs/opentelemetry/opentelemetry_test.go b/plugins/outputs/opentelemetry/opentelemetry_test.go index 85d64c645..9e3691fb9 100644 --- a/plugins/outputs/opentelemetry/opentelemetry_test.go +++ b/plugins/outputs/opentelemetry/opentelemetry_test.go @@ -6,16 +6,14 @@ import ( "testing" "time" + "github.com/influxdata/influxdb-observability/common" + "github.com/influxdata/influxdb-observability/influx2otel" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" - "google.golang.org/grpc/credentials/insecure" - - "github.com/influxdata/influxdb-observability/common" - "github.com/influxdata/influxdb-observability/influx2otel" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/metadata" "github.com/influxdata/telegraf" @@ -51,7 +49,7 @@ func TestOpenTelemetry(t *testing.T) { Attributes: map[string]string{"attr-key": "attr-val"}, metricsConverter: metricsConverter, grpcClientConn: m.GrpcClient(), - metricsServiceClient: pmetricotlp.NewClient(m.GrpcClient()), + metricsServiceClient: pmetricotlp.NewGRPCClient(m.GrpcClient()), } input := testutil.MustMetric( @@ -71,13 +69,14 @@ func TestOpenTelemetry(t *testing.T) { got := m.GotMetrics() - expectJSON, err := pmetric.NewJSONMarshaler().MarshalMetrics(expect) + marshaller := pmetric.JSONMarshaler{} + expectJSON, err := marshaller.MarshalMetrics(expect) require.NoError(t, err) - gotJSON, err := pmetric.NewJSONMarshaler().MarshalMetrics(got) + gotJSON, err := marshaller.MarshalMetrics(got) require.NoError(t, err) - assert.JSONEq(t, string(expectJSON), string(gotJSON)) + require.JSONEq(t, string(expectJSON), string(gotJSON)) } var _ pmetricotlp.GRPCServer = (*mockOtelService)(nil) @@ -103,7 +102,7 @@ func newMockOtelService(t *testing.T) *mockOtelService { } pmetricotlp.RegisterGRPCServer(grpcServer, mockOtelService) - go func() { assert.NoError(t, grpcServer.Serve(listener)) }() + go func() { require.NoError(t, grpcServer.Serve(listener)) }() grpcClient, err := grpc.Dial(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) require.NoError(t, err) @@ -113,7 +112,7 @@ func newMockOtelService(t *testing.T) *mockOtelService { } func (m *mockOtelService) Cleanup() { - assert.NoError(m.t, m.grpcClient.Close()) + require.NoError(m.t, m.grpcClient.Close()) m.grpcServer.Stop() } @@ -129,11 +128,11 @@ func (m *mockOtelService) Address() string { return m.listener.Addr().String() } -func (m *mockOtelService) Export(ctx context.Context, request pmetricotlp.Request) (pmetricotlp.Response, error) { +func (m *mockOtelService) Export(ctx context.Context, request pmetricotlp.ExportRequest) (pmetricotlp.ExportResponse, error) { m.metrics = pmetric.NewMetrics() request.Metrics().CopyTo(m.metrics) ctxMetadata, ok := metadata.FromIncomingContext(ctx) - assert.Equal(m.t, []string{"header1"}, ctxMetadata.Get("test")) - assert.True(m.t, ok) - return pmetricotlp.NewResponse(), nil + require.Equal(m.t, []string{"header1"}, ctxMetadata.Get("test")) + require.True(m.t, ok) + return pmetricotlp.NewExportResponse(), nil } diff --git a/plugins/processors/parser/parser_test.go b/plugins/processors/parser/parser_test.go index 688aede1a..661c6f05f 100644 --- a/plugins/processors/parser/parser_test.go +++ b/plugins/processors/parser/parser_test.go @@ -4,19 +4,16 @@ import ( "testing" "time" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" "github.com/stretchr/testify/require" - //Blank import to register all new-style parsers - + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/logfmt" "github.com/influxdata/telegraf/plugins/parsers/value" - "github.com/influxdata/telegraf/testutil" ) @@ -701,8 +698,8 @@ func TestBadApply(t *testing.T) { func getMetricFields(m telegraf.Metric) interface{} { key := "field3" - if value, ok := m.Fields()[key]; ok { - return value + if v, ok := m.Fields()[key]; ok { + return v } return nil }