fix: Statefull parser handling (#10575)

This commit is contained in:
Sven Rebhan 2022-02-03 17:15:38 +01:00 committed by GitHub
parent f8d5f385c8
commit 4ae8c60178
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 70 deletions

View File

@ -11,17 +11,16 @@ import (
"github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/common/encoding"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
) )
type File struct { type File struct {
Files []string `toml:"files"` Files []string `toml:"files"`
FileTag string `toml:"file_tag"` FileTag string `toml:"file_tag"`
CharacterEncoding string `toml:"character_encoding"` CharacterEncoding string `toml:"character_encoding"`
parser parsers.Parser
filenames []string parserFunc telegraf.ParserFunc
decoder *encoding.Decoder filenames []string
decoder *encoding.Decoder
} }
const sampleConfig = ` const sampleConfig = `
@ -89,8 +88,8 @@ func (f *File) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
func (f *File) SetParser(p parsers.Parser) { func (f *File) SetParserFunc(fn telegraf.ParserFunc) {
f.parser = p f.parserFunc = fn
} }
func (f *File) refreshFilePaths() error { func (f *File) refreshFilePaths() error {
@ -121,9 +120,13 @@ func (f *File) readMetric(filename string) ([]telegraf.Metric, error) {
r, _ := utfbom.Skip(f.decoder.Reader(file)) r, _ := utfbom.Skip(f.decoder.Reader(file))
fileContents, err := io.ReadAll(r) fileContents, err := io.ReadAll(r)
if err != nil { if err != nil {
return nil, fmt.Errorf("E! Error file: %v could not be read, %s", filename, err) return nil, fmt.Errorf("could not read %q: %s", filename, err)
} }
return f.parser.Parse(fileContents) parser, err := f.parserFunc()
if err != nil {
return nil, fmt.Errorf("could not instantiate parser: %s", err)
}
return parser.Parse(fileContents)
} }
func init() { func init() {

View File

@ -49,9 +49,7 @@ func TestFileTag(t *testing.T) {
parserConfig := parsers.Config{ parserConfig := parsers.Config{
DataFormat: "json", DataFormat: "json",
} }
nParser, err := parsers.NewParser(&parserConfig) r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) })
require.NoError(t, err)
r.parser = nParser
err = r.Gather(&acc) err = r.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
@ -76,9 +74,7 @@ func TestJSONParserCompile(t *testing.T) {
DataFormat: "json", DataFormat: "json",
TagKeys: []string{"parent_ignored_child"}, TagKeys: []string{"parent_ignored_child"},
} }
nParser, err := parsers.NewParser(&parserConfig) r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) })
require.NoError(t, err)
r.parser = nParser
require.NoError(t, r.Gather(&acc)) require.NoError(t, r.Gather(&acc))
require.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags) require.Equal(t, map[string]string{"parent_ignored_child": "hi"}, acc.Metrics[0].Tags)
@ -99,9 +95,7 @@ func TestGrokParser(t *testing.T) {
GrokPatterns: []string{"%{COMMON_LOG_FORMAT}"}, GrokPatterns: []string{"%{COMMON_LOG_FORMAT}"},
} }
nParser, err := parsers.NewParser(&parserConfig) r.SetParserFunc(func() (telegraf.Parser, error) { return parsers.NewParser(&parserConfig) })
r.parser = nParser
require.NoError(t, err)
err = r.Gather(&acc) err = r.Gather(&acc)
require.NoError(t, err) require.NoError(t, err)
@ -183,7 +177,7 @@ func TestCharacterEncoding(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
plugin *File plugin *File
csv *csv.Parser csv csv.Parser
file string file string
}{ }{
{ {
@ -192,7 +186,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-8.csv"}, Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "", CharacterEncoding: "",
}, },
csv: &csv.Parser{ csv: csv.Parser{
MetricName: "file", MetricName: "file",
SkipRows: 1, SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -205,7 +199,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-8.csv"}, Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "utf-8", CharacterEncoding: "utf-8",
}, },
csv: &csv.Parser{ csv: csv.Parser{
MetricName: "file", MetricName: "file",
SkipRows: 1, SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -218,7 +212,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-16le.csv"}, Files: []string{"testdata/mtr-utf-16le.csv"},
CharacterEncoding: "utf-16le", CharacterEncoding: "utf-16le",
}, },
csv: &csv.Parser{ csv: csv.Parser{
MetricName: "file", MetricName: "file",
SkipRows: 1, SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -231,7 +225,7 @@ func TestCharacterEncoding(t *testing.T) {
Files: []string{"testdata/mtr-utf-16be.csv"}, Files: []string{"testdata/mtr-utf-16be.csv"},
CharacterEncoding: "utf-16be", CharacterEncoding: "utf-16be",
}, },
csv: &csv.Parser{ csv: csv.Parser{
MetricName: "file", MetricName: "file",
SkipRows: 1, SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
@ -244,9 +238,11 @@ func TestCharacterEncoding(t *testing.T) {
err := tt.plugin.Init() err := tt.plugin.Init()
require.NoError(t, err) require.NoError(t, err)
parser := tt.csv tt.plugin.SetParserFunc(func() (telegraf.Parser, error) {
require.NoError(t, parser.Init()) parser := tt.csv
tt.plugin.SetParser(parser) err := parser.Init()
return &parser, err
})
var acc testutil.Accumulator var acc testutil.Accumulator
err = tt.plugin.Gather(&acc) err = tt.plugin.Gather(&acc)
@ -256,3 +252,120 @@ func TestCharacterEncoding(t *testing.T) {
}) })
} }
} }
func TestStatefulParsers(t *testing.T) {
expected := []telegraf.Metric{
testutil.MustMetric("file",
map[string]string{
"dest": "example.org",
"hop": "1",
"ip": "12.122.114.5",
},
map[string]interface{}{
"avg": 21.55,
"best": 19.34,
"loss": 0.0,
"snt": 10,
"status": "OK",
"stdev": 2.05,
"worst": 26.83,
},
time.Unix(0, 0),
),
testutil.MustMetric("file",
map[string]string{
"dest": "example.org",
"hop": "2",
"ip": "192.205.32.238",
},
map[string]interface{}{
"avg": 25.11,
"best": 20.8,
"loss": 0.0,
"snt": 10,
"status": "OK",
"stdev": 6.03,
"worst": 38.85,
},
time.Unix(0, 0),
),
testutil.MustMetric("file",
map[string]string{
"dest": "example.org",
"hop": "3",
"ip": "152.195.85.133",
},
map[string]interface{}{
"avg": 20.18,
"best": 19.75,
"loss": 0.0,
"snt": 10,
"status": "OK",
"stdev": 0.0,
"worst": 20.78,
},
time.Unix(0, 0),
),
testutil.MustMetric("file",
map[string]string{
"dest": "example.org",
"hop": "4",
"ip": "93.184.216.34",
},
map[string]interface{}{
"avg": 24.02,
"best": 19.75,
"loss": 0.0,
"snt": 10,
"status": "OK",
"stdev": 4.67,
"worst": 32.41,
},
time.Unix(0, 0),
),
}
tests := []struct {
name string
plugin *File
csv csv.Parser
file string
count int
}{
{
name: "read file twice",
plugin: &File{
Files: []string{"testdata/mtr-utf-8.csv"},
CharacterEncoding: "",
},
csv: csv.Parser{
MetricName: "file",
SkipRows: 1,
ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"},
TagColumns: []string{"dest", "hop", "ip"},
},
count: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.plugin.Init()
require.NoError(t, err)
tt.plugin.SetParserFunc(func() (telegraf.Parser, error) {
parser := tt.csv
err := parser.Init()
return &parser, err
})
var acc testutil.Accumulator
for i := 0; i < tt.count; i++ {
require.NoError(t, tt.plugin.Gather(&acc))
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
acc.ClearMetrics()
}
})
}
}

View File

@ -13,7 +13,6 @@ import (
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
httpconfig "github.com/influxdata/telegraf/plugins/common/http" httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
) )
type HTTP struct { type HTTP struct {
@ -33,13 +32,12 @@ type HTTP struct {
SuccessStatusCodes []int `toml:"success_status_codes"` SuccessStatusCodes []int `toml:"success_status_codes"`
client *http.Client
httpconfig.HTTPClientConfig
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
// The parser will automatically be set by Telegraf core code because httpconfig.HTTPClientConfig
// this plugin implements the ParserInput interface (i.e. the SetParser method)
parser parsers.Parser client *http.Client
parserFunc telegraf.ParserFunc
} }
var sampleConfig = ` var sampleConfig = `
@ -153,9 +151,9 @@ func (h *HTTP) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// SetParser takes the data_format from the config and finds the right parser for that format // SetParserFunc takes the data_format from the config and finds the right parser for that format
func (h *HTTP) SetParser(parser parsers.Parser) { func (h *HTTP) SetParserFunc(fn telegraf.ParserFunc) {
h.parser = parser h.parserFunc = fn
} }
// Gathers data from a particular URL // Gathers data from a particular URL
@ -230,12 +228,17 @@ func (h *HTTP) gatherURL(
b, err := io.ReadAll(resp.Body) b, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return err return fmt.Errorf("reading body failed: %v", err)
} }
metrics, err := h.parser.Parse(b) // Instantiate a new parser for the new data to avoid trouble with stateful parsers
parser, err := h.parserFunc()
if err != nil { if err != nil {
return err return fmt.Errorf("instantiating parser failed: %v", err)
}
metrics, err := parser.Parse(b)
if err != nil {
return fmt.Errorf("parsing metrics failed: %v", err)
} }
for _, metric := range metrics { for _, metric := range metrics {

View File

@ -8,13 +8,16 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
httpconfig "github.com/influxdata/telegraf/plugins/common/http" httpconfig "github.com/influxdata/telegraf/plugins/common/http"
"github.com/influxdata/telegraf/plugins/common/oauth" "github.com/influxdata/telegraf/plugins/common/oauth"
httpplugin "github.com/influxdata/telegraf/plugins/inputs/http" httpplugin "github.com/influxdata/telegraf/plugins/inputs/http"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/csv"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
) )
@ -35,11 +38,12 @@ func TestHTTPWithJSONFormat(t *testing.T) {
} }
metricName := "metricName" metricName := "metricName"
p, _ := parsers.NewParser(&parsers.Config{ plugin.SetParserFunc(func() (telegraf.Parser, error) {
DataFormat: "json", return parsers.NewParser(&parsers.Config{
MetricName: "metricName", DataFormat: "json",
MetricName: "metricName",
})
}) })
plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
@ -78,11 +82,12 @@ func TestHTTPHeaders(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
p, _ := parsers.NewParser(&parsers.Config{ plugin.SetParserFunc(func() (telegraf.Parser, error) {
DataFormat: "json", return parsers.NewParser(&parsers.Config{
MetricName: "metricName", DataFormat: "json",
MetricName: "metricName",
})
}) })
plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
@ -101,12 +106,12 @@ func TestInvalidStatusCode(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
metricName := "metricName" plugin.SetParserFunc(func() (telegraf.Parser, error) {
p, _ := parsers.NewParser(&parsers.Config{ return parsers.NewParser(&parsers.Config{
DataFormat: "json", DataFormat: "json",
MetricName: metricName, MetricName: "metricName",
})
}) })
plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
@ -126,12 +131,12 @@ func TestSuccessStatusCodes(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
metricName := "metricName" plugin.SetParserFunc(func() (telegraf.Parser, error) {
p, _ := parsers.NewParser(&parsers.Config{ return parsers.NewParser(&parsers.Config{
DataFormat: "json", DataFormat: "json",
MetricName: metricName, MetricName: "metricName",
})
}) })
plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
@ -154,11 +159,12 @@ func TestMethod(t *testing.T) {
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
p, _ := parsers.NewParser(&parsers.Config{ plugin.SetParserFunc(func() (telegraf.Parser, error) {
DataFormat: "json", return parsers.NewParser(&parsers.Config{
MetricName: "metricName", DataFormat: "json",
MetricName: "metricName",
})
}) })
plugin.SetParser(p)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())
@ -170,6 +176,11 @@ const simpleJSON = `
"a": 1.2 "a": 1.2
} }
` `
const simpleCSVWithHeader = `
# Simple CSV with header(s)
a,b,c
1.2,3.1415,ok
`
func TestBodyAndContentEncoding(t *testing.T) { func TestBodyAndContentEncoding(t *testing.T) {
ts := httptest.NewServer(http.NotFoundHandler()) ts := httptest.NewServer(http.NotFoundHandler())
@ -253,15 +264,13 @@ func TestBodyAndContentEncoding(t *testing.T) {
tt.queryHandlerFunc(t, w, r) tt.queryHandlerFunc(t, w, r)
}) })
parser, err := parsers.NewParser(&parsers.Config{DataFormat: "influx"}) tt.plugin.SetParserFunc(func() (telegraf.Parser, error) {
require.NoError(t, err) return parsers.NewParser(&parsers.Config{DataFormat: "influx"})
})
tt.plugin.SetParser(parser)
var acc testutil.Accumulator var acc testutil.Accumulator
require.NoError(t, tt.plugin.Init()) require.NoError(t, tt.plugin.Init())
err = tt.plugin.Gather(&acc) require.NoError(t, tt.plugin.Gather(&acc))
require.NoError(t, err)
}) })
} }
} }
@ -335,8 +344,10 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
} }
}) })
parser, _ := parsers.NewValueParser("metric", "string", "", nil) tt.plugin.SetParserFunc(func() (telegraf.Parser, error) {
tt.plugin.SetParser(parser) return parsers.NewValueParser("metric", "string", "", nil)
})
err = tt.plugin.Init() err = tt.plugin.Init()
require.NoError(t, err) require.NoError(t, err)
@ -346,3 +357,55 @@ func TestOAuthClientCredentialsGrant(t *testing.T) {
}) })
} }
} }
func TestHTTPWithCSVFormat(t *testing.T) {
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, _ = w.Write([]byte(simpleCSVWithHeader))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()
address := fakeServer.URL + "/endpoint"
plugin := &httpplugin.HTTP{
URLs: []string{address},
Log: testutil.Logger{},
}
plugin.SetParserFunc(func() (telegraf.Parser, error) {
parser := &csv.Parser{
MetricName: "metricName",
SkipRows: 2,
ColumnNames: []string{"a", "b", "c"},
TagColumns: []string{"c"},
}
err := parser.Init()
return parser, err
})
expected := []telegraf.Metric{
testutil.MustMetric("metricName",
map[string]string{
"url": address,
"c": "ok",
},
map[string]interface{}{
"a": 1.2,
"b": 3.1415,
},
time.Unix(0, 0),
),
}
var acc testutil.Accumulator
require.NoError(t, plugin.Init())
require.NoError(t, acc.GatherError(plugin.Gather))
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
// Run the parser a second time to test for correct stateful handling
acc.ClearMetrics()
require.NoError(t, acc.GatherError(plugin.Gather))
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}