fix(outputs.remotefile): Handle tracking metrics correctly (#16289)
This commit is contained in:
parent
73b41f5e6d
commit
e01f5f77ce
|
|
@ -177,7 +177,12 @@ func (f *File) Write(metrics []telegraf.Metric) error {
|
|||
|
||||
// Group the metrics per output file
|
||||
groups := make(map[string][]telegraf.Metric)
|
||||
for _, m := range metrics {
|
||||
for _, raw := range metrics {
|
||||
m := raw
|
||||
if wm, ok := raw.(telegraf.UnwrappableMetric); ok {
|
||||
m = wm.Unwrap()
|
||||
}
|
||||
|
||||
for _, tmpl := range f.templates {
|
||||
buf.Reset()
|
||||
if err := tmpl.Execute(&buf, m); err != nil {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -393,3 +394,130 @@ func TestForgettingFiles(t *testing.T) {
|
|||
require.Len(t, plugin.serializers, 1)
|
||||
require.Contains(t, plugin.serializers, "test-b.csv")
|
||||
}
|
||||
|
||||
func TestTrackingMetrics(t *testing.T) {
|
||||
// see issue #16045
|
||||
inputRaw := []telegraf.Metric{
|
||||
metric.New(
|
||||
"test",
|
||||
map[string]string{"source": "localhost"},
|
||||
map[string]interface{}{"value": 23},
|
||||
time.Unix(1719410465, 0),
|
||||
),
|
||||
metric.New(
|
||||
"test",
|
||||
map[string]string{"source": "remotehost"},
|
||||
map[string]interface{}{"value": 21},
|
||||
time.Unix(1719410465, 0),
|
||||
),
|
||||
metric.New(
|
||||
"test",
|
||||
map[string]string{"source": "localhost"},
|
||||
map[string]interface{}{"value": 42},
|
||||
time.Unix(1719410485, 0),
|
||||
),
|
||||
metric.New(
|
||||
"test",
|
||||
map[string]string{"source": "remotehost"},
|
||||
map[string]interface{}{"value": 66},
|
||||
time.Unix(1719410485, 0),
|
||||
),
|
||||
metric.New(
|
||||
"test",
|
||||
map[string]string{"source": "remotehost"},
|
||||
map[string]interface{}{"value": 55},
|
||||
time.Unix(1716310124, 0),
|
||||
),
|
||||
metric.New(
|
||||
"test",
|
||||
map[string]string{"source": "remotehost"},
|
||||
map[string]interface{}{"value": 1},
|
||||
time.Unix(1716310174, 0),
|
||||
),
|
||||
}
|
||||
|
||||
// Create tracking metrics as inputs for the test
|
||||
var mu sync.Mutex
|
||||
delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
|
||||
notify := func(di telegraf.DeliveryInfo) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
delivered = append(delivered, di)
|
||||
}
|
||||
input := make([]telegraf.Metric, 0, len(inputRaw))
|
||||
for _, m := range inputRaw {
|
||||
tm, _ := metric.WithTracking(m, notify)
|
||||
input = append(input, tm)
|
||||
}
|
||||
|
||||
// Create the expectations
|
||||
expected := map[string][]string{
|
||||
"localhost-2024-06-26": {
|
||||
"test,source=localhost value=23i 1719410465000000000\n",
|
||||
"test,source=localhost value=42i 1719410485000000000\n",
|
||||
},
|
||||
"remotehost-2024-06-26": {
|
||||
"test,source=remotehost value=21i 1719410465000000000\n",
|
||||
"test,source=remotehost value=66i 1719410485000000000\n",
|
||||
},
|
||||
"remotehost-2024-05-21": {
|
||||
"test,source=remotehost value=55i 1716310124000000000\n",
|
||||
"test,source=remotehost value=1i 1716310174000000000\n",
|
||||
},
|
||||
}
|
||||
|
||||
// Prepare the output filesystem
|
||||
tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
// Setup the plugin including the serializer
|
||||
plugin := &File{
|
||||
Remote: config.NewSecret([]byte("local:" + tmpdir)),
|
||||
Files: []string{`{{.Tag "source"}}-{{.Time.Format "2006-01-02"}}`},
|
||||
WriteBackInterval: config.Duration(100 * time.Millisecond),
|
||||
Log: &testutil.Logger{},
|
||||
}
|
||||
|
||||
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
|
||||
serializer := &influx.Serializer{}
|
||||
err := serializer.Init()
|
||||
return serializer, err
|
||||
})
|
||||
require.NoError(t, plugin.Init())
|
||||
require.NoError(t, plugin.Connect())
|
||||
defer plugin.Close()
|
||||
|
||||
// Write the metrics and wait for the data to settle to disk
|
||||
require.NoError(t, plugin.Write(input))
|
||||
require.Eventually(t, func() bool {
|
||||
ok := true
|
||||
for fn := range expected {
|
||||
_, err := os.Stat(filepath.Join(tmpdir, fn))
|
||||
ok = ok && err == nil
|
||||
}
|
||||
return ok
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
|
||||
// Check the result
|
||||
for fn, lines := range expected {
|
||||
tmpfn := filepath.Join(tmpdir, fn)
|
||||
require.FileExists(t, tmpfn)
|
||||
|
||||
actual, err := os.ReadFile(tmpfn)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, strings.Join(lines, ""), string(actual))
|
||||
}
|
||||
|
||||
// Simulate output acknowledging delivery
|
||||
for _, m := range input {
|
||||
m.Accept()
|
||||
}
|
||||
|
||||
// Check delivery
|
||||
require.Eventuallyf(t, func() bool {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return len(input) == len(delivered)
|
||||
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue