From 1a7c274ddf9bc5408f3ec610c4a26f885ba59035 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Thu, 4 May 2023 03:48:54 -0600 Subject: [PATCH] fix(outputs.stackdriver): Group batches by timestamp (#12994) --- plugins/outputs/stackdriver/stackdriver.go | 31 +++++++++- .../outputs/stackdriver/stackdriver_test.go | 58 ++++++++++++------- 2 files changed, 67 insertions(+), 22 deletions(-) diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go index 0ff075030..80d821c87 100644 --- a/plugins/outputs/stackdriver/stackdriver.go +++ b/plugins/outputs/stackdriver/stackdriver.go @@ -131,11 +131,38 @@ func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monit tsb[k] = s } -// Write the metrics to Google Cloud Stackdriver. +// Split metrics up by timestamp and send to Google Cloud Stackdriver func (s *Stackdriver) Write(metrics []telegraf.Metric) error { + metricBatch := make(map[int64][]telegraf.Metric) + timestamps := []int64{} + for _, metric := range sorted(metrics) { + timestamp := metric.Time().UnixNano() + if existingSlice, ok := metricBatch[timestamp]; ok { + metricBatch[timestamp] = append(existingSlice, metric) + } else { + metricBatch[timestamp] = []telegraf.Metric{metric} + timestamps = append(timestamps, timestamp) + } + } + + // sort the timestamps we collected + sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] }) + + s.Log.Debugf("received %d metrics\n", len(metrics)) + s.Log.Debugf("split into %d groups by timestamp\n", len(metricBatch)) + for _, timestamp := range timestamps { + if err := s.sendBatch(metricBatch[timestamp]); err != nil { + return err + } + } + + return nil +} + +// Write the metrics to Google Cloud Stackdriver. +func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { ctx := context.Background() - batch := sorted(metrics) buckets := make(timeSeriesBuckets) for _, m := range batch { for _, f := range m.FieldList() { diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go index d3c6249c8..76a0c470b 100644 --- a/plugins/outputs/stackdriver/stackdriver_test.go +++ b/plugins/outputs/stackdriver/stackdriver_test.go @@ -314,14 +314,16 @@ func TestWriteBatchable(t *testing.T) { err = s.Write(metrics) require.NoError(t, err) - require.Len(t, mockMetric.reqs, 2) + require.Len(t, mockMetric.reqs, 5) + + // Request 1 with two time series request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest) - require.Len(t, request.TimeSeries, 6) + require.Len(t, request.TimeSeries, 2) ts := request.TimeSeries[0] require.Len(t, ts.Points, 1) require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{ EndTime: ×tamppb.Timestamp{ - Seconds: 3, + Seconds: 1, }, }) require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{ @@ -343,31 +345,47 @@ func TestWriteBatchable(t *testing.T) { }, }) - ts = request.TimeSeries[2] + // Request 2 with 1 time series + request = mockMetric.reqs[1].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 1) require.Len(t, ts.Points, 1) - require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{ + require.Equal(t, &monitoringpb.TimeInterval{ + EndTime: ×tamppb.Timestamp{ + Seconds: 2, + }, + }, request.TimeSeries[0].Points[0].Interval) + + // Request 3 with 1 time series with 1 point + request = mockMetric.reqs[2].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 3) + require.Len(t, request.TimeSeries[0].Points, 1) + require.Len(t, request.TimeSeries[1].Points, 1) + require.Len(t, request.TimeSeries[2].Points, 1) + require.Equal(t, &monitoringpb.TimeInterval{ EndTime: ×tamppb.Timestamp{ Seconds: 3, }, - }) - require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{ - Value: &monitoringpb.TypedValue_Int64Value{ - Int64Value: int64(43), - }, - }) + }, request.TimeSeries[0].Points[0].Interval) - ts = request.TimeSeries[4] - require.Len(t, ts.Points, 1) - require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{ + // Request 4 with 1 time series with 1 point + request = mockMetric.reqs[3].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 1) + require.Len(t, request.TimeSeries[0].Points, 1) + require.Equal(t, &monitoringpb.TimeInterval{ + EndTime: ×tamppb.Timestamp{ + Seconds: 4, + }, + }, request.TimeSeries[0].Points[0].Interval) + + // Request 5 with 1 time series with 1 point + request = mockMetric.reqs[4].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 1) + require.Len(t, request.TimeSeries[0].Points, 1) + require.Equal(t, &monitoringpb.TimeInterval{ EndTime: ×tamppb.Timestamp{ Seconds: 5, }, - }) - require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{ - Value: &monitoringpb.TypedValue_Int64Value{ - Int64Value: int64(43), - }, - }) + }, request.TimeSeries[0].Points[0].Interval) } func TestWriteIgnoredErrors(t *testing.T) {