feat(inputs.directory_monitor): Support paths for files_to_ignore and files_to_monitor (#11784)
This commit is contained in:
parent
78bdf9a1b2
commit
ba62aca4e2
|
|
@ -76,7 +76,7 @@ func (*DirectoryMonitor) SampleConfig() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
|
func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
|
||||||
processFile := func(path string, name string) error {
|
processFile := func(path string) error {
|
||||||
// We've been cancelled via Stop().
|
// We've been cancelled via Stop().
|
||||||
if monitor.context.Err() != nil {
|
if monitor.context.Err() != nil {
|
||||||
return io.EOF
|
return io.EOF
|
||||||
|
|
@ -92,7 +92,7 @@ func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
|
||||||
|
|
||||||
// If file is decaying, process it.
|
// If file is decaying, process it.
|
||||||
if timeThresholdExceeded {
|
if timeThresholdExceeded {
|
||||||
monitor.processFile(name, path)
|
monitor.processFile(path)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -104,7 +104,7 @@ func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return processFile(path, info.Name())
|
return processFile(path)
|
||||||
})
|
})
|
||||||
// We've been cancelled via Stop().
|
// We've been cancelled via Stop().
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
|
@ -126,7 +126,7 @@ func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
path := monitor.Directory + "/" + file.Name()
|
path := monitor.Directory + "/" + file.Name()
|
||||||
err := processFile(path, file.Name())
|
err := processFile(path)
|
||||||
// We've been cancelled via Stop().
|
// We've been cancelled via Stop().
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
//nolint:nilerr // context cancelation is not an error
|
//nolint:nilerr // context cancelation is not an error
|
||||||
|
|
@ -183,14 +183,16 @@ func (monitor *DirectoryMonitor) Monitor() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (monitor *DirectoryMonitor) processFile(name string, path string) {
|
func (monitor *DirectoryMonitor) processFile(path string) {
|
||||||
|
basePath := strings.Replace(path, monitor.Directory, "", 1)
|
||||||
|
|
||||||
// File must be configured to be monitored, if any configuration...
|
// File must be configured to be monitored, if any configuration...
|
||||||
if !monitor.isMonitoredFile(name) {
|
if !monitor.isMonitoredFile(basePath) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ...and should not be configured to be ignored.
|
// ...and should not be configured to be ignored.
|
||||||
if monitor.isIgnoredFile(name) {
|
if monitor.isIgnoredFile(basePath) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
@ -528,3 +529,79 @@ func TestParseSubdirectories(t *testing.T) {
|
||||||
_, err = os.Stat(filepath.Join(finishedDirectory, "sub", testJSONFile))
|
_, err = os.Stat(filepath.Join(finishedDirectory, "sub", testJSONFile))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseSubdirectoriesFilesIgnore(t *testing.T) {
|
||||||
|
acc := testutil.Accumulator{}
|
||||||
|
|
||||||
|
// Establish process directory and finished directory.
|
||||||
|
finishedDirectory := t.TempDir()
|
||||||
|
processDirectory := t.TempDir()
|
||||||
|
|
||||||
|
filesToIgnore := `sub/test.json`
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
filesToIgnore = `\\sub\\test.json`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init plugin.
|
||||||
|
r := DirectoryMonitor{
|
||||||
|
Directory: processDirectory,
|
||||||
|
FinishedDirectory: finishedDirectory,
|
||||||
|
Recursive: true,
|
||||||
|
MaxBufferedMetrics: defaultMaxBufferedMetrics,
|
||||||
|
FileQueueSize: defaultFileQueueSize,
|
||||||
|
ParseMethod: "at-once",
|
||||||
|
FilesToIgnore: []string{filesToIgnore},
|
||||||
|
}
|
||||||
|
err := r.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
r.Log = testutil.Logger{}
|
||||||
|
|
||||||
|
r.SetParserFunc(func() (parsers.Parser, error) {
|
||||||
|
parser := &json.Parser{
|
||||||
|
NameKey: "name",
|
||||||
|
TagKeys: []string{"tag1"},
|
||||||
|
}
|
||||||
|
err := parser.Init()
|
||||||
|
return parser, err
|
||||||
|
})
|
||||||
|
|
||||||
|
testJSON := `{
|
||||||
|
"name": "test1",
|
||||||
|
"value": 100.1,
|
||||||
|
"tag1": "value1"
|
||||||
|
}`
|
||||||
|
|
||||||
|
// Write json file to process into the 'process' directory.
|
||||||
|
testJSONFile := "test.json"
|
||||||
|
f, err := os.Create(filepath.Join(processDirectory, testJSONFile))
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = f.WriteString(testJSON)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = f.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Write json file to process into a subdirectory in the the 'process' directory.
|
||||||
|
err = os.Mkdir(filepath.Join(processDirectory, "sub"), os.ModePerm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
f, err = os.Create(filepath.Join(processDirectory, "sub", testJSONFile))
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = f.WriteString(testJSON)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = f.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = r.Start(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = r.Gather(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
acc.Wait(1)
|
||||||
|
r.Stop()
|
||||||
|
|
||||||
|
require.NoError(t, acc.FirstError())
|
||||||
|
require.Len(t, acc.Metrics, 1)
|
||||||
|
testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())
|
||||||
|
|
||||||
|
// File should have gone back to the test directory, as we configured.
|
||||||
|
_, err = os.Stat(filepath.Join(finishedDirectory, testJSONFile))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue