diff --git a/plugins/outputs/stackdriver/README.md b/plugins/outputs/stackdriver/README.md index 53c5a483d..570fa70ff 100644 --- a/plugins/outputs/stackdriver/README.md +++ b/plugins/outputs/stackdriver/README.md @@ -69,6 +69,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Custom resource type # resource_type = "generic_node" + ## Override metric type by metric name + ## Metric names matching the values here, globbing supported, will have the + ## metric type set to the cooresponding type. + # metric_counter = [] + # metric_gauge = [] + ## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## plugin definition, otherwise additional config options are read as part of ## the table diff --git a/plugins/outputs/stackdriver/sample.conf b/plugins/outputs/stackdriver/sample.conf index 1550d558a..e9896f993 100644 --- a/plugins/outputs/stackdriver/sample.conf +++ b/plugins/outputs/stackdriver/sample.conf @@ -34,6 +34,12 @@ ## Custom resource type # resource_type = "generic_node" + ## Override metric type by metric name + ## Metric names matching the values here, globbing supported, will have the + ## metric type set to the cooresponding type. + # metric_counter = [] + # metric_gauge = [] + ## NOTE: Due to the way TOML is parsed, tables must be at the END of the ## plugin definition, otherwise additional config options are read as part of ## the table diff --git a/plugins/outputs/stackdriver/stackdriver.go b/plugins/outputs/stackdriver/stackdriver.go index d9eab15e4..d5f2b5269 100644 --- a/plugins/outputs/stackdriver/stackdriver.go +++ b/plugins/outputs/stackdriver/stackdriver.go @@ -19,6 +19,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/outputs" ) @@ -36,10 +37,14 @@ type Stackdriver struct { MetricNameFormat string `toml:"metric_name_format"` MetricDataType string `toml:"metric_data_type"` TagsAsResourceLabels []string `toml:"tags_as_resource_label"` + MetricCounter []string `toml:"metric_counter"` + MetricGauge []string `toml:"metric_gauge"` Log telegraf.Logger `toml:"-"` - client *monitoring.MetricClient - counterCache *counterCache + client *monitoring.MetricClient + counterCache *counterCache + filterCounter filter.Filter + filterGauge filter.Filter } const ( @@ -78,6 +83,16 @@ func (s *Stackdriver) Init() error { return fmt.Errorf("unrecognized metric data type: %s", s.MetricDataType) } + var err error + s.filterCounter, err = filter.Compile(s.MetricCounter) + if err != nil { + return fmt.Errorf("creating counter filter failed: %w", err) + } + s.filterGauge, err = filter.Compile(s.MetricGauge) + if err != nil { + return fmt.Errorf("creating gauge filter failed: %w", err) + } + return nil } @@ -201,9 +216,18 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { continue } - metricKind, err := getStackdriverMetricKind(m.Type()) + // Set metric types based on user-provided filter + metricType := m.Type() + if s.filterCounter != nil && s.filterCounter.Match(m.Name()) { + metricType = telegraf.Counter + } + if s.filterGauge != nil && s.filterGauge.Match(m.Name()) { + metricType = telegraf.Gauge + } + + metricKind, err := getStackdriverMetricKind(metricType) if err != nil { - s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), m.Type(), f, err) + s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), metricType, f, err) continue } @@ -237,7 +261,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { // Prepare time series. timeSeries := &monitoringpb.TimeSeries{ Metric: &metricpb.Metric{ - Type: s.generateMetricName(m, f.Key), + Type: s.generateMetricName(m, metricType, f.Key), Labels: s.getStackdriverLabels(m.TagList()), }, MetricKind: metricKind, @@ -271,7 +295,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { counterTimeSeries := &monitoringpb.TimeSeries{ Metric: &metricpb.Metric{ - Type: s.generateMetricName(m, f.Key) + ":counter", + Type: s.generateMetricName(m, metricType, f.Key) + ":counter", Labels: s.getStackdriverLabels(m.TagList()), }, MetricKind: metricpb.MetricDescriptor_CUMULATIVE, @@ -337,7 +361,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error { return nil } -func (s *Stackdriver) generateMetricName(m telegraf.Metric, key string) string { +func (s *Stackdriver) generateMetricName(m telegraf.Metric, metricType telegraf.ValueType, key string) string { if s.MetricNameFormat == "path" { return path.Join(s.MetricTypePrefix, s.Namespace, m.Name(), key) } @@ -348,7 +372,7 @@ func (s *Stackdriver) generateMetricName(m telegraf.Metric, key string) string { } var kind string - switch m.Type() { + switch metricType { case telegraf.Gauge: kind = "gauge" case telegraf.Untyped: diff --git a/plugins/outputs/stackdriver/stackdriver_test.go b/plugins/outputs/stackdriver/stackdriver_test.go index 1ab69f5fc..26d05f96a 100644 --- a/plugins/outputs/stackdriver/stackdriver_test.go +++ b/plugins/outputs/stackdriver/stackdriver_test.go @@ -219,6 +219,126 @@ func TestWriteTagsAsResourceLabels(t *testing.T) { } } +func TestWriteMetricTypesOfficial(t *testing.T) { + expectedResponse := &emptypb.Empty{} + mockMetric.err = nil + mockMetric.reqs = nil + mockMetric.resps = append(mockMetric.resps[:0], expectedResponse) + + c, err := monitoring.NewMetricClient(context.Background(), clientOpt) + if err != nil { + t.Fatal(err) + } + + s := &Stackdriver{ + Project: fmt.Sprintf("projects/%s", "[PROJECT]"), + Namespace: "test", + ResourceLabels: map[string]string{ + "mylabel": "myvalue", + }, + MetricNameFormat: "official", + MetricCounter: []string{"mem_c"}, + MetricGauge: []string{"mem_g"}, + Log: testutil.Logger{}, + client: c, + } + require.NoError(t, s.Init()) + + metrics := []telegraf.Metric{ + testutil.MustMetric("mem_g", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + testutil.MustMetric("mem_c", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + } + + require.NoError(t, s.Connect()) + require.NoError(t, s.Write(metrics)) + require.Len(t, mockMetric.reqs, 1) + + request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 2) + for _, ts := range request.TimeSeries { + switch ts.Metric.Type { + case "custom.googleapis.com/test_mem_c_value/counter": + require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind) + case "custom.googleapis.com/test_mem_g_value/gauge": + require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind) + default: + require.False(t, true, "Unknown metric type", ts.Metric.Type) + } + } +} + +func TestWriteMetricTypesPath(t *testing.T) { + expectedResponse := &emptypb.Empty{} + mockMetric.err = nil + mockMetric.reqs = nil + mockMetric.resps = append(mockMetric.resps[:0], expectedResponse) + + c, err := monitoring.NewMetricClient(context.Background(), clientOpt) + if err != nil { + t.Fatal(err) + } + + s := &Stackdriver{ + Project: fmt.Sprintf("projects/%s", "[PROJECT]"), + Namespace: "test", + ResourceLabels: map[string]string{ + "mylabel": "myvalue", + }, + MetricNameFormat: "path", + MetricCounter: []string{"mem_c"}, + MetricGauge: []string{"mem_g"}, + Log: testutil.Logger{}, + client: c, + } + require.NoError(t, s.Init()) + + metrics := []telegraf.Metric{ + testutil.MustMetric("mem_g", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + testutil.MustMetric("mem_c", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(3, 0), + ), + } + + require.NoError(t, s.Connect()) + require.NoError(t, s.Write(metrics)) + require.Len(t, mockMetric.reqs, 1) + + request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest) + require.Len(t, request.TimeSeries, 2) + for _, ts := range request.TimeSeries { + switch ts.Metric.Type { + case "custom.googleapis.com/test/mem_c/value": + require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind) + case "custom.googleapis.com/test/mem_g/value": + require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind) + default: + require.False(t, true, "Unknown metric type", ts.Metric.Type) + } + } +} + func TestWriteAscendingTime(t *testing.T) { expectedResponse := &emptypb.Empty{} mockMetric.err = nil @@ -749,7 +869,7 @@ func TestStackdriverMetricNamePath(t *testing.T) { time.Now(), telegraf.Gauge, ) - require.Equal(t, "foo/namespace/uptime/key", s.generateMetricName(m, "key")) + require.Equal(t, "foo/namespace/uptime/key", s.generateMetricName(m, m.Type(), "key")) } func TestStackdriverMetricNameOfficial(t *testing.T) { @@ -838,7 +958,7 @@ func TestStackdriverMetricNameOfficial(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - require.Equal(t, tt.expected, s.generateMetricName(tt.metric, tt.key)) + require.Equal(t, tt.expected, s.generateMetricName(tt.metric, tt.metric.Type(), tt.key)) }) } }