A new Logz.io output plugin (#8202)

This commit is contained in:
Ido Halevi 2020-10-22 18:53:08 +03:00 committed by GitHub
parent e158255d9b
commit 9b23a04b69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 314 additions and 0 deletions

View File

@ -430,6 +430,7 @@ For documentation on the latest development code see the [documentation index][d
* [instrumental](./plugins/outputs/instrumental)
* [kafka](./plugins/outputs/kafka)
* [librato](./plugins/outputs/librato)
* [logz.io](./plugins/outputs/logzio)
* [mqtt](./plugins/outputs/mqtt)
* [nats](./plugins/outputs/nats)
* [newrelic](./plugins/outputs/newrelic)

View File

@ -25,6 +25,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
_ "github.com/influxdata/telegraf/plugins/outputs/logzio"
_ "github.com/influxdata/telegraf/plugins/outputs/mqtt"
_ "github.com/influxdata/telegraf/plugins/outputs/nats"
_ "github.com/influxdata/telegraf/plugins/outputs/newrelic"

View File

@ -0,0 +1,43 @@
# Logz.io Output Plugin
This plugin sends metrics to Logz.io over HTTPs.
### Configuration:
```toml
# A plugin that can send metrics over HTTPs to Logz.io
[[outputs.logzio]]
## Set to true if Logz.io sender checks the disk space before adding metrics to the disk queue.
# check_disk_space = true
## The percent of used file system space at which the sender will stop queueing.
## When we will reach that percentage, the file system in which the queue is stored will drop
## all new logs until the percentage of used space drops below that threshold.
# disk_threshold = 98
## How often Logz.io sender should drain the queue.
## Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# drain_duration = "3s"
## Where Logz.io sender should store the queue
## queue_dir = Sprintf("%s%s%s%s%d", os.TempDir(), string(os.PathSeparator),
## "logzio-buffer", string(os.PathSeparator), time.Now().UnixNano())
## Logz.io account token
token = "your Logz.io token" # required
## Use your listener URL for your Logz.io account region.
# url = "https://listener.logz.io:8071"
```
### Required parameters:
* `token`: Your Logz.io token, which can be found under "settings" in your account.
### Optional parameters:
* `check_disk_space`: Set to true if Logz.io sender checks the disk space before adding metrics to the disk queue.
* `disk_threshold`: If the queue_dir space crosses this threshold (in % of disk usage), the plugin will start dropping logs.
* `drain_duration`: Time to sleep between sending attempts.
* `queue_dir`: Metrics disk path. All the unsent metrics are saved to the disk in this location.
* `url`: Logz.io listener URL.

View File

@ -0,0 +1,175 @@
package logzio
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
)
const (
defaultLogzioURL = "https://listener.logz.io:8071"
logzioDescription = "Send aggregate metrics to Logz.io"
logzioType = "telegraf"
)
var sampleConfig = `
## Connection timeout, defaults to "5s" if not set.
timeout = "5s"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Logz.io account token
token = "your logz.io token" # required
## Use your listener URL for your Logz.io account region.
# url = "https://listener.logz.io:8071"
`
type Logzio struct {
Log telegraf.Logger `toml:"-"`
Timeout internal.Duration `toml:"timeout"`
Token string `toml:"token"`
URL string `toml:"url"`
tls.ClientConfig
client *http.Client
}
type TimeSeries struct {
Series []*Metric
}
type Metric struct {
Metric map[string]interface{} `json:"metrics"`
Dimensions map[string]string `json:"dimensions"`
Time time.Time `json:"@timestamp"`
Type string `json:"type"`
}
// Connect to the Output
func (l *Logzio) Connect() error {
l.Log.Debug("Connecting to logz.io output...")
if l.Token == "" || l.Token == "your logz.io token" {
return fmt.Errorf("token is required")
}
tlsCfg, err := l.ClientConfig.TLSConfig()
if err != nil {
return err
}
l.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsCfg,
},
Timeout: l.Timeout.Duration,
}
return nil
}
// Close any connections to the Output
func (l *Logzio) Close() error {
l.Log.Debug("Closing logz.io output")
return nil
}
// Description returns a one-sentence description on the Output
func (l *Logzio) Description() string {
return logzioDescription
}
// SampleConfig returns the default configuration of the Output
func (l *Logzio) SampleConfig() string {
return sampleConfig
}
// Write takes in group of points to be written to the Output
func (l *Logzio) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}
var buff bytes.Buffer
gz := gzip.NewWriter(&buff)
for _, metric := range metrics {
m := l.parseMetric(metric)
serialized, err := json.Marshal(m)
if err != nil {
return fmt.Errorf("unable to marshal metric, %s\n", err.Error())
}
_, err = gz.Write(append(serialized, '\n'))
if err != nil {
return fmt.Errorf("unable to write gzip meric, %s\n", err.Error())
}
}
err := gz.Close()
if err != nil {
return fmt.Errorf("unable to close gzip, %s\n", err.Error())
}
return l.send(buff.Bytes())
}
func (l *Logzio) send(metrics []byte) error {
req, err := http.NewRequest("POST", l.authUrl(), bytes.NewBuffer(metrics))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
resp, err := l.client.Do(req)
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 209 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}
return nil
}
func (l *Logzio) authUrl() string {
return fmt.Sprintf("%s/?token=%s", l.URL, l.Token)
}
func (l *Logzio) parseMetric(metric telegraf.Metric) *Metric {
return &Metric{
Metric: map[string]interface{}{
metric.Name(): metric.Fields(),
},
Dimensions: metric.Tags(),
Time: metric.Time(),
Type: logzioType,
}
}
func init() {
outputs.Add("logzio", func() telegraf.Output {
return &Logzio{
URL: defaultLogzioURL,
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}

View File

@ -0,0 +1,94 @@
package logzio
import (
"bytes"
"compress/gzip"
"encoding/json"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"io"
"net/http"
"net/http/httptest"
"testing"
)
const (
testToken = "123456789"
testURL = "https://logzio.com"
)
func TestConnetWithoutToken(t *testing.T) {
l := &Logzio{
URL: testURL,
Log: testutil.Logger{},
}
err := l.Connect()
require.Error(t, err)
}
func TestParseMetric(t *testing.T) {
l := &Logzio{}
for _, tm := range testutil.MockMetrics() {
lm := l.parseMetric(tm)
require.Equal(t, tm.Fields(), lm.Metric[tm.Name()])
require.Equal(t, logzioType, lm.Type)
require.Equal(t, tm.Tags(), lm.Dimensions)
require.Equal(t, tm.Time(), lm.Time)
}
}
func TestBadStatusCode(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer ts.Close()
l := &Logzio{
Token: testToken,
URL: ts.URL,
Log: testutil.Logger{},
}
err := l.Connect()
require.NoError(t, err)
err = l.Write(testutil.MockMetrics())
require.Error(t, err)
}
func TestWrite(t *testing.T) {
tm := testutil.TestMetric(float64(3.14), "test1")
var body bytes.Buffer
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gz, err := gzip.NewReader(r.Body)
require.NoError(t, err)
_, err = io.Copy(&body, gz)
require.NoError(t, err)
var lm Metric
err = json.Unmarshal(body.Bytes(), &lm)
require.NoError(t, err)
require.Equal(t, tm.Fields(), lm.Metric[tm.Name()])
require.Equal(t, logzioType, lm.Type)
require.Equal(t, tm.Tags(), lm.Dimensions)
require.Equal(t, tm.Time(), lm.Time)
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()
l := &Logzio{
Token: testToken,
URL: ts.URL,
Log: testutil.Logger{},
}
err := l.Connect()
require.NoError(t, err)
err = l.Write([]telegraf.Metric{tm})
require.NoError(t, err)
}