2020-09-10 22:56:35 +08:00
|
|
|
package sumologic
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"compress/gzip"
|
2020-09-25 04:13:37 +08:00
|
|
|
"log"
|
2020-09-10 22:56:35 +08:00
|
|
|
"net/http"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
|
|
|
|
|
|
"github.com/influxdata/telegraf"
|
|
|
|
|
"github.com/influxdata/telegraf/config"
|
|
|
|
|
"github.com/influxdata/telegraf/internal"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/serializers"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/serializers/carbon2"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/serializers/graphite"
|
|
|
|
|
"github.com/influxdata/telegraf/plugins/serializers/prometheus"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
sampleConfig = `
|
|
|
|
|
## Unique URL generated for your HTTP Metrics Source.
|
|
|
|
|
## This is the address to send metrics to.
|
|
|
|
|
# url = "https://events.sumologic.net/receiver/v1/http/<UniqueHTTPCollectorCode>"
|
|
|
|
|
|
|
|
|
|
## Data format to be used for sending metrics.
|
|
|
|
|
## This will set the "Content-Type" header accordingly.
|
|
|
|
|
## Currently supported formats:
|
|
|
|
|
## * graphite - for Content-Type of application/vnd.sumologic.graphite
|
|
|
|
|
## * carbon2 - for Content-Type of application/vnd.sumologic.carbon2
|
|
|
|
|
## * prometheus - for Content-Type of application/vnd.sumologic.prometheus
|
|
|
|
|
##
|
|
|
|
|
## More information can be found at:
|
|
|
|
|
## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#content-type-headers-for-metrics
|
|
|
|
|
##
|
|
|
|
|
## NOTE:
|
|
|
|
|
## When unset, telegraf will by default use the influx serializer which is currently unsupported
|
|
|
|
|
## in HTTP Source.
|
|
|
|
|
data_format = "carbon2"
|
|
|
|
|
|
|
|
|
|
## Timeout used for HTTP request
|
|
|
|
|
# timeout = "5s"
|
|
|
|
|
|
|
|
|
|
## Max HTTP request body size in bytes before compression (if applied).
|
|
|
|
|
## By default 1MB is recommended.
|
|
|
|
|
## NOTE:
|
|
|
|
|
## Bear in mind that in some serializer a metric even though serialized to multiple
|
|
|
|
|
## lines cannot be split any further so setting this very low might not work
|
|
|
|
|
## as expected.
|
2020-10-22 02:43:24 +08:00
|
|
|
# max_request_body_size = 1000000
|
2020-09-10 22:56:35 +08:00
|
|
|
|
|
|
|
|
## Additional, Sumo specific options.
|
|
|
|
|
## Full list can be found here:
|
|
|
|
|
## https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Metrics-to-an-HTTP-Source#supported-http-headers
|
|
|
|
|
|
|
|
|
|
## Desired source name.
|
|
|
|
|
## Useful if you want to override the source name configured for the source.
|
|
|
|
|
# source_name = ""
|
|
|
|
|
|
|
|
|
|
## Desired host name.
|
|
|
|
|
## Useful if you want to override the source host configured for the source.
|
|
|
|
|
# source_host = ""
|
|
|
|
|
|
|
|
|
|
## Desired source category.
|
|
|
|
|
## Useful if you want to override the source category configured for the source.
|
|
|
|
|
# source_category = ""
|
|
|
|
|
|
|
|
|
|
## Comma-separated key=value list of dimensions to apply to every metric.
|
|
|
|
|
## Custom dimensions will allow you to query your metrics at a more granular level.
|
|
|
|
|
# dimensions = ""
|
|
|
|
|
`
|
|
|
|
|
|
|
|
|
|
defaultClientTimeout = 5 * time.Second
|
|
|
|
|
defaultMethod = http.MethodPost
|
2021-01-07 06:23:29 +08:00
|
|
|
defaultMaxRequestBodySize = 1000000
|
2020-09-10 22:56:35 +08:00
|
|
|
|
|
|
|
|
contentTypeHeader = "Content-Type"
|
|
|
|
|
carbon2ContentType = "application/vnd.sumologic.carbon2"
|
|
|
|
|
graphiteContentType = "application/vnd.sumologic.graphite"
|
|
|
|
|
prometheusContentType = "application/vnd.sumologic.prometheus"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type header string
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
sourceNameHeader header = `X-Sumo-Name`
|
|
|
|
|
sourceHostHeader header = `X-Sumo-Host`
|
|
|
|
|
sourceCategoryHeader header = `X-Sumo-Category`
|
|
|
|
|
dimensionsHeader header = `X-Sumo-Dimensions`
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type SumoLogic struct {
|
|
|
|
|
URL string `toml:"url"`
|
|
|
|
|
Timeout internal.Duration `toml:"timeout"`
|
|
|
|
|
MaxRequstBodySize config.Size `toml:"max_request_body_size"`
|
|
|
|
|
|
|
|
|
|
SourceName string `toml:"source_name"`
|
|
|
|
|
SourceHost string `toml:"source_host"`
|
|
|
|
|
SourceCategory string `toml:"source_category"`
|
|
|
|
|
Dimensions string `toml:"dimensions"`
|
|
|
|
|
|
2020-09-25 04:13:37 +08:00
|
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
|
|
2020-09-10 22:56:35 +08:00
|
|
|
client *http.Client
|
|
|
|
|
serializer serializers.Serializer
|
|
|
|
|
|
|
|
|
|
err error
|
|
|
|
|
headers map[string]string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) SetSerializer(serializer serializers.Serializer) {
|
|
|
|
|
if s.headers == nil {
|
|
|
|
|
s.headers = make(map[string]string)
|
|
|
|
|
}
|
|
|
|
|
|
2020-09-30 02:00:33 +08:00
|
|
|
switch sr := serializer.(type) {
|
2020-09-10 22:56:35 +08:00
|
|
|
case *carbon2.Serializer:
|
|
|
|
|
s.headers[contentTypeHeader] = carbon2ContentType
|
2020-09-30 02:00:33 +08:00
|
|
|
|
|
|
|
|
// In case Carbon2 is used and the metrics format was unset, default to
|
|
|
|
|
// include field in metric name.
|
|
|
|
|
if sr.IsMetricsFormatUnset() {
|
|
|
|
|
sr.SetMetricsFormat(carbon2.Carbon2FormatMetricIncludesField)
|
|
|
|
|
}
|
|
|
|
|
|
2020-09-10 22:56:35 +08:00
|
|
|
case *graphite.GraphiteSerializer:
|
|
|
|
|
s.headers[contentTypeHeader] = graphiteContentType
|
2020-09-30 02:00:33 +08:00
|
|
|
|
2020-09-10 22:56:35 +08:00
|
|
|
case *prometheus.Serializer:
|
|
|
|
|
s.headers[contentTypeHeader] = prometheusContentType
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
s.err = errors.Errorf("unsupported serializer %T", serializer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.serializer = serializer
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-23 01:21:36 +08:00
|
|
|
func (s *SumoLogic) createClient() *http.Client {
|
2020-09-10 22:56:35 +08:00
|
|
|
return &http.Client{
|
|
|
|
|
Transport: &http.Transport{
|
|
|
|
|
Proxy: http.ProxyFromEnvironment,
|
|
|
|
|
},
|
|
|
|
|
Timeout: s.Timeout.Duration,
|
2021-03-23 01:21:36 +08:00
|
|
|
}
|
2020-09-10 22:56:35 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) Connect() error {
|
|
|
|
|
if s.err != nil {
|
|
|
|
|
return errors.Wrap(s.err, "sumologic: incorrect configuration")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if s.Timeout.Duration == 0 {
|
|
|
|
|
s.Timeout.Duration = defaultClientTimeout
|
|
|
|
|
}
|
|
|
|
|
|
2021-03-23 01:21:36 +08:00
|
|
|
s.client = s.createClient()
|
2020-09-10 22:56:35 +08:00
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) Close() error {
|
|
|
|
|
return s.err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) Description() string {
|
|
|
|
|
return "A plugin that can transmit metrics to Sumo Logic HTTP Source"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) SampleConfig() string {
|
|
|
|
|
return sampleConfig
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
|
|
|
|
|
if s.err != nil {
|
|
|
|
|
return errors.Wrap(s.err, "sumologic: incorrect configuration")
|
|
|
|
|
}
|
|
|
|
|
if s.serializer == nil {
|
|
|
|
|
return errors.New("sumologic: serializer unset")
|
|
|
|
|
}
|
2020-09-25 04:13:37 +08:00
|
|
|
if len(metrics) == 0 {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2020-09-10 22:56:35 +08:00
|
|
|
|
|
|
|
|
reqBody, err := s.serializer.SerializeBatch(metrics)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if l := len(reqBody); l > int(s.MaxRequstBodySize) {
|
2020-09-25 04:13:37 +08:00
|
|
|
chunks, err := s.splitIntoChunks(metrics)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
2020-09-10 22:56:35 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return s.writeRequestChunks(chunks)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return s.write(reqBody)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
|
|
|
|
|
for _, reqChunk := range chunks {
|
|
|
|
|
if err := s.write(reqChunk); err != nil {
|
2020-09-25 04:13:37 +08:00
|
|
|
s.Log.Errorf("Error sending chunk: %v", err)
|
2020-09-10 22:56:35 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *SumoLogic) write(reqBody []byte) error {
|
|
|
|
|
var (
|
|
|
|
|
err error
|
|
|
|
|
buff bytes.Buffer
|
|
|
|
|
gz = gzip.NewWriter(&buff)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if _, err = gz.Write(reqBody); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err = gz.Close(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2020-10-15 00:11:23 +08:00
|
|
|
req, err := http.NewRequest(defaultMethod, s.URL, &buff)
|
2020-09-10 22:56:35 +08:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
req.Header.Set("Content-Encoding", "gzip")
|
|
|
|
|
req.Header.Set("User-Agent", internal.ProductToken())
|
|
|
|
|
|
|
|
|
|
// Set headers coming from the configuration.
|
|
|
|
|
for k, v := range s.headers {
|
|
|
|
|
req.Header.Set(k, v)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHeaderIfSetInConfig(req, sourceNameHeader, s.SourceName)
|
|
|
|
|
setHeaderIfSetInConfig(req, sourceHostHeader, s.SourceHost)
|
|
|
|
|
setHeaderIfSetInConfig(req, sourceCategoryHeader, s.SourceCategory)
|
|
|
|
|
setHeaderIfSetInConfig(req, dimensionsHeader, s.Dimensions)
|
|
|
|
|
|
|
|
|
|
resp, err := s.client.Do(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrapf(err, "sumologic: failed sending request to [%s]", s.URL)
|
|
|
|
|
}
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
|
|
|
return errors.Errorf(
|
|
|
|
|
"sumologic: when writing to [%s] received status code: %d",
|
|
|
|
|
s.URL, resp.StatusCode,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-09-25 04:13:37 +08:00
|
|
|
// splitIntoChunks splits metrics to be sent into chunks so that every request
|
|
|
|
|
// is smaller than s.MaxRequstBodySize unless it was configured so small so that
|
|
|
|
|
// even a single metric cannot fit.
|
|
|
|
|
// In such a situation metrics will be sent one by one with a warning being logged
|
|
|
|
|
// for every request sent even though they don't fit in s.MaxRequstBodySize bytes.
|
|
|
|
|
func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) {
|
|
|
|
|
var (
|
|
|
|
|
numMetrics = len(metrics)
|
|
|
|
|
chunks = make([][]byte, 0)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for i := 0; i < numMetrics; {
|
|
|
|
|
var toAppend []byte
|
|
|
|
|
for i < numMetrics {
|
|
|
|
|
chunkBody, err := s.serializer.Serialize(metrics[i])
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
la := len(toAppend)
|
|
|
|
|
if la != 0 {
|
|
|
|
|
// We already have something to append ...
|
|
|
|
|
if la+len(chunkBody) > int(s.MaxRequstBodySize) {
|
|
|
|
|
// ... and it's just the right size, without currently processed chunk.
|
|
|
|
|
break
|
|
|
|
|
} else {
|
|
|
|
|
// ... we can try appending more.
|
|
|
|
|
i++
|
|
|
|
|
toAppend = append(toAppend, chunkBody...)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
} else { // la == 0
|
|
|
|
|
i++
|
|
|
|
|
toAppend = chunkBody
|
|
|
|
|
|
|
|
|
|
if len(chunkBody) > int(s.MaxRequstBodySize) {
|
|
|
|
|
log.Printf(
|
|
|
|
|
"W! [SumoLogic] max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
|
|
|
|
|
s.MaxRequstBodySize, len(chunkBody),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// The serialized metric is too big but we have no choice
|
|
|
|
|
// but to send it.
|
|
|
|
|
// max_request_body_size was set so small that it wouldn't
|
|
|
|
|
// even accomodate a single metric.
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if toAppend == nil {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
chunks = append(chunks, toAppend)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return chunks, nil
|
|
|
|
|
}
|
|
|
|
|
|
2020-09-10 22:56:35 +08:00
|
|
|
func setHeaderIfSetInConfig(r *http.Request, h header, value string) {
|
|
|
|
|
if value != "" {
|
|
|
|
|
r.Header.Set(string(h), value)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func Default() *SumoLogic {
|
|
|
|
|
return &SumoLogic{
|
|
|
|
|
Timeout: internal.Duration{
|
|
|
|
|
Duration: defaultClientTimeout,
|
|
|
|
|
},
|
|
|
|
|
MaxRequstBodySize: defaultMaxRequestBodySize,
|
|
|
|
|
headers: make(map[string]string),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
outputs.Add("sumologic", func() telegraf.Output {
|
|
|
|
|
return Default()
|
|
|
|
|
})
|
|
|
|
|
}
|