feat(inputs.directory_monitor): Improve internal stats (#13089)

This commit is contained in:
Thomas Casteleyn 2023-05-17 20:05:48 +02:00 committed by GitHub
parent f171d62ac1
commit f0dc15fd9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 0 deletions

View File

@ -82,7 +82,23 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
The format of metrics produced by this plugin depends on the content and data The format of metrics produced by this plugin depends on the content and data
format of the file. format of the file.
When the [internal][] input is enabled:
- internal_directory_monitor
- fields:
- files_processed - How many files have been processed (counter)
- files_dropped - How many files have been dropped (counter)
- internal_directory_monitor
- tags:
- directory - The monitored directory
- fields:
- files_processed_per_dir - How many files have been processed (counter)
- files_dropped_per_dir - How many files have been dropped (counter)
- files_queue_per_dir - How many files to be processed (gauge)
## Example Output ## Example Output
The metrics produced by this plugin depends on the content and data The metrics produced by this plugin depends on the content and data
format of the file. format of the file.
[internal]: /plugins/inputs/internal

View File

@ -59,7 +59,10 @@ type DirectoryMonitor struct {
context context.Context context context.Context
parserFunc parsers.ParserFunc parserFunc parsers.ParserFunc
filesProcessed selfstat.Stat filesProcessed selfstat.Stat
filesProcessedDir selfstat.Stat
filesDropped selfstat.Stat filesDropped selfstat.Stat
filesDroppedDir selfstat.Stat
filesQueuedDir selfstat.Stat
waitGroup *sync.WaitGroup waitGroup *sync.WaitGroup
acc telegraf.TrackingAccumulator acc telegraf.TrackingAccumulator
sem *semaphore.Weighted sem *semaphore.Weighted
@ -174,6 +177,9 @@ func (monitor *DirectoryMonitor) Monitor() {
// We've finished reading the file and moved it away, delete it from files in use. // We've finished reading the file and moved it away, delete it from files in use.
monitor.filesInUse.Delete(filePath) monitor.filesInUse.Delete(filePath)
// Keep track of how many files still to process
monitor.filesQueuedDir.Set(int64(len(monitor.filesToProcess)))
} }
} }
@ -208,6 +214,7 @@ func (monitor *DirectoryMonitor) read(filePath string) {
if err != nil { if err != nil {
monitor.Log.Errorf("Error while reading file: '" + filePath + "'. " + err.Error()) monitor.Log.Errorf("Error while reading file: '" + filePath + "'. " + err.Error())
monitor.filesDropped.Incr(1) monitor.filesDropped.Incr(1)
monitor.filesDroppedDir.Incr(1)
if monitor.ErrorDirectory != "" { if monitor.ErrorDirectory != "" {
monitor.moveFile(filePath, monitor.ErrorDirectory) monitor.moveFile(filePath, monitor.ErrorDirectory)
} }
@ -217,6 +224,7 @@ func (monitor *DirectoryMonitor) read(filePath string) {
// File is finished, move it to the 'finished' directory. // File is finished, move it to the 'finished' directory.
monitor.moveFile(filePath, monitor.FinishedDirectory) monitor.moveFile(filePath, monitor.FinishedDirectory)
monitor.filesProcessed.Incr(1) monitor.filesProcessed.Incr(1)
monitor.filesProcessedDir.Incr(1)
} }
func (monitor *DirectoryMonitor) ingestFile(filePath string) error { func (monitor *DirectoryMonitor) ingestFile(filePath string) error {
@ -404,8 +412,14 @@ func (monitor *DirectoryMonitor) Init() error {
} }
} }
tags := map[string]string{
"directory": monitor.Directory,
}
monitor.filesDropped = selfstat.Register("directory_monitor", "files_dropped", map[string]string{}) monitor.filesDropped = selfstat.Register("directory_monitor", "files_dropped", map[string]string{})
monitor.filesDroppedDir = selfstat.Register("directory_monitor", "files_dropped_per_dir", tags)
monitor.filesProcessed = selfstat.Register("directory_monitor", "files_processed", map[string]string{}) monitor.filesProcessed = selfstat.Register("directory_monitor", "files_processed", map[string]string{})
monitor.filesProcessedDir = selfstat.Register("directory_monitor", "files_processed_per_dir", tags)
monitor.filesQueuedDir = selfstat.Register("directory_monitor", "files_queue_per_dir", tags)
// If an error directory should be used but has not been configured yet, create one ourselves. // If an error directory should be used but has not been configured yet, create one ourselves.
if monitor.ErrorDirectory != "" { if monitor.ErrorDirectory != "" {