diff --git a/config/config_test.go b/config/config_test.go index 709c88972..3a58a912d 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -34,6 +34,7 @@ import ( "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers" _ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing + promserializer "github.com/influxdata/telegraf/plugins/serializers/prometheus" "github.com/influxdata/telegraf/testutil" ) @@ -612,6 +613,7 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) { // Ignore all unexported fields and fields not relevant for functionality options := []cmp.Option{ cmpopts.IgnoreUnexported(stype), + cmpopts.IgnoreUnexported(reflect.Indirect(reflect.ValueOf(promserializer.MetricTypes{})).Interface()), cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}), cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}), } @@ -703,6 +705,7 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) { // Ignore all unexported fields and fields not relevant for functionality options := []cmp.Option{ cmpopts.IgnoreUnexported(stype), + cmpopts.IgnoreUnexported(reflect.Indirect(reflect.ValueOf(promserializer.MetricTypes{})).Interface()), cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}), cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}), } diff --git a/plugins/outputs/prometheus_client/README.md b/plugins/outputs/prometheus_client/README.md index 7725d9353..7d37aede8 100644 --- a/plugins/outputs/prometheus_client/README.md +++ b/plugins/outputs/prometheus_client/README.md @@ -62,6 +62,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Export metric collection time. # export_timestamp = false + + ## Specify the metric type explicitly. + ## This overrides the metric-type of the Telegraf metric. Globbing is allowed. + # [outputs.prometheus_client.prometheus_metric_types] + # counter = [] + # gauge = [] ``` ## Metrics diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go index 610ed9444..9fcf60997 100644 --- a/plugins/outputs/prometheus_client/prometheus_client.go +++ b/plugins/outputs/prometheus_client/prometheus_client.go @@ -23,6 +23,7 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" v1 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v1" v2 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v2" + serializer "github.com/influxdata/telegraf/plugins/serializers/prometheus" ) //go:embed sample.conf @@ -43,18 +44,19 @@ type Collector interface { } type PrometheusClient struct { - Listen string `toml:"listen"` - ReadTimeout config.Duration `toml:"read_timeout"` - WriteTimeout config.Duration `toml:"write_timeout"` - MetricVersion int `toml:"metric_version"` - BasicUsername string `toml:"basic_username"` - BasicPassword string `toml:"basic_password"` - IPRange []string `toml:"ip_range"` - ExpirationInterval config.Duration `toml:"expiration_interval"` - Path string `toml:"path"` - CollectorsExclude []string `toml:"collectors_exclude"` - StringAsLabel bool `toml:"string_as_label"` - ExportTimestamp bool `toml:"export_timestamp"` + Listen string `toml:"listen"` + ReadTimeout config.Duration `toml:"read_timeout"` + WriteTimeout config.Duration `toml:"write_timeout"` + MetricVersion int `toml:"metric_version"` + BasicUsername string `toml:"basic_username"` + BasicPassword string `toml:"basic_password"` + IPRange []string `toml:"ip_range"` + ExpirationInterval config.Duration `toml:"expiration_interval"` + Path string `toml:"path"` + CollectorsExclude []string `toml:"collectors_exclude"` + StringAsLabel bool `toml:"string_as_label"` + ExportTimestamp bool `toml:"export_timestamp"` + TypeMappings serializer.MetricTypes `toml:"metric_types"` tlsint.ServerConfig Log telegraf.Logger `toml:"-"` @@ -96,17 +98,32 @@ func (p *PrometheusClient) Init() error { } } + if err := p.TypeMappings.Init(); err != nil { + return err + } + switch p.MetricVersion { default: fallthrough case 1: - p.collector = v1.NewCollector(time.Duration(p.ExpirationInterval), p.StringAsLabel, p.ExportTimestamp, p.Log) + p.collector = v1.NewCollector( + time.Duration(p.ExpirationInterval), + p.StringAsLabel, + p.ExportTimestamp, + p.TypeMappings, + p.Log, + ) err := registry.Register(p.collector) if err != nil { return err } case 2: - p.collector = v2.NewCollector(time.Duration(p.ExpirationInterval), p.StringAsLabel, p.ExportTimestamp) + p.collector = v2.NewCollector( + time.Duration(p.ExpirationInterval), + p.StringAsLabel, + p.ExportTimestamp, + p.TypeMappings, + ) err := registry.Register(p.collector) if err != nil { return err diff --git a/plugins/outputs/prometheus_client/prometheus_client_v1_test.go b/plugins/outputs/prometheus_client/prometheus_client_v1_test.go index 475b2cadf..43923a0bc 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_v1_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_v1_test.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/telegraf" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" + "github.com/influxdata/telegraf/plugins/serializers/prometheus" "github.com/influxdata/telegraf/testutil" ) @@ -268,24 +269,76 @@ rpc_duration_seconds{quantile="0.9"} 9001 rpc_duration_seconds{quantile="0.99"} 76656 rpc_duration_seconds_sum 1.7560473e+07 rpc_duration_seconds_count 2693 +`), + }, + { + name: "prometheus untyped forced to counter", + output: &PrometheusClient{ + Listen: ":0", + MetricVersion: 1, + CollectorsExclude: []string{"gocollector", "process"}, + Path: "/metrics", + TypeMappings: prometheus.MetricTypes{Counter: []string{"cpu_time_idle"}}, + Log: logger, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + expected: []byte(` +# HELP cpu_time_idle Telegraf collected metric +# TYPE cpu_time_idle counter +cpu_time_idle{host="example.org"} 42 +`), + }, + { + name: "prometheus untyped forced to gauge", + output: &PrometheusClient{ + Listen: ":0", + MetricVersion: 1, + CollectorsExclude: []string{"gocollector", "process"}, + Path: "/metrics", + TypeMappings: prometheus.MetricTypes{Gauge: []string{"cpu_time_idle"}}, + Log: logger, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "cpu_time_idle", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []byte(` +# HELP cpu_time_idle Telegraf collected metric +# TYPE cpu_time_idle gauge +cpu_time_idle{host="example.org"} 42 `), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := tt.output.Init() - require.NoError(t, err) + require.NoError(t, tt.output.Init()) - err = tt.output.Connect() - require.NoError(t, err) + require.NoError(t, tt.output.Connect()) defer func() { - err := tt.output.Close() - require.NoError(t, err) + require.NoError(t, tt.output.Close()) }() - err = tt.output.Write(tt.metrics) - require.NoError(t, err) + require.NoError(t, tt.output.Write(tt.metrics)) resp, err := http.Get(tt.output.URL()) require.NoError(t, err) diff --git a/plugins/outputs/prometheus_client/prometheus_client_v2_test.go b/plugins/outputs/prometheus_client/prometheus_client_v2_test.go index 6a4ce7a87..5d4f13c85 100644 --- a/plugins/outputs/prometheus_client/prometheus_client_v2_test.go +++ b/plugins/outputs/prometheus_client/prometheus_client_v2_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" + "github.com/influxdata/telegraf/plugins/serializers/prometheus" "github.com/influxdata/telegraf/testutil" ) @@ -299,24 +300,75 @@ cpu_usage_idle_count{cpu="cpu1"} 20 cpu_usage_idle_bucket{cpu="cpu1",le="+Inf"} 20 cpu_usage_idle_sum{cpu="cpu1"} 2000 cpu_usage_idle_count{cpu="cpu1"} 20 +`), + }, + { + name: "untyped forced to counter", + output: &PrometheusClient{ + Listen: ":0", + MetricVersion: 2, + CollectorsExclude: []string{"gocollector", "process"}, + Path: "/metrics", + TypeMappings: prometheus.MetricTypes{Counter: []string{"cpu_time_idle"}}, + Log: logger, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "cpu_time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + expected: []byte(` +# HELP cpu_time_idle Telegraf collected metric +# TYPE cpu_time_idle counter +cpu_time_idle{host="example.org"} 42 +`), + }, + { + name: "untyped forced to gauge", + output: &PrometheusClient{ + Listen: ":0", + MetricVersion: 2, + CollectorsExclude: []string{"gocollector", "process"}, + Path: "/metrics", + TypeMappings: prometheus.MetricTypes{Gauge: []string{"cpu_time_idle"}}, + Log: logger, + }, + metrics: []telegraf.Metric{ + testutil.MustMetric( + "prometheus", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "cpu_time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []byte(` +# HELP cpu_time_idle Telegraf collected metric +# TYPE cpu_time_idle gauge +cpu_time_idle{host="example.org"} 42 `), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := tt.output.Init() - require.NoError(t, err) - - err = tt.output.Connect() - require.NoError(t, err) + require.NoError(t, tt.output.Init()) + require.NoError(t, tt.output.Connect()) defer func() { - err := tt.output.Close() - require.NoError(t, err) + require.NoError(t, tt.output.Close()) }() - err = tt.output.Write(tt.metrics) - require.NoError(t, err) + require.NoError(t, tt.output.Write(tt.metrics)) resp, err := http.Get(tt.output.URL()) require.NoError(t, err) diff --git a/plugins/outputs/prometheus_client/sample.conf b/plugins/outputs/prometheus_client/sample.conf index 9620178f3..e6d65b940 100644 --- a/plugins/outputs/prometheus_client/sample.conf +++ b/plugins/outputs/prometheus_client/sample.conf @@ -45,3 +45,9 @@ ## Export metric collection time. # export_timestamp = false + + ## Specify the metric type explicitly. + ## This overrides the metric-type of the Telegraf metric. Globbing is allowed. + # [outputs.prometheus_client.prometheus_metric_types] + # counter = [] + # gauge = [] diff --git a/plugins/outputs/prometheus_client/v1/collector.go b/plugins/outputs/prometheus_client/v1/collector.go index 700691cba..b25a766af 100644 --- a/plugins/outputs/prometheus_client/v1/collector.go +++ b/plugins/outputs/prometheus_client/v1/collector.go @@ -54,6 +54,7 @@ type Collector struct { ExpirationInterval time.Duration StringAsLabel bool ExportTimestamp bool + TypeMapping serializer.MetricTypes Log telegraf.Logger sync.Mutex @@ -61,11 +62,18 @@ type Collector struct { expireTicker *time.Ticker } -func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool, logger telegraf.Logger) *Collector { +func NewCollector( + expire time.Duration, + stringsAsLabel bool, + exportTimestamp bool, + typeMapping serializer.MetricTypes, + logger telegraf.Logger, +) *Collector { c := &Collector{ ExpirationInterval: expire, StringAsLabel: stringsAsLabel, ExportTimestamp: exportTimestamp, + TypeMapping: typeMapping, Log: logger, fam: make(map[string]*MetricFamily), } @@ -176,9 +184,10 @@ func (c *Collector) addMetricFamily(point telegraf.Metric, sample *Sample, mname var fam *MetricFamily var ok bool if fam, ok = c.fam[mname]; !ok { + pointType := c.TypeMapping.DetermineType(mname, point) fam = &MetricFamily{ Samples: make(map[SampleID]*Sample), - TelegrafValueType: point.Type(), + TelegrafValueType: pointType, LabelSet: make(map[string]int), } c.fam[mname] = fam diff --git a/plugins/outputs/prometheus_client/v2/collector.go b/plugins/outputs/prometheus_client/v2/collector.go index 4505c980d..875a7cef5 100644 --- a/plugins/outputs/prometheus_client/v2/collector.go +++ b/plugins/outputs/prometheus_client/v2/collector.go @@ -43,10 +43,16 @@ type Collector struct { coll *serializer.Collection } -func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool) *Collector { +func NewCollector( + expire time.Duration, + stringsAsLabel bool, + exportTimestamp bool, + typeMapping serializer.MetricTypes, +) *Collector { cfg := serializer.FormatConfig{ StringAsLabel: stringsAsLabel, ExportTimestamp: exportTimestamp, + TypeMappings: typeMapping, } return &Collector{ diff --git a/plugins/serializers/prometheus/README.md b/plugins/serializers/prometheus/README.md index 0fbcc5eaf..df38a8c37 100644 --- a/plugins/serializers/prometheus/README.md +++ b/plugins/serializers/prometheus/README.md @@ -40,6 +40,12 @@ reporting others every bucket/quantile will continue to exist. ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "prometheus" + + ## Specify the metric type explicitly. + ## This overrides the metric-type of the Telegraf metric. Globbing is allowed. + [outputs.file.prometheus_metric_types] + counter = [] + gauge = [] ``` ### Metrics diff --git a/plugins/serializers/prometheus/collection.go b/plugins/serializers/prometheus/collection.go index 3754c2920..63f55cf82 100644 --- a/plugins/serializers/prometheus/collection.go +++ b/plugins/serializers/prometheus/collection.go @@ -188,10 +188,11 @@ func (c *Collection) Add(metric telegraf.Metric, now time.Time) { if !ok { continue } + metricType := c.config.TypeMappings.DetermineType(metricName, metric) family := MetricFamily{ Name: metricName, - Type: metric.Type(), + Type: metricType, } entry, ok := c.Entries[family] diff --git a/plugins/serializers/prometheus/prometheus.go b/plugins/serializers/prometheus/prometheus.go index d259640fb..215754cb2 100644 --- a/plugins/serializers/prometheus/prometheus.go +++ b/plugins/serializers/prometheus/prometheus.go @@ -2,14 +2,49 @@ package prometheus import ( "bytes" + "fmt" "time" "github.com/prometheus/common/expfmt" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/plugins/serializers" ) +type MetricTypes struct { + Counter []string `toml:"counter"` + Gauge []string `toml:"gauge"` + + filterCounter filter.Filter + filterGauge filter.Filter +} + +func (mt *MetricTypes) Init() error { + // Setup the explicit type mappings + var err error + mt.filterCounter, err = filter.Compile(mt.Counter) + if err != nil { + return fmt.Errorf("creating counter filter failed: %w", err) + } + mt.filterGauge, err = filter.Compile(mt.Gauge) + if err != nil { + return fmt.Errorf("creating gauge filter failed: %w", err) + } + return nil +} + +func (mt *MetricTypes) DetermineType(name string, m telegraf.Metric) telegraf.ValueType { + metricType := m.Type() + if mt.filterCounter != nil && mt.filterCounter.Match(name) { + metricType = telegraf.Counter + } + if mt.filterGauge != nil && mt.filterGauge.Match(name) { + metricType = telegraf.Gauge + } + return metricType +} + type FormatConfig struct { ExportTimestamp bool `toml:"prometheus_export_timestamp"` SortMetrics bool `toml:"prometheus_sort_metrics"` @@ -17,13 +52,18 @@ type FormatConfig struct { // CompactEncoding defines whether to include // HELP metadata in Prometheus payload. Setting to true // helps to reduce payload size. - CompactEncoding bool `toml:"prometheus_compact_encoding"` + CompactEncoding bool `toml:"prometheus_compact_encoding"` + TypeMappings MetricTypes `toml:"prometheus_metric_types"` } type Serializer struct { FormatConfig } +func (s *Serializer) Init() error { + return s.FormatConfig.TypeMappings.Init() +} + func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { return s.SerializeBatch([]telegraf.Metric{metric}) } diff --git a/plugins/serializers/prometheus/prometheus_test.go b/plugins/serializers/prometheus/prometheus_test.go index 356e4363c..a7b2b8f9d 100644 --- a/plugins/serializers/prometheus/prometheus_test.go +++ b/plugins/serializers/prometheus/prometheus_test.go @@ -177,6 +177,48 @@ cpu_time_idle{host="example.org"} 42 1574279268000 expected: []byte(` # TYPE cpu_time_idle untyped cpu_time_idle{host="example.org"} 42 +`), + }, + { + name: "untyped forced to counter", + config: FormatConfig{ + TypeMappings: MetricTypes{Counter: []string{"cpu_time_idle"}}, + }, + metric: testutil.MustMetric( + "cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + expected: []byte(` +# HELP cpu_time_idle Telegraf collected metric +# TYPE cpu_time_idle counter +cpu_time_idle{host="example.org"} 42 +`), + }, + { + name: "untyped forced to gauge", + config: FormatConfig{ + TypeMappings: MetricTypes{Gauge: []string{"cpu_time_idle"}}, + }, + metric: testutil.MustMetric( + "cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + expected: []byte(` +# HELP cpu_time_idle Telegraf collected metric +# TYPE cpu_time_idle gauge +cpu_time_idle{host="example.org"} 42 `), }, } @@ -188,8 +230,10 @@ cpu_time_idle{host="example.org"} 42 ExportTimestamp: tt.config.ExportTimestamp, StringAsLabel: tt.config.StringAsLabel, CompactEncoding: tt.config.CompactEncoding, + TypeMappings: tt.config.TypeMappings, }, } + require.NoError(t, s.Init()) actual, err := s.Serialize(tt.metric) require.NoError(t, err)