feat: Adds the ability to create and name a tag containing the filename using the directory monitor input plugin (#9860)

Co-authored-by: Ehsan Yazdi <Ehsan.Yazdi@elexonmining.com>
This commit is contained in:
etycomputer 2021-10-08 06:38:20 +10:00 committed by GitHub
parent 128ed8849b
commit d06b387528
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 89 additions and 4 deletions

View File

@ -39,6 +39,12 @@ This plugin is intended to read files that are moved or copied to the monitored
## Lowering this value will result in *slightly* less memory use, with a potential sacrifice in speed efficiency, if absolutely necessary.
# file_queue_size = 100000
#
## Name a tag containing the name of the file the data was parsed from. Leave empty
## to disable. Cautious when file name variation is high, this can increase the cardinality
## significantly. Read more about cardinality here:
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:

View File

@ -55,6 +55,12 @@ const sampleConfig = `
## Lowering this value will result in *slightly* less memory use, with a potential sacrifice in speed efficiency, if absolutely necessary.
# file_queue_size = 100000
#
## Name a tag containing the name of the file the data was parsed from. Leave empty
## to disable. Cautious when file name variation is high, this can increase the cardinality
## significantly. Read more about cardinality here:
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
@ -75,6 +81,7 @@ type DirectoryMonitor struct {
Directory string `toml:"directory"`
FinishedDirectory string `toml:"finished_directory"`
ErrorDirectory string `toml:"error_directory"`
FileTag string `toml:"file_tag"`
FilesToMonitor []string `toml:"files_to_monitor"`
FilesToIgnore []string `toml:"files_to_ignore"`
@ -250,10 +257,10 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
reader = file
}
return monitor.parseFile(parser, reader)
return monitor.parseFile(parser, reader, file.Name())
}
func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader) error {
func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error {
// Read the file line-by-line and parse with the configured parse method.
firstLine := true
scanner := bufio.NewScanner(reader)
@ -264,6 +271,12 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read
}
firstLine = false
if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))
}
}
if err := monitor.sendMetrics(metrics); err != nil {
return err
}

View File

@ -134,3 +134,62 @@ func TestMultipleJSONFileImports(t *testing.T) {
// Verify that we read each JSON line once to a single metric.
require.Equal(t, len(acc.Metrics), 5)
}
func TestFileTag(t *testing.T) {
acc := testutil.Accumulator{}
testJSONFile := "test.json"
// Establish process directory and finished directory.
finishedDirectory, err := os.MkdirTemp("", "finished")
require.NoError(t, err)
processDirectory, err := os.MkdirTemp("", "test")
require.NoError(t, err)
defer os.RemoveAll(processDirectory)
defer os.RemoveAll(finishedDirectory)
// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
FileTag: "filename",
MaxBufferedMetrics: 1000,
FileQueueSize: 1000,
}
err = r.Init()
require.NoError(t, err)
parserConfig := parsers.Config{
DataFormat: "json",
JSONNameKey: "Name",
}
r.SetParserFunc(func() (parsers.Parser, error) {
return parsers.NewParser(&parserConfig)
})
// Let's drop a 1-line LINE-DELIMITED json.
// Write csv file to process into the 'process' directory.
f, err := os.Create(filepath.Join(processDirectory, testJSONFile))
require.NoError(t, err)
_, err = f.WriteString("{\"Name\": \"event1\",\"Speed\": 100.1,\"Length\": 20.1}")
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)
err = r.Start(&acc)
r.Log = testutil.Logger{}
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(1)
r.Stop()
// Verify that we read each JSON line once to a single metric.
require.Equal(t, len(acc.Metrics), 1)
for _, m := range acc.Metrics {
for key, value := range m.Tags {
require.Equal(t, r.FileTag, key)
require.Equal(t, filepath.Base(testJSONFile), value)
}
}
}

View File

@ -20,8 +20,11 @@ plugin instead.
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Name a tag containing the name of the file the data was parsed from. Leave empty
## to disable.
## to disable. Cautious when file name variation is high, this can increase the cardinality
## significantly. Read more about cardinality here:
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
```

View File

@ -29,9 +29,13 @@ const sampleConfig = `
## as well as ** to match recursive files and directories.
files = ["/tmp/metrics.out"]
## Name a tag containing the name of the file the data was parsed from. Leave empty
## to disable.
## to disable. Cautious when file name variation is high, this can increase the cardinality
## significantly. Read more about cardinality here:
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## Character encoding to use when interpreting the file contents. Invalid
## characters are replaced using the unicode replacement character. When set