feat(inputs.directory_monitor): Traverse sub-directories (#11773)

This commit is contained in:
Sebastian Spaink 2022-09-07 14:08:42 -05:00 committed by GitHub
parent dc9abf3f04
commit f238df20ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 150 additions and 32 deletions

View File

@ -1,6 +1,6 @@
# Directory Monitor Input Plugin
This plugin monitors a single directory (without looking at sub-directories),
This plugin monitors a single directory (traversing sub-directories),
and takes in each file placed in the directory. The plugin will gather all
files in the directory at the configured interval, and parse the ones that
haven't been picked up yet.
@ -18,12 +18,15 @@ be guaranteed to finish writing before the `directory_duration_threshold`.
```toml @sample.conf
# Ingests files in a directory and then moves them to a target directory.
[[inputs.directory_monitor]]
## The directory to monitor and read files from.
## The directory to monitor and read files from (including sub-directories if "recursive" is true).
directory = ""
#
## The directory to move finished files to.
## The directory to move finished files to (maintaining directory hierachy from source).
finished_directory = ""
#
## Setting recursive to true will make the plugin recursively walk the directory and process all sub-directories.
# recursive = false
#
## The directory to move files to upon file error.
## If not provided, erroring files will stay in the monitored directory.
# error_directory = ""
@ -56,7 +59,7 @@ be guaranteed to finish writing before the `directory_duration_threshold`.
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
# parse_method = "line-by-line"
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read
@ -69,3 +72,8 @@ be guaranteed to finish writing before the `directory_duration_threshold`.
The format of metrics produced by this plugin depends on the content and data
format of the file.
## Example Output
The metrics produced by this plugin depends on the content and data
format of the file.

View File

@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
@ -28,6 +29,7 @@ import (
)
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
//
//go:embed sample.conf
var sampleConfig string
@ -43,6 +45,7 @@ var (
type DirectoryMonitor struct {
Directory string `toml:"directory"`
FinishedDirectory string `toml:"finished_directory"`
Recursive bool `toml:"recursive"`
ErrorDirectory string `toml:"error_directory"`
FileTag string `toml:"file_tag"`
@ -73,31 +76,62 @@ func (*DirectoryMonitor) SampleConfig() string {
}
func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
// Get all files sitting in the directory.
files, err := os.ReadDir(monitor.Directory)
if err != nil {
return fmt.Errorf("unable to monitor the targeted directory: %w", err)
}
for _, file := range files {
filePath := monitor.Directory + "/" + file.Name()
processFile := func(path string, name string) error {
// We've been cancelled via Stop().
if monitor.context.Err() != nil {
//nolint:nilerr // context cancelation is not an error
return nil
return io.EOF
}
stat, err := times.Stat(filePath)
stat, err := times.Stat(path)
if err != nil {
continue
// Don't stop traversing if there is an eror
return nil //nolint:nilerr
}
timeThresholdExceeded := time.Since(stat.AccessTime()) >= time.Duration(monitor.DirectoryDurationThreshold)
// If file is decaying, process it.
if timeThresholdExceeded {
monitor.processFile(file)
monitor.processFile(name, path)
}
return nil
}
if monitor.Recursive {
err := filepath.Walk(monitor.Directory,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
return processFile(path, info.Name())
})
// We've been cancelled via Stop().
if err == io.EOF {
//nolint:nilerr // context cancelation is not an error
return nil
}
if err != nil {
return err
}
} else {
// Get all files sitting in the directory.
files, err := os.ReadDir(monitor.Directory)
if err != nil {
return fmt.Errorf("unable to monitor the targeted directory: %w", err)
}
for _, file := range files {
if file.IsDir() {
continue
}
path := monitor.Directory + "/" + file.Name()
err := processFile(path, file.Name())
// We've been cancelled via Stop().
if err == io.EOF {
//nolint:nilerr // context cancelation is not an error
return nil
}
}
}
@ -149,25 +183,19 @@ func (monitor *DirectoryMonitor) Monitor() {
}
}
func (monitor *DirectoryMonitor) processFile(file os.DirEntry) {
if file.IsDir() {
return
}
filePath := monitor.Directory + "/" + file.Name()
func (monitor *DirectoryMonitor) processFile(name string, path string) {
// File must be configured to be monitored, if any configuration...
if !monitor.isMonitoredFile(file.Name()) {
if !monitor.isMonitoredFile(name) {
return
}
// ...and should not be configured to be ignored.
if monitor.isIgnoredFile(file.Name()) {
if monitor.isIgnoredFile(name) {
return
}
select {
case monitor.filesToProcess <- filePath:
case monitor.filesToProcess <- path:
default:
}
}
@ -300,8 +328,15 @@ func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error {
}
func (monitor *DirectoryMonitor) moveFile(filePath string, directory string) {
err := os.Rename(filePath, directory+"/"+filepath.Base(filePath))
basePath := strings.Replace(filePath, monitor.Directory, "", 1)
newPath := filepath.Join(directory, basePath)
err := os.MkdirAll(filepath.Dir(newPath), os.ModePerm)
if err != nil {
monitor.Log.Errorf("Error creating directory hierachy for " + filePath + ". Error: " + err.Error())
}
err = os.Rename(filePath, newPath)
if err != nil {
monitor.Log.Errorf("Error while moving file '" + filePath + "' to another directory. Error: " + err.Error())
}

View File

@ -456,3 +456,75 @@ func TestParseCompleteFile(t *testing.T) {
require.Len(t, acc.Metrics, 1)
testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())
}
func TestParseSubdirectories(t *testing.T) {
acc := testutil.Accumulator{}
// Establish process directory and finished directory.
finishedDirectory := t.TempDir()
processDirectory := t.TempDir()
// Init plugin.
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
Recursive: true,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: "at-once",
}
err := r.Init()
require.NoError(t, err)
r.Log = testutil.Logger{}
r.SetParserFunc(func() (parsers.Parser, error) {
parser := &json.Parser{
NameKey: "name",
TagKeys: []string{"tag1"},
}
err := parser.Init()
return parser, err
})
testJSON := `{
"name": "test1",
"value": 100.1,
"tag1": "value1"
}`
// Write json file to process into the 'process' directory.
testJSONFile := "test.json"
f, err := os.Create(filepath.Join(processDirectory, testJSONFile))
require.NoError(t, err)
_, err = f.WriteString(testJSON)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)
// Write json file to process into a subdirectory in the the 'process' directory.
err = os.Mkdir(filepath.Join(processDirectory, "sub"), os.ModePerm)
require.NoError(t, err)
f, err = os.Create(filepath.Join(processDirectory, "sub", testJSONFile))
require.NoError(t, err)
_, err = f.WriteString(testJSON)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)
err = r.Start(&acc)
require.NoError(t, err)
err = r.Gather(&acc)
require.NoError(t, err)
acc.Wait(2)
r.Stop()
require.NoError(t, acc.FirstError())
require.Len(t, acc.Metrics, 2)
testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())
// File should have gone back to the test directory, as we configured.
_, err = os.Stat(filepath.Join(finishedDirectory, testJSONFile))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(finishedDirectory, "sub", testJSONFile))
require.NoError(t, err)
}

View File

@ -1,11 +1,14 @@
# Ingests files in a directory and then moves them to a target directory.
[[inputs.directory_monitor]]
## The directory to monitor and read files from.
## The directory to monitor and read files from (including sub-directories if "recursive" is true).
directory = ""
#
## The directory to move finished files to.
## The directory to move finished files to (maintaining directory hierachy from source).
finished_directory = ""
#
## Setting recursive to true will make the plugin recursively walk the directory and process all sub-directories.
# recursive = false
#
## The directory to move files to upon file error.
## If not provided, erroring files will stay in the monitored directory.
# error_directory = ""
@ -38,7 +41,7 @@
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
# parse_method = "line-by-line"
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read