feat(outputs.loki): Add option for metric name label (#13157)
This commit is contained in:
parent
1a7c274ddf
commit
8396f1a00d
|
|
@ -52,4 +52,10 @@ to use them.
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
# tls_cert = "/etc/telegraf/cert.pem"
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
# tls_key = "/etc/telegraf/key.pem"
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
|
||||||
|
## Metric Name Label
|
||||||
|
## Label to use for the metric name to when sending metrics. If set to an
|
||||||
|
## empty string, this will not add the label. This is NOT suggested as there
|
||||||
|
## is no way to differentiate between multiple metrics.
|
||||||
|
# metric_name_label = "__name"
|
||||||
```
|
```
|
||||||
|
|
|
||||||
|
|
@ -32,17 +32,18 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Loki struct {
|
type Loki struct {
|
||||||
Domain string `toml:"domain"`
|
Domain string `toml:"domain"`
|
||||||
Endpoint string `toml:"endpoint"`
|
Endpoint string `toml:"endpoint"`
|
||||||
Timeout config.Duration `toml:"timeout"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
Username config.Secret `toml:"username"`
|
Username config.Secret `toml:"username"`
|
||||||
Password config.Secret `toml:"password"`
|
Password config.Secret `toml:"password"`
|
||||||
Headers map[string]string `toml:"http_headers"`
|
Headers map[string]string `toml:"http_headers"`
|
||||||
ClientID string `toml:"client_id"`
|
ClientID string `toml:"client_id"`
|
||||||
ClientSecret string `toml:"client_secret"`
|
ClientSecret string `toml:"client_secret"`
|
||||||
TokenURL string `toml:"token_url"`
|
TokenURL string `toml:"token_url"`
|
||||||
Scopes []string `toml:"scopes"`
|
Scopes []string `toml:"scopes"`
|
||||||
GZipRequest bool `toml:"gzip_request"`
|
GZipRequest bool `toml:"gzip_request"`
|
||||||
|
MetricNameLabel string `toml:"metric_name_label"`
|
||||||
|
|
||||||
url string
|
url string
|
||||||
client *http.Client
|
client *http.Client
|
||||||
|
|
@ -119,7 +120,9 @@ func (l *Loki) Write(metrics []telegraf.Metric) error {
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
m.AddTag("__name", m.Name())
|
if l.MetricNameLabel != "" {
|
||||||
|
m.AddTag(l.MetricNameLabel, m.Name())
|
||||||
|
}
|
||||||
|
|
||||||
tags := m.TagList()
|
tags := m.TagList()
|
||||||
var line string
|
var line string
|
||||||
|
|
@ -197,6 +200,8 @@ func (l *Loki) writeMetrics(s Streams) error {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
outputs.Add("loki", func() telegraf.Output {
|
outputs.Add("loki", func() telegraf.Output {
|
||||||
return &Loki{}
|
return &Loki{
|
||||||
|
MetricNameLabel: "__name",
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -121,8 +121,7 @@ func TestStatusCode(t *testing.T) {
|
||||||
w.WriteHeader(tt.statusCode)
|
w.WriteHeader(tt.statusCode)
|
||||||
})
|
})
|
||||||
|
|
||||||
err = tt.plugin.Connect()
|
require.NoError(t, tt.plugin.Connect())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
tt.errFunc(t, err)
|
tt.errFunc(t, err)
|
||||||
|
|
@ -167,8 +166,7 @@ func TestContentType(t *testing.T) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
})
|
})
|
||||||
|
|
||||||
err = tt.plugin.Connect()
|
require.NoError(t, tt.plugin.Connect())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -226,7 +224,7 @@ func TestContentEncodingGzip(t *testing.T) {
|
||||||
require.Len(t, s.Streams, 1)
|
require.Len(t, s.Streams, 1)
|
||||||
require.Len(t, s.Streams[0].Logs, 1)
|
require.Len(t, s.Streams[0].Logs, 1)
|
||||||
require.Len(t, s.Streams[0].Logs[0], 2)
|
require.Len(t, s.Streams[0].Logs[0], 2)
|
||||||
require.Equal(t, map[string]string{"__name": "log", "key1": "value1"}, s.Streams[0].Labels)
|
require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels)
|
||||||
require.Equal(t, "123000000000", s.Streams[0].Logs[0][0])
|
require.Equal(t, "123000000000", s.Streams[0].Logs[0][0])
|
||||||
require.Contains(t, s.Streams[0].Logs[0][1], "line=\"my log\"")
|
require.Contains(t, s.Streams[0].Logs[0][1], "line=\"my log\"")
|
||||||
require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"")
|
require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"")
|
||||||
|
|
@ -234,8 +232,7 @@ func TestContentEncodingGzip(t *testing.T) {
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
})
|
})
|
||||||
|
|
||||||
err = tt.plugin.Connect()
|
require.NoError(t, tt.plugin.Connect())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -243,6 +240,56 @@ func TestContentEncodingGzip(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMetricNameLabel(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
defer ts.Close()
|
||||||
|
|
||||||
|
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
metricNameLabel string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no label",
|
||||||
|
metricNameLabel: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "custom label",
|
||||||
|
metricNameLabel: "foobar",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
payload, err := io.ReadAll(r.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var s Request
|
||||||
|
require.NoError(t, json.Unmarshal(payload, &s))
|
||||||
|
|
||||||
|
switch tt.metricNameLabel {
|
||||||
|
case "":
|
||||||
|
require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels)
|
||||||
|
case "foobar":
|
||||||
|
require.Equal(t, map[string]string{"foobar": "log", "key1": "value1"}, s.Streams[0].Labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
})
|
||||||
|
|
||||||
|
l := Loki{
|
||||||
|
Domain: u.String(),
|
||||||
|
MetricNameLabel: tt.metricNameLabel,
|
||||||
|
}
|
||||||
|
require.NoError(t, l.Connect())
|
||||||
|
require.NoError(t, l.Write([]telegraf.Metric{getMetric()}))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestBasicAuth(t *testing.T) {
|
func TestBasicAuth(t *testing.T) {
|
||||||
ts := httptest.NewServer(http.NotFoundHandler())
|
ts := httptest.NewServer(http.NotFoundHandler())
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
@ -349,8 +396,7 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
err = tt.plugin.Connect()
|
require.NoError(t, tt.plugin.Connect())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -375,8 +421,7 @@ func TestDefaultUserAgent(t *testing.T) {
|
||||||
Domain: u.String(),
|
Domain: u.String(),
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.Connect()
|
require.NoError(t, client.Connect())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = client.Write([]telegraf.Metric{getMetric()})
|
err = client.Write([]telegraf.Metric{getMetric()})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -404,7 +449,7 @@ func TestMetricSorting(t *testing.T) {
|
||||||
require.Len(t, s.Streams, 1)
|
require.Len(t, s.Streams, 1)
|
||||||
require.Len(t, s.Streams[0].Logs, 2)
|
require.Len(t, s.Streams[0].Logs, 2)
|
||||||
require.Len(t, s.Streams[0].Logs[0], 2)
|
require.Len(t, s.Streams[0].Logs[0], 2)
|
||||||
require.Equal(t, map[string]string{"__name": "log", "key1": "value1"}, s.Streams[0].Labels)
|
require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels)
|
||||||
require.Equal(t, "456000000000", s.Streams[0].Logs[0][0])
|
require.Equal(t, "456000000000", s.Streams[0].Logs[0][0])
|
||||||
require.Contains(t, s.Streams[0].Logs[0][1], "line=\"older log\"")
|
require.Contains(t, s.Streams[0].Logs[0][1], "line=\"older log\"")
|
||||||
require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"")
|
require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"")
|
||||||
|
|
@ -419,8 +464,7 @@ func TestMetricSorting(t *testing.T) {
|
||||||
Domain: u.String(),
|
Domain: u.String(),
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.Connect()
|
require.NoError(t, client.Connect())
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = client.Write(getOutOfOrderMetrics())
|
err = client.Write(getOutOfOrderMetrics())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
||||||
|
|
@ -23,3 +23,9 @@
|
||||||
# tls_ca = "/etc/telegraf/ca.pem"
|
# tls_ca = "/etc/telegraf/ca.pem"
|
||||||
# tls_cert = "/etc/telegraf/cert.pem"
|
# tls_cert = "/etc/telegraf/cert.pem"
|
||||||
# tls_key = "/etc/telegraf/key.pem"
|
# tls_key = "/etc/telegraf/key.pem"
|
||||||
|
|
||||||
|
## Metric Name Label
|
||||||
|
## Label to use for the metric name to when sending metrics. If set to an
|
||||||
|
## empty string, this will not add the label. This is NOT suggested as there
|
||||||
|
## is no way to differentiate between multiple metrics.
|
||||||
|
# metric_name_label = "__name"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue