chore(serializers.prometheus): Migrate to new-style framework (#13347)
This commit is contained in:
parent
9771f0805d
commit
5a8ccbde6f
|
|
@ -1478,11 +1478,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error)
|
||||||
c.getFieldString(tbl, "template", &sc.Template)
|
c.getFieldString(tbl, "template", &sc.Template)
|
||||||
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
|
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
|
||||||
|
|
||||||
c.getFieldBool(tbl, "prometheus_export_timestamp", &sc.PrometheusExportTimestamp)
|
|
||||||
c.getFieldBool(tbl, "prometheus_sort_metrics", &sc.PrometheusSortMetrics)
|
|
||||||
c.getFieldBool(tbl, "prometheus_string_as_label", &sc.PrometheusStringAsLabel)
|
|
||||||
c.getFieldBool(tbl, "prometheus_compact_encoding", &sc.PrometheusCompactEncoding)
|
|
||||||
|
|
||||||
if c.hasErrs() {
|
if c.hasErrs() {
|
||||||
return nil, c.firstErr()
|
return nil, c.firstErr()
|
||||||
}
|
}
|
||||||
|
|
@ -1548,9 +1543,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
|
||||||
case "data_type", "influx_parser_type":
|
case "data_type", "influx_parser_type":
|
||||||
|
|
||||||
// Serializer options to ignore
|
// Serializer options to ignore
|
||||||
case "prefix", "template", "templates",
|
case "prefix", "template", "templates":
|
||||||
"prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
|
|
||||||
"prometheus_compact_encoding":
|
|
||||||
default:
|
default:
|
||||||
c.unusedFieldsMutex.Lock()
|
c.unusedFieldsMutex.Lock()
|
||||||
c.UnusedFields[key] = true
|
c.UnusedFields[key] = true
|
||||||
|
|
|
||||||
|
|
@ -44,18 +44,14 @@ type Collector struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool) *Collector {
|
func NewCollector(expire time.Duration, stringsAsLabel bool, exportTimestamp bool) *Collector {
|
||||||
config := serializer.FormatConfig{}
|
cfg := serializer.FormatConfig{
|
||||||
if stringsAsLabel {
|
StringAsLabel: stringsAsLabel,
|
||||||
config.StringHandling = serializer.StringAsLabel
|
ExportTimestamp: exportTimestamp,
|
||||||
}
|
|
||||||
|
|
||||||
if exportTimestamp {
|
|
||||||
config.TimestampExport = serializer.ExportTimestamp
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Collector{
|
return &Collector{
|
||||||
expireDuration: expire,
|
expireDuration: expire,
|
||||||
coll: serializer.NewCollection(config),
|
coll: serializer.NewCollection(cfg),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
//go:build !custom || serializers || serializers.prometheus
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/serializers/prometheus" // register plugin
|
||||||
|
)
|
||||||
|
|
@ -145,7 +145,7 @@ func (c *Collection) createLabels(metric telegraf.Metric) []LabelPair {
|
||||||
labels = append(labels, LabelPair{Name: name, Value: tag.Value})
|
labels = append(labels, LabelPair{Name: name, Value: tag.Value})
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.config.StringHandling != StringAsLabel {
|
if !c.config.StringAsLabel {
|
||||||
return labels
|
return labels
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -352,13 +352,13 @@ func (c *Collection) Expire(now time.Time, age time.Duration) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Collection) GetEntries(order MetricSortOrder) []Entry {
|
func (c *Collection) GetEntries() []Entry {
|
||||||
entries := make([]Entry, 0, len(c.Entries))
|
entries := make([]Entry, 0, len(c.Entries))
|
||||||
for _, entry := range c.Entries {
|
for _, entry := range c.Entries {
|
||||||
entries = append(entries, entry)
|
entries = append(entries, entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
if order == SortMetrics {
|
if c.config.SortMetrics {
|
||||||
sort.Slice(entries, func(i, j int) bool {
|
sort.Slice(entries, func(i, j int) bool {
|
||||||
lhs := entries[i].Family
|
lhs := entries[i].Family
|
||||||
rhs := entries[j].Family
|
rhs := entries[j].Family
|
||||||
|
|
@ -372,13 +372,13 @@ func (c *Collection) GetEntries(order MetricSortOrder) []Entry {
|
||||||
return entries
|
return entries
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Collection) GetMetrics(entry Entry, order MetricSortOrder) []*Metric {
|
func (c *Collection) GetMetrics(entry Entry) []*Metric {
|
||||||
metrics := make([]*Metric, 0, len(entry.Metrics))
|
metrics := make([]*Metric, 0, len(entry.Metrics))
|
||||||
for _, metric := range entry.Metrics {
|
for _, metric := range entry.Metrics {
|
||||||
metrics = append(metrics, metric)
|
metrics = append(metrics, metric)
|
||||||
}
|
}
|
||||||
|
|
||||||
if order == SortMetrics {
|
if c.config.SortMetrics {
|
||||||
sort.Slice(metrics, func(i, j int) bool {
|
sort.Slice(metrics, func(i, j int) bool {
|
||||||
lhs := metrics[i].Labels
|
lhs := metrics[i].Labels
|
||||||
rhs := metrics[j].Labels
|
rhs := metrics[j].Labels
|
||||||
|
|
@ -409,7 +409,7 @@ func (c *Collection) GetMetrics(entry Entry, order MetricSortOrder) []*Metric {
|
||||||
func (c *Collection) GetProto() []*dto.MetricFamily {
|
func (c *Collection) GetProto() []*dto.MetricFamily {
|
||||||
result := make([]*dto.MetricFamily, 0, len(c.Entries))
|
result := make([]*dto.MetricFamily, 0, len(c.Entries))
|
||||||
|
|
||||||
for _, entry := range c.GetEntries(c.config.MetricSortOrder) {
|
for _, entry := range c.GetEntries() {
|
||||||
mf := &dto.MetricFamily{
|
mf := &dto.MetricFamily{
|
||||||
Name: proto.String(entry.Family.Name),
|
Name: proto.String(entry.Family.Name),
|
||||||
Type: MetricType(entry.Family.Type),
|
Type: MetricType(entry.Family.Type),
|
||||||
|
|
@ -419,7 +419,7 @@ func (c *Collection) GetProto() []*dto.MetricFamily {
|
||||||
mf.Help = proto.String(helpString)
|
mf.Help = proto.String(helpString)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range c.GetMetrics(entry, c.config.MetricSortOrder) {
|
for _, metric := range c.GetMetrics(entry) {
|
||||||
l := make([]*dto.LabelPair, 0, len(metric.Labels))
|
l := make([]*dto.LabelPair, 0, len(metric.Labels))
|
||||||
for _, label := range metric.Labels {
|
for _, label := range metric.Labels {
|
||||||
l = append(l, &dto.LabelPair{
|
l = append(l, &dto.LabelPair{
|
||||||
|
|
@ -432,7 +432,7 @@ func (c *Collection) GetProto() []*dto.MetricFamily {
|
||||||
Label: l,
|
Label: l,
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.config.TimestampExport == ExportTimestamp {
|
if c.config.ExportTimestamp {
|
||||||
m.TimestampMs = proto.Int64(metric.Time.UnixNano() / int64(time.Millisecond))
|
m.TimestampMs = proto.Int64(metric.Time.UnixNano() / int64(time.Millisecond))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -830,7 +830,7 @@ func TestExportTimestamps(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
c := NewCollection(FormatConfig{TimestampExport: ExportTimestamp})
|
c := NewCollection(FormatConfig{ExportTimestamp: true})
|
||||||
for _, item := range tt.input {
|
for _, item := range tt.input {
|
||||||
c.Add(item.metric, item.addtime)
|
c.Add(item.metric, item.addtime)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,48 +7,21 @@ import (
|
||||||
"github.com/prometheus/common/expfmt"
|
"github.com/prometheus/common/expfmt"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
)
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
|
|
||||||
// TimestampExport controls if the output contains timestamps.
|
|
||||||
type TimestampExport int
|
|
||||||
|
|
||||||
const (
|
|
||||||
NoExportTimestamp TimestampExport = iota
|
|
||||||
ExportTimestamp
|
|
||||||
)
|
|
||||||
|
|
||||||
// MetricSortOrder controls if the output is sorted.
|
|
||||||
type MetricSortOrder int
|
|
||||||
|
|
||||||
const (
|
|
||||||
NoSortMetrics MetricSortOrder = iota
|
|
||||||
SortMetrics
|
|
||||||
)
|
|
||||||
|
|
||||||
// StringHandling defines how to process string fields.
|
|
||||||
type StringHandling int
|
|
||||||
|
|
||||||
const (
|
|
||||||
DiscardStrings StringHandling = iota
|
|
||||||
StringAsLabel
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type FormatConfig struct {
|
type FormatConfig struct {
|
||||||
TimestampExport TimestampExport
|
ExportTimestamp bool `toml:"prometheus_export_timestamp"`
|
||||||
MetricSortOrder MetricSortOrder
|
SortMetrics bool `toml:"prometheus_sort_metrics"`
|
||||||
StringHandling StringHandling
|
StringAsLabel bool `toml:"prometheus_string_as_label"`
|
||||||
// CompactEncoding defines whether to include
|
// CompactEncoding defines whether to include
|
||||||
// HELP metadata in Prometheus payload. Setting to true
|
// HELP metadata in Prometheus payload. Setting to true
|
||||||
// helps to reduce payload size.
|
// helps to reduce payload size.
|
||||||
CompactEncoding bool
|
CompactEncoding bool `toml:"prometheus_compact_encoding"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Serializer struct {
|
type Serializer struct {
|
||||||
config FormatConfig
|
FormatConfig
|
||||||
}
|
|
||||||
|
|
||||||
func NewSerializer(config FormatConfig) *Serializer {
|
|
||||||
return &Serializer{config: config}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
|
|
@ -56,7 +29,7 @@ func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
coll := NewCollection(s.config)
|
coll := NewCollection(s.FormatConfig)
|
||||||
for _, metric := range metrics {
|
for _, metric := range metrics {
|
||||||
coll.Add(metric, time.Now())
|
coll.Add(metric, time.Now())
|
||||||
}
|
}
|
||||||
|
|
@ -72,3 +45,21 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
|
|
||||||
return buf.Bytes(), nil
|
return buf.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
serializers.Add("prometheus",
|
||||||
|
func() serializers.Serializer {
|
||||||
|
return &Serializer{}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||||
|
func (s *Serializer) InitFromConfig(cfg *serializers.Config) error {
|
||||||
|
s.FormatConfig.CompactEncoding = cfg.PrometheusCompactEncoding
|
||||||
|
s.FormatConfig.SortMetrics = cfg.PrometheusSortMetrics
|
||||||
|
s.FormatConfig.StringAsLabel = cfg.PrometheusStringAsLabel
|
||||||
|
s.FormatConfig.ExportTimestamp = cfg.PrometheusExportTimestamp
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -141,7 +141,7 @@ http_request_duration_seconds_count 0
|
||||||
{
|
{
|
||||||
name: "simple with timestamp",
|
name: "simple with timestamp",
|
||||||
config: FormatConfig{
|
config: FormatConfig{
|
||||||
TimestampExport: ExportTimestamp,
|
ExportTimestamp: true,
|
||||||
},
|
},
|
||||||
metric: testutil.MustMetric(
|
metric: testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -182,12 +182,14 @@ cpu_time_idle{host="example.org"} 42
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
s := NewSerializer(FormatConfig{
|
s := &Serializer{
|
||||||
MetricSortOrder: SortMetrics,
|
FormatConfig{
|
||||||
TimestampExport: tt.config.TimestampExport,
|
SortMetrics: true,
|
||||||
StringHandling: tt.config.StringHandling,
|
ExportTimestamp: tt.config.ExportTimestamp,
|
||||||
CompactEncoding: tt.config.CompactEncoding,
|
StringAsLabel: tt.config.StringAsLabel,
|
||||||
})
|
CompactEncoding: tt.config.CompactEncoding,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
actual, err := s.Serialize(tt.metric)
|
actual, err := s.Serialize(tt.metric)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -536,7 +538,7 @@ cpu_time_idle 42
|
||||||
{
|
{
|
||||||
name: "string as label",
|
name: "string as label",
|
||||||
config: FormatConfig{
|
config: FormatConfig{
|
||||||
StringHandling: StringAsLabel,
|
StringAsLabel: true,
|
||||||
},
|
},
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
|
|
@ -558,7 +560,7 @@ cpu_time_idle{cpu="cpu0"} 42
|
||||||
{
|
{
|
||||||
name: "string as label duplicate tag",
|
name: "string as label duplicate tag",
|
||||||
config: FormatConfig{
|
config: FormatConfig{
|
||||||
StringHandling: StringAsLabel,
|
StringAsLabel: true,
|
||||||
},
|
},
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
|
|
@ -582,7 +584,7 @@ cpu_time_idle{cpu="cpu0"} 42
|
||||||
{
|
{
|
||||||
name: "replace characters when using string as label",
|
name: "replace characters when using string as label",
|
||||||
config: FormatConfig{
|
config: FormatConfig{
|
||||||
StringHandling: StringAsLabel,
|
StringAsLabel: true,
|
||||||
},
|
},
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
|
|
@ -698,11 +700,13 @@ rpc_duration_seconds_count 2693
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
s := NewSerializer(FormatConfig{
|
s := &Serializer{
|
||||||
MetricSortOrder: SortMetrics,
|
FormatConfig{
|
||||||
TimestampExport: tt.config.TimestampExport,
|
SortMetrics: true,
|
||||||
StringHandling: tt.config.StringHandling,
|
ExportTimestamp: tt.config.ExportTimestamp,
|
||||||
})
|
StringAsLabel: tt.config.StringAsLabel,
|
||||||
|
},
|
||||||
|
}
|
||||||
actual, err := s.SerializeBatch(tt.metrics)
|
actual, err := s.SerializeBatch(tt.metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Creator is the function to create a new serializer
|
// Creator is the function to create a new serializer
|
||||||
|
|
@ -154,50 +153,18 @@ type Config struct {
|
||||||
|
|
||||||
// NewSerializer a Serializer interface based on the given config.
|
// NewSerializer a Serializer interface based on the given config.
|
||||||
func NewSerializer(config *Config) (Serializer, error) {
|
func NewSerializer(config *Config) (Serializer, error) {
|
||||||
var err error
|
creator, found := Serializers[config.DataFormat]
|
||||||
var serializer Serializer
|
if !found {
|
||||||
switch config.DataFormat {
|
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
|
||||||
case "prometheus":
|
|
||||||
serializer, err = NewPrometheusSerializer(config), nil
|
|
||||||
default:
|
|
||||||
creator, found := Serializers[config.DataFormat]
|
|
||||||
if !found {
|
|
||||||
return nil, fmt.Errorf("invalid data format: %s", config.DataFormat)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to create new-style serializers the old way...
|
|
||||||
serializer := creator()
|
|
||||||
p, ok := serializer.(SerializerCompatibility)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("serializer for %q cannot be created the old way", config.DataFormat)
|
|
||||||
}
|
|
||||||
err := p.InitFromConfig(config)
|
|
||||||
|
|
||||||
return serializer, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try to create new-style serializers the old way...
|
||||||
|
serializer := creator()
|
||||||
|
p, ok := serializer.(SerializerCompatibility)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("serializer for %q cannot be created the old way", config.DataFormat)
|
||||||
|
}
|
||||||
|
err := p.InitFromConfig(config)
|
||||||
|
|
||||||
return serializer, err
|
return serializer, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPrometheusSerializer(config *Config) Serializer {
|
|
||||||
exportTimestamp := prometheus.NoExportTimestamp
|
|
||||||
if config.PrometheusExportTimestamp {
|
|
||||||
exportTimestamp = prometheus.ExportTimestamp
|
|
||||||
}
|
|
||||||
|
|
||||||
sortMetrics := prometheus.NoSortMetrics
|
|
||||||
if config.PrometheusExportTimestamp {
|
|
||||||
sortMetrics = prometheus.SortMetrics
|
|
||||||
}
|
|
||||||
|
|
||||||
stringAsLabels := prometheus.DiscardStrings
|
|
||||||
if config.PrometheusStringAsLabel {
|
|
||||||
stringAsLabels = prometheus.StringAsLabel
|
|
||||||
}
|
|
||||||
|
|
||||||
return prometheus.NewSerializer(prometheus.FormatConfig{
|
|
||||||
TimestampExport: exportTimestamp,
|
|
||||||
MetricSortOrder: sortMetrics,
|
|
||||||
StringHandling: stringAsLabels,
|
|
||||||
CompactEncoding: config.PrometheusCompactEncoding,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue