fix: sort logs by timestamp before writing to Loki (#9571)
This commit is contained in:
parent
fa77d2fb07
commit
e6abb46d87
|
|
@ -3,6 +3,8 @@
|
|||
This plugin sends logs to Loki, using tags as labels,
|
||||
log line will content all fields in `key="value"` format which is easily parsable with `logfmt` parser in Loki.
|
||||
|
||||
Logs within each stream are sorted by timestamp before being sent to Loki.
|
||||
|
||||
### Configuration:
|
||||
|
||||
```toml
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
|
@ -137,6 +138,10 @@ func (l *Loki) Close() error {
|
|||
func (l *Loki) Write(metrics []telegraf.Metric) error {
|
||||
s := Streams{}
|
||||
|
||||
sort.SliceStable(metrics, func(i, j int) bool {
|
||||
return metrics[i].Time().Before(metrics[j].Time())
|
||||
})
|
||||
|
||||
for _, m := range metrics {
|
||||
tags := m.TagList()
|
||||
var line string
|
||||
|
|
|
|||
|
|
@ -31,6 +31,33 @@ func getMetric() telegraf.Metric {
|
|||
)
|
||||
}
|
||||
|
||||
func getOutOfOrderMetrics() []telegraf.Metric {
|
||||
return []telegraf.Metric{
|
||||
testutil.MustMetric(
|
||||
"log",
|
||||
map[string]string{
|
||||
"key1": "value1",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"line": "newer log",
|
||||
"field": 3.14,
|
||||
},
|
||||
time.Unix(1230, 0),
|
||||
),
|
||||
testutil.MustMetric(
|
||||
"log",
|
||||
map[string]string{
|
||||
"key1": "value1",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"line": "older log",
|
||||
"field": 3.14,
|
||||
},
|
||||
time.Unix(456, 0),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatusCode(t *testing.T) {
|
||||
ts := httptest.NewServer(http.NotFoundHandler())
|
||||
defer ts.Close()
|
||||
|
|
@ -354,3 +381,47 @@ func TestDefaultUserAgent(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMetricSorting(t *testing.T) {
|
||||
ts := httptest.NewServer(http.NotFoundHandler())
|
||||
defer ts.Close()
|
||||
|
||||
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("out of order metrics", func(t *testing.T) {
|
||||
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body := r.Body
|
||||
var err error
|
||||
|
||||
payload, err := ioutil.ReadAll(body)
|
||||
require.NoError(t, err)
|
||||
|
||||
var s Request
|
||||
err = json.Unmarshal(payload, &s)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, s.Streams, 1)
|
||||
require.Len(t, s.Streams[0].Logs, 2)
|
||||
require.Len(t, s.Streams[0].Logs[0], 2)
|
||||
require.Equal(t, map[string]string{"key1": "value1"}, s.Streams[0].Labels)
|
||||
require.Equal(t, "456000000000", s.Streams[0].Logs[0][0])
|
||||
require.Contains(t, s.Streams[0].Logs[0][1], "line=\"older log\"")
|
||||
require.Contains(t, s.Streams[0].Logs[0][1], "field=\"3.14\"")
|
||||
require.Equal(t, "1230000000000", s.Streams[0].Logs[1][0])
|
||||
require.Contains(t, s.Streams[0].Logs[1][1], "line=\"newer log\"")
|
||||
require.Contains(t, s.Streams[0].Logs[1][1], "field=\"3.14\"")
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
})
|
||||
|
||||
client := &Loki{
|
||||
Domain: u.String(),
|
||||
}
|
||||
|
||||
err = client.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = client.Write(getOutOfOrderMetrics())
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue