Carbon2 serializer: sanitize metric name (#9026)

This commit is contained in:
Patryk Małek 2021-04-08 16:31:31 +02:00 committed by GitHub
parent f0c85492c3
commit 2b41a1e1f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 182 additions and 26 deletions

View File

@ -1388,6 +1388,7 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
c.getFieldString(tbl, "template", &sc.Template)
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes)
c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
@ -1449,9 +1450,9 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
func (c *Config) missingTomlField(_ reflect.Type, key string) error {
switch key {
case "alias", "carbon2_format", "collectd_auth_file", "collectd_parse_multivalue",
"collectd_security_level", "collectd_typesdb", "collection_jitter", "csv_column_names",
"csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"csv_column_names", "csv_column_types", "csv_comment", "csv_delimiter", "csv_header_row_count",
"csv_measurement_column", "csv_skip_columns", "csv_skip_rows", "csv_tag_columns",
"csv_timestamp_column", "csv_timestamp_format", "csv_timezone", "csv_trim_space", "csv_skip_values",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",

View File

@ -96,7 +96,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
plugin := tt.plugin()
@ -173,7 +173,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode)
})
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
tt.plugin.SetSerializer(serializer)
@ -199,7 +199,7 @@ func TestContentType(t *testing.T) {
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
s.SetSerializer(sr)
return s
@ -213,7 +213,7 @@ func TestContentType(t *testing.T) {
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
s.SetSerializer(sr)
return s
@ -310,7 +310,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent)
})
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
plugin := tt.plugin()
@ -345,7 +345,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize,
}
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
plugin.SetSerializer(serializer)
@ -594,7 +594,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK)
})
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
plugin := tt.plugin()
@ -626,7 +626,7 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
plugin := Default()
plugin.URL = u.String()
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate), carbon2.DefaultSanitizeReplaceChar)
require.NoError(t, err)
plugin.SetSerializer(serializer)

View File

@ -21,6 +21,11 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f
## * "metric_includes_field"
## * "" - defaults to "field_separate"
# carbon2_format = "field_separate"
## Character used for replacing sanitized characters. By default ":" is used.
## The following character set is being replaced with sanitize replace char:
## !@#$%^&*()+`'\"[]{};<>,?/\\|=
# carbon2_sanitize_replace_char = ":"
```
Standard form:
@ -52,6 +57,17 @@ metric=name_field_2 host=foo 4 1234567890
metric=name_field_N host=foo 59 1234567890
```
### Metric name sanitization
In order to sanitize the metric name one can specify `carbon2_sanitize_replace_char`
in order to replace the following characters in the metric name:
```
!@#$%^&*()+`'\"[]{};<>,?/\\|=
```
By default they will be replaced with `:`.
## Metrics
The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields.

View File

@ -2,6 +2,7 @@ package carbon2
import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
@ -23,11 +24,23 @@ var formats = map[format]struct{}{
Carbon2FormatMetricIncludesField: {},
}
const (
DefaultSanitizeReplaceChar = ":"
sanitizedChars = "!@#$%^&*()+`'\"[]{};<>,?/\\|="
)
type Serializer struct {
metricsFormat format
metricsFormat format
sanitizeReplacer *strings.Replacer
}
func NewSerializer(metricsFormat string) (*Serializer, error) {
func NewSerializer(metricsFormat string, sanitizeReplaceChar string) (*Serializer, error) {
if sanitizeReplaceChar == "" {
sanitizeReplaceChar = DefaultSanitizeReplaceChar
} else if len(sanitizeReplaceChar) > 1 {
return nil, errors.New("sanitize replace char has to be a singular character")
}
var f = format(metricsFormat)
if _, ok := formats[f]; !ok {
@ -40,7 +53,8 @@ func NewSerializer(metricsFormat string) (*Serializer, error) {
}
return &Serializer{
metricsFormat: f,
metricsFormat: f,
sanitizeReplacer: createSanitizeReplacer(sanitizedChars, rune(sanitizeReplaceChar[0])),
}, nil
}
@ -65,15 +79,17 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
continue
}
name := s.sanitizeReplacer.Replace(metric.Name())
switch metricsFormat {
case Carbon2FormatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
name, fieldName,
))
case Carbon2FormatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
name, fieldName,
))
}
@ -152,3 +168,13 @@ func bool2int(b bool) int {
}
return i
}
// createSanitizeReplacer creates string replacer replacing all provided
// characters with the replaceChar.
func createSanitizeReplacer(sanitizedChars string, replaceChar rune) *strings.Replacer {
sanitizeCharPairs := make([]string, 0, 2*len(sanitizedChars))
for _, c := range sanitizedChars {
sanitizeCharPairs = append(sanitizeCharPairs, string(c), string(replaceChar))
}
return strings.NewReplacer(sanitizeCharPairs...)
}

View File

@ -46,7 +46,7 @@ func TestSerializeMetricFloat(t *testing.T) {
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)
buf, err := s.Serialize(m)
@ -84,7 +84,7 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)
buf, err := s.Serialize(m)
@ -122,7 +122,7 @@ func TestSerializeWithSpaces(t *testing.T) {
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)
buf, err := s.Serialize(m)
@ -160,7 +160,7 @@ func TestSerializeMetricInt(t *testing.T) {
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)
buf, err := s.Serialize(m)
@ -198,7 +198,7 @@ func TestSerializeMetricString(t *testing.T) {
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)
buf, err := s.Serialize(m)
@ -255,7 +255,7 @@ func TestSerializeMetricBool(t *testing.T) {
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)
buf, err := s.Serialize(tc.metric)
@ -300,7 +300,7 @@ metric=cpu_value 42 0
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(string(tc.format))
s, err := NewSerializer(string(tc.format), DefaultSanitizeReplaceChar)
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
@ -310,3 +310,113 @@ metric=cpu_value 42 0
})
}
}
func TestSerializeMetricIsProperlySanitized(t *testing.T) {
now := time.Now()
testcases := []struct {
metricFunc func() (telegraf.Metric, error)
format format
expected string
replaceChar string
expectedErr bool
}{
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1 field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu_1 field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: "_",
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace field=usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu:1:tmp:custom:namespace_usage_idle 91.5 %d\n", now.Unix()),
replaceChar: DefaultSanitizeReplaceChar,
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_1_tmp_custom_namespace_usage_idle 91.5 %d\n", now.Unix()),
replaceChar: "_",
},
{
metricFunc: func() (telegraf.Metric, error) {
fields := map[string]interface{}{
"usage_idle": float64(91.5),
}
return metric.New("cpu=1=tmp$custom%namespace", nil, fields, now)
},
format: Carbon2FormatMetricIncludesField,
expectedErr: true,
replaceChar: "___",
},
}
for _, tc := range testcases {
t.Run(string(tc.format), func(t *testing.T) {
m, err := tc.metricFunc()
require.NoError(t, err)
s, err := NewSerializer(string(tc.format), tc.replaceChar)
if tc.expectedErr {
require.Error(t, err)
return
}
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
assert.Equal(t, tc.expected, string(buf))
})
}
}

View File

@ -53,6 +53,9 @@ type Config struct {
// Carbon2 metric format.
Carbon2Format string `toml:"carbon2_format"`
// Character used for metric name sanitization in Carbon2.
Carbon2SanitizeReplaceChar string `toml:"carbon2_sanitize_replace_char"`
// Support tags in graphite protocol
GraphiteTagSupport bool `toml:"graphite_tag_support"`
@ -123,7 +126,7 @@ func NewSerializer(config *Config) (Serializer, error) {
case "nowmetric":
serializer, err = NewNowSerializer()
case "carbon2":
serializer, err = NewCarbon2Serializer(config.Carbon2Format)
serializer, err = NewCarbon2Serializer(config.Carbon2Format, config.Carbon2SanitizeReplaceChar)
case "wavefront":
serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride)
case "prometheus":
@ -186,8 +189,8 @@ func NewJSONSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits)
}
func NewCarbon2Serializer(carbon2format string) (Serializer, error) {
return carbon2.NewSerializer(carbon2format)
func NewCarbon2Serializer(carbon2format string, carbon2SanitizeReplaceChar string) (Serializer, error) {
return carbon2.NewSerializer(carbon2format, carbon2SanitizeReplaceChar)
}
func NewSplunkmetricSerializer(splunkmetricHecRouting bool, splunkmetricMultimetric bool) (Serializer, error) {