plugin: output loki (#8450)
This commit is contained in:
parent
cf9ae34d03
commit
c17cc8cabb
|
|
@ -27,6 +27,7 @@ import (
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
|
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
|
_ "github.com/influxdata/telegraf/plugins/outputs/librato"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/logzio"
|
_ "github.com/influxdata/telegraf/plugins/outputs/logzio"
|
||||||
|
_ "github.com/influxdata/telegraf/plugins/outputs/loki"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/mqtt"
|
_ "github.com/influxdata/telegraf/plugins/outputs/mqtt"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/nats"
|
_ "github.com/influxdata/telegraf/plugins/outputs/nats"
|
||||||
_ "github.com/influxdata/telegraf/plugins/outputs/newrelic"
|
_ "github.com/influxdata/telegraf/plugins/outputs/newrelic"
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,34 @@
|
||||||
|
# Loki Output Plugin
|
||||||
|
|
||||||
|
This plugin sends logs to Loki, using tags as labels,
|
||||||
|
log line will content all fields in `key="value"` format which is easily parsable with `logfmt` parser in Loki.
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# A plugin that can transmit logs to Loki
|
||||||
|
[[outputs.loki]]
|
||||||
|
## The domain of Loki
|
||||||
|
domain = "https://loki.domain.tld"
|
||||||
|
|
||||||
|
## Endpoint to write api
|
||||||
|
# endpoint = "/loki/api/v1/push"
|
||||||
|
|
||||||
|
## Connection timeout, defaults to "5s" if not set.
|
||||||
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Basic auth credential
|
||||||
|
# username = "loki"
|
||||||
|
# password = "pass"
|
||||||
|
|
||||||
|
## Additional HTTP headers
|
||||||
|
# http_headers = {"X-Scope-OrgID" = "1"}
|
||||||
|
|
||||||
|
## If the request must be gzip encoded
|
||||||
|
# gzip_request = false
|
||||||
|
|
||||||
|
## Optional TLS Config
|
||||||
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
```
|
||||||
|
|
@ -0,0 +1,209 @@
|
||||||
|
package loki
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
|
"golang.org/x/oauth2"
|
||||||
|
"golang.org/x/oauth2/clientcredentials"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultEndpoint = "/loki/api/v1/push"
|
||||||
|
defaultClientTimeout = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
var sampleConfig = `
|
||||||
|
## The domain of Loki
|
||||||
|
domain = "https://loki.domain.tld"
|
||||||
|
|
||||||
|
## Endpoint to write api
|
||||||
|
# endpoint = "/loki/api/v1/push"
|
||||||
|
|
||||||
|
## Connection timeout, defaults to "5s" if not set.
|
||||||
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Basic auth credential
|
||||||
|
# username = "loki"
|
||||||
|
# password = "pass"
|
||||||
|
|
||||||
|
## Additional HTTP headers
|
||||||
|
# http_headers = {"X-Scope-OrgID" = "1"}
|
||||||
|
|
||||||
|
## If the request must be gzip encoded
|
||||||
|
# gzip_request = false
|
||||||
|
|
||||||
|
## Optional TLS Config
|
||||||
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
`
|
||||||
|
|
||||||
|
type Loki struct {
|
||||||
|
Domain string `toml:"domain"`
|
||||||
|
Endpoint string `toml:"endpoint"`
|
||||||
|
Timeout internal.Duration `toml:"timeout"`
|
||||||
|
Username string `toml:"username"`
|
||||||
|
Password string `toml:"password"`
|
||||||
|
Headers map[string]string `toml:"headers"`
|
||||||
|
ClientID string `toml:"client_id"`
|
||||||
|
ClientSecret string `toml:"client_secret"`
|
||||||
|
TokenURL string `toml:"token_url"`
|
||||||
|
Scopes []string `toml:"scopes"`
|
||||||
|
GZipRequest bool `toml:"gzip_request"`
|
||||||
|
|
||||||
|
url string
|
||||||
|
client *http.Client
|
||||||
|
tls.ClientConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Loki) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Loki) Description() string {
|
||||||
|
return "Send logs to Loki"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Loki) createClient(ctx context.Context) (*http.Client, error) {
|
||||||
|
tlsCfg, err := l.ClientConfig.TLSConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("tls config fail: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: tlsCfg,
|
||||||
|
Proxy: http.ProxyFromEnvironment,
|
||||||
|
},
|
||||||
|
Timeout: l.Timeout.Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.ClientID != "" && l.ClientSecret != "" && l.TokenURL != "" {
|
||||||
|
oauthConfig := clientcredentials.Config{
|
||||||
|
ClientID: l.ClientID,
|
||||||
|
ClientSecret: l.ClientSecret,
|
||||||
|
TokenURL: l.TokenURL,
|
||||||
|
Scopes: l.Scopes,
|
||||||
|
}
|
||||||
|
ctx = context.WithValue(ctx, oauth2.HTTPClient, client)
|
||||||
|
client = oauthConfig.Client(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Loki) Connect() (err error) {
|
||||||
|
if l.Domain == "" {
|
||||||
|
return fmt.Errorf("domain is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.Endpoint == "" {
|
||||||
|
l.Endpoint = defaultEndpoint
|
||||||
|
}
|
||||||
|
|
||||||
|
l.url = fmt.Sprintf("%s%s", l.Domain, l.Endpoint)
|
||||||
|
|
||||||
|
if l.Timeout.Duration == 0 {
|
||||||
|
l.Timeout.Duration = defaultClientTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
l.client, err = l.createClient(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("http client fail: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Loki) Close() error {
|
||||||
|
l.client.CloseIdleConnections()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Loki) Write(metrics []telegraf.Metric) error {
|
||||||
|
s := Streams{}
|
||||||
|
|
||||||
|
for _, m := range metrics {
|
||||||
|
tags := m.TagList()
|
||||||
|
var line string
|
||||||
|
|
||||||
|
for _, f := range m.FieldList() {
|
||||||
|
line += fmt.Sprintf("%s=\"%v\" ", f.Key, f.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.insertLog(tags, Log{fmt.Sprintf("%d", m.Time().UnixNano()), line})
|
||||||
|
}
|
||||||
|
|
||||||
|
return l.write(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Loki) write(s Streams) error {
|
||||||
|
bs, err := json.Marshal(s)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("json.Marshal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var reqBodyBuffer io.Reader = bytes.NewBuffer(bs)
|
||||||
|
|
||||||
|
if l.GZipRequest {
|
||||||
|
rc, err := internal.CompressWithGzip(reqBodyBuffer)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer rc.Close()
|
||||||
|
reqBodyBuffer = rc
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodPost, l.url, reqBodyBuffer)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.Username != "" {
|
||||||
|
req.SetBasicAuth(l.Username, l.Password)
|
||||||
|
}
|
||||||
|
|
||||||
|
for k, v := range l.Headers {
|
||||||
|
if strings.ToLower(k) == "host" {
|
||||||
|
req.Host = v
|
||||||
|
}
|
||||||
|
req.Header.Set(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("User-Agent", internal.ProductToken())
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
if l.GZipRequest {
|
||||||
|
req.Header.Set("Content-Encoding", "gzip")
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := l.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
|
return fmt.Errorf("when writing to [%s] received status code: %d", l.url, resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
outputs.Add("loki", func() telegraf.Output {
|
||||||
|
return &Loki{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,356 @@
|
||||||
|
package loki
|
||||||
|
|
||||||
|
import (
|
||||||
|
"compress/gzip"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getMetric() telegraf.Metric {
|
||||||
|
return testutil.MustMetric(
|
||||||
|
"log",
|
||||||
|
map[string]string{
|
||||||
|
"key1": "value1",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"line": "my log",
|
||||||
|
"field": 3.14,
|
||||||
|
},
|
||||||
|
time.Unix(123, 0),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatusCode(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
plugin *Loki
|
||||||
|
statusCode int
|
||||||
|
errFunc func(t *testing.T, err error)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "success",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
statusCode: http.StatusNoContent,
|
||||||
|
errFunc: func(t *testing.T, err error) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "1xx status is an error",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
statusCode: 103,
|
||||||
|
errFunc: func(t *testing.T, err error) {
|
||||||
|
require.Error(t, err)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "3xx status is an error",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
statusCode: http.StatusMultipleChoices,
|
||||||
|
errFunc: func(t *testing.T, err error) {
|
||||||
|
require.Error(t, err)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "4xx status is an error",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
statusCode: http.StatusMultipleChoices,
|
||||||
|
errFunc: func(t *testing.T, err error) {
|
||||||
|
require.Error(t, err)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(tt.statusCode)
|
||||||
|
})
|
||||||
|
|
||||||
|
err = tt.plugin.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
|
tt.errFunc(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestContentType(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
plugin *Loki
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default is application/json",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
expected: "application/json",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "overwrite content_type",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
Headers: map[string]string{"Content-Type": "plain/text"},
|
||||||
|
},
|
||||||
|
// plugin force content-type
|
||||||
|
expected: "application/json",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
require.Equal(t, tt.expected, r.Header.Get("Content-Type"))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
})
|
||||||
|
|
||||||
|
err = tt.plugin.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestContentEncodingGzip(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
plugin *Loki
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default is no content encoding",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
expected: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "overwrite content_encoding",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
GZipRequest: true,
|
||||||
|
},
|
||||||
|
expected: "gzip",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
require.Equal(t, tt.expected, r.Header.Get("Content-Encoding"))
|
||||||
|
|
||||||
|
body := r.Body
|
||||||
|
var err error
|
||||||
|
if r.Header.Get("Content-Encoding") == "gzip" {
|
||||||
|
body, err = gzip.NewReader(r.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, err := ioutil.ReadAll(body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var s Request
|
||||||
|
err = json.Unmarshal(payload, &s)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, s.Streams, 1)
|
||||||
|
require.Len(t, s.Streams[0].Logs, 1)
|
||||||
|
require.Len(t, s.Streams[0].Logs[0], 2)
|
||||||
|
require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels)
|
||||||
|
require.Equal(t, "123000000000", s.Streams[0].Logs[0][0])
|
||||||
|
require.Contains(t, s.Streams[0].Logs[0][1], "line=\"my log\"")
|
||||||
|
require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"")
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
})
|
||||||
|
|
||||||
|
err = tt.plugin.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBasicAuth(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
plugin *Loki
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "username and password",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
Username: "username",
|
||||||
|
Password: "pa$$word",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
username, password, _ := r.BasicAuth()
|
||||||
|
require.Equal(t, tt.plugin.Username, username)
|
||||||
|
require.Equal(t, tt.plugin.Password, password)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
})
|
||||||
|
|
||||||
|
err = tt.plugin.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
|
func TestOAuthClientCredentialsGrant(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
var token = "2YotnFZFEjr1zCsicMWpAA"
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
plugin *Loki
|
||||||
|
tokenHandler TestHandlerFunc
|
||||||
|
handler TestHandlerFunc
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no credentials",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
},
|
||||||
|
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
require.Len(t, r.Header["Authorization"], 0)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "success",
|
||||||
|
plugin: &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
ClientID: "howdy",
|
||||||
|
ClientSecret: "secret",
|
||||||
|
TokenURL: u.String() + "/token",
|
||||||
|
Scopes: []string{"urn:opc:idm:__myscopes__"},
|
||||||
|
},
|
||||||
|
tokenHandler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
values := url.Values{}
|
||||||
|
values.Add("access_token", token)
|
||||||
|
values.Add("token_type", "bearer")
|
||||||
|
values.Add("expires_in", "3600")
|
||||||
|
w.Write([]byte(values.Encode()))
|
||||||
|
},
|
||||||
|
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||||
|
require.Equal(t, []string{"Bearer " + token}, r.Header["Authorization"])
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch r.URL.Path {
|
||||||
|
case defaultEndpoint:
|
||||||
|
tt.handler(t, w, r)
|
||||||
|
case "/token":
|
||||||
|
tt.tokenHandler(t, w, r)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
err = tt.plugin.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDefaultUserAgent(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Run("default-user-agent", func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
require.Equal(t, internal.ProductToken(), r.Header.Get("User-Agent"))
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
})
|
||||||
|
|
||||||
|
client := &Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
}
|
||||||
|
|
||||||
|
err = client.Connect()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = client.Write([]telegraf.Metric{getMetric()})
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,70 @@
|
||||||
|
package loki
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
Log []string
|
||||||
|
|
||||||
|
Streams map[string]*Stream
|
||||||
|
|
||||||
|
Stream struct {
|
||||||
|
Labels map[string]string `json:"stream"`
|
||||||
|
Logs []Log `json:"values"`
|
||||||
|
}
|
||||||
|
|
||||||
|
Request struct {
|
||||||
|
Streams []Stream `json:"streams"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s Streams) insertLog(ts []*telegraf.Tag, l Log) {
|
||||||
|
key := uniqKeyFromTagList(ts)
|
||||||
|
|
||||||
|
if _, ok := s[key]; !ok {
|
||||||
|
s[key] = newStream(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
s[key].Logs = append(s[key].Logs, l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s Streams) MarshalJSON() ([]byte, error) {
|
||||||
|
r := Request{
|
||||||
|
Streams: make([]Stream, 0, len(s)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, stream := range s {
|
||||||
|
r.Streams = append(r.Streams, *stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func uniqKeyFromTagList(ts []*telegraf.Tag) (k string) {
|
||||||
|
for _, t := range ts {
|
||||||
|
k += fmt.Sprintf("%s-%s-",
|
||||||
|
strings.ReplaceAll(t.Key, "-", "--"),
|
||||||
|
strings.ReplaceAll(t.Value, "-", "--"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
return k
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStream(ts []*telegraf.Tag) *Stream {
|
||||||
|
s := &Stream{
|
||||||
|
Logs: make([]Log, 0),
|
||||||
|
Labels: map[string]string{},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, t := range ts {
|
||||||
|
s.Labels[t.Key] = t.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,157 @@
|
||||||
|
package loki
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type tuple struct {
|
||||||
|
key, value string
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateLabelsAndTag(tt ...tuple) (map[string]string, []*telegraf.Tag) {
|
||||||
|
labels := map[string]string{}
|
||||||
|
var tags []*telegraf.Tag
|
||||||
|
|
||||||
|
for _, t := range tt {
|
||||||
|
labels[t.key] = t.value
|
||||||
|
tags = append(tags, &telegraf.Tag{Key: t.key, Value: t.value})
|
||||||
|
}
|
||||||
|
|
||||||
|
return labels, tags
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGenerateLabelsAndTag(t *testing.T) {
|
||||||
|
labels, tags := generateLabelsAndTag(
|
||||||
|
tuple{key: "key1", value: "value1"},
|
||||||
|
tuple{key: "key2", value: "value2"},
|
||||||
|
tuple{key: "key3", value: "value3"},
|
||||||
|
)
|
||||||
|
|
||||||
|
expectedTags := []*telegraf.Tag{
|
||||||
|
{Key: "key1", Value: "value1"},
|
||||||
|
{Key: "key2", Value: "value2"},
|
||||||
|
{Key: "key3", Value: "value3"},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, labels, 3)
|
||||||
|
require.Len(t, tags, 3)
|
||||||
|
require.Equal(t, map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}, labels)
|
||||||
|
require.Equal(t, map[string]string{"key1": "value1", "key2": "value2", "key3": "value3"}, labels)
|
||||||
|
require.Equal(t, expectedTags, tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStream_insertLog(t *testing.T) {
|
||||||
|
s := Streams{}
|
||||||
|
log1 := Log{"123", "this log isn't useful"}
|
||||||
|
log2 := Log{"124", "this log isn't useful neither"}
|
||||||
|
log3 := Log{"122", "again"}
|
||||||
|
|
||||||
|
key1 := "key1-value1-key2-value2-key3-value3-"
|
||||||
|
labels1, tags1 := generateLabelsAndTag(
|
||||||
|
tuple{key: "key1", value: "value1"},
|
||||||
|
tuple{key: "key2", value: "value2"},
|
||||||
|
tuple{key: "key3", value: "value3"},
|
||||||
|
)
|
||||||
|
|
||||||
|
key2 := "key2-value2-"
|
||||||
|
labels2, tags2 := generateLabelsAndTag(
|
||||||
|
tuple{key: "key2", value: "value2"},
|
||||||
|
)
|
||||||
|
|
||||||
|
s.insertLog(tags1, log1)
|
||||||
|
|
||||||
|
require.Len(t, s, 1)
|
||||||
|
require.Contains(t, s, key1)
|
||||||
|
require.Len(t, s[key1].Logs, 1)
|
||||||
|
require.Equal(t, labels1, s[key1].Labels)
|
||||||
|
require.Equal(t, "123", s[key1].Logs[0][0])
|
||||||
|
require.Equal(t, "this log isn't useful", s[key1].Logs[0][1])
|
||||||
|
|
||||||
|
s.insertLog(tags1, log2)
|
||||||
|
|
||||||
|
require.Len(t, s, 1)
|
||||||
|
require.Len(t, s[key1].Logs, 2)
|
||||||
|
require.Equal(t, "124", s[key1].Logs[1][0])
|
||||||
|
require.Equal(t, "this log isn't useful neither", s[key1].Logs[1][1])
|
||||||
|
|
||||||
|
s.insertLog(tags2, log3)
|
||||||
|
|
||||||
|
require.Len(t, s, 2)
|
||||||
|
require.Contains(t, s, key2)
|
||||||
|
require.Len(t, s[key2].Logs, 1)
|
||||||
|
require.Equal(t, labels2, s[key2].Labels)
|
||||||
|
require.Equal(t, "122", s[key2].Logs[0][0])
|
||||||
|
require.Equal(t, "again", s[key2].Logs[0][1])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUniqKeyFromTagList(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
in []*telegraf.Tag
|
||||||
|
out string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
in: []*telegraf.Tag{
|
||||||
|
{Key: "key1", Value: "value1"},
|
||||||
|
{Key: "key2", Value: "value2"},
|
||||||
|
{Key: "key3", Value: "value3"},
|
||||||
|
},
|
||||||
|
out: "key1-value1-key2-value2-key3-value3-",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
in: []*telegraf.Tag{
|
||||||
|
{Key: "key1", Value: "value1"},
|
||||||
|
{Key: "key3", Value: "value3"},
|
||||||
|
{Key: "key4", Value: "value4"},
|
||||||
|
},
|
||||||
|
out: "key1-value1-key3-value3-key4-value4-",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
in: []*telegraf.Tag{
|
||||||
|
{Key: "target", Value: "local"},
|
||||||
|
{Key: "host", Value: "host"},
|
||||||
|
{Key: "service", Value: "dns"},
|
||||||
|
},
|
||||||
|
out: "target-local-host-host-service-dns-",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
in: []*telegraf.Tag{
|
||||||
|
{Key: "target", Value: "localhost"},
|
||||||
|
{Key: "hostservice", Value: "dns"},
|
||||||
|
},
|
||||||
|
out: "target-localhost-hostservice-dns-",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
in: []*telegraf.Tag{
|
||||||
|
{Key: "target-local", Value: "host-"},
|
||||||
|
},
|
||||||
|
out: "target--local-host---",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
require.Equal(t, test.out, uniqKeyFromTagList(test.in))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_newStream(t *testing.T) {
|
||||||
|
labels, tags := generateLabelsAndTag(
|
||||||
|
tuple{key: "key1", value: "value1"},
|
||||||
|
tuple{key: "key2", value: "value2"},
|
||||||
|
tuple{key: "key3", value: "value3"},
|
||||||
|
)
|
||||||
|
|
||||||
|
s := newStream(tags)
|
||||||
|
|
||||||
|
require.Empty(t, s.Logs)
|
||||||
|
require.Equal(t, s.Labels, labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_newStream_noTag(t *testing.T) {
|
||||||
|
s := newStream(nil)
|
||||||
|
|
||||||
|
require.Empty(t, s.Logs)
|
||||||
|
require.Empty(t, s.Labels)
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue