feat(outputs.stackdriver): Add metric type config options (#14017)
* feat(outputs.stackdriver): Add metric type config options This adds two new user config options that take metric names, with globs as well. Metric's matching these names will have their type set before sending to Stackdriver. fixes: #14006 * run make docs
This commit is contained in:
parent
69612a8e4a
commit
6b01384c6e
|
|
@ -69,6 +69,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## Custom resource type
|
## Custom resource type
|
||||||
# resource_type = "generic_node"
|
# resource_type = "generic_node"
|
||||||
|
|
||||||
|
## Override metric type by metric name
|
||||||
|
## Metric names matching the values here, globbing supported, will have the
|
||||||
|
## metric type set to the cooresponding type.
|
||||||
|
# metric_counter = []
|
||||||
|
# metric_gauge = []
|
||||||
|
|
||||||
## NOTE: Due to the way TOML is parsed, tables must be at the END of the
|
## NOTE: Due to the way TOML is parsed, tables must be at the END of the
|
||||||
## plugin definition, otherwise additional config options are read as part of
|
## plugin definition, otherwise additional config options are read as part of
|
||||||
## the table
|
## the table
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,12 @@
|
||||||
## Custom resource type
|
## Custom resource type
|
||||||
# resource_type = "generic_node"
|
# resource_type = "generic_node"
|
||||||
|
|
||||||
|
## Override metric type by metric name
|
||||||
|
## Metric names matching the values here, globbing supported, will have the
|
||||||
|
## metric type set to the cooresponding type.
|
||||||
|
# metric_counter = []
|
||||||
|
# metric_gauge = []
|
||||||
|
|
||||||
## NOTE: Due to the way TOML is parsed, tables must be at the END of the
|
## NOTE: Due to the way TOML is parsed, tables must be at the END of the
|
||||||
## plugin definition, otherwise additional config options are read as part of
|
## plugin definition, otherwise additional config options are read as part of
|
||||||
## the table
|
## the table
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/filter"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
)
|
)
|
||||||
|
|
@ -36,10 +37,14 @@ type Stackdriver struct {
|
||||||
MetricNameFormat string `toml:"metric_name_format"`
|
MetricNameFormat string `toml:"metric_name_format"`
|
||||||
MetricDataType string `toml:"metric_data_type"`
|
MetricDataType string `toml:"metric_data_type"`
|
||||||
TagsAsResourceLabels []string `toml:"tags_as_resource_label"`
|
TagsAsResourceLabels []string `toml:"tags_as_resource_label"`
|
||||||
|
MetricCounter []string `toml:"metric_counter"`
|
||||||
|
MetricGauge []string `toml:"metric_gauge"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
client *monitoring.MetricClient
|
client *monitoring.MetricClient
|
||||||
counterCache *counterCache
|
counterCache *counterCache
|
||||||
|
filterCounter filter.Filter
|
||||||
|
filterGauge filter.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -78,6 +83,16 @@ func (s *Stackdriver) Init() error {
|
||||||
return fmt.Errorf("unrecognized metric data type: %s", s.MetricDataType)
|
return fmt.Errorf("unrecognized metric data type: %s", s.MetricDataType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
s.filterCounter, err = filter.Compile(s.MetricCounter)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating counter filter failed: %w", err)
|
||||||
|
}
|
||||||
|
s.filterGauge, err = filter.Compile(s.MetricGauge)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("creating gauge filter failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -201,9 +216,18 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
metricKind, err := getStackdriverMetricKind(m.Type())
|
// Set metric types based on user-provided filter
|
||||||
|
metricType := m.Type()
|
||||||
|
if s.filterCounter != nil && s.filterCounter.Match(m.Name()) {
|
||||||
|
metricType = telegraf.Counter
|
||||||
|
}
|
||||||
|
if s.filterGauge != nil && s.filterGauge.Match(m.Name()) {
|
||||||
|
metricType = telegraf.Gauge
|
||||||
|
}
|
||||||
|
|
||||||
|
metricKind, err := getStackdriverMetricKind(metricType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), m.Type(), f, err)
|
s.Log.Errorf("Get kind for metric %q (%T) field %q failed: %s", m.Name(), metricType, f, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -237,7 +261,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
|
||||||
// Prepare time series.
|
// Prepare time series.
|
||||||
timeSeries := &monitoringpb.TimeSeries{
|
timeSeries := &monitoringpb.TimeSeries{
|
||||||
Metric: &metricpb.Metric{
|
Metric: &metricpb.Metric{
|
||||||
Type: s.generateMetricName(m, f.Key),
|
Type: s.generateMetricName(m, metricType, f.Key),
|
||||||
Labels: s.getStackdriverLabels(m.TagList()),
|
Labels: s.getStackdriverLabels(m.TagList()),
|
||||||
},
|
},
|
||||||
MetricKind: metricKind,
|
MetricKind: metricKind,
|
||||||
|
|
@ -271,7 +295,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
|
||||||
|
|
||||||
counterTimeSeries := &monitoringpb.TimeSeries{
|
counterTimeSeries := &monitoringpb.TimeSeries{
|
||||||
Metric: &metricpb.Metric{
|
Metric: &metricpb.Metric{
|
||||||
Type: s.generateMetricName(m, f.Key) + ":counter",
|
Type: s.generateMetricName(m, metricType, f.Key) + ":counter",
|
||||||
Labels: s.getStackdriverLabels(m.TagList()),
|
Labels: s.getStackdriverLabels(m.TagList()),
|
||||||
},
|
},
|
||||||
MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
|
MetricKind: metricpb.MetricDescriptor_CUMULATIVE,
|
||||||
|
|
@ -337,7 +361,7 @@ func (s *Stackdriver) sendBatch(batch []telegraf.Metric) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Stackdriver) generateMetricName(m telegraf.Metric, key string) string {
|
func (s *Stackdriver) generateMetricName(m telegraf.Metric, metricType telegraf.ValueType, key string) string {
|
||||||
if s.MetricNameFormat == "path" {
|
if s.MetricNameFormat == "path" {
|
||||||
return path.Join(s.MetricTypePrefix, s.Namespace, m.Name(), key)
|
return path.Join(s.MetricTypePrefix, s.Namespace, m.Name(), key)
|
||||||
}
|
}
|
||||||
|
|
@ -348,7 +372,7 @@ func (s *Stackdriver) generateMetricName(m telegraf.Metric, key string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
var kind string
|
var kind string
|
||||||
switch m.Type() {
|
switch metricType {
|
||||||
case telegraf.Gauge:
|
case telegraf.Gauge:
|
||||||
kind = "gauge"
|
kind = "gauge"
|
||||||
case telegraf.Untyped:
|
case telegraf.Untyped:
|
||||||
|
|
|
||||||
|
|
@ -219,6 +219,126 @@ func TestWriteTagsAsResourceLabels(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWriteMetricTypesOfficial(t *testing.T) {
|
||||||
|
expectedResponse := &emptypb.Empty{}
|
||||||
|
mockMetric.err = nil
|
||||||
|
mockMetric.reqs = nil
|
||||||
|
mockMetric.resps = append(mockMetric.resps[:0], expectedResponse)
|
||||||
|
|
||||||
|
c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Stackdriver{
|
||||||
|
Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
|
||||||
|
Namespace: "test",
|
||||||
|
ResourceLabels: map[string]string{
|
||||||
|
"mylabel": "myvalue",
|
||||||
|
},
|
||||||
|
MetricNameFormat: "official",
|
||||||
|
MetricCounter: []string{"mem_c"},
|
||||||
|
MetricGauge: []string{"mem_g"},
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
client: c,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
testutil.MustMetric("mem_g",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(3, 0),
|
||||||
|
),
|
||||||
|
testutil.MustMetric("mem_c",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(3, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, s.Connect())
|
||||||
|
require.NoError(t, s.Write(metrics))
|
||||||
|
require.Len(t, mockMetric.reqs, 1)
|
||||||
|
|
||||||
|
request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
|
||||||
|
require.Len(t, request.TimeSeries, 2)
|
||||||
|
for _, ts := range request.TimeSeries {
|
||||||
|
switch ts.Metric.Type {
|
||||||
|
case "custom.googleapis.com/test_mem_c_value/counter":
|
||||||
|
require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
|
||||||
|
case "custom.googleapis.com/test_mem_g_value/gauge":
|
||||||
|
require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind)
|
||||||
|
default:
|
||||||
|
require.False(t, true, "Unknown metric type", ts.Metric.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWriteMetricTypesPath(t *testing.T) {
|
||||||
|
expectedResponse := &emptypb.Empty{}
|
||||||
|
mockMetric.err = nil
|
||||||
|
mockMetric.reqs = nil
|
||||||
|
mockMetric.resps = append(mockMetric.resps[:0], expectedResponse)
|
||||||
|
|
||||||
|
c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &Stackdriver{
|
||||||
|
Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
|
||||||
|
Namespace: "test",
|
||||||
|
ResourceLabels: map[string]string{
|
||||||
|
"mylabel": "myvalue",
|
||||||
|
},
|
||||||
|
MetricNameFormat: "path",
|
||||||
|
MetricCounter: []string{"mem_c"},
|
||||||
|
MetricGauge: []string{"mem_g"},
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
client: c,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
|
metrics := []telegraf.Metric{
|
||||||
|
testutil.MustMetric("mem_g",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(3, 0),
|
||||||
|
),
|
||||||
|
testutil.MustMetric("mem_c",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 42,
|
||||||
|
},
|
||||||
|
time.Unix(3, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, s.Connect())
|
||||||
|
require.NoError(t, s.Write(metrics))
|
||||||
|
require.Len(t, mockMetric.reqs, 1)
|
||||||
|
|
||||||
|
request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
|
||||||
|
require.Len(t, request.TimeSeries, 2)
|
||||||
|
for _, ts := range request.TimeSeries {
|
||||||
|
switch ts.Metric.Type {
|
||||||
|
case "custom.googleapis.com/test/mem_c/value":
|
||||||
|
require.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
|
||||||
|
case "custom.googleapis.com/test/mem_g/value":
|
||||||
|
require.Equal(t, metricpb.MetricDescriptor_GAUGE, ts.MetricKind)
|
||||||
|
default:
|
||||||
|
require.False(t, true, "Unknown metric type", ts.Metric.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteAscendingTime(t *testing.T) {
|
func TestWriteAscendingTime(t *testing.T) {
|
||||||
expectedResponse := &emptypb.Empty{}
|
expectedResponse := &emptypb.Empty{}
|
||||||
mockMetric.err = nil
|
mockMetric.err = nil
|
||||||
|
|
@ -749,7 +869,7 @@ func TestStackdriverMetricNamePath(t *testing.T) {
|
||||||
time.Now(),
|
time.Now(),
|
||||||
telegraf.Gauge,
|
telegraf.Gauge,
|
||||||
)
|
)
|
||||||
require.Equal(t, "foo/namespace/uptime/key", s.generateMetricName(m, "key"))
|
require.Equal(t, "foo/namespace/uptime/key", s.generateMetricName(m, m.Type(), "key"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStackdriverMetricNameOfficial(t *testing.T) {
|
func TestStackdriverMetricNameOfficial(t *testing.T) {
|
||||||
|
|
@ -838,7 +958,7 @@ func TestStackdriverMetricNameOfficial(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
require.Equal(t, tt.expected, s.generateMetricName(tt.metric, tt.key))
|
require.Equal(t, tt.expected, s.generateMetricName(tt.metric, tt.metric.Type(), tt.key))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue