diff --git a/plugins/outputs/opentelemetry/opentelemetry.go b/plugins/outputs/opentelemetry/opentelemetry.go index 9d5c92bbe..7b5a92d26 100644 --- a/plugins/outputs/opentelemetry/opentelemetry.go +++ b/plugins/outputs/opentelemetry/opentelemetry.go @@ -5,6 +5,7 @@ import ( "context" ntls "crypto/tls" _ "embed" + "sort" "time" "github.com/influxdata/influxdb-observability/common" @@ -121,7 +122,34 @@ func (o *OpenTelemetry) Close() error { return nil } +// Split metrics up by timestamp and send to Google Cloud Stackdriver func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error { + metricBatch := make(map[int64][]telegraf.Metric) + timestamps := []int64{} + for _, metric := range metrics { + timestamp := metric.Time().UnixNano() + if existingSlice, ok := metricBatch[timestamp]; ok { + metricBatch[timestamp] = append(existingSlice, metric) + } else { + metricBatch[timestamp] = []telegraf.Metric{metric} + timestamps = append(timestamps, timestamp) + } + } + + // sort the timestamps we collected + sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + o.Log.Debugf("received %d metrics and split into %d groups by timestamp", len(metrics), len(metricBatch)) + for _, timestamp := range timestamps { + if err := o.sendBatch(metricBatch[timestamp]); err != nil { + return err + } + } + + return nil +} + +func (o *OpenTelemetry) sendBatch(metrics []telegraf.Metric) error { batch := o.metricsConverter.NewBatch() for _, metric := range metrics { var vType common.InfluxMetricValueType diff --git a/plugins/outputs/opentelemetry/opentelemetry_test.go b/plugins/outputs/opentelemetry/opentelemetry_test.go index ea67f50e5..feb66e8f9 100644 --- a/plugins/outputs/opentelemetry/opentelemetry_test.go +++ b/plugins/outputs/opentelemetry/opentelemetry_test.go @@ -50,6 +50,7 @@ func TestOpenTelemetry(t *testing.T) { metricsConverter: metricsConverter, grpcClientConn: m.GrpcClient(), metricsServiceClient: pmetricotlp.NewGRPCClient(m.GrpcClient()), + Log: testutil.Logger{}, } input := testutil.MustMetric(