diff --git a/README.md b/README.md index 168db50fd..acaf9c181 100644 --- a/README.md +++ b/README.md @@ -430,6 +430,7 @@ For documentation on the latest development code see the [documentation index][d * [instrumental](./plugins/outputs/instrumental) * [kafka](./plugins/outputs/kafka) * [librato](./plugins/outputs/librato) +* [logz.io](./plugins/outputs/logzio) * [mqtt](./plugins/outputs/mqtt) * [nats](./plugins/outputs/nats) * [newrelic](./plugins/outputs/newrelic) diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index f81aa9d71..9d89976dd 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -25,6 +25,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/kafka" _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" _ "github.com/influxdata/telegraf/plugins/outputs/librato" + _ "github.com/influxdata/telegraf/plugins/outputs/logzio" _ "github.com/influxdata/telegraf/plugins/outputs/mqtt" _ "github.com/influxdata/telegraf/plugins/outputs/nats" _ "github.com/influxdata/telegraf/plugins/outputs/newrelic" diff --git a/plugins/outputs/logzio/README.md b/plugins/outputs/logzio/README.md new file mode 100644 index 000000000..5cf61233e --- /dev/null +++ b/plugins/outputs/logzio/README.md @@ -0,0 +1,43 @@ +# Logz.io Output Plugin + +This plugin sends metrics to Logz.io over HTTPs. + +### Configuration: + +```toml +# A plugin that can send metrics over HTTPs to Logz.io +[[outputs.logzio]] + ## Set to true if Logz.io sender checks the disk space before adding metrics to the disk queue. + # check_disk_space = true + + ## The percent of used file system space at which the sender will stop queueing. + ## When we will reach that percentage, the file system in which the queue is stored will drop + ## all new logs until the percentage of used space drops below that threshold. + # disk_threshold = 98 + + ## How often Logz.io sender should drain the queue. + ## Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + # drain_duration = "3s" + + ## Where Logz.io sender should store the queue + ## queue_dir = Sprintf("%s%s%s%s%d", os.TempDir(), string(os.PathSeparator), + ## "logzio-buffer", string(os.PathSeparator), time.Now().UnixNano()) + + ## Logz.io account token + token = "your Logz.io token" # required + + ## Use your listener URL for your Logz.io account region. + # url = "https://listener.logz.io:8071" +``` + +### Required parameters: + +* `token`: Your Logz.io token, which can be found under "settings" in your account. + +### Optional parameters: + +* `check_disk_space`: Set to true if Logz.io sender checks the disk space before adding metrics to the disk queue. +* `disk_threshold`: If the queue_dir space crosses this threshold (in % of disk usage), the plugin will start dropping logs. +* `drain_duration`: Time to sleep between sending attempts. +* `queue_dir`: Metrics disk path. All the unsent metrics are saved to the disk in this location. +* `url`: Logz.io listener URL. \ No newline at end of file diff --git a/plugins/outputs/logzio/logzio.go b/plugins/outputs/logzio/logzio.go new file mode 100644 index 000000000..e46e9bf82 --- /dev/null +++ b/plugins/outputs/logzio/logzio.go @@ -0,0 +1,175 @@ +package logzio + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + + "net/http" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/outputs" +) + +const ( + defaultLogzioURL = "https://listener.logz.io:8071" + + logzioDescription = "Send aggregate metrics to Logz.io" + logzioType = "telegraf" +) + +var sampleConfig = ` + ## Connection timeout, defaults to "5s" if not set. + timeout = "5s" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Logz.io account token + token = "your logz.io token" # required + + ## Use your listener URL for your Logz.io account region. + # url = "https://listener.logz.io:8071" +` + +type Logzio struct { + Log telegraf.Logger `toml:"-"` + Timeout internal.Duration `toml:"timeout"` + Token string `toml:"token"` + URL string `toml:"url"` + + tls.ClientConfig + client *http.Client +} + +type TimeSeries struct { + Series []*Metric +} + +type Metric struct { + Metric map[string]interface{} `json:"metrics"` + Dimensions map[string]string `json:"dimensions"` + Time time.Time `json:"@timestamp"` + Type string `json:"type"` +} + +// Connect to the Output +func (l *Logzio) Connect() error { + l.Log.Debug("Connecting to logz.io output...") + + if l.Token == "" || l.Token == "your logz.io token" { + return fmt.Errorf("token is required") + } + + tlsCfg, err := l.ClientConfig.TLSConfig() + if err != nil { + return err + } + + l.client = &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsCfg, + }, + Timeout: l.Timeout.Duration, + } + + return nil +} + +// Close any connections to the Output +func (l *Logzio) Close() error { + l.Log.Debug("Closing logz.io output") + return nil +} + +// Description returns a one-sentence description on the Output +func (l *Logzio) Description() string { + return logzioDescription +} + +// SampleConfig returns the default configuration of the Output +func (l *Logzio) SampleConfig() string { + return sampleConfig +} + +// Write takes in group of points to be written to the Output +func (l *Logzio) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + var buff bytes.Buffer + gz := gzip.NewWriter(&buff) + for _, metric := range metrics { + m := l.parseMetric(metric) + + serialized, err := json.Marshal(m) + if err != nil { + return fmt.Errorf("unable to marshal metric, %s\n", err.Error()) + } + + _, err = gz.Write(append(serialized, '\n')) + if err != nil { + return fmt.Errorf("unable to write gzip meric, %s\n", err.Error()) + } + } + + err := gz.Close() + if err != nil { + return fmt.Errorf("unable to close gzip, %s\n", err.Error()) + } + + return l.send(buff.Bytes()) +} + +func (l *Logzio) send(metrics []byte) error { + req, err := http.NewRequest("POST", l.authUrl(), bytes.NewBuffer(metrics)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + req.Header.Set("Content-Encoding", "gzip") + + resp, err := l.client.Do(req) + if err != nil { + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 209 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func (l *Logzio) authUrl() string { + return fmt.Sprintf("%s/?token=%s", l.URL, l.Token) +} + +func (l *Logzio) parseMetric(metric telegraf.Metric) *Metric { + return &Metric{ + Metric: map[string]interface{}{ + metric.Name(): metric.Fields(), + }, + Dimensions: metric.Tags(), + Time: metric.Time(), + Type: logzioType, + } +} + +func init() { + outputs.Add("logzio", func() telegraf.Output { + return &Logzio{ + URL: defaultLogzioURL, + Timeout: internal.Duration{Duration: time.Second * 5}, + } + }) +} diff --git a/plugins/outputs/logzio/logzio_test.go b/plugins/outputs/logzio/logzio_test.go new file mode 100644 index 000000000..074192e06 --- /dev/null +++ b/plugins/outputs/logzio/logzio_test.go @@ -0,0 +1,94 @@ +package logzio + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "io" + "net/http" + "net/http/httptest" + "testing" +) + +const ( + testToken = "123456789" + testURL = "https://logzio.com" +) + +func TestConnetWithoutToken(t *testing.T) { + l := &Logzio{ + URL: testURL, + Log: testutil.Logger{}, + } + err := l.Connect() + require.Error(t, err) +} + +func TestParseMetric(t *testing.T) { + l := &Logzio{} + for _, tm := range testutil.MockMetrics() { + lm := l.parseMetric(tm) + require.Equal(t, tm.Fields(), lm.Metric[tm.Name()]) + require.Equal(t, logzioType, lm.Type) + require.Equal(t, tm.Tags(), lm.Dimensions) + require.Equal(t, tm.Time(), lm.Time) + } +} + +func TestBadStatusCode(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer ts.Close() + + l := &Logzio{ + Token: testToken, + URL: ts.URL, + Log: testutil.Logger{}, + } + + err := l.Connect() + require.NoError(t, err) + + err = l.Write(testutil.MockMetrics()) + require.Error(t, err) +} + +func TestWrite(t *testing.T) { + tm := testutil.TestMetric(float64(3.14), "test1") + var body bytes.Buffer + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gz, err := gzip.NewReader(r.Body) + require.NoError(t, err) + + _, err = io.Copy(&body, gz) + require.NoError(t, err) + + var lm Metric + err = json.Unmarshal(body.Bytes(), &lm) + require.NoError(t, err) + + require.Equal(t, tm.Fields(), lm.Metric[tm.Name()]) + require.Equal(t, logzioType, lm.Type) + require.Equal(t, tm.Tags(), lm.Dimensions) + require.Equal(t, tm.Time(), lm.Time) + + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + l := &Logzio{ + Token: testToken, + URL: ts.URL, + Log: testutil.Logger{}, + } + + err := l.Connect() + require.NoError(t, err) + + err = l.Write([]telegraf.Metric{tm}) + require.NoError(t, err) +}