From 6b01384c6e94187da1cda629eab52a26c8be40ba Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Mon, 2 Oct 2023 02:30:32 -0600 Subject: [PATCH] feat(outputs.stackdriver): Add metric type config options (#14017) * feat(outputs.stackdriver): Add metric type config options This adds two new user config options that take metric names, with globs as well. Metric's matching these names will have their type set before sending to Stackdriver. fixes: #14006 * run make docs --- plugins/outputs/stackdriver/README.md | 6 + plugins/outputs/stackdriver/sample.conf | 6 + plugins/outputs/stackdriver/stackdriver.go | 40 ++++-- .../outputs/stackdriver/stackdriver_test.go | 124 +++++++++++++++++- 4 files changed, 166 insertions(+), 10 deletions(-) diff --git a/plugins/outputs/stackdriver/README.md b/plugins/outputs/stackdriver/README.md index 53c5a483d..570fa70ff 100644 --- a/plugins/outputs/stackdriver/README.md +++ b/plugins/outputs/stackdriver/README.md @@ -69,6 +69,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Custom resource type # resource_type = "generic_node" + ## Override metric type by metric name + ## Metric names matching the values here, globbing supported, will have the + ## metric type set to the cooresponding type. + # metric_counter = [] + # metric_gauge = [] + ## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## plugin definition, otherwise additional config options are read as part of ## the table diff --git a/plugins/outputs/stackdriver/sample.conf b/plugins/outputs/stackdriver/sample.conf index 1550d558a..e9896f993 100644 --- a/plugins/outputs/stackdriver/sample.conf +++ b/plugins/outputs/stackdriver/sample.conf @@ -34,6 +34,12 @@ ## Custom resource type # resource_type = "generic_node" + ## Override metric type by metric name + ## Metric names matching the values here, globbing supported, will have the + ## metric type set to the cooresponding type. + # metric_counter = [] + # metric_gauge = [] + ## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## plugin definition, otherwise additional config options are read as part of ## the table diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go index d9eab15e4..d5f2b5269 100644 --- a/plugins/outputs/stackdriver/stackdriver.go +++ b/plugins/outputs/stackdriver/stackdriver.go @@ -19,6 +19,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" ) @@ -36,10 +37,14 @@ type Stackdriver struct { MetricNameFormat string `toml:"metric_name_format"` MetricDataType string `toml:"metric_data_type"` TagsAsResourceLabels []string `toml:"tags_as_resource_label"` + MetricCounter []string `toml:"metric_counter"` + MetricGauge []string `toml:"metric_gauge"` Log telegraf.Logger `toml:"-"` - client *monitoring.MetricClient - counterCache *counterCache + client *monitoring.MetricClient + counterCache *counterCache + filterCounter filter.Filter + filterGauge filter.Filter } const ( @@ -78,6 +83,16 @@ func (s *Stackdriver) Init() error { return fmt.Errorf("unrecognized metric data type: %s", s.MetricDataType) } + var err error + s.filterCounter, err = filter.Compile(s.MetricCounter) + if err != nil { + return fmt.Errorf("creating counter filter failed: %w", err) + } + s.filterGauge, err = filter.Compile(s.MetricGauge) + if err != nil { + return fmt.Errorf("creating gauge filter failed: %w", err) + } + return nil } @@ -201,9 +216,18 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { continue } - metricKind, err := getStackdriverMetricKind(m.Type()) + // Set metric types based on user-provided filter + metricType := m.Type() + if s.filterCounter != nil && s.filterCounter.Match(m.Name()) { + metricType = telegraf.Counter + } + if s.filterGauge != nil && s.filterGauge.Match(m.Name()) { + metricType = telegraf.Gauge + } + + metricKind, err := getStackdriverMetricKind(metricType) if err != nil { - s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), m.Type(), f, err) + s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), metricType, f, err) continue } @@ -237,7 +261,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { // Prepare time series. timeSeries := &monitoringpb.TimeSeries{ Metric: &metricpb.Metric{ - Type: s.generateMetricName(m, f.Key), + Type: s.generateMetricName(m, metricType, f.Key), Labels: s.getStackdriverLabels(m.TagList()), }, MetricKind: metricKind, @@ -271,7 +295,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { counterTimeSeries := &monitoringpb.TimeSeries{ Metric: &metricpb.Metric{ - Type: s.generateMetricName(m, f.Key) + ":counter", + Type: s.generateMetricName(m, metricType, f.Key) + ":counter", Labels: s.getStackdriverLabels(m.TagList()), }, MetricKind: metricpb.MetricDescriptor_CUMULATIVE, @@ -337,7 +361,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { return nil } -func (s *Stackdriver) generateMetricName(m telegraf.Metric, key string) string { +func (s *Stackdriver) generateMetricName(m telegraf.Metric, metricType telegraf.ValueType, key string) string { if s.MetricNameFormat == "path" { return path.Join(s.MetricTypePrefix, s.Namespace, m.Name(), key) } @@ -348,7 +372,7 @@ func (s *Stackdriver) generateMetricName(m telegraf.Metric, key string) string { } var kind string - switch m.Type() { + switch metricType { case telegraf.Gauge: kind = "gauge" case telegraf.Untyped: diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go index 1ab69f5fc..26d05f96a 100644 --- a/plugins/outputs/stackdriver/stackdriver_test.go +++ b/plugins/outputs/stackdriver/stackdriver_test.go @@ -219,6 +219,126 @@ func TestWriteTagsAsResourceLabels(t *testing.T) { } } +func TestWriteMetricTypesOfficial(t *testing.T) { + expectedResponse := &emptypb.Empty{} + mockMetric.err = nil + mockMetric.reqs = nil + mockMetric.resps = append(mockMetric.resps[:0], expectedResponse) + + c, err := monitoring.NewMetricClient(context.Background(), clientOpt) + if err != nil { + t.Fatal(err) + } + + s := &Stackdriver{ + Project: fmt.Sprintf("projects/%s", "[PROJECT]"), + Namespace: "test", + ResourceLabels: map[string]string{ + "mylabel": "myvalue", + }, + MetricNameFormat: "official", + MetricCounter: []string{"mem_c"}, + MetricGauge: []string{"mem_g"}, + Log: testutil.Logger{}, + client: c, + } + require.NoError(t, s.Init()) + + metrics := []telegraf.Metric{ + testutil.MustMetric("mem_g", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + testutil.MustMetric("mem_c", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + } + + require.NoError(t, s.Connect()) + require.NoError(t, s.Write(metrics)) + require.Len(t, mockMetric.reqs, 1) + + request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 2) + for _, ts := range request.TimeSeries { + switch ts.Metric.Type { + case "custom.googleapis.com/test_mem_c_value/counter": + require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind) + case "custom.googleapis.com/test_mem_g_value/gauge": + require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind) + default: + require.False(t, true, "Unknown metric type", ts.Metric.Type) + } + } +} + +func TestWriteMetricTypesPath(t *testing.T) { + expectedResponse := &emptypb.Empty{} + mockMetric.err = nil + mockMetric.reqs = nil + mockMetric.resps = append(mockMetric.resps[:0], expectedResponse) + + c, err := monitoring.NewMetricClient(context.Background(), clientOpt) + if err != nil { + t.Fatal(err) + } + + s := &Stackdriver{ + Project: fmt.Sprintf("projects/%s", "[PROJECT]"), + Namespace: "test", + ResourceLabels: map[string]string{ + "mylabel": "myvalue", + }, + MetricNameFormat: "path", + MetricCounter: []string{"mem_c"}, + MetricGauge: []string{"mem_g"}, + Log: testutil.Logger{}, + client: c, + } + require.NoError(t, s.Init()) + + metrics := []telegraf.Metric{ + testutil.MustMetric("mem_g", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + testutil.MustMetric("mem_c", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + } + + require.NoError(t, s.Connect()) + require.NoError(t, s.Write(metrics)) + require.Len(t, mockMetric.reqs, 1) + + request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 2) + for _, ts := range request.TimeSeries { + switch ts.Metric.Type { + case "custom.googleapis.com/test/mem_c/value": + require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind) + case "custom.googleapis.com/test/mem_g/value": + require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind) + default: + require.False(t, true, "Unknown metric type", ts.Metric.Type) + } + } +} + func TestWriteAscendingTime(t *testing.T) { expectedResponse := &emptypb.Empty{} mockMetric.err = nil @@ -749,7 +869,7 @@ func TestStackdriverMetricNamePath(t *testing.T) { time.Now(), telegraf.Gauge, ) - require.Equal(t, "foo/namespace/uptime/key", s.generateMetricName(m, "key")) + require.Equal(t, "foo/namespace/uptime/key", s.generateMetricName(m, m.Type(), "key")) } func TestStackdriverMetricNameOfficial(t *testing.T) { @@ -838,7 +958,7 @@ func TestStackdriverMetricNameOfficial(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.expected, s.generateMetricName(tt.metric, tt.key)) + require.Equal(t, tt.expected, s.generateMetricName(tt.metric, tt.metric.Type(), tt.key)) }) } }