Adding a new directory monitor input plugin. (#8751)

This commit is contained in:
David Bennett 2021-03-02 17:30:59 -05:00 committed by GitHub
parent 30a0fd04cd
commit 600816826d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 598 additions and 0 deletions

View File

@ -185,6 +185,7 @@ following works:
- google.golang.org/genproto [Apache License 2.0](https://github.com/google/go-genproto/blob/master/LICENSE) - google.golang.org/genproto [Apache License 2.0](https://github.com/google/go-genproto/blob/master/LICENSE)
- google.golang.org/grpc [Apache License 2.0](https://github.com/grpc/grpc-go/blob/master/LICENSE) - google.golang.org/grpc [Apache License 2.0](https://github.com/grpc/grpc-go/blob/master/LICENSE)
- gopkg.in/asn1-ber.v1 [MIT License](https://github.com/go-asn1-ber/asn1-ber/blob/v1.3/LICENSE) - gopkg.in/asn1-ber.v1 [MIT License](https://github.com/go-asn1-ber/asn1-ber/blob/v1.3/LICENSE)
- gopkg.in/djherbis/times.v1 [MIT License](https://github.com/djherbis/times/blob/master/LICENSE)
- gopkg.in/fatih/pool.v2 [MIT License](https://github.com/fatih/pool/blob/v2.0.0/LICENSE) - gopkg.in/fatih/pool.v2 [MIT License](https://github.com/fatih/pool/blob/v2.0.0/LICENSE)
- gopkg.in/fsnotify.v1 [BSD 3-Clause "New" or "Revised" License](https://github.com/fsnotify/fsnotify/blob/v1.4.7/LICENSE) - gopkg.in/fsnotify.v1 [BSD 3-Clause "New" or "Revised" License](https://github.com/fsnotify/fsnotify/blob/v1.4.7/LICENSE)
- gopkg.in/gorethink/gorethink.v3 [Apache License 2.0](https://github.com/rethinkdb/rethinkdb-go/blob/v3.0.5/LICENSE) - gopkg.in/gorethink/gorethink.v3 [Apache License 2.0](https://github.com/rethinkdb/rethinkdb-go/blob/v3.0.5/LICENSE)

1
go.mod
View File

@ -148,6 +148,7 @@ require (
google.golang.org/api v0.20.0 google.golang.org/api v0.20.0
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884
google.golang.org/grpc v1.33.1 google.golang.org/grpc v1.33.1
gopkg.in/djherbis/times.v1 v1.2.0
gopkg.in/fatih/pool.v2 v2.0.0 // indirect gopkg.in/fatih/pool.v2 v2.0.0 // indirect
gopkg.in/gorethink/gorethink.v3 v3.0.5 gopkg.in/gorethink/gorethink.v3 v3.0.5
gopkg.in/ldap.v3 v3.1.0 gopkg.in/ldap.v3 v3.1.0

2
go.sum
View File

@ -998,6 +998,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogR
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U=
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/djherbis/times.v1 v1.2.0 h1:UCvDKl1L/fmBygl2Y7hubXCnY7t4Yj46ZrBFNUipFbM=
gopkg.in/djherbis/times.v1 v1.2.0/go.mod h1:AQlg6unIsrsCEdQYhTzERy542dz6SFdQFZFv6mUY0P8=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fatih/pool.v2 v2.0.0 h1:xIFeWtxifuQJGk/IEPKsTduEKcKvPmhoiVDGpC40nKg= gopkg.in/fatih/pool.v2 v2.0.0 h1:xIFeWtxifuQJGk/IEPKsTduEKcKvPmhoiVDGpC40nKg=
gopkg.in/fatih/pool.v2 v2.0.0/go.mod h1:8xVGeu1/2jr2wm5V9SPuMht2H5AEmf5aFMGSQixtjTY= gopkg.in/fatih/pool.v2 v2.0.0/go.mod h1:8xVGeu1/2jr2wm5V9SPuMht2H5AEmf5aFMGSQixtjTY=

View File

@ -30,6 +30,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/couchdb" _ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
_ "github.com/influxdata/telegraf/plugins/inputs/cpu" _ "github.com/influxdata/telegraf/plugins/inputs/cpu"
_ "github.com/influxdata/telegraf/plugins/inputs/dcos" _ "github.com/influxdata/telegraf/plugins/inputs/dcos"
_ "github.com/influxdata/telegraf/plugins/inputs/directory_monitor"
_ "github.com/influxdata/telegraf/plugins/inputs/disk" _ "github.com/influxdata/telegraf/plugins/inputs/disk"
_ "github.com/influxdata/telegraf/plugins/inputs/diskio" _ "github.com/influxdata/telegraf/plugins/inputs/diskio"
_ "github.com/influxdata/telegraf/plugins/inputs/disque" _ "github.com/influxdata/telegraf/plugins/inputs/disque"

View File

@ -0,0 +1,48 @@
# Directory Monitor Input Plugin
This plugin monitors a single directory (without looking at sub-directories), and takes in each file placed in the directory.
The plugin will gather all files in the directory at a configurable interval (`monitor_interval`), and parse the ones that haven't been picked up yet.
This plugin is intended to read files that are moved or copied to the monitored directory, and thus files should also not be used by another process or else they may fail to be gathered. Please be advised that this plugin pulls files directly after they've been in the directory for the length of the configurable `directory_duration_threshold`, and thus files should not be written 'live' to the monitored directory. If you absolutely must write files directly, they must be guaranteed to finish writing before the `directory_duration_threshold`.
### Configuration:
```toml
[[inputs.directory_monitor]]
## The directory to monitor and read files from.
directory = ""
#
## The directory to move finished files to.
finished_directory = ""
#
## The directory to move files to upon file error.
## If not provided, erroring files will stay in the monitored directory.
# error_directory = ""
#
## The amount of time a file is allowed to sit in the directory before it is picked up.
## This time can generally be low but if you choose to have a very large file written to the directory and it's potentially slow,
## set this higher so that the plugin will wait until the file is fully copied to the directory.
# directory_duration_threshold = "50ms"
#
## A list of the only file names to monitor, if necessary. Supports regex. If left blank, all files are ingested.
# files_to_monitor = ['^.*\.csv']
#
## A list of files to ignore, if necessary. Supports regex.
# files_to_ignore = [".DS_Store"]
#
## Maximum lines of the file to process that have not yet been written by the
## output. For best throughput set to the size of the output's metric_buffer_limit.
## Warning: setting this number higher than the output's metric_buffer_limit can cause dropped metrics.
# max_buffered_metrics = 10000
#
## The maximum amount of file paths to queue up for processing at once, before waiting until files are processed to find more files.
## Lowering this value will result in *slightly* less memory use, with a potential sacrifice in speed efficiency, if absolutely necessary.
# file_queue_size = 100000
#
## The data format to be read from the files.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
data_format = "influx"
```

View File

@ -0,0 +1,410 @@
package directory_monitor
import (
"bufio"
"context"
"errors"
"fmt"
"regexp"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/selfstat"
"golang.org/x/sync/semaphore"
"gopkg.in/djherbis/times.v1"
"compress/gzip"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
)
// sampleConfig is the example TOML configuration returned by
// SampleConfig(); keep it in sync with this plugin's README.md.
const sampleConfig = `
  ## The directory to monitor and read files from.
  directory = ""
  #
  ## The directory to move finished files to.
  finished_directory = ""
  #
  ## The directory to move files to upon file error.
  ## If not provided, erroring files will stay in the monitored directory.
  # error_directory = ""
  #
  ## The amount of time a file is allowed to sit in the directory before it is picked up.
  ## This time can generally be low but if you choose to have a very large file written to the directory and it's potentially slow,
  ## set this higher so that the plugin will wait until the file is fully copied to the directory.
  # directory_duration_threshold = "50ms"
  #
  ## A list of the only file names to monitor, if necessary. Supports regex. If left blank, all files are ingested.
  # files_to_monitor = ['^.*\.csv']
  #
  ## A list of files to ignore, if necessary. Supports regex.
  # files_to_ignore = [".DS_Store"]
  #
  ## Maximum lines of the file to process that have not yet be written by the
  ## output. For best throughput set to the size of the output's metric_buffer_limit.
  ## Warning: setting this number higher than the output's metric_buffer_limit can cause dropped metrics.
  # max_buffered_metrics = 10000
  #
  ## The maximum amount of file paths to queue up for processing at once, before waiting until files are processed to find more files.
  ## Lowering this value will result in *slightly* less memory use, with a potential sacrifice in speed efficiency, if absolutely necessary.
  # file_queue_size = 100000
  #
  ## The dataformat to be read from the files.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  ## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec
  data_format = "influx"
`
// Defaults applied by the factory registered in init(); they mirror the
// commented-out values shown in sampleConfig.
var (
	defaultFilesToMonitor             = []string{}
	defaultFilesToIgnore              = []string{}
	defaultMaxBufferedMetrics         = 10000
	defaultDirectoryDurationThreshold = config.Duration(0 * time.Millisecond)
	defaultFileQueueSize              = 100000
)
// DirectoryMonitor ingests files dropped into a watched directory and
// moves each one to a finished (or error) directory once processed.
type DirectoryMonitor struct {
	// Directory is the directory to watch for incoming files (required).
	Directory string `toml:"directory"`
	// FinishedDirectory receives successfully processed files (required).
	FinishedDirectory string `toml:"finished_directory"`
	// ErrorDirectory receives files whose parsing failed; when empty,
	// failed files remain in Directory.
	ErrorDirectory string `toml:"error_directory"`
	// FilesToMonitor is a regex allow-list of file names; empty means
	// all files are eligible.
	FilesToMonitor []string `toml:"files_to_monitor"`
	// FilesToIgnore is a regex deny-list of file names.
	FilesToIgnore []string `toml:"files_to_ignore"`
	// MaxBufferedMetrics bounds metrics awaiting output delivery
	// (used as the semaphore size and the tracking-accumulator limit).
	MaxBufferedMetrics int `toml:"max_buffered_metrics"`
	// DirectoryDurationThreshold is the minimum time a file must sit in
	// the directory before it is picked up.
	DirectoryDurationThreshold config.Duration `toml:"directory_duration_threshold"`
	Log                        telegraf.Logger `toml:"-"`
	// FileQueueSize is the capacity of the filesToProcess channel.
	FileQueueSize int `toml:"file_queue_size"`

	// filesInUse tracks paths currently being read so a file is never
	// processed twice concurrently.
	filesInUse          sync.Map
	cancel              context.CancelFunc
	context             context.Context
	parserFunc          parsers.ParserFunc
	filesProcessed      selfstat.Stat
	filesDropped        selfstat.Stat
	waitGroup           *sync.WaitGroup
	acc                 telegraf.TrackingAccumulator
	sem                 *semaphore.Weighted
	fileRegexesToMatch  []*regexp.Regexp
	fileRegexesToIgnore []*regexp.Regexp
	filesToProcess      chan string
}
// SampleConfig returns the example TOML configuration for this plugin.
func (monitor *DirectoryMonitor) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line summary of what the plugin does.
func (monitor *DirectoryMonitor) Description() string {
	return "Ingests files in a directory and then moves them to a target directory."
}
// Gather scans the monitored directory once per collection interval and
// queues every eligible file for processing. A file is only queued after
// it has sat in the directory for at least DirectoryDurationThreshold
// (judged by its access time), so partially-copied files are skipped.
func (monitor *DirectoryMonitor) Gather(acc telegraf.Accumulator) error {
	// Get all files sitting in the directory.
	files, err := ioutil.ReadDir(monitor.Directory)
	if err != nil {
		return fmt.Errorf("unable to monitor the targeted directory: %w", err)
	}

	for _, file := range files {
		filePath := filepath.Join(monitor.Directory, file.Name())

		// We've been cancelled via Stop().
		if monitor.context.Err() != nil {
			return nil
		}

		stat, err := times.Stat(filePath)
		if err != nil {
			// The file may already have been processed and moved away by
			// a worker; skip it rather than failing the whole gather.
			continue
		}

		timeThresholdExceeded := time.Since(stat.AccessTime()) >= time.Duration(monitor.DirectoryDurationThreshold)

		// If file is decaying, process it.
		if timeThresholdExceeded {
			monitor.processFile(file, acc)
		}
	}

	return nil
}
// Start launches the plugin's background work: it wraps the accumulator
// with delivery tracking (bounded by MaxBufferedMetrics) and starts the
// goroutine that drains the file queue.
func (monitor *DirectoryMonitor) Start(acc telegraf.Accumulator) error {
	// Use tracking to determine when more metrics can be added without overflowing the outputs.
	monitor.acc = acc.WithTracking(monitor.MaxBufferedMetrics)
	go func() {
		// Every delivered metric group frees one slot on the semaphore
		// that sendMetrics blocks on.
		for range monitor.acc.Delivered() {
			monitor.sem.Release(1)
		}
	}()

	// Monitor the files channel and read what they receive.
	monitor.waitGroup.Add(1)
	go func() {
		monitor.Monitor(acc)
		monitor.waitGroup.Done()
	}()

	return nil
}
// Stop cancels the plugin context, closes the work queue so Monitor's
// range loop can terminate, and blocks until in-flight files finish.
// NOTE(review): if Gather could run concurrently with Stop, processFile's
// send would hit a closed channel — confirm the framework serializes
// Stop after the final Gather.
func (monitor *DirectoryMonitor) Stop() {
	// Before stopping, wrap up all file-reading routines.
	monitor.cancel()
	close(monitor.filesToProcess)
	monitor.Log.Warnf("Exiting the Directory Monitor plugin. Waiting to quit until all current files are finished.")
	monitor.waitGroup.Wait()
}
// Monitor drains the filesToProcess queue, ingesting each queued file
// exactly once. It runs until the queue is closed by Stop() or the
// context is cancelled.
func (monitor *DirectoryMonitor) Monitor(acc telegraf.Accumulator) {
	for path := range monitor.filesToProcess {
		if monitor.context.Err() != nil {
			return
		}

		// Claim the file so no other goroutine processes it at the same time.
		_, alreadyClaimed := monitor.filesInUse.LoadOrStore(path, true)
		if alreadyClaimed {
			continue
		}

		monitor.read(path)

		// Reading moved the file out of the directory; release the claim.
		monitor.filesInUse.Delete(path)
	}
}
// processFile vets a single directory entry and, when it passes the
// monitor/ignore filters, queues its path for ingestion. Directories are
// skipped entirely.
func (monitor *DirectoryMonitor) processFile(file os.FileInfo, acc telegraf.Accumulator) {
	if file.IsDir() {
		return
	}

	filePath := filepath.Join(monitor.Directory, file.Name())

	// File must be configured to be monitored, if any configuration...
	if !monitor.isMonitoredFile(file.Name()) {
		return
	}

	// ...and should not be configured to be ignored.
	if monitor.isIgnoredFile(file.Name()) {
		return
	}

	// Non-blocking send: when the queue is full we drop the path for now
	// instead of stalling Gather; the file stays in the directory and is
	// rediscovered on a later interval.
	select {
	case monitor.filesToProcess <- filePath:
	default:
	}
}
// read ingests one file and moves it to the finished directory on
// success, or to the error directory (if configured) on parse failure.
// A path error — e.g. the file was already grabbed and moved by another
// goroutine — is ignored entirely.
func (monitor *DirectoryMonitor) read(filePath string) {
	// Open, read, and parse the contents of the file.
	err := monitor.ingestFile(filePath)
	// errors.As also matches a wrapped *os.PathError, unlike a bare
	// type assertion.
	var pathErr *os.PathError
	if errors.As(err, &pathErr) {
		return
	}

	// Handle a file read error. We don't halt execution but do document, log, and move the problematic file.
	if err != nil {
		monitor.Log.Errorf("Error while reading file %q: %v", filePath, err)
		monitor.filesDropped.Incr(1)
		if monitor.ErrorDirectory != "" {
			monitor.moveFile(filePath, monitor.ErrorDirectory)
		}
		return
	}

	// File is finished, move it to the 'finished' directory.
	monitor.moveFile(filePath, monitor.FinishedDirectory)
	monitor.filesProcessed.Incr(1)
}
// ingestFile opens filePath, builds a fresh parser via parserFunc, and
// streams the file's contents through it. Files with a ".gz" extension
// are transparently decompressed. Returns any open, decompress, or parse
// error to the caller.
func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
	file, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	parser, err := monitor.parserFunc()
	if err != nil {
		return fmt.Errorf("creating parser: %w", err)
	}

	// Handle gzipped files.
	var reader io.Reader
	if filepath.Ext(filePath) == ".gz" {
		gzReader, err := gzip.NewReader(file)
		if err != nil {
			return err
		}
		// Close the gzip reader when done so its resources are released
		// (the underlying file has its own deferred Close above).
		defer gzReader.Close()
		reader = gzReader
	} else {
		reader = file
	}

	return monitor.parseFile(parser, reader)
}
// parseFile reads the input line-by-line, parses each line with the
// configured parser, and forwards the resulting metrics. Returns the
// first parse error, or any scanner error (read failure or a line
// exceeding the scanner's buffer) encountered while reading.
func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader) error {
	// Read the file line-by-line and parse with the configured parse method.
	firstLine := true
	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		metrics, err := monitor.parseLine(parser, scanner.Bytes(), firstLine)
		if err != nil {
			return err
		}
		if firstLine {
			firstLine = false
		}

		monitor.sendMetrics(metrics)
	}

	// Scan() stops silently on read errors and over-long lines; surface
	// them instead of treating a truncated file as fully ingested.
	return scanner.Err()
}
// parseLine parses a single line of input. The CSV parser is special:
// its Parse method consumes the header row, while ParseLine skips
// headers, so the first line must go through Parse and later lines
// through ParseLine. All other parsers handle every line via Parse.
func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte, firstLine bool) ([]telegraf.Metric, error) {
	csvParser, isCsv := parser.(*csv.Parser)
	if !isCsv {
		return parser.Parse(line)
	}

	// The CSV parser parses headers in Parse and skips them in ParseLine.
	if firstLine {
		return csvParser.Parse(line)
	}

	m, err := csvParser.ParseLine(string(line))
	if err != nil {
		return nil, err
	}
	if m == nil {
		return []telegraf.Metric{}, nil
	}
	return []telegraf.Metric{m}, nil
}
// sendMetrics forwards parsed metrics to the tracking accumulator,
// blocking on the semaphore so that no more than MaxBufferedMetrics are
// outstanding at once. If the plugin is shutting down (semaphore acquire
// fails on context cancellation), the remaining metrics are dropped.
func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) {
	// Report the metrics for the file.
	for _, m := range metrics {
		// Block until metric can be written; Acquire only errors when
		// the plugin context has been cancelled via Stop().
		if err := monitor.sem.Acquire(monitor.context, 1); err != nil {
			return
		}
		monitor.acc.AddTrackingMetricGroup([]telegraf.Metric{m})
	}
}
// moveFile renames filePath into directory, keeping its base name.
// Failures are logged but not returned; the file stays where it is and
// may be retried on a later gather pass.
func (monitor *DirectoryMonitor) moveFile(filePath string, directory string) {
	err := os.Rename(filePath, filepath.Join(directory, filepath.Base(filePath)))
	if err != nil {
		monitor.Log.Errorf("Error while moving file %q to another directory: %v", filePath, err)
	}
}
// isMonitoredFile reports whether fileName matches the configured
// monitor patterns. When no patterns are configured, every file matches.
func (monitor *DirectoryMonitor) isMonitoredFile(fileName string) bool {
	if len(monitor.fileRegexesToMatch) == 0 {
		return true
	}

	// Only monitor files matching at least one configured pattern.
	matched := false
	for _, pattern := range monitor.fileRegexesToMatch {
		if pattern.MatchString(fileName) {
			matched = true
			break
		}
	}
	return matched
}
// isIgnoredFile reports whether fileName matches any configured ignore
// pattern.
func (monitor *DirectoryMonitor) isIgnoredFile(fileName string) bool {
	for _, pattern := range monitor.fileRegexesToIgnore {
		// Skip files that are set to be ignored.
		if pattern.MatchString(fileName) {
			return true
		}
	}
	return false
}
// SetParserFunc stores the parser factory; a fresh parser instance is
// created from it for every ingested file.
func (monitor *DirectoryMonitor) SetParserFunc(fn parsers.ParserFunc) {
	monitor.parserFunc = fn
}
// Init validates the required configuration, creates the finished (and
// optional error) directories when they do not yet exist, registers
// self-stats, and compiles the monitor/ignore regexes. Called once by
// Telegraf before Start.
func (monitor *DirectoryMonitor) Init() error {
	if monitor.Directory == "" || monitor.FinishedDirectory == "" {
		// Error strings are lowercase without trailing punctuation per
		// Go convention (staticcheck ST1005).
		return errors.New("missing one of the following required config options: directory, finished_directory")
	}

	if monitor.FileQueueSize <= 0 {
		return errors.New("file queue size needs to be more than 0")
	}

	// Finished directory can be created if not exists for convenience.
	if _, err := os.Stat(monitor.FinishedDirectory); os.IsNotExist(err) {
		if err := os.Mkdir(monitor.FinishedDirectory, 0777); err != nil {
			return err
		}
	}

	monitor.filesDropped = selfstat.Register("directory_monitor", "files_dropped", map[string]string{})
	monitor.filesProcessed = selfstat.Register("directory_monitor", "files_processed", map[string]string{})

	// If an error directory should be used but has not been configured yet, create one ourselves.
	if monitor.ErrorDirectory != "" {
		if _, err := os.Stat(monitor.ErrorDirectory); os.IsNotExist(err) {
			if err := os.Mkdir(monitor.ErrorDirectory, 0777); err != nil {
				return err
			}
		}
	}

	monitor.waitGroup = &sync.WaitGroup{}
	monitor.sem = semaphore.NewWeighted(int64(monitor.MaxBufferedMetrics))
	monitor.context, monitor.cancel = context.WithCancel(context.Background())
	monitor.filesToProcess = make(chan string, monitor.FileQueueSize)

	// Establish file matching / exclusion regexes.
	for _, matcher := range monitor.FilesToMonitor {
		regex, err := regexp.Compile(matcher)
		if err != nil {
			return err
		}
		monitor.fileRegexesToMatch = append(monitor.fileRegexesToMatch, regex)
	}
	for _, matcher := range monitor.FilesToIgnore {
		regex, err := regexp.Compile(matcher)
		if err != nil {
			return err
		}
		monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex)
	}

	return nil
}
// init registers the plugin under the name "directory_monitor" with the
// package-level defaults pre-filled; required directory options are
// validated later in Init().
func init() {
	inputs.Add("directory_monitor", func() telegraf.Input {
		return &DirectoryMonitor{
			FilesToMonitor:             defaultFilesToMonitor,
			FilesToIgnore:              defaultFilesToIgnore,
			MaxBufferedMetrics:         defaultMaxBufferedMetrics,
			DirectoryDurationThreshold: defaultDirectoryDurationThreshold,
			FileQueueSize:              defaultFileQueueSize,
		}
	})
}

View File

@ -0,0 +1,135 @@
package directory_monitor
import (
"bytes"
"compress/gzip"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// TestCSVGZImport verifies that a plain CSV file and a gzipped CSV file
// are each parsed into 3 metrics and moved to the finished directory.
func TestCSVGZImport(t *testing.T) {
	acc := testutil.Accumulator{}
	testCsvFile := "test.csv"
	testCsvGzFile := "test.csv.gz"

	// Establish process directory and finished directory.
	finishedDirectory, err := ioutil.TempDir("", "finished")
	require.NoError(t, err)
	processDirectory, err := ioutil.TempDir("", "test")
	require.NoError(t, err)
	defer os.RemoveAll(processDirectory)
	defer os.RemoveAll(finishedDirectory)

	// Init plugin.
	r := DirectoryMonitor{
		Directory:          processDirectory,
		FinishedDirectory:  finishedDirectory,
		MaxBufferedMetrics: 1000,
		FileQueueSize:      100000,
	}
	err = r.Init()
	require.NoError(t, err)

	parserConfig := parsers.Config{
		DataFormat:        "csv",
		CSVHeaderRowCount: 1,
	}
	r.SetParserFunc(func() (parsers.Parser, error) {
		return parsers.NewParser(&parserConfig)
	})
	r.Log = testutil.Logger{}

	// Write csv file to process into the 'process' directory.
	f, err := os.Create(filepath.Join(processDirectory, testCsvFile))
	require.NoError(t, err)
	_, err = f.WriteString("thing,color\nsky,blue\ngrass,green\nclifford,red\n")
	require.NoError(t, err)
	require.NoError(t, f.Close())

	// Write csv.gz file to process into the 'process' directory.
	var b bytes.Buffer
	w := gzip.NewWriter(&b)
	_, err = w.Write([]byte("thing,color\nsky,blue\ngrass,green\nclifford,red\n"))
	require.NoError(t, err)
	require.NoError(t, w.Close())
	// The original test overwrote this error with r.Start's result
	// before ever checking it; check it explicitly.
	err = ioutil.WriteFile(filepath.Join(processDirectory, testCsvGzFile), b.Bytes(), 0666)
	require.NoError(t, err)

	// Start plugin before adding file.
	err = r.Start(&acc)
	require.NoError(t, err)
	err = r.Gather(&acc)
	require.NoError(t, err)
	acc.Wait(6)
	r.Stop()

	// Verify that we read both files once (3 data rows each).
	require.Equal(t, 6, len(acc.Metrics))

	// Both files should have moved to the finished directory, as configured.
	_, err = os.Stat(filepath.Join(finishedDirectory, testCsvFile))
	require.NoError(t, err)
	_, err = os.Stat(filepath.Join(finishedDirectory, testCsvGzFile))
	require.NoError(t, err)
}
// For JSON data.
type event struct {
Name string
Speed float64
Length float64
}
// TestMultipleJSONFileImports verifies that a 5-line newline-delimited
// JSON file produces exactly one metric per line.
func TestMultipleJSONFileImports(t *testing.T) {
	acc := testutil.Accumulator{}
	testJsonFile := "test.json"

	// Establish process directory and finished directory.
	finishedDirectory, err := ioutil.TempDir("", "finished")
	require.NoError(t, err)
	processDirectory, err := ioutil.TempDir("", "test")
	require.NoError(t, err)
	defer os.RemoveAll(processDirectory)
	defer os.RemoveAll(finishedDirectory)

	// Init plugin.
	r := DirectoryMonitor{
		Directory:          processDirectory,
		FinishedDirectory:  finishedDirectory,
		MaxBufferedMetrics: 1000,
		FileQueueSize:      1000,
	}
	err = r.Init()
	require.NoError(t, err)

	parserConfig := parsers.Config{
		DataFormat:  "json",
		JSONNameKey: "Name",
	}
	r.SetParserFunc(func() (parsers.Parser, error) {
		return parsers.NewParser(&parserConfig)
	})
	// Set the logger before starting so startup logging works.
	r.Log = testutil.Logger{}

	// Let's drop a 5-line LINE-DELIMITED json.
	// Write json file to process into the 'process' directory.
	f, err := os.Create(filepath.Join(processDirectory, testJsonFile))
	require.NoError(t, err)
	_, err = f.WriteString("{\"Name\": \"event1\",\"Speed\": 100.1,\"Length\": 20.1}\n{\"Name\": \"event2\",\"Speed\": 500,\"Length\": 1.4}\n{\"Name\": \"event3\",\"Speed\": 200,\"Length\": 10.23}\n{\"Name\": \"event4\",\"Speed\": 80,\"Length\": 250}\n{\"Name\": \"event5\",\"Speed\": 120.77,\"Length\": 25.97}")
	require.NoError(t, err)
	require.NoError(t, f.Close())

	err = r.Start(&acc)
	require.NoError(t, err)
	err = r.Gather(&acc)
	require.NoError(t, err)
	acc.Wait(5)
	r.Stop()

	// Verify that we read each JSON line once to a single metric.
	require.Equal(t, 5, len(acc.Metrics))
}