feat(outputs.stackdriver): Enable histogram support (#14275)

This commit is contained in:
Joshua Powers 2023-12-07 04:35:04 -07:00 committed by GitHub
parent 41b7a3d467
commit 3172fd5cfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 383 additions and 49 deletions

View File

@ -74,6 +74,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## metric type set to the corresponding type. ## metric type set to the corresponding type.
# metric_counter = [] # metric_counter = []
# metric_gauge = [] # metric_gauge = []
# metric_histogram = []
## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
## plugin definition, otherwise additional config options are read as part of ## plugin definition, otherwise additional config options are read as part of
@ -101,8 +102,9 @@ Points collected with greater than 1 minute precision may need to be aggregated
before they can be written. Consider using the [basicstats][] aggregator to do before they can be written. Consider using the [basicstats][] aggregator to do
this. this.
Histogram / distribution and delta metrics are not yet supported. These will be Histograms are supported only via metrics generated via the Prometheus metric
dropped silently unless debugging is on. version 1 parser. The version 2 parser generates sparse metrics that would need
to be heavily transformed before sending to Stackdriver.
Note that the plugin keeps an in-memory cache of the start times and last Note that the plugin keeps an in-memory cache of the start times and last
observed values of all COUNTER metrics in order to comply with the requirements observed values of all COUNTER metrics in order to comply with the requirements

View File

@ -91,5 +91,9 @@ func GetCounterCacheKey(m telegraf.Metric, f *telegraf.Field) string {
tags = append(tags, strings.Join([]string{t.Key, t.Value}, "=")) tags = append(tags, strings.Join([]string{t.Key, t.Value}, "="))
} }
sort.Strings(tags) sort.Strings(tags)
return path.Join(m.Name(), strings.Join(tags, "/"), f.Key) key := ""
if f != nil {
key = f.Key
}
return path.Join(m.Name(), strings.Join(tags, "/"), key)
} }

View File

@ -39,6 +39,7 @@
## metric type set to the corresponding type. ## metric type set to the corresponding type.
# metric_counter = [] # metric_counter = []
# metric_gauge = [] # metric_gauge = []
# metric_histogram = []
## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
## plugin definition, otherwise additional config options are read as part of ## plugin definition, otherwise additional config options are read as part of

View File

@ -8,11 +8,13 @@ import (
"hash/fnv" "hash/fnv"
"path" "path"
"sort" "sort"
"strconv"
"strings" "strings"
monitoring "cloud.google.com/go/monitoring/apiv3/v2" monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"google.golang.org/api/option" "google.golang.org/api/option"
"google.golang.org/genproto/googleapis/api/distribution"
metricpb "google.golang.org/genproto/googleapis/api/metric" metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -39,18 +41,23 @@ type Stackdriver struct {
TagsAsResourceLabels []string `toml:"tags_as_resource_label"` TagsAsResourceLabels []string `toml:"tags_as_resource_label"`
MetricCounter []string `toml:"metric_counter"` MetricCounter []string `toml:"metric_counter"`
MetricGauge []string `toml:"metric_gauge"` MetricGauge []string `toml:"metric_gauge"`
MetricHistogram []string `toml:"metric_histogram"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
client *monitoring.MetricClient client *monitoring.MetricClient
counterCache *counterCache counterCache *counterCache
filterCounter filter.Filter filterCounter filter.Filter
filterGauge filter.Filter filterGauge filter.Filter
fitlerHistogram filter.Filter
} }
const ( const (
// The user-defined limits are documented below:
// https://cloud.google.com/monitoring/quotas#custom_metrics_quotas
// QuotaLabelsPerMetricDescriptor is the limit // QuotaLabelsPerMetricDescriptor is the limit
// to labels (tags) per metric descriptor. // to labels (tags) per metric descriptor.
QuotaLabelsPerMetricDescriptor = 10 QuotaLabelsPerMetricDescriptor = 30
// QuotaStringLengthForLabelKey is the limit // QuotaStringLengthForLabelKey is the limit
// to string length for label key. // to string length for label key.
QuotaStringLengthForLabelKey = 100 QuotaStringLengthForLabelKey = 100
@ -92,6 +99,10 @@ func (s *Stackdriver) Init() error {
if err != nil { if err != nil {
return fmt.Errorf("creating gauge filter failed: %w", err) return fmt.Errorf("creating gauge filter failed: %w", err)
} }
s.fitlerHistogram, err = filter.Compile(s.MetricHistogram)
if err != nil {
return fmt.Errorf("creating histogram filter failed: %w", err)
}
return nil return nil
} }
@ -152,12 +163,14 @@ func sorted(metrics []telegraf.Metric) []telegraf.Metric {
type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries
func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) { func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f []*telegraf.Field, ts *monitoringpb.TimeSeries) {
h := fnv.New64a() h := fnv.New64a()
h.Write([]byte(m.Name())) h.Write([]byte(m.Name()))
h.Write([]byte{'\n'}) h.Write([]byte{'\n'})
h.Write([]byte(f.Key)) for _, field := range f {
h.Write([]byte{'\n'}) h.Write([]byte(field.Key))
h.Write([]byte{'\n'})
}
for key, value := range m.Tags() { for key, value := range m.Tags() {
h.Write([]byte(key)) h.Write([]byte(key))
h.Write([]byte{'\n'}) h.Write([]byte{'\n'})
@ -205,34 +218,45 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
buckets := make(timeSeriesBuckets) buckets := make(timeSeriesBuckets)
for _, m := range batch { for _, m := range batch {
for _, f := range m.FieldList() { // Set metric types based on user-provided filter
value, err := s.getStackdriverTypedValue(f.Value) metricType := m.Type()
if s.filterCounter != nil && s.filterCounter.Match(m.Name()) {
metricType = telegraf.Counter
}
if s.filterGauge != nil && s.filterGauge.Match(m.Name()) {
metricType = telegraf.Gauge
}
if s.fitlerHistogram != nil && s.fitlerHistogram.Match(m.Name()) {
metricType = telegraf.Histogram
}
metricKind, err := getStackdriverMetricKind(metricType)
if err != nil {
s.Log.Errorf("Get kind for metric %q (%T) failed: %s", m.Name(), metricType, err)
continue
}
// Convert any declared tag to a resource label and remove it from
// the metric
resourceLabels := make(map[string]string, len(s.ResourceLabels)+len(s.TagsAsResourceLabels))
for k, v := range s.ResourceLabels {
resourceLabels[k] = v
}
for _, tag := range s.TagsAsResourceLabels {
if val, ok := m.GetTag(tag); ok {
resourceLabels[tag] = val
m.RemoveTag(tag)
}
}
if m.Type() == telegraf.Histogram {
value, err := s.buildHistogram(m)
if err != nil { if err != nil {
s.Log.Errorf("Get type failed: %q", err) s.Log.Errorf("Unable to build distribution from metric %s: %s", m, err)
continue continue
} }
if value == nil { startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, nil, s.counterCache)
continue
}
// Set metric types based on user-provided filter
metricType := m.Type()
if s.filterCounter != nil && s.filterCounter.Match(m.Name()) {
metricType = telegraf.Counter
}
if s.filterGauge != nil && s.filterGauge.Match(m.Name()) {
metricType = telegraf.Gauge
}
metricKind, err := getStackdriverMetricKind(metricType)
if err != nil {
s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), metricType, f, err)
continue
}
startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)
timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime) timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime)
if err != nil { if err != nil {
s.Log.Errorf("Get time interval failed: %s", err) s.Log.Errorf("Get time interval failed: %s", err)
@ -245,17 +269,47 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
Value: value, Value: value,
} }
// Convert any declared tag to a resource label and remove it from // Prepare time series.
// the metric timeSeries := &monitoringpb.TimeSeries{
resourceLabels := make(map[string]string, len(s.ResourceLabels)+len(s.TagsAsResourceLabels)) Metric: &metricpb.Metric{
for k, v := range s.ResourceLabels { Type: s.generateHistogramName(m),
resourceLabels[k] = v Labels: s.getStackdriverLabels(m.TagList()),
},
MetricKind: metricKind,
Resource: &monitoredrespb.MonitoredResource{
Type: s.ResourceType,
Labels: resourceLabels,
},
Points: []*monitoringpb.Point{
dataPoint,
},
} }
for _, tag := range s.TagsAsResourceLabels {
if val, ok := m.GetTag(tag); ok { buckets.Add(m, m.FieldList(), timeSeries)
resourceLabels[tag] = val continue
m.RemoveTag(tag) }
}
for _, f := range m.FieldList() {
value, err := s.getStackdriverTypedValue(f.Value)
if err != nil {
s.Log.Errorf("Get type failed: %q", err)
continue
}
if value == nil {
continue
}
startTime, endTime := getStackdriverIntervalEndpoints(metricKind, value, m, f, s.counterCache)
timeInterval, err := getStackdriverTimeInterval(metricKind, startTime, endTime)
if err != nil {
s.Log.Errorf("Get time interval failed: %s", err)
continue
}
// Prepare an individual data point.
dataPoint := &monitoringpb.Point{
Interval: timeInterval,
Value: value,
} }
// Prepare time series. // Prepare time series.
@ -274,7 +328,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
}, },
} }
buckets.Add(m, f, timeSeries) buckets.Add(m, []*telegraf.Field{f}, timeSeries)
// If the metric is untyped, it will end with unknown. We will also // If the metric is untyped, it will end with unknown. We will also
// send another metric with the unknown:counter suffix. Google will // send another metric with the unknown:counter suffix. Google will
@ -307,7 +361,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
dataPoint, dataPoint,
}, },
} }
buckets.Add(m, f, counterTimeSeries) buckets.Add(m, []*telegraf.Field{f}, counterTimeSeries)
} }
} }
} }
@ -388,6 +442,19 @@ func (s *Stackdriver) generateMetricName(m telegraf.Metric, metricType telegraf.
return path.Join(s.MetricTypePrefix, name, kind) return path.Join(s.MetricTypePrefix, name, kind)
} }
// generateHistogramName returns the Stackdriver metric type string used for
// a histogram metric, honoring the configured name format, namespace, and
// metric-type prefix.
func (s *Stackdriver) generateHistogramName(m telegraf.Metric) string {
	switch s.MetricNameFormat {
	case "path":
		// Path format: prefix/namespace/name; path.Join drops empty parts.
		return path.Join(s.MetricTypePrefix, s.Namespace, m.Name())
	default:
		// Official format: namespace and metric name joined with an
		// underscore, followed by the fixed "histogram" kind suffix.
		name := m.Name()
		if s.Namespace != "" {
			name = s.Namespace + "_" + name
		}
		return path.Join(s.MetricTypePrefix, name, "histogram")
	}
}
func getStackdriverIntervalEndpoints( func getStackdriverIntervalEndpoints(
kind metricpb.MetricDescriptor_MetricKind, kind metricpb.MetricDescriptor_MetricKind,
value *monitoringpb.TypedValue, value *monitoringpb.TypedValue,
@ -436,7 +503,9 @@ func getStackdriverMetricKind(vt telegraf.ValueType) (metricpb.MetricDescriptor_
return metricpb.MetricDescriptor_GAUGE, nil return metricpb.MetricDescriptor_GAUGE, nil
case telegraf.Counter: case telegraf.Counter:
return metricpb.MetricDescriptor_CUMULATIVE, nil return metricpb.MetricDescriptor_CUMULATIVE, nil
case telegraf.Histogram, telegraf.Summary: case telegraf.Histogram:
return metricpb.MetricDescriptor_CUMULATIVE, nil
case telegraf.Summary:
fallthrough fallthrough
default: default:
return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type: %T", vt) return metricpb.MetricDescriptor_METRIC_KIND_UNSPECIFIED, fmt.Errorf("unsupported telegraf value type: %T", vt)
@ -497,6 +566,89 @@ func (s *Stackdriver) getStackdriverTypedValue(value interface{}) (*monitoringpb
} }
} }
// buildHistogram converts a Prometheus metric-version-1 style histogram
// metric into a Stackdriver distribution typed value. It expects "sum" and
// "count" fields plus one field per cumulative bucket, keyed by the bucket's
// upper bound (with "+Inf" marking the overflow bucket). The "sum" and
// "count" fields are removed from the metric as a side effect; fields whose
// key or value cannot be parsed are silently skipped.
func (s *Stackdriver) buildHistogram(m telegraf.Metric) (*monitoringpb.TypedValue, error) {
	sumInter, ok := m.GetField("sum")
	if !ok {
		return nil, fmt.Errorf("no sum field present")
	}
	sum, err := internal.ToFloat64(sumInter)
	if err != nil {
		return nil, fmt.Errorf("unable to convert sum value to float64: %w", err)
	}
	m.RemoveField("sum")

	countInter, ok := m.GetField("count")
	if !ok {
		return nil, fmt.Errorf("no count field present")
	}
	count, err := internal.ToFloat64(countInter)
	if err != nil {
		return nil, fmt.Errorf("unable to convert count value to float64: %w", err)
	}
	m.RemoveField("count")

	// Collect the explicit bucket bounds and their cumulative counts.
	buckets := make([]float64, 0)
	bucketCounts := make([]int64, 0)
	for _, field := range m.FieldList() {
		// The +Inf bucket contributes a count but no explicit bound.
		if strings.Contains(strings.ToLower(field.Key), "+inf") {
			infCount, err := internal.ToInt64(field.Value)
			if err != nil {
				continue
			}
			bucketCounts = append(bucketCounts, infCount)
			continue
		}

		bucket, err := strconv.ParseFloat(field.Key, 64)
		if err != nil {
			continue
		}
		bucketCount, err := internal.ToInt64(field.Value)
		if err != nil {
			continue
		}
		buckets = append(buckets, bucket)
		bucketCounts = append(bucketCounts, bucketCount)
	}

	// Field iteration order is not guaranteed, so sort bounds and counts
	// separately. Prometheus bucket counts are cumulative (non-decreasing
	// with the bound), so sorting each slice independently keeps them
	// aligned, and the +Inf total sorts into the final position.
	sort.Slice(buckets, func(i, j int) bool {
		return buckets[i] < buckets[j]
	})
	sort.Slice(bucketCounts, func(i, j int) bool {
		return bucketCounts[i] < bucketCounts[j]
	})

	// Stackdriver expects per-bucket counts, not the running totals that
	// Prometheus histograms carry; walk backwards subtracting neighbors.
	for i := len(bucketCounts) - 1; i > 0; i-- {
		bucketCounts[i] = bucketCounts[i] - bucketCounts[i-1]
	}

	// Guard against an empty histogram to avoid a NaN mean from 0/0.
	mean := 0.0
	if count != 0 {
		mean = sum / count
	}

	v := &monitoringpb.TypedValue{
		Value: &monitoringpb.TypedValue_DistributionValue{
			DistributionValue: &distribution.Distribution{
				Count:        int64(count),
				Mean:         mean,
				BucketCounts: bucketCounts,
				BucketOptions: &distribution.Distribution_BucketOptions{
					Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{
						ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{
							Bounds: buckets,
						},
					},
				},
			},
		},
	}

	return v, nil
}
func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]string { func (s *Stackdriver) getStackdriverLabels(tags []*telegraf.Tag) map[string]string {
labels := make(map[string]string) labels := make(map[string]string)
for _, t := range tags { for _, t := range tags {

View File

@ -239,6 +239,7 @@ func TestWriteMetricTypesOfficial(t *testing.T) {
MetricNameFormat: "official", MetricNameFormat: "official",
MetricCounter: []string{"mem_c"}, MetricCounter: []string{"mem_c"},
MetricGauge: []string{"mem_g"}, MetricGauge: []string{"mem_g"},
MetricHistogram: []string{"mem_h"},
Log: testutil.Logger{}, Log: testutil.Logger{},
client: c, client: c,
} }
@ -259,6 +260,19 @@ func TestWriteMetricTypesOfficial(t *testing.T) {
}, },
time.Unix(3, 0), time.Unix(3, 0),
), ),
testutil.MustMetric("mem_h",
map[string]string{},
map[string]interface{}{
"sum": 1,
"count": 1,
"5.0": 0.0,
"10.0": 0.0,
"15.0": 1.0,
"+Inf": 1.0,
},
time.Unix(3, 0),
telegraf.Histogram,
),
} }
require.NoError(t, s.Connect()) require.NoError(t, s.Connect())
@ -266,13 +280,15 @@ func TestWriteMetricTypesOfficial(t *testing.T) {
require.Len(t, mockMetric.reqs, 1) require.Len(t, mockMetric.reqs, 1)
request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest) request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
require.Len(t, request.TimeSeries, 2) require.Len(t, request.TimeSeries, 3)
for _, ts := range request.TimeSeries { for _, ts := range request.TimeSeries {
switch ts.Metric.Type { switch ts.Metric.Type {
case "custom.googleapis.com/test_mem_c_value/counter": case "custom.googleapis.com/test_mem_c_value/counter":
require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind) require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
case "custom.googleapis.com/test_mem_g_value/gauge": case "custom.googleapis.com/test_mem_g_value/gauge":
require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind) require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind)
case "custom.googleapis.com/test_mem_h/histogram":
require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
default: default:
require.False(t, true, "Unknown metric type", ts.Metric.Type) require.False(t, true, "Unknown metric type", ts.Metric.Type)
} }
@ -642,6 +658,24 @@ func TestGetStackdriverLabels(t *testing.T) {
{Key: "host", Value: "this"}, {Key: "host", Value: "this"},
{Key: "name", Value: "bat"}, {Key: "name", Value: "bat"},
{Key: "device", Value: "local"}, {Key: "device", Value: "local"},
{Key: "foo", Value: "bar"},
{Key: "hostname", Value: "local"},
{Key: "a", Value: "1"},
{Key: "b", Value: "2"},
{Key: "c", Value: "3"},
{Key: "d", Value: "4"},
{Key: "e", Value: "5"},
{Key: "f", Value: "6"},
{Key: "g", Value: "7"},
{Key: "h", Value: "8"},
{Key: "i", Value: "9"},
{Key: "j", Value: "10"},
{Key: "k", Value: "11"},
{Key: "l", Value: "12"},
{Key: "m", Value: "13"},
{Key: "n", Value: "14"},
{Key: "o", Value: "15"},
{Key: "p", Value: "16"},
{Key: "reserve", Value: "publication"}, {Key: "reserve", Value: "publication"},
{Key: "xpfqacltlmpguimhtjlou2qlmf9uqqwk3teajwlwqkoxtsppbnjksaxvzc1aa973pho9m96gfnl5op8ku7sv93rexyx42qe3zty12ityv", Value: "keyquota"}, {Key: "xpfqacltlmpguimhtjlou2qlmf9uqqwk3teajwlwqkoxtsppbnjksaxvzc1aa973pho9m96gfnl5op8ku7sv93rexyx42qe3zty12ityv", Value: "keyquota"},
{ {
@ -963,6 +997,147 @@ func TestStackdriverMetricNameOfficial(t *testing.T) {
} }
} }
// TestGenerateHistogramName exercises histogram metric-type naming across
// the "path" and "official" formats with and without a namespace and a
// metric-type prefix.
func TestGenerateHistogramName(t *testing.T) {
	// Every case uses the same histogram metric; build it fresh per call.
	newHistogram := func() telegraf.Metric {
		return metric.New(
			"uptime",
			map[string]string{},
			map[string]interface{}{"value": 42},
			time.Now(),
			telegraf.Histogram,
		)
	}

	tests := []struct {
		name      string
		prefix    string
		namespace string
		format    string
		expected  string
	}{
		{
			name:     "path",
			format:   "path",
			expected: "uptime",
		},
		{
			name:      "path with namespace",
			namespace: "name",
			format:    "path",
			expected:  "name/uptime",
		},
		{
			name:      "path with namespace+prefix",
			prefix:    "prefix",
			namespace: "name",
			format:    "path",
			expected:  "prefix/name/uptime",
		},
		{
			name:     "official",
			format:   "official",
			expected: "uptime/histogram",
		},
		{
			name:      "official with namespace",
			namespace: "name",
			format:    "official",
			expected:  "name_uptime/histogram",
		},
		{
			name:      "official with prefix+namespace",
			prefix:    "prefix",
			namespace: "name",
			format:    "official",
			expected:  "prefix/name_uptime/histogram",
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			s := &Stackdriver{
				Namespace:        tc.namespace,
				MetricTypePrefix: tc.prefix,
				MetricNameFormat: tc.format,
			}
			require.Equal(t, tc.expected, s.generateHistogramName(newHistogram()))
		})
	}
}
// TestBuildHistogram verifies that a Prometheus-v1 style histogram metric is
// translated into the expected Stackdriver distribution: cumulative bucket
// counts become per-bucket counts, an unparsable field key ("foo") is
// skipped, and the +Inf bucket contributes a count without a bound.
func TestBuildHistogram(t *testing.T) {
	s := &Stackdriver{
		MetricNameFormat: "official",
		Log:              testutil.Logger{},
	}

	histogram := testutil.MustMetric(
		"http_server_duration",
		map[string]string{},
		map[string]interface{}{
			"sum":   1,
			"count": 2,
			"5.0":   0.0,
			"10.0":  1.0,
			"15.0":  1.0,
			"20.0":  2.0,
			"+Inf":  3.0,
			"foo":   4.0,
		},
		time.Unix(0, 0),
	)

	value, err := s.buildHistogram(histogram)
	require.NoError(t, err)

	dist := value.GetDistributionValue()
	require.NotNil(t, dist)
	require.Equal(t, int64(2), dist.Count)
	require.Equal(t, 0.5, dist.Mean)

	require.Len(t, dist.BucketCounts, 5)
	require.Equal(t, []int64{0, 1, 0, 1, 1}, dist.BucketCounts)

	bounds := dist.BucketOptions.GetExplicitBuckets().Bounds
	require.Len(t, bounds, 4)
	require.Equal(t, []float64{5.0, 10.0, 15.0, 20.0}, bounds)
}
func TestStackdriverValueInvalid(t *testing.T) { func TestStackdriverValueInvalid(t *testing.T) {
s := &Stackdriver{ s := &Stackdriver{
MetricDataType: "foobar", MetricDataType: "foobar",