fix(outputs.remotefile): Create a new serializer instance per output file (#15968)

Sven Rebhan 2024-10-14 21:41:52 +02:00 committed by GitHub
parent c9a7582b12
commit 23fc01ce9c
7 changed files with 248 additions and 28 deletions
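What the fix addresses: serializers such as CSV keep per-instance state (for example, header tracking), so sharing a single serializer across several templated output files lets that state leak between files. A minimal sketch of the per-file idea follows; it is illustrative only, not the plugin's actual code, and `serializeForFile` is a hypothetical helper.

```go
package example

import (
	"fmt"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/serializers/csv"
)

// serializeForFile builds a fresh CSV serializer for a single output file and
// serializes that file's metrics with it. Because the instance is not shared,
// its header state cannot leak between files.
func serializeForFile(metrics []telegraf.Metric) ([]byte, error) {
	serializer := &csv.Serializer{Header: true} // fresh, per-file instance
	if err := serializer.Init(); err != nil {
		return nil, fmt.Errorf("initializing serializer failed: %w", err)
	}
	return serializer.SerializeBatch(metrics)
}
```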

View File

@@ -1010,6 +1010,25 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
return running, err
}
func (c *Config) probeSerializer(table *ast.Table) bool {
dataFormat := c.getFieldString(table, "data_format")
if dataFormat == "" {
dataFormat = "influx"
}
creator, ok := serializers.Serializers[dataFormat]
if !ok {
return false
}
// Try to parse the options to detect if any of them is misspelled
serializer := creator()
//nolint:errcheck // We don't actually use the serializer, so no need to check the error.
c.toml.UnmarshalTable(table, serializer)
return true
}
func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) {
conf := &models.SerializerConfig{
Parent: parentname,
@@ -1140,6 +1159,15 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator
t.SetSerializer(serializer)
optionTestCount++
}
if t, ok := processor.(telegraf.SerializerFuncPlugin); ok {
if !c.probeSerializer(table) {
return nil, 0, errors.New("serializer not found")
}
t.SetSerializerFunc(func() (telegraf.Serializer, error) {
return c.addSerializer(name, table)
})
optionTestCount++
}
if err := c.toml.UnmarshalTable(table, processor); err != nil {
return nil, 0, fmt.Errorf("unmarshalling failed: %w", err)
@@ -1154,8 +1182,8 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
return nil
}
// For inputs with parsers we need to compute the set of
// options that is not covered by both, the parser and the input.
// For outputs with serializers we need to compute the set of
// options that is not covered by both, the serializer and the output.
// We achieve this by keeping a local book of missing entries
// that counts the number of misses. In case we have a parser
// for the input both need to miss the entry. We count the
@@ -1196,6 +1224,16 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
t.SetSerializer(serializer)
}
if t, ok := output.(telegraf.SerializerFuncPlugin); ok {
missThreshold = 1
if !c.probeSerializer(table) {
return errors.New("serializer not found")
}
t.SetSerializerFunc(func() (telegraf.Serializer, error) {
return c.addSerializer(name, table)
})
}
outputConfig, err := c.buildOutput(name, table)
if err != nil {
return err
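The config changes above split validation from creation: probeSerializer checks the requested data_format (and test-unmarshals its options) at load time, while the serializer itself is only built when the plugin calls the injected factory. Below is a simplified sketch of that validate-early/create-late split, assuming a placeholder `registry` in place of serializers.Serializers and a hypothetical `serializerFactory` helper.

```go
package example

import (
	"errors"

	"github.com/influxdata/telegraf"
)

// registry stands in for Telegraf's serializers.Serializers map; the creator
// signature is simplified for this sketch.
var registry = map[string]func() telegraf.Serializer{}

// serializerFactory validates the data format up front and returns a factory
// that builds a fresh serializer on every call, mirroring the deferred
// creation wired up via SetSerializerFunc in config.go.
func serializerFactory(dataFormat string) (telegraf.SerializerFunc, error) {
	if dataFormat == "" {
		dataFormat = "influx" // same default as the config layer
	}
	creator, ok := registry[dataFormat]
	if !ok {
		return nil, errors.New("serializer not found")
	}
	return func() (telegraf.Serializer, error) {
		// Each invocation yields an independent instance, so per-file
		// state is never shared between output files.
		return creator(), nil
	}, nil
}
```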

View File

@@ -22,6 +22,7 @@ type Parser interface {
SetDefaultTags(tags map[string]string)
}
// ParserFunc is a function to create a new instance of a parser
type ParserFunc func() (Parser, error)
// ParserPlugin is an interface for plugins that are able to parse

View File

@@ -63,6 +63,15 @@ to use them.
## Maximum size of the cache on disk (infinite by default)
# cache_max_size = -1
## Forget files after not being touched for longer than the given time
## This is useful to prevent memory leaks when using time-based filenames
## as it allows internal structures to be cleaned up.
## Note: When writing to a file after it has been forgotten, the file is
## treated as a new file, which might cause file headers to be appended
## again by certain serializers like CSV.
## By default files will be kept indefinitely.
# forget_files_after = "0s"
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:

View File

@@ -21,7 +21,6 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)
//go:embed sample.conf
@@ -35,6 +34,7 @@ type File struct {
MaxCacheSize config.Size `toml:"cache_max_size"`
UseBatchFormat bool `toml:"use_batch_format"`
Trace bool `toml:"trace" deprecated:"1.33.0;1.35.0;use 'log_level = \"trace\"' instead"`
ForgetFiles config.Duration `toml:"forget_files_after"`
Log telegraf.Logger `toml:"-"`
root *vfs.VFS
@@ -42,15 +42,17 @@ type File struct {
vfsopts vfscommon.Options
templates []*template.Template
serializer serializers.Serializer
serializerFunc telegraf.SerializerFunc
serializers map[string]telegraf.Serializer
modified map[string]time.Time
}
func (*File) SampleConfig() string {
return sampleConfig
}
func (f *File) SetSerializer(serializer serializers.Serializer) {
f.serializer = serializer
func (f *File) SetSerializerFunc(sf telegraf.SerializerFunc) {
f.serializerFunc = sf
}
func (f *File) Init() error {
@@ -101,6 +103,9 @@ func (f *File) Init() error {
f.templates = append(f.templates, tmpl)
}
f.serializers = make(map[string]telegraf.Serializer)
f.modified = make(map[string]time.Time)
return nil
}
@@ -187,8 +192,16 @@ func (f *File) Write(metrics []telegraf.Metric) error {
// Serialize the metric groups
groupBuffer := make(map[string][]byte, len(groups))
for fn, fnMetrics := range groups {
if _, found := f.serializers[fn]; !found {
var err error
if f.serializers[fn], err = f.serializerFunc(); err != nil {
return fmt.Errorf("creating serializer failed: %w", err)
}
}
serializer := f.serializers[fn]
if f.UseBatchFormat {
serialized, err := f.serializer.SerializeBatch(fnMetrics)
serialized, err := serializer.SerializeBatch(fnMetrics)
if err != nil {
f.Log.Errorf("Could not serialize metrics: %v", err)
continue
@@ -196,7 +209,7 @@ func (f *File) Write(metrics []telegraf.Metric) error {
groupBuffer[fn] = serialized
} else {
for _, m := range fnMetrics {
serialized, err := f.serializer.Serialize(m)
serialized, err := serializer.Serialize(m)
if err != nil {
f.Log.Debugf("Could not serialize metric: %v", err)
continue
@@ -207,6 +220,7 @@ func (f *File) Write(metrics []telegraf.Metric) error {
}
// Write the files
t := time.Now()
for fn, serialized := range groupBuffer {
// Make sure the directory exists
dir := filepath.Dir(filepath.ToSlash(fn))
@@ -232,6 +246,18 @@ func (f *File) Write(metrics []telegraf.Metric) error {
return fmt.Errorf("writing metrics to file %q failed: %w", fn, err)
}
file.Close()
f.modified[fn] = t
}
// Cleanup internal structures for old files
if f.ForgetFiles > 0 {
for fn, tmod := range f.modified {
if t.Sub(tmod) > time.Duration(f.ForgetFiles) {
delete(f.serializers, fn)
delete(f.modified, fn)
}
}
}
return nil

View File

@@ -13,6 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers/csv"
"github.com/influxdata/telegraf/plugins/serializers/influx"
"github.com/influxdata/telegraf/testutil"
)
@@ -40,9 +41,11 @@ func TestStaticFileCreation(t *testing.T) {
Log: &testutil.Logger{},
}
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
err := serializer.Init()
return serializer, err
})
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
@@ -93,9 +96,11 @@ func TestStaticFileAppend(t *testing.T) {
Log: &testutil.Logger{},
}
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
err := serializer.Init()
return serializer, err
})
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
@@ -180,9 +185,11 @@ func TestDynamicFiles(t *testing.T) {
Log: &testutil.Logger{},
}
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
err := serializer.Init()
return serializer, err
})
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
@@ -246,9 +253,11 @@ func TestCustomTemplateFunctions(t *testing.T) {
Log: &testutil.Logger{},
}
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
require.NoError(t, serializer.Init())
plugin.SetSerializer(serializer)
err := serializer.Init()
return serializer, err
})
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
@@ -266,3 +275,121 @@ func TestCustomTemplateFunctions(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expected, string(actual))
}
func TestCSVSerialization(t *testing.T) {
input := []telegraf.Metric{
metric.New(
"test",
map[string]string{"source": "a"},
map[string]interface{}{"value": 42},
time.Unix(1587686400, 0),
),
metric.New(
"test",
map[string]string{"source": "b"},
map[string]interface{}{"value": 23},
time.Unix(1587686400, 0),
),
}
expected := map[string]string{
"test-a.csv": "timestamp,measurement,source,value\n1587686400,test,a,42\n",
"test-b.csv": "timestamp,measurement,source,value\n1587686400,test,b,23\n",
}
tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
// Setup the plugin including the serializer
plugin := &File{
Remote: config.NewSecret([]byte("local:" + tmpdir)),
Files: []string{`test-{{.Tag "source"}}.csv`},
WriteBackInterval: config.Duration(100 * time.Millisecond),
Log: &testutil.Logger{},
}
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &csv.Serializer{Header: true}
err := serializer.Init()
return serializer, err
})
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Write the input metrics and close the plugin. This is required to
// actually flush the data to disk
require.NoError(t, plugin.Write(input))
plugin.Close()
// Check the result
for expectedFilename, expectedContent := range expected {
require.FileExists(t, filepath.Join(tmpdir, expectedFilename))
buf, err := os.ReadFile(filepath.Join(tmpdir, expectedFilename))
require.NoError(t, err)
actual := strings.ReplaceAll(string(buf), "\r\n", "\n")
require.Equal(t, expectedContent, actual)
}
require.Len(t, plugin.modified, 2)
require.Contains(t, plugin.modified, "test-a.csv")
require.Contains(t, plugin.modified, "test-b.csv")
require.Len(t, plugin.serializers, 2)
require.Contains(t, plugin.serializers, "test-a.csv")
require.Contains(t, plugin.serializers, "test-b.csv")
}
func TestForgettingFiles(t *testing.T) {
input := []telegraf.Metric{
metric.New(
"test",
map[string]string{"source": "a"},
map[string]interface{}{"value": 42},
time.Unix(1587686400, 0),
),
metric.New(
"test",
map[string]string{"source": "b"},
map[string]interface{}{"value": 23},
time.Unix(1587686400, 0),
),
}
tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)
// Setup the plugin including the serializer
plugin := &File{
Remote: config.NewSecret([]byte("local:" + tmpdir)),
Files: []string{`test-{{.Tag "source"}}.csv`},
WriteBackInterval: config.Duration(100 * time.Millisecond),
ForgetFiles: config.Duration(10 * time.Millisecond),
Log: &testutil.Logger{},
}
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &csv.Serializer{Header: true}
err := serializer.Init()
return serializer, err
})
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()
// Write the input metrics and close the plugin. This is required to
// actually flush the data to disk
require.NoError(t, plugin.Write(input[:1]))
time.Sleep(100 * time.Millisecond)
require.NoError(t, plugin.Write(input[1:]))
plugin.Close()
// Check the result
require.Len(t, plugin.modified, 1)
require.Contains(t, plugin.modified, "test-b.csv")
require.Len(t, plugin.serializers, 1)
require.Contains(t, plugin.serializers, "test-b.csv")
}

View File

@@ -33,6 +33,15 @@
## Maximum size of the cache on disk (infinite by default)
# cache_max_size = -1
## Forget files after not being touched for longer than the given time
## This is useful to prevent memory leaks when using time-based filenames
## as it allows internal structures to be cleaned up.
## Note: When writing to a file after it has been forgotten, the file is
## treated as a new file, which might cause file headers to be appended
## again by certain serializers like CSV.
## By default files will be kept indefinitely.
# forget_files_after = "0s"
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:

View File

@@ -1,12 +1,5 @@
package telegraf
// SerializerPlugin is an interface for plugins that are able to
// serialize telegraf metrics into arbitrary data formats.
type SerializerPlugin interface {
// SetSerializer sets the serializer function for the interface.
SetSerializer(serializer Serializer)
}
// Serializer is an interface defining functions that a serializer plugin must
// satisfy.
//
@@ -26,3 +19,20 @@ type Serializer interface {
// line oriented framing.
SerializeBatch(metrics []Metric) ([]byte, error)
}
// SerializerFunc is a function to create a new instance of a serializer
type SerializerFunc func() (Serializer, error)
// SerializerPlugin is an interface for plugins that are able to
// serialize telegraf metrics into arbitrary data formats.
type SerializerPlugin interface {
// SetSerializer sets the serializer function for the interface.
SetSerializer(serializer Serializer)
}
// SerializerFuncPlugin is an interface for plugins that are able to serialize
// arbitrary data formats and require multiple instances of a serializer.
type SerializerFuncPlugin interface {
// SetSerializerFunc sets the function used to create new serializer instances.
SetSerializerFunc(fn SerializerFunc)
}
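For orientation, here is a hedged sketch of an output opting into the new interface; `exampleOutput`, its `destination` key, and the omission of the rest of the output plugin interface are all simplifications for illustration. The config layer injects the factory once through SetSerializerFunc, and the plugin calls it lazily, once per destination.

```go
package example

import "github.com/influxdata/telegraf"

// exampleOutput is a hypothetical output implementing telegraf.SerializerFuncPlugin.
type exampleOutput struct {
	newSerializer telegraf.SerializerFunc
	serializers   map[string]telegraf.Serializer
}

// SetSerializerFunc stores the factory provided by Telegraf's config layer.
func (o *exampleOutput) SetSerializerFunc(fn telegraf.SerializerFunc) {
	o.newSerializer = fn
}

// Write creates a dedicated serializer the first time a destination is seen
// and reuses it afterwards, keeping serializer state isolated per destination.
func (o *exampleOutput) Write(metrics []telegraf.Metric) error {
	if o.serializers == nil {
		o.serializers = make(map[string]telegraf.Serializer)
	}
	const destination = "example-destination" // placeholder for a templated file name
	if _, found := o.serializers[destination]; !found {
		serializer, err := o.newSerializer()
		if err != nil {
			return err
		}
		o.serializers[destination] = serializer
	}
	// Writing the serialized bytes to the destination is omitted in this sketch.
	_, err := o.serializers[destination].SerializeBatch(metrics)
	return err
}
```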