From 4ae8c60178bd414b8d3526ab34c51ecdeb1eb6ee Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 3 Feb 2022 17:15:38 +0100 Subject: [PATCH] fix: Statefull parser handling (#10575) --- plugins/inputs/file/file.go | 25 +++--- plugins/inputs/file/file_test.go | 147 +++++++++++++++++++++++++++---- plugins/inputs/http/http.go | 27 +++--- plugins/inputs/http/http_test.go | 123 +++++++++++++++++++------- 4 files changed, 252 insertions(+), 70 deletions(-) diff --git a/plugins/inputs/file/file.go b/plugins/inputs/file/file.go index fbfc536a6..5670fc34a 100644 --- a/plugins/inputs/file/file.go +++ b/plugins/inputs/file/file.go @@ -11,17 +11,16 @@ import ( "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/parsers" ) type File struct { Files []string `toml:"files"` FileTag string `toml:"file_tag"` CharacterEncoding string `toml:"character_encoding"` - parser parsers.Parser - filenames []string - decoder *encoding.Decoder + parserFunc telegraf.ParserFunc + filenames []string + decoder *encoding.Decoder } const sampleConfig = ` @@ -29,10 +28,10 @@ const sampleConfig = ` ## as well as ** to match recursive files and directories. files = ["/tmp/metrics.out"] - + ## Name a tag containing the name of the file the data was parsed from. Leave empty - ## to disable. Cautious when file name variation is high, this can increase the cardinality - ## significantly. Read more about cardinality here: + ## to disable. Cautious when file name variation is high, this can increase the cardinality + ## significantly. Read more about cardinality here: ## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality # file_tag = "" # @@ -89,8 +88,8 @@ func (f *File) Gather(acc telegraf.Accumulator) error { return nil } -func (f *File) SetParser(p parsers.Parser) { - f.parser = p +func (f *File) SetParserFunc(fn telegraf.ParserFunc) { + f.parserFunc = fn } func (f *File) refreshFilePaths() error { @@ -121,9 +120,13 @@ func (f *File) readMetric(filename string) ([]telegraf.Metric, error) { r, _ := utfbom.Skip(f.decoder.Reader(file)) fileContents, err := io.ReadAll(r) if err != nil { - return nil, fmt.Errorf("E! Error file: %v could not be read, %s", filename, err) + return nil, fmt.Errorf("could not read %q: %s", filename, err) } - return f.parser.Parse(fileContents) + parser, err := f.parserFunc() + if err != nil { + return nil, fmt.Errorf("could not instantiate parser: %s", err) + } + return parser.Parse(fileContents) } func init() { diff --git a/plugins/inputs/file/file_test.go b/plugins/inputs/file/file_test.go index d34cef175..e99f68f17 100644 --- a/plugins/inputs/file/file_test.go +++ b/plugins/inputs/file/file_test.go @@ -49,9 +49,7 @@ func TestFileTag(t *testing.T) { parserConfig := parsers.Config{ DataFormat: "json", } - nParser, err := parsers.NewParser(&parserConfig) - require.NoError(t, err) - r.parser = nParser + r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) }) err = r.Gather(&acc) require.NoError(t, err) @@ -76,9 +74,7 @@ func TestJSONParserCompile(t *testing.T) { DataFormat: "json", TagKeys: []string{"parent_ignored_child"}, } - nParser, err := parsers.NewParser(&parserConfig) - require.NoError(t, err) - r.parser = nParser + r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) }) require.NoError(t, r.Gather(&acc)) require.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags) @@ -99,9 +95,7 @@ func TestGrokParser(t *testing.T) { GrokPatterns: []string{"%{COMMON_LOG_FORMAT}"}, } - nParser, err := parsers.NewParser(&parserConfig) - r.parser = nParser - require.NoError(t, err) + r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) }) err = r.Gather(&acc) require.NoError(t, err) @@ -183,7 +177,7 @@ func TestCharacterEncoding(t *testing.T) { tests := []struct { name string plugin *File - csv *csv.Parser + csv csv.Parser file string }{ { @@ -192,7 +186,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-8.csv"}, CharacterEncoding: "", }, - csv: &csv.Parser{ + csv: csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -205,7 +199,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-8.csv"}, CharacterEncoding: "utf-8", }, - csv: &csv.Parser{ + csv: csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -218,7 +212,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-16le.csv"}, CharacterEncoding: "utf-16le", }, - csv: &csv.Parser{ + csv: csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -231,7 +225,7 @@ func TestCharacterEncoding(t *testing.T) { Files: []string{"testdata/mtr-utf-16be.csv"}, CharacterEncoding: "utf-16be", }, - csv: &csv.Parser{ + csv: csv.Parser{ MetricName: "file", SkipRows: 1, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, @@ -244,9 +238,11 @@ func TestCharacterEncoding(t *testing.T) { err := tt.plugin.Init() require.NoError(t, err) - parser := tt.csv - require.NoError(t, parser.Init()) - tt.plugin.SetParser(parser) + tt.plugin.SetParserFunc(func() (telegraf.Parser, error) { + parser := tt.csv + err := parser.Init() + return &parser, err + }) var acc testutil.Accumulator err = tt.plugin.Gather(&acc) @@ -256,3 +252,120 @@ func TestCharacterEncoding(t *testing.T) { }) } } + +func TestStatefulParsers(t *testing.T) { + expected := []telegraf.Metric{ + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "1", + "ip": "12.122.114.5", + }, + map[string]interface{}{ + "avg": 21.55, + "best": 19.34, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 2.05, + "worst": 26.83, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "2", + "ip": "192.205.32.238", + }, + map[string]interface{}{ + "avg": 25.11, + "best": 20.8, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 6.03, + "worst": 38.85, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "3", + "ip": "152.195.85.133", + }, + map[string]interface{}{ + "avg": 20.18, + "best": 19.75, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 0.0, + "worst": 20.78, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "4", + "ip": "93.184.216.34", + }, + map[string]interface{}{ + "avg": 24.02, + "best": 19.75, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 4.67, + "worst": 32.41, + }, + time.Unix(0, 0), + ), + } + + tests := []struct { + name string + plugin *File + csv csv.Parser + file string + count int + }{ + { + name: "read file twice", + plugin: &File{ + Files: []string{"testdata/mtr-utf-8.csv"}, + CharacterEncoding: "", + }, + csv: csv.Parser{ + MetricName: "file", + SkipRows: 1, + ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, + TagColumns: []string{"dest", "hop", "ip"}, + }, + count: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.plugin.Init() + require.NoError(t, err) + + tt.plugin.SetParserFunc(func() (telegraf.Parser, error) { + parser := tt.csv + err := parser.Init() + return &parser, err + }) + + var acc testutil.Accumulator + for i := 0; i < tt.count; i++ { + require.NoError(t, tt.plugin.Gather(&acc)) + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + acc.ClearMetrics() + } + }) + } +} diff --git a/plugins/inputs/http/http.go b/plugins/inputs/http/http.go index 42bc10c62..4555d8909 100644 --- a/plugins/inputs/http/http.go +++ b/plugins/inputs/http/http.go @@ -13,7 +13,6 @@ import ( "github.com/influxdata/telegraf/internal" httpconfig "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/plugins/parsers" ) type HTTP struct { @@ -33,13 +32,12 @@ type HTTP struct { SuccessStatusCodes []int `toml:"success_status_codes"` - client *http.Client - httpconfig.HTTPClientConfig Log telegraf.Logger `toml:"-"` - // The parser will automatically be set by Telegraf core code because - // this plugin implements the ParserInput interface (i.e. the SetParser method) - parser parsers.Parser + httpconfig.HTTPClientConfig + + client *http.Client + parserFunc telegraf.ParserFunc } var sampleConfig = ` @@ -153,9 +151,9 @@ func (h *HTTP) Gather(acc telegraf.Accumulator) error { return nil } -// SetParser takes the data_format from the config and finds the right parser for that format -func (h *HTTP) SetParser(parser parsers.Parser) { - h.parser = parser +// SetParserFunc takes the data_format from the config and finds the right parser for that format +func (h *HTTP) SetParserFunc(fn telegraf.ParserFunc) { + h.parserFunc = fn } // Gathers data from a particular URL @@ -230,12 +228,17 @@ func (h *HTTP) gatherURL( b, err := io.ReadAll(resp.Body) if err != nil { - return err + return fmt.Errorf("reading body failed: %v", err) } - metrics, err := h.parser.Parse(b) + // Instantiate a new parser for the new data to avoid trouble with stateful parsers + parser, err := h.parserFunc() if err != nil { - return err + return fmt.Errorf("instantiating parser failed: %v", err) + } + metrics, err := parser.Parse(b) + if err != nil { + return fmt.Errorf("parsing metrics failed: %v", err) } for _, metric := range metrics { diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index 4c73a2411..80454e80c 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -8,13 +8,16 @@ import ( "net/http/httptest" "net/url" "testing" + "time" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" httpconfig "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/common/oauth" httpplugin "github.com/influxdata/telegraf/plugins/inputs/http" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/testutil" ) @@ -35,11 +38,12 @@ func TestHTTPWithJSONFormat(t *testing.T) { } metricName := "metricName" - p, _ := parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: "metricName", + plugin.SetParserFunc(func() (telegraf.Parser, error) { + return parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) }) - plugin.SetParser(p) var acc testutil.Accumulator require.NoError(t, plugin.Init()) @@ -78,11 +82,12 @@ func TestHTTPHeaders(t *testing.T) { Log: testutil.Logger{}, } - p, _ := parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: "metricName", + plugin.SetParserFunc(func() (telegraf.Parser, error) { + return parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) }) - plugin.SetParser(p) var acc testutil.Accumulator require.NoError(t, plugin.Init()) @@ -101,12 +106,12 @@ func TestInvalidStatusCode(t *testing.T) { Log: testutil.Logger{}, } - metricName := "metricName" - p, _ := parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: metricName, + plugin.SetParserFunc(func() (telegraf.Parser, error) { + return parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) }) - plugin.SetParser(p) var acc testutil.Accumulator require.NoError(t, plugin.Init()) @@ -126,12 +131,12 @@ func TestSuccessStatusCodes(t *testing.T) { Log: testutil.Logger{}, } - metricName := "metricName" - p, _ := parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: metricName, + plugin.SetParserFunc(func() (telegraf.Parser, error) { + return parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) }) - plugin.SetParser(p) var acc testutil.Accumulator require.NoError(t, plugin.Init()) @@ -154,11 +159,12 @@ func TestMethod(t *testing.T) { Log: testutil.Logger{}, } - p, _ := parsers.NewParser(&parsers.Config{ - DataFormat: "json", - MetricName: "metricName", + plugin.SetParserFunc(func() (telegraf.Parser, error) { + return parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) }) - plugin.SetParser(p) var acc testutil.Accumulator require.NoError(t, plugin.Init()) @@ -170,6 +176,11 @@ const simpleJSON = ` "a": 1.2 } ` +const simpleCSVWithHeader = ` +# Simple CSV with header(s) +a,b,c +1.2,3.1415,ok +` func TestBodyAndContentEncoding(t *testing.T) { ts := httptest.NewServer(http.NotFoundHandler()) @@ -253,15 +264,13 @@ func TestBodyAndContentEncoding(t *testing.T) { tt.queryHandlerFunc(t, w, r) }) - parser, err := parsers.NewParser(&parsers.Config{DataFormat: "influx"}) - require.NoError(t, err) - - tt.plugin.SetParser(parser) + tt.plugin.SetParserFunc(func() (telegraf.Parser, error) { + return parsers.NewParser(&parsers.Config{DataFormat: "influx"}) + }) var acc testutil.Accumulator require.NoError(t, tt.plugin.Init()) - err = tt.plugin.Gather(&acc) - require.NoError(t, err) + require.NoError(t, tt.plugin.Gather(&acc)) }) } } @@ -335,8 +344,10 @@ func TestOAuthClientCredentialsGrant(t *testing.T) { } }) - parser, _ := parsers.NewValueParser("metric", "string", "", nil) - tt.plugin.SetParser(parser) + tt.plugin.SetParserFunc(func() (telegraf.Parser, error) { + return parsers.NewValueParser("metric", "string", "", nil) + }) + err = tt.plugin.Init() require.NoError(t, err) @@ -346,3 +357,55 @@ func TestOAuthClientCredentialsGrant(t *testing.T) { }) } } + +func TestHTTPWithCSVFormat(t *testing.T) { + fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/endpoint" { + _, _ = w.Write([]byte(simpleCSVWithHeader)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer fakeServer.Close() + + address := fakeServer.URL + "/endpoint" + plugin := &httpplugin.HTTP{ + URLs: []string{address}, + Log: testutil.Logger{}, + } + + plugin.SetParserFunc(func() (telegraf.Parser, error) { + parser := &csv.Parser{ + MetricName: "metricName", + SkipRows: 2, + ColumnNames: []string{"a", "b", "c"}, + TagColumns: []string{"c"}, + } + err := parser.Init() + return parser, err + }) + + expected := []telegraf.Metric{ + testutil.MustMetric("metricName", + map[string]string{ + "url": address, + "c": "ok", + }, + map[string]interface{}{ + "a": 1.2, + "b": 3.1415, + }, + time.Unix(0, 0), + ), + } + + var acc testutil.Accumulator + require.NoError(t, plugin.Init()) + require.NoError(t, acc.GatherError(plugin.Gather)) + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + + // Run the parser a second time to test for correct stateful handling + acc.ClearMetrics() + require.NoError(t, acc.GatherError(plugin.Gather)) + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) +}