chore(serializers.graphite): Migrate to new-style framework (#13321)

This commit is contained in:
Sven Rebhan 2023-05-24 16:49:41 +02:00 committed by GitHub
parent 272add9b84
commit bbe30f769d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 262 additions and 168 deletions

View File

@ -1481,11 +1481,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error)
c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields) c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport) c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport)
c.getFieldString(tbl, "graphite_strict_sanitize_regex", &sc.GraphiteStrictRegex)
c.getFieldBool(tbl, "graphite_tag_support", &sc.GraphiteTagSupport)
c.getFieldString(tbl, "graphite_tag_sanitize_mode", &sc.GraphiteTagSanitizeMode)
c.getFieldString(tbl, "graphite_separator", &sc.GraphiteSeparator)
c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits) c.getFieldDuration(tbl, "json_timestamp_units", &sc.TimestampUnits)
c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat) c.getFieldString(tbl, "json_timestamp_format", &sc.TimestampFormat)
c.getFieldString(tbl, "json_transformation", &sc.Transformation) c.getFieldString(tbl, "json_transformation", &sc.Transformation)
@ -1571,8 +1566,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
// Serializer options to ignore // Serializer options to ignore
case "prefix", "template", "templates", case "prefix", "template", "templates",
"graphite_strict_sanitize_regex",
"graphite_tag_sanitize_mode", "graphite_tag_support", "graphite_separator",
"influx_max_line_bytes", "influx_sort_fields", "influx_uint_support", "influx_max_line_bytes", "influx_sort_fields", "influx_uint_support",
"json_timestamp_format", "json_timestamp_units", "json_transformation", "json_timestamp_format", "json_timestamp_units", "json_transformation",
"json_nested_fields_include", "json_nested_fields_exclude", "json_nested_fields_include", "json_nested_fields_exclude",

View File

@ -15,7 +15,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
tlsint "github.com/influxdata/telegraf/plugins/common/tls" tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf/plugins/serializers/graphite"
) )
//go:embed sample.conf //go:embed sample.conf
@ -37,12 +37,32 @@ type Graphite struct {
conns []net.Conn conns []net.Conn
tlsint.ClientConfig tlsint.ClientConfig
failedServers []string failedServers []string
serializer *graphite.GraphiteSerializer
} }
func (*Graphite) SampleConfig() string { func (*Graphite) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (g *Graphite) Init() error {
s := &graphite.GraphiteSerializer{
Prefix: g.Prefix,
Template: g.Template,
StrictRegex: g.GraphiteStrictRegex,
TagSupport: g.GraphiteTagSupport,
TagSanitizeMode: g.GraphiteTagSanitizeMode,
Separator: g.GraphiteSeparator,
Templates: g.Templates,
}
if err := s.Init(); err != nil {
return err
}
g.serializer = s
return nil
}
func (g *Graphite) Connect() error { func (g *Graphite) Connect() error {
// Set default values // Set default values
if g.Timeout <= 0 { if g.Timeout <= 0 {
@ -168,28 +188,15 @@ func (g *Graphite) checkEOF(conn net.Conn) error {
func (g *Graphite) Write(metrics []telegraf.Metric) error { func (g *Graphite) Write(metrics []telegraf.Metric) error {
// Prepare data // Prepare data
var batch []byte var batch []byte
s, err := serializers.NewGraphiteSerializer(
g.Prefix,
g.Template,
g.GraphiteStrictRegex,
g.GraphiteTagSupport,
g.GraphiteTagSanitizeMode,
g.GraphiteSeparator,
g.Templates,
)
if err != nil {
return err
}
for _, metric := range metrics { for _, metric := range metrics {
buf, err := s.Serialize(metric) buf, err := g.serializer.Serialize(metric)
if err != nil { if err != nil {
g.Log.Errorf("Error serializing some metrics to graphite: %s", err.Error()) g.Log.Errorf("Error serializing some metrics to graphite: %s", err.Error())
} }
batch = append(batch, buf...) batch = append(batch, buf...)
} }
err = g.send(batch) err := g.send(batch)
// If a send failed for a server, try to reconnect to that server // If a send failed for a server, try to reconnect to that server
if len(g.failedServers) > 0 { if len(g.failedServers) > 0 {

View File

@ -22,6 +22,8 @@ func TestGraphiteError(t *testing.T) {
Prefix: "my.prefix", Prefix: "my.prefix",
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(
"mymeasurement", "mymeasurement",
@ -56,6 +58,7 @@ func TestGraphiteReconnect(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
GraphiteStrictRegex: `[^a-zA-Z0-9-:._=|\p{L}]`, GraphiteStrictRegex: `[^a-zA-Z0-9-:._=|\p{L}]`,
} }
require.NoError(t, g.Init())
t.Log("Writing metric, without any server up, expected to fail") t.Log("Writing metric, without any server up, expected to fail")
require.NoError(t, g.Connect()) require.NoError(t, g.Connect())
@ -98,6 +101,7 @@ func TestGraphiteOK(t *testing.T) {
Servers: []string{"localhost:12003"}, Servers: []string{"localhost:12003"},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(
@ -179,6 +183,7 @@ func TestGraphiteStrictRegex(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
GraphiteStrictRegex: `[^a-zA-Z0-9-:._=|\p{L}]`, GraphiteStrictRegex: `[^a-zA-Z0-9-:._=|\p{L}]`,
} }
require.NoError(t, g.Init())
require.NoError(t, g.Connect()) require.NoError(t, g.Connect())
require.NoError(t, g.Write([]telegraf.Metric{m})) require.NoError(t, g.Write([]telegraf.Metric{m}))
@ -200,6 +205,7 @@ func TestGraphiteOkWithSeparatorDot(t *testing.T) {
Servers: []string{"localhost:12003"}, Servers: []string{"localhost:12003"},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(
@ -263,6 +269,7 @@ func TestGraphiteOkWithSeparatorUnderscore(t *testing.T) {
Servers: []string{"localhost:12003"}, Servers: []string{"localhost:12003"},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(
@ -330,6 +337,7 @@ func TestGraphiteOKWithMultipleTemplates(t *testing.T) {
Servers: []string{"localhost:12003"}, Servers: []string{"localhost:12003"},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(
@ -393,6 +401,7 @@ func TestGraphiteOkWithTags(t *testing.T) {
Servers: []string{"localhost:12003"}, Servers: []string{"localhost:12003"},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(
@ -457,6 +466,7 @@ func TestGraphiteOkWithTagsAndSeparatorDot(t *testing.T) {
Servers: []string{"localhost:12003"}, Servers: []string{"localhost:12003"},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(
@ -521,6 +531,7 @@ func TestGraphiteOkWithTagsAndSeparatorUnderscore(t *testing.T) {
Servers: []string{"localhost:12003"}, Servers: []string{"localhost:12003"},
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
require.NoError(t, g.Init())
// Init metrics // Init metrics
m1 := metric.New( m1 := metric.New(

View File

@ -15,7 +15,6 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/graphite"
) )
@ -39,7 +38,8 @@ type Instrumental struct {
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
conn net.Conn conn net.Conn
serializer *graphite.GraphiteSerializer
} }
const ( const (
@ -53,6 +53,22 @@ func (*Instrumental) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (i *Instrumental) Init() error {
s := &graphite.GraphiteSerializer{
Prefix: i.Prefix,
Template: i.Template,
TagSanitizeMode: "strict",
Separator: ".",
Templates: i.Templates,
}
if err := s.Init(); err != nil {
return err
}
i.serializer = s
return nil
}
func (i *Instrumental) Connect() error { func (i *Instrumental) Connect() error {
connection, err := net.DialTimeout("tcp", i.Host+":8000", time.Duration(i.Timeout)) connection, err := net.DialTimeout("tcp", i.Host+":8000", time.Duration(i.Timeout))
@ -84,11 +100,6 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
} }
} }
s, err := serializers.NewGraphiteSerializer(i.Prefix, i.Template, "", false, "strict", ".", i.Templates)
if err != nil {
return err
}
var points []string var points []string
var metricType string var metricType string
@ -107,7 +118,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
metricType = m.Tags()["metric_type"] metricType = m.Tags()["metric_type"]
m.RemoveTag("metric_type") m.RemoveTag("metric_type")
buf, err := s.Serialize(m) buf, err := i.serializer.Serialize(m)
if err != nil { if err != nil {
i.Log.Debugf("Could not serialize metric: %v", err) i.Log.Debugf("Could not serialize metric: %v", err)
continue continue
@ -146,9 +157,7 @@ func (i *Instrumental) Write(metrics []telegraf.Metric) error {
} }
allPoints := strings.Join(points, "") allPoints := strings.Join(points, "")
_, err = fmt.Fprint(i.conn, allPoints) if _, err := fmt.Fprint(i.conn, allPoints); err != nil {
if err != nil {
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
_ = i.Close() _ = i.Close()
} }

View File

@ -25,6 +25,7 @@ func TestWrite(t *testing.T) {
APIToken: config.NewSecret([]byte("abc123token")), APIToken: config.NewSecret([]byte("abc123token")),
Prefix: "my.prefix", Prefix: "my.prefix",
} }
require.NoError(t, i.Init())
// Default to gauge // Default to gauge
m1 := metric.New( m1 := metric.New(

View File

@ -233,7 +233,9 @@ func TestContentType(t *testing.T) {
s.headers = map[string]string{ s.headers = map[string]string{
contentTypeHeader: graphiteContentType, contentTypeHeader: graphiteContentType,
} }
s.SetSerializer(&graphite.GraphiteSerializer{}) serializer := &graphite.GraphiteSerializer{}
require.NoError(t, serializer.Init())
s.SetSerializer(serializer)
return s return s
}, },
}, },

View File

@ -0,0 +1,7 @@
//go:build !custom || serializers || serializers.graphite
package all
import (
_ "github.com/influxdata/telegraf/plugins/serializers/graphite" // register plugin
)

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/serializers"
) )
const DefaultTemplate = "host.tags.measurement.field" const DefaultTemplate = "host.tags.measurement.field"
@ -38,20 +39,51 @@ type GraphiteTemplate struct {
} }
type GraphiteSerializer struct { type GraphiteSerializer struct {
Prefix string `json:"prefix"` Prefix string `toml:"prefix"`
Template string `json:"template"` Template string `toml:"template"`
StrictAllowedChars *regexp.Regexp `json:"graphite_strict_sanitize_regex"` StrictRegex string `toml:"graphite_strict_sanitize_regex"`
TagSupport bool `json:"graphite_tag_support"` TagSupport bool `toml:"graphite_tag_support"`
TagSanitizeMode string `json:"graphite_tag_sanitize_mode"` TagSanitizeMode string `toml:"graphite_tag_sanitize_mode"`
Separator string `json:"graphite_separator"` Separator string `toml:"graphite_separator"`
Templates []*GraphiteTemplate `json:"templates"` Templates []string `toml:"templates"`
tmplts []*GraphiteTemplate
strictAllowedChars *regexp.Regexp
}
func (s *GraphiteSerializer) Init() error {
graphiteTemplates, defaultTemplate, err := InitGraphiteTemplates(s.Templates)
if err != nil {
return err
}
s.tmplts = graphiteTemplates
if defaultTemplate != "" {
s.Template = defaultTemplate
}
if s.TagSanitizeMode == "" {
s.TagSanitizeMode = "strict"
}
if s.Separator == "" {
s.Separator = "."
}
if s.StrictRegex == "" {
s.strictAllowedChars = regexp.MustCompile(`[^a-zA-Z0-9-:._=\p{L}]`)
} else {
var err error
s.strictAllowedChars, err = regexp.Compile(s.StrictRegex)
if err != nil {
return fmt.Errorf("invalid regex provided %q: %w", s.StrictRegex, err)
}
}
return nil
} }
func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) { func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
if s.StrictAllowedChars == nil {
s.StrictAllowedChars = regexp.MustCompile(`[^a-zA-Z0-9-:._=\p{L}]`)
}
out := []byte{} out := []byte{}
// Convert UnixNano to Unix timestamps // Convert UnixNano to Unix timestamps
@ -76,7 +108,7 @@ func (s *GraphiteSerializer) Serialize(metric telegraf.Metric) ([]byte, error) {
} }
default: default:
template := s.Template template := s.Template
for _, graphiteTemplate := range s.Templates { for _, graphiteTemplate := range s.tmplts {
if graphiteTemplate.Filter.Match(metric.Name()) { if graphiteTemplate.Filter.Match(metric.Name()) {
template = graphiteTemplate.Value template = graphiteTemplate.Value
break break
@ -326,7 +358,7 @@ func (s *GraphiteSerializer) strictSanitize(value string) string {
// Apply rule to drop some chars to preserve backwards compatibility // Apply rule to drop some chars to preserve backwards compatibility
value = dropChars.Replace(value) value = dropChars.Replace(value)
// Replace any remaining illegal chars // Replace any remaining illegal chars
return s.StrictAllowedChars.ReplaceAllLiteralString(value, "_") return s.strictAllowedChars.ReplaceAllLiteralString(value, "_")
} }
func compatibleSanitize(name string, value string) string { func compatibleSanitize(name string, value string) string {
@ -335,3 +367,23 @@ func compatibleSanitize(name string, value string) string {
value = compatibleLeadingTildeDrop.FindStringSubmatch(value)[1] value = compatibleLeadingTildeDrop.FindStringSubmatch(value)[1]
return name + "=" + value return name + "=" + value
} }
func init() {
serializers.Add("graphite",
func() serializers.Serializer {
return &GraphiteSerializer{}
},
)
}
// InitFromConfig is a compatibility function to construct the parser the old way
func (s *GraphiteSerializer) InitFromConfig(cfg *serializers.Config) error {
s.Prefix = cfg.Prefix
s.Templates = cfg.Templates
s.StrictRegex = cfg.GraphiteStrictRegex
s.TagSupport = cfg.GraphiteTagSupport
s.TagSanitizeMode = cfg.GraphiteTagSanitizeMode
s.Separator = cfg.GraphiteSeparator
return nil
}

View File

@ -2,7 +2,6 @@ package graphite
import ( import (
"fmt" "fmt"
"regexp"
"sort" "sort"
"strings" "strings"
"testing" "testing"
@ -73,7 +72,9 @@ func TestSerializeMetricNoHost(t *testing.T) {
m := metric.New("cpu", tags, fields, now) m := metric.New("cpu", tags, fields, now)
s := GraphiteSerializer{} s := GraphiteSerializer{}
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -101,7 +102,9 @@ func TestSerializeMetricNoHostWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -127,7 +130,10 @@ func TestSerializeMetricHost(t *testing.T) {
m := metric.New("cpu", tags, fields, now) m := metric.New("cpu", tags, fields, now)
s := GraphiteSerializer{} s := GraphiteSerializer{}
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -153,24 +159,23 @@ func TestSerializeMetricHostWithMultipleTemplates(t *testing.T) {
m1 := metric.New("cpu", tags, fields, now) m1 := metric.New("cpu", tags, fields, now)
m2 := metric.New("new_cpu", tags, fields, now) m2 := metric.New("new_cpu", tags, fields, now)
templates, defaultTemplate, err := InitGraphiteTemplates([]string{
"cp* tags.measurement.host.field",
"new_cpu tags.host.measurement.field",
})
require.NoError(t, err)
require.Equal(t, defaultTemplate, "")
s := GraphiteSerializer{ s := GraphiteSerializer{
Templates: templates, Templates: []string{
"cp* tags.measurement.host.field",
"new_cpu tags.host.measurement.field",
},
} }
require.NoError(t, s.Init())
require.Empty(t, s.Template)
buf, _ := s.Serialize(m1) buf, err := s.Serialize(m1)
buf2, _ := s.Serialize(m2) require.NoError(t, err)
buf2, err := s.Serialize(m2)
require.NoError(t, err)
buf = append(buf, buf2...) buf = append(buf, buf2...)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
require.NoError(t, err)
expS := []string{ expS := []string{
fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()), fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()),
@ -197,25 +202,23 @@ func TestSerializeMetricHostWithMultipleTemplatesWithDefault(t *testing.T) {
m1 := metric.New("cpu", tags, fields, now) m1 := metric.New("cpu", tags, fields, now)
m2 := metric.New("new_cpu", tags, fields, now) m2 := metric.New("new_cpu", tags, fields, now)
templates, defaultTemplate, err := InitGraphiteTemplates([]string{
"cp* tags.measurement.host.field",
"tags.host.measurement.field",
})
require.NoError(t, err)
require.Equal(t, defaultTemplate, "tags.host.measurement.field")
s := GraphiteSerializer{ s := GraphiteSerializer{
Templates: templates, Templates: []string{
Template: defaultTemplate, "cp* tags.measurement.host.field",
"tags.host.measurement.field",
},
} }
require.NoError(t, s.Init())
require.Equal(t, s.Template, "tags.host.measurement.field")
buf, _ := s.Serialize(m1) buf, err := s.Serialize(m1)
buf2, _ := s.Serialize(m2) require.NoError(t, err)
buf2, err := s.Serialize(m2)
require.NoError(t, err)
buf = append(buf, buf2...) buf = append(buf, buf2...)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
require.NoError(t, err)
expS := []string{ expS := []string{
fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()), fmt.Sprintf("cpu0.us-west-2.cpu.localhost.usage_idle 91.5 %d", now.Unix()),
@ -245,7 +248,10 @@ func TestSerializeMetricHostWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -271,7 +277,10 @@ func TestSerializeValueField(t *testing.T) {
m := metric.New("cpu", tags, fields, now) m := metric.New("cpu", tags, fields, now)
s := GraphiteSerializer{} s := GraphiteSerializer{}
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -296,7 +305,10 @@ func TestSerializeValueFieldWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -321,7 +333,10 @@ func TestSerializeValueField2(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Template: "host.field.tags.measurement", Template: "host.field.tags.measurement",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -345,7 +360,10 @@ func TestSerializeValueString(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Template: "host.field.tags.measurement", Template: "host.field.tags.measurement",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
require.Equal(t, "", mS[0]) require.Equal(t, "", mS[0])
} }
@ -366,7 +384,10 @@ func TestSerializeValueStringWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
require.Equal(t, "", mS[0]) require.Equal(t, "", mS[0])
} }
@ -387,7 +408,10 @@ func TestSerializeValueBoolean(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Template: "host.field.tags.measurement", Template: "host.field.tags.measurement",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -416,7 +440,10 @@ func TestSerializeValueBooleanWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -437,6 +464,8 @@ func TestSerializeValueUnsigned(t *testing.T) {
m := metric.New("mem", tags, fields, now) m := metric.New("mem", tags, fields, now)
s := GraphiteSerializer{} s := GraphiteSerializer{}
require.NoError(t, s.Init())
buf, err := s.Serialize(m) buf, err := s.Serialize(m)
require.NoError(t, err) require.NoError(t, err)
@ -459,7 +488,10 @@ func TestSerializeFieldWithSpaces(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Template: "host.tags.measurement.field", Template: "host.tags.measurement.field",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -484,7 +516,10 @@ func TestSerializeFieldWithSpacesWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -509,7 +544,10 @@ func TestSerializeTagWithSpaces(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Template: "host.tags.measurement.field", Template: "host.tags.measurement.field",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -534,7 +572,10 @@ func TestSerializeTagWithSpacesWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -560,7 +601,10 @@ func TestSerializeTagWithSpacesWithTagSupportCompatibleSanitize(t *testing.T) {
TagSanitizeMode: "compatible", TagSanitizeMode: "compatible",
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -585,7 +629,10 @@ func TestSerializeValueField3(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Template: "field.host.tags.measurement", Template: "field.host.tags.measurement",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -610,7 +657,10 @@ func TestSerializeValueField5(t *testing.T) {
s := GraphiteSerializer{ s := GraphiteSerializer{
Template: template5, Template: template5,
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -633,7 +683,10 @@ func TestSerializeMetricPrefix(t *testing.T) {
m := metric.New("cpu", tags, fields, now) m := metric.New("cpu", tags, fields, now)
s := GraphiteSerializer{Prefix: "prefix"} s := GraphiteSerializer{Prefix: "prefix"}
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -663,7 +716,10 @@ func TestSerializeMetricPrefixWithTagSupport(t *testing.T) {
TagSupport: true, TagSupport: true,
Separator: ".", Separator: ".",
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -689,9 +745,12 @@ func TestSerializeCustomRegex(t *testing.T) {
m := metric.New("cpu", tags, fields, now) m := metric.New("cpu", tags, fields, now)
s := GraphiteSerializer{ s := GraphiteSerializer{
StrictAllowedChars: regexp.MustCompile(`[^a-zA-Z0-9-:._=|\p{L}]`), StrictRegex: `[^a-zA-Z0-9-:._=|\p{L}]`,
} }
buf, _ := s.Serialize(m) require.NoError(t, s.Init())
buf, err := s.Serialize(m)
require.NoError(t, err)
mS := strings.Split(strings.TrimSpace(string(buf)), "\n") mS := strings.Split(strings.TrimSpace(string(buf)), "\n")
expS := []string{ expS := []string{
@ -889,11 +948,14 @@ func TestClean(t *testing.T) {
}, },
} }
s := GraphiteSerializer{}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
s := GraphiteSerializer{}
require.NoError(t, s.Init())
m := metric.New(tt.metricName, tt.tags, tt.fields, now) m := metric.New(tt.metricName, tt.tags, tt.fields, now)
actual, _ := s.Serialize(m) actual, err := s.Serialize(m)
require.NoError(t, err)
require.Equal(t, tt.expected, string(actual)) require.Equal(t, tt.expected, string(actual))
}) })
} }
@ -980,12 +1042,14 @@ func TestCleanWithTagsSupport(t *testing.T) {
}, },
} }
s := GraphiteSerializer{
TagSupport: true,
Separator: ".",
}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
s := GraphiteSerializer{
TagSupport: true,
Separator: ".",
}
require.NoError(t, s.Init())
m := metric.New(tt.metricName, tt.tags, tt.fields, now) m := metric.New(tt.metricName, tt.tags, tt.fields, now)
actual, _ := s.Serialize(m) actual, _ := s.Serialize(m)
require.Equal(t, tt.expected, string(actual)) require.Equal(t, tt.expected, string(actual))
@ -1074,13 +1138,15 @@ func TestCleanWithTagsSupportCompatibleSanitize(t *testing.T) {
}, },
} }
s := GraphiteSerializer{
TagSupport: true,
TagSanitizeMode: "compatible",
Separator: ".",
}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
s := GraphiteSerializer{
TagSupport: true,
TagSanitizeMode: "compatible",
Separator: ".",
}
require.NoError(t, s.Init())
m := metric.New(tt.metricName, tt.tags, tt.fields, now) m := metric.New(tt.metricName, tt.tags, tt.fields, now)
actual, _ := s.Serialize(m) actual, _ := s.Serialize(m)
require.Equal(t, tt.expected, string(actual)) require.Equal(t, tt.expected, string(actual))
@ -1106,9 +1172,11 @@ func TestSerializeBatch(t *testing.T) {
}, },
} }
s := GraphiteSerializer{}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
s := GraphiteSerializer{}
require.NoError(t, s.Init())
m := metric.New(tt.metricName, tt.tags, tt.fields, now) m := metric.New(tt.metricName, tt.tags, tt.fields, now)
actual, _ := s.SerializeBatch([]telegraf.Metric{m, m}) actual, _ := s.SerializeBatch([]telegraf.Metric{m, m})
require.Equal(t, tt.expected, string(actual)) require.Equal(t, tt.expected, string(actual))
@ -1134,12 +1202,14 @@ func TestSerializeBatchWithTagsSupport(t *testing.T) {
}, },
} }
s := GraphiteSerializer{
TagSupport: true,
Separator: ".",
}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
s := GraphiteSerializer{
TagSupport: true,
Separator: ".",
}
require.NoError(t, s.Init())
m := metric.New(tt.metricName, tt.tags, tt.fields, now) m := metric.New(tt.metricName, tt.tags, tt.fields, now)
actual, _ := s.SerializeBatch([]telegraf.Metric{m, m}) actual, _ := s.SerializeBatch([]telegraf.Metric{m, m})
require.Equal(t, tt.expected, string(actual)) require.Equal(t, tt.expected, string(actual))

View File

@ -2,11 +2,9 @@ package serializers
import ( import (
"fmt" "fmt"
"regexp"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/json" "github.com/influxdata/telegraf/plugins/serializers/json"
"github.com/influxdata/telegraf/plugins/serializers/msgpack" "github.com/influxdata/telegraf/plugins/serializers/msgpack"
"github.com/influxdata/telegraf/plugins/serializers/nowmetric" "github.com/influxdata/telegraf/plugins/serializers/nowmetric"
@ -165,16 +163,6 @@ func NewSerializer(config *Config) (Serializer, error) {
var err error var err error
var serializer Serializer var serializer Serializer
switch config.DataFormat { switch config.DataFormat {
case "graphite":
serializer, err = NewGraphiteSerializer(
config.Prefix,
config.Template,
config.GraphiteStrictRegex,
config.GraphiteTagSupport,
config.GraphiteTagSanitizeMode,
config.GraphiteSeparator,
config.Templates,
)
case "json": case "json":
serializer, err = NewJSONSerializer(config) serializer, err = NewJSONSerializer(config)
case "splunkmetric": case "splunkmetric":
@ -276,52 +264,6 @@ func NewNowSerializer() (Serializer, error) {
return nowmetric.NewSerializer() return nowmetric.NewSerializer()
} }
//nolint:revive //argument-limit conditionally more arguments allowed
func NewGraphiteSerializer(
prefix,
template string,
strictRegex string,
tagSupport bool,
tagSanitizeMode string,
separator string,
templates []string,
) (Serializer, error) {
graphiteTemplates, defaultTemplate, err := graphite.InitGraphiteTemplates(templates)
if err != nil {
return nil, err
}
if defaultTemplate != "" {
template = defaultTemplate
}
if tagSanitizeMode == "" {
tagSanitizeMode = "strict"
}
if separator == "" {
separator = "."
}
strictAllowedChars := regexp.MustCompile(`[^a-zA-Z0-9-:._=\p{L}]`)
if strictRegex != "" {
strictAllowedChars, err = regexp.Compile(strictRegex)
if err != nil {
return nil, fmt.Errorf("invalid regex provided %q: %w", strictRegex, err)
}
}
return &graphite.GraphiteSerializer{
Prefix: prefix,
Template: template,
StrictAllowedChars: strictAllowedChars,
TagSupport: tagSupport,
TagSanitizeMode: tagSanitizeMode,
Separator: separator,
Templates: graphiteTemplates,
}, nil
}
func NewMsgpackSerializer() Serializer { func NewMsgpackSerializer() Serializer {
return msgpack.NewSerializer() return msgpack.NewSerializer()
} }