diff --git a/config/config.go b/config/config.go index 442474366..12573eb5a 100644 --- a/config/config.go +++ b/config/config.go @@ -1478,10 +1478,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error) c.getFieldString(tbl, "template", &sc.Template) c.getFieldStringSlice(tbl, "templates", &sc.Templates) - c.getFieldStringSlice(tbl, "wavefront_source_override", &sc.WavefrontSourceOverride) - c.getFieldBool(tbl, "wavefront_use_strict", &sc.WavefrontUseStrict) - c.getFieldBool(tbl, "wavefront_disable_prefix_conversion", &sc.WavefrontDisablePrefixConversion) - c.getFieldBool(tbl, "prometheus_export_timestamp", &sc.PrometheusExportTimestamp) c.getFieldBool(tbl, "prometheus_sort_metrics", &sc.PrometheusSortMetrics) c.getFieldBool(tbl, "prometheus_string_as_label", &sc.PrometheusStringAsLabel) @@ -1554,8 +1550,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { // Serializer options to ignore case "prefix", "template", "templates", "prometheus_export_timestamp", "prometheus_sort_metrics", "prometheus_string_as_label", - "prometheus_compact_encoding", - "wavefront_disable_prefix_conversion", "wavefront_source_override", "wavefront_use_strict": + "prometheus_compact_encoding": default: c.unusedFieldsMutex.Lock() c.UnusedFields[key] = true diff --git a/plugins/serializers/all/wavefront.go b/plugins/serializers/all/wavefront.go new file mode 100644 index 000000000..2ba7b543c --- /dev/null +++ b/plugins/serializers/all/wavefront.go @@ -0,0 +1,7 @@ +//go:build !custom || serializers || serializers.wavefront + +package all + +import ( + _ "github.com/influxdata/telegraf/plugins/serializers/wavefront" // register plugin +) diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go index ac174e05e..101498df4 100644 --- a/plugins/serializers/registry.go +++ b/plugins/serializers/registry.go @@ -6,7 +6,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/serializers/prometheus" - "github.com/influxdata/telegraf/plugins/serializers/wavefront" ) // Creator is the function to create a new serializer @@ -158,13 +157,6 @@ func NewSerializer(config *Config) (Serializer, error) { var err error var serializer Serializer switch config.DataFormat { - case "wavefront": - serializer, err = NewWavefrontSerializer( - config.Prefix, - config.WavefrontUseStrict, - config.WavefrontSourceOverride, - config.WavefrontDisablePrefixConversion, - ), nil case "prometheus": serializer, err = NewPrometheusSerializer(config), nil default: @@ -209,7 +201,3 @@ func NewPrometheusSerializer(config *Config) Serializer { CompactEncoding: config.PrometheusCompactEncoding, }) } - -func NewWavefrontSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversions bool) Serializer { - return wavefront.NewSerializer(prefix, useStrict, sourceOverride, disablePrefixConversions) -} diff --git a/plugins/serializers/wavefront/wavefront.go b/plugins/serializers/wavefront/wavefront.go index c99145394..f38c97589 100755 --- a/plugins/serializers/wavefront/wavefront.go +++ b/plugins/serializers/wavefront/wavefront.go @@ -6,16 +6,17 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/serializers" ) -// WavefrontSerializer : WavefrontSerializer struct -type WavefrontSerializer struct { - Prefix string - UseStrict bool - SourceOverride []string - DisablePrefixConversions bool - scratch buffer - mu sync.Mutex // buffer mutex +type Serializer struct { + Prefix string `toml:"prefix"` + UseStrict bool `toml:"wavefront_use_strict"` + SourceOverride []string `toml:"wavefront_source_override"` + DisablePrefixConversions bool `toml:"wavefront_disable_prefix_conversion"` + + scratch buffer + mu sync.Mutex // buffer mutex } type MetricPoint struct { @@ -26,17 +27,7 @@ type MetricPoint struct { Tags map[string]string } -func NewSerializer(prefix string, useStrict bool, sourceOverride []string, disablePrefixConversion bool) *WavefrontSerializer { - s := &WavefrontSerializer{ - Prefix: prefix, - UseStrict: useStrict, - SourceOverride: sourceOverride, - DisablePrefixConversions: disablePrefixConversion, - } - return s -} - -func (s *WavefrontSerializer) serializeMetric(m telegraf.Metric) { +func (s *Serializer) serializeMetric(m telegraf.Metric) { const metricSeparator = "." for fieldName, value := range m.Fields() { @@ -59,7 +50,7 @@ func (s *WavefrontSerializer) serializeMetric(m telegraf.Metric) { // bad value continue to next metric continue } - source, tags := buildTags(m.Tags(), s) + source, tags := s.buildTags(m.Tags()) metric := MetricPoint{ Metric: name, Timestamp: m.Time().Unix(), @@ -72,7 +63,7 @@ func (s *WavefrontSerializer) serializeMetric(m telegraf.Metric) { } // Serialize : Serialize based on Wavefront format -func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { +func (s *Serializer) Serialize(m telegraf.Metric) ([]byte, error) { s.mu.Lock() s.scratch.Reset() s.serializeMetric(m) @@ -81,7 +72,7 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) { return out, nil } -func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { +func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { s.mu.Lock() s.scratch.Reset() for _, m := range metrics { @@ -92,7 +83,7 @@ func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, return out, nil } -func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string { +func (s *Serializer) findSourceTag(mTags map[string]string) string { if src, ok := mTags["source"]; ok { delete(mTags, "source") return src @@ -107,14 +98,14 @@ func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string { return mTags["host"] } -func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) { +func (s *Serializer) buildTags(mTags map[string]string) (string, map[string]string) { // Remove all empty tags. for k, v := range mTags { if v == "" { delete(mTags, k) } } - source := findSourceTag(mTags, s) + source := s.findSourceTag(mTags) delete(mTags, "host") return tagValueReplacer.Replace(source), mTags } @@ -143,7 +134,7 @@ func buildValue(v interface{}, name string) (val float64, valid bool) { } } -func formatMetricPoint(b *buffer, metricPoint *MetricPoint, s *WavefrontSerializer) []byte { +func formatMetricPoint(b *buffer, metricPoint *MetricPoint, s *Serializer) []byte { b.WriteChar('"') b.WriteString(metricPoint.Metric) b.WriteString(`" `) @@ -195,3 +186,21 @@ func (b *buffer) WriteUint64(val uint64) { func (b *buffer) WriteFloat64(val float64) { *b = strconv.AppendFloat(*b, val, 'f', 6, 64) } + +func init() { + serializers.Add("wavefront", + func() serializers.Serializer { + return &Serializer{} + }, + ) +} + +// InitFromConfig is a compatibility function to construct the parser the old way +func (s *Serializer) InitFromConfig(cfg *serializers.Config) error { + s.Prefix = cfg.Prefix + s.UseStrict = cfg.WavefrontUseStrict + s.SourceOverride = cfg.WavefrontSourceOverride + s.DisablePrefixConversions = cfg.WavefrontDisablePrefixConversion + + return nil +} diff --git a/plugins/serializers/wavefront/wavefront_test.go b/plugins/serializers/wavefront/wavefront_test.go index 53bbc6432..a14174256 100755 --- a/plugins/serializers/wavefront/wavefront_test.go +++ b/plugins/serializers/wavefront/wavefront_test.go @@ -65,10 +65,10 @@ func TestBuildTags(t *testing.T) { "ccc", }, } - s := WavefrontSerializer{SourceOverride: []string{"instanceid", "instance-id", "hostname", "snmp_host", "node_host"}} + s := &Serializer{SourceOverride: []string{"instanceid", "instance-id", "hostname", "snmp_host", "node_host"}} for _, tt := range tagTests { - source, tags := buildTags(tt.ptIn, &s) + source, tags := s.buildTags(tt.ptIn) if !reflect.DeepEqual(tags, tt.outTags) { t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) } @@ -90,10 +90,10 @@ func TestBuildTagsHostTag(t *testing.T) { "snmpHost", }, } - s := WavefrontSerializer{SourceOverride: []string{"snmp_host"}} + s := &Serializer{SourceOverride: []string{"snmp_host"}} for _, tt := range tagTests { - source, tags := buildTags(tt.ptIn, &s) + source, tags := s.buildTags(tt.ptIn) if !reflect.DeepEqual(tags, tt.outTags) { t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) } @@ -130,10 +130,10 @@ func TestFormatMetricPoint(t *testing.T) { }, } - s := WavefrontSerializer{} + s := &Serializer{} for _, pt := range pointTests { - bout := formatMetricPoint(new(buffer), pt.ptIn, &s) + bout := formatMetricPoint(new(buffer), pt.ptIn, s) sout := string(bout[:]) if sout != pt.out { t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) @@ -158,10 +158,10 @@ func TestUseStrict(t *testing.T) { }, } - s := WavefrontSerializer{UseStrict: true} + s := &Serializer{UseStrict: true} for _, pt := range pointTests { - bout := formatMetricPoint(new(buffer), pt.ptIn, &s) + bout := formatMetricPoint(new(buffer), pt.ptIn, s) sout := string(bout[:]) if sout != pt.out { t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout) @@ -180,7 +180,7 @@ func TestSerializeMetricFloat(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s := WavefrontSerializer{} + s := &Serializer{} buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -199,7 +199,7 @@ func TestSerializeMetricInt(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s := WavefrontSerializer{} + s := &Serializer{} buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -218,7 +218,7 @@ func TestSerializeMetricBoolTrue(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s := WavefrontSerializer{} + s := &Serializer{} buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -237,7 +237,7 @@ func TestSerializeMetricBoolFalse(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s := WavefrontSerializer{} + s := &Serializer{} buf, _ := s.Serialize(m) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -256,7 +256,7 @@ func TestSerializeMetricFieldValue(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s := WavefrontSerializer{} + s := &Serializer{} buf, err := s.Serialize(m) require.NoError(t, err) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -276,7 +276,7 @@ func TestSerializeMetricPrefix(t *testing.T) { } m := metric.New("cpu", tags, fields, now) - s := WavefrontSerializer{Prefix: "telegraf."} + s := &Serializer{Prefix: "telegraf."} buf, err := s.Serialize(m) require.NoError(t, err) mS := strings.Split(strings.TrimSpace(string(buf)), "\n") @@ -308,7 +308,7 @@ func benchmarkMetrics(b *testing.B) [4]telegraf.Metric { } func BenchmarkSerialize(b *testing.B) { - var s WavefrontSerializer + s := &Serializer{} metrics := benchmarkMetrics(b) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -318,7 +318,7 @@ func BenchmarkSerialize(b *testing.B) { } func BenchmarkSerializeBatch(b *testing.B) { - var s WavefrontSerializer + s := &Serializer{} m := benchmarkMetrics(b) metrics := m[:] b.ResetTimer()