Fix Sumo Logic output plugin not splitting requests properly (#25) (#8115)

This commit is contained in:
Patryk Małek 2020-09-24 22:13:37 +02:00 committed by GitHub
parent bb5c65f5f3
commit ca7252c641
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 209 additions and 50 deletions

View File

@ -5,6 +5,7 @@ import (
"compress/gzip"
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
@ -107,6 +108,8 @@ type SumoLogic struct {
SourceCategory string `toml:"source_category"`
Dimensions string `toml:"dimensions"`
Log telegraf.Logger `toml:"-"`
client *http.Client
serializer serializers.Serializer
@ -189,6 +192,9 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
if s.serializer == nil {
return errors.New("sumologic: serializer unset")
}
if len(metrics) == 0 {
return nil
}
reqBody, err := s.serializer.SerializeBatch(metrics)
if err != nil {
@ -196,26 +202,9 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
}
if l := len(reqBody); l > int(s.MaxRequstBodySize) {
var (
// Do the rounded up integer division
numChunks = (l + int(s.MaxRequstBodySize) - 1) / int(s.MaxRequstBodySize)
chunks = make([][]byte, 0, numChunks)
numMetrics = len(metrics)
// Do the rounded up integer division
stepMetrics = (numMetrics + numChunks - 1) / numChunks
)
for i := 0; i < numMetrics; i += stepMetrics {
boundary := i + stepMetrics
if boundary > numMetrics {
boundary = numMetrics - 1
}
chunkBody, err := s.serializer.SerializeBatch(metrics[i:boundary])
if err != nil {
return err
}
chunks = append(chunks, chunkBody)
chunks, err := s.splitIntoChunks(metrics)
if err != nil {
return err
}
return s.writeRequestChunks(chunks)
@ -227,7 +216,7 @@ func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
for _, reqChunk := range chunks {
if err := s.write(reqChunk); err != nil {
return err
s.Log.Errorf("Error sending chunk: %v", err)
}
}
return nil
@ -282,6 +271,68 @@ func (s *SumoLogic) write(reqBody []byte) error {
return nil
}
// splitIntoChunks splits metrics to be sent into chunks so that every request
// is smaller than s.MaxRequstBodySize unless it was configured so small so that
// even a single metric cannot fit.
// In such a situation metrics will be sent one by one with a warning being logged
// for every request sent even though they don't fit in s.MaxRequstBodySize bytes.
func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) {
var (
numMetrics = len(metrics)
chunks = make([][]byte, 0)
)
for i := 0; i < numMetrics; {
var toAppend []byte
for i < numMetrics {
chunkBody, err := s.serializer.Serialize(metrics[i])
if err != nil {
return nil, err
}
la := len(toAppend)
if la != 0 {
// We already have something to append ...
if la+len(chunkBody) > int(s.MaxRequstBodySize) {
// ... and it's just the right size, without currently processed chunk.
break
} else {
// ... we can try appending more.
i++
toAppend = append(toAppend, chunkBody...)
continue
}
} else { // la == 0
i++
toAppend = chunkBody
if len(chunkBody) > int(s.MaxRequstBodySize) {
log.Printf(
"W! [SumoLogic] max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
s.MaxRequstBodySize, len(chunkBody),
)
// The serialized metric is too big but we have no choice
// but to send it.
// max_request_body_size was set so small that it wouldn't
// even accomodate a single metric.
break
}
continue
}
}
if toAppend == nil {
break
}
chunks = append(chunks, toAppend)
}
return chunks, nil
}
func setHeaderIfSetInConfig(r *http.Request, h header, value string) {
if value != "" {
r.Header.Set(string(h), value)

View File

@ -1,12 +1,15 @@
package sumologic
import (
"bufio"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"
@ -36,8 +39,7 @@ func getMetric(t *testing.T) telegraf.Metric {
return m
}
func getMetrics(t *testing.T) []telegraf.Metric {
const count = 10
func getMetrics(t *testing.T, count int) []telegraf.Metric {
var metrics = make([]telegraf.Metric, count)
for i := 0; i < count; i++ {
@ -480,12 +482,15 @@ func TestMaxRequestBodySize(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
const count = 100
testcases := []struct {
name string
plugin func() *SumoLogic
metrics []telegraf.Metric
expectedError bool
expectedRequestCount int
name string
plugin func() *SumoLogic
metrics []telegraf.Metric
expectedError bool
expectedRequestCount int32
expectedMetricLinesCount int32
}{
{
name: "default max request body size is 1MB and doesn't split small enough metric slices",
@ -494,9 +499,10 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String()
return s
},
metrics: []telegraf.Metric{getMetric(t)},
expectedError: false,
expectedRequestCount: 1,
metrics: []telegraf.Metric{getMetric(t)},
expectedError: false,
expectedRequestCount: 1,
expectedMetricLinesCount: 1,
},
{
name: "default max request body size is 1MB and doesn't split small even medium sized metrics",
@ -505,33 +511,90 @@ func TestMaxRequestBodySize(t *testing.T) {
s.URL = u.String()
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 1,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 1,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 2500",
name: "when short by at least 1B the request is split",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 2500
// getMetrics returns metrics that serialized (using carbon2),
// uncompressed size is 43750B
s.MaxRequstBodySize = 43_749
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 2,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 2,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 1000",
name: "max request body size properly splits requests - max 10_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 1000
s.MaxRequstBodySize = 10_000
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 5,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 5,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 5_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 5_000
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 10,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 2_500",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 2_500
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 20,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 1_000",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 1_000
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 50,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 500",
plugin: func() *SumoLogic {
s := Default()
s.URL = u.String()
s.MaxRequstBodySize = 500
return s
},
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
{
name: "max request body size properly splits requests - max 300",
@ -541,17 +604,26 @@ func TestMaxRequestBodySize(t *testing.T) {
s.MaxRequstBodySize = 300
return s
},
metrics: getMetrics(t),
expectedError: false,
expectedRequestCount: 10,
metrics: getMetrics(t, count),
expectedError: false,
expectedRequestCount: 100,
expectedMetricLinesCount: 500, // count (100) metrics, 5 lines per each (steal, idle, system, user, temp) = 500
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var requestCount int
var (
requestCount int32
linesCount int32
)
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
atomic.AddInt32(&requestCount, 1)
if tt.expectedMetricLinesCount != 0 {
atomic.AddInt32(&linesCount, int32(countLines(t, r.Body)))
}
w.WriteHeader(http.StatusOK)
})
@ -569,8 +641,44 @@ func TestMaxRequestBodySize(t *testing.T) {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tt.expectedRequestCount, requestCount)
require.Equal(t, tt.expectedRequestCount, atomic.LoadInt32(&requestCount))
require.Equal(t, tt.expectedMetricLinesCount, atomic.LoadInt32(&linesCount))
}
})
}
}
func TestTryingToSendEmptyMetricsDoesntFail(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
require.NoError(t, err)
metrics := make([]telegraf.Metric, 0)
plugin := Default()
plugin.URL = u.String()
serializer, err := carbon2.NewSerializer(carbon2.Carbon2FormatFieldSeparate)
require.NoError(t, err)
plugin.SetSerializer(serializer)
err = plugin.Connect()
require.NoError(t, err)
err = plugin.Write(metrics)
require.NoError(t, err)
}
func countLines(t *testing.T, body io.Reader) int {
// All requests coming from Sumo Logic output plugin are gzipped.
gz, err := gzip.NewReader(body)
require.NoError(t, err)
var linesCount int
for s := bufio.NewScanner(gz); s.Scan(); {
linesCount++
}
return linesCount
}