diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index b8d64db8f..dbec69f95 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -27,6 +27,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" _ "github.com/influxdata/telegraf/plugins/outputs/librato" _ "github.com/influxdata/telegraf/plugins/outputs/logzio" + _ "github.com/influxdata/telegraf/plugins/outputs/loki" _ "github.com/influxdata/telegraf/plugins/outputs/mqtt" _ "github.com/influxdata/telegraf/plugins/outputs/nats" _ "github.com/influxdata/telegraf/plugins/outputs/newrelic" diff --git a/plugins/outputs/loki/README.md b/plugins/outputs/loki/README.md new file mode 100644 index 000000000..9c48f95ba --- /dev/null +++ b/plugins/outputs/loki/README.md @@ -0,0 +1,34 @@ +# Loki Output Plugin + +This plugin sends logs to Loki, using tags as labels, +log line will content all fields in `key="value"` format which is easily parsable with `logfmt` parser in Loki. + +### Configuration: + +```toml +# A plugin that can transmit logs to Loki +[[outputs.loki]] + ## The domain of Loki + domain = "https://loki.domain.tld" + + ## Endpoint to write api + # endpoint = "/loki/api/v1/push" + + ## Connection timeout, defaults to "5s" if not set. + # timeout = "5s" + + ## Basic auth credential + # username = "loki" + # password = "pass" + + ## Additional HTTP headers + # http_headers = {"X-Scope-OrgID" = "1"} + + ## If the request must be gzip encoded + # gzip_request = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" +``` diff --git a/plugins/outputs/loki/loki.go b/plugins/outputs/loki/loki.go new file mode 100644 index 000000000..c097d21fd --- /dev/null +++ b/plugins/outputs/loki/loki.go @@ -0,0 +1,209 @@ +package loki + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/outputs" + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" +) + +const ( + defaultEndpoint = "/loki/api/v1/push" + defaultClientTimeout = 5 * time.Second +) + +var sampleConfig = ` + ## The domain of Loki + domain = "https://loki.domain.tld" + + ## Endpoint to write api + # endpoint = "/loki/api/v1/push" + + ## Connection timeout, defaults to "5s" if not set. + # timeout = "5s" + + ## Basic auth credential + # username = "loki" + # password = "pass" + + ## Additional HTTP headers + # http_headers = {"X-Scope-OrgID" = "1"} + + ## If the request must be gzip encoded + # gzip_request = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" +` + +type Loki struct { + Domain string `toml:"domain"` + Endpoint string `toml:"endpoint"` + Timeout internal.Duration `toml:"timeout"` + Username string `toml:"username"` + Password string `toml:"password"` + Headers map[string]string `toml:"headers"` + ClientID string `toml:"client_id"` + ClientSecret string `toml:"client_secret"` + TokenURL string `toml:"token_url"` + Scopes []string `toml:"scopes"` + GZipRequest bool `toml:"gzip_request"` + + url string + client *http.Client + tls.ClientConfig +} + +func (l *Loki) SampleConfig() string { + return sampleConfig +} + +func (l *Loki) Description() string { + return "Send logs to Loki" +} + +func (l *Loki) createClient(ctx context.Context) (*http.Client, error) { + tlsCfg, err := l.ClientConfig.TLSConfig() + if err != nil { + return nil, fmt.Errorf("tls config fail: %w", err) + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + Proxy: http.ProxyFromEnvironment, + }, + Timeout: l.Timeout.Duration, + } + + if l.ClientID != "" && l.ClientSecret != "" && l.TokenURL != "" { + oauthConfig := clientcredentials.Config{ + ClientID: l.ClientID, + ClientSecret: l.ClientSecret, + TokenURL: l.TokenURL, + Scopes: l.Scopes, + } + ctx = context.WithValue(ctx, oauth2.HTTPClient, client) + client = oauthConfig.Client(ctx) + } + + return client, nil +} + +func (l *Loki) Connect() (err error) { + if l.Domain == "" { + return fmt.Errorf("domain is required") + } + + if l.Endpoint == "" { + l.Endpoint = defaultEndpoint + } + + l.url = fmt.Sprintf("%s%s", l.Domain, l.Endpoint) + + if l.Timeout.Duration == 0 { + l.Timeout.Duration = defaultClientTimeout + } + + ctx := context.Background() + l.client, err = l.createClient(ctx) + if err != nil { + return fmt.Errorf("http client fail: %w", err) + } + + return +} + +func (l *Loki) Close() error { + l.client.CloseIdleConnections() + + return nil +} + +func (l *Loki) Write(metrics []telegraf.Metric) error { + s := Streams{} + + for _, m := range metrics { + tags := m.TagList() + var line string + + for _, f := range m.FieldList() { + line += fmt.Sprintf("%s=\"%v\" ", f.Key, f.Value) + } + + s.insertLog(tags, Log{fmt.Sprintf("%d", m.Time().UnixNano()), line}) + } + + return l.write(s) +} + +func (l *Loki) write(s Streams) error { + bs, err := json.Marshal(s) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + var reqBodyBuffer io.Reader = bytes.NewBuffer(bs) + + if l.GZipRequest { + rc, err := internal.CompressWithGzip(reqBodyBuffer) + if err != nil { + return err + } + defer rc.Close() + reqBodyBuffer = rc + } + + req, err := http.NewRequest(http.MethodPost, l.url, reqBodyBuffer) + if err != nil { + return err + } + + if l.Username != "" { + req.SetBasicAuth(l.Username, l.Password) + } + + for k, v := range l.Headers { + if strings.ToLower(k) == "host" { + req.Host = v + } + req.Header.Set(k, v) + } + + req.Header.Set("User-Agent", internal.ProductToken()) + req.Header.Set("Content-Type", "application/json") + if l.GZipRequest { + req.Header.Set("Content-Encoding", "gzip") + } + + resp, err := l.client.Do(req) + if err != nil { + return err + } + _ = resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("when writing to [%s] received status code: %d", l.url, resp.StatusCode) + } + + return nil +} + +func init() { + outputs.Add("loki", func() telegraf.Output { + return &Loki{} + }) +} diff --git a/plugins/outputs/loki/loki_test.go b/plugins/outputs/loki/loki_test.go new file mode 100644 index 000000000..1b8b61e34 --- /dev/null +++ b/plugins/outputs/loki/loki_test.go @@ -0,0 +1,356 @@ +package loki + +import ( + "compress/gzip" + "encoding/json" + "fmt" + "github.com/influxdata/telegraf/testutil" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/stretchr/testify/require" +) + +func getMetric() telegraf.Metric { + return testutil.MustMetric( + "log", + map[string]string{ + "key1": "value1", + }, + map[string]interface{}{ + "line": "my log", + "field": 3.14, + }, + time.Unix(123, 0), + ) +} + +func TestStatusCode(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *Loki + statusCode int + errFunc func(t *testing.T, err error) + }{ + { + name: "success", + plugin: &Loki{ + Domain: u.String(), + }, + statusCode: http.StatusNoContent, + errFunc: func(t *testing.T, err error) { + require.NoError(t, err) + }, + }, + { + name: "1xx status is an error", + plugin: &Loki{ + Domain: u.String(), + }, + statusCode: 103, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + { + name: "3xx status is an error", + plugin: &Loki{ + Domain: u.String(), + }, + statusCode: http.StatusMultipleChoices, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + { + name: "4xx status is an error", + plugin: &Loki{ + Domain: u.String(), + }, + statusCode: http.StatusMultipleChoices, + errFunc: func(t *testing.T, err error) { + require.Error(t, err) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.statusCode) + }) + + err = tt.plugin.Connect() + require.NoError(t, err) + + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + tt.errFunc(t, err) + }) + } +} + +func TestContentType(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *Loki + expected string + }{ + { + name: "default is application/json", + plugin: &Loki{ + Domain: u.String(), + }, + expected: "application/json", + }, + { + name: "overwrite content_type", + plugin: &Loki{ + Domain: u.String(), + Headers: map[string]string{"Content-Type": "plain/text"}, + }, + // plugin force content-type + expected: "application/json", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, tt.expected, r.Header.Get("Content-Type")) + w.WriteHeader(http.StatusOK) + }) + + err = tt.plugin.Connect() + require.NoError(t, err) + + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) + } +} + +func TestContentEncodingGzip(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *Loki + expected string + }{ + { + name: "default is no content encoding", + plugin: &Loki{ + Domain: u.String(), + }, + expected: "", + }, + { + name: "overwrite content_encoding", + plugin: &Loki{ + Domain: u.String(), + GZipRequest: true, + }, + expected: "gzip", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, tt.expected, r.Header.Get("Content-Encoding")) + + body := r.Body + var err error + if r.Header.Get("Content-Encoding") == "gzip" { + body, err = gzip.NewReader(r.Body) + require.NoError(t, err) + } + + payload, err := ioutil.ReadAll(body) + require.NoError(t, err) + + var s Request + err = json.Unmarshal(payload, &s) + require.NoError(t, err) + require.Len(t, s.Streams, 1) + require.Len(t, s.Streams[0].Logs, 1) + require.Len(t, s.Streams[0].Logs[0], 2) + require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels) + require.Equal(t, "123000000000", s.Streams[0].Logs[0][0]) + require.Contains(t, s.Streams[0].Logs[0][1], "line=\"my log\"") + require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"") + + w.WriteHeader(http.StatusNoContent) + }) + + err = tt.plugin.Connect() + require.NoError(t, err) + + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) + } +} + +func TestBasicAuth(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *Loki + }{ + { + name: "default", + plugin: &Loki{ + Domain: u.String(), + }, + }, + { + name: "username and password", + plugin: &Loki{ + Domain: u.String(), + Username: "username", + Password: "pa$$word", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + username, password, _ := r.BasicAuth() + require.Equal(t, tt.plugin.Username, username) + require.Equal(t, tt.plugin.Password, password) + w.WriteHeader(http.StatusOK) + }) + + err = tt.plugin.Connect() + require.NoError(t, err) + + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) + } +} + +type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request) + +func TestOAuthClientCredentialsGrant(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + var token = "2YotnFZFEjr1zCsicMWpAA" + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + tests := []struct { + name string + plugin *Loki + tokenHandler TestHandlerFunc + handler TestHandlerFunc + }{ + { + name: "no credentials", + plugin: &Loki{ + Domain: u.String(), + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Len(t, r.Header["Authorization"], 0) + w.WriteHeader(http.StatusOK) + }, + }, + { + name: "success", + plugin: &Loki{ + Domain: u.String(), + ClientID: "howdy", + ClientSecret: "secret", + TokenURL: u.String() + "/token", + Scopes: []string{"urn:opc:idm:__myscopes__"}, + }, + tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + values := url.Values{} + values.Add("access_token", token) + values.Add("token_type", "bearer") + values.Add("expires_in", "3600") + w.Write([]byte(values.Encode())) + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"]) + w.WriteHeader(http.StatusOK) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case defaultEndpoint: + tt.handler(t, w, r) + case "/token": + tt.tokenHandler(t, w, r) + } + }) + + err = tt.plugin.Connect() + require.NoError(t, err) + + err = tt.plugin.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) + } +} + +func TestDefaultUserAgent(t *testing.T) { + ts := httptest.NewServer(http.NotFoundHandler()) + defer ts.Close() + + u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String())) + require.NoError(t, err) + + t.Run("default-user-agent", func(t *testing.T) { + ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, internal.ProductToken(), r.Header.Get("User-Agent")) + w.WriteHeader(http.StatusOK) + }) + + client := &Loki{ + Domain: u.String(), + } + + err = client.Connect() + require.NoError(t, err) + + err = client.Write([]telegraf.Metric{getMetric()}) + require.NoError(t, err) + }) +} diff --git a/plugins/outputs/loki/stream.go b/plugins/outputs/loki/stream.go new file mode 100644 index 000000000..4f9f9c072 --- /dev/null +++ b/plugins/outputs/loki/stream.go @@ -0,0 +1,70 @@ +package loki + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/influxdata/telegraf" +) + +type ( + Log []string + + Streams map[string]*Stream + + Stream struct { + Labels map[string]string `json:"stream"` + Logs []Log `json:"values"` + } + + Request struct { + Streams []Stream `json:"streams"` + } +) + +func (s Streams) insertLog(ts []*telegraf.Tag, l Log) { + key := uniqKeyFromTagList(ts) + + if _, ok := s[key]; !ok { + s[key] = newStream(ts) + } + + s[key].Logs = append(s[key].Logs, l) +} + +func (s Streams) MarshalJSON() ([]byte, error) { + r := Request{ + Streams: make([]Stream, 0, len(s)), + } + + for _, stream := range s { + r.Streams = append(r.Streams, *stream) + } + + return json.Marshal(r) +} + +func uniqKeyFromTagList(ts []*telegraf.Tag) (k string) { + for _, t := range ts { + k += fmt.Sprintf("%s-%s-", + strings.ReplaceAll(t.Key, "-", "--"), + strings.ReplaceAll(t.Value, "-", "--"), + ) + } + + return k +} + +func newStream(ts []*telegraf.Tag) *Stream { + s := &Stream{ + Logs: make([]Log, 0), + Labels: map[string]string{}, + } + + for _, t := range ts { + s.Labels[t.Key] = t.Value + } + + return s +} diff --git a/plugins/outputs/loki/stream_test.go b/plugins/outputs/loki/stream_test.go new file mode 100644 index 000000000..7a47de5cc --- /dev/null +++ b/plugins/outputs/loki/stream_test.go @@ -0,0 +1,157 @@ +package loki + +import ( + "testing" + + "github.com/influxdata/telegraf" + "github.com/stretchr/testify/require" +) + +type tuple struct { + key, value string +} + +func generateLabelsAndTag(tt ...tuple) (map[string]string, []*telegraf.Tag) { + labels := map[string]string{} + var tags []*telegraf.Tag + + for _, t := range tt { + labels[t.key] = t.value + tags = append(tags, &telegraf.Tag{Key: t.key, Value: t.value}) + } + + return labels, tags +} + +func TestGenerateLabelsAndTag(t *testing.T) { + labels, tags := generateLabelsAndTag( + tuple{key: "key1", value: "value1"}, + tuple{key: "key2", value: "value2"}, + tuple{key: "key3", value: "value3"}, + ) + + expectedTags := []*telegraf.Tag{ + {Key: "key1", Value: "value1"}, + {Key: "key2", Value: "value2"}, + {Key: "key3", Value: "value3"}, + } + + require.Len(t, labels, 3) + require.Len(t, tags, 3) + require.Equal(t, map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}, labels) + require.Equal(t, map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}, labels) + require.Equal(t, expectedTags, tags) +} + +func TestStream_insertLog(t *testing.T) { + s := Streams{} + log1 := Log{"123", "this log isn't useful"} + log2 := Log{"124", "this log isn't useful neither"} + log3 := Log{"122", "again"} + + key1 := "key1-value1-key2-value2-key3-value3-" + labels1, tags1 := generateLabelsAndTag( + tuple{key: "key1", value: "value1"}, + tuple{key: "key2", value: "value2"}, + tuple{key: "key3", value: "value3"}, + ) + + key2 := "key2-value2-" + labels2, tags2 := generateLabelsAndTag( + tuple{key: "key2", value: "value2"}, + ) + + s.insertLog(tags1, log1) + + require.Len(t, s, 1) + require.Contains(t, s, key1) + require.Len(t, s[key1].Logs, 1) + require.Equal(t, labels1, s[key1].Labels) + require.Equal(t, "123", s[key1].Logs[0][0]) + require.Equal(t, "this log isn't useful", s[key1].Logs[0][1]) + + s.insertLog(tags1, log2) + + require.Len(t, s, 1) + require.Len(t, s[key1].Logs, 2) + require.Equal(t, "124", s[key1].Logs[1][0]) + require.Equal(t, "this log isn't useful neither", s[key1].Logs[1][1]) + + s.insertLog(tags2, log3) + + require.Len(t, s, 2) + require.Contains(t, s, key2) + require.Len(t, s[key2].Logs, 1) + require.Equal(t, labels2, s[key2].Labels) + require.Equal(t, "122", s[key2].Logs[0][0]) + require.Equal(t, "again", s[key2].Logs[0][1]) +} + +func TestUniqKeyFromTagList(t *testing.T) { + tests := []struct { + in []*telegraf.Tag + out string + }{ + { + in: []*telegraf.Tag{ + {Key: "key1", Value: "value1"}, + {Key: "key2", Value: "value2"}, + {Key: "key3", Value: "value3"}, + }, + out: "key1-value1-key2-value2-key3-value3-", + }, + { + in: []*telegraf.Tag{ + {Key: "key1", Value: "value1"}, + {Key: "key3", Value: "value3"}, + {Key: "key4", Value: "value4"}, + }, + out: "key1-value1-key3-value3-key4-value4-", + }, + { + in: []*telegraf.Tag{ + {Key: "target", Value: "local"}, + {Key: "host", Value: "host"}, + {Key: "service", Value: "dns"}, + }, + out: "target-local-host-host-service-dns-", + }, + { + in: []*telegraf.Tag{ + {Key: "target", Value: "localhost"}, + {Key: "hostservice", Value: "dns"}, + }, + out: "target-localhost-hostservice-dns-", + }, + { + in: []*telegraf.Tag{ + {Key: "target-local", Value: "host-"}, + }, + out: "target--local-host---", + }, + } + + for _, test := range tests { + require.Equal(t, test.out, uniqKeyFromTagList(test.in)) + } +} + +func Test_newStream(t *testing.T) { + labels, tags := generateLabelsAndTag( + tuple{key: "key1", value: "value1"}, + tuple{key: "key2", value: "value2"}, + tuple{key: "key3", value: "value3"}, + ) + + s := newStream(tags) + + require.Empty(t, s.Logs) + require.Equal(t, s.Labels, labels) +} + +func Test_newStream_noTag(t *testing.T) { + s := newStream(nil) + + require.Empty(t, s.Logs) + require.Empty(t, s.Labels) +}