Carbon2 configuration option - include field in metric name (#8094)

This commit is contained in:
Patryk Małek 2020-09-10 18:14:11 +02:00 committed by GitHub
parent e9dcade0a8
commit 5534b9955c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 293 additions and 82 deletions

View File

@ -1933,6 +1933,14 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
} }
} }
if node, ok := tbl.Fields["carbon2_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.Carbon2Format = str.Value
}
}
}
if node, ok := tbl.Fields["influx_max_line_bytes"]; ok { if node, ok := tbl.Fields["influx_max_line_bytes"]; ok {
if kv, ok := node.(*ast.KeyValue); ok { if kv, ok := node.(*ast.KeyValue); ok {
if integer, ok := kv.Value.(*ast.Integer); ok { if integer, ok := kv.Value.(*ast.Integer); ok {
@ -2089,6 +2097,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
} }
} }
delete(tbl.Fields, "carbon2_format")
delete(tbl.Fields, "influx_max_line_bytes") delete(tbl.Fields, "influx_max_line_bytes")
delete(tbl.Fields, "influx_sort_fields") delete(tbl.Fields, "influx_sort_fields")
delete(tbl.Fields, "influx_uint_support") delete(tbl.Fields, "influx_uint_support")

View File

@ -133,7 +133,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
}) })
serializer, err := carbon2.NewSerializer() serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err) require.NoError(t, err)
plugin := tt.plugin() plugin := tt.plugin()
@ -210,7 +210,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode) w.WriteHeader(tt.statusCode)
}) })
serializer, err := carbon2.NewSerializer() serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err) require.NoError(t, err)
tt.plugin.SetSerializer(serializer) tt.plugin.SetSerializer(serializer)
@ -234,7 +234,7 @@ func TestContentType(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err) require.NoError(t, err)
carbon2Serializer, err := carbon2.NewSerializer() carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err) require.NoError(t, err)
tests := []struct { tests := []struct {
@ -336,7 +336,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
}) })
serializer, err := carbon2.NewSerializer() serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err) require.NoError(t, err)
plugin := tt.plugin() plugin := tt.plugin()
@ -372,7 +372,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize, MaxRequstBodySize: Default().MaxRequstBodySize,
} }
serializer, err := carbon2.NewSerializer() serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err) require.NoError(t, err)
plugin.SetSerializer(serializer) plugin.SetSerializer(serializer)
@ -555,7 +555,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
}) })
serializer, err := carbon2.NewSerializer() serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err) require.NoError(t, err)
plugin := tt.plugin() plugin := tt.plugin()

View File

@ -2,7 +2,7 @@
The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/). The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 format](http://metrics20.org/implementations/).
### Configuration ## Configuration
```toml ```toml
[[outputs.file]] [[outputs.file]]
@ -14,20 +14,51 @@ The `carbon2` serializer translates the Telegraf metric format to the [Carbon2 f
## more about them here: ## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "carbon2" data_format = "carbon2"
## Optionally configure metrics format, whether to merge metric name and field name.
## Possible options:
## * "field_separate"
## * "metric_includes_field"
## * "" - defaults to "field_separate"
# carbon2_format = "field_separate"
``` ```
Standard form: Standard form:
``` ```
metric=name field=field_1 host=foo 30 1234567890 metric=name field=field_1 host=foo 30 1234567890
metric=name field=field_2 host=foo 4 1234567890 metric=name field=field_2 host=foo 4 1234567890
metric=name field=field_N host=foo 59 1234567890 metric=name field=field_N host=foo 59 1234567890
``` ```
### Metrics ### Metrics format
The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields. So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics. There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field. `Carbon2` serializer has a configuration option - `carbon2_format` - to change how
metric names are constructed.
### Example By default `metric` will only include the metric name and a separate tag `field`
will contain the field name.
This is the behavior of `carbon2_format = "field_separate"` which is the default
behavior (even if unspecified).
Optionally, the user can opt in to make the metric name include the field name,
appended after a `_`.
This is the behavior of `carbon2_format = "metric_includes_field"` which would
make the above example look like:
```
metric=name_field_1 host=foo 30 1234567890
metric=name_field_2 host=foo 4 1234567890
metric=name_field_N host=foo 59 1234567890
```
## Metrics
The serializer converts the metrics by creating `intrinsic_tags` using the combination of metric name and fields.
So, if one Telegraf metric has 4 fields, the `carbon2` output will be 4 separate metrics.
There will be a `metric` tag that represents the name of the metric and a `field` tag to represent the field.
## Example
If we take the following InfluxDB Line Protocol: If we take the following InfluxDB Line Protocol:
@ -42,8 +73,10 @@ metric=weather field=temperature location=us-midwest season=summer 82 123456789
metric=weather field=wind location=us-midwest season=summer 100 1234567890 metric=weather field=wind location=us-midwest season=summer 100 1234567890
``` ```
### Fields and Tags with spaces ## Fields and Tags with spaces
When a field key or tag key/value have spaces, spaces will be replaced with `_`. When a field key or tag key/value have spaces, spaces will be replaced with `_`.
### Tags with empty values ## Tags with empty values
When a tag's value is empty, it will be replaced with `null` When a tag's value is empty, it will be replaced with `null`

View File

@ -9,12 +9,39 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
) )
type Serializer struct { type format string
const (
// Carbon2FormatFieldSeparate is the configuration value selecting the
// output in which the metric name and the field name are written as
// separate "metric=" and "field=" entries.
Carbon2FormatFieldSeparate string = "field_separate"
// Carbon2FormatMetricIncludesField is the configuration value selecting
// the output in which the field name is appended to the metric name
// after an underscore, producing a single "metric=" entry.
Carbon2FormatMetricIncludesField string = "metric_includes_field"
// Typed counterparts of the exported string values above, used as the
// serializer's internal format representation.
formatFieldSeparate = format(Carbon2FormatFieldSeparate)
formatMetricIncludesField = format(Carbon2FormatMetricIncludesField)
)
var formats = map[string]format{
// Field separate is the default when no format specified.
"": formatFieldSeparate,
Carbon2FormatFieldSeparate: formatFieldSeparate,
Carbon2FormatMetricIncludesField: formatMetricIncludesField,
} }
func NewSerializer() (*Serializer, error) { type Serializer struct {
s := &Serializer{} metricsFormat format
return s, nil }
func NewSerializer(f string) (*Serializer, error) {
var (
ok bool
metricsFormat format
)
if metricsFormat, ok = formats[f]; !ok {
return nil, fmt.Errorf("unknown carbon2 format: %s", f)
}
return &Serializer{
metricsFormat: metricsFormat,
}, nil
} }
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
@ -32,32 +59,54 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
func (s *Serializer) createObject(metric telegraf.Metric) []byte { func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer var m bytes.Buffer
for fieldName, fieldValue := range metric.Fields() { for fieldName, fieldValue := range metric.Fields() {
if isNumeric(fieldValue) { if !isNumeric(fieldValue) {
m.WriteString("metric=") continue
m.WriteString(strings.Replace(metric.Name(), " ", "_", -1))
m.WriteString(" field=")
m.WriteString(strings.Replace(fieldName, " ", "_", -1))
m.WriteString(" ")
for _, tag := range metric.TagList() {
m.WriteString(strings.Replace(tag.Key, " ", "_", -1))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.Replace(value, " ", "_", -1))
m.WriteString(" ")
}
m.WriteString(" ")
m.WriteString(fmt.Sprintf("%v", fieldValue))
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
} }
switch s.metricsFormat {
case formatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName,
))
case formatMetricIncludesField:
m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName,
))
}
for _, tag := range metric.TagList() {
m.WriteString(strings.Replace(tag.Key, " ", "_", -1))
m.WriteString("=")
value := tag.Value
if len(value) == 0 {
value = "null"
}
m.WriteString(strings.Replace(value, " ", "_", -1))
m.WriteString(" ")
}
m.WriteString(" ")
m.WriteString(fmt.Sprintf("%v", fieldValue))
m.WriteString(" ")
m.WriteString(strconv.FormatInt(metric.Time().Unix(), 10))
m.WriteString("\n")
} }
return m.Bytes() return m.Bytes()
} }
// serializeMetricFieldSeparate renders the leading part of a carbon2 line
// with the metric name and the field name as two separate entries:
// "metric=<name> field=<fieldName> " (note the trailing space).
// Every space in either argument is replaced with an underscore.
func serializeMetricFieldSeparate(name, fieldName string) string {
	metricPart := strings.ReplaceAll(name, " ", "_")
	fieldPart := strings.ReplaceAll(fieldName, " ", "_")
	return fmt.Sprintf("metric=%s field=%s ", metricPart, fieldPart)
}
// serializeMetricIncludeField renders the leading part of a carbon2 line
// with the field name folded into the metric name:
// "metric=<name>_<fieldName> " (note the trailing space).
// Every space in either argument is replaced with an underscore.
func serializeMetricIncludeField(name, fieldName string) string {
	metricPart := strings.ReplaceAll(name, " ", "_")
	fieldPart := strings.ReplaceAll(fieldName, " ", "_")
	return fmt.Sprintf("metric=%s_%s ", metricPart, fieldPart)
}
func isNumeric(v interface{}) bool { func isNumeric(v interface{}) bool {
switch v.(type) { switch v.(type) {
case string: case string:

View File

@ -2,13 +2,14 @@ package carbon2
import ( import (
"fmt" "fmt"
"github.com/stretchr/testify/require"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert"
) )
func MustMetric(v telegraf.Metric, err error) telegraf.Metric { func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
@ -27,14 +28,33 @@ func TestSerializeMetricFloat(t *testing.T) {
"usage_idle": float64(91.5), "usage_idle": float64(91.5),
} }
m, err := metric.New("cpu", tags, fields, now) m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err) require.NoError(t, err)
s, _ := NewSerializer() testcases := []struct {
var buf []byte format string
buf, err = s.Serialize(m) expected string
assert.NoError(t, err) }{
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 91.5 %d`, now.Unix()) + "\n") {
assert.Equal(t, string(expS), string(buf)) format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 91.5 %d\n", now.Unix()),
},
{
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 91.5 %d\n", now.Unix()),
},
}
for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) {
s, err := NewSerializer(tc.format)
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
assert.Equal(t, tc.expected, string(buf))
})
}
} }
func TestSerializeMetricWithEmptyStringTag(t *testing.T) { func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
@ -46,14 +66,33 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
"usage_idle": float64(91.5), "usage_idle": float64(91.5),
} }
m, err := metric.New("cpu", tags, fields, now) m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err) require.NoError(t, err)
s, _ := NewSerializer() testcases := []struct {
var buf []byte format string
buf, err = s.Serialize(m) expected string
assert.NoError(t, err) }{
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=null 91.5 %d`, now.Unix()) + "\n") {
assert.Equal(t, string(expS), string(buf)) format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=null 91.5 %d\n", now.Unix()),
},
{
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=null 91.5 %d\n", now.Unix()),
},
}
for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) {
s, err := NewSerializer(tc.format)
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
assert.Equal(t, tc.expected, string(buf))
})
}
} }
func TestSerializeWithSpaces(t *testing.T) { func TestSerializeWithSpaces(t *testing.T) {
@ -65,14 +104,33 @@ func TestSerializeWithSpaces(t *testing.T) {
"usage_idle 1": float64(91.5), "usage_idle 1": float64(91.5),
} }
m, err := metric.New("cpu metric", tags, fields, now) m, err := metric.New("cpu metric", tags, fields, now)
assert.NoError(t, err) require.NoError(t, err)
s, _ := NewSerializer() testcases := []struct {
var buf []byte format string
buf, err = s.Serialize(m) expected string
assert.NoError(t, err) }{
expS := []byte(fmt.Sprintf(`metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d`, now.Unix()) + "\n") {
assert.Equal(t, string(expS), string(buf)) format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu_metric field=usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()),
},
{
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_metric_usage_idle_1 cpu_0=cpu_0 91.5 %d\n", now.Unix()),
},
}
for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) {
s, err := NewSerializer(tc.format)
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
assert.Equal(t, tc.expected, string(buf))
})
}
} }
func TestSerializeMetricInt(t *testing.T) { func TestSerializeMetricInt(t *testing.T) {
@ -84,15 +142,33 @@ func TestSerializeMetricInt(t *testing.T) {
"usage_idle": int64(90), "usage_idle": int64(90),
} }
m, err := metric.New("cpu", tags, fields, now) m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err) require.NoError(t, err)
s, _ := NewSerializer() testcases := []struct {
var buf []byte format string
buf, err = s.Serialize(m) expected string
assert.NoError(t, err) }{
{
format: Carbon2FormatFieldSeparate,
expected: fmt.Sprintf("metric=cpu field=usage_idle cpu=cpu0 90 %d\n", now.Unix()),
},
{
format: Carbon2FormatMetricIncludesField,
expected: fmt.Sprintf("metric=cpu_usage_idle cpu=cpu0 90 %d\n", now.Unix()),
},
}
expS := []byte(fmt.Sprintf(`metric=cpu field=usage_idle cpu=cpu0 90 %d`, now.Unix()) + "\n") for _, tc := range testcases {
assert.Equal(t, string(expS), string(buf)) t.Run(tc.format, func(t *testing.T) {
s, err := NewSerializer(tc.format)
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
assert.Equal(t, tc.expected, string(buf))
})
}
} }
func TestSerializeMetricString(t *testing.T) { func TestSerializeMetricString(t *testing.T) {
@ -106,13 +182,31 @@ func TestSerializeMetricString(t *testing.T) {
m, err := metric.New("cpu", tags, fields, now) m, err := metric.New("cpu", tags, fields, now)
assert.NoError(t, err) assert.NoError(t, err)
s, _ := NewSerializer() testcases := []struct {
var buf []byte format string
buf, err = s.Serialize(m) expected string
assert.NoError(t, err) }{
{
format: Carbon2FormatFieldSeparate,
expected: "",
},
{
format: Carbon2FormatMetricIncludesField,
expected: "",
},
}
expS := []byte("") for _, tc := range testcases {
assert.Equal(t, string(expS), string(buf)) t.Run(tc.format, func(t *testing.T) {
s, err := NewSerializer(tc.format)
require.NoError(t, err)
buf, err := s.Serialize(m)
require.NoError(t, err)
assert.Equal(t, tc.expected, string(buf))
})
}
} }
func TestSerializeBatch(t *testing.T) { func TestSerializeBatch(t *testing.T) {
@ -128,11 +222,34 @@ func TestSerializeBatch(t *testing.T) {
) )
metrics := []telegraf.Metric{m, m} metrics := []telegraf.Metric{m, m}
s, _ := NewSerializer()
buf, err := s.SerializeBatch(metrics) testcases := []struct {
require.NoError(t, err) format string
expS := []byte(`metric=cpu field=value 42 0 expected string
}{
{
format: Carbon2FormatFieldSeparate,
expected: `metric=cpu field=value 42 0
metric=cpu field=value 42 0 metric=cpu field=value 42 0
`) `,
assert.Equal(t, string(expS), string(buf)) },
{
format: Carbon2FormatMetricIncludesField,
expected: `metric=cpu_value 42 0
metric=cpu_value 42 0
`,
},
}
for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) {
s, err := NewSerializer(tc.format)
require.NoError(t, err)
buf, err := s.SerializeBatch(metrics)
require.NoError(t, err)
assert.Equal(t, tc.expected, string(buf))
})
}
} }

View File

@ -48,6 +48,9 @@ type Config struct {
// Dataformat can be one of the serializer types listed in NewSerializer. // Dataformat can be one of the serializer types listed in NewSerializer.
DataFormat string `toml:"data_format"` DataFormat string `toml:"data_format"`
// Carbon2 metric format.
Carbon2Format string `toml:"carbon2_format"`
// Support tags in graphite protocol // Support tags in graphite protocol
GraphiteTagSupport bool `toml:"graphite_tag_support"` GraphiteTagSupport bool `toml:"graphite_tag_support"`
@ -118,7 +121,7 @@ func NewSerializer(config *Config) (Serializer, error) {
case "nowmetric": case "nowmetric":
serializer, err = NewNowSerializer() serializer, err = NewNowSerializer()
case "carbon2": case "carbon2":
serializer, err = NewCarbon2Serializer() serializer, err = NewCarbon2Serializer(config.Carbon2Format)
case "wavefront": case "wavefront":
serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride) serializer, err = NewWavefrontSerializer(config.Prefix, config.WavefrontUseStrict, config.WavefrontSourceOverride)
case "prometheus": case "prometheus":
@ -160,8 +163,8 @@ func NewJsonSerializer(timestampUnits time.Duration) (Serializer, error) {
return json.NewSerializer(timestampUnits) return json.NewSerializer(timestampUnits)
} }
func NewCarbon2Serializer() (Serializer, error) { func NewCarbon2Serializer(carbon2format string) (Serializer, error) {
return carbon2.NewSerializer() return carbon2.NewSerializer(carbon2format)
} }
func NewSplunkmetricSerializer(splunkmetric_hec_routing bool, splunkmetric_multimetric bool) (Serializer, error) { func NewSplunkmetricSerializer(splunkmetric_hec_routing bool, splunkmetric_multimetric bool) (Serializer, error) {