From ca7252c64147c3f20540bbc51ff8aab5c982035e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= <69143962+pmalek-sumo@users.noreply.github.com> Date: Thu, 24 Sep 2020 22:13:37 +0200 Subject: [PATCH] Fix Sumo Logic output plugin not splitting requests properly (#25) (#8115) --- plugins/outputs/sumologic/sumologic.go | 93 ++++++++--- plugins/outputs/sumologic/sumologic_test.go | 166 ++++++++++++++++---- 2 files changed, 209 insertions(+), 50 deletions(-) diff --git a/plugins/outputs/sumologic/sumologic.go b/plugins/outputs/sumologic/sumologic.go index aca0fb56a..42ffc3dd3 100644 --- a/plugins/outputs/sumologic/sumologic.go +++ b/plugins/outputs/sumologic/sumologic.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "context" "fmt" + "log" "net/http" "strings" "time" @@ -107,6 +108,8 @@ type SumoLogic struct { SourceCategory string `toml:"source_category"` Dimensions string `toml:"dimensions"` + Log telegraf.Logger `toml:"-"` + client *http.Client serializer serializers.Serializer @@ -189,6 +192,9 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error { if s.serializer == nil { return errors.New("sumologic: serializer unset") } + if len(metrics) == 0 { + return nil + } reqBody, err := s.serializer.SerializeBatch(metrics) if err != nil { @@ -196,26 +202,9 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error { } if l := len(reqBody); l > int(s.MaxRequstBodySize) { - var ( - // Do the rounded up integer division - numChunks = (l + int(s.MaxRequstBodySize) - 1) / int(s.MaxRequstBodySize) - chunks = make([][]byte, 0, numChunks) - numMetrics = len(metrics) - // Do the rounded up integer division - stepMetrics = (numMetrics + numChunks - 1) / numChunks - ) - - for i := 0; i < numMetrics; i += stepMetrics { - boundary := i + stepMetrics - if boundary > numMetrics { - boundary = numMetrics - 1 - } - - chunkBody, err := s.serializer.SerializeBatch(metrics[i:boundary]) - if err != nil { - return err - } - chunks = append(chunks, chunkBody) + chunks, err := s.splitIntoChunks(metrics) + if err != nil { + return err } return s.writeRequestChunks(chunks) @@ -227,7 +216,7 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error { func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error { for _, reqChunk := range chunks { if err := s.write(reqChunk); err != nil { - return err + s.Log.Errorf("Error sending chunk: %v", err) } } return nil @@ -282,6 +271,68 @@ func (s *SumoLogic) write(reqBody []byte) error { return nil } +// splitIntoChunks splits metrics to be sent into chunks so that every request +// is smaller than s.MaxRequstBodySize unless it was configured so small so that +// even a single metric cannot fit. +// In such a situation metrics will be sent one by one with a warning being logged +// for every request sent even though they don't fit in s.MaxRequstBodySize bytes. +func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) { + var ( + numMetrics = len(metrics) + chunks = make([][]byte, 0) + ) + + for i := 0; i < numMetrics; { + var toAppend []byte + for i < numMetrics { + chunkBody, err := s.serializer.Serialize(metrics[i]) + if err != nil { + return nil, err + } + + la := len(toAppend) + if la != 0 { + // We already have something to append ... + if la+len(chunkBody) > int(s.MaxRequstBodySize) { + // ... and it's just the right size, without currently processed chunk. + break + } else { + // ... we can try appending more. + i++ + toAppend = append(toAppend, chunkBody...) + continue + } + } else { // la == 0 + i++ + toAppend = chunkBody + + if len(chunkBody) > int(s.MaxRequstBodySize) { + log.Printf( + "W! [SumoLogic] max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split", + s.MaxRequstBodySize, len(chunkBody), + ) + + // The serialized metric is too big but we have no choice + // but to send it. + // max_request_body_size was set so small that it wouldn't + // even accomodate a single metric. + break + } + + continue + } + } + + if toAppend == nil { + break + } + + chunks = append(chunks, toAppend) + } + + return chunks, nil +} + func setHeaderIfSetInConfig(r *http.Request, h header, value string) { if value != "" { r.Header.Set(string(h), value) diff --git a/plugins/outputs/sumologic/sumologic_test.go b/plugins/outputs/sumologic/sumologic_test.go index 23db47c5b..ff9e39d8f 100644 --- a/plugins/outputs/sumologic/sumologic_test.go +++ b/plugins/outputs/sumologic/sumologic_test.go @@ -1,12 +1,15 @@ package sumologic import ( + "bufio" "compress/gzip" "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" "net/url" + "sync/atomic" "testing" "time" @@ -36,8 +39,7 @@ func getMetric(t *testing.T) telegraf.Metric { return m } -func getMetrics(t *testing.T) []telegraf.Metric { - const count = 10 +func getMetrics(t *testing.T, count int) []telegraf.Metric { var metrics = make([]telegraf.Metric, count) for i := 0; i < count; i++ { @@ -480,12 +482,15 @@ func TestMaxRequestBodySize(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) require.NoError(t, err) + const count = 100 + testcases := []struct { - name string - plugin func() *SumoLogic - metrics []telegraf.Metric - expectedError bool - expectedRequestCount int + name string + plugin func() *SumoLogic + metrics []telegraf.Metric + expectedError bool + expectedRequestCount int32 + expectedMetricLinesCount int32 }{ { name: "default max request body size is 1MB and doesn't split small enough metric slices", @@ -494,9 +499,10 @@ func TestMaxRequestBodySize(t *testing.T) { s.URL = u.String() return s }, - metrics: []telegraf.Metric{getMetric(t)}, - expectedError: false, - expectedRequestCount: 1, + metrics: []telegraf.Metric{getMetric(t)}, + expectedError: false, + expectedRequestCount: 1, + expectedMetricLinesCount: 1, }, { name: "default max request body size is 1MB and doesn't split small even medium sized metrics", @@ -505,33 +511,90 @@ func TestMaxRequestBodySize(t *testing.T) { s.URL = u.String() return s }, - metrics: getMetrics(t), - expectedError: false, - expectedRequestCount: 1, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 1, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { - name: "max request body size properly splits requests - max 2500", + name: "when short by at least 1B the request is split", plugin: func() *SumoLogic { s := Default() s.URL = u.String() - s.MaxRequstBodySize = 2500 + // getMetrics returns metrics that serialized (using carbon2), + // uncompressed size is 43750B + s.MaxRequstBodySize = 43_749 return s }, - metrics: getMetrics(t), - expectedError: false, - expectedRequestCount: 2, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 2, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { - name: "max request body size properly splits requests - max 1000", + name: "max request body size properly splits requests - max 10_000", plugin: func() *SumoLogic { s := Default() s.URL = u.String() - s.MaxRequstBodySize = 1000 + s.MaxRequstBodySize = 10_000 return s }, - metrics: getMetrics(t), - expectedError: false, - expectedRequestCount: 5, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 5, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 + }, + { + name: "max request body size properly splits requests - max 5_000", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.MaxRequstBodySize = 5_000 + return s + }, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 10, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 + }, + { + name: "max request body size properly splits requests - max 2_500", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.MaxRequstBodySize = 2_500 + return s + }, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 20, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 + }, + { + name: "max request body size properly splits requests - max 1_000", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.MaxRequstBodySize = 1_000 + return s + }, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 50, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 + }, + { + name: "max request body size properly splits requests - max 500", + plugin: func() *SumoLogic { + s := Default() + s.URL = u.String() + s.MaxRequstBodySize = 500 + return s + }, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 100, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, { name: "max request body size properly splits requests - max 300", @@ -541,17 +604,26 @@ func TestMaxRequestBodySize(t *testing.T) { s.MaxRequstBodySize = 300 return s }, - metrics: getMetrics(t), - expectedError: false, - expectedRequestCount: 10, + metrics: getMetrics(t, count), + expectedError: false, + expectedRequestCount: 100, + expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500 }, } for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { - var requestCount int + var ( + requestCount int32 + linesCount int32 + ) ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - requestCount++ + atomic.AddInt32(&requestCount, 1) + + if tt.expectedMetricLinesCount != 0 { + atomic.AddInt32(&linesCount, int32(countLines(t, r.Body))) + } + w.WriteHeader(http.StatusOK) }) @@ -569,8 +641,44 @@ func TestMaxRequestBodySize(t *testing.T) { require.Error(t, err) } else { require.NoError(t, err) - require.Equal(t, tt.expectedRequestCount, requestCount) + require.Equal(t, tt.expectedRequestCount, atomic.LoadInt32(&requestCount)) + require.Equal(t, tt.expectedMetricLinesCount, atomic.LoadInt32(&linesCount)) } }) } } + +func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + metrics := make([]telegraf.Metric, 0) + plugin := Default() + plugin.URL = u.String() + + serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate) + require.NoError(t, err) + plugin.SetSerializer(serializer) + + err = plugin.Connect() + require.NoError(t, err) + + err = plugin.Write(metrics) + require.NoError(t, err) +} + +func countLines(t *testing.T, body io.Reader) int { + // All requests coming from Sumo Logic output plugin are gzipped. + gz, err := gzip.NewReader(body) + require.NoError(t, err) + + var linesCount int + for s := bufio.NewScanner(gz); s.Scan(); { + linesCount++ + } + + return linesCount +}