fix(inputs.execd): Read from stdout using ReadLine instead of scanner.Scan to overcome 64kb buffer limit (#12935)
This commit is contained in:
parent
c63f5515e9
commit
bfeae49e1b
|
|
@ -61,6 +61,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
## Delay before the process is restarted after an unexpected termination
|
## Delay before the process is restarted after an unexpected termination
|
||||||
restart_delay = "10s"
|
restart_delay = "10s"
|
||||||
|
|
||||||
|
## Buffer size used to read from the command output stream
|
||||||
|
## Optional parameter. Default is 64 Kib, minimum is 16 bytes
|
||||||
|
# buffer_size = "64Kib"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -28,6 +29,7 @@ type Execd struct {
|
||||||
Signal string `toml:"signal"`
|
Signal string `toml:"signal"`
|
||||||
RestartDelay config.Duration `toml:"restart_delay"`
|
RestartDelay config.Duration `toml:"restart_delay"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
BufferSize config.Size `toml:"buffer_size"`
|
||||||
|
|
||||||
process *process.Process
|
process *process.Process
|
||||||
acc telegraf.Accumulator
|
acc telegraf.Accumulator
|
||||||
|
|
@ -82,10 +84,18 @@ func (e *Execd) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execd) cmdReadOut(out io.Reader) {
|
func (e *Execd) cmdReadOut(out io.Reader) {
|
||||||
scanner := bufio.NewScanner(out)
|
rdr := bufio.NewReaderSize(out, int(e.BufferSize))
|
||||||
|
|
||||||
|
for {
|
||||||
|
data, err := rdr.ReadBytes('\n')
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) || errors.Is(err, os.ErrClosed) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
e.acc.AddError(fmt.Errorf("error reading stdout: %w", err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
for scanner.Scan() {
|
|
||||||
data := scanner.Bytes()
|
|
||||||
metrics, err := e.parser.Parse(data)
|
metrics, err := e.parser.Parse(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.acc.AddError(fmt.Errorf("parse error: %w", err))
|
e.acc.AddError(fmt.Errorf("parse error: %w", err))
|
||||||
|
|
@ -95,10 +105,6 @@ func (e *Execd) cmdReadOut(out io.Reader) {
|
||||||
e.acc.AddMetric(metric)
|
e.acc.AddMetric(metric)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := scanner.Err(); err != nil {
|
|
||||||
e.acc.AddError(fmt.Errorf("error reading stdout: %w", err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Execd) cmdReadOutStream(out io.Reader) {
|
func (e *Execd) cmdReadOutStream(out io.Reader) {
|
||||||
|
|
@ -149,6 +155,7 @@ func init() {
|
||||||
return &Execd{
|
return &Execd{
|
||||||
Signal: "none",
|
Signal: "none",
|
||||||
RestartDelay: config.Duration(10 * time.Second),
|
RestartDelay: config.Duration(10 * time.Second),
|
||||||
|
BufferSize: config.Size(64 * 1024),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,10 @@
|
||||||
## Delay before the process is restarted after an unexpected termination
|
## Delay before the process is restarted after an unexpected termination
|
||||||
restart_delay = "10s"
|
restart_delay = "10s"
|
||||||
|
|
||||||
|
## Buffer size used to read from the command output stream
|
||||||
|
## Optional parameter. Default is 64 Kib, minimum is 16 bytes
|
||||||
|
# buffer_size = "64Kib"
|
||||||
|
|
||||||
## Data format to consume.
|
## Data format to consume.
|
||||||
## Each data format has its own unique set of configuration options, read
|
## Each data format has its own unique set of configuration options, read
|
||||||
## more about them here:
|
## more about them here:
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue