feat(prometheus): Allow to specify metric type (#13874)

This commit is contained in:
Sven Rebhan 2023-09-11 15:45:16 +02:00 committed by GitHub
parent dd5c221656
commit 84b3b587da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 279 additions and 36 deletions

View File

@ -34,6 +34,7 @@ import (
"github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers"
_ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing _ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing
promserializer "github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -612,6 +613,7 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
// Ignore all unexported fields and fields not relevant for functionality // Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{ options := []cmp.Option{
cmpopts.IgnoreUnexported(stype), cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreUnexported(reflect.Indirect(reflect.ValueOf(promserializer.MetricTypes{})).Interface()),
cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}), cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}), cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
} }
@ -703,6 +705,7 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
// Ignore all unexported fields and fields not relevant for functionality // Ignore all unexported fields and fields not relevant for functionality
options := []cmp.Option{ options := []cmp.Option{
cmpopts.IgnoreUnexported(stype), cmpopts.IgnoreUnexported(stype),
cmpopts.IgnoreUnexported(reflect.Indirect(reflect.ValueOf(promserializer.MetricTypes{})).Interface()),
cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}), cmpopts.IgnoreTypes(sync.Mutex{}, regexp.Regexp{}),
cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}), cmpopts.IgnoreInterfaces(struct{ telegraf.Logger }{}),
} }

View File

@ -62,6 +62,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Export metric collection time. ## Export metric collection time.
# export_timestamp = false # export_timestamp = false
## Specify the metric type explicitly.
## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
# [outputs.prometheus_client.metric_types]
# counter = []
# gauge = []
``` ```
## Metrics ## Metrics

View File

@ -23,6 +23,7 @@ import (
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
v1 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v1" v1 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v1"
v2 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v2" v2 "github.com/influxdata/telegraf/plugins/outputs/prometheus_client/v2"
serializer "github.com/influxdata/telegraf/plugins/serializers/prometheus"
) )
//go:embed sample.conf //go:embed sample.conf
@ -43,18 +44,19 @@ type Collector interface {
} }
type PrometheusClient struct { type PrometheusClient struct {
Listen string `toml:"listen"` Listen string `toml:"listen"`
ReadTimeout config.Duration `toml:"read_timeout"` ReadTimeout config.Duration `toml:"read_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"` WriteTimeout config.Duration `toml:"write_timeout"`
MetricVersion int `toml:"metric_version"` MetricVersion int `toml:"metric_version"`
BasicUsername string `toml:"basic_username"` BasicUsername string `toml:"basic_username"`
BasicPassword string `toml:"basic_password"` BasicPassword string `toml:"basic_password"`
IPRange []string `toml:"ip_range"` IPRange []string `toml:"ip_range"`
ExpirationInterval config.Duration `toml:"expiration_interval"` ExpirationInterval config.Duration `toml:"expiration_interval"`
Path string `toml:"path"` Path string `toml:"path"`
CollectorsExclude []string `toml:"collectors_exclude"` CollectorsExclude []string `toml:"collectors_exclude"`
StringAsLabel bool `toml:"string_as_label"` StringAsLabel bool `toml:"string_as_label"`
ExportTimestamp bool `toml:"export_timestamp"` ExportTimestamp bool `toml:"export_timestamp"`
TypeMappings serializer.MetricTypes `toml:"metric_types"`
tlsint.ServerConfig tlsint.ServerConfig
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
@ -96,17 +98,32 @@ func (p *PrometheusClient) Init() error {
} }
} }
if err := p.TypeMappings.Init(); err != nil {
return err
}
switch p.MetricVersion { switch p.MetricVersion {
default: default:
fallthrough fallthrough
case 1: case 1:
p.collector = v1.NewCollector(time.Duration(p.ExpirationInterval), p.StringAsLabel, p.ExportTimestamp, p.Log) p.collector = v1.NewCollector(
time.Duration(p.ExpirationInterval),
p.StringAsLabel,
p.ExportTimestamp,
p.TypeMappings,
p.Log,
)
err := registry.Register(p.collector) err := registry.Register(p.collector)
if err != nil { if err != nil {
return err return err
} }
case 2: case 2:
p.collector = v2.NewCollector(time.Duration(p.ExpirationInterval), p.StringAsLabel, p.ExportTimestamp) p.collector = v2.NewCollector(
time.Duration(p.ExpirationInterval),
p.StringAsLabel,
p.ExportTimestamp,
p.TypeMappings,
)
err := registry.Register(p.collector) err := registry.Register(p.collector)
if err != nil { if err != nil {
return err return err

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -268,24 +269,76 @@ rpc_duration_seconds{quantile="0.9"} 9001
rpc_duration_seconds{quantile="0.99"} 76656 rpc_duration_seconds{quantile="0.99"} 76656
rpc_duration_seconds_sum 1.7560473e+07 rpc_duration_seconds_sum 1.7560473e+07
rpc_duration_seconds_count 2693 rpc_duration_seconds_count 2693
`),
},
{
name: "prometheus untyped forced to counter",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Counter: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"value": 42,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle counter
cpu_time_idle{host="example.org"} 42
`),
},
{
name: "prometheus untyped forced to gauge",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 1,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Gauge: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cpu_time_idle",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle gauge
cpu_time_idle{host="example.org"} 42
`), `),
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
err := tt.output.Init() require.NoError(t, tt.output.Init())
require.NoError(t, err)
err = tt.output.Connect() require.NoError(t, tt.output.Connect())
require.NoError(t, err)
defer func() { defer func() {
err := tt.output.Close() require.NoError(t, tt.output.Close())
require.NoError(t, err)
}() }()
err = tt.output.Write(tt.metrics) require.NoError(t, tt.output.Write(tt.metrics))
require.NoError(t, err)
resp, err := http.Get(tt.output.URL()) resp, err := http.Get(tt.output.URL())
require.NoError(t, err) require.NoError(t, err)

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus" inputs "github.com/influxdata/telegraf/plugins/inputs/prometheus"
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -299,24 +300,75 @@ cpu_usage_idle_count{cpu="cpu1"} 20
cpu_usage_idle_bucket{cpu="cpu1",le="+Inf"} 20 cpu_usage_idle_bucket{cpu="cpu1",le="+Inf"} 20
cpu_usage_idle_sum{cpu="cpu1"} 2000 cpu_usage_idle_sum{cpu="cpu1"} 2000
cpu_usage_idle_count{cpu="cpu1"} 20 cpu_usage_idle_count{cpu="cpu1"} 20
`),
},
{
name: "untyped forced to counter",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Counter: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"prometheus",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"cpu_time_idle": 42,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle counter
cpu_time_idle{host="example.org"} 42
`),
},
{
name: "untyped forced to gauge",
output: &PrometheusClient{
Listen: ":0",
MetricVersion: 2,
CollectorsExclude: []string{"gocollector", "process"},
Path: "/metrics",
TypeMappings: prometheus.MetricTypes{Gauge: []string{"cpu_time_idle"}},
Log: logger,
},
metrics: []telegraf.Metric{
testutil.MustMetric(
"prometheus",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"cpu_time_idle": 42.0,
},
time.Unix(0, 0),
),
},
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle gauge
cpu_time_idle{host="example.org"} 42
`), `),
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
err := tt.output.Init() require.NoError(t, tt.output.Init())
require.NoError(t, err) require.NoError(t, tt.output.Connect())
err = tt.output.Connect()
require.NoError(t, err)
defer func() { defer func() {
err := tt.output.Close() require.NoError(t, tt.output.Close())
require.NoError(t, err)
}() }()
err = tt.output.Write(tt.metrics) require.NoError(t, tt.output.Write(tt.metrics))
require.NoError(t, err)
resp, err := http.Get(tt.output.URL()) resp, err := http.Get(tt.output.URL())
require.NoError(t, err) require.NoError(t, err)

View File

@ -45,3 +45,9 @@
## Export metric collection time. ## Export metric collection time.
# export_timestamp = false # export_timestamp = false
## Specify the metric type explicitly.
## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
# [outputs.prometheus_client.metric_types]
# counter = []
# gauge = []

View File

@ -54,6 +54,7 @@ type Collector struct {
ExpirationInterval time.Duration ExpirationInterval time.Duration
StringAsLabel bool StringAsLabel bool
ExportTimestamp bool ExportTimestamp bool
TypeMapping serializer.MetricTypes
Log telegraf.Logger Log telegraf.Logger
sync.Mutex sync.Mutex
@ -61,11 +62,18 @@ type Collector struct {
expireTicker *time.Ticker expireTicker *time.Ticker
} }
func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool, logger telegraf.Logger) *Collector { func NewCollector(
expire time.Duration,
stringsAsLabel bool,
exportTimestamp bool,
typeMapping serializer.MetricTypes,
logger telegraf.Logger,
) *Collector {
c := &Collector{ c := &Collector{
ExpirationInterval: expire, ExpirationInterval: expire,
StringAsLabel: stringsAsLabel, StringAsLabel: stringsAsLabel,
ExportTimestamp: exportTimestamp, ExportTimestamp: exportTimestamp,
TypeMapping: typeMapping,
Log: logger, Log: logger,
fam: make(map[string]*MetricFamily), fam: make(map[string]*MetricFamily),
} }
@ -176,9 +184,10 @@ func (c *Collector) addMetricFamily(point telegraf.Metric, sample *Sample, mname
var fam *MetricFamily var fam *MetricFamily
var ok bool var ok bool
if fam, ok = c.fam[mname]; !ok { if fam, ok = c.fam[mname]; !ok {
pointType := c.TypeMapping.DetermineType(mname, point)
fam = &MetricFamily{ fam = &MetricFamily{
Samples: make(map[SampleID]*Sample), Samples: make(map[SampleID]*Sample),
TelegrafValueType: point.Type(), TelegrafValueType: pointType,
LabelSet: make(map[string]int), LabelSet: make(map[string]int),
} }
c.fam[mname] = fam c.fam[mname] = fam

View File

@ -43,10 +43,16 @@ type Collector struct {
coll *serializer.Collection coll *serializer.Collection
} }
func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool) *Collector { func NewCollector(
expire time.Duration,
stringsAsLabel bool,
exportTimestamp bool,
typeMapping serializer.MetricTypes,
) *Collector {
cfg := serializer.FormatConfig{ cfg := serializer.FormatConfig{
StringAsLabel: stringsAsLabel, StringAsLabel: stringsAsLabel,
ExportTimestamp: exportTimestamp, ExportTimestamp: exportTimestamp,
TypeMappings: typeMapping,
} }
return &Collector{ return &Collector{

View File

@ -40,6 +40,12 @@ reporting others every bucket/quantile will continue to exist.
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "prometheus" data_format = "prometheus"
## Specify the metric type explicitly.
## This overrides the metric-type of the Telegraf metric. Globbing is allowed.
[outputs.file.prometheus_metric_types]
counter = []
gauge = []
``` ```
### Metrics ### Metrics

View File

@ -188,10 +188,11 @@ func (c *Collection) Add(metric telegraf.Metric, now time.Time) {
if !ok { if !ok {
continue continue
} }
metricType := c.config.TypeMappings.DetermineType(metricName, metric)
family := MetricFamily{ family := MetricFamily{
Name: metricName, Name: metricName,
Type: metric.Type(), Type: metricType,
} }
entry, ok := c.Entries[family] entry, ok := c.Entries[family]

View File

@ -2,14 +2,49 @@ package prometheus
import ( import (
"bytes" "bytes"
"fmt"
"time" "time"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers"
) )
// MetricTypes holds the user-configured name patterns used to force a
// specific value type onto a metric, overriding the type carried by the
// Telegraf metric itself. Init must be called to compile the patterns before
// DetermineType can take them into account.
type MetricTypes struct {
// Counter lists the metric-name patterns to be reported as counters.
Counter []string `toml:"counter"`
// Gauge lists the metric-name patterns to be reported as gauges.
Gauge []string `toml:"gauge"`
// filterCounter is the filter compiled from Counter by Init.
filterCounter filter.Filter
// filterGauge is the filter compiled from Gauge by Init.
filterGauge filter.Filter
}
// Init compiles the configured Counter and Gauge pattern lists into the
// filters consulted by DetermineType. It returns a wrapped error naming the
// list that could not be compiled.
func (mt *MetricTypes) Init() error {
	var err error

	// Patterns forcing the counter type.
	if mt.filterCounter, err = filter.Compile(mt.Counter); err != nil {
		return fmt.Errorf("creating counter filter failed: %w", err)
	}

	// Patterns forcing the gauge type.
	if mt.filterGauge, err = filter.Compile(mt.Gauge); err != nil {
		return fmt.Errorf("creating gauge filter failed: %w", err)
	}

	return nil
}
// DetermineType returns the value type to use for the metric called name.
// The type reported by m is used unless name matches one of the configured
// filters. A name matching both filters is reported as a gauge, because the
// gauge mapping takes precedence over the counter mapping. Filters that are
// unset are skipped.
func (mt *MetricTypes) DetermineType(name string, m telegraf.Metric) telegraf.ValueType {
	resolved := m.Type()

	// The gauge filter is checked first as it wins over the counter filter.
	switch {
	case mt.filterGauge != nil && mt.filterGauge.Match(name):
		resolved = telegraf.Gauge
	case mt.filterCounter != nil && mt.filterCounter.Match(name):
		resolved = telegraf.Counter
	}

	return resolved
}
type FormatConfig struct { type FormatConfig struct {
ExportTimestamp bool `toml:"prometheus_export_timestamp"` ExportTimestamp bool `toml:"prometheus_export_timestamp"`
SortMetrics bool `toml:"prometheus_sort_metrics"` SortMetrics bool `toml:"prometheus_sort_metrics"`
@ -17,13 +52,18 @@ type FormatConfig struct {
// CompactEncoding defines whether to include // CompactEncoding defines whether to include
// HELP metadata in Prometheus payload. Setting to true // HELP metadata in Prometheus payload. Setting to true
// helps to reduce payload size. // helps to reduce payload size.
CompactEncoding bool `toml:"prometheus_compact_encoding"` CompactEncoding bool `toml:"prometheus_compact_encoding"`
TypeMappings MetricTypes `toml:"prometheus_metric_types"`
} }
type Serializer struct { type Serializer struct {
FormatConfig FormatConfig
} }
// Init prepares the serializer for use by initializing the explicit
// metric-type mappings of its format configuration.
func (s *Serializer) Init() error {
	mappings := &s.FormatConfig.TypeMappings
	return mappings.Init()
}
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
return s.SerializeBatch([]telegraf.Metric{metric}) return s.SerializeBatch([]telegraf.Metric{metric})
} }

View File

@ -177,6 +177,48 @@ cpu_time_idle{host="example.org"} 42 1574279268000
expected: []byte(` expected: []byte(`
# TYPE cpu_time_idle untyped # TYPE cpu_time_idle untyped
cpu_time_idle{host="example.org"} 42 cpu_time_idle{host="example.org"} 42
`),
},
{
name: "untyped forced to counter",
config: FormatConfig{
TypeMappings: MetricTypes{Counter: []string{"cpu_time_idle"}},
},
metric: testutil.MustMetric(
"cpu",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle counter
cpu_time_idle{host="example.org"} 42
`),
},
{
name: "untyped forced to gauge",
config: FormatConfig{
TypeMappings: MetricTypes{Gauge: []string{"cpu_time_idle"}},
},
metric: testutil.MustMetric(
"cpu",
map[string]string{
"host": "example.org",
},
map[string]interface{}{
"time_idle": 42.0,
},
time.Unix(0, 0),
),
expected: []byte(`
# HELP cpu_time_idle Telegraf collected metric
# TYPE cpu_time_idle gauge
cpu_time_idle{host="example.org"} 42
`), `),
}, },
} }
@ -188,8 +230,10 @@ cpu_time_idle{host="example.org"} 42
ExportTimestamp: tt.config.ExportTimestamp, ExportTimestamp: tt.config.ExportTimestamp,
StringAsLabel: tt.config.StringAsLabel, StringAsLabel: tt.config.StringAsLabel,
CompactEncoding: tt.config.CompactEncoding, CompactEncoding: tt.config.CompactEncoding,
TypeMappings: tt.config.TypeMappings,
}, },
} }
require.NoError(t, s.Init())
actual, err := s.Serialize(tt.metric) actual, err := s.Serialize(tt.metric)
require.NoError(t, err) require.NoError(t, err)