From d06b387528f36909515b0d27cf333f6ef60f430d Mon Sep 17 00:00:00 2001 From: etycomputer <57578566+etycomputer@users.noreply.github.com> Date: Fri, 8 Oct 2021 06:38:20 +1000 Subject: [PATCH] feat: Adds the ability to create and name a tag containing the filename using the directory monitor input plugin (#9860) Co-authored-by: Ehsan Yazdi --- plugins/inputs/directory_monitor/README.md | 6 ++ .../directory_monitor/directory_monitor.go | 17 +++++- .../directory_monitor_test.go | 59 +++++++++++++++++++ plugins/inputs/file/README.md | 5 +- plugins/inputs/file/file.go | 6 +- 5 files changed, 89 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/directory_monitor/README.md b/plugins/inputs/directory_monitor/README.md index 66d9eb51f..4e260f442 100644 --- a/plugins/inputs/directory_monitor/README.md +++ b/plugins/inputs/directory_monitor/README.md @@ -39,6 +39,12 @@ This plugin is intended to read files that are moved or copied to the monitored ## Lowering this value will result in *slightly* less memory use, with a potential sacrifice in speed efficiency, if absolutely necessary. # file_queue_size = 100000 # + ## Name a tag containing the name of the file the data was parsed from. Leave empty + ## to disable. Be cautious when file name variation is high, as this can increase the cardinality + ## significantly. Read more about cardinality here: + ## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality + # file_tag = "" + # ## The dataformat to be read from the files. 
## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index a58c03942..ee1163e7a 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -55,6 +55,12 @@ const sampleConfig = ` ## Lowering this value will result in *slightly* less memory use, with a potential sacrifice in speed efficiency, if absolutely necessary. # file_queue_size = 100000 # + ## Name a tag containing the name of the file the data was parsed from. Leave empty + ## to disable. Cautious when file name variation is high, this can increase the cardinality + ## significantly. Read more about cardinality here: + ## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality + # file_tag = "" + # ## The dataformat to be read from the files. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -75,6 +81,7 @@ type DirectoryMonitor struct { Directory string `toml:"directory"` FinishedDirectory string `toml:"finished_directory"` ErrorDirectory string `toml:"error_directory"` + FileTag string `toml:"file_tag"` FilesToMonitor []string `toml:"files_to_monitor"` FilesToIgnore []string `toml:"files_to_ignore"` @@ -250,10 +257,10 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error { reader = file } - return monitor.parseFile(parser, reader) + return monitor.parseFile(parser, reader, file.Name()) } -func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader) error { +func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error { // Read the file line-by-line and parse with the configured parse method. 
firstLine := true scanner := bufio.NewScanner(reader) @@ -264,6 +271,12 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read } firstLine = false + if monitor.FileTag != "" { + for _, m := range metrics { + m.AddTag(monitor.FileTag, filepath.Base(fileName)) + } + } + if err := monitor.sendMetrics(metrics); err != nil { return err } diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index 7cda5f2d7..3e954adb4 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -134,3 +134,62 @@ func TestMultipleJSONFileImports(t *testing.T) { // Verify that we read each JSON line once to a single metric. require.Equal(t, len(acc.Metrics), 5) } + +func TestFileTag(t *testing.T) { + acc := testutil.Accumulator{} + testJSONFile := "test.json" + + // Establish process directory and finished directory. + finishedDirectory, err := os.MkdirTemp("", "finished") + require.NoError(t, err) + processDirectory, err := os.MkdirTemp("", "test") + require.NoError(t, err) + defer os.RemoveAll(processDirectory) + defer os.RemoveAll(finishedDirectory) + + // Init plugin. + r := DirectoryMonitor{ + Directory: processDirectory, + FinishedDirectory: finishedDirectory, + FileTag: "filename", + MaxBufferedMetrics: 1000, + FileQueueSize: 1000, + } + err = r.Init() + require.NoError(t, err) + + parserConfig := parsers.Config{ + DataFormat: "json", + JSONNameKey: "Name", + } + + r.SetParserFunc(func() (parsers.Parser, error) { + return parsers.NewParser(&parserConfig) + }) + + // Let's drop a 1-line LINE-DELIMITED json. + // Write the JSON file to process into the 'process' directory. 
+ f, err := os.Create(filepath.Join(processDirectory, testJSONFile)) + require.NoError(t, err) + _, err = f.WriteString("{\"Name\": \"event1\",\"Speed\": 100.1,\"Length\": 20.1}") + require.NoError(t, err) + err = f.Close() + require.NoError(t, err) + + err = r.Start(&acc) + r.Log = testutil.Logger{} + require.NoError(t, err) + err = r.Gather(&acc) + require.NoError(t, err) + acc.Wait(1) + r.Stop() + + // Verify that we read each JSON line once to a single metric. + require.Equal(t, len(acc.Metrics), 1) + for _, m := range acc.Metrics { + for key, value := range m.Tags { + require.Equal(t, r.FileTag, key) + require.Equal(t, filepath.Base(testJSONFile), value) + } + } +} diff --git a/plugins/inputs/file/README.md b/plugins/inputs/file/README.md index ef0fb90b0..8ec406da7 100644 --- a/plugins/inputs/file/README.md +++ b/plugins/inputs/file/README.md @@ -20,8 +20,11 @@ plugin instead. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" + ## Name a tag containing the name of the file the data was parsed from. Leave empty - ## to disable. + ## to disable. Be cautious when file name variation is high, as this can increase the cardinality + ## significantly. Read more about cardinality here: + ## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality # file_tag = "" ``` diff --git a/plugins/inputs/file/file.go b/plugins/inputs/file/file.go index 22af282db..fbfc536a6 100644 --- a/plugins/inputs/file/file.go +++ b/plugins/inputs/file/file.go @@ -29,9 +29,13 @@ const sampleConfig = ` ## as well as ** to match recursive files and directories. files = ["/tmp/metrics.out"] + ## Name a tag containing the name of the file the data was parsed from. Leave empty - ## to disable. + ## to disable. Cautious when file name variation is high, this can increase the cardinality + ## significantly. 
Read more about cardinality here: + ## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality # file_tag = "" + # ## Character encoding to use when interpreting the file contents. Invalid ## characters are replaced using the unicode replacement character. When set