Change to NewStreamParser to accept larger inputs from scanner (#8892)

* change to NewStreamParser to accept larger inputs from scanner

* fmt changes
This commit is contained in:
Dominic Tootell 2021-04-13 22:13:46 +01:00 committed by GitHub
parent 5f26582582
commit f3229f5ec1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 19 deletions

View File

@ -1,14 +1,13 @@
package shim
import (
"bufio"
"fmt"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/processors"
)
@ -37,12 +36,7 @@ func (s *Shim) RunProcessor() error {
acc := agent.NewAccumulator(s, s.metricCh)
acc.SetPrecision(time.Nanosecond)
parser, err := parsers.NewInfluxParser()
if err != nil {
return fmt.Errorf("Failed to create new parser: %w", err)
}
err = s.Processor.Start(acc)
err := s.Processor.Start(acc)
if err != nil {
return fmt.Errorf("failed to start processor: %w", err)
}
@ -54,13 +48,21 @@ func (s *Shim) RunProcessor() error {
wg.Done()
}()
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
m, err := parser.ParseLine(scanner.Text())
parser := influx.NewStreamParser(s.stdin)
for {
m, err := parser.Next()
if err != nil {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err)
if err == influx.EOF {
break // stream ended
}
if parseErr, isParseError := err.(*influx.ParseError); isParseError {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", parseErr)
continue
}
fmt.Fprintf(s.stderr, "Failure during reading stdin: %s\b", err)
continue
}
s.Processor.Add(m, acc)
}

View File

@ -4,6 +4,7 @@ import (
"bufio"
"io"
"io/ioutil"
"math/rand"
"sync"
"testing"
"time"
@ -16,7 +17,21 @@ import (
)
func TestProcessorShim(t *testing.T) {
p := &testProcessor{}
testSendAndRecieve(t, "f1", "fv1")
}
func TestProcessorShimWithLargerThanDefaultScannerBufferSize(t *testing.T) {
letters := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, bufio.MaxScanTokenSize*2)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
testSendAndRecieve(t, "f1", string(b))
}
func testSendAndRecieve(t *testing.T, fieldKey string, fieldValue string) {
p := &testProcessor{"hi", "mom"}
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()
@ -45,7 +60,8 @@ func TestProcessorShim(t *testing.T) {
"a": "b",
},
map[string]interface{}{
"v": 1,
"v": 1,
fieldKey: fieldValue,
},
time.Now(),
)
@ -62,19 +78,24 @@ func TestProcessorShim(t *testing.T) {
mOut, err := parser.ParseLine(out)
require.NoError(t, err)
val, ok := mOut.GetTag("hi")
val, ok := mOut.GetTag(p.tagName)
require.True(t, ok)
require.Equal(t, "mom", val)
require.Equal(t, p.tagValue, val)
val2, ok := mOut.Fields()[fieldKey]
require.True(t, ok)
require.Equal(t, fieldValue, val2)
go ioutil.ReadAll(r)
wg.Wait()
}
type testProcessor struct{}
type testProcessor struct {
tagName string
tagValue string
}
func (p *testProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
metric.AddTag("hi", "mom")
metric.AddTag(p.tagName, p.tagValue)
}
return in
}