fix(inputs/directory_monitor): Add support for multiline file parsing (#11234)

This commit is contained in:
Thomas Casteleyn 2022-06-13 16:24:17 +02:00 committed by GitHub
parent f7aab29381
commit 45c88f84c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 152 additions and 29 deletions

View File

@ -2,8 +2,8 @@
This plugin monitors a single directory (without looking at sub-directories),
and takes in each file placed in the directory. The plugin will gather all
files in the directory at the configured interval (`monitor_interval`), and
parse the ones that haven't been picked up yet.
This plugin is intended to read files that are moved or copied to the monitored
directory, and thus files should also not be used by another process or else
@ -54,10 +54,18 @@ be guaranteed to finish writing before the `directory_duration_threshold`.
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
#
## The data format to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
data_format = "influx"
```
## Metrics
The format of metrics produced by this plugin depends on the content and data
format of the file.

View File

@ -20,6 +20,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
@ -36,6 +37,7 @@ var (
defaultMaxBufferedMetrics = 10000
defaultDirectoryDurationThreshold = config.Duration(0 * time.Millisecond)
defaultFileQueueSize = 100000
defaultParseMethod = "line-by-line"
)
type DirectoryMonitor struct {
@ -50,6 +52,7 @@ type DirectoryMonitor struct {
DirectoryDurationThreshold config.Duration `toml:"directory_duration_threshold"`
Log telegraf.Logger `toml:"-"`
FileQueueSize int `toml:"file_queue_size"`
ParseMethod string `toml:"parse_method"`
filesInUse sync.Map
cancel context.CancelFunc
@ -200,7 +203,7 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
parser, err := monitor.parserFunc()
if err != nil {
return fmt.Errorf("E! Creating parser: %s", err.Error())
return fmt.Errorf("creating parser: %w", err)
}
// Handle gzipped files.
@ -218,41 +221,70 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
}
func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error {
var splitter bufio.SplitFunc
// Decide on how to split the file
switch monitor.ParseMethod {
case "at-once":
return monitor.parseAtOnce(parser, reader, fileName)
case "line-by-line":
splitter = bufio.ScanLines
default:
return fmt.Errorf("unknown parse method %q", monitor.ParseMethod)
}
scanner := bufio.NewScanner(reader)
scanner.Split(splitter)
for scanner.Scan() {
metrics, err := monitor.parseLine(parser, scanner.Bytes())
metrics, err := monitor.parseMetrics(parser, scanner.Bytes(), fileName)
if err != nil {
return err
}
if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))
}
}
if err := monitor.sendMetrics(metrics); err != nil {
return err
}
}
return nil
return scanner.Err()
}
func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte) ([]telegraf.Metric, error) {
// parseAtOnce reads the entire file into memory and passes the full
// contents to the parser in a single call, instead of the default
// line-by-line scanning. The resulting metrics are tagged by
// parseMetrics (file_tag, if configured) and forwarded to the output
// buffer via sendMetrics.
//
// NOTE(review): the whole file is buffered, so memory use is
// proportional to the file size.
func (monitor *DirectoryMonitor) parseAtOnce(parser parsers.Parser, reader io.Reader, fileName string) error {
	// Renamed from "bytes" to avoid shadowing the stdlib bytes package.
	data, err := io.ReadAll(reader)
	if err != nil {
		return err
	}

	metrics, err := monitor.parseMetrics(parser, data, fileName)
	if err != nil {
		return err
	}

	return monitor.sendMetrics(metrics)
}
func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) {
switch parser.(type) {
case *csv.Parser:
m, err := parser.Parse(line)
metrics, err = parser.Parse(line)
if err != nil {
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, err
}
return m, err
default:
return parser.Parse(line)
metrics, err = parser.Parse(line)
}
if monitor.FileTag != "" {
for _, m := range metrics {
m.AddTag(monitor.FileTag, filepath.Base(fileName))
}
}
return metrics, err
}
func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error {
@ -357,6 +389,10 @@ func (monitor *DirectoryMonitor) Init() error {
monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex)
}
if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "at-once"}); err != nil {
return fmt.Errorf("config option parse_method: %w", err)
}
return nil
}
@ -368,6 +404,7 @@ func init() {
MaxBufferedMetrics: defaultMaxBufferedMetrics,
DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
})
}

View File

@ -9,11 +9,28 @@ import (
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/testutil"
)
// TestCreator verifies that the plugin is registered in the global
// inputs registry under "directory_monitor" and that the registered
// creator function populates every package-level default, including
// the ParseMethod default added for multiline parsing support.
func TestCreator(t *testing.T) {
	creator, found := inputs.Inputs["directory_monitor"]
	require.True(t, found)

	// Expected instance mirrors the defaults set in the plugin's init().
	expected := &DirectoryMonitor{
		FilesToMonitor:             defaultFilesToMonitor,
		FilesToIgnore:              defaultFilesToIgnore,
		MaxBufferedMetrics:         defaultMaxBufferedMetrics,
		DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
		FileQueueSize:              defaultFileQueueSize,
		ParseMethod:                defaultParseMethod,
	}

	require.Equal(t, expected, creator())
}
func TestCSVGZImport(t *testing.T) {
acc := testutil.Accumulator{}
testCsvFile := "test.csv"
@ -27,8 +44,9 @@ func TestCSVGZImport(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@ -91,8 +109,9 @@ func TestMultipleJSONFileImports(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 1000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@ -140,8 +159,9 @@ func TestFileTag(t *testing.T) {
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
FileTag: "filename",
MaxBufferedMetrics: 1000,
FileQueueSize: 1000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@ -194,8 +214,9 @@ func TestCSVNoSkipRows(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@ -262,8 +283,9 @@ func TestCSVSkipRows(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@ -332,8 +354,9 @@ func TestCSVMultiHeader(t *testing.T) {
r := DirectoryMonitor{
Directory: processDirectory,
FinishedDirectory: finishedDirectory,
MaxBufferedMetrics: 1000,
FileQueueSize: 100000,
MaxBufferedMetrics: defaultMaxBufferedMetrics,
FileQueueSize: defaultFileQueueSize,
ParseMethod: defaultParseMethod,
}
err := r.Init()
require.NoError(t, err)
@ -387,3 +410,55 @@ hello,80,test_name2`
require.Equal(t, expectedFields, m.Fields)
}
}
// TestParseCompleteFile exercises the "at-once" parse method: a JSON
// document spanning multiple lines (which line-by-line scanning could
// not parse) is written to the process directory and must yield exactly
// one metric.
func TestParseCompleteFile(t *testing.T) {
	acc := testutil.Accumulator{}

	// Establish process directory and finished directory.
	finishedDirectory := t.TempDir()
	processDirectory := t.TempDir()

	// Init plugin with the non-default whole-file parse method.
	r := DirectoryMonitor{
		Directory:          processDirectory,
		FinishedDirectory:  finishedDirectory,
		MaxBufferedMetrics: defaultMaxBufferedMetrics,
		FileQueueSize:      defaultFileQueueSize,
		ParseMethod:        "at-once",
	}
	err := r.Init()
	require.NoError(t, err)
	r.Log = testutil.Logger{}

	parserConfig := parsers.Config{
		DataFormat:  "json",
		JSONNameKey: "name",
		TagKeys:     []string{"tag1"},
	}

	r.SetParserFunc(func() (parsers.Parser, error) {
		return parsers.NewParser(&parserConfig)
	})

	// Multiline JSON payload; only parseable when read as a whole.
	testJSON := `{
		"name": "test1",
		"value": 100.1,
		"tag1": "value1"
	}`

	// Write the JSON file into the 'process' directory.
	// Check the I/O errors instead of discarding them, so a setup
	// failure is reported here rather than as a confusing Gather result.
	f, err := os.CreateTemp(processDirectory, "test.json")
	require.NoError(t, err)
	_, err = f.WriteString(testJSON)
	require.NoError(t, err)
	require.NoError(t, f.Close())

	err = r.Start(&acc)
	require.NoError(t, err)
	err = r.Gather(&acc)
	require.NoError(t, err)
	acc.Wait(1)
	r.Stop()

	require.NoError(t, acc.FirstError())
	require.Len(t, acc.Metrics, 1)
	testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())
}

View File

@ -36,9 +36,12 @@
## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality
# file_tag = ""
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
#
## The data format to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
data_format = "influx"