diff --git a/plugins/outputs/stackdriver/README.md b/plugins/outputs/stackdriver/README.md
index 570fa70ff..1a6277caf 100644
--- a/plugins/outputs/stackdriver/README.md
+++ b/plugins/outputs/stackdriver/README.md
@@ -74,6 +74,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
   ## metric type set to the cooresponding type.
   # metric_counter = []
   # metric_gauge = []
+  # metric_histogram = []
 
   ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
   ## plugin definition, otherwise additional config options are read as part of
@@ -101,8 +102,9 @@ Points collected with greater than 1 minute precision may need to be aggregated
 before then can be written. Consider using the [basicstats][] aggregator to do
 this.
 
-Histogram / distribution and delta metrics are not yet supported. These will be
-dropped silently unless debugging is on.
+Histograms are supported only for metrics generated by the Prometheus metric
+version 1 parser. The version 2 parser generates sparse metrics that would need
+to be heavily transformed before they can be sent to Stackdriver.
 
 Note that the plugin keeps an in-memory cache of the start times and last
 observed values of all COUNTER metrics in order to comply with the requirements
diff --git a/plugins/outputs/stackdriver/counter_cache.go b/plugins/outputs/stackdriver/counter_cache.go
index d350bba8e..3296194aa 100644
--- a/plugins/outputs/stackdriver/counter_cache.go
+++ b/plugins/outputs/stackdriver/counter_cache.go
@@ -91,5 +91,9 @@ func GetCounterCacheKey(m telegraf.Metric, f *telegraf.Field) string {
 		tags = append(tags, strings.Join([]string{t.Key, t.Value}, "="))
 	}
 	sort.Strings(tags)
-	return path.Join(m.Name(), strings.Join(tags, "/"), f.Key)
+	key := ""
+	if f != nil {
+		key = f.Key
+	}
+	return path.Join(m.Name(), strings.Join(tags, "/"), key)
 }
diff --git a/plugins/outputs/stackdriver/sample.conf b/plugins/outputs/stackdriver/sample.conf
index e9896f993..c8e808a0d 100644
--- a/plugins/outputs/stackdriver/sample.conf
+++ b/plugins/outputs/stackdriver/sample.conf
@@ -39,6 +39,7 @@
   ## metric type set to the cooresponding type.
   # metric_counter = []
   # metric_gauge = []
+  # metric_histogram = []
 
   ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
   ## plugin definition, otherwise additional config options are read as part of
diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go
index d5f2b5269..c67cf5bb7 100644
--- a/plugins/outputs/stackdriver/stackdriver.go
+++ b/plugins/outputs/stackdriver/stackdriver.go
@@ -8,11 +8,13 @@ import (
 	"hash/fnv"
 	"path"
 	"sort"
+	"strconv"
 	"strings"
 
 	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
 	"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
 	"google.golang.org/api/option"
+	"google.golang.org/genproto/googleapis/api/distribution"
 	metricpb "google.golang.org/genproto/googleapis/api/metric"
 	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
 	"google.golang.org/grpc/status"
@@ -39,18 +41,23 @@ type Stackdriver struct {
 	TagsAsResourceLabels []string        `toml:"tags_as_resource_label"`
 	MetricCounter        []string        `toml:"metric_counter"`
 	MetricGauge          []string        `toml:"metric_gauge"`
+	MetricHistogram      []string        `toml:"metric_histogram"`
 	Log                  telegraf.Logger `toml:"-"`
 
-	client        *monitoring.MetricClient
-	counterCache  *counterCache
-	filterCounter filter.Filter
-	filterGauge   filter.Filter
+	client          *monitoring.MetricClient
+	counterCache    *counterCache
+	filterCounter   filter.Filter
+	filterGauge     filter.Filter
+	filterHistogram filter.Filter
 }
 
 const (
+	// The user-defined limits are documented below:
+	// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas
+
 	// QuotaLabelsPerMetricDescriptor is the limit
 	// to labels (tags) per metric descriptor.
-	QuotaLabelsPerMetricDescriptor = 10
+	QuotaLabelsPerMetricDescriptor = 30
 	// QuotaStringLengthForLabelKey is the limit
 	// to string length for label key.
 	QuotaStringLengthForLabelKey = 100
@@ -92,6 +99,10 @@ func (s *Stackdriver) Init() error {
 	if err != nil {
 		return fmt.Errorf("creating gauge filter failed: %w", err)
 	}
+	s.filterHistogram, err = filter.Compile(s.MetricHistogram)
+	if err != nil {
+		return fmt.Errorf("creating histogram filter failed: %w", err)
+	}
 
 	return nil
 }
@@ -152,12 +163,14 @@ func sorted(metrics []telegraf.Metric) []telegraf.Metric {
 
 type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries
 
-func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) {
+func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f []*telegraf.Field, ts *monitoringpb.TimeSeries) {
 	h := fnv.New64a()
 	h.Write([]byte(m.Name()))
 	h.Write([]byte{'\n'})
-	h.Write([]byte(f.Key))
-	h.Write([]byte{'\n'})
+	for _, field := range f {
+		h.Write([]byte(field.Key))
+		h.Write([]byte{'\n'})
+	}
 	for key, value := range m.Tags() {
 		h.Write([]byte(key))
 		h.Write([]byte{'\n'})
@@ -205,34 +218,45 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
 	buckets := make(timeSeriesBuckets)
 	for _, m := range batch {
-		for _, f := range m.FieldList() {
-			value, err := s.getStackdriverTypedValue(f.Value)
+		// Set metric types based on user-provided filter
+		metricType := m.Type()
+		if s.filterCounter != nil && s.filterCounter.Match(m.Name()) {
+			metricType = telegraf.Counter
+		}
+		if s.filterGauge != nil && s.filterGauge.Match(m.Name()) {
+			metricType = telegraf.Gauge
+		}
+		if s.filterHistogram != nil && s.filterHistogram.Match(m.Name()) {
+			metricType = telegraf.Histogram
+		}
+
+		metricKind, err := getStackdriverMetricKind(metricType)
+		if err != nil {
+			s.Log.Errorf("Get kind for metric %q (%T) failed: %s", m.Name(), metricType, err)
+			continue
+		}
+
+		// Convert any declared tag to a resource label and remove it from
+		// the metric
+		resourceLabels := make(map[string]string, len(s.ResourceLabels)+len(s.TagsAsResourceLabels))
+		for k, v := range s.ResourceLabels {
+			resourceLabels[k] = v
+		}
+		for _, tag := range s.TagsAsResourceLabels {
+			if val, ok := m.GetTag(tag); ok {
+				resourceLabels[tag] = val
+				m.RemoveTag(tag)
+			}
+		}
+
+		if m.Type() == telegraf.Histogram {
+			value, err := s.buildHistogram(m)
 			if err != nil {
-				s.Log.Errorf("Get type failed: %q", err)
+				s.Log.Errorf("Unable to build distribution from metric %s: %s", m, err)
 				continue
 			}
-			if value == nil {
-				continue
-			}
-
-			// Set metric types based on user-provided filter
-			metricType := m.Type()
-			if s.filterCounter != nil && s.filterCounter.Match(m.Name()) {
-				metricType = telegraf.Counter
-			}
-			if s.filterGauge != nil && s.filterGauge.Match(m.Name()) {
-				metricType = telegraf.Gauge
-			}
-
-			metricKind, err := getStackdriverMetricKind(metricType)
-			if err != nil {
-				s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), metricType, f, err)
-				continue
-			}
-
-			startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)
-
+			startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, nil, s.counterCache)
 			timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime)
 			if err != nil {
 				s.Log.Errorf("Get time interval failed: %s", err)
 				continue
 			}
@@ -245,17 +269,47 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
 				Value: value,
 			}
 
-			// Convert any declared tag to a resource label and remove it from
-			// the metric
-			resourceLabels := make(map[string]string, len(s.ResourceLabels)+len(s.TagsAsResourceLabels))
-			for k, v := range s.ResourceLabels {
-				resourceLabels[k] = v
+			// Prepare time series.
+			timeSeries := &monitoringpb.TimeSeries{
+				Metric: &metricpb.Metric{
+					Type:   s.generateHistogramName(m),
+					Labels: s.getStackdriverLabels(m.TagList()),
+				},
+				MetricKind: metricKind,
+				Resource: &monitoredrespb.MonitoredResource{
+					Type:   s.ResourceType,
+					Labels: resourceLabels,
+				},
+				Points: []*monitoringpb.Point{
+					dataPoint,
+				},
 			}
-			for _, tag := range s.TagsAsResourceLabels {
-				if val, ok := m.GetTag(tag); ok {
-					resourceLabels[tag] = val
-					m.RemoveTag(tag)
-				}
+
+			buckets.Add(m, m.FieldList(), timeSeries)
+			continue
+		}
+
+		for _, f := range m.FieldList() {
+			value, err := s.getStackdriverTypedValue(f.Value)
+			if err != nil {
+				s.Log.Errorf("Get type failed: %q", err)
+				continue
+			}
+			if value == nil {
+				continue
+			}
+
+			startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)
+			timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime)
+			if err != nil {
+				s.Log.Errorf("Get time interval failed: %s", err)
+				continue
+			}
+
+			// Prepare an individual data point.
+			dataPoint := &monitoringpb.Point{
+				Interval: timeInterval,
+				Value:    value,
 			}
 
 			// Prepare time series.
@@ -274,7 +328,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
 				},
 			}
 
-			buckets.Add(m, f, timeSeries)
+			buckets.Add(m, []*telegraf.Field{f}, timeSeries)
 
 			// If the metric is untyped, it will end with unknown. We will also
 			// send another metric with the unknown:counter suffix. Google will
@@ -307,7 +361,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
 					dataPoint,
 				},
 			}
-			buckets.Add(m, f, counterTimeSeries)
+			buckets.Add(m, []*telegraf.Field{f}, counterTimeSeries)
 		}
 	}
 }
@@ -388,6 +442,19 @@ func (s *Stackdriver) generateMetricName(m telegraf.Metric, metricType telegraf.
 	return path.Join(s.MetricTypePrefix, name, kind)
 }
 
+func (s *Stackdriver) generateHistogramName(m telegraf.Metric) string {
+	if s.MetricNameFormat == "path" {
+		return path.Join(s.MetricTypePrefix, s.Namespace, m.Name())
+	}
+
+	name := m.Name()
+	if s.Namespace != "" {
+		name = s.Namespace + "_" + m.Name()
+	}
+
+	return path.Join(s.MetricTypePrefix, name, "histogram")
+}
+
 func getStackdriverIntervalEndpoints(
 	kind metricpb.MetricDescriptor_MetricKind,
 	value *monitoringpb.TypedValue,
@@ -436,7 +503,9 @@ func getStackdriverMetricKind(vt telegraf.ValueType) (metricpb.MetricDescriptor_
 		return metricpb.MetricDescriptor_GAUGE, nil
 	case telegraf.Counter:
 		return metricpb.MetricDescriptor_CUMULATIVE, nil
-	case telegraf.Histogram, telegraf.Summary:
+	case telegraf.Histogram:
+		return metricpb.MetricDescriptor_CUMULATIVE, nil
+	case telegraf.Summary:
 		fallthrough
 	default:
 		return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type: %T", vt)
@@ -497,6 +566,89 @@ func (s *Stackdriver) getStackdriverTypedValue(value interface{}) (*monitoringpb
 	}
 }
 
+func (s *Stackdriver) buildHistogram(m telegraf.Metric) (*monitoringpb.TypedValue, error) {
+	sumInter, ok := m.GetField("sum")
+	if !ok {
+		return nil, fmt.Errorf("no sum field present")
+	}
+	sum, err := internal.ToFloat64(sumInter)
+	if err != nil {
+		return nil, fmt.Errorf("unable to convert sum value to float64: %w", err)
+	}
+	m.RemoveField("sum")
+
+	countInter, ok := m.GetField("count")
+	if !ok {
+		return nil, fmt.Errorf("no count field present")
+	}
+	count, err := internal.ToFloat64(countInter)
+	if err != nil {
+		return nil, fmt.Errorf("unable to convert count value to float64: %w", err)
+	}
+	m.RemoveField("count")
+
+	// Build map of the buckets and their values
+	buckets := make([]float64, 0)
+	bucketCounts := make([]int64, 0)
+	for _, field := range m.FieldList() {
+		// Add the +inf value to bucket counts, no need to define a bound
+		if strings.Contains(strings.ToLower(field.Key), "+inf") {
+			count, err := internal.ToInt64(field.Value)
+			if err != nil {
+				continue
+			}
+			bucketCounts = append(bucketCounts, count)
+			continue
+		}
+
+		bucket, err := strconv.ParseFloat(field.Key, 64)
+		if err != nil {
+			continue
+		}
+
+		count, err := internal.ToInt64(field.Value)
+		if err != nil {
+			continue
+		}
+
+		buckets = append(buckets, bucket)
+		bucketCounts = append(bucketCounts, count)
+	}
+
+	sort.Slice(buckets, func(i, j int) bool {
+		return buckets[i] < buckets[j]
+	})
+	sort.Slice(bucketCounts, func(i, j int) bool {
+		return bucketCounts[i] < bucketCounts[j]
+	})
+
+	// Stackdriver expects each bucket count to be the count for that specific
+	// bucket, not the running total that Prometheus histograms report. Loop
+	// backwards to convert the cumulative counts into per-bucket counts.
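+	// For example, with sorted bounds [5, 10, 15, 20] the cumulative counts
+	// [0, 1, 1, 2, 3] (the final entry covering +Inf) become the per-bucket
+	// counts [0, 1, 0, 1, 1] expected by the distribution value.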
+	for i := len(bucketCounts) - 1; i > 0; i-- {
+		bucketCounts[i] = bucketCounts[i] - bucketCounts[i-1]
+	}
+
+	v := &monitoringpb.TypedValue{
+		Value: &monitoringpb.TypedValue_DistributionValue{
+			DistributionValue: &distribution.Distribution{
+				Count:        int64(count),
+				Mean:         sum / count,
+				BucketCounts: bucketCounts,
+				BucketOptions: &distribution.Distribution_BucketOptions{
+					Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{
+						ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{
+							Bounds: buckets,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	return v, nil
+}
+
 func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
 	labels := make(map[string]string)
 	for _, t := range tags {
diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go
index 120d2338b..369314a5b 100644
--- a/plugins/outputs/stackdriver/stackdriver_test.go
+++ b/plugins/outputs/stackdriver/stackdriver_test.go
@@ -239,6 +239,7 @@ func TestWriteMetricTypesOfficial(t *testing.T) {
 		MetricNameFormat: "official",
 		MetricCounter:    []string{"mem_c"},
 		MetricGauge:      []string{"mem_g"},
+		MetricHistogram:  []string{"mem_h"},
 		Log:              testutil.Logger{},
 		client:           c,
 	}
@@ -259,6 +260,19 @@ func TestWriteMetricTypesOfficial(t *testing.T) {
 			},
 			time.Unix(3, 0),
 		),
+		testutil.MustMetric("mem_h",
+			map[string]string{},
+			map[string]interface{}{
+				"sum":   1,
+				"count": 1,
+				"5.0":   0.0,
+				"10.0":  0.0,
+				"15.0":  1.0,
+				"+Inf":  1.0,
+			},
+			time.Unix(3, 0),
+			telegraf.Histogram,
+		),
 	}
 
 	require.NoError(t, s.Connect())
@@ -266,13 +280,15 @@
 	require.Len(t, mockMetric.reqs, 1)
 	request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
 
-	require.Len(t, request.TimeSeries, 2)
+	require.Len(t, request.TimeSeries, 3)
 	for _, ts := range request.TimeSeries {
 		switch ts.Metric.Type {
 		case "custom.googleapis.com/test_mem_c_value/counter":
 			require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
 		case "custom.googleapis.com/test_mem_g_value/gauge":
 			require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind)
+		case "custom.googleapis.com/test_mem_h/histogram":
+			require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
 		default:
 			require.False(t, true, "Unknown metric type", ts.Metric.Type)
 		}
@@ -642,6 +658,24 @@ func TestGetStackdriverLabels(t *testing.T) {
 		{Key: "host", Value: "this"},
 		{Key: "name", Value: "bat"},
 		{Key: "device", Value: "local"},
+		{Key: "foo", Value: "bar"},
+		{Key: "hostname", Value: "local"},
+		{Key: "a", Value: "1"},
+		{Key: "b", Value: "2"},
+		{Key: "c", Value: "3"},
+		{Key: "d", Value: "4"},
+		{Key: "e", Value: "5"},
+		{Key: "f", Value: "6"},
+		{Key: "g", Value: "7"},
+		{Key: "h", Value: "8"},
+		{Key: "i", Value: "9"},
+		{Key: "j", Value: "10"},
+		{Key: "k", Value: "11"},
+		{Key: "l", Value: "12"},
+		{Key: "m", Value: "13"},
+		{Key: "n", Value: "14"},
+		{Key: "o", Value: "15"},
+		{Key: "p", Value: "16"},
 		{Key: "reserve", Value: "publication"},
 		{Key: "xpfqacltlmpguimhtjlou2qlmf9uqqwk3teajwlwqkoxtsppbnjksaxvzc1aa973pho9m96gfnl5op8ku7sv93rexyx42qe3zty12ityv", Value: "keyquota"},
 		{
@@ -963,6 +997,147 @@ func TestStackdriverMetricNameOfficial(t *testing.T) {
 	}
 }
 
+func TestGenerateHistogramName(t *testing.T) {
+	tests := []struct {
+		name      string
+		prefix    string
+		namespace string
+		format    string
+		expected  string
+
+		metric telegraf.Metric
+	}{
+		{
+			name:      "path",
+			prefix:    "",
+			namespace: "",
+			format:    "path",
+			expected:  "uptime",
+			metric: metric.New(
+				"uptime",
+				map[string]string{},
+				map[string]interface{}{"value": 42},
+				time.Now(),
+				telegraf.Histogram,
+			),
+		},
+		{
+			name:      "path with namespace",
+			prefix:    "",
+			namespace: "name",
+			format:    "path",
+			expected:  "name/uptime",
+			metric: metric.New(
+				"uptime",
+				map[string]string{},
+				map[string]interface{}{"value": 42},
+				time.Now(),
+				telegraf.Histogram,
+			),
+		},
+		{
+			name:      "path with namespace+prefix",
+			prefix:    "prefix",
+			namespace: "name",
+			format:    "path",
+			expected:  "prefix/name/uptime",
+			metric: metric.New(
+				"uptime",
+				map[string]string{},
+				map[string]interface{}{"value": 42},
+				time.Now(),
+				telegraf.Histogram,
+			),
+		},
+		{
+			name:      "official",
+			prefix:    "",
+			namespace: "",
+			format:    "official",
+			expected:  "uptime/histogram",
+			metric: metric.New(
+				"uptime",
+				map[string]string{},
+				map[string]interface{}{"value": 42},
+				time.Now(),
+				telegraf.Histogram,
+			),
+		},
+		{
+			name:      "official with namespace",
+			prefix:    "",
+			namespace: "name",
+			format:    "official",
+			expected:  "name_uptime/histogram",
+			metric: metric.New(
+				"uptime",
+				map[string]string{},
+				map[string]interface{}{"value": 42},
+				time.Now(),
+				telegraf.Histogram,
+			),
+		},
+		{
+			name:      "official with prefix+namespace",
+			prefix:    "prefix",
+			namespace: "name",
+			format:    "official",
+			expected:  "prefix/name_uptime/histogram",
+			metric: metric.New(
+				"uptime",
+				map[string]string{},
+				map[string]interface{}{"value": 42},
+				time.Now(),
+				telegraf.Histogram,
+			),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := &Stackdriver{
+				Namespace:        tt.namespace,
+				MetricTypePrefix: tt.prefix,
+				MetricNameFormat: tt.format,
+			}
+
+			require.Equal(t, tt.expected, s.generateHistogramName(tt.metric))
+		})
+	}
+}
+
+func TestBuildHistogram(t *testing.T) {
+	s := &Stackdriver{
+		MetricNameFormat: "official",
+		Log:              testutil.Logger{},
+	}
+	m := testutil.MustMetric(
+		"http_server_duration",
+		map[string]string{},
+		map[string]interface{}{
+			"sum":   1,
+			"count": 2,
+			"5.0":   0.0,
+			"10.0":  1.0,
+			"15.0":  1.0,
+			"20.0":  2.0,
+			"+Inf":  3.0,
+			"foo":   4.0,
+		},
+		time.Unix(0, 0),
+	)
+	value, err := s.buildHistogram(m)
+	require.NoError(t, err)
+
+	dist := value.GetDistributionValue()
+	require.NotNil(t, dist)
+	require.Equal(t, int64(2), dist.Count)
+	require.Equal(t, 0.5, dist.Mean)
+	require.Len(t, dist.BucketCounts, 5)
+	require.Equal(t, []int64{0, 1, 0, 1, 1}, dist.BucketCounts)
+	require.Len(t, dist.BucketOptions.GetExplicitBuckets().Bounds, 4)
+	require.Equal(t, []float64{5.0, 10.0, 15.0, 20.0}, dist.BucketOptions.GetExplicitBuckets().Bounds)
+}
+
 func TestStackdriverValueInvalid(t *testing.T) {
 	s := &Stackdriver{
 		MetricDataType: "foobar",