Sumo Logic output plugin: carbon2 default to include field in metric (#8132)

This commit is contained in:
Patryk Małek 2020-09-29 20:00:33 +02:00 committed by GitHub
parent 8006068e94
commit a3a1224e58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 77 deletions

View File

@ -122,11 +122,19 @@ func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) {
s.headers = make(map[string]string) s.headers = make(map[string]string)
} }
switch serializer.(type) { switch sr := serializer.(type) {
case *carbon2.Serializer: case *carbon2.Serializer:
s.headers[contentTypeHeader] = carbon2ContentType s.headers[contentTypeHeader] = carbon2ContentType
// In case Carbon2 is used and the metrics format was unset, default to
// include field in metric name.
if sr.IsMetricsFormatUnset() {
sr.SetMetricsFormat(carbon2.Carbon2FormatMetricIncludesField)
}
case *graphite.GraphiteSerializer: case *graphite.GraphiteSerializer:
s.headers[contentTypeHeader] = graphiteContentType s.headers[contentTypeHeader] = graphiteContentType
case *prometheus.Serializer: case *prometheus.Serializer:
s.headers[contentTypeHeader] = prometheusContentType s.headers[contentTypeHeader] = prometheusContentType

View File

@ -2,6 +2,7 @@ package sumologic
import ( import (
"bufio" "bufio"
"bytes"
"compress/gzip" "compress/gzip"
"fmt" "fmt"
"io" "io"
@ -20,7 +21,6 @@ import (
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/plugins/serializers/carbon2" "github.com/influxdata/telegraf/plugins/serializers/carbon2"
"github.com/influxdata/telegraf/plugins/serializers/graphite" "github.com/influxdata/telegraf/plugins/serializers/graphite"
"github.com/influxdata/telegraf/plugins/serializers/prometheus" "github.com/influxdata/telegraf/plugins/serializers/prometheus"
@ -135,7 +135,7 @@ func TestMethod(t *testing.T) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
}) })
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err) require.NoError(t, err)
plugin := tt.plugin() plugin := tt.plugin()
@ -212,7 +212,7 @@ func TestStatusCode(t *testing.T) {
w.WriteHeader(tt.statusCode) w.WriteHeader(tt.statusCode)
}) })
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err) require.NoError(t, err)
tt.plugin.SetSerializer(serializer) tt.plugin.SetSerializer(serializer)
@ -226,77 +226,102 @@ func TestStatusCode(t *testing.T) {
} }
func TestContentType(t *testing.T) { func TestContentType(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
carbon2Serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)
tests := []struct { tests := []struct {
name string name string
plugin func() *SumoLogic plugin func() *SumoLogic
expectedErr bool expectedBody []byte
serializer serializers.Serializer
}{ }{
{ {
name: "carbon2 is supported", name: "carbon2 (data format = field separate) is supported",
plugin: func() *SumoLogic { plugin: func() *SumoLogic {
s := Default() s := Default()
s.URL = u.String()
s.headers = map[string]string{ s.headers = map[string]string{
contentTypeHeader: carbon2ContentType, contentTypeHeader: carbon2ContentType,
} }
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err)
s.SetSerializer(sr)
return s return s
}, },
serializer: carbon2Serializer, expectedBody: []byte("metric=cpu field=value 42 0\n"),
expectedErr: false, },
{
name: "carbon2 (data format unset) is supported and falls back to include field in metric name",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldEmpty))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu_value 42 0\n"),
},
{
name: "carbon2 (data format = metric includes field) is supported",
plugin: func() *SumoLogic {
s := Default()
s.headers = map[string]string{
contentTypeHeader: carbon2ContentType,
}
sr, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatMetricIncludesField))
require.NoError(t, err)
s.SetSerializer(sr)
return s
},
expectedBody: []byte("metric=cpu_value 42 0\n"),
}, },
{ {
name: "graphite is supported", name: "graphite is supported",
plugin: func() *SumoLogic { plugin: func() *SumoLogic {
s := Default() s := Default()
s.URL = u.String()
s.headers = map[string]string{ s.headers = map[string]string{
contentTypeHeader: graphiteContentType, contentTypeHeader: graphiteContentType,
} }
s.SetSerializer(&graphite.GraphiteSerializer{})
return s return s
}, },
serializer: &graphite.GraphiteSerializer{},
expectedErr: false,
}, },
{ {
name: "prometheus is supported", name: "prometheus is supported",
plugin: func() *SumoLogic { plugin: func() *SumoLogic {
s := Default() s := Default()
s.URL = u.String()
s.headers = map[string]string{ s.headers = map[string]string{
contentTypeHeader: prometheusContentType, contentTypeHeader: prometheusContentType,
} }
s.SetSerializer(&prometheus.Serializer{})
return s return s
}, },
serializer: &prometheus.Serializer{},
expectedErr: false,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
plugin := tt.plugin() var body bytes.Buffer
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gz, err := gzip.NewReader(r.Body)
require.NoError(t, err)
io.Copy(&body, gz)
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
plugin.SetSerializer(tt.serializer) u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
err := plugin.Connect()
require.NoError(t, err) require.NoError(t, err)
plugin := tt.plugin()
plugin.URL = u.String()
require.NoError(t, plugin.Connect())
err = plugin.Write([]telegraf.Metric{getMetric(t)}) err = plugin.Write([]telegraf.Metric{getMetric(t)})
require.NoError(t, err) require.NoError(t, err)
if tt.expectedBody != nil {
require.Equal(t, string(tt.expectedBody), body.String())
}
}) })
} }
} }
@ -338,7 +363,7 @@ func TestContentEncodingGzip(t *testing.T) {
w.WriteHeader(http.StatusNoContent) w.WriteHeader(http.StatusNoContent)
}) })
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err) require.NoError(t, err)
plugin := tt.plugin() plugin := tt.plugin()
@ -374,7 +399,7 @@ func TestDefaultUserAgent(t *testing.T) {
MaxRequstBodySize: Default().MaxRequstBodySize, MaxRequstBodySize: Default().MaxRequstBodySize,
} }
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err) require.NoError(t, err)
plugin.SetSerializer(serializer) plugin.SetSerializer(serializer)
@ -627,7 +652,7 @@ func TestMaxRequestBodySize(t *testing.T) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
}) })
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err) require.NoError(t, err)
plugin := tt.plugin() plugin := tt.plugin()
@ -659,7 +684,7 @@ func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
plugin := Default() plugin := Default()
plugin.URL = u.String() plugin.URL = u.String()
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) serializer, err := carbon2.NewSerializer(string(carbon2.Carbon2FormatFieldSeparate))
require.NoError(t, err) require.NoError(t, err)
plugin.SetSerializer(serializer) plugin.SetSerializer(serializer)

View File

@ -12,35 +12,29 @@ import (
type format string type format string
const ( const (
Carbon2FormatFieldSeparate string = "field_separate" Carbon2FormatFieldEmpty = format("")
Carbon2FormatMetricIncludesField string = "metric_includes_field" Carbon2FormatFieldSeparate = format("field_separate")
Carbon2FormatMetricIncludesField = format("metric_includes_field")
formatFieldSeparate = format(Carbon2FormatFieldSeparate)
formatMetricIncludesField = format(Carbon2FormatMetricIncludesField)
) )
var formats = map[string]format{ var formats = map[format]struct{}{
// Field separate is the default when no format specified. Carbon2FormatFieldEmpty: {},
"": formatFieldSeparate, Carbon2FormatFieldSeparate: {},
Carbon2FormatFieldSeparate: formatFieldSeparate, Carbon2FormatMetricIncludesField: {},
Carbon2FormatMetricIncludesField: formatMetricIncludesField,
} }
type Serializer struct { type Serializer struct {
metricsFormat format metricsFormat format
} }
func NewSerializer(f string) (*Serializer, error) { func NewSerializer(metricsFormat string) (*Serializer, error) {
var ( var f = format(metricsFormat)
ok bool if _, ok := formats[f]; !ok {
metricsFormat format
)
if metricsFormat, ok = formats[f]; !ok {
return nil, fmt.Errorf("unknown carbon2 format: %s", f) return nil, fmt.Errorf("unknown carbon2 format: %s", f)
} }
return &Serializer{ return &Serializer{
metricsFormat: metricsFormat, metricsFormat: f,
}, nil }, nil
} }
@ -58,17 +52,22 @@ func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
func (s *Serializer) createObject(metric telegraf.Metric) []byte { func (s *Serializer) createObject(metric telegraf.Metric) []byte {
var m bytes.Buffer var m bytes.Buffer
metricsFormat := s.getMetricsFormat()
for fieldName, fieldValue := range metric.Fields() { for fieldName, fieldValue := range metric.Fields() {
if !isNumeric(fieldValue) { if !isNumeric(fieldValue) {
continue continue
} }
switch s.metricsFormat { switch metricsFormat {
case formatFieldSeparate: // Field separate is the default when no format specified.
case Carbon2FormatFieldEmpty:
case Carbon2FormatFieldSeparate:
m.WriteString(serializeMetricFieldSeparate( m.WriteString(serializeMetricFieldSeparate(
metric.Name(), fieldName, metric.Name(), fieldName,
)) ))
case formatMetricIncludesField:
case Carbon2FormatMetricIncludesField:
m.WriteString(serializeMetricIncludeField( m.WriteString(serializeMetricIncludeField(
metric.Name(), fieldName, metric.Name(), fieldName,
)) ))
@ -93,6 +92,18 @@ func (s *Serializer) createObject(metric telegraf.Metric) []byte {
return m.Bytes() return m.Bytes()
} }
func (s *Serializer) SetMetricsFormat(f format) {
s.metricsFormat = f
}
func (s *Serializer) getMetricsFormat() format {
return s.metricsFormat
}
func (s *Serializer) IsMetricsFormatUnset() bool {
return s.metricsFormat == Carbon2FormatFieldEmpty
}
func serializeMetricFieldSeparate(name, fieldName string) string { func serializeMetricFieldSeparate(name, fieldName string) string {
return fmt.Sprintf("metric=%s field=%s ", return fmt.Sprintf("metric=%s field=%s ",
strings.Replace(name, " ", "_", -1), strings.Replace(name, " ", "_", -1),

View File

@ -31,7 +31,7 @@ func TestSerializeMetricFloat(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
testcases := []struct { testcases := []struct {
format string format format
expected string expected string
}{ }{
{ {
@ -45,8 +45,8 @@ func TestSerializeMetricFloat(t *testing.T) {
} }
for _, tc := range testcases { for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) { t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(tc.format) s, err := NewSerializer(string(tc.format))
require.NoError(t, err) require.NoError(t, err)
buf, err := s.Serialize(m) buf, err := s.Serialize(m)
@ -69,7 +69,7 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
testcases := []struct { testcases := []struct {
format string format format
expected string expected string
}{ }{
{ {
@ -83,8 +83,8 @@ func TestSerializeMetricWithEmptyStringTag(t *testing.T) {
} }
for _, tc := range testcases { for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) { t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(tc.format) s, err := NewSerializer(string(tc.format))
require.NoError(t, err) require.NoError(t, err)
buf, err := s.Serialize(m) buf, err := s.Serialize(m)
@ -107,7 +107,7 @@ func TestSerializeWithSpaces(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
testcases := []struct { testcases := []struct {
format string format format
expected string expected string
}{ }{
{ {
@ -121,8 +121,8 @@ func TestSerializeWithSpaces(t *testing.T) {
} }
for _, tc := range testcases { for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) { t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(tc.format) s, err := NewSerializer(string(tc.format))
require.NoError(t, err) require.NoError(t, err)
buf, err := s.Serialize(m) buf, err := s.Serialize(m)
@ -145,7 +145,7 @@ func TestSerializeMetricInt(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
testcases := []struct { testcases := []struct {
format string format format
expected string expected string
}{ }{
{ {
@ -159,8 +159,8 @@ func TestSerializeMetricInt(t *testing.T) {
} }
for _, tc := range testcases { for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) { t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(tc.format) s, err := NewSerializer(string(tc.format))
require.NoError(t, err) require.NoError(t, err)
buf, err := s.Serialize(m) buf, err := s.Serialize(m)
@ -183,7 +183,7 @@ func TestSerializeMetricString(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
testcases := []struct { testcases := []struct {
format string format format
expected string expected string
}{ }{
{ {
@ -197,8 +197,8 @@ func TestSerializeMetricString(t *testing.T) {
} }
for _, tc := range testcases { for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) { t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(tc.format) s, err := NewSerializer(string(tc.format))
require.NoError(t, err) require.NoError(t, err)
buf, err := s.Serialize(m) buf, err := s.Serialize(m)
@ -224,7 +224,7 @@ func TestSerializeBatch(t *testing.T) {
metrics := []telegraf.Metric{m, m} metrics := []telegraf.Metric{m, m}
testcases := []struct { testcases := []struct {
format string format format
expected string expected string
}{ }{
{ {
@ -242,8 +242,8 @@ metric=cpu_value 42 0
} }
for _, tc := range testcases { for _, tc := range testcases {
t.Run(tc.format, func(t *testing.T) { t.Run(string(tc.format), func(t *testing.T) {
s, err := NewSerializer(tc.format) s, err := NewSerializer(string(tc.format))
require.NoError(t, err) require.NoError(t, err)
buf, err := s.SerializeBatch(metrics) buf, err := s.SerializeBatch(metrics)