Adding support for new lines in influx line protocol fields. (#8499)

This commit is contained in:
David Bennett 2020-12-04 16:47:58 -05:00 committed by GitHub
parent 2187baceea
commit f7950be107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 84 additions and 0 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/process"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/serializers"
)
@ -117,6 +118,12 @@ func (e *Execd) Stop() error {
}
func (e *Execd) cmdReadOut(out io.Reader) {
// Prefer using the StreamParser when parsing influx format.
if _, isInfluxParser := e.parser.(*influx.Parser); isInfluxParser {
e.cmdReadOutStream(out)
return
}
scanner := bufio.NewScanner(out)
scanBuf := make([]byte, 4096)
scanner.Buffer(scanBuf, 262144)
@ -137,6 +144,33 @@ func (e *Execd) cmdReadOut(out io.Reader) {
}
}
func (e *Execd) cmdReadOutStream(out io.Reader) {
parser := influx.NewStreamParser(out)
for {
metric, err := parser.Next()
if err != nil {
// Stop parsing when we've reached the end.
if err == influx.EOF {
break
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
// Continue past parse errors.
e.acc.AddError(parseErr)
continue
}
// Stop reading on any non-recoverable error.
e.acc.AddError(err)
return
}
e.acc.AddMetric(metric)
}
}
func (e *Execd) cmdReadErr(out io.Reader) {
scanner := bufio.NewScanner(out)

View File

@ -79,6 +79,56 @@ func TestExternalProcessorWorks(t *testing.T) {
}
}
func TestParseLinesWithNewLines(t *testing.T) {
e := New()
e.Log = testutil.Logger{}
exe, err := os.Executable()
require.NoError(t, err)
t.Log(exe)
e.Command = []string{exe, "-countmultiplier"}
e.RestartDelay = config.Duration(5 * time.Second)
acc := &testutil.Accumulator{}
require.NoError(t, e.Start(acc))
now := time.Now()
orig := now
m, err := metric.New("test",
map[string]string{
"author": "Mr. Gopher",
},
map[string]interface{}{
"phrase": "Gophers are amazing creatures.\nAbsolutely amazing.",
"count": 3,
},
now)
require.NoError(t, err)
e.Add(m, acc)
acc.Wait(1)
require.NoError(t, e.Stop())
processedMetric := acc.GetTelegrafMetrics()[0]
expectedMetric := testutil.MustMetric("test",
map[string]string{
"author": "Mr. Gopher",
},
map[string]interface{}{
"phrase": "Gophers are amazing creatures.\nAbsolutely amazing.",
"count": 6,
},
orig,
)
testutil.RequireMetricEqual(t, expectedMetric, processedMetric)
}
var countmultiplier = flag.Bool("countmultiplier", false,
"if true, act like line input program instead of test")