chore(serializers.prometheusremotewrite): Migrate to new-style framework (#13341)
This commit is contained in:
parent
d769495276
commit
7a521ff73b
|
|
@ -0,0 +1,7 @@
|
||||||
|
//go:build !custom || serializers || serializers.prometheusremotewrite
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/serializers/prometheusremotewrite" // register plugin
|
||||||
|
)
|
||||||
|
|
@ -13,38 +13,15 @@ import (
|
||||||
"github.com/prometheus/prometheus/prompb"
|
"github.com/prometheus/prometheus/prompb"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MetricKey uint64
|
type MetricKey uint64
|
||||||
|
|
||||||
// MetricSortOrder controls if the output is sorted.
|
|
||||||
type MetricSortOrder int
|
|
||||||
|
|
||||||
const (
|
|
||||||
NoSortMetrics MetricSortOrder = iota
|
|
||||||
SortMetrics
|
|
||||||
)
|
|
||||||
|
|
||||||
// StringHandling defines how to process string fields.
|
|
||||||
type StringHandling int
|
|
||||||
|
|
||||||
const (
|
|
||||||
DiscardStrings StringHandling = iota
|
|
||||||
StringAsLabel
|
|
||||||
)
|
|
||||||
|
|
||||||
type FormatConfig struct {
|
|
||||||
MetricSortOrder MetricSortOrder
|
|
||||||
StringHandling StringHandling
|
|
||||||
}
|
|
||||||
|
|
||||||
type Serializer struct {
|
type Serializer struct {
|
||||||
config FormatConfig
|
SortMetrics bool `toml:"prometheus_sort_metrics"`
|
||||||
}
|
StringAsLabel bool `toml:"prometheus_string_as_label"`
|
||||||
|
|
||||||
func NewSerializer(config FormatConfig) *Serializer {
|
|
||||||
return &Serializer{config: config}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
|
|
@ -204,7 +181,7 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.config.MetricSortOrder == SortMetrics {
|
if s.SortMetrics {
|
||||||
sort.Slice(promTS, func(i, j int) bool {
|
sort.Slice(promTS, func(i, j int) bool {
|
||||||
lhs := promTS[i].Labels
|
lhs := promTS[i].Labels
|
||||||
rhs := promTS[j].Labels
|
rhs := promTS[j].Labels
|
||||||
|
|
@ -274,7 +251,7 @@ func (s *Serializer) appendCommonLabels(labels []prompb.Label, metric telegraf.M
|
||||||
labels = append(labels, prompb.Label{Name: name, Value: tag.Value})
|
labels = append(labels, prompb.Label{Name: name, Value: tag.Value})
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.config.StringHandling != StringAsLabel {
|
if !s.StringAsLabel {
|
||||||
return labels
|
return labels
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -342,3 +319,19 @@ func (sl sortableLabels) Less(i, j int) bool {
|
||||||
func (sl sortableLabels) Swap(i, j int) {
|
func (sl sortableLabels) Swap(i, j int) {
|
||||||
sl[i], sl[j] = sl[j], sl[i]
|
sl[i], sl[j] = sl[j], sl[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
serializers.Add("prometheusremotewrite",
|
||||||
|
func() serializers.Serializer {
|
||||||
|
return &Serializer{}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||||
|
func (s *Serializer) InitFromConfig(cfg *serializers.Config) error {
|
||||||
|
s.SortMetrics = cfg.PrometheusSortMetrics
|
||||||
|
s.StringAsLabel = cfg.PrometheusStringAsLabel
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ func BenchmarkRemoteWrite(b *testing.B) {
|
||||||
time.Unix(0, 0),
|
time.Unix(0, 0),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
s := NewSerializer(FormatConfig{})
|
s := &Serializer{}
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
_, _ = s.SerializeBatch(batch)
|
_, _ = s.SerializeBatch(batch)
|
||||||
}
|
}
|
||||||
|
|
@ -41,7 +41,6 @@ func BenchmarkRemoteWrite(b *testing.B) {
|
||||||
func TestRemoteWriteSerialize(t *testing.T) {
|
func TestRemoteWriteSerialize(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
config FormatConfig
|
|
||||||
metric telegraf.Metric
|
metric telegraf.Metric
|
||||||
expected []byte
|
expected []byte
|
||||||
}{
|
}{
|
||||||
|
|
@ -186,10 +185,9 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
s := NewSerializer(FormatConfig{
|
s := &Serializer{
|
||||||
MetricSortOrder: SortMetrics,
|
SortMetrics: true,
|
||||||
StringHandling: tt.config.StringHandling,
|
}
|
||||||
})
|
|
||||||
data, err := s.Serialize(tt.metric)
|
data, err := s.Serialize(tt.metric)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
actual, err := prompbToText(data)
|
actual, err := prompbToText(data)
|
||||||
|
|
@ -203,10 +201,10 @@ http_request_duration_seconds_bucket{le="0.5"} 129389
|
||||||
|
|
||||||
func TestRemoteWriteSerializeBatch(t *testing.T) {
|
func TestRemoteWriteSerializeBatch(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
config FormatConfig
|
metrics []telegraf.Metric
|
||||||
metrics []telegraf.Metric
|
stringAsLabel bool
|
||||||
expected []byte
|
expected []byte
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "simple",
|
name: "simple",
|
||||||
|
|
@ -515,11 +513,8 @@ cpu_time_idle 42
|
||||||
`),
|
`),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "string as label",
|
name: "string as label",
|
||||||
config: FormatConfig{
|
stringAsLabel: true,
|
||||||
MetricSortOrder: SortMetrics,
|
|
||||||
StringHandling: StringAsLabel,
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -536,11 +531,8 @@ cpu_time_idle{cpu="cpu0"} 42
|
||||||
`),
|
`),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "string as label duplicate tag",
|
name: "string as label duplicate tag",
|
||||||
config: FormatConfig{
|
stringAsLabel: true,
|
||||||
MetricSortOrder: SortMetrics,
|
|
||||||
StringHandling: StringAsLabel,
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -559,11 +551,8 @@ cpu_time_idle{cpu="cpu0"} 42
|
||||||
`),
|
`),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "replace characters when using string as label",
|
name: "replace characters when using string as label",
|
||||||
config: FormatConfig{
|
stringAsLabel: true,
|
||||||
MetricSortOrder: SortMetrics,
|
|
||||||
StringHandling: StringAsLabel,
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"cpu",
|
"cpu",
|
||||||
|
|
@ -666,11 +655,8 @@ rpc_duration_seconds_sum 17560473
|
||||||
`),
|
`),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "empty label string value",
|
name: "empty label string value",
|
||||||
config: FormatConfig{
|
stringAsLabel: true,
|
||||||
MetricSortOrder: SortMetrics,
|
|
||||||
StringHandling: StringAsLabel,
|
|
||||||
},
|
|
||||||
metrics: []telegraf.Metric{
|
metrics: []telegraf.Metric{
|
||||||
testutil.MustMetric(
|
testutil.MustMetric(
|
||||||
"prometheus",
|
"prometheus",
|
||||||
|
|
@ -690,10 +676,10 @@ rpc_duration_seconds_sum 17560473
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
s := NewSerializer(FormatConfig{
|
s := &Serializer{
|
||||||
MetricSortOrder: SortMetrics,
|
SortMetrics: true,
|
||||||
StringHandling: tt.config.StringHandling,
|
StringAsLabel: tt.stringAsLabel,
|
||||||
})
|
}
|
||||||
data, err := s.SerializeBatch(tt.metrics)
|
data, err := s.SerializeBatch(tt.metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
actual, err := prompbToText(data)
|
actual, err := prompbToText(data)
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/prometheusremotewrite"
|
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/splunkmetric"
|
"github.com/influxdata/telegraf/plugins/serializers/splunkmetric"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/wavefront"
|
"github.com/influxdata/telegraf/plugins/serializers/wavefront"
|
||||||
)
|
)
|
||||||
|
|
@ -171,8 +170,6 @@ func NewSerializer(config *Config) (Serializer, error) {
|
||||||
), nil
|
), nil
|
||||||
case "prometheus":
|
case "prometheus":
|
||||||
serializer, err = NewPrometheusSerializer(config), nil
|
serializer, err = NewPrometheusSerializer(config), nil
|
||||||
case "prometheusremotewrite":
|
|
||||||
serializer, err = NewPrometheusRemoteWriteSerializer(config), nil
|
|
||||||
default:
|
default:
|
||||||
creator, found := Serializers[config.DataFormat]
|
creator, found := Serializers[config.DataFormat]
|
||||||
if !found {
|
if !found {
|
||||||
|
|
@ -192,23 +189,6 @@ func NewSerializer(config *Config) (Serializer, error) {
|
||||||
return serializer, err
|
return serializer, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPrometheusRemoteWriteSerializer(config *Config) Serializer {
|
|
||||||
sortMetrics := prometheusremotewrite.NoSortMetrics
|
|
||||||
if config.PrometheusExportTimestamp {
|
|
||||||
sortMetrics = prometheusremotewrite.SortMetrics
|
|
||||||
}
|
|
||||||
|
|
||||||
stringAsLabels := prometheusremotewrite.DiscardStrings
|
|
||||||
if config.PrometheusStringAsLabel {
|
|
||||||
stringAsLabels = prometheusremotewrite.StringAsLabel
|
|
||||||
}
|
|
||||||
|
|
||||||
return prometheusremotewrite.NewSerializer(prometheusremotewrite.FormatConfig{
|
|
||||||
MetricSortOrder: sortMetrics,
|
|
||||||
StringHandling: stringAsLabels,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewPrometheusSerializer(config *Config) Serializer {
|
func NewPrometheusSerializer(config *Config) Serializer {
|
||||||
exportTimestamp := prometheus.NoExportTimestamp
|
exportTimestamp := prometheus.NoExportTimestamp
|
||||||
if config.PrometheusExportTimestamp {
|
if config.PrometheusExportTimestamp {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue