diff --git a/config/config.go b/config/config.go index 008316fc5..876f7d4f4 100644 --- a/config/config.go +++ b/config/config.go @@ -1010,6 +1010,25 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table) return running, err } +func (c *Config) probeSerializer(table *ast.Table) bool { + dataFormat := c.getFieldString(table, "data_format") + if dataFormat == "" { + dataFormat = "influx" + } + + creator, ok := serializers.Serializers[dataFormat] + if !ok { + return false + } + + // Try to parse the options to detect if any of them is misspelled + serializer := creator() + //nolint:errcheck // We don't actually use the serializer, so no need to check the error. + c.toml.UnmarshalTable(table, serializer) + + return true +} + func (c *Config) addSerializer(parentname string, table *ast.Table) (*models.RunningSerializer, error) { conf := &models.SerializerConfig{ Parent: parentname, @@ -1140,6 +1159,15 @@ func (c *Config) setupProcessor(name string, creator processors.StreamingCreator t.SetSerializer(serializer) optionTestCount++ } + if t, ok := processor.(telegraf.SerializerFuncPlugin); ok { + if !c.probeSerializer(table) { + return nil, 0, errors.New("serializer not found") + } + t.SetSerializerFunc(func() (telegraf.Serializer, error) { + return c.addSerializer(name, table) + }) + optionTestCount++ + } if err := c.toml.UnmarshalTable(table, processor); err != nil { return nil, 0, fmt.Errorf("unmarshalling failed: %w", err) @@ -1154,8 +1182,8 @@ func (c *Config) addOutput(name string, table *ast.Table) error { return nil } - // For inputs with parsers we need to compute the set of - // options that is not covered by both, the parser and the input. + // For outputs with serializers we need to compute the set of + // options that is not covered by both, the serializer and the output. // We achieve this by keeping a local book of missing entries // that counts the number of misses. In case we have a parser // for the input both need to miss the entry. 
We count the @@ -1196,6 +1224,16 @@ func (c *Config) addOutput(name string, table *ast.Table) error { t.SetSerializer(serializer) } + if t, ok := output.(telegraf.SerializerFuncPlugin); ok { + missThreshold = 1 + if !c.probeSerializer(table) { + return errors.New("serializer not found") + } + t.SetSerializerFunc(func() (telegraf.Serializer, error) { + return c.addSerializer(name, table) + }) + } + outputConfig, err := c.buildOutput(name, table) if err != nil { return err diff --git a/parser.go b/parser.go index 5d67de987..6111886df 100644 --- a/parser.go +++ b/parser.go @@ -22,6 +22,7 @@ type Parser interface { SetDefaultTags(tags map[string]string) } +// ParserFunc is a function to create a new instance of a parser type ParserFunc func() (Parser, error) // ParserPlugin is an interface for plugins that are able to parse diff --git a/plugins/outputs/remotefile/README.md b/plugins/outputs/remotefile/README.md index 9277707bd..28150b0b6 100644 --- a/plugins/outputs/remotefile/README.md +++ b/plugins/outputs/remotefile/README.md @@ -63,6 +63,15 @@ to use them. ## Maximum size of the cache on disk (infinite by default) # cache_max_size = -1 + ## Forget files after not being touched for longer than the given time + ## This is useful to prevent memory leaks when using time-based filenames + ## as it allows internal structures to be cleaned up. + ## Note: When writing to a file after it has been forgotten, the file is + ## treated as a new file which might cause file-headers to be appended + ## again by certain serializers like CSV. + ## By default files will be kept indefinitely. + # forget_files_after = "0s" + ## Data format to output. 
## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/outputs/remotefile/remotefile.go b/plugins/outputs/remotefile/remotefile.go index c1c60a6ce..9443843ee 100644 --- a/plugins/outputs/remotefile/remotefile.go +++ b/plugins/outputs/remotefile/remotefile.go @@ -21,7 +21,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" - "github.com/influxdata/telegraf/plugins/serializers" ) //go:embed sample.conf @@ -35,22 +34,25 @@ type File struct { MaxCacheSize config.Size `toml:"cache_max_size"` UseBatchFormat bool `toml:"use_batch_format"` Trace bool `toml:"trace" deprecated:"1.33.0;1.35.0;use 'log_level = \"trace\"' instead"` + ForgetFiles config.Duration `toml:"forget_files_after"` Log telegraf.Logger `toml:"-"` root *vfs.VFS fscancel context.CancelFunc vfsopts vfscommon.Options - templates []*template.Template - serializer serializers.Serializer + templates []*template.Template + serializerFunc telegraf.SerializerFunc + serializers map[string]telegraf.Serializer + modified map[string]time.Time } func (*File) SampleConfig() string { return sampleConfig } -func (f *File) SetSerializer(serializer serializers.Serializer) { - f.serializer = serializer +func (f *File) SetSerializerFunc(sf telegraf.SerializerFunc) { + f.serializerFunc = sf } func (f *File) Init() error { @@ -101,6 +103,9 @@ func (f *File) Init() error { f.templates = append(f.templates, tmpl) } + f.serializers = make(map[string]telegraf.Serializer) + f.modified = make(map[string]time.Time) + return nil } @@ -187,8 +192,16 @@ func (f *File) Write(metrics []telegraf.Metric) error { // Serialize the metric groups groupBuffer := make(map[string][]byte, len(groups)) for fn, fnMetrics := range groups { + if _, found := f.serializers[fn]; !found { + var err error + if f.serializers[fn], err = f.serializerFunc(); err != nil { + return fmt.Errorf("creating serializer 
failed: %w", err) + } + } + serializer := f.serializers[fn] + if f.UseBatchFormat { - serialized, err := f.serializer.SerializeBatch(fnMetrics) + serialized, err := serializer.SerializeBatch(fnMetrics) if err != nil { f.Log.Errorf("Could not serialize metrics: %v", err) continue @@ -196,7 +209,7 @@ func (f *File) Write(metrics []telegraf.Metric) error { groupBuffer[fn] = serialized } else { for _, m := range fnMetrics { - serialized, err := f.serializer.Serialize(m) + serialized, err := serializer.Serialize(m) if err != nil { f.Log.Debugf("Could not serialize metric: %v", err) continue @@ -207,6 +220,7 @@ func (f *File) Write(metrics []telegraf.Metric) error { } // Write the files + t := time.Now() for fn, serialized := range groupBuffer { // Make sure the directory exists dir := filepath.Dir(filepath.ToSlash(fn)) @@ -232,6 +246,18 @@ func (f *File) Write(metrics []telegraf.Metric) error { return fmt.Errorf("writing metrics to file %q failed: %w", fn, err) } file.Close() + + f.modified[fn] = t + } + + // Cleanup internal structures for old files + if f.ForgetFiles > 0 { + for fn, tmod := range f.modified { + if t.Sub(tmod) > time.Duration(f.ForgetFiles) { + delete(f.serializers, fn) + delete(f.modified, fn) + } + } } return nil diff --git a/plugins/outputs/remotefile/remotefile_test.go b/plugins/outputs/remotefile/remotefile_test.go index 1d717e6e9..70ba9a919 100644 --- a/plugins/outputs/remotefile/remotefile_test.go +++ b/plugins/outputs/remotefile/remotefile_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/serializers/csv" "github.com/influxdata/telegraf/plugins/serializers/influx" "github.com/influxdata/telegraf/testutil" ) @@ -40,9 +41,11 @@ func TestStaticFileCreation(t *testing.T) { Log: &testutil.Logger{}, } - serializer := &influx.Serializer{} - require.NoError(t, serializer.Init()) - plugin.SetSerializer(serializer) + 
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) { + serializer := &influx.Serializer{} + err := serializer.Init() + return serializer, err + }) require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -93,9 +96,11 @@ func TestStaticFileAppend(t *testing.T) { Log: &testutil.Logger{}, } - serializer := &influx.Serializer{} - require.NoError(t, serializer.Init()) - plugin.SetSerializer(serializer) + plugin.SetSerializerFunc(func() (telegraf.Serializer, error) { + serializer := &influx.Serializer{} + err := serializer.Init() + return serializer, err + }) require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -180,9 +185,11 @@ func TestDynamicFiles(t *testing.T) { Log: &testutil.Logger{}, } - serializer := &influx.Serializer{} - require.NoError(t, serializer.Init()) - plugin.SetSerializer(serializer) + plugin.SetSerializerFunc(func() (telegraf.Serializer, error) { + serializer := &influx.Serializer{} + err := serializer.Init() + return serializer, err + }) require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -246,9 +253,11 @@ func TestCustomTemplateFunctions(t *testing.T) { Log: &testutil.Logger{}, } - serializer := &influx.Serializer{} - require.NoError(t, serializer.Init()) - plugin.SetSerializer(serializer) + plugin.SetSerializerFunc(func() (telegraf.Serializer, error) { + serializer := &influx.Serializer{} + err := serializer.Init() + return serializer, err + }) require.NoError(t, plugin.Init()) require.NoError(t, plugin.Connect()) @@ -266,3 +275,121 @@ func TestCustomTemplateFunctions(t *testing.T) { require.NoError(t, err) require.Equal(t, expected, string(actual)) } + +func TestCSVSerialization(t *testing.T) { + input := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"source": "a"}, + map[string]interface{}{"value": 42}, + time.Unix(1587686400, 0), + ), + metric.New( + "test", + map[string]string{"source": "b"}, + map[string]interface{}{"value": 23}, + time.Unix(1587686400, 0), 
+ ), + } + expected := map[string]string{ + "test-a.csv": "timestamp,measurement,source,value\n1587686400,test,a,42\n", + "test-b.csv": "timestamp,measurement,source,value\n1587686400,test,b,23\n", + } + + tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + // Setup the plugin including the serializer + plugin := &File{ + Remote: config.NewSecret([]byte("local:" + tmpdir)), + Files: []string{`test-{{.Tag "source"}}.csv`}, + WriteBackInterval: config.Duration(100 * time.Millisecond), + Log: &testutil.Logger{}, + } + + plugin.SetSerializerFunc(func() (telegraf.Serializer, error) { + serializer := &csv.Serializer{Header: true} + err := serializer.Init() + return serializer, err + }) + + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Write the input metrics and close the plugin. This is required to + // actually flush the data to disk + require.NoError(t, plugin.Write(input)) + plugin.Close() + + // Check the result + for expectedFilename, expectedContent := range expected { + require.FileExists(t, filepath.Join(tmpdir, expectedFilename)) + buf, err := os.ReadFile(filepath.Join(tmpdir, expectedFilename)) + require.NoError(t, err) + actual := strings.ReplaceAll(string(buf), "\r\n", "\n") + require.Equal(t, expectedContent, actual) + } + + require.Len(t, plugin.modified, 2) + require.Contains(t, plugin.modified, "test-a.csv") + require.Contains(t, plugin.modified, "test-b.csv") + require.Len(t, plugin.serializers, 2) + require.Contains(t, plugin.serializers, "test-a.csv") + require.Contains(t, plugin.serializers, "test-b.csv") +} + +func TestForgettingFiles(t *testing.T) { + input := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"source": "a"}, + map[string]interface{}{"value": 42}, + time.Unix(1587686400, 0), + ), + metric.New( + "test", + map[string]string{"source": "b"}, + map[string]interface{}{"value": 23}, + time.Unix(1587686400, 
0), + ), + } + + tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + // Setup the plugin including the serializer + plugin := &File{ + Remote: config.NewSecret([]byte("local:" + tmpdir)), + Files: []string{`test-{{.Tag "source"}}.csv`}, + WriteBackInterval: config.Duration(100 * time.Millisecond), + ForgetFiles: config.Duration(10 * time.Millisecond), + Log: &testutil.Logger{}, + } + + plugin.SetSerializerFunc(func() (telegraf.Serializer, error) { + serializer := &csv.Serializer{Header: true} + err := serializer.Init() + return serializer, err + }) + + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Write the input metrics and close the plugin. This is required to + // actually flush the data to disk + require.NoError(t, plugin.Write(input[:1])) + time.Sleep(100 * time.Millisecond) + require.NoError(t, plugin.Write(input[1:])) + + plugin.Close() + + // Check the result + require.Len(t, plugin.modified, 1) + require.Contains(t, plugin.modified, "test-b.csv") + require.Len(t, plugin.serializers, 1) + require.Contains(t, plugin.serializers, "test-b.csv") +} diff --git a/plugins/outputs/remotefile/sample.conf b/plugins/outputs/remotefile/sample.conf index 273cb9879..4fa149581 100644 --- a/plugins/outputs/remotefile/sample.conf +++ b/plugins/outputs/remotefile/sample.conf @@ -33,6 +33,15 @@ ## Maximum size of the cache on disk (infinite by default) # cache_max_size = -1 + ## Forget files after not being touched for longer than the given time + ## This is useful to prevent memory leaks when using time-based filenames + ## as it allows internal structures to be cleaned up. + ## Note: When writing to a file after it has been forgotten, the file is + ## treated as a new file which might cause file-headers to be appended + ## again by certain serializers like CSV. + ## By default files will be kept indefinitely. 
+ # forget_files_after = "0s" + ## Data format to output. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/serializer.go b/serializer.go index dcb151e42..b7c9d51f7 100644 --- a/serializer.go +++ b/serializer.go @@ -1,12 +1,5 @@ package telegraf -// SerializerPlugin is an interface for plugins that are able to -// serialize telegraf metrics into arbitrary data formats. -type SerializerPlugin interface { - // SetSerializer sets the serializer function for the interface. - SetSerializer(serializer Serializer) -} - // Serializer is an interface defining functions that a serializer plugin must // satisfy. // @@ -26,3 +19,20 @@ type Serializer interface { // line oriented framing. SerializeBatch(metrics []Metric) ([]byte, error) } + +// SerializerFunc is a function to create a new instance of a serializer +type SerializerFunc func() (Serializer, error) + +// SerializerPlugin is an interface for plugins that are able to +// serialize telegraf metrics into arbitrary data formats. +type SerializerPlugin interface { + // SetSerializer sets the serializer function for the interface. + SetSerializer(serializer Serializer) +} + +// SerializerFuncPlugin is an interface for plugins that are able to serialize +// arbitrary data formats and require multiple instances of a serializer. +type SerializerFuncPlugin interface { + // SetSerializerFunc sets the function used to create new serializer instances. + SetSerializerFunc(fn SerializerFunc) +}