chore(serializers.wavefront): Migrate to new-style framework (#13344)
This commit is contained in:
parent
b20a3ac77b
commit
238478bc9c
|
|
@ -1478,10 +1478,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error)
|
||||||
c.getFieldString(tbl, "template", &sc.Template)
|
c.getFieldString(tbl, "template", &sc.Template)
|
||||||
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
|
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
|
||||||
|
|
||||||
c.getFieldStringSlice(tbl, "wavefront_source_override", &sc.WavefrontSourceOverride)
|
|
||||||
c.getFieldBool(tbl, "wavefront_use_strict", &sc.WavefrontUseStrict)
|
|
||||||
c.getFieldBool(tbl, "wavefront_disable_prefix_conversion", &sc.WavefrontDisablePrefixConversion)
|
|
||||||
|
|
||||||
c.getFieldBool(tbl, "prometheus_export_timestamp", &sc.PrometheusExportTimestamp)
|
c.getFieldBool(tbl, "prometheus_export_timestamp", &sc.PrometheusExportTimestamp)
|
||||||
c.getFieldBool(tbl, "prometheus_sort_metrics", &sc.PrometheusSortMetrics)
|
c.getFieldBool(tbl, "prometheus_sort_metrics", &sc.PrometheusSortMetrics)
|
||||||
c.getFieldBool(tbl, "prometheus_string_as_label", &sc.PrometheusStringAsLabel)
|
c.getFieldBool(tbl, "prometheus_string_as_label", &sc.PrometheusStringAsLabel)
|
||||||
|
|
@ -1554,8 +1550,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
|
||||||
// Serializer options to ignore
|
// Serializer options to ignore
|
||||||
case "prefix", "template", "templates",
|
case "prefix", "template", "templates",
|
||||||
"prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
|
"prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label",
|
||||||
"prometheus_compact_encoding",
|
"prometheus_compact_encoding":
|
||||||
"wavefront_disable_prefix_conversion", "wavefront_source_override", "wavefront_use_strict":
|
|
||||||
default:
|
default:
|
||||||
c.unusedFieldsMutex.Lock()
|
c.unusedFieldsMutex.Lock()
|
||||||
c.UnusedFields[key] = true
|
c.UnusedFields[key] = true
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
//go:build !custom || serializers || serializers.wavefront
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/serializers/wavefront" // register plugin
|
||||||
|
)
|
||||||
|
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/wavefront"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Creator is the function to create a new serializer
|
// Creator is the function to create a new serializer
|
||||||
|
|
@ -158,13 +157,6 @@ func NewSerializer(config *Config) (Serializer, error) {
|
||||||
var err error
|
var err error
|
||||||
var serializer Serializer
|
var serializer Serializer
|
||||||
switch config.DataFormat {
|
switch config.DataFormat {
|
||||||
case "wavefront":
|
|
||||||
serializer, err = NewWavefrontSerializer(
|
|
||||||
config.Prefix,
|
|
||||||
config.WavefrontUseStrict,
|
|
||||||
config.WavefrontSourceOverride,
|
|
||||||
config.WavefrontDisablePrefixConversion,
|
|
||||||
), nil
|
|
||||||
case "prometheus":
|
case "prometheus":
|
||||||
serializer, err = NewPrometheusSerializer(config), nil
|
serializer, err = NewPrometheusSerializer(config), nil
|
||||||
default:
|
default:
|
||||||
|
|
@ -209,7 +201,3 @@ func NewPrometheusSerializer(config *Config) Serializer {
|
||||||
CompactEncoding: config.PrometheusCompactEncoding,
|
CompactEncoding: config.PrometheusCompactEncoding,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversions bool) Serializer {
|
|
||||||
return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -6,16 +6,17 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WavefrontSerializer : WavefrontSerializer struct
|
type Serializer struct {
|
||||||
type WavefrontSerializer struct {
|
Prefix string `toml:"prefix"`
|
||||||
Prefix string
|
UseStrict bool `toml:"wavefront_use_strict"`
|
||||||
UseStrict bool
|
SourceOverride []string `toml:"wavefront_source_override"`
|
||||||
SourceOverride []string
|
DisablePrefixConversions bool `toml:"wavefront_disable_prefix_conversion"`
|
||||||
DisablePrefixConversions bool
|
|
||||||
scratch buffer
|
scratch buffer
|
||||||
mu sync.Mutex // buffer mutex
|
mu sync.Mutex // buffer mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetricPoint struct {
|
type MetricPoint struct {
|
||||||
|
|
@ -26,17 +27,7 @@ type MetricPoint struct {
|
||||||
Tags map[string]string
|
Tags map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversion bool) *WavefrontSerializer {
|
func (s *Serializer) serializeMetric(m telegraf.Metric) {
|
||||||
s := &WavefrontSerializer{
|
|
||||||
Prefix: prefix,
|
|
||||||
UseStrict: useStrict,
|
|
||||||
SourceOverride: sourceOverride,
|
|
||||||
DisablePrefixConversions: disablePrefixConversion,
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *WavefrontSerializer) serializeMetric(m telegraf.Metric) {
|
|
||||||
const metricSeparator = "."
|
const metricSeparator = "."
|
||||||
|
|
||||||
for fieldName, value := range m.Fields() {
|
for fieldName, value := range m.Fields() {
|
||||||
|
|
@ -59,7 +50,7 @@ func (s *WavefrontSerializer) serializeMetric(m telegraf.Metric) {
|
||||||
// bad value continue to next metric
|
// bad value continue to next metric
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
source, tags := buildTags(m.Tags(), s)
|
source, tags := s.buildTags(m.Tags())
|
||||||
metric := MetricPoint{
|
metric := MetricPoint{
|
||||||
Metric: name,
|
Metric: name,
|
||||||
Timestamp: m.Time().Unix(),
|
Timestamp: m.Time().Unix(),
|
||||||
|
|
@ -72,7 +63,7 @@ func (s *WavefrontSerializer) serializeMetric(m telegraf.Metric) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serialize : Serialize based on Wavefront format
|
// Serialize : Serialize based on Wavefront format
|
||||||
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.scratch.Reset()
|
s.scratch.Reset()
|
||||||
s.serializeMetric(m)
|
s.serializeMetric(m)
|
||||||
|
|
@ -81,7 +72,7 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.scratch.Reset()
|
s.scratch.Reset()
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
|
@ -92,7 +83,7 @@ func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte,
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string {
|
func (s *Serializer) findSourceTag(mTags map[string]string) string {
|
||||||
if src, ok := mTags["source"]; ok {
|
if src, ok := mTags["source"]; ok {
|
||||||
delete(mTags, "source")
|
delete(mTags, "source")
|
||||||
return src
|
return src
|
||||||
|
|
@ -107,14 +98,14 @@ func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string {
|
||||||
return mTags["host"]
|
return mTags["host"]
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) {
|
func (s *Serializer) buildTags(mTags map[string]string) (string, map[string]string) {
|
||||||
// Remove all empty tags.
|
// Remove all empty tags.
|
||||||
for k, v := range mTags {
|
for k, v := range mTags {
|
||||||
if v == "" {
|
if v == "" {
|
||||||
delete(mTags, k)
|
delete(mTags, k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
source := findSourceTag(mTags, s)
|
source := s.findSourceTag(mTags)
|
||||||
delete(mTags, "host")
|
delete(mTags, "host")
|
||||||
return tagValueReplacer.Replace(source), mTags
|
return tagValueReplacer.Replace(source), mTags
|
||||||
}
|
}
|
||||||
|
|
@ -143,7 +134,7 @@ func buildValue(v interface{}, name string) (val float64, valid bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func formatMetricPoint(b *buffer, metricPoint *MetricPoint, s *WavefrontSerializer) []byte {
|
func formatMetricPoint(b *buffer, metricPoint *MetricPoint, s *Serializer) []byte {
|
||||||
b.WriteChar('"')
|
b.WriteChar('"')
|
||||||
b.WriteString(metricPoint.Metric)
|
b.WriteString(metricPoint.Metric)
|
||||||
b.WriteString(`" `)
|
b.WriteString(`" `)
|
||||||
|
|
@ -195,3 +186,21 @@ func (b *buffer) WriteUint64(val uint64) {
|
||||||
func (b *buffer) WriteFloat64(val float64) {
|
func (b *buffer) WriteFloat64(val float64) {
|
||||||
*b = strconv.AppendFloat(*b, val, 'f', 6, 64)
|
*b = strconv.AppendFloat(*b, val, 'f', 6, 64)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
serializers.Add("wavefront",
|
||||||
|
func() serializers.Serializer {
|
||||||
|
return &Serializer{}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||||
|
func (s *Serializer) InitFromConfig(cfg *serializers.Config) error {
|
||||||
|
s.Prefix = cfg.Prefix
|
||||||
|
s.UseStrict = cfg.WavefrontUseStrict
|
||||||
|
s.SourceOverride = cfg.WavefrontSourceOverride
|
||||||
|
s.DisablePrefixConversions = cfg.WavefrontDisablePrefixConversion
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -65,10 +65,10 @@ func TestBuildTags(t *testing.T) {
|
||||||
"ccc",
|
"ccc",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s := WavefrontSerializer{SourceOverride: []string{"instanceid", "instance-id", "hostname", "snmp_host", "node_host"}}
|
s := &Serializer{SourceOverride: []string{"instanceid", "instance-id", "hostname", "snmp_host", "node_host"}}
|
||||||
|
|
||||||
for _, tt := range tagTests {
|
for _, tt := range tagTests {
|
||||||
source, tags := buildTags(tt.ptIn, &s)
|
source, tags := s.buildTags(tt.ptIn)
|
||||||
if !reflect.DeepEqual(tags, tt.outTags) {
|
if !reflect.DeepEqual(tags, tt.outTags) {
|
||||||
t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags)
|
t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags)
|
||||||
}
|
}
|
||||||
|
|
@ -90,10 +90,10 @@ func TestBuildTagsHostTag(t *testing.T) {
|
||||||
"snmpHost",
|
"snmpHost",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s := WavefrontSerializer{SourceOverride: []string{"snmp_host"}}
|
s := &Serializer{SourceOverride: []string{"snmp_host"}}
|
||||||
|
|
||||||
for _, tt := range tagTests {
|
for _, tt := range tagTests {
|
||||||
source, tags := buildTags(tt.ptIn, &s)
|
source, tags := s.buildTags(tt.ptIn)
|
||||||
if !reflect.DeepEqual(tags, tt.outTags) {
|
if !reflect.DeepEqual(tags, tt.outTags) {
|
||||||
t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags)
|
t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags)
|
||||||
}
|
}
|
||||||
|
|
@ -130,10 +130,10 @@ func TestFormatMetricPoint(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
s := WavefrontSerializer{}
|
s := &Serializer{}
|
||||||
|
|
||||||
for _, pt := range pointTests {
|
for _, pt := range pointTests {
|
||||||
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
|
bout := formatMetricPoint(new(buffer), pt.ptIn, s)
|
||||||
sout := string(bout[:])
|
sout := string(bout[:])
|
||||||
if sout != pt.out {
|
if sout != pt.out {
|
||||||
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
|
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
|
||||||
|
|
@ -158,10 +158,10 @@ func TestUseStrict(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
s := WavefrontSerializer{UseStrict: true}
|
s := &Serializer{UseStrict: true}
|
||||||
|
|
||||||
for _, pt := range pointTests {
|
for _, pt := range pointTests {
|
||||||
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
|
bout := formatMetricPoint(new(buffer), pt.ptIn, s)
|
||||||
sout := string(bout[:])
|
sout := string(bout[:])
|
||||||
if sout != pt.out {
|
if sout != pt.out {
|
||||||
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
|
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
|
||||||
|
|
@ -180,7 +180,7 @@ func TestSerializeMetricFloat(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s := WavefrontSerializer{}
|
s := &Serializer{}
|
||||||
buf, _ := s.Serialize(m)
|
buf, _ := s.Serialize(m)
|
||||||
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
|
||||||
|
|
@ -199,7 +199,7 @@ func TestSerializeMetricInt(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s := WavefrontSerializer{}
|
s := &Serializer{}
|
||||||
buf, _ := s.Serialize(m)
|
buf, _ := s.Serialize(m)
|
||||||
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
|
||||||
|
|
@ -218,7 +218,7 @@ func TestSerializeMetricBoolTrue(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s := WavefrontSerializer{}
|
s := &Serializer{}
|
||||||
buf, _ := s.Serialize(m)
|
buf, _ := s.Serialize(m)
|
||||||
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
|
||||||
|
|
@ -237,7 +237,7 @@ func TestSerializeMetricBoolFalse(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s := WavefrontSerializer{}
|
s := &Serializer{}
|
||||||
buf, _ := s.Serialize(m)
|
buf, _ := s.Serialize(m)
|
||||||
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
|
||||||
|
|
@ -256,7 +256,7 @@ func TestSerializeMetricFieldValue(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s := WavefrontSerializer{}
|
s := &Serializer{}
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
|
@ -276,7 +276,7 @@ func TestSerializeMetricPrefix(t *testing.T) {
|
||||||
}
|
}
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
s := WavefrontSerializer{Prefix: "telegraf."}
|
s := &Serializer{Prefix: "telegraf."}
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
|
||||||
|
|
@ -308,7 +308,7 @@ func benchmarkMetrics(b *testing.B) [4]telegraf.Metric {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkSerialize(b *testing.B) {
|
func BenchmarkSerialize(b *testing.B) {
|
||||||
var s WavefrontSerializer
|
s := &Serializer{}
|
||||||
metrics := benchmarkMetrics(b)
|
metrics := benchmarkMetrics(b)
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
|
@ -318,7 +318,7 @@ func BenchmarkSerialize(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkSerializeBatch(b *testing.B) {
|
func BenchmarkSerializeBatch(b *testing.B) {
|
||||||
var s WavefrontSerializer
|
s := &Serializer{}
|
||||||
m := benchmarkMetrics(b)
|
m := benchmarkMetrics(b)
|
||||||
metrics := m[:]
|
metrics := m[:]
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue