From 697855c98b38a81bbe7dead59355f5740875448a Mon Sep 17 00:00:00 2001
From: Nathan J Mehl <70606471+n-oden@users.noreply.github.com>
Date: Wed, 22 Dec 2021 15:11:28 -0500
Subject: [PATCH] fix: cumulative interval start times for stackdriver output
 (#10097)

---
 plugins/outputs/stackdriver/README.md          |  11 +-
 plugins/outputs/stackdriver/counter_cache.go   |  96 ++++++++++
 .../outputs/stackdriver/counter_cache_test.go  | 166 ++++++++++++++++++
 plugins/outputs/stackdriver/stackdriver.go     |  61 ++++---
 .../outputs/stackdriver/stackdriver_test.go    |  95 ++++++++++
 5 files changed, 406 insertions(+), 23 deletions(-)
 create mode 100644 plugins/outputs/stackdriver/counter_cache.go
 create mode 100644 plugins/outputs/stackdriver/counter_cache_test.go

diff --git a/plugins/outputs/stackdriver/README.md b/plugins/outputs/stackdriver/README.md
index a3c4f8295..1b074751e 100644
--- a/plugins/outputs/stackdriver/README.md
+++ b/plugins/outputs/stackdriver/README.md
@@ -50,7 +50,16 @@ Points collected with greater than 1 minute precision may need to be
 aggregated before then can be written. Consider using the [basicstats][]
 aggregator to do this.
 
+Histogram / distribution and delta metrics are not yet supported. These will
+be dropped silently unless debug logging is enabled.
+
+Note that the plugin keeps an in-memory cache of the start times and last
+observed values of all COUNTER metrics in order to comply with the
+requirements of the Stackdriver API. This cache is not garbage-collected:
+if you remove a large number of counters from the input side, you may wish
+to restart Telegraf to clear it.
+
 [basicstats]: /plugins/aggregators/basicstats/README.md
 [stackdriver]: https://cloud.google.com/monitoring/api/v3/
 [authentication]: https://cloud.google.com/docs/authentication/getting-started
-[pricing]: https://cloud.google.com/stackdriver/pricing#stackdriver_monitoring_services
+[pricing]: https://cloud.google.com/stackdriver/pricing#google-clouds-operations-suite-pricing
diff --git a/plugins/outputs/stackdriver/counter_cache.go b/plugins/outputs/stackdriver/counter_cache.go
new file mode 100644
index 000000000..b87a5806f
--- /dev/null
+++ b/plugins/outputs/stackdriver/counter_cache.go
@@ -0,0 +1,96 @@
+package stackdriver
+
+import (
+	"path"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+
+	monpb "google.golang.org/genproto/googleapis/monitoring/v3"
+	tspb "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+type counterCache struct {
+	sync.RWMutex
+	cache map[string]*counterCacheEntry
+	log   telegraf.Logger
+}
+
+type counterCacheEntry struct {
+	LastValue *monpb.TypedValue
+	StartTime *tspb.Timestamp
+}
+
+func (cce *counterCacheEntry) Reset(ts *tspb.Timestamp) {
+	// always backdate a reset by -1ms, otherwise the Stackdriver API will reject the point
+	cce.StartTime = tspb.New(ts.AsTime().Add(time.Millisecond * -1))
+}
+
+func (cc *counterCache) get(key string) (*counterCacheEntry, bool) {
+	cc.RLock()
+	defer cc.RUnlock()
+	value, ok := cc.cache[key]
+	return value, ok
+}
+
+func (cc *counterCache) set(key string, value *counterCacheEntry) {
+	cc.Lock()
+	defer cc.Unlock()
+	cc.cache[key] = value
+}
+
+func (cc *counterCache) GetStartTime(key string, value *monpb.TypedValue, endTime *tspb.Timestamp) *tspb.Timestamp {
+	lastObserved, ok := cc.get(key)
+
+	// init: create a new key, backdate the start time to 1ms before the end time
+	if !ok {
+		newEntry := NewCounterCacheEntry(value, endTime)
+		cc.set(key, newEntry)
+		return newEntry.StartTime
+	}
+
+	// update of existing entry
+	if value.GetDoubleValue() < lastObserved.LastValue.GetDoubleValue() || value.GetInt64Value() < lastObserved.LastValue.GetInt64Value() {
+		// counter reset
+		lastObserved.Reset(endTime)
+	} else {
+		// counter increment
+		//
+		// ...but...
+		// start times cannot be over 25 hours old; reset after 1 day to be safe
+		age := endTime.GetSeconds() - lastObserved.StartTime.GetSeconds()
+		cc.log.Debugf("age: %d", age)
+		if age > 86400 {
+			lastObserved.Reset(endTime)
+		}
+	}
+	// update last observed value
+	lastObserved.LastValue = value
+	return lastObserved.StartTime
+}
+
+func NewCounterCache(log telegraf.Logger) *counterCache {
+	return &counterCache{
+		cache: make(map[string]*counterCacheEntry),
+		log:   log}
+}
+
+func NewCounterCacheEntry(value *monpb.TypedValue, ts *tspb.Timestamp) *counterCacheEntry {
+	// Start times must be _before_ the end time, so backdate our original start time
+	// to 1ms before the observed time.
+	backDatedStart := ts.AsTime().Add(time.Millisecond * -1)
+	return &counterCacheEntry{LastValue: value, StartTime: tspb.New(backDatedStart)}
+}
+
+func GetCounterCacheKey(m telegraf.Metric, f *telegraf.Field) string {
+	// normalize tag list to form a predictable key
+	var tags []string
+	for _, t := range m.TagList() {
+		tags = append(tags, strings.Join([]string{t.Key, t.Value}, "="))
+	}
+	sort.Strings(tags)
+	return path.Join(m.Name(), strings.Join(tags, "/"), f.Key)
+}
diff --git a/plugins/outputs/stackdriver/counter_cache_test.go b/plugins/outputs/stackdriver/counter_cache_test.go
new file mode 100644
index 000000000..703246f6a
--- /dev/null
+++ b/plugins/outputs/stackdriver/counter_cache_test.go
@@ -0,0 +1,166 @@
+package stackdriver
+
+import (
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf/models"
+
+	monpb "google.golang.org/genproto/googleapis/monitoring/v3"
+	tspb "google.golang.org/protobuf/types/known/timestamppb"
+)
+
+func TestCreateCounterCacheEntry(t *testing.T) {
+	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestCreateCounterCacheEntry"))
+	value := &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(1),
+		},
+	}
+	endTime := tspb.Now()
+	startTime := cc.GetStartTime("key", value, endTime)
+	if endTime.AsTime().Add(time.Millisecond*-1) != startTime.AsTime() {
+		t.Fatal("Start time on a new entry should be 1ms behind the end time")
+	}
+}
+
+func TestUpdateCounterCacheEntry(t *testing.T) {
+	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestUpdateCounterCacheEntry"))
+	now := time.Now().UTC()
+	value := &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(1),
+		},
+	}
+	endTime := tspb.New(now)
+	startTime := cc.GetStartTime("key", value, endTime)
+	if endTime.AsTime().Add(time.Millisecond*-1) != startTime.AsTime() {
+		t.Fatal("Start time on a new entry should be 1ms behind the end time")
+	}
+
+	// next observation, 1m later
+	value = &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(2),
+		},
+	}
+	endTime = tspb.New(now.Add(time.Second * 60))
+	startTime = cc.GetStartTime("key", value, endTime)
+	// startTime is unchanged
+	if startTime.GetSeconds() != now.Unix() {
+		t.Fatal("Returned start time on an updated counter on the same day should not change")
+	}
+	obs, ok := cc.get("key")
+	if !ok {
+		t.Fatal("GetStartTime should create a fetchable k/v")
+	}
+	if obs.StartTime != startTime {
+		t.Fatal("Start time on fetched observation should match output from GetStartTime()")
+	}
+	if obs.LastValue != value {
+		t.Fatal("Stored value on fetched observation should have been updated.")
+	}
+}
+
+func TestCounterCounterCacheEntryReset(t *testing.T) {
+	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestCounterCounterCacheEntryReset"))
+	now := time.Now().UTC()
+	backdatedNow := now.Add(time.Millisecond * -1)
+	value := &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(2),
+		},
+	}
+	endTime := tspb.New(now)
+	startTime := cc.GetStartTime("key", value, endTime)
+	if startTime.AsTime() != backdatedNow {
+		t.Fatal("Start time on a new entry should be 1ms behind the end time")
+	}
+
+	// next observation, 1m later, but a lower value
+	value = &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(1),
+		},
+	}
+	later := now.Add(time.Second * 60)
+	endTime = tspb.New(later)
+	startTime = cc.GetStartTime("key", value, endTime)
+	// startTime should now be the new endTime -1ms
+	if startTime.AsTime() != later.Add(time.Millisecond*-1) {
+		t.Fatal("Returned start time after a counter reset should equal the end time minus 1ms")
+	}
+	obs, ok := cc.get("key")
+	if !ok {
+		t.Fatal("GetStartTime should create a fetchable k/v")
+	}
+	if obs.StartTime.AsTime() != endTime.AsTime().Add(time.Millisecond*-1) {
+		t.Fatal("Start time on fetched observation after a counter reset should equal the end time minus 1ms")
+	}
+	if obs.LastValue != value {
+		t.Fatal("Stored value on fetched observation should have been updated.")
+	}
+}
+
+func TestCounterCacheDayRollover(t *testing.T) {
+	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestCounterCacheDayRollover"))
+	now := time.Now().UTC()
+	backdatedNow := now.Add(time.Millisecond * -1)
+	value := &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(1),
+		},
+	}
+	endTime := tspb.New(now)
+	startTime := cc.GetStartTime("key", value, endTime)
+	if startTime.AsTime() != backdatedNow {
+		t.Fatal("Start time on a new entry should be 1ms behind the end time")
+	}
+
+	// next observation, 24h later
+	value = &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(2),
+		},
+	}
+	later := now.Add(time.Hour * 24)
+	endTime = tspb.New(later)
+	startTime = cc.GetStartTime("key", value, endTime)
+	if startTime.AsTime() != backdatedNow {
+		t.Fatalf("Returned start time %d 1s before a day rollover should equal the end time %d", startTime.GetSeconds(), now.Unix())
+	}
+	obs, ok := cc.get("key")
+	if !ok {
+		t.Fatal("GetStartTime should create a fetchable k/v")
+	}
+	if obs.StartTime.AsTime() != backdatedNow {
+		t.Fatal("Start time on an updated counter 1s before a day rollover should be unchanged")
+	}
+	if obs.LastValue != value {
+		t.Fatal("Stored value on an updated counter should have been updated.")
+	}
+
+	// next observation, 24h 1s later
+	value = &monpb.TypedValue{
+		Value: &monpb.TypedValue_Int64Value{
+			Int64Value: int64(3),
+		},
+	}
+	tomorrow := later.Add(time.Second * 1)
+	endTime = tspb.New(tomorrow)
+	startTime = cc.GetStartTime("key", value, endTime)
+	// startTime should now be the new endTime
+	if startTime.GetSeconds() != tomorrow.Unix() {
+		t.Fatalf("Returned start time %d after a day rollover should equal the end time %d", startTime.GetSeconds(), tomorrow.Unix())
+	}
+	obs, ok = cc.get("key")
+	if !ok {
+		t.Fatal("GetStartTime should create a fetchable k/v")
+	}
+	if obs.StartTime.AsTime() != endTime.AsTime().Add(time.Millisecond*-1) {
+		t.Fatal("Start time on fetched observation after a day rollover should equal the new end time -1ms")
+	}
+	if obs.LastValue != value {
t.Fatal("Stored value on fetched observation should have been updated.") + } +} diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go index e1fb49d2e..0c4a7f958 100644 --- a/plugins/outputs/stackdriver/stackdriver.go +++ b/plugins/outputs/stackdriver/stackdriver.go @@ -22,13 +22,14 @@ import ( // Stackdriver is the Google Stackdriver config info. type Stackdriver struct { - Project string - Namespace string + Project string `toml:"project"` + Namespace string `toml:"namespace"` ResourceType string `toml:"resource_type"` ResourceLabels map[string]string `toml:"resource_labels"` Log telegraf.Logger `toml:"-"` - client *monitoring.MetricClient + client *monitoring.MetricClient + counterCache *counterCache } const ( @@ -42,8 +43,6 @@ const ( // to string length for label value. QuotaStringLengthForLabelValue = 1024 - // StartTime for cumulative metrics. - StartTime = int64(1) // MaxInt is the max int64 value. MaxInt = int(^uint(0) >> 1) @@ -87,6 +86,10 @@ func (s *Stackdriver) Connect() error { s.ResourceLabels = make(map[string]string, 1) } + if s.counterCache == nil { + s.counterCache = NewCounterCache(s.Log) + } + s.ResourceLabels["project_id"] = s.Project if s.client == nil { @@ -146,7 +149,7 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { for _, f := range m.FieldList() { value, err := getStackdriverTypedValue(f.Value) if err != nil { - s.Log.Errorf("Get type failed: %s", err) + s.Log.Errorf("Get type failed: %q", err) continue } @@ -156,11 +159,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { metricKind, err := getStackdriverMetricKind(m.Type()) if err != nil { - s.Log.Errorf("Get metric failed: %s", err) + s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), m.Type(), f, err) continue } - timeInterval, err := getStackdriverTimeInterval(metricKind, StartTime, m.Time().Unix()) + startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache) + + timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime) if err != nil { s.Log.Errorf("Get time interval failed: %s", err) continue @@ -240,26 +245,38 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error { return nil } +func getStackdriverIntervalEndpoints( + kind metricpb.MetricDescriptor_MetricKind, + value *monitoringpb.TypedValue, + m telegraf.Metric, + f *telegraf.Field, + cc *counterCache, +) (*timestamppb.Timestamp, *timestamppb.Timestamp) { + endTime := timestamppb.New(m.Time()) + var startTime *timestamppb.Timestamp + if kind == metricpb.MetricDescriptor_CUMULATIVE { + // Interval starts for stackdriver CUMULATIVE metrics must reset any time + // the counter resets, so we keep a cache of the start times and last + // observed values for each counter in the batch. 
+		startTime = cc.GetStartTime(GetCounterCacheKey(m, f), value, endTime)
+	}
+	return startTime, endTime
+}
+
 func getStackdriverTimeInterval(
 	m metricpb.MetricDescriptor_MetricKind,
-	start int64,
-	end int64,
+	startTime *timestamppb.Timestamp,
+	endTime *timestamppb.Timestamp,
 ) (*monitoringpb.TimeInterval, error) {
 	switch m {
 	case metricpb.MetricDescriptor_GAUGE:
 		return &monitoringpb.TimeInterval{
-			EndTime: &timestamppb.Timestamp{
-				Seconds: end,
-			},
+			EndTime: endTime,
 		}, nil
 	case metricpb.MetricDescriptor_CUMULATIVE:
 		return &monitoringpb.TimeInterval{
-			StartTime: &timestamppb.Timestamp{
-				Seconds: start,
-			},
-			EndTime: &timestamppb.Timestamp{
-				Seconds: end,
-			},
+			StartTime: startTime,
+			EndTime:   endTime,
 		}, nil
 	case metricpb.MetricDescriptor_DELTA, metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED:
 		fallthrough
@@ -279,7 +296,7 @@ func getStackdriverMetricKind(vt telegraf.ValueType) (metricpb.MetricDescriptor_
 	case telegraf.Histogram, telegraf.Summary:
 		fallthrough
 	default:
-		return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type")
+		return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type: %T", vt)
 	}
 }
 
@@ -331,12 +348,12 @@ func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]stri
 	}
 	for k, v := range labels {
 		if len(k) > QuotaStringLengthForLabelKey {
-			s.Log.Warnf("Removing tag [%s] key exceeds string length for label key [%d]", k, QuotaStringLengthForLabelKey)
+			s.Log.Warnf("Removing tag %q key exceeds string length for label key [%d]", k, QuotaStringLengthForLabelKey)
 			delete(labels, k)
 			continue
 		}
 		if len(v) > QuotaStringLengthForLabelValue {
-			s.Log.Warnf("Removing tag [%s] value exceeds string length for label value [%d]", k, QuotaStringLengthForLabelValue)
+			s.Log.Warnf("Removing tag %q value exceeds string length for label value [%d]", k, QuotaStringLengthForLabelValue)
 			delete(labels, k)
 			continue
 		}
diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go
index 741e08e65..b963a3482 100644
--- a/plugins/outputs/stackdriver/stackdriver_test.go
+++ b/plugins/outputs/stackdriver/stackdriver_test.go
@@ -14,6 +14,7 @@ import (
 	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/api/option"
+	metricpb "google.golang.org/genproto/googleapis/api/metric"
 	monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/metadata"
@@ -447,3 +448,97 @@ func TestGetStackdriverLabels(t *testing.T) {
 	labels := s.getStackdriverLabels(tags)
 	require.Equal(t, QuotaLabelsPerMetricDescriptor, len(labels))
 }
+
+func TestGetStackdriverIntervalEndpoints(t *testing.T) {
+	c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	s := &Stackdriver{
+		Project:      fmt.Sprintf("projects/%s", "[PROJECT]"),
+		Namespace:    "test",
+		Log:          testutil.Logger{},
+		client:       c,
+		counterCache: NewCounterCache(testutil.Logger{}),
+	}
+
+	now := time.Now().UTC()
+	later := time.Now().UTC().Add(time.Second * 10)
+
+	// Metrics in descending order of timestamp
+	metrics := []telegraf.Metric{
+		testutil.MustMetric("cpu",
+			map[string]string{
+				"foo": "bar",
+			},
+			map[string]interface{}{
+				"value": 42,
+			},
+			now,
+			telegraf.Gauge,
+		),
+		testutil.MustMetric("cpu",
+			map[string]string{
+				"foo": "foo",
+			},
+			map[string]interface{}{
+				"value": 43,
+			},
+			later,
+			telegraf.Gauge,
+		),
+		testutil.MustMetric("uptime",
+			map[string]string{
+				"foo": "bar",
+			},
+			map[string]interface{}{
+				"value": 42,
+			},
+			now,
+			telegraf.Counter,
+		),
+		testutil.MustMetric("uptime",
+			map[string]string{
+				"foo": "foo",
+			},
+			map[string]interface{}{
+				"value": 43,
+			},
+			later,
+			telegraf.Counter,
+		),
+	}
+
+	for idx, m := range metrics {
+		for _, f := range m.FieldList() {
+			value, err := getStackdriverTypedValue(f.Value)
+			require.NoError(t, err)
+			require.NotNilf(t, value, "Got nil value for metric %q field %q", m, f)
+
+			metricKind, err := getStackdriverMetricKind(m.Type())
+			require.NoErrorf(t, err, "Get kind for metric %q (%T) field %q failed: %v", m.Name(), m.Type(), f, err)
+
+			startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)
+
+			// we only generate startTimes for counters
+			if metricKind != metricpb.MetricDescriptor_CUMULATIVE {
+				require.Nilf(t, startTime, "startTime for non-counter metric %q (%T) field %q should be nil, was: %v", m.Name(), m.Type(), f, startTime)
+			} else {
+				if idx%2 == 0 {
+					// greaterorequal because we might pass a second boundary while the test is running
+					// and new startTimes are backdated 1ms from the endTime.
+					require.GreaterOrEqual(t, startTime.AsTime().UTC().Unix(), now.UTC().Unix())
+				} else {
+					require.GreaterOrEqual(t, startTime.AsTime().UTC().Unix(), later.UTC().Unix())
+				}
+			}
+
+			if idx%2 == 0 {
+				require.Equal(t, now, endTime.AsTime())
+			} else {
+				require.Equal(t, later, endTime.AsTime())
+			}
+		}
+	}
+}
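
A minimal usage sketch of the counter cache this patch introduces, assuming it is compiled alongside counter_cache.go in the stackdriver output package; the metric name, tag, and values below are illustrative only and are not part of the change:

package stackdriver

import (
	"testing"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/testutil"

	monpb "google.golang.org/genproto/googleapis/monitoring/v3"
	tspb "google.golang.org/protobuf/types/known/timestamppb"
)

// Sketch only: exercises the cache across a first observation and a reset.
// NewCounterCache, GetCounterCacheKey, and GetStartTime come from this patch;
// everything else is made-up test scaffolding.
func TestCounterCacheSketch(t *testing.T) {
	cc := NewCounterCache(testutil.Logger{})

	m := testutil.MustMetric("uptime",
		map[string]string{"host": "a"},
		map[string]interface{}{"value": int64(42)},
		time.Now(),
		telegraf.Counter,
	)
	f := m.FieldList()[0]

	// The cache key is the metric name, the sorted tag pairs, and the field
	// key, e.g. "uptime/host=a/value".
	key := GetCounterCacheKey(m, f)

	end := tspb.New(m.Time())
	v := &monpb.TypedValue{Value: &monpb.TypedValue_Int64Value{Int64Value: 42}}

	// First observation: the returned start time is backdated 1ms from the end time.
	start := cc.GetStartTime(key, v, end)
	if !start.AsTime().Before(end.AsTime()) {
		t.Fatal("expected the initial start time to be before the end time")
	}

	// A lower value one minute later is treated as a counter reset, so the
	// start time moves forward to just before the new end time.
	end2 := tspb.New(m.Time().Add(time.Minute))
	v2 := &monpb.TypedValue{Value: &monpb.TypedValue_Int64Value{Int64Value: 7}}
	start2 := cc.GetStartTime(key, v2, end2)
	if !start2.AsTime().After(start.AsTime()) {
		t.Fatal("expected the start time to advance after a counter reset")
	}
}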