From b87d06eb69bca9ee02f211a851184539e3fef5ca Mon Sep 17 00:00:00 2001 From: Sebastian Spaink <3441183+sspaink@users.noreply.github.com> Date: Wed, 27 Jul 2022 10:31:42 -0500 Subject: [PATCH] fix(inputs.stackdriver): Handle when no buckets available (#11556) --- plugins/inputs/stackdriver/stackdriver.go | 93 +++++++++++++++---- .../inputs/stackdriver/stackdriver_test.go | 73 +++++++++++++++ 2 files changed, 146 insertions(+), 20 deletions(-) diff --git a/plugins/inputs/stackdriver/stackdriver.go b/plugins/inputs/stackdriver/stackdriver.go index cd45f2f62..25cf4c430 100644 --- a/plugins/inputs/stackdriver/stackdriver.go +++ b/plugins/inputs/stackdriver/stackdriver.go @@ -4,6 +4,7 @@ package stackdriver import ( "context" _ "embed" + "errors" "fmt" "math" "strconv" @@ -564,6 +565,73 @@ func (s *Stackdriver) gatherTimeSeries( return nil } +type Buckets interface { + Amount() int32 + UpperBound(i int32) float64 +} + +type LinearBuckets struct { + *distributionpb.Distribution_BucketOptions_Linear +} + +func (l *LinearBuckets) Amount() int32 { + return l.NumFiniteBuckets + 2 +} + +func (l *LinearBuckets) UpperBound(i int32) float64 { + return l.Offset + (l.Width * float64(i)) +} + +type ExponentialBuckets struct { + *distributionpb.Distribution_BucketOptions_Exponential +} + +func (e *ExponentialBuckets) Amount() int32 { + return e.NumFiniteBuckets + 2 +} + +func (e *ExponentialBuckets) UpperBound(i int32) float64 { + width := math.Pow(e.GrowthFactor, float64(i)) + return e.Scale * width +} + +type ExplicitBuckets struct { + *distributionpb.Distribution_BucketOptions_Explicit +} + +func (e *ExplicitBuckets) Amount() int32 { + return int32(len(e.Bounds)) + 1 +} + +func (e *ExplicitBuckets) UpperBound(i int32) float64 { + return e.Bounds[i] +} + +func NewBucket(dist *distributionpb.Distribution) (Buckets, error) { + linearBuckets := dist.BucketOptions.GetLinearBuckets() + if linearBuckets != nil { + var l LinearBuckets + l.Distribution_BucketOptions_Linear = linearBuckets + return &l, nil + } + + exponentialBuckets := dist.BucketOptions.GetExponentialBuckets() + if exponentialBuckets != nil { + var e ExponentialBuckets + e.Distribution_BucketOptions_Exponential = exponentialBuckets + return &e, nil + } + + explicitBuckets := dist.BucketOptions.GetExplicitBuckets() + if explicitBuckets != nil { + var e ExplicitBuckets + e.Distribution_BucketOptions_Explicit = explicitBuckets + return &e, nil + } + + return nil, errors.New("no buckets available") +} + // AddDistribution adds metrics from a distribution value type. func (s *Stackdriver) addDistribution(dist *distributionpb.Distribution, tags map[string]string, ts time.Time, grouper *lockedSeriesGrouper, tsConf *timeSeriesConf, @@ -590,18 +658,11 @@ func (s *Stackdriver) addDistribution(dist *distributionpb.Distribution, tags ma } } - linearBuckets := dist.BucketOptions.GetLinearBuckets() - exponentialBuckets := dist.BucketOptions.GetExponentialBuckets() - explicitBuckets := dist.BucketOptions.GetExplicitBuckets() - - var numBuckets int32 - if linearBuckets != nil { - numBuckets = linearBuckets.NumFiniteBuckets + 2 - } else if exponentialBuckets != nil { - numBuckets = exponentialBuckets.NumFiniteBuckets + 2 - } else { - numBuckets = int32(len(explicitBuckets.Bounds)) + 1 + bucket, err := NewBucket(dist) + if err != nil { + return err } + numBuckets := bucket.Amount() var i int32 var count int64 @@ -611,15 +672,7 @@ func (s *Stackdriver) addDistribution(dist *distributionpb.Distribution, tags ma if i == numBuckets-1 { tags["lt"] = "+Inf" } else { - var upperBound float64 - if linearBuckets != nil { - upperBound = linearBuckets.Offset + (linearBuckets.Width * float64(i)) - } else if exponentialBuckets != nil { - width := math.Pow(exponentialBuckets.GrowthFactor, float64(i)) - upperBound = exponentialBuckets.Scale * width - } else if explicitBuckets != nil { - upperBound = explicitBuckets.Bounds[i] - } + upperBound := bucket.UpperBound(i) tags["lt"] = strconv.FormatFloat(upperBound, 'f', -1, 64) } diff --git a/plugins/inputs/stackdriver/stackdriver_test.go b/plugins/inputs/stackdriver/stackdriver_test.go index ad6b15145..b3f926072 100644 --- a/plugins/inputs/stackdriver/stackdriver_test.go +++ b/plugins/inputs/stackdriver/stackdriver_test.go @@ -95,7 +95,78 @@ func TestGather(t *testing.T) { descriptor *metricpb.MetricDescriptor timeseries *monitoringpb.TimeSeries expected []telegraf.Metric + wantAccErr bool }{ + { + name: "no_bucket", + descriptor: &metricpb.MetricDescriptor{ + ValueType: metricpb.MetricDescriptor_DISTRIBUTION, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamppb.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distribution.Distribution{ + Count: 2, + }, + }, + }, + }, + metricpb.MetricDescriptor_DISTRIBUTION, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("", + map[string]string{ + "project_id": "test", + "resource_type": "global", + }, + map[string]interface{}{ + "value_count": 2, + "value_mean": float64(0), + "value_sum_of_squared_deviation": float64(0), + }, + now), + }, + wantAccErr: true, + }, + { + name: "int64", + descriptor: &metricpb.MetricDescriptor{ + Type: "telegraf/cpu/usage", + ValueType: metricpb.MetricDescriptor_INT64, + }, + timeseries: createTimeSeries( + &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamppb.Timestamp{ + Seconds: now.Unix(), + }, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_Int64Value{ + Int64Value: 42, + }, + }, + }, + metricpb.MetricDescriptor_INT64, + ), + expected: []telegraf.Metric{ + testutil.MustMetric("telegraf/cpu", + map[string]string{ + "resource_type": "global", + "project_id": "test", + }, + map[string]interface{}{ + "usage": 42, + }, + now), + }, + }, { name: "double", descriptor: &metricpb.MetricDescriptor{ @@ -673,6 +744,8 @@ func TestGather(t *testing.T) { err := s.Gather(&acc) require.NoError(t, err) + require.Equalf(t, len(acc.Errors) > 0, tt.wantAccErr, + "Accumulator errors. got=%v, want=%t", acc.Errors, tt.wantAccErr) actual := []telegraf.Metric{} for _, m := range acc.Metrics {