diff --git a/README.md b/README.md index 8bfb9fed6..573c6e596 100644 --- a/README.md +++ b/README.md @@ -437,3 +437,4 @@ For documentation on the latest development code see the [documentation index][d * [udp](./plugins/outputs/socket_writer) * [warp10](./plugins/outputs/warp10) * [wavefront](./plugins/outputs/wavefront) +* [sumologic](./plugins/outputs/sumologic) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index c0fe636f4..b44f41add 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -1267,6 +1267,62 @@ # # location = "eu-north0" +# # A plugin that can transmit metrics to Sumo Logic HTTP Source +# [[outputs.sumologic]] +# ## Unique URL generated for your HTTP Metrics Source. +# ## This is the address to send metrics to. +# # url = "https://events.sumologic.net/receiver/v1/http/" +# +# ## Data format to be used for sending metrics. +# ## This will set the "Content-Type" header accordingly. +# ## Currently supported formats: +# ## * graphite - for Content-Type of application/vnd.sumologic.graphite +# ## * carbon2 - for Content-Type of application/vnd.sumologic.carbon2 +# ## * prometheus - for Content-Type of application/vnd.sumologic.prometheus +# ## +# ## More information can be found at: +# ## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#content-type-headers-for-metrics +# ## +# ## NOTE: +# ## When unset, telegraf will by default use the influx serializer which is currently unsupported +# ## in HTTP Source. +# data_format = "carbon2" +# +# ## Timeout used for HTTP request +# # timeout = "5s" +# +# ## HTTP method, one of: "POST" or "PUT". "POST" is used by default if unset. +# # method = "POST" +# +# ## Max HTTP request body size in bytes before compression (if applied). +# ## By default 1MB is recommended. +# ## NOTE: +# ## Bear in mind that in some serializer a metric even though serialized to multiple +# ## lines cannot be split any further so setting this very low might not work +# ## as expected. +# # max_request_body_size = 1_000_000 +# +# ## Additional, Sumo specific options. +# ## Full list can be found here: +# ## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#supported-http-headers +# +# ## Desired source name. +# ## Useful if you want to override the source name configured for the source. +# # source_name = "" +# +# ## Desired host name. +# ## Useful if you want to override the source host configured for the source. +# # source_host = "" +# +# ## Desired source category. +# ## Useful if you want to override the source category configured for the source. +# # source_category = "" +# +# ## Comma-separated key=value list of dimensions to apply to every metric. +# ## Custom dimensions will allow you to query your metrics at a more granular level. +# # dimensions = "" + + # # Configuration for Syslog server to send metrics to # [[outputs.syslog]] # ## URL to connect to diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 3a2813f4e..a1ac77621 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -35,6 +35,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" _ "github.com/influxdata/telegraf/plugins/outputs/stackdriver" + _ "github.com/influxdata/telegraf/plugins/outputs/sumologic" _ "github.com/influxdata/telegraf/plugins/outputs/syslog" _ "github.com/influxdata/telegraf/plugins/outputs/warp10" _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" diff --git a/plugins/outputs/sumologic/README.md b/plugins/outputs/sumologic/README.md new file mode 100644 index 000000000..165315121 --- /dev/null +++ b/plugins/outputs/sumologic/README.md @@ -0,0 +1,70 @@ +# Sumo Logic Output Plugin + +This plugin sends metrics to [Sumo Logic HTTP Source](https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source) +in HTTP messages, encoded using one of the output data formats. + +Currently metrics can be sent using one of the following data formats, supported +by Sumologic HTTP Source: + + * `graphite` - for Content-Type of `application/vnd.sumologic.graphite` + * `carbon2` - for Content-Type of `application/vnd.sumologic.carbon2` + * `prometheus` - for Content-Type of `application/vnd.sumologic.prometheus` + +### Configuration: + +```toml +# A plugin that can send metrics to Sumo Logic HTTP metric collector. +[[outputs.sumologic]] + ## Unique URL generated for your HTTP Metrics Source. + ## This is the address to send metrics to. + # url = "https://events.sumologic.net/receiver/v1/http/" + + ## Data format to be used for sending metrics. + ## This will set the "Content-Type" header accordingly. + ## Currently supported formats: + ## * graphite - for Content-Type of application/vnd.sumologic.graphite + ## * carbon2 - for Content-Type of application/vnd.sumologic.carbon2 + ## * prometheus - for Content-Type of application/vnd.sumologic.prometheus + ## + ## More information can be found at: + ## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#content-type-headers-for-metrics + ## + ## NOTE: + ## When unset, telegraf will by default use the influx serializer which is currently unsupported + ## in HTTP Source. + data_format = "carbon2" + + ## Timeout used for HTTP request + # timeout = "5s" + + ## HTTP method, one of: "POST" or "PUT". "POST" is used by default if unset. + # method = "POST" + + ## Max HTTP request body size in bytes before compression (if applied). + ## By default 1MB is recommended. + ## NOTE: + ## Bear in mind that in some serializer a metric even though serialized to multiple + ## lines cannot be split any further so setting this very low might not work + ## as expected. + # max_request_body_size = 1_000_000 + + ## Additional, Sumo specific options. + ## Full list can be found here: + ## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#supported-http-headers + + ## Desired source name. + ## Useful if you want to override the source name configured for the source. + # source_name = "" + + ## Desired host name. + ## Useful if you want to override the source host configured for the source. + # source_host = "" + + ## Desired source category. + ## Useful if you want to override the source category configured for the source. + # source_category = "" + + ## Comma-separated key=value list of dimensions to apply to every metric. + ## Custom dimensions will allow you to query your metrics at a more granular level. + # dimensions = "" +``` diff --git a/plugins/outputs/sumologic/sumologic.go b/plugins/outputs/sumologic/sumologic.go new file mode 100644 index 000000000..aca0fb56a --- /dev/null +++ b/plugins/outputs/sumologic/sumologic.go @@ -0,0 +1,306 @@ +package sumologic + +import ( + "bytes" + "compress/gzip" + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/carbon2" + "github.com/influxdata/telegraf/plugins/serializers/graphite" + "github.com/influxdata/telegraf/plugins/serializers/prometheus" +) + +const ( + sampleConfig = ` + ## Unique URL generated for your HTTP Metrics Source. + ## This is the address to send metrics to. + # url = "https://events.sumologic.net/receiver/v1/http/" + + ## Data format to be used for sending metrics. + ## This will set the "Content-Type" header accordingly. + ## Currently supported formats: + ## * graphite - for Content-Type of application/vnd.sumologic.graphite + ## * carbon2 - for Content-Type of application/vnd.sumologic.carbon2 + ## * prometheus - for Content-Type of application/vnd.sumologic.prometheus + ## + ## More information can be found at: + ## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#content-type-headers-for-metrics + ## + ## NOTE: + ## When unset, telegraf will by default use the influx serializer which is currently unsupported + ## in HTTP Source. + data_format = "carbon2" + + ## Timeout used for HTTP request + # timeout = "5s" + + ## HTTP method, one of: "POST" or "PUT". "POST" is used by default if unset. + # method = "POST" + + ## Max HTTP request body size in bytes before compression (if applied). + ## By default 1MB is recommended. + ## NOTE: + ## Bear in mind that in some serializer a metric even though serialized to multiple + ## lines cannot be split any further so setting this very low might not work + ## as expected. + # max_request_body_size = 1_000_000 + + ## Additional, Sumo specific options. + ## Full list can be found here: + ## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#supported-http-headers + + ## Desired source name. + ## Useful if you want to override the source name configured for the source. + # source_name = "" + + ## Desired host name. + ## Useful if you want to override the source host configured for the source. + # source_host = "" + + ## Desired source category. + ## Useful if you want to override the source category configured for the source. + # source_category = "" + + ## Comma-separated key=value list of dimensions to apply to every metric. + ## Custom dimensions will allow you to query your metrics at a more granular level. + # dimensions = "" +` + + defaultClientTimeout = 5 * time.Second + defaultMethod = http.MethodPost + defaultMaxRequestBodySize = 1_000_000 + + contentTypeHeader = "Content-Type" + carbon2ContentType = "application/vnd.sumologic.carbon2" + graphiteContentType = "application/vnd.sumologic.graphite" + prometheusContentType = "application/vnd.sumologic.prometheus" +) + +type header string + +const ( + sourceNameHeader header = `X-Sumo-Name` + sourceHostHeader header = `X-Sumo-Host` + sourceCategoryHeader header = `X-Sumo-Category` + dimensionsHeader header = `X-Sumo-Dimensions` +) + +type SumoLogic struct { + URL string `toml:"url"` + Timeout internal.Duration `toml:"timeout"` + Method string `toml:"method"` + MaxRequstBodySize config.Size `toml:"max_request_body_size"` + + SourceName string `toml:"source_name"` + SourceHost string `toml:"source_host"` + SourceCategory string `toml:"source_category"` + Dimensions string `toml:"dimensions"` + + client *http.Client + serializer serializers.Serializer + + err error + headers map[string]string +} + +func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) { + if s.headers == nil { + s.headers = make(map[string]string) + } + + switch serializer.(type) { + case *carbon2.Serializer: + s.headers[contentTypeHeader] = carbon2ContentType + case *graphite.GraphiteSerializer: + s.headers[contentTypeHeader] = graphiteContentType + case *prometheus.Serializer: + s.headers[contentTypeHeader] = prometheusContentType + + default: + s.err = errors.Errorf("unsupported serializer %T", serializer) + } + + s.serializer = serializer +} + +func (s *SumoLogic) createClient(ctx context.Context) (*http.Client, error) { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + }, + Timeout: s.Timeout.Duration, + }, nil +} + +func (s *SumoLogic) Connect() error { + if s.err != nil { + return errors.Wrap(s.err, "sumologic: incorrect configuration") + } + + if s.Method == "" { + s.Method = defaultMethod + } + s.Method = strings.ToUpper(s.Method) + if s.Method != http.MethodPost && s.Method != http.MethodPut { + return fmt.Errorf("invalid method [%s] %s", s.URL, s.Method) + } + + if s.Timeout.Duration == 0 { + s.Timeout.Duration = defaultClientTimeout + } + + client, err := s.createClient(context.Background()) + if err != nil { + return err + } + + s.client = client + + return nil +} + +func (s *SumoLogic) Close() error { + return s.err +} + +func (s *SumoLogic) Description() string { + return "A plugin that can transmit metrics to Sumo Logic HTTP Source" +} + +func (s *SumoLogic) SampleConfig() string { + return sampleConfig +} + +func (s *SumoLogic) Write(metrics []telegraf.Metric) error { + if s.err != nil { + return errors.Wrap(s.err, "sumologic: incorrect configuration") + } + if s.serializer == nil { + return errors.New("sumologic: serializer unset") + } + + reqBody, err := s.serializer.SerializeBatch(metrics) + if err != nil { + return err + } + + if l := len(reqBody); l > int(s.MaxRequstBodySize) { + var ( + // Do the rounded up integer division + numChunks = (l + int(s.MaxRequstBodySize) - 1) / int(s.MaxRequstBodySize) + chunks = make([][]byte, 0, numChunks) + numMetrics = len(metrics) + // Do the rounded up integer division + stepMetrics = (numMetrics + numChunks - 1) / numChunks + ) + + for i := 0; i < numMetrics; i += stepMetrics { + boundary := i + stepMetrics + if boundary > numMetrics { + boundary = numMetrics - 1 + } + + chunkBody, err := s.serializer.SerializeBatch(metrics[i:boundary]) + if err != nil { + return err + } + chunks = append(chunks, chunkBody) + } + + return s.writeRequestChunks(chunks) + } + + return s.write(reqBody) +} + +func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error { + for _, reqChunk := range chunks { + if err := s.write(reqChunk); err != nil { + return err + } + } + return nil +} + +func (s *SumoLogic) write(reqBody []byte) error { + var ( + err error + buff bytes.Buffer + gz = gzip.NewWriter(&buff) + ) + + if _, err = gz.Write(reqBody); err != nil { + return err + } + + if err = gz.Close(); err != nil { + return err + } + + req, err := http.NewRequest(s.Method, s.URL, &buff) + if err != nil { + return err + } + + req.Header.Set("Content-Encoding", "gzip") + req.Header.Set("User-Agent", internal.ProductToken()) + + // Set headers coming from the configuration. + for k, v := range s.headers { + req.Header.Set(k, v) + } + + setHeaderIfSetInConfig(req, sourceNameHeader, s.SourceName) + setHeaderIfSetInConfig(req, sourceHostHeader, s.SourceHost) + setHeaderIfSetInConfig(req, sourceCategoryHeader, s.SourceCategory) + setHeaderIfSetInConfig(req, dimensionsHeader, s.Dimensions) + + resp, err := s.client.Do(req) + if err != nil { + return errors.Wrapf(err, "sumologic: failed sending request to [%s]", s.URL) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return errors.Errorf( + "sumologic: when writing to [%s] received status code: %d", + s.URL, resp.StatusCode, + ) + } + + return nil +} + +func setHeaderIfSetInConfig(r *http.Request, h header, value string) { + if value != "" { + r.Header.Set(string(h), value) + } +} + +func Default() *SumoLogic { + return &SumoLogic{ + Timeout: internal.Duration{ + Duration: defaultClientTimeout, + }, + Method: defaultMethod, + MaxRequstBodySize: defaultMaxRequestBodySize, + headers: make(map[string]string), + } +} + +func init() { + outputs.Add("sumologic", func() telegraf.Output { + return Default() + }) +} diff --git a/plugins/outputs/sumologic/sumologic_test.go b/plugins/outputs/sumologic/sumologic_test.go new file mode 100644 index 000000000..603ecf73c --- /dev/null +++ b/plugins/outputs/sumologic/sumologic_test.go @@ -0,0 +1,576 @@ +package sumologic + +import ( + "compress/gzip" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/plugins/serializers/carbon2" + "github.com/influxdata/telegraf/plugins/serializers/graphite" + "github.com/influxdata/telegraf/plugins/serializers/prometheus" +) + +func getMetric(t *testing.T) telegraf.Metric { + m, err := metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ) + require.NoError(t, err) + return m +} + +func getMetrics(t *testing.T) []telegraf.Metric { + const count = 10 + var metrics = make([]telegraf.Metric, count) + + for i := 0; i < count; i++ { + m, err := metric.New( + fmt.Sprintf("cpu-%d", i), + map[string]string{ + "ec2_instance": "aws-129038123", + "image": "aws-ami-1234567890", + }, + map[string]interface{}{ + "idle": 5876876, + "steal": 5876876, + "system": 5876876, + "user": 5876876, + "temp": 70.0, + }, + time.Unix(0, 0), + ) + require.NoError(t, err) + metrics[i] = m + } + return metrics +} + +func TestInvalidMethod(t *testing.T) { + plugin := &SumoLogic{ + URL: "", + Method: http.MethodGet, + } + + err := plugin.Connect() + require.Error(t, err) +} + +func TestMethod(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin func() *SumoLogic + expectedMethod string + connectError bool + }{ + { + name: "default method is POST", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + return s + }, + expectedMethod: http.MethodPost, + }, + { + name: "put is okay", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.Method = http.MethodPut + return s + }, + expectedMethod: http.MethodPut, + }, + { + name: "get is invalid", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.Method = http.MethodGet + return s + }, + connectError: true, + }, + { + name: "method is case insensitive", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.Method = "poST" + return s + }, + expectedMethod: http.MethodPost, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, tt.expectedMethod, r.Method) + w.WriteHeader(http.StatusOK) + }) + + serializer, err := carbon2.NewSerializer() + require.NoError(t, err) + + plugin := tt.plugin() + plugin.SetSerializer(serializer) + err = plugin.Connect() + if tt.connectError { + require.Error(t, err) + return + } + require.NoError(t, err) + + err = plugin.Write([]telegraf.Metric{getMetric(t)}) + require.NoError(t, err) + }) + } +} + +func TestStatusCode(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + pluginFn := func() *SumoLogic { + s := Default() + s.URL = u.String() + return s + } + + tests := []struct { + name string + plugin *SumoLogic + statusCode int + errFunc func(t *testing.T, err error) + }{ + { + name: "success", + plugin: pluginFn(), + statusCode: http.StatusOK, + errFunc: func(t *testing.T, err error) { + require.NoError(t, err) + }, + }, + { + name: "1xx status is an error", + plugin: pluginFn(), + statusCode: http.StatusSwitchingProtocols, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + { + name: "3xx status is an error", + plugin: pluginFn(), + statusCode: http.StatusMultipleChoices, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + { + name: "4xx status is an error", + plugin: pluginFn(), + statusCode: http.StatusBadRequest, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.statusCode) + }) + + serializer, err := carbon2.NewSerializer() + require.NoError(t, err) + + tt.plugin.SetSerializer(serializer) + err = tt.plugin.Connect() + require.NoError(t, err) + + err = tt.plugin.Write([]telegraf.Metric{getMetric(t)}) + tt.errFunc(t, err) + }) + } +} + +func TestContentType(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + carbon2Serializer, err := carbon2.NewSerializer() + require.NoError(t, err) + + tests := []struct { + name string + plugin func() *SumoLogic + expectedErr bool + serializer serializers.Serializer + }{ + { + name: "carbon2 is supported", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.headers = map[string]string{ + contentTypeHeader: carbon2ContentType, + } + return s + }, + serializer: carbon2Serializer, + expectedErr: false, + }, + { + name: "graphite is supported", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.headers = map[string]string{ + contentTypeHeader: graphiteContentType, + } + return s + }, + serializer: &graphite.GraphiteSerializer{}, + expectedErr: false, + }, + { + name: "prometheus is supported", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.headers = map[string]string{ + contentTypeHeader: prometheusContentType, + } + return s + }, + serializer: &prometheus.Serializer{}, + expectedErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := tt.plugin() + + plugin.SetSerializer(tt.serializer) + + err := plugin.Connect() + require.NoError(t, err) + + err = plugin.Write([]telegraf.Metric{getMetric(t)}) + require.NoError(t, err) + }) + } +} + +func TestContentEncodingGzip(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin func() *SumoLogic + }{ + { + name: "default content_encoding=gzip works", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + return s + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, "gzip", r.Header.Get("Content-Encoding")) + + body, err := gzip.NewReader(r.Body) + require.NoError(t, err) + + payload, err := ioutil.ReadAll(body) + require.NoError(t, err) + + assert.Equal(t, string(payload), "metric=cpu field=value 42 0\n") + + w.WriteHeader(http.StatusNoContent) + }) + + serializer, err := carbon2.NewSerializer() + require.NoError(t, err) + + plugin := tt.plugin() + + plugin.SetSerializer(serializer) + err = plugin.Connect() + require.NoError(t, err) + + err = plugin.Write([]telegraf.Metric{getMetric(t)}) + require.NoError(t, err) + }) + } +} + +type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + +func TestDefaultUserAgent(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + t.Run("default-user-agent", func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, internal.ProductToken(), r.Header.Get("User-Agent")) + w.WriteHeader(http.StatusOK) + }) + + plugin := &SumoLogic{ + URL: u.String(), + Method: defaultMethod, + MaxRequstBodySize: Default().MaxRequstBodySize, + } + + serializer, err := carbon2.NewSerializer() + require.NoError(t, err) + + plugin.SetSerializer(serializer) + err = plugin.Connect() + require.NoError(t, err) + + err = plugin.Write([]telegraf.Metric{getMetric(t)}) + require.NoError(t, err) + }) +} + +func TestTOMLConfig(t *testing.T) { + testcases := []struct { + name string + configBytes []byte + expectedError bool + }{ + { + name: "carbon2 content type is supported", + configBytes: []byte(` +[[outputs.sumologic]] + url = "https://localhost:3000" + data_format = "carbon2" + `), + expectedError: false, + }, + { + name: "graphite content type is supported", + configBytes: []byte(` +[[outputs.sumologic]] + url = "https://localhost:3000" + data_format = "graphite" + `), + expectedError: false, + }, + { + name: "prometheus content type is supported", + configBytes: []byte(` +[[outputs.sumologic]] + url = "https://localhost:3000" + data_format = "prometheus" + `), + expectedError: false, + }, + { + name: "setting extra headers is not possible", + configBytes: []byte(` +[[outputs.sumologic]] + url = "https://localhost:3000" + data_format = "carbon2" + [outputs.sumologic.headers] + X-Sumo-Name = "dummy" + X-Sumo-Host = "dummy" + X-Sumo-Category = "dummy" + X-Sumo-Dimensions = "dummy" + `), + expectedError: true, + }, + { + name: "full example from sample config is correct", + configBytes: []byte(` +[[outputs.sumologic]] + url = "https://localhost:3000" + data_format = "carbon2" + timeout = "5s" + method = "POST" + source_name = "name" + source_host = "hosta" + source_category = "category" + dimensions = "dimensions" + `), + expectedError: false, + }, + { + name: "unknown key - sumo_metadata - in config fails", + configBytes: []byte(` +[[outputs.sumologic]] + url = "https://localhost:3000" + data_format = "carbon2" + timeout = "5s" + method = "POST" + source_name = "name" + sumo_metadata = "metadata" + `), + expectedError: true, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + c := config.NewConfig() + + if tt.expectedError { + require.Error(t, c.LoadConfigData(tt.configBytes)) + } else { + require.NoError(t, c.LoadConfigData(tt.configBytes)) + } + }) + } +} + +func TestMaxRequestBodySize(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + testcases := []struct { + name string + plugin func() *SumoLogic + metrics []telegraf.Metric + expectedError bool + expectedRequestCount int + }{ + { + name: "default max request body size is 1MB and doesn't split small enough metric slices", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + return s + }, + metrics: []telegraf.Metric{getMetric(t)}, + expectedError: false, + expectedRequestCount: 1, + }, + { + name: "default max request body size is 1MB and doesn't split small even medium sized metrics", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + return s + }, + metrics: getMetrics(t), + expectedError: false, + expectedRequestCount: 1, + }, + { + name: "max request body size properly splits requests - max 2500", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.MaxRequstBodySize = 2500 + return s + }, + metrics: getMetrics(t), + expectedError: false, + expectedRequestCount: 2, + }, + { + name: "max request body size properly splits requests - max 1000", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.MaxRequstBodySize = 1000 + return s + }, + metrics: getMetrics(t), + expectedError: false, + expectedRequestCount: 5, + }, + { + name: "max request body size properly splits requests - max 300", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.MaxRequstBodySize = 300 + return s + }, + metrics: getMetrics(t), + expectedError: false, + expectedRequestCount: 10, + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + var requestCount int + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ + w.WriteHeader(http.StatusOK) + }) + + serializer, err := carbon2.NewSerializer() + require.NoError(t, err) + + plugin := tt.plugin() + plugin.SetSerializer(serializer) + + err = plugin.Connect() + require.NoError(t, err) + + err = plugin.Write(tt.metrics) + if tt.expectedError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedRequestCount, requestCount) + } + }) + } +} diff --git a/plugins/serializers/carbon2/carbon2.go b/plugins/serializers/carbon2/carbon2.go index fc11de062..7e6058b0b 100644 --- a/plugins/serializers/carbon2/carbon2.go +++ b/plugins/serializers/carbon2/carbon2.go @@ -3,24 +3,25 @@ package carbon2 import ( "bytes" "fmt" - "github.com/influxdata/telegraf" "strconv" "strings" + + "github.com/influxdata/telegraf" ) -type serializer struct { +type Serializer struct { } -func NewSerializer() (*serializer, error) { - s := &serializer{} +func NewSerializer() (*Serializer, error) { + s := &Serializer{} return s, nil } -func (s *serializer) Serialize(metric telegraf.Metric) ([]byte, error) { +func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) { return s.createObject(metric), nil } -func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { +func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { var batch bytes.Buffer for _, metric := range metrics { batch.Write(s.createObject(metric)) @@ -28,7 +29,7 @@ func (s *serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { return batch.Bytes(), nil } -func (s *serializer) createObject(metric telegraf.Metric) []byte { +func (s *Serializer) createObject(metric telegraf.Metric) []byte { var m bytes.Buffer for fieldName, fieldValue := range metric.Fields() { if isNumeric(fieldValue) {