fix: cumulative interval start times for stackdriver output (#10097)

Nathan J Mehl 2021-12-22 15:11:28 -05:00 committed by GitHub
parent a202f68333
commit 697855c98b
5 changed files with 406 additions and 23 deletions

View File

@@ -50,7 +50,16 @@ Points collected with greater than 1 minute precision may need to be
aggregated before they can be written. Consider using the [basicstats][]
aggregator to do this.
Histogram / distribution and delta metrics are not yet supported. These will
be dropped silently unless debugging is on.
Note that the plugin keeps an in-memory cache of the start times and last
observed values of all COUNTER metrics in order to comply with the
requirements of the Stackdriver API (one entry per unique combination of
metric name, tag set, and field). This cache is never garbage-collected:
if you remove a large number of counters from the input side, you may wish
to restart telegraf to clear it.
[basicstats]: /plugins/aggregators/basicstats/README.md
[stackdriver]: https://cloud.google.com/monitoring/api/v3/
[authentication]: https://cloud.google.com/docs/authentication/getting-started
[pricing]: https://cloud.google.com/stackdriver/pricing#google-clouds-operations-suite-pricing

View File

@@ -0,0 +1,96 @@
package stackdriver

import (
	"path"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/telegraf"
	monpb "google.golang.org/genproto/googleapis/monitoring/v3"
	tspb "google.golang.org/protobuf/types/known/timestamppb"
)
type counterCache struct {
	sync.RWMutex
	cache map[string]*counterCacheEntry
	log   telegraf.Logger
}

type counterCacheEntry struct {
	LastValue *monpb.TypedValue
	StartTime *tspb.Timestamp
}

func (cce *counterCacheEntry) Reset(ts *tspb.Timestamp) {
	// always backdate a reset by 1ms; the Stackdriver API requires that the
	// start time of a cumulative point be strictly earlier than its end time
	cce.StartTime = tspb.New(ts.AsTime().Add(time.Millisecond * -1))
}
func (cc *counterCache) get(key string) (*counterCacheEntry, bool) {
	cc.RLock()
	defer cc.RUnlock()
	value, ok := cc.cache[key]
	return value, ok
}

func (cc *counterCache) set(key string, value *counterCacheEntry) {
	cc.Lock()
	defer cc.Unlock()
	cc.cache[key] = value
}
func (cc *counterCache) GetStartTime(key string, value *monpb.TypedValue, endTime *tspb.Timestamp) *tspb.Timestamp {
	lastObserved, ok := cc.get(key)

	// init: create a new key and backdate the start time to 1ms before the end time
	if !ok {
		newEntry := NewCounterCacheEntry(value, endTime)
		cc.set(key, newEntry)
		return newEntry.StartTime
	}

	// update of an existing entry
	//
	// a value lower than the last observed one means the counter was reset;
	// the protobuf getters return the zero value when the TypedValue holds
	// the other type, so only the comparison for the populated type can be
	// meaningful here
	if value.GetDoubleValue() < lastObserved.LastValue.GetDoubleValue() || value.GetInt64Value() < lastObserved.LastValue.GetInt64Value() {
		// counter reset
		lastObserved.Reset(endTime)
	} else {
		// counter increment
		//
		// ...but...
		// start times cannot be over 25 hours old; reset after 1 day to be safe
		age := endTime.GetSeconds() - lastObserved.StartTime.GetSeconds()
		cc.log.Debugf("age: %d", age)
		if age > 86400 {
			lastObserved.Reset(endTime)
		}
	}
	// update the last observed value
	lastObserved.LastValue = value
	return lastObserved.StartTime
}
func NewCounterCache(log telegraf.Logger) *counterCache {
	return &counterCache{
		cache: make(map[string]*counterCacheEntry),
		log:   log,
	}
}

func NewCounterCacheEntry(value *monpb.TypedValue, ts *tspb.Timestamp) *counterCacheEntry {
	// Start times must be _before_ the end time, so backdate our original
	// start time to 1ms before the observed time.
	backDatedStart := ts.AsTime().Add(time.Millisecond * -1)
	return &counterCacheEntry{LastValue: value, StartTime: tspb.New(backDatedStart)}
}

func GetCounterCacheKey(m telegraf.Metric, f *telegraf.Field) string {
	// normalize the tag list to form a predictable key
	var tags []string
	for _, t := range m.TagList() {
		tags = append(tags, strings.Join([]string{t.Key, t.Value}, "="))
	}
	sort.Strings(tags)
	return path.Join(m.Name(), strings.Join(tags, "/"), f.Key)
}

View File

@@ -0,0 +1,166 @@
package stackdriver

import (
	"testing"
	"time"

	"github.com/influxdata/telegraf/models"
	monpb "google.golang.org/genproto/googleapis/monitoring/v3"
	tspb "google.golang.org/protobuf/types/known/timestamppb"
)
func TestCreateCounterCacheEntry(t *testing.T) {
	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestCreateCounterCacheEntry"))
	value := &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(1),
		},
	}
	endTime := tspb.Now()
	startTime := cc.GetStartTime("key", value, endTime)
	if endTime.AsTime().Add(time.Millisecond*-1) != startTime.AsTime() {
		t.Fatal("Start time on a new entry should be 1ms behind the end time")
	}
}
func TestUpdateCounterCacheEntry(t *testing.T) {
	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestUpdateCounterCacheEntry"))
	now := time.Now().UTC()
	value := &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(1),
		},
	}
	endTime := tspb.New(now)
	startTime := cc.GetStartTime("key", value, endTime)
	if endTime.AsTime().Add(time.Millisecond*-1) != startTime.AsTime() {
		t.Fatal("Start time on a new entry should be 1ms behind the end time")
	}

	// next observation, 1m later
	value = &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(2),
		},
	}
	endTime = tspb.New(now.Add(time.Second * 60))
	startTime = cc.GetStartTime("key", value, endTime)
	// startTime is unchanged (compared in seconds; the cached start is only
	// 1ms behind now)
	if startTime.GetSeconds() != now.Unix() {
		t.Fatal("Returned start time on an updated counter on the same day should not change")
	}
	obs, ok := cc.get("key")
	if !ok {
		t.Fatal("GetStartTime should create a fetchable k/v")
	}
	if obs.StartTime != startTime {
		t.Fatal("Start time on fetched observation should match output from GetStartTime()")
	}
	if obs.LastValue != value {
		t.Fatal("Stored value on fetched observation should have been updated.")
	}
}
func TestCounterCounterCacheEntryReset(t *testing.T) {
	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestCounterCounterCacheEntryReset"))
	now := time.Now().UTC()
	backdatedNow := now.Add(time.Millisecond * -1)
	value := &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(2),
		},
	}
	endTime := tspb.New(now)
	startTime := cc.GetStartTime("key", value, endTime)
	if startTime.AsTime() != backdatedNow {
		t.Fatal("Start time on a new entry should be 1ms behind the end time")
	}

	// next observation, 1m later, but with a lower value
	value = &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(1),
		},
	}
	later := now.Add(time.Second * 60)
	endTime = tspb.New(later)
	startTime = cc.GetStartTime("key", value, endTime)
	// startTime should now be the new endTime - 1ms
	if startTime.AsTime() != later.Add(time.Millisecond*-1) {
		t.Fatal("Returned start time after a counter reset should equal the end time minus 1ms")
	}
	obs, ok := cc.get("key")
	if !ok {
		t.Fatal("GetStartTime should create a fetchable k/v")
	}
	if obs.StartTime.AsTime() != endTime.AsTime().Add(time.Millisecond*-1) {
		t.Fatal("Start time on fetched observation after a counter reset should equal the end time minus 1ms")
	}
	if obs.LastValue != value {
		t.Fatal("Stored value on fetched observation should have been updated.")
	}
}
func TestCounterCacheDayRollover(t *testing.T) {
	cc := NewCounterCache(models.NewLogger("outputs", "stackdriver", "TestCounterCacheDayRollover"))
	now := time.Now().UTC()
	backdatedNow := now.Add(time.Millisecond * -1)
	value := &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(1),
		},
	}
	endTime := tspb.New(now)
	startTime := cc.GetStartTime("key", value, endTime)
	if startTime.AsTime() != backdatedNow {
		t.Fatal("Start time on a new entry should be 1ms behind the end time")
	}

	// next observation, exactly 24h later: still within the one-day limit,
	// so the start time must not change
	value = &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(2),
		},
	}
	later := now.Add(time.Hour * 24)
	endTime = tspb.New(later)
	startTime = cc.GetStartTime("key", value, endTime)
	if startTime.AsTime() != backdatedNow {
		t.Fatalf("Returned start time %d on an update at the 24h boundary should equal the original start time %d", startTime.GetSeconds(), now.Unix())
	}
	obs, ok := cc.get("key")
	if !ok {
		t.Fatal("GetStartTime should create a fetchable k/v")
	}
	if obs.StartTime.AsTime() != backdatedNow {
		t.Fatal("Start time on an updated counter at the 24h boundary should be unchanged")
	}
	if obs.LastValue != value {
		t.Fatal("Stored value on an updated counter should have been updated.")
	}

	// next observation, 24h 1s later: past the one-day limit, so the entry
	// is reset
	value = &monpb.TypedValue{
		Value: &monpb.TypedValue_Int64Value{
			Int64Value: int64(3),
		},
	}
	tomorrow := later.Add(time.Second * 1)
	endTime = tspb.New(tomorrow)
	startTime = cc.GetStartTime("key", value, endTime)
	// startTime should now be the new endTime backdated by 1ms
	// (compared in seconds here)
	if startTime.GetSeconds() != tomorrow.Unix() {
		t.Fatalf("Returned start time %d after a day rollover should equal the end time %d", startTime.GetSeconds(), tomorrow.Unix())
	}
	obs, ok = cc.get("key")
	if !ok {
		t.Fatal("GetStartTime should create a fetchable k/v")
	}
	if obs.StartTime.AsTime() != endTime.AsTime().Add(time.Millisecond*-1) {
		t.Fatal("Start time on fetched observation after a day rollover should equal the new end time -1ms")
	}
	if obs.LastValue != value {
		t.Fatal("Stored value on fetched observation should have been updated.")
	}
}

View File

@@ -22,13 +22,14 @@ import (
// Stackdriver is the Google Stackdriver config info.
type Stackdriver struct {
	Project        string            `toml:"project"`
	Namespace      string            `toml:"namespace"`
	ResourceType   string            `toml:"resource_type"`
	ResourceLabels map[string]string `toml:"resource_labels"`
	Log            telegraf.Logger   `toml:"-"`

	client       *monitoring.MetricClient
	counterCache *counterCache
}

const (
@@ -42,8 +43,6 @@ const (
	// to string length for label value.
	QuotaStringLengthForLabelValue = 1024

	// MaxInt is the max int64 value.
	MaxInt = int(^uint(0) >> 1)
@@ -87,6 +86,10 @@ func (s *Stackdriver) Connect() error {
		s.ResourceLabels = make(map[string]string, 1)
	}

	if s.counterCache == nil {
		s.counterCache = NewCounterCache(s.Log)
	}

	s.ResourceLabels["project_id"] = s.Project

	if s.client == nil {
@@ -146,7 +149,7 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
		for _, f := range m.FieldList() {
			value, err := getStackdriverTypedValue(f.Value)
			if err != nil {
				s.Log.Errorf("Get type failed: %q", err)
				continue
			}
@@ -156,11 +159,13 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
			metricKind, err := getStackdriverMetricKind(m.Type())
			if err != nil {
				s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), m.Type(), f, err)
				continue
			}

			startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)
			timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime)
			if err != nil {
				s.Log.Errorf("Get time interval failed: %s", err)
				continue
@@ -240,26 +245,38 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
	return nil
}

func getStackdriverIntervalEndpoints(
	kind metricpb.MetricDescriptor_MetricKind,
	value *monitoringpb.TypedValue,
	m telegraf.Metric,
	f *telegraf.Field,
	cc *counterCache,
) (*timestamppb.Timestamp, *timestamppb.Timestamp) {
	endTime := timestamppb.New(m.Time())
	var startTime *timestamppb.Timestamp
	if kind == metricpb.MetricDescriptor_CUMULATIVE {
		// Interval starts for stackdriver CUMULATIVE metrics must reset any
		// time the counter resets, so we keep a cache of the start time and
		// last observed value for each counter.
		startTime = cc.GetStartTime(GetCounterCacheKey(m, f), value, endTime)
	}
	return startTime, endTime
}
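
For orientation, a minimal sketch (not part of the commit) of how the endpoint and interval helpers combine for a single counter field, mirroring the Write() loop above; the metric, its tag, and the use of testutil here are illustrative only:

```go
// Hypothetical walk-through: build the time interval for one counter field.
m := testutil.MustMetric("uptime",
	map[string]string{"host": "a"},
	map[string]interface{}{"value": int64(42)},
	time.Now(),
	telegraf.Counter,
)
f := m.FieldList()[0]
value, _ := getStackdriverTypedValue(f.Value)
kind, _ := getStackdriverMetricKind(m.Type()) // counters map to CUMULATIVE

cc := NewCounterCache(testutil.Logger{})
start, end := getStackdriverIntervalEndpoints(kind, value, m, f, cc)
interval, _ := getStackdriverTimeInterval(kind, start, end)
// On a first observation interval.StartTime is m.Time() - 1ms and
// interval.EndTime is m.Time(); later points reuse the cached start until
// the counter resets or the start is more than a day old.
_ = interval
```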
func getStackdriverTimeInterval(
	m metricpb.MetricDescriptor_MetricKind,
	startTime *timestamppb.Timestamp,
	endTime *timestamppb.Timestamp,
) (*monitoringpb.TimeInterval, error) {
	switch m {
	case metricpb.MetricDescriptor_GAUGE:
		return &monitoringpb.TimeInterval{
			EndTime: endTime,
		}, nil
	case metricpb.MetricDescriptor_CUMULATIVE:
		return &monitoringpb.TimeInterval{
			StartTime: startTime,
			EndTime:   endTime,
		}, nil
	case metricpb.MetricDescriptor_DELTA, metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED:
		fallthrough
@@ -279,7 +296,7 @@ func getStackdriverMetricKind(vt telegraf.ValueType) (metricpb.MetricDescriptor_
	case telegraf.Histogram, telegraf.Summary:
		fallthrough
	default:
		return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type: %T", vt)
	}
}
@@ -331,12 +348,12 @@ func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]stri
	}

	for k, v := range labels {
		if len(k) > QuotaStringLengthForLabelKey {
			s.Log.Warnf("Removing tag %q key exceeds string length for label key [%d]", k, QuotaStringLengthForLabelKey)
			delete(labels, k)
			continue
		}
		if len(v) > QuotaStringLengthForLabelValue {
			s.Log.Warnf("Removing tag %q value exceeds string length for label value [%d]", k, QuotaStringLengthForLabelValue)
			delete(labels, k)
			continue
		}

View File

@@ -14,6 +14,7 @@ import (
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"github.com/stretchr/testify/require"
"google.golang.org/api/option"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@@ -447,3 +448,97 @@ func TestGetStackdriverLabels(t *testing.T) {
	labels := s.getStackdriverLabels(tags)
	require.Equal(t, QuotaLabelsPerMetricDescriptor, len(labels))
}
func TestGetStackdriverIntervalEndpoints(t *testing.T) {
	c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
	if err != nil {
		t.Fatal(err)
	}
	s := &Stackdriver{
		Project:      fmt.Sprintf("projects/%s", "[PROJECT]"),
		Namespace:    "test",
		Log:          testutil.Logger{},
		client:       c,
		counterCache: NewCounterCache(testutil.Logger{}),
	}
	now := time.Now().UTC()
	later := time.Now().UTC().Add(time.Second * 10)

	// Two gauge and two counter metrics; even indexes are observed at `now`,
	// odd indexes at `later`, matching the idx%2 checks below.
	metrics := []telegraf.Metric{
		testutil.MustMetric("cpu",
			map[string]string{
				"foo": "bar",
			},
			map[string]interface{}{
				"value": 42,
			},
			now,
			telegraf.Gauge,
		),
		testutil.MustMetric("cpu",
			map[string]string{
				"foo": "foo",
			},
			map[string]interface{}{
				"value": 43,
			},
			later,
			telegraf.Gauge,
		),
		testutil.MustMetric("uptime",
			map[string]string{
				"foo": "bar",
			},
			map[string]interface{}{
				"value": 42,
			},
			now,
			telegraf.Counter,
		),
		testutil.MustMetric("uptime",
			map[string]string{
				"foo": "foo",
			},
			map[string]interface{}{
				"value": 43,
			},
			later,
			telegraf.Counter,
		),
	}
	for idx, m := range metrics {
		for _, f := range m.FieldList() {
			value, err := getStackdriverTypedValue(f.Value)
			require.NoError(t, err)
			require.NotNilf(t, value, "Got nil value for metric %q field %q", m, f)

			metricKind, err := getStackdriverMetricKind(m.Type())
			require.NoErrorf(t, err, "Get kind for metric %q (%T) field %q failed: %v", m.Name(), m.Type(), f, err)

			startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)

			// we only generate startTimes for counters
			if metricKind != metricpb.MetricDescriptor_CUMULATIVE {
				require.Nilf(t, startTime, "startTime for non-counter metric %q (%T) field %q should be nil, was: %v", m.Name(), m.Type(), f, startTime)
			} else {
				if idx%2 == 0 {
					// GreaterOrEqual because we might pass a second boundary
					// while the test is running and new startTimes are
					// backdated 1ms from the endTime.
					require.GreaterOrEqual(t, startTime.AsTime().UTC().Unix(), now.UTC().Unix())
				} else {
					require.GreaterOrEqual(t, startTime.AsTime().UTC().Unix(), later.UTC().Unix())
				}
			}

			if idx%2 == 0 {
				require.Equal(t, now, endTime.AsTime())
			} else {
				require.Equal(t, later, endTime.AsTime())
			}
		}
	}
}