fix(outputs.opentelemetry): group metrics by age and timestamp (#13292)
This commit is contained in:
parent
d4235ab86b
commit
6377f69501
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
ntls "crypto/tls"
|
ntls "crypto/tls"
|
||||||
_ "embed"
|
_ "embed"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influxdb-observability/common"
|
"github.com/influxdata/influxdb-observability/common"
|
||||||
|
|
@ -121,7 +122,34 @@ func (o *OpenTelemetry) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Split metrics up by timestamp and send to Google Cloud Stackdriver
|
||||||
func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error {
|
func (o *OpenTelemetry) Write(metrics []telegraf.Metric) error {
|
||||||
|
metricBatch := make(map[int64][]telegraf.Metric)
|
||||||
|
timestamps := []int64{}
|
||||||
|
for _, metric := range metrics {
|
||||||
|
timestamp := metric.Time().UnixNano()
|
||||||
|
if existingSlice, ok := metricBatch[timestamp]; ok {
|
||||||
|
metricBatch[timestamp] = append(existingSlice, metric)
|
||||||
|
} else {
|
||||||
|
metricBatch[timestamp] = []telegraf.Metric{metric}
|
||||||
|
timestamps = append(timestamps, timestamp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort the timestamps we collected
|
||||||
|
sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] })
|
||||||
|
|
||||||
|
o.Log.Debugf("received %d metrics and split into %d groups by timestamp", len(metrics), len(metricBatch))
|
||||||
|
for _, timestamp := range timestamps {
|
||||||
|
if err := o.sendBatch(metricBatch[timestamp]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *OpenTelemetry) sendBatch(metrics []telegraf.Metric) error {
|
||||||
batch := o.metricsConverter.NewBatch()
|
batch := o.metricsConverter.NewBatch()
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
var vType common.InfluxMetricValueType
|
var vType common.InfluxMetricValueType
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,7 @@ func TestOpenTelemetry(t *testing.T) {
|
||||||
metricsConverter: metricsConverter,
|
metricsConverter: metricsConverter,
|
||||||
grpcClientConn: m.GrpcClient(),
|
grpcClientConn: m.GrpcClient(),
|
||||||
metricsServiceClient: pmetricotlp.NewGRPCClient(m.GrpcClient()),
|
metricsServiceClient: pmetricotlp.NewGRPCClient(m.GrpcClient()),
|
||||||
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
|
|
||||||
input := testutil.MustMetric(
|
input := testutil.MustMetric(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue