fix(outputs.stackdriver): Group batches by timestamp (#12994)

This commit is contained in:
Joshua Powers 2023-05-04 03:48:54 -06:00 committed by GitHub
parent 4929f1ade7
commit 1a7c274ddf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 22 deletions

View File

@ -131,11 +131,38 @@ func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monit
tsb[k] = s
}
// Write the metrics to Google Cloud Stackdriver.
// Split metrics up by timestamp and send to Google Cloud Stackdriver
func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
metricBatch := make(map[int64][]telegraf.Metric)
timestamps := []int64{}
for _, metric := range sorted(metrics) {
timestamp := metric.Time().UnixNano()
if existingSlice, ok := metricBatch[timestamp]; ok {
metricBatch[timestamp] = append(existingSlice, metric)
} else {
metricBatch[timestamp] = []telegraf.Metric{metric}
timestamps = append(timestamps, timestamp)
}
}
// sort the timestamps we collected
sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] })
s.Log.Debugf("received %d metrics\n", len(metrics))
s.Log.Debugf("split into %d groups by timestamp\n", len(metricBatch))
for _, timestamp := range timestamps {
if err := s.sendBatch(metricBatch[timestamp]); err != nil {
return err
}
}
return nil
}
// Write the metrics to Google Cloud Stackdriver.
func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
ctx := context.Background()
batch := sorted(metrics)
buckets := make(timeSeriesBuckets)
for _, m := range batch {
for _, f := range m.FieldList() {

View File

@ -314,14 +314,16 @@ func TestWriteBatchable(t *testing.T) {
err = s.Write(metrics)
require.NoError(t, err)
require.Len(t, mockMetric.reqs, 2)
require.Len(t, mockMetric.reqs, 5)
// Request 1 with two time series
request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
require.Len(t, request.TimeSeries, 6)
require.Len(t, request.TimeSeries, 2)
ts := request.TimeSeries[0]
require.Len(t, ts.Points, 1)
require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
EndTime: &timestamppb.Timestamp{
Seconds: 3,
Seconds: 1,
},
})
require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
@ -343,31 +345,47 @@ func TestWriteBatchable(t *testing.T) {
},
})
ts = request.TimeSeries[2]
// Request 2 with 1 time series
request = mockMetric.reqs[1].(*monitoringpb.CreateTimeSeriesRequest)
require.Len(t, request.TimeSeries, 1)
require.Len(t, ts.Points, 1)
require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
require.Equal(t, &monitoringpb.TimeInterval{
EndTime: &timestamppb.Timestamp{
Seconds: 2,
},
}, request.TimeSeries[0].Points[0].Interval)
// Request 3 with 1 time series with 1 point
request = mockMetric.reqs[2].(*monitoringpb.CreateTimeSeriesRequest)
require.Len(t, request.TimeSeries, 3)
require.Len(t, request.TimeSeries[0].Points, 1)
require.Len(t, request.TimeSeries[1].Points, 1)
require.Len(t, request.TimeSeries[2].Points, 1)
require.Equal(t, &monitoringpb.TimeInterval{
EndTime: &timestamppb.Timestamp{
Seconds: 3,
},
})
require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(43),
},
})
}, request.TimeSeries[0].Points[0].Interval)
ts = request.TimeSeries[4]
require.Len(t, ts.Points, 1)
require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
// Request 4 with 1 time series with 1 point
request = mockMetric.reqs[3].(*monitoringpb.CreateTimeSeriesRequest)
require.Len(t, request.TimeSeries, 1)
require.Len(t, request.TimeSeries[0].Points, 1)
require.Equal(t, &monitoringpb.TimeInterval{
EndTime: &timestamppb.Timestamp{
Seconds: 4,
},
}, request.TimeSeries[0].Points[0].Interval)
// Request 5 with 1 time series with 1 point
request = mockMetric.reqs[4].(*monitoringpb.CreateTimeSeriesRequest)
require.Len(t, request.TimeSeries, 1)
require.Len(t, request.TimeSeries[0].Points, 1)
require.Equal(t, &monitoringpb.TimeInterval{
EndTime: &timestamppb.Timestamp{
Seconds: 5,
},
})
require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(43),
},
})
}, request.TimeSeries[0].Points[0].Interval)
}
func TestWriteIgnoredErrors(t *testing.T) {