chore(serializers.carbon2): Migrate to new-style framework (#13291)
This commit is contained in:
parent
ed72510fe3
commit
d06fb73228
|
|
@ -1452,8 +1452,6 @@ func (c *Config) buildSerializerOld(tbl *ast.Table) (telegraf.Serializer, error)
|
||||||
c.getFieldString(tbl, "prefix", &sc.Prefix)
|
c.getFieldString(tbl, "prefix", &sc.Prefix)
|
||||||
c.getFieldString(tbl, "template", &sc.Template)
|
c.getFieldString(tbl, "template", &sc.Template)
|
||||||
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
|
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
|
||||||
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
|
|
||||||
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
|
|
||||||
c.getFieldBool(tbl, "csv_column_prefix", &sc.CSVPrefix)
|
c.getFieldBool(tbl, "csv_column_prefix", &sc.CSVPrefix)
|
||||||
c.getFieldBool(tbl, "csv_header", &sc.CSVHeader)
|
c.getFieldBool(tbl, "csv_header", &sc.CSVHeader)
|
||||||
c.getFieldString(tbl, "csv_separator", &sc.CSVSeparator)
|
c.getFieldString(tbl, "csv_separator", &sc.CSVSeparator)
|
||||||
|
|
@ -1552,7 +1550,6 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
|
||||||
|
|
||||||
// Serializer options to ignore
|
// Serializer options to ignore
|
||||||
case "prefix", "template", "templates",
|
case "prefix", "template", "templates",
|
||||||
"carbon2_format", "carbon2_sanitize_replace_char",
|
|
||||||
"csv_column_prefix", "csv_header", "csv_separator", "csv_timestamp_format",
|
"csv_column_prefix", "csv_header", "csv_separator", "csv_timestamp_format",
|
||||||
"graphite_strict_sanitize_regex",
|
"graphite_strict_sanitize_regex",
|
||||||
"graphite_tag_sanitize_mode", "graphite_tag_support", "graphite_separator",
|
"graphite_tag_sanitize_mode", "graphite_tag_support", "graphite_separator",
|
||||||
|
|
|
||||||
|
|
@ -610,8 +610,10 @@ func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
|
||||||
|
|
||||||
var serializer telegraf.Serializer
|
var serializer telegraf.Serializer
|
||||||
if creator, found := serializers.Serializers[format]; found {
|
if creator, found := serializers.Serializers[format]; found {
|
||||||
|
t.Logf("new-style %q", format)
|
||||||
serializer = creator()
|
serializer = creator()
|
||||||
} else {
|
} else {
|
||||||
|
t.Logf("old-style %q", format)
|
||||||
var err error
|
var err error
|
||||||
serializer, err = serializers.NewSerializer(formatCfg)
|
serializer, err = serializers.NewSerializer(formatCfg)
|
||||||
require.NoErrorf(t, err, "No serializer for format %q", format)
|
require.NoErrorf(t, err, "No serializer for format %q", format)
|
||||||
|
|
@ -699,8 +701,10 @@ func TestConfig_SerializerInterfaceOldFormat(t *testing.T) {
|
||||||
|
|
||||||
var serializer serializers.Serializer
|
var serializer serializers.Serializer
|
||||||
if creator, found := serializers.Serializers[format]; found {
|
if creator, found := serializers.Serializers[format]; found {
|
||||||
|
t.Logf("new-style %q", format)
|
||||||
serializer = creator()
|
serializer = creator()
|
||||||
} else {
|
} else {
|
||||||
|
t.Logf("old-style %q", format)
|
||||||
var err error
|
var err error
|
||||||
serializer, err = serializers.NewSerializer(formatCfg)
|
serializer, err = serializers.NewSerializer(formatCfg)
|
||||||
require.NoErrorf(t, err, "No serializer for format %q", format)
|
require.NoErrorf(t, err, "No serializer for format %q", format)
|
||||||
|
|
|
||||||
|
|
@ -71,22 +71,13 @@ func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) {
|
||||||
s.headers = make(map[string]string)
|
s.headers = make(map[string]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch sr := serializer.(type) {
|
switch serializer.(type) {
|
||||||
case *carbon2.Serializer:
|
case *carbon2.Serializer:
|
||||||
s.headers[contentTypeHeader] = carbon2ContentType
|
s.headers[contentTypeHeader] = carbon2ContentType
|
||||||
|
|
||||||
// In case Carbon2 is used and the metrics format was unset, default to
|
|
||||||
// include field in metric name.
|
|
||||||
if sr.IsMetricsFormatUnset() {
|
|
||||||
sr.SetMetricsFormat(carbon2.Carbon2FormatMetricIncludesField)
|
|
||||||
}
|
|
||||||
|
|
||||||
case *graphite.GraphiteSerializer:
|
case *graphite.GraphiteSerializer:
|
||||||
s.headers[contentTypeHeader] = graphiteContentType
|
s.headers[contentTypeHeader] = graphiteContentType
|
||||||
|
|
||||||
case *prometheus.Serializer:
|
case *prometheus.Serializer:
|
||||||
s.headers[contentTypeHeader] = prometheusContentType
|
s.headers[contentTypeHeader] = prometheusContentType
|
||||||
|
|
||||||
default:
|
default:
|
||||||
s.err = fmt.Errorf("unsupported serializer %T", serializer)
|
s.err = fmt.Errorf("unsupported serializer %T", serializer)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -94,8 +94,10 @@ func TestMethod(t *testing.T) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
})
|
})
|
||||||
|
|
||||||
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "field_separate",
|
||||||
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
|
||||||
plugin := tt.plugin()
|
plugin := tt.plugin()
|
||||||
plugin.SetSerializer(serializer)
|
plugin.SetSerializer(serializer)
|
||||||
|
|
@ -171,8 +173,10 @@ func TestStatusCode(t *testing.T) {
|
||||||
w.WriteHeader(tt.statusCode)
|
w.WriteHeader(tt.statusCode)
|
||||||
})
|
})
|
||||||
|
|
||||||
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "field_separate",
|
||||||
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
|
||||||
tt.plugin.SetSerializer(serializer)
|
tt.plugin.SetSerializer(serializer)
|
||||||
err = tt.plugin.Connect()
|
err = tt.plugin.Connect()
|
||||||
|
|
@ -197,9 +201,11 @@ func TestContentType(t *testing.T) {
|
||||||
s.headers = map[string]string{
|
s.headers = map[string]string{
|
||||||
contentTypeHeader: carbon2ContentType,
|
contentTypeHeader: carbon2ContentType,
|
||||||
}
|
}
|
||||||
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "field_separate",
|
||||||
s.SetSerializer(sr)
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
s.SetSerializer(serializer)
|
||||||
return s
|
return s
|
||||||
},
|
},
|
||||||
expectedBody: []byte("metric=cpu field=value 42 0\n"),
|
expectedBody: []byte("metric=cpu field=value 42 0\n"),
|
||||||
|
|
@ -211,9 +217,11 @@ func TestContentType(t *testing.T) {
|
||||||
s.headers = map[string]string{
|
s.headers = map[string]string{
|
||||||
contentTypeHeader: carbon2ContentType,
|
contentTypeHeader: carbon2ContentType,
|
||||||
}
|
}
|
||||||
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "metric_includes_field",
|
||||||
s.SetSerializer(sr)
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
s.SetSerializer(serializer)
|
||||||
return s
|
return s
|
||||||
},
|
},
|
||||||
expectedBody: []byte("metric=cpu_value 42 0\n"),
|
expectedBody: []byte("metric=cpu_value 42 0\n"),
|
||||||
|
|
@ -316,8 +324,10 @@ func TestContentEncodingGzip(t *testing.T) {
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
})
|
})
|
||||||
|
|
||||||
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "field_separate",
|
||||||
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
|
||||||
plugin := tt.plugin()
|
plugin := tt.plugin()
|
||||||
|
|
||||||
|
|
@ -349,8 +359,10 @@ func TestDefaultUserAgent(t *testing.T) {
|
||||||
MaxRequstBodySize: Default().MaxRequstBodySize,
|
MaxRequstBodySize: Default().MaxRequstBodySize,
|
||||||
}
|
}
|
||||||
|
|
||||||
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "field_separate",
|
||||||
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
|
||||||
plugin.SetSerializer(serializer)
|
plugin.SetSerializer(serializer)
|
||||||
err = plugin.Connect()
|
err = plugin.Connect()
|
||||||
|
|
@ -598,8 +610,10 @@ func TestMaxRequestBodySize(t *testing.T) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
})
|
})
|
||||||
|
|
||||||
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "field_separate",
|
||||||
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
|
|
||||||
plugin := tt.plugin()
|
plugin := tt.plugin()
|
||||||
plugin.SetSerializer(serializer)
|
plugin.SetSerializer(serializer)
|
||||||
|
|
@ -631,8 +645,10 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
|
||||||
plugin := Default()
|
plugin := Default()
|
||||||
plugin.URL = u.String()
|
plugin.URL = u.String()
|
||||||
|
|
||||||
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
|
serializer := &carbon2.Serializer{
|
||||||
require.NoError(t, err)
|
Format: "field_separate",
|
||||||
|
}
|
||||||
|
require.NoError(t, serializer.Init())
|
||||||
plugin.SetSerializer(serializer)
|
plugin.SetSerializer(serializer)
|
||||||
|
|
||||||
err = plugin.Connect()
|
err = plugin.Connect()
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
//go:build !custom || serializers || serializers.carbon2
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/serializers/carbon2" // register plugin
|
||||||
|
)
|
||||||
|
|
@ -8,54 +8,48 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/serializers"
|
||||||
)
|
)
|
||||||
|
|
||||||
type format string
|
const sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
|
||||||
|
|
||||||
const (
|
|
||||||
carbon2FormatFieldEmpty = format("")
|
|
||||||
Carbon2FormatFieldSeparate = format("field_separate")
|
|
||||||
Carbon2FormatMetricIncludesField = format("metric_includes_field")
|
|
||||||
)
|
|
||||||
|
|
||||||
var formats = map[format]struct{}{
|
|
||||||
carbon2FormatFieldEmpty: {},
|
|
||||||
Carbon2FormatFieldSeparate: {},
|
|
||||||
Carbon2FormatMetricIncludesField: {},
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
DefaultSanitizeReplaceChar = ":"
|
|
||||||
sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
|
|
||||||
)
|
|
||||||
|
|
||||||
type Serializer struct {
|
type Serializer struct {
|
||||||
metricsFormat format
|
Format string `toml:"carbon2_format"`
|
||||||
|
SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
sanitizeReplacer *strings.Replacer
|
sanitizeReplacer *strings.Replacer
|
||||||
|
template string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSerializer(metricsFormat string, sanitizeReplaceChar string) (*Serializer, error) {
|
func (s *Serializer) Init() error {
|
||||||
if sanitizeReplaceChar == "" {
|
if s.SanitizeReplaceChar == "" {
|
||||||
sanitizeReplaceChar = DefaultSanitizeReplaceChar
|
s.SanitizeReplaceChar = ":"
|
||||||
} else if len(sanitizeReplaceChar) > 1 {
|
|
||||||
return nil, errors.New("sanitize replace char has to be a singular character")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var f = format(metricsFormat)
|
if len(s.SanitizeReplaceChar) > 1 {
|
||||||
|
return errors.New("sanitize replace char has to be a singular character")
|
||||||
if _, ok := formats[f]; !ok {
|
|
||||||
return nil, fmt.Errorf("unknown carbon2 format: %s", f)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// When unset, default to field separate.
|
// Create replacer to replacing all characters requiring sanitization with the user-specified replacement
|
||||||
if f == carbon2FormatFieldEmpty {
|
pairs := make([]string, 0, 2*len(sanitizedChars))
|
||||||
f = Carbon2FormatFieldSeparate
|
for _, c := range sanitizedChars {
|
||||||
|
pairs = append(pairs, string(c), s.SanitizeReplaceChar)
|
||||||
|
}
|
||||||
|
s.sanitizeReplacer = strings.NewReplacer(pairs...)
|
||||||
|
|
||||||
|
switch s.Format {
|
||||||
|
case "", "field_separate":
|
||||||
|
s.Format = "field_separate"
|
||||||
|
s.template = "metric=%s field=%s "
|
||||||
|
case "metric_includes_field":
|
||||||
|
s.template = "metric=%s_%s "
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown carbon2 format: %s", s.Format)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Serializer{
|
return nil
|
||||||
metricsFormat: f,
|
|
||||||
sanitizeReplacer: createSanitizeReplacer(sanitizedChars, rune(sanitizeReplaceChar[0])),
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
|
||||||
|
|
@ -72,23 +66,31 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
|
||||||
|
|
||||||
func (s *Serializer) createObject(metric telegraf.Metric) []byte {
|
func (s *Serializer) createObject(metric telegraf.Metric) []byte {
|
||||||
var m bytes.Buffer
|
var m bytes.Buffer
|
||||||
metricsFormat := s.getMetricsFormat()
|
|
||||||
|
|
||||||
for fieldName, fieldValue := range metric.Fields() {
|
for fieldName, fieldValue := range metric.Fields() {
|
||||||
if isString(fieldValue) {
|
if _, ok := fieldValue.(string); ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
name := s.sanitizeReplacer.Replace(metric.Name())
|
name := s.sanitizeReplacer.Replace(metric.Name())
|
||||||
|
|
||||||
switch metricsFormat {
|
var value string
|
||||||
case Carbon2FormatFieldSeparate:
|
if v, ok := fieldValue.(bool); ok {
|
||||||
m.WriteString(serializeMetricFieldSeparate(name, fieldName))
|
if v {
|
||||||
|
value = "1"
|
||||||
case Carbon2FormatMetricIncludesField:
|
} else {
|
||||||
m.WriteString(serializeMetricIncludeField(name, fieldName))
|
value = "0"
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var err error
|
||||||
|
value, err = internal.ToString(fieldValue)
|
||||||
|
if err != nil {
|
||||||
|
s.Log.Warnf("Cannot convert %v (%T) to string", fieldValue, fieldValue)
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m.WriteString(fmt.Sprintf(s.template, strings.ReplaceAll(name, " ", "_"), strings.ReplaceAll(fieldName, " ", "_")))
|
||||||
for _, tag := range metric.TagList() {
|
for _, tag := range metric.TagList() {
|
||||||
m.WriteString(strings.ReplaceAll(tag.Key, " ", "_"))
|
m.WriteString(strings.ReplaceAll(tag.Key, " ", "_"))
|
||||||
m.WriteString("=")
|
m.WriteString("=")
|
||||||
|
|
@ -100,7 +102,7 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
|
||||||
m.WriteString(" ")
|
m.WriteString(" ")
|
||||||
}
|
}
|
||||||
m.WriteString(" ")
|
m.WriteString(" ")
|
||||||
m.WriteString(formatValue(fieldValue))
|
m.WriteString(value)
|
||||||
m.WriteString(" ")
|
m.WriteString(" ")
|
||||||
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
|
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
|
||||||
m.WriteString("\n")
|
m.WriteString("\n")
|
||||||
|
|
@ -108,69 +110,18 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
|
||||||
return m.Bytes()
|
return m.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Serializer) SetMetricsFormat(f format) {
|
func init() {
|
||||||
s.metricsFormat = f
|
serializers.Add("carbon2",
|
||||||
}
|
func() serializers.Serializer {
|
||||||
|
return &Serializer{}
|
||||||
func (s *Serializer) getMetricsFormat() format {
|
},
|
||||||
return s.metricsFormat
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Serializer) IsMetricsFormatUnset() bool {
|
|
||||||
return s.metricsFormat == carbon2FormatFieldEmpty
|
|
||||||
}
|
|
||||||
|
|
||||||
func serializeMetricFieldSeparate(name, fieldName string) string {
|
|
||||||
return fmt.Sprintf("metric=%s field=%s ",
|
|
||||||
strings.ReplaceAll(name, " ", "_"),
|
|
||||||
strings.ReplaceAll(fieldName, " ", "_"),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func serializeMetricIncludeField(name, fieldName string) string {
|
// InitFromConfig is a compatibility function to construct the parser the old way
|
||||||
return fmt.Sprintf("metric=%s_%s ",
|
func (s *Serializer) InitFromConfig(cfg *serializers.Config) error {
|
||||||
strings.ReplaceAll(name, " ", "_"),
|
s.Format = cfg.Carbon2Format
|
||||||
strings.ReplaceAll(fieldName, " ", "_"),
|
s.SanitizeReplaceChar = cfg.Carbon2SanitizeReplaceChar
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func formatValue(fieldValue interface{}) string {
|
return nil
|
||||||
switch v := fieldValue.(type) {
|
|
||||||
case bool:
|
|
||||||
// Print bools as 0s and 1s
|
|
||||||
return fmt.Sprintf("%d", bool2int(v))
|
|
||||||
default:
|
|
||||||
return fmt.Sprintf("%v", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isString(v interface{}) bool {
|
|
||||||
switch v.(type) {
|
|
||||||
case string:
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func bool2int(b bool) int {
|
|
||||||
// Slightly more optimized than a usual if ... return ... else return ... .
|
|
||||||
// See: https://0x0f.me/blog/golang-compiler-optimization/
|
|
||||||
var i int
|
|
||||||
if b {
|
|
||||||
i = 1
|
|
||||||
} else {
|
|
||||||
i = 0
|
|
||||||
}
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
|
|
||||||
// createSanitizeReplacer creates string replacer replacing all provided
|
|
||||||
// characters with the replaceChar.
|
|
||||||
func createSanitizeReplacer(sanitizedChars string, replaceChar rune) *strings.Replacer {
|
|
||||||
sanitizeCharPairs := make([]string, 0, 2*len(sanitizedChars))
|
|
||||||
for _, c := range sanitizedChars {
|
|
||||||
sanitizeCharPairs = append(sanitizeCharPairs, string(c), string(replaceChar))
|
|
||||||
}
|
|
||||||
return strings.NewReplacer(sanitizeCharPairs...)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,23 +22,25 @@ func TestSerializeMetricFloat(t *testing.T) {
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 91.5 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 91.5 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
|
s := &Serializer{
|
||||||
require.NoError(t, err)
|
Format: tc.format,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -59,23 +61,25 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=null 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=null 91.5 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=null 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=null 91.5 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
|
s := &Serializer{
|
||||||
require.NoError(t, err)
|
Format: tc.format,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -96,23 +100,25 @@ func TestSerializeWithSpaces(t *testing.T) {
|
||||||
m := metric.New("cpu metric", tags, fields, now)
|
m := metric.New("cpu metric", tags, fields, now)
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu_metric_usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_metric_usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
|
s := &Serializer{
|
||||||
require.NoError(t, err)
|
Format: tc.format,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -133,23 +139,25 @@ func TestSerializeMetricInt(t *testing.T) {
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 90 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 90 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 90 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 90 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
|
s := &Serializer{
|
||||||
require.NoError(t, err)
|
Format: tc.format,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -170,23 +178,25 @@ func TestSerializeMetricString(t *testing.T) {
|
||||||
m := metric.New("cpu", tags, fields, now)
|
m := metric.New("cpu", tags, fields, now)
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: "",
|
expected: "",
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: "",
|
expected: "",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
|
s := &Serializer{
|
||||||
require.NoError(t, err)
|
Format: tc.format,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -214,35 +224,37 @@ func TestSerializeMetricBool(t *testing.T) {
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
metric telegraf.Metric
|
metric telegraf.Metric
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
metric: requireMetric(now, false),
|
metric: requireMetric(now, false),
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu field=java_lang_GarbageCollector_Valid tag_name=tag_value 0 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu field=java_lang_GarbageCollector_Valid tag_name=tag_value 0 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
metric: requireMetric(now, false),
|
metric: requireMetric(now, false),
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu_java_lang_GarbageCollector_Valid tag_name=tag_value 0 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_java_lang_GarbageCollector_Valid tag_name=tag_value 0 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
metric: requireMetric(now, true),
|
metric: requireMetric(now, true),
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu field=java_lang_GarbageCollector_Valid tag_name=tag_value 1 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu field=java_lang_GarbageCollector_Valid tag_name=tag_value 1 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
metric: requireMetric(now, true),
|
metric: requireMetric(now, true),
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu_java_lang_GarbageCollector_Valid tag_name=tag_value 1 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_java_lang_GarbageCollector_Valid tag_name=tag_value 1 %d\n", now.Unix()),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
|
s := &Serializer{
|
||||||
require.NoError(t, err)
|
Format: tc.format,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
buf, err := s.Serialize(tc.metric)
|
buf, err := s.Serialize(tc.metric)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -265,17 +277,17 @@ func TestSerializeBatch(t *testing.T) {
|
||||||
metrics := []telegraf.Metric{m, m}
|
metrics := []telegraf.Metric{m, m}
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: `metric=cpu field=value 42 0
|
expected: `metric=cpu field=value 42 0
|
||||||
metric=cpu field=value 42 0
|
metric=cpu field=value 42 0
|
||||||
`,
|
`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: `metric=cpu_value 42 0
|
expected: `metric=cpu_value 42 0
|
||||||
metric=cpu_value 42 0
|
metric=cpu_value 42 0
|
||||||
`,
|
`,
|
||||||
|
|
@ -283,9 +295,11 @@ metric=cpu_value 42 0
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
|
s := &Serializer{
|
||||||
require.NoError(t, err)
|
Format: tc.format,
|
||||||
|
}
|
||||||
|
require.NoError(t, s.Init())
|
||||||
|
|
||||||
buf, err := s.SerializeBatch(metrics)
|
buf, err := s.SerializeBatch(metrics)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -300,7 +314,7 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
|
|
||||||
testcases := []struct {
|
testcases := []struct {
|
||||||
metricFunc func() telegraf.Metric
|
metricFunc func() telegraf.Metric
|
||||||
format format
|
format string
|
||||||
expected string
|
expected string
|
||||||
replaceChar string
|
replaceChar string
|
||||||
expectedErr bool
|
expectedErr bool
|
||||||
|
|
@ -312,9 +326,8 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
}
|
}
|
||||||
return metric.New("cpu=1", nil, fields, now)
|
return metric.New("cpu=1", nil, fields, now)
|
||||||
},
|
},
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu:1 field=usage_idle 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu:1 field=usage_idle 91.5 %d\n", now.Unix()),
|
||||||
replaceChar: DefaultSanitizeReplaceChar,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
metricFunc: func() telegraf.Metric {
|
metricFunc: func() telegraf.Metric {
|
||||||
|
|
@ -323,7 +336,7 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
}
|
}
|
||||||
return metric.New("cpu=1", nil, fields, now)
|
return metric.New("cpu=1", nil, fields, now)
|
||||||
},
|
},
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu_1 field=usage_idle 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_1 field=usage_idle 91.5 %d\n", now.Unix()),
|
||||||
replaceChar: "_",
|
replaceChar: "_",
|
||||||
},
|
},
|
||||||
|
|
@ -334,9 +347,8 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
}
|
}
|
||||||
return metric.New("cpu=1=tmp$custom", nil, fields, now)
|
return metric.New("cpu=1=tmp$custom", nil, fields, now)
|
||||||
},
|
},
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu:1:tmp:custom field=usage_idle 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu:1:tmp:custom field=usage_idle 91.5 %d\n", now.Unix()),
|
||||||
replaceChar: DefaultSanitizeReplaceChar,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
metricFunc: func() telegraf.Metric {
|
metricFunc: func() telegraf.Metric {
|
||||||
|
|
@ -345,9 +357,8 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
}
|
}
|
||||||
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
||||||
},
|
},
|
||||||
format: Carbon2FormatFieldSeparate,
|
format: "field_separate",
|
||||||
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace field=usage_idle 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace field=usage_idle 91.5 %d\n", now.Unix()),
|
||||||
replaceChar: DefaultSanitizeReplaceChar,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
metricFunc: func() telegraf.Metric {
|
metricFunc: func() telegraf.Metric {
|
||||||
|
|
@ -356,9 +367,8 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
}
|
}
|
||||||
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
||||||
},
|
},
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace_usage_idle 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace_usage_idle 91.5 %d\n", now.Unix()),
|
||||||
replaceChar: DefaultSanitizeReplaceChar,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
metricFunc: func() telegraf.Metric {
|
metricFunc: func() telegraf.Metric {
|
||||||
|
|
@ -367,7 +377,7 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
}
|
}
|
||||||
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
||||||
},
|
},
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expected: fmt.Sprintf("metric=cpu_1_tmp_custom_namespace_usage_idle 91.5 %d\n", now.Unix()),
|
expected: fmt.Sprintf("metric=cpu_1_tmp_custom_namespace_usage_idle 91.5 %d\n", now.Unix()),
|
||||||
replaceChar: "_",
|
replaceChar: "_",
|
||||||
},
|
},
|
||||||
|
|
@ -378,22 +388,25 @@ func TestSerializeMetricIsProperlySanitized(t *testing.T) {
|
||||||
}
|
}
|
||||||
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
|
||||||
},
|
},
|
||||||
format: Carbon2FormatMetricIncludesField,
|
format: "metric_includes_field",
|
||||||
expectedErr: true,
|
expectedErr: true,
|
||||||
replaceChar: "___",
|
replaceChar: "___",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testcases {
|
for _, tc := range testcases {
|
||||||
t.Run(string(tc.format), func(t *testing.T) {
|
t.Run(tc.format, func(t *testing.T) {
|
||||||
m := tc.metricFunc()
|
m := tc.metricFunc()
|
||||||
|
|
||||||
s, err := NewSerializer(string(tc.format), tc.replaceChar)
|
s := &Serializer{
|
||||||
|
Format: tc.format,
|
||||||
|
SanitizeReplaceChar: tc.replaceChar,
|
||||||
|
}
|
||||||
|
err := s.Init()
|
||||||
if tc.expectedErr {
|
if tc.expectedErr {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
buf, err := s.Serialize(m)
|
buf, err := s.Serialize(m)
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
|
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/csv"
|
"github.com/influxdata/telegraf/plugins/serializers/csv"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
||||||
"github.com/influxdata/telegraf/plugins/serializers/json"
|
"github.com/influxdata/telegraf/plugins/serializers/json"
|
||||||
|
|
@ -185,8 +184,6 @@ func NewSerializer(config *Config) (Serializer, error) {
|
||||||
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag), nil
|
serializer, err = NewSplunkmetricSerializer(config.HecRouting, config.SplunkmetricMultiMetric, config.SplunkmetricOmitEventTag), nil
|
||||||
case "nowmetric":
|
case "nowmetric":
|
||||||
serializer, err = NewNowSerializer()
|
serializer, err = NewNowSerializer()
|
||||||
case "carbon2":
|
|
||||||
serializer, err = NewCarbon2Serializer(config.Carbon2Format, config.Carbon2SanitizeReplaceChar)
|
|
||||||
case "wavefront":
|
case "wavefront":
|
||||||
serializer, err = NewWavefrontSerializer(
|
serializer, err = NewWavefrontSerializer(
|
||||||
config.Prefix,
|
config.Prefix,
|
||||||
|
|
@ -278,10 +275,6 @@ func NewJSONSerializer(config *Config) (Serializer, error) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {
|
|
||||||
return carbon2.NewSerializer(carbon2format, carbon2SanitizeReplaceChar)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) Serializer {
|
func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool, splunkmetricOmitEventTag bool) Serializer {
|
||||||
return splunkmetric.NewSerializer(splunkmetricHecRouting, splunkmetricMultimetric, splunkmetricOmitEventTag)
|
return splunkmetric.NewSerializer(splunkmetricHecRouting, splunkmetricMultimetric, splunkmetricOmitEventTag)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue