exec plugins should not truncate messages in debug mode (#8333)

This commit is contained in:
Steven Soroka 2021-03-18 17:21:30 -04:00 committed by GitHub
parent 30830c2ec2
commit 4dcc3c0ad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 63 additions and 31 deletions

View File

@ -15,6 +15,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
@ -158,8 +159,9 @@ func runAgent(ctx context.Context,
} }
// Setup logging as configured. // Setup logging as configured.
telegraf.Debug = ag.Config.Agent.Debug || *fDebug
logConfig := logger.LogConfig{ logConfig := logger.LogConfig{
Debug: ag.Config.Agent.Debug || *fDebug, Debug: telegraf.Debug,
Quiet: ag.Config.Agent.Quiet || *fQuiet, Quiet: ag.Config.Agent.Quiet || *fQuiet,
LogTarget: ag.Config.Agent.LogTarget, LogTarget: ag.Config.Agent.LogTarget,
Logfile: ag.Config.Agent.Logfile, Logfile: ag.Config.Agent.Logfile,

View File

@ -1,5 +1,7 @@
package telegraf package telegraf
var Debug bool
// Initializer is an interface that all plugin types: Inputs, Outputs, // Initializer is an interface that all plugin types: Inputs, Outputs,
// Processors, and Aggregators can optionally implement to initialize the // Processors, and Aggregators can optionally implement to initialize the
// plugin. // plugin.
@ -21,7 +23,7 @@ type PluginDescriber interface {
Description() string Description() string
} }
// Logger defines an interface for logging. // Logger defines an plugin-related interface for logging.
type Logger interface { type Logger interface {
// Errorf logs an error message, patterned after log.Printf. // Errorf logs an error message, patterned after log.Printf.
Errorf(format string, args ...interface{}) Errorf(format string, args ...interface{})

View File

@ -3,6 +3,7 @@ package exec
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"runtime" "runtime"
@ -39,12 +40,12 @@ const sampleConfig = `
data_format = "influx" data_format = "influx"
` `
const MaxStderrBytes = 512 const MaxStderrBytes int = 512
type Exec struct { type Exec struct {
Commands []string Commands []string `toml:"commands"`
Command string Command string `toml:"command"`
Timeout internal.Duration Timeout internal.Duration `toml:"timeout"`
parser parsers.Parser parser parsers.Parser
@ -85,16 +86,16 @@ func (c CommandRunner) Run(
runErr := internal.RunTimeout(cmd, timeout) runErr := internal.RunTimeout(cmd, timeout)
out = removeCarriageReturns(out) out = removeWindowsCarriageReturns(out)
if stderr.Len() > 0 { if stderr.Len() > 0 && !telegraf.Debug {
stderr = removeCarriageReturns(stderr) stderr = removeWindowsCarriageReturns(stderr)
stderr = truncate(stderr) stderr = c.truncate(stderr)
} }
return out.Bytes(), stderr.Bytes(), runErr return out.Bytes(), stderr.Bytes(), runErr
} }
func truncate(buf bytes.Buffer) bytes.Buffer { func (c CommandRunner) truncate(buf bytes.Buffer) bytes.Buffer {
// Limit the number of bytes. // Limit the number of bytes.
didTruncate := false didTruncate := false
if buf.Len() > MaxStderrBytes { if buf.Len() > MaxStderrBytes {
@ -114,27 +115,21 @@ func truncate(buf bytes.Buffer) bytes.Buffer {
return buf return buf
} }
// removeCarriageReturns removes all carriage returns from the input if the // removeWindowsCarriageReturns removes all carriage returns from the input if the
// OS is Windows. It does not return any errors. // OS is Windows. It does not return any errors.
func removeCarriageReturns(b bytes.Buffer) bytes.Buffer { func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer {
if runtime.GOOS == "windows" { if runtime.GOOS == "windows" {
var buf bytes.Buffer var buf bytes.Buffer
for { for {
byt, er := b.ReadBytes(0x0D) byt, err := b.ReadBytes(0x0D)
end := len(byt) byt = bytes.TrimRight(byt, "\x0d")
if nil == er { if len(byt) > 0 {
end-- _, _ = buf.Write(byt)
} }
if nil != byt { if err == io.EOF {
buf.Write(byt[:end]) return buf
} else {
break
}
if nil != er {
break
} }
} }
b = buf
} }
return b return b
} }

View File

@ -259,9 +259,10 @@ func TestTruncate(t *testing.T) {
}, },
} }
c := CommandRunner{}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
res := truncate(*tt.bufF()) res := c.truncate(*tt.bufF())
require.Equal(t, tt.expF().Bytes(), res.Bytes()) require.Equal(t, tt.expF().Bytes(), res.Bytes())
}) })
} }
@ -272,14 +273,14 @@ func TestRemoveCarriageReturns(t *testing.T) {
// Test that all carriage returns are removed // Test that all carriage returns are removed
for _, test := range crTests { for _, test := range crTests {
b := bytes.NewBuffer(test.input) b := bytes.NewBuffer(test.input)
out := removeCarriageReturns(*b) out := removeWindowsCarriageReturns(*b)
assert.True(t, bytes.Equal(test.output, out.Bytes())) assert.True(t, bytes.Equal(test.output, out.Bytes()))
} }
} else { } else {
// Test that the buffer is returned unaltered // Test that the buffer is returned unaltered
for _, test := range crTests { for _, test := range crTests {
b := bytes.NewBuffer(test.input) b := bytes.NewBuffer(test.input)
out := removeCarriageReturns(*b) out := removeWindowsCarriageReturns(*b)
assert.True(t, bytes.Equal(test.input, out.Bytes())) assert.True(t, bytes.Equal(test.input, out.Bytes()))
} }
} }

View File

@ -8,6 +8,8 @@ The command should be defined similar to docker's `exec` form:
On non-zero exit stderr will be logged at error level. On non-zero exit stderr will be logged at error level.
For better performance, consider execd, which runs continuously.
### Configuration ### Configuration
```toml ```toml

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"log" "log"
"os/exec" "os/exec"
"runtime"
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
@ -39,6 +40,10 @@ var sampleConfig = `
# data_format = "influx" # data_format = "influx"
` `
func (e *Exec) Init() error {
return nil
}
// SetSerializer sets the serializer for the output. // SetSerializer sets the serializer for the output.
func (e *Exec) SetSerializer(serializer serializers.Serializer) { func (e *Exec) SetSerializer(serializer serializers.Serializer) {
e.serializer = serializer e.serializer = serializer
@ -105,8 +110,13 @@ func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.R
return fmt.Errorf("%q timed out and was killed", command) return fmt.Errorf("%q timed out and was killed", command)
} }
s = removeWindowsCarriageReturns(s)
if s.Len() > 0 { if s.Len() > 0 {
log.Printf("E! [outputs.exec] Command error: %q", truncate(s)) if !telegraf.Debug {
log.Printf("E! [outputs.exec] Command error: %q", c.truncate(s))
} else {
log.Printf("D! [outputs.exec] Command error: %q", s)
}
} }
if status, ok := internal.ExitStatus(err); ok { if status, ok := internal.ExitStatus(err); ok {
@ -121,7 +131,7 @@ func (c *CommandRunner) Run(timeout time.Duration, command []string, buffer io.R
return nil return nil
} }
func truncate(buf bytes.Buffer) string { func (c *CommandRunner) truncate(buf bytes.Buffer) string {
// Limit the number of bytes. // Limit the number of bytes.
didTruncate := false didTruncate := false
if buf.Len() > maxStderrBytes { if buf.Len() > maxStderrBytes {
@ -149,3 +159,22 @@ func init() {
} }
}) })
} }
// removeWindowsCarriageReturns removes all carriage returns from the input if the
// OS is Windows. It does not return any errors.
func removeWindowsCarriageReturns(b bytes.Buffer) bytes.Buffer {
if runtime.GOOS == "windows" {
var buf bytes.Buffer
for {
byt, err := b.ReadBytes(0x0D)
byt = bytes.TrimRight(byt, "\x0d")
if len(byt) > 0 {
_, _ = buf.Write(byt)
}
if err == io.EOF {
return buf
}
}
}
return b
}

View File

@ -83,9 +83,10 @@ func TestTruncate(t *testing.T) {
len: len("hola") + len("..."), len: len("hola") + len("..."),
}, },
} }
c := CommandRunner{}
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
s := truncate(*tt.buf) s := c.truncate(*tt.buf)
require.Equal(t, tt.len, len(s)) require.Equal(t, tt.len, len(s))
}) })
} }