From 40ed7fb693d856442e2b7aa6aa9216dd52b6ebc4 Mon Sep 17 00:00:00 2001 From: Joshua Powers Date: Thu, 10 Mar 2022 15:09:58 -0700 Subject: [PATCH] feat(parsers.influx): New influx line protocol via feature flag (#10749) --- config/config.go | 5 +- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 1 + go.sum | 13 +- plugins/inputs/influxdb_listener/README.md | 5 + .../influxdb_listener/influxdb_listener.go | 301 ++++-- .../influxdb_listener_test.go | 714 ++++++++------ plugins/inputs/influxdb_v2_listener/README.md | 5 + .../influxdb_v2_listener.go | 41 +- .../influxdb_v2_listener_test.go | 248 +++-- plugins/parsers/influx/README.md | 12 +- .../parsers/influx/influx_upstream/README.md | 4 + .../parsers/influx/influx_upstream/parser.go | 299 ++++++ .../influx/influx_upstream/parser_test.go | 892 ++++++++++++++++++ plugins/parsers/registry.go | 14 +- 15 files changed, 2056 insertions(+), 499 deletions(-) create mode 100644 plugins/parsers/influx/influx_upstream/README.md create mode 100644 plugins/parsers/influx/influx_upstream/parser.go create mode 100644 plugins/parsers/influx/influx_upstream/parser_test.go diff --git a/config/config.go b/config/config.go index 0998b47f1..309cb2f73 100644 --- a/config/config.go +++ b/config/config.go @@ -1603,6 +1603,9 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config, c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName) + // for influx parser + c.getFieldString(tbl, "influx_parser_type", &pc.InfluxParserType) + //for XPath parser family if choice.Contains(pc.DataFormat, []string{"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf"}) { c.getFieldString(tbl, "xpath_protobuf_file", &pc.XPathProtobufFile) @@ -1823,7 +1826,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { "fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys", "grace", "graphite_separator", "graphite_tag_sanitize_mode", "graphite_tag_support", "grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns", - "grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_sort_fields", + "grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_parser_type", "influx_sort_fields", "influx_uint_support", "interval", "json_name_key", "json_query", "json_strict", "json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2", "lvm", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix", diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 31d9bc7d3..48b8c892d 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -139,6 +139,7 @@ following works: - github.com/influxdata/influxdb-observability/common [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE) - github.com/influxdata/influxdb-observability/influx2otel [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE) - github.com/influxdata/influxdb-observability/otel2influx [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE) +- github.com/influxdata/line-protocol [MIT License](https://github.com/influxdata/line-protocol/blob/v2/LICENSE) - github.com/influxdata/tail [MIT License](https://github.com/influxdata/tail/blob/master/LICENSE.txt) - github.com/influxdata/toml [MIT License](https://github.com/influxdata/toml/blob/master/LICENSE) - 
github.com/influxdata/wlog [MIT License](https://github.com/influxdata/wlog/blob/master/LICENSE) diff --git a/go.mod b/go.mod index e40b1d0b9..10a7b40c2 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( github.com/influxdata/influxdb-observability/common v0.2.10 github.com/influxdata/influxdb-observability/influx2otel v0.2.10 github.com/influxdata/influxdb-observability/otel2influx v0.2.10 + github.com/influxdata/line-protocol/v2 v2.2.1 github.com/influxdata/tail v1.0.1-0.20210707231403-b283181d1fa7 github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65 github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 diff --git a/go.sum b/go.sum index 4351253df..05e30a8ab 100644 --- a/go.sum +++ b/go.sum @@ -788,8 +788,11 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.7.3/go.mod h1:V1d2J5pfxYH6EjBAgSK7YNXcXlTWxUHdE1sVDXkjnig= -github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= +github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk= +github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= @@ -1299,7 +1302,15 @@ github.com/influxdata/influxdb-observability/otel2influx v0.2.10 h1:sNZCYUExwCWs github.com/influxdata/influxdb-observability/otel2influx v0.2.10/go.mod h1:UOa19v6sU7EpL1dPK79Yt+mZ+1/iOwvMqcFu9yVXenw= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk= +github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= +github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= +github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= +github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= +github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE= +github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= github.com/influxdata/promql/v2 
v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8= github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= github.com/influxdata/tail v1.0.1-0.20210707231403-b283181d1fa7 h1:0rQOs1VHLVFpAAOIR0mJEvVOIaMYFgYdreeVbgI9sII= diff --git a/plugins/inputs/influxdb_listener/README.md b/plugins/inputs/influxdb_listener/README.md index 19cc1069a..540aada95 100644 --- a/plugins/inputs/influxdb_listener/README.md +++ b/plugins/inputs/influxdb_listener/README.md @@ -62,6 +62,11 @@ submits data to InfluxDB determines the destination database. ## You probably want to make sure you have TLS configured above for this. # basic_username = "foobar" # basic_password = "barfoo" + + ## Influx line protocol parser + ## 'internal' is the default. 'upstream' is a newer parser that is faster + ## and more memory efficient. + # parser_type = "internal" ``` ## Metrics diff --git a/plugins/inputs/influxdb_listener/influxdb_listener.go b/plugins/inputs/influxdb_listener/influxdb_listener.go index 754db2a78..ec19f10ce 100644 --- a/plugins/inputs/influxdb_listener/influxdb_listener.go +++ b/plugins/inputs/influxdb_listener/influxdb_listener.go @@ -16,6 +16,7 @@ import ( tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" "github.com/influxdata/telegraf/selfstat" ) @@ -38,6 +39,7 @@ type InfluxDBListener struct { BasicPassword string `toml:"basic_password"` DatabaseTag string `toml:"database_tag"` RetentionPolicyTag string `toml:"retention_policy_tag"` + ParserType string `toml:"parser_type"` timeFunc influx.TimeFunc @@ -96,6 +98,11 @@ const sampleConfig = ` ## You probably want to make sure you have TLS configured above for this. # basic_username = "foobar" # basic_password = "barfoo" + + ## Influx line protocol parser + ## 'internal' is the default. 'upstream' is a newer parser that is faster + ## and more memory efficient. + # parser_type = "internal" ` func (h *InfluxDBListener) SampleConfig() string { @@ -254,112 +261,228 @@ func (h *InfluxDBListener) handleDefault() http.HandlerFunc { func (h *InfluxDBListener) handleWrite() http.HandlerFunc { return func(res http.ResponseWriter, req *http.Request) { - defer h.writesServed.Incr(1) - // Check that the content length is not too large for us to handle. - if req.ContentLength > int64(h.MaxBodySize) { - if err := tooLarge(res); err != nil { - h.Log.Debugf("error in too-large: %v", err) - } - return + if h.ParserType == "upstream" { + h.handleWriteUpstreamParser(res, req) + return } - db := req.URL.Query().Get("db") - rp := req.URL.Query().Get("rp") + h.handleWriteInternalParser(res, req) + } } - body := req.Body - body = http.MaxBytesReader(res, body, int64(h.MaxBodySize)) - // Handle gzip request bodies - if req.Header.Get("Content-Encoding") == "gzip" { - var err error - body, err = gzip.NewReader(body) - if err != nil { - h.Log.Debugf("Error decompressing request body: %v", err.Error()) - if err := badRequest(res, err.Error()); err != nil { - h.Log.Debugf("error in bad-request: %v", err) - } - return - } - defer body.Close() + } - parser := influx.NewStreamParser(body) - parser.SetTimeFunc(h.timeFunc) - precisionStr := req.URL.Query().Get("precision") - if precisionStr != "" { - precision := getPrecisionMultiplier(precisionStr) - parser.SetTimePrecision(precision) - } - - var m telegraf.Metric +func (h *InfluxDBListener) handleWriteInternalParser(res http.ResponseWriter, req *http.Request) { + defer h.writesServed.Incr(1) + // Check that the content length is not too large for us to handle. 
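+ // Note (clarifying comment, not in the original patch): req.ContentLength is -1 when the + // length is unknown; such requests pass this early check and are instead capped by the + // http.MaxBytesReader applied to the body below.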
+ if req.ContentLength > int64(h.MaxBodySize) { + if err := tooLarge(res); err != nil { + h.Log.Debugf("error in too-large: %v", err) } + return + } - parser := influx.NewStreamParser(body) - parser.SetTimeFunc(h.timeFunc) + db := req.URL.Query().Get("db") + rp := req.URL.Query().Get("rp") - precisionStr := req.URL.Query().Get("precision") - if precisionStr != "" { - precision := getPrecisionMultiplier(precisionStr) - parser.SetTimePrecision(precision) - } - - var m telegraf.Metric + body := req.Body + body = http.MaxBytesReader(res, body, int64(h.MaxBodySize)) + // Handle gzip request bodies + if req.Header.Get("Content-Encoding") == "gzip" { var err error - var parseErrorCount int - var lastPos int - var firstParseErrorStr string - for { - select { - case <-req.Context().Done(): - // Shutting down before parsing is finished. - res.WriteHeader(http.StatusServiceUnavailable) - return - default: - } - - m, err = parser.Next() - pos := parser.Position() - h.bytesRecv.Incr(int64(pos - lastPos)) - lastPos = pos - - // Continue parsing metrics even if some are malformed - if parseErr, ok := err.(*influx.ParseError); ok { - parseErrorCount++ - errStr := parseErr.Error() - if firstParseErrorStr == "" { - firstParseErrorStr = errStr - } - continue - } else if err != nil { - // Either we're exiting cleanly (err == - // influx.EOF) or there's an unexpected error - break - } - - if h.DatabaseTag != "" && db != "" { - m.AddTag(h.DatabaseTag, db) - } - - if h.RetentionPolicyTag != "" && rp != "" { - m.AddTag(h.RetentionPolicyTag, rp) - } - - h.acc.AddMetric(m) - } - if err != influx.EOF { - h.Log.Debugf("Error parsing the request body: %v", err.Error()) + body, err = gzip.NewReader(body) + if err != nil { + h.Log.Debugf("Error decompressing request body: %v", err.Error()) if err := badRequest(res, err.Error()); err != nil { h.Log.Debugf("error in bad-request: %v", err) } return } - if parseErrorCount > 0 { - var partialErrorString string - switch parseErrorCount { - case 1: - partialErrorString = firstParseErrorStr - case 2: - partialErrorString = fmt.Sprintf("%s (and 1 other parse error)", firstParseErrorStr) - default: - partialErrorString = fmt.Sprintf("%s (and %d other parse errors)", firstParseErrorStr, parseErrorCount-1) + defer body.Close() + } + + parser := influx.NewStreamParser(body) + parser.SetTimeFunc(h.timeFunc) + + precisionStr := req.URL.Query().Get("precision") + if precisionStr != "" { + precision := getPrecisionMultiplier(precisionStr) + parser.SetTimePrecision(precision) + } + + var m telegraf.Metric + var err error + var parseErrorCount int + var lastPos int + var firstParseErrorStr string + for { + select { + case <-req.Context().Done(): + // Shutting down before parsing is finished. 
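+ // Responding 503 tells the client the write was interrupted and may not have + // been fully applied.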
+ res.WriteHeader(http.StatusServiceUnavailable) + return + default: + } + + m, err = parser.Next() + pos := parser.Position() + h.bytesRecv.Incr(int64(pos - lastPos)) + lastPos = pos + + // Continue parsing metrics even if some are malformed + if parseErr, ok := err.(*influx.ParseError); ok { + parseErrorCount++ + errStr := parseErr.Error() + if firstParseErrorStr == "" { + firstParseErrorStr = errStr } - if err := partialWrite(res, partialErrorString); err != nil { - h.Log.Debugf("error in partial-write: %v", err) + continue + } else if err != nil { + // Either we're exiting cleanly (err == + // influx.EOF) or there's an unexpected error + break + } + + if h.DatabaseTag != "" && db != "" { + m.AddTag(h.DatabaseTag, db) + } + + if h.RetentionPolicyTag != "" && rp != "" { + m.AddTag(h.RetentionPolicyTag, rp) + } + + h.acc.AddMetric(m) + } + if err != influx.EOF { + h.Log.Debugf("Error parsing the request body: %v", err.Error()) + if err := badRequest(res, err.Error()); err != nil { + h.Log.Debugf("error in bad-request: %v", err) + } + return + } + if parseErrorCount > 0 { + var partialErrorString string + switch parseErrorCount { + case 1: + partialErrorString = firstParseErrorStr + case 2: + partialErrorString = fmt.Sprintf("%s (and 1 other parse error)", firstParseErrorStr) + default: + partialErrorString = fmt.Sprintf("%s (and %d other parse errors)", firstParseErrorStr, parseErrorCount-1) + } + if err := partialWrite(res, partialErrorString); err != nil { + h.Log.Debugf("error in partial-write: %v", err) + } + return + } + + // http request success + res.WriteHeader(http.StatusNoContent) +} + +func (h *InfluxDBListener) handleWriteUpstreamParser(res http.ResponseWriter, req *http.Request) { + defer h.writesServed.Incr(1) + // Check that the content length is not too large for us to handle. + if req.ContentLength > int64(h.MaxBodySize) { + if err := tooLarge(res); err != nil { + h.Log.Debugf("error in too-large: %v", err) + } + return + } + + db := req.URL.Query().Get("db") + rp := req.URL.Query().Get("rp") + + body := req.Body + body = http.MaxBytesReader(res, body, int64(h.MaxBodySize)) + // Handle gzip request bodies + if req.Header.Get("Content-Encoding") == "gzip" { + var err error + body, err = gzip.NewReader(body) + if err != nil { + h.Log.Debugf("Error decompressing request body: %v", err.Error()) + if err := badRequest(res, err.Error()); err != nil { + h.Log.Debugf("error in bad-request: %v", err) } return } - - // http request success - res.WriteHeader(http.StatusNoContent) + defer body.Close() } + + parser := influx_upstream.NewStreamParser(body) + parser.SetTimeFunc(influx_upstream.TimeFunc(h.timeFunc)) + + precisionStr := req.URL.Query().Get("precision") + if precisionStr != "" { + precision := getPrecisionMultiplier(precisionStr) + parser.SetTimePrecision(precision) + } + + if req.ContentLength >= 0 { + h.bytesRecv.Incr(req.ContentLength) + } + + var m telegraf.Metric + var err error + var parseErrorCount int + var firstParseErrorStr string + for { + select { + case <-req.Context().Done(): + // Shutting down before parsing is finished. 
+ res.WriteHeader(http.StatusServiceUnavailable) + return + default: + } + + m, err = parser.Next() + + // Continue parsing metrics even if some are malformed + if parseErr, ok := err.(*influx_upstream.ParseError); ok { + parseErrorCount++ + errStr := parseErr.Error() + if firstParseErrorStr == "" { + firstParseErrorStr = errStr + } + continue + } else if err != nil { + // Either we're exiting cleanly (err == + // influx.ErrEOF) or there's an unexpected error + break + } + + if h.DatabaseTag != "" && db != "" { + m.AddTag(h.DatabaseTag, db) + } + + if h.RetentionPolicyTag != "" && rp != "" { + m.AddTag(h.RetentionPolicyTag, rp) + } + + h.acc.AddMetric(m) + } + if err != influx_upstream.ErrEOF { + h.Log.Debugf("Error parsing the request body: %v", err.Error()) + if err := badRequest(res, err.Error()); err != nil { + h.Log.Debugf("error in bad-request: %v", err) + } + return + } + if parseErrorCount > 0 { + var partialErrorString string + switch parseErrorCount { + case 1: + partialErrorString = firstParseErrorStr + case 2: + partialErrorString = fmt.Sprintf("%s (and 1 other parse error)", firstParseErrorStr) + default: + partialErrorString = fmt.Sprintf("%s (and %d other parse errors)", firstParseErrorStr, parseErrorCount-1) + } + if err := partialWrite(res, partialErrorString); err != nil { + h.Log.Debugf("error in partial-write: %v", err) + } + return + } + + // http request success + res.WriteHeader(http.StatusNoContent) } func tooLarge(res http.ResponseWriter) error { diff --git a/plugins/inputs/influxdb_listener/influxdb_listener_test.go b/plugins/inputs/influxdb_listener/influxdb_listener_test.go index 36952f685..2871fa128 100644 --- a/plugins/inputs/influxdb_listener/influxdb_listener_test.go +++ b/plugins/inputs/influxdb_listener/influxdb_listener_test.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/tls" "crypto/x509" + "fmt" "net/http" "net/url" "os" @@ -44,7 +45,13 @@ cpu,host=c value1=1` ) var ( - pki = testutil.NewPKI("../../../testutil/pki") + pki = testutil.NewPKI("../../../testutil/pki") + parserTestCases = []struct { + parser string + }{ + {"upstream"}, + {"internal"}, + } ) func newTestListener() *InfluxDBListener { @@ -159,52 +166,57 @@ func TestWriteBasicAuth(t *testing.T) { func TestWriteKeepDatabase(t *testing.T) { testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n" - listener := newTestListener() - listener.DatabaseTag = "database" + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser + listener.DatabaseTag = "database" - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": 
float64(12)}, - map[string]string{"host": "server01", "database": "mydb"}, - ) + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01", "database": "mydb"}, + ) - // post single message to listener with a database tag in it already. It should be clobbered. - resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + // post single message to listener with a database tag in it already. It should be clobbered. + resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgWithDB))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01", "database": "mydb"}, - ) + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01", "database": "mydb"}, + ) - // post multiple message to listener - resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + // post multiple message to listener + resp, err = http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgs))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - acc.Wait(2) - hostTags := []string{"server02", "server03", - "server04", "server05", "server06"} - for _, hostTag := range hostTags { - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": hostTag, "database": "mydb"}, - ) + acc.Wait(2) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag, "database": "mydb"}, + ) + } + }) } } @@ -241,193 +253,227 @@ func TestWriteRetentionPolicyTag(t *testing.T) { // http listener should add a newline at the end of the buffer if it's not there func TestWriteNoNewline(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) + require.NoError(t, err) + 
require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, - ) + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) + }) + } } func TestPartialWrite(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testPartial))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 400, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(testPartial))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 400, resp.StatusCode) - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu", - map[string]interface{}{"value1": float64(1)}, - map[string]string{"host": "a"}, - ) - acc.AssertContainsTaggedFields(t, "cpu", - map[string]interface{}{"value1": float64(1)}, - map[string]string{"host": "c"}, - ) + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu", + map[string]interface{}{"value1": float64(1)}, + map[string]string{"host": "a"}, + ) + acc.AssertContainsTaggedFields(t, "cpu", + map[string]interface{}{"value1": float64(1)}, + map[string]string{"host": "c"}, + ) + }) + } } func TestWriteMaxLineSizeIncrease(t *testing.T) { - listener := &InfluxDBListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:0", - timeFunc: time.Now, + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := &InfluxDBListener{ + Log: testutil.Logger{}, + ServiceAddress: "localhost:0", + ParserType: tc.parser, + timeFunc: time.Now, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + // Post a gigantic metric to the listener and verify that it writes OK this time: + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + }) } - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() - - // Post a gigantic metric to the listener and verify that it writes OK this time: - resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(hugeMetric))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) } func TestWriteVerySmallMaxBody(t *testing.T) { - listener := &InfluxDBListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:0", - MaxBodySize: config.Size(4096), - timeFunc: time.Now, + for _, 
tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := &InfluxDBListener{ + Log: testutil.Logger{}, + ServiceAddress: "localhost:0", + MaxBodySize: config.Size(4096), + ParserType: tc.parser, + timeFunc: time.Now, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 413, resp.StatusCode) + }) } - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() - - resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 413, resp.StatusCode) } func TestWriteLargeLine(t *testing.T) { - listener := &InfluxDBListener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:0", - timeFunc: func() time.Time { - return time.Unix(123456789, 0) - }, - } + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := &InfluxDBListener{ + Log: testutil.Logger{}, + ServiceAddress: "localhost:0", + ParserType: tc.parser, + timeFunc: func() time.Time { + return time.Unix(123456789, 0) + }, + } - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - //todo: with the new parser, long lines aren't a problem. Do we need to skip them? - //require.EqualValues(t, 400, resp.StatusCode) + resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric+testMsgs))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + //todo: with the new parser, long lines aren't a problem. Do we need to skip them? 
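+ // (Both parser types consume the body as a stream, so input is bounded by + // max_body_size rather than by any per-line limit.)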
+ //require.EqualValues(t, 400, resp.StatusCode) - expected := testutil.MustMetric( - "super_long_metric", - map[string]string{"foo": "bar"}, - map[string]interface{}{ - "clients": 42, - "connected_followers": 43, - "evicted_keys": 44, - "expired_keys": 45, - "instantaneous_ops_per_sec": 46, - "keyspace_hitrate": 47.0, - "keyspace_hits": 48, - "keyspace_misses": 49, - "latest_fork_usec": 50, - "master_repl_offset": 51, - "mem_fragmentation_ratio": 52.58, - "pubsub_channels": 53, - "pubsub_patterns": 54, - "rdb_changes_since_last_save": 55, - "repl_backlog_active": 56, - "repl_backlog_histlen": 57, - "repl_backlog_size": 58, - "sync_full": 59, - "sync_partial_err": 60, - "sync_partial_ok": 61, - "total_commands_processed": 62, - "total_connections_received": 63, - "uptime": 64, - "used_cpu_sys": 65.07, - "used_cpu_sys_children": 66.0, - "used_cpu_user": 67.1, - "used_cpu_user_children": 68.0, - "used_memory": 692048, - "used_memory_lua": 70792, - "used_memory_peak": 711128, - "used_memory_rss": 7298144, - }, - time.Unix(123456789, 0), - ) + expected := testutil.MustMetric( + "super_long_metric", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "clients": 42, + "connected_followers": 43, + "evicted_keys": 44, + "expired_keys": 45, + "instantaneous_ops_per_sec": 46, + "keyspace_hitrate": 47.0, + "keyspace_hits": 48, + "keyspace_misses": 49, + "latest_fork_usec": 50, + "master_repl_offset": 51, + "mem_fragmentation_ratio": 52.58, + "pubsub_channels": 53, + "pubsub_patterns": 54, + "rdb_changes_since_last_save": 55, + "repl_backlog_active": 56, + "repl_backlog_histlen": 57, + "repl_backlog_size": 58, + "sync_full": 59, + "sync_partial_err": 60, + "sync_partial_ok": 61, + "total_commands_processed": 62, + "total_connections_received": 63, + "uptime": 64, + "used_cpu_sys": 65.07, + "used_cpu_sys_children": 66.0, + "used_cpu_user": 67.1, + "used_cpu_user_children": 68.0, + "used_memory": 692048, + "used_memory_lua": 70792, + "used_memory_peak": 711128, + "used_memory_rss": 7298144, + }, + time.Unix(123456789, 0), + ) - m, ok := acc.Get("super_long_metric") - require.True(t, ok) - testutil.RequireMetricEqual(t, expected, testutil.FromTestMetric(m)) + m, ok := acc.Get("super_long_metric") + require.True(t, ok) + testutil.RequireMetricEqual(t, expected, testutil.FromTestMetric(m)) - hostTags := []string{"server02", "server03", - "server04", "server05", "server06"} - acc.Wait(len(hostTags)) - for _, hostTag := range hostTags { - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": hostTag}, - ) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + acc.Wait(len(hostTags)) + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } + }) } } // test that writing gzipped data works func TestWriteGzippedData(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - data, err := os.ReadFile("./testdata/testmsgs.gz") - require.NoError(t, 
err) + data, err := os.ReadFile("./testdata/testmsgs.gz") + require.NoError(t, err) - req, err := http.NewRequest("POST", createURL(listener, "http", "/write", ""), bytes.NewBuffer(data)) - require.NoError(t, err) - req.Header.Set("Content-Encoding", "gzip") + req, err := http.NewRequest("POST", createURL(listener, "http", "/write", ""), bytes.NewBuffer(data)) + require.NoError(t, err) + req.Header.Set("Content-Encoding", "gzip") - client := &http.Client{} - resp, err := client.Do(req) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + client := &http.Client{} + resp, err := client.Do(req) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - hostTags := []string{"server02", "server03", - "server04", "server05", "server06"} - acc.Wait(len(hostTags)) - for _, hostTag := range hostTags { - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": hostTag}, - ) + hostTags := []string{"server02", "server03", + "server04", "server05", "server06"} + acc.Wait(len(hostTags)) + for _, hostTag := range hostTags { + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": hostTag}, + ) + } + }) } } @@ -472,144 +518,228 @@ func TestWriteHighTraffic(t *testing.T) { } func TestReceive404ForInvalidEndpoint(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 404, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 404, resp.StatusCode) + }) + } } func TestWriteInvalid(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(badMsg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 400, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(badMsg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 400, resp.StatusCode) + }) + } } func 
TestWriteEmpty(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(emptyMsg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/write", "db=mydb"), "", bytes.NewBuffer([]byte(emptyMsg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + }) + } } func TestQuery(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post query to listener - resp, err := http.Post( - createURL(listener, "http", "/query", "db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22"), "", nil) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 200, resp.StatusCode) + // post query to listener + resp, err := http.Post( + createURL(listener, "http", "/query", "db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22"), "", nil) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 200, resp.StatusCode) + }) + } } func TestPing(t *testing.T) { - listener := newTestListener() - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - // post ping to listener - resp, err := http.Post(createURL(listener, "http", "/ping", ""), "", nil) - require.NoError(t, err) - require.Equal(t, "1.0", resp.Header["X-Influxdb-Version"][0]) - require.Len(t, resp.Header["Content-Type"], 0) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + // post ping to listener + resp, err := http.Post(createURL(listener, "http", "/ping", ""), "", nil) + require.NoError(t, err) + require.Equal(t, "1.0", resp.Header["X-Influxdb-Version"][0]) + require.Len(t, resp.Header["Content-Type"], 0) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + }) + } } func TestPingVerbose(t *testing.T) { - listener := newTestListener() - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser 
%s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - // post ping to listener - resp, err := http.Post(createURL(listener, "http", "/ping", "verbose=1"), "", nil) - require.NoError(t, err) - require.Equal(t, "1.0", resp.Header["X-Influxdb-Version"][0]) - require.Equal(t, "application/json", resp.Header["Content-Type"][0]) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 200, resp.StatusCode) + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + // post ping to listener + resp, err := http.Post(createURL(listener, "http", "/ping", "verbose=1"), "", nil) + require.NoError(t, err) + require.Equal(t, "1.0", resp.Header["X-Influxdb-Version"][0]) + require.Equal(t, "application/json", resp.Header["Content-Type"][0]) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 200, resp.StatusCode) + }) + } } func TestWriteWithPrecision(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - msg := "xyzzy value=42 1422568543\n" - resp, err := http.Post( - createURL(listener, "http", "/write", "precision=s"), "", bytes.NewBuffer([]byte(msg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + msg := "xyzzy value=42 1422568543\n" + resp, err := http.Post( + createURL(listener, "http", "/write", "precision=s"), "", bytes.NewBuffer([]byte(msg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - acc.Wait(1) - require.Equal(t, 1, len(acc.Metrics)) - // When timestamp is provided, the precision parameter is - // overloaded to specify the timestamp's unit - require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time) + acc.Wait(1) + require.Equal(t, 1, len(acc.Metrics)) + // When timestamp is provided, the precision parameter is + // overloaded to specify the timestamp's unit + require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time) + }) + } } func TestWriteWithPrecisionNoTimestamp(t *testing.T) { - listener := newTestListener() - listener.timeFunc = func() time.Time { - return time.Unix(42, 123456789) + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser + listener.timeFunc = func() time.Time { + return time.Unix(42, 123456789) + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + msg := "xyzzy value=42\n" + resp, err := http.Post( + createURL(listener, "http", "/write", "precision=s"), "", bytes.NewBuffer([]byte(msg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + + acc.Wait(1) + require.Equal(t, 1, len(acc.Metrics)) + // When timestamp is omitted, the precision parameter actually + // specifies the precision. 
The timestamp is set to the greatest + // integer unit less than the provided timestamp (floor). + require.Equal(t, time.Unix(42, 0), acc.Metrics[0].Time) + }) } +} - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() +func TestWriteUpstreamParseErrors(t *testing.T) { + var tests = []struct { + name string + input string + expected string + }{ + { + name: "one parse error", + input: "foo value=1.0\nfoo value=2asdf2.0\nfoo value=3.0\nfoo value=4.0", + expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 2:11`, + }, + { + name: "two parse errors", + input: "foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4.0", + expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11 (and 1 other parse error)`, + }, + { + name: "three or more parse errors", + input: "foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4asdf2.0", + expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11 (and 2 other parse errors)`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listener := newTestListener() + listener.ParserType = "upstream" - msg := "xyzzy value=42\n" - resp, err := http.Post( - createURL(listener, "http", "/write", "precision=s"), "", bytes.NewBuffer([]byte(msg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + acc := &testutil.NopAccumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - acc.Wait(1) - require.Equal(t, 1, len(acc.Metrics)) - // When timestamp is omitted, the precision parameter actually - // specifies the precision. The timestamp is set to the greatest - // integer unit less than the provided timestamp (floor). - require.Equal(t, time.Unix(42, 0), acc.Metrics[0].Time) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(tt.input))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 400, resp.StatusCode) + require.Equal(t, tt.expected, resp.Header["X-Influxdb-Error"][0]) + }) + } } func TestWriteParseErrors(t *testing.T) { diff --git a/plugins/inputs/influxdb_v2_listener/README.md b/plugins/inputs/influxdb_v2_listener/README.md index 11c95c696..dd7a258a3 100644 --- a/plugins/inputs/influxdb_v2_listener/README.md +++ b/plugins/inputs/influxdb_v2_listener/README.md @@ -40,6 +40,11 @@ Telegraf minimum version: Telegraf 1.16.0 ## Optional token to accept for HTTP authentication. ## You probably want to make sure you have TLS configured above for this. # token = "some-long-shared-secret-token" + + ## Influx line protocol parser + ## 'internal' is the default. 'upstream' is a newer parser that is faster + ## and more memory efficient. 
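+  ## The 'upstream' parser is backed by the github.com/influxdata/line-protocol/v2 module.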
+ # parser_type = "internal" ``` ## Metrics diff --git a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go index 4df2f7dc8..3f8cae777 100644 --- a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go +++ b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "net" @@ -17,6 +18,7 @@ import ( tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" "github.com/influxdata/telegraf/selfstat" ) @@ -26,6 +28,8 @@ const ( defaultMaxBodySize = 32 * 1024 * 1024 ) +var ErrEOF = errors.New("EOF") + // The BadRequestCode constants keep standard error messages // see: https://v2.docs.influxdata.com/v2.0/api/#operation/PostWrite type BadRequestCode string @@ -43,6 +47,7 @@ type InfluxDBV2Listener struct { MaxBodySize config.Size `toml:"max_body_size"` Token string `toml:"token"` BucketTag string `toml:"bucket_tag"` + ParserType string `toml:"parser_type"` timeFunc influx.TimeFunc @@ -92,6 +97,11 @@ const sampleConfig = ` ## Optional token to accept for HTTP authentication. ## You probably want to make sure you have TLS configured above for this. # token = "some-long-shared-secret-token" + + ## Influx line protocol parser + ## 'internal' is the default. 'upstream' is a newer parser that is faster + ## and more memory efficient. + # parser_type = "internal" ` func (h *InfluxDBV2Listener) SampleConfig() string { @@ -264,22 +274,35 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc { } return } - metricHandler := influx.NewMetricHandler() - parser := influx.NewParser(metricHandler) - parser.SetTimeFunc(h.timeFunc) precisionStr := req.URL.Query().Get("precision") - if precisionStr != "" { - precision := getPrecisionMultiplier(precisionStr) - metricHandler.SetTimePrecision(precision) - } var metrics []telegraf.Metric var err error + if h.ParserType == "upstream" { + parser := influx_upstream.NewParser() + parser.SetTimeFunc(influx_upstream.TimeFunc(h.timeFunc)) - metrics, err = parser.Parse(bytes) + if precisionStr != "" { + precision := getPrecisionMultiplier(precisionStr) + parser.SetTimePrecision(precision) + } - if err != influx.EOF && err != nil { + metrics, err = parser.Parse(bytes) + } else { + metricHandler := influx.NewMetricHandler() + parser := influx.NewParser(metricHandler) + parser.SetTimeFunc(h.timeFunc) + + if precisionStr != "" { + precision := getPrecisionMultiplier(precisionStr) + metricHandler.SetTimePrecision(precision) + } + + metrics, err = parser.Parse(bytes) + } + + if err != ErrEOF && err != nil { h.Log.Debugf("Error parsing the request body: %v", err.Error()) if err := badRequest(res, Invalid, err.Error()); err != nil { h.Log.Debugf("error in bad-request: %v", err) diff --git a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go index 4338f34f8..7e2e2421b 100644 --- a/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go +++ b/plugins/inputs/influxdb_v2_listener/influxdb_v2_listener_test.go @@ -44,7 +44,13 @@ cpu,host=c value1=1` ) var ( - pki = testutil.NewPKI("../../../testutil/pki") + pki = testutil.NewPKI("../../../testutil/pki") + parserTestCases = []struct { + parser string + }{ + {"upstream"}, + {"internal"}, + } ) func 
newTestListener() *InfluxDBV2Listener { @@ -209,77 +215,96 @@ func TestWriteKeepBucket(t *testing.T) { // http listener should add a newline at the end of the buffer if it's not there func TestWriteNoNewline(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testMsgNoNewline))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu_load_short", - map[string]interface{}{"value": float64(12)}, - map[string]string{"host": "server01"}, - ) + acc.Wait(1) + acc.AssertContainsTaggedFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(12)}, + map[string]string{"host": "server01"}, + ) + }) + } } func TestAllOrNothing(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testPartial))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 400, resp.StatusCode) + resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testPartial))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 400, resp.StatusCode) + }) + } } func TestWriteMaxLineSizeIncrease(t *testing.T) { - listener := &InfluxDBV2Listener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:0", - timeFunc: time.Now, + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := &InfluxDBV2Listener{ + Log: testutil.Logger{}, + ServiceAddress: "localhost:0", + timeFunc: time.Now, + ParserType: tc.parser, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + // Post a gigantic metric to the listener and verify that it writes OK this time: + resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + }) } - 
- acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() - - // Post a gigantic metric to the listener and verify that it writes OK this time: - resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) } func TestWriteVerySmallMaxBody(t *testing.T) { - listener := &InfluxDBV2Listener{ - Log: testutil.Logger{}, - ServiceAddress: "localhost:0", - MaxBodySize: config.Size(4096), - timeFunc: time.Now, + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := &InfluxDBV2Listener{ + Log: testutil.Logger{}, + ServiceAddress: "localhost:0", + MaxBodySize: config.Size(4096), + timeFunc: time.Now, + ParserType: tc.parser, + } + + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() + + resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 413, resp.StatusCode) + }) } - - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() - - resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(hugeMetric))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 413, resp.StatusCode) } func TestWriteLargeLine(t *testing.T) { @@ -430,48 +455,63 @@ func TestWriteHighTraffic(t *testing.T) { } func TestReceive404ForInvalidEndpoint(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 404, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/foobar", ""), "", bytes.NewBuffer([]byte(testMsg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 404, resp.StatusCode) + }) + } } func TestWriteInvalid(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", 
bytes.NewBuffer([]byte(badMsg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 400, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(badMsg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 400, resp.StatusCode) + }) + } } func TestWriteEmpty(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - // post single message to listener - resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(emptyMsg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + // post single message to listener + resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(emptyMsg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) + }) + } } func TestReady(t *testing.T) { @@ -496,25 +536,29 @@ func TestReady(t *testing.T) { } func TestWriteWithPrecision(t *testing.T) { - listener := newTestListener() + for _, tc := range parserTestCases { + t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) { + listener := newTestListener() + listener.ParserType = tc.parser - acc := &testutil.Accumulator{} - require.NoError(t, listener.Init()) - require.NoError(t, listener.Start(acc)) - defer listener.Stop() + acc := &testutil.Accumulator{} + require.NoError(t, listener.Init()) + require.NoError(t, listener.Start(acc)) + defer listener.Stop() - msg := "xyzzy value=42 1422568543\n" - resp, err := http.Post( - createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s"), "", bytes.NewBuffer([]byte(msg))) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - require.EqualValues(t, 204, resp.StatusCode) + msg := "xyzzy value=42 1422568543\n" + resp, err := http.Post( + createURL(listener, "http", "/api/v2/write", "bucket=mybucket&precision=s"), "", bytes.NewBuffer([]byte(msg))) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.EqualValues(t, 204, resp.StatusCode) - acc.Wait(1) - require.Equal(t, 1, len(acc.Metrics)) - // When timestamp is provided, the precision parameter is - // overloaded to specify the timestamp's unit - require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time) + acc.Wait(1) + // When timestamp is provided, the precision parameter is + // overloaded to specify the timestamp's unit + require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time) + }) + } } func TestWriteWithPrecisionNoTimestamp(t *testing.T) { diff --git a/plugins/parsers/influx/README.md b/plugins/parsers/influx/README.md index b0624e217..594eff2b4 100644 --- a/plugins/parsers/influx/README.md +++ b/plugins/parsers/influx/README.md @@ -1,9 +1,8 @@ -# InfluxDB Line Protocol +# Influx Line Protocol -There are no additional configuration options for InfluxDB [line protocol][]. 
The
-metrics are parsed directly into Telegraf metrics.
+Parses metrics using the [Influx Line Protocol][].
 
-[line protocol]: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
+[Influx Line Protocol]: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
 
 ## Configuration
 
@@ -16,4 +15,9 @@ metrics are parsed directly into Telegraf metrics.
   ## more about them here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
   data_format = "influx"
+
+  ## Influx line protocol parser
+  ## 'internal' is the default. 'upstream' is a newer parser that is faster
+  ## and more memory efficient.
+  ## influx_parser_type = "internal"
 ```
diff --git a/plugins/parsers/influx/influx_upstream/README.md b/plugins/parsers/influx/influx_upstream/README.md
new file mode 100644
index 000000000..ce1035155
--- /dev/null
+++ b/plugins/parsers/influx/influx_upstream/README.md
@@ -0,0 +1,17 @@
+# Influx Line Protocol
+
+This package implements the upstream Influx line protocol parser. See the
+Influx README.md for more details.
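+
+## Example
+
+This package is not configured directly; it is selected through the influx
+data format's `influx_parser_type` option. A sketch of a configuration that
+opts in, using the file input plugin as an example:
+
+```toml
+[[inputs.file]]
+  files = ["example"]
+  data_format = "influx"
+  influx_parser_type = "upstream"
+```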
diff --git a/plugins/parsers/influx/influx_upstream/parser.go b/plugins/parsers/influx/influx_upstream/parser.go
new file mode 100644
index 000000000..153407a0e
--- /dev/null
+++ b/plugins/parsers/influx/influx_upstream/parser.go
@@ -0,0 +1,316 @@
+package influx_upstream
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+	"time"
+
+	"github.com/influxdata/line-protocol/v2/lineprotocol"
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
+)
+
+const (
+	maxErrorBufferSize = 1024
+)
+
+var (
+	ErrNoMetric = errors.New("no metric in line")
+	ErrEOF      = errors.New("EOF")
+)
+
+type TimeFunc func() time.Time
+
+// nthIndexAny finds the index of the nth occurrence in s of any code point from chars, or returns -1
+func nthIndexAny(s, chars string, n int) int {
+	offset := 0
+	for found := 1; found <= n; found++ {
+		i := strings.IndexAny(s[offset:], chars)
+		if i < 0 {
+			break
+		}
+
+		offset += i
+		if found == n {
+			return offset
+		}
+
+		offset += len(chars)
+	}
+
+	return -1
+}
+
+// ParseError indicates an error in the parsing of the text.
+type ParseError struct {
+	*lineprotocol.DecodeError
+	buf string
+}
+
+func (e *ParseError) Error() string {
+	// When an error occurs within the stream decoder, we do not have access
+	// to the internal buffer, so we cannot display the contents of the invalid
+	// metric
+	if e.buf == "" {
+		return fmt.Sprintf("metric parse error: %s at %d:%d", e.Err, e.Line, e.Column)
+	}
+
+	lineStart := nthIndexAny(e.buf, "\n", int(e.Line-1)) + 1
+	buffer := e.buf[lineStart:]
+	eol := strings.IndexAny(buffer, "\n")
+	if eol >= 0 {
+		buffer = strings.TrimSuffix(buffer[:eol], "\r")
+	}
+	if len(buffer) > maxErrorBufferSize {
+		startEllipsis := true
+		offset := e.Column - 1 - lineStart
+		if offset > len(buffer) || offset < 0 {
+			offset = len(buffer)
+		}
+		start := offset - maxErrorBufferSize
+		if start < 0 {
+			startEllipsis = false
+			start = 0
+		}
+		// If the buffer was trimmed, the column won't line up; the offending
+		// character is always the last one, because the parser does not
+		// continue past it, but mark it anyway so the location is obvious.
+		buffer = buffer[start:offset] + "<-- here"
+		if startEllipsis {
+			buffer = "..." + buffer
+		}
+	}
+	return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.Err, e.Line, e.Column, buffer)
+}
+
+// convertToParseError attempts to convert a lineprotocol.DecodeError to a ParseError
+func convertToParseError(input []byte, rawErr error) error {
+	err, ok := rawErr.(*lineprotocol.DecodeError)
+	if !ok {
+		return rawErr
+	}
+
+	return &ParseError{
+		DecodeError: err,
+		buf:         string(input),
+	}
+}
+
+// Parser is an InfluxDB Line Protocol parser that implements the
+// parsers.Parser interface.
+type Parser struct {
+	DefaultTags map[string]string
+
+	defaultTime  TimeFunc
+	precision    lineprotocol.Precision
+	allowPartial bool
+}
+
+// NewParser returns a Parser that accepts line protocol
+func NewParser() *Parser {
+	return &Parser{
+		defaultTime: time.Now,
+		precision:   lineprotocol.Nanosecond,
+	}
+}
+
+// NewSeriesParser returns a Parser that accepts a measurement and tagset
+func NewSeriesParser() *Parser {
+	return &Parser{
+		defaultTime:  time.Now,
+		precision:    lineprotocol.Nanosecond,
+		allowPartial: true,
+	}
+}
+
+func (p *Parser) SetTimeFunc(f TimeFunc) {
+	p.defaultTime = f
+}
+
+func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
+	metrics := make([]telegraf.Metric, 0)
+	decoder := lineprotocol.NewDecoderWithBytes(input)
+
+	for decoder.Next() {
+		m, err := nextMetric(decoder, p.precision, p.defaultTime, p.allowPartial)
+		if err != nil {
+			return nil, convertToParseError(input, err)
+		}
+		metrics = append(metrics, m)
+	}
+
+	p.applyDefaultTags(metrics)
+	return metrics, nil
+}
+
+func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
+	metrics, err := p.Parse([]byte(line))
+	if err != nil {
+		return nil, err
+	}
+
+	if len(metrics) < 1 {
+		return nil, ErrNoMetric
+	}
+
+	return metrics[0], nil
+}
+
+func (p *Parser) SetDefaultTags(tags map[string]string) {
+	p.DefaultTags = tags
+}
+
+func (p *Parser) SetTimePrecision(u time.Duration) {
+	switch u {
+	case time.Nanosecond:
+		p.precision = lineprotocol.Nanosecond
+	case time.Microsecond:
+		p.precision = lineprotocol.Microsecond
+	case time.Millisecond:
+		p.precision = lineprotocol.Millisecond
+	case time.Second:
+		p.precision = lineprotocol.Second
+	}
+}
+
+func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
+	if len(p.DefaultTags) == 0 {
+		return
+	}
+
+	for _, m := range metrics {
+		p.applyDefaultTagsSingle(m)
+	}
+}
+
+func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) {
+	for k, v := range p.DefaultTags {
+		if !m.HasTag(k) {
+			m.AddTag(k, v)
+		}
+	}
+}
+
+// StreamParser is an InfluxDB Line Protocol parser. It is not safe for
+// concurrent use in multiple goroutines.
+type StreamParser struct {
+	decoder     *lineprotocol.Decoder
+	defaultTime TimeFunc
+	precision   lineprotocol.Precision
+	lastError   error
+}
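+// NewStreamParser returns a StreamParser that decodes metrics as they are
+// read from r. A sketch of the intended read loop (it mirrors the tests in
+// this patch; the error handling is illustrative only):
+//
+//	parser := NewStreamParser(reader)
+//	for {
+//		m, err := parser.Next()
+//		if err == ErrEOF {
+//			break
+//		}
+//		if err != nil {
+//			// a *ParseError reports the line and column; parsing
+//			// resumes on the next call to Next
+//			continue
+//		}
+//		// use m
+//	}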
+func NewStreamParser(r io.Reader) *StreamParser {
+	return &StreamParser{
+		decoder:     lineprotocol.NewDecoder(r),
+		defaultTime: time.Now,
+		precision:   lineprotocol.Nanosecond,
+	}
+}
+
+// SetTimeFunc changes the function used to determine the time of metrics
+// without a timestamp. The default TimeFunc is time.Now. This is mostly
+// useful for testing, or for forcing all metrics to share a timestamp.
+func (sp *StreamParser) SetTimeFunc(f TimeFunc) {
+	sp.defaultTime = f
+}
+
+func (sp *StreamParser) SetTimePrecision(u time.Duration) {
+	switch u {
+	case time.Nanosecond:
+		sp.precision = lineprotocol.Nanosecond
+	case time.Microsecond:
+		sp.precision = lineprotocol.Microsecond
+	case time.Millisecond:
+		sp.precision = lineprotocol.Millisecond
+	case time.Second:
+		sp.precision = lineprotocol.Second
+	}
+}
+
+// Next parses the next metric from the stream. If it returns a *ParseError,
+// Next may be called again to continue with the metric or error that follows.
+func (sp *StreamParser) Next() (telegraf.Metric, error) {
+	if !sp.decoder.Next() {
+		if err := sp.decoder.Err(); err != nil && err != sp.lastError {
+			sp.lastError = err
+			return nil, err
+		}
+
+		return nil, ErrEOF
+	}
+
+	m, err := nextMetric(sp.decoder, sp.precision, sp.defaultTime, false)
+	if err != nil {
+		return nil, convertToParseError([]byte{}, err)
+	}
+
+	return m, nil
+}
+
+func nextMetric(decoder *lineprotocol.Decoder, precision lineprotocol.Precision, defaultTime TimeFunc, allowPartial bool) (telegraf.Metric, error) {
+	measurement, err := decoder.Measurement()
+	if err != nil {
+		return nil, err
+	}
+	m := metric.New(string(measurement), nil, nil, time.Time{})
+
+	for {
+		key, value, err := decoder.NextTag()
+		if err != nil {
+			// Allow empty tags for series parser
+			if strings.Contains(err.Error(), "empty tag name") && allowPartial {
+				break
+			}
+
+			return nil, err
+		} else if key == nil {
+			break
+		}
+
+		m.AddTag(string(key), string(value))
+	}
+
+	for {
+		key, value, err := decoder.NextField()
+		if err != nil {
+			// Allow empty fields for series parser
+			if strings.Contains(err.Error(), "expected field key") && allowPartial {
+				break
+			}
+
+			return nil, err
+		} else if key == nil {
+			break
+		}
+
+		m.AddField(string(key), value.Interface())
+	}
+
+	t, err := decoder.Time(precision, defaultTime())
+	if err != nil && !allowPartial {
+		return nil, err
+	}
+	m.SetTime(t)
+
+	return m, nil
+}
diff --git a/plugins/parsers/influx/influx_upstream/parser_test.go b/plugins/parsers/influx/influx_upstream/parser_test.go
new file mode 100644
index 000000000..686aceb22
--- /dev/null
+++ b/plugins/parsers/influx/influx_upstream/parser_test.go
@@ -0,0 +1,892 @@
+package influx_upstream
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/influxdata/line-protocol/v2/lineprotocol"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
+	"github.com/influxdata/telegraf/testutil"
+)
+
+var DefaultTime = func() time.Time {
+	return time.Unix(42, 0)
+}
+
+type parseTest struct {
+	name     string
+	input    []byte
+	timeFunc TimeFunc
+	metrics  []telegraf.Metric
+	err      error
+}
+
+func parseTests(stream bool) []parseTest {
+	// This is required as there is no way to get the internal buffer
+	// of the decoder to show where the error occurred. As such, the
+	// error `buf` will be empty when decoding from a stream.
+ var ( + intOverflowBuf string + uintOverflowBuf string + invalidMeasurementBuf string + ) + if stream { + intOverflowBuf = "" + uintOverflowBuf = "" + invalidMeasurementBuf = "" + } else { + intOverflowBuf = "cpu value=9223372036854775808i" + uintOverflowBuf = "cpu value=18446744073709551616u" + invalidMeasurementBuf = "cpu" + } + + return []parseTest{ + { + name: "minimal", + input: []byte("cpu value=42 0"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + err: nil, + }, + { + name: "minimal with newline", + input: []byte("cpu value=42 0\n"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(0, 0), + ), + }, + err: nil, + }, + { + name: "measurement escape space", + input: []byte(`c\ pu value=42`), + metrics: []telegraf.Metric{ + metric.New( + "c pu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "measurement escape comma", + input: []byte(`c\,pu value=42`), + metrics: []telegraf.Metric{ + metric.New( + "c,pu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "tags", + input: []byte(`cpu,cpu=cpu0,host=localhost value=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "cpu": "cpu0", + "host": "localhost", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "tags escape unescapable", + input: []byte(`cpu,ho\st=localhost value=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + `ho\st`: "localhost", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "tags escape equals", + input: []byte(`cpu,ho\=st=localhost value=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "ho=st": "localhost", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "tags escape comma", + input: []byte(`cpu,ho\,st=localhost value=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "ho,st": "localhost", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "tag value escape space", + input: []byte(`cpu,host=two\ words value=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "host": "two words", + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "tag value double escape space", + input: []byte(`cpu,host=two\\ words value=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "host": `two\ words`, + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "tag value triple escape space", + input: []byte(`cpu,host=two\\\ words value=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "host": `two\\ words`, + }, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field key escape not escapable", + input: []byte(`cpu va\lue=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + `va\lue`: 42.0, + }, + time.Unix(42, 
0), + ), + }, + err: nil, + }, + { + name: "field key escape equals", + input: []byte(`cpu va\=lue=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + `va=lue`: 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field key escape comma", + input: []byte(`cpu va\,lue=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + `va,lue`: 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field key escape space", + input: []byte(`cpu va\ lue=42`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + `va lue`: 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field int", + input: []byte("cpu value=42i"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field int overflow", + input: []byte("cpu value=9223372036854775808i"), + metrics: nil, + err: &ParseError{ + DecodeError: &lineprotocol.DecodeError{ + Line: 1, + Column: 11, + Err: errors.New(`cannot parse value for field key "value": line-protocol value out of range`), + }, + buf: intOverflowBuf, + }, + }, + { + name: "field int max value", + input: []byte("cpu value=9223372036854775807i"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": int64(9223372036854775807), + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field uint", + input: []byte("cpu value=42u"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": uint64(42), + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field uint overflow", + input: []byte("cpu value=18446744073709551616u"), + metrics: nil, + err: &ParseError{ + DecodeError: &lineprotocol.DecodeError{ + Line: 1, + Column: 11, + Err: errors.New(`cannot parse value for field key "value": line-protocol value out of range`), + }, + buf: uintOverflowBuf, + }, + }, + { + name: "field uint max value", + input: []byte("cpu value=18446744073709551615u"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": uint64(18446744073709551615), + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field boolean", + input: []byte("cpu value=true"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": true, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field string", + input: []byte(`cpu value="42"`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": "42", + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field string escape quote", + input: []byte(`cpu value="how\"dy"`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + `value`: `how"dy`, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field string escape backslash", + input: []byte(`cpu value="how\\dy"`), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + `value`: `how\dy`, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "field string newline", + input: []byte("cpu value=\"4\n2\""), + metrics: []telegraf.Metric{ + metric.New( + 
"cpu", + map[string]string{}, + map[string]interface{}{ + "value": "4\n2", + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "no timestamp", + input: []byte("cpu value=42"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "no timestamp", + input: []byte("cpu value=42"), + timeFunc: func() time.Time { + return time.Unix(42, 123456789) + }, + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 123456789), + ), + }, + err: nil, + }, + { + name: "multiple lines", + input: []byte("cpu value=42\ncpu value=42"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42.0, + }, + time.Unix(42, 0), + ), + }, + err: nil, + }, + { + name: "invalid measurement only", + input: []byte("cpu"), + metrics: nil, + err: &ParseError{ + DecodeError: &lineprotocol.DecodeError{ + Line: 1, + Column: 4, + Err: errors.New("empty tag name"), + }, + buf: invalidMeasurementBuf, + }, + }, + { + name: "procstat", + input: []byte("procstat,exe=bash,process_name=bash voluntary_context_switches=42i,memory_rss=5103616i,rlimit_memory_data_hard=2147483647i,cpu_time_user=0.02,rlimit_file_locks_soft=2147483647i,pid=29417i,cpu_time_nice=0,rlimit_memory_locked_soft=65536i,read_count=259i,rlimit_memory_vms_hard=2147483647i,memory_swap=0i,rlimit_num_fds_soft=1024i,rlimit_nice_priority_hard=0i,cpu_time_soft_irq=0,cpu_time=0i,rlimit_memory_locked_hard=65536i,realtime_priority=0i,signals_pending=0i,nice_priority=20i,cpu_time_idle=0,memory_stack=139264i,memory_locked=0i,rlimit_memory_stack_soft=8388608i,cpu_time_iowait=0,cpu_time_guest=0,cpu_time_guest_nice=0,rlimit_memory_data_soft=2147483647i,read_bytes=0i,rlimit_cpu_time_soft=2147483647i,involuntary_context_switches=2i,write_bytes=106496i,cpu_time_system=0,cpu_time_irq=0,cpu_usage=0,memory_vms=21659648i,memory_data=1576960i,rlimit_memory_stack_hard=2147483647i,num_threads=1i,rlimit_memory_rss_soft=2147483647i,rlimit_realtime_priority_soft=0i,num_fds=4i,write_count=35i,rlimit_signals_pending_soft=78994i,cpu_time_steal=0,rlimit_num_fds_hard=4096i,rlimit_file_locks_hard=2147483647i,rlimit_cpu_time_hard=2147483647i,rlimit_signals_pending_hard=78994i,rlimit_nice_priority_soft=0i,rlimit_memory_rss_hard=2147483647i,rlimit_memory_vms_soft=2147483647i,rlimit_realtime_priority_hard=0i 1517620624000000000"), + metrics: []telegraf.Metric{ + metric.New( + "procstat", + map[string]string{ + "exe": "bash", + "process_name": "bash", + }, + map[string]interface{}{ + "cpu_time": 0, + "cpu_time_guest": float64(0), + "cpu_time_guest_nice": float64(0), + "cpu_time_idle": float64(0), + "cpu_time_iowait": float64(0), + "cpu_time_irq": float64(0), + "cpu_time_nice": float64(0), + "cpu_time_soft_irq": float64(0), + "cpu_time_steal": float64(0), + "cpu_time_system": float64(0), + "cpu_time_user": float64(0.02), + "cpu_usage": float64(0), + "involuntary_context_switches": 2, + "memory_data": 1576960, + "memory_locked": 0, + "memory_rss": 5103616, + "memory_stack": 139264, + "memory_swap": 0, + "memory_vms": 21659648, + "nice_priority": 20, + "num_fds": 4, + "num_threads": 1, + "pid": 29417, + "read_bytes": 0, + "read_count": 259, + "realtime_priority": 0, + "rlimit_cpu_time_hard": 2147483647, + 
"rlimit_cpu_time_soft": 2147483647, + "rlimit_file_locks_hard": 2147483647, + "rlimit_file_locks_soft": 2147483647, + "rlimit_memory_data_hard": 2147483647, + "rlimit_memory_data_soft": 2147483647, + "rlimit_memory_locked_hard": 65536, + "rlimit_memory_locked_soft": 65536, + "rlimit_memory_rss_hard": 2147483647, + "rlimit_memory_rss_soft": 2147483647, + "rlimit_memory_stack_hard": 2147483647, + "rlimit_memory_stack_soft": 8388608, + "rlimit_memory_vms_hard": 2147483647, + "rlimit_memory_vms_soft": 2147483647, + "rlimit_nice_priority_hard": 0, + "rlimit_nice_priority_soft": 0, + "rlimit_num_fds_hard": 4096, + "rlimit_num_fds_soft": 1024, + "rlimit_realtime_priority_hard": 0, + "rlimit_realtime_priority_soft": 0, + "rlimit_signals_pending_hard": 78994, + "rlimit_signals_pending_soft": 78994, + "signals_pending": 0, + "voluntary_context_switches": 42, + "write_bytes": 106496, + "write_count": 35, + }, + time.Unix(0, 1517620624000000000), + ), + }, + err: nil, + }, + } +} + +func TestParser(t *testing.T) { + for _, tt := range parseTests(false) { + t.Run(tt.name, func(t *testing.T) { + parser := NewParser() + parser.SetTimeFunc(DefaultTime) + if tt.timeFunc != nil { + parser.SetTimeFunc(tt.timeFunc) + } + + metrics, err := parser.Parse(tt.input) + if tt.err == nil { + require.NoError(t, err) + } else { + require.Equal(t, tt.err.Error(), err.Error()) + } + + require.Equal(t, len(tt.metrics), len(metrics)) + for i, expected := range tt.metrics { + require.Equal(t, expected.Name(), metrics[i].Name()) + require.Equal(t, expected.Tags(), metrics[i].Tags()) + require.Equal(t, expected.Fields(), metrics[i].Fields()) + require.Equal(t, expected.Time(), metrics[i].Time()) + } + }) + } +} + +func BenchmarkParser(b *testing.B) { + for _, tt := range parseTests(false) { + b.Run(tt.name, func(b *testing.B) { + parser := NewParser() + for n := 0; n < b.N; n++ { + metrics, err := parser.Parse(tt.input) + _ = err + _ = metrics + } + }) + } +} + +func TestStreamParser(t *testing.T) { + for _, tt := range parseTests(true) { + t.Run(tt.name, func(t *testing.T) { + r := bytes.NewBuffer(tt.input) + parser := NewStreamParser(r) + parser.SetTimeFunc(DefaultTime) + if tt.timeFunc != nil { + parser.SetTimeFunc(tt.timeFunc) + } + + var i int + for { + m, err := parser.Next() + if err != nil { + if err == ErrEOF { + break + } + require.Equal(t, tt.err.Error(), err.Error()) + break + } + + testutil.RequireMetricEqual(t, tt.metrics[i], m) + i++ + } + }) + } +} + +func TestSeriesParser(t *testing.T) { + var tests = []struct { + name string + input []byte + timeFunc func() time.Time + metrics []telegraf.Metric + err error + }{ + { + name: "empty", + input: []byte(""), + metrics: []telegraf.Metric{}, + }, + { + name: "minimal", + input: []byte("cpu"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags", + input: []byte("cpu,a=x,b=y"), + metrics: []telegraf.Metric{ + metric.New( + "cpu", + map[string]string{ + "a": "x", + "b": "y", + }, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + }, + { + name: "missing tag value", + input: []byte("cpu,a="), + metrics: []telegraf.Metric{}, + err: &ParseError{ + DecodeError: &lineprotocol.DecodeError{ + Line: 1, + Column: 7, + Err: errors.New(`expected tag value after tag key "a", but none found`), + }, + buf: "cpu,a=", + }, + }, + { + name: "error with carriage return in long line", + input: []byte("cpu,a=" + strings.Repeat("x", maxErrorBufferSize) + "\rcd,b"), + metrics: 
[]telegraf.Metric{}, + err: &ParseError{ + DecodeError: &lineprotocol.DecodeError{ + Line: 1, + Column: 1031, + Err: errors.New(`expected tag key or field but found '\r' instead`), + }, + buf: "cpu,a=" + strings.Repeat("x", maxErrorBufferSize) + "\rcd,b", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := NewSeriesParser() + if tt.timeFunc != nil { + parser.SetTimeFunc(tt.timeFunc) + } + + metrics, err := parser.Parse(tt.input) + require.Equal(t, tt.err, err) + if err != nil { + require.Equal(t, tt.err.Error(), err.Error()) + } + + require.Equal(t, len(tt.metrics), len(metrics)) + for i, expected := range tt.metrics { + require.Equal(t, expected.Name(), metrics[i].Name()) + require.Equal(t, expected.Tags(), metrics[i].Tags()) + } + }) + } +} + +func TestParserErrorString(t *testing.T) { + var ptests = []struct { + name string + input []byte + errString string + }{ + { + name: "multiple line error", + input: []byte("cpu value=42\ncpu value=invalid\ncpu value=42"), + errString: `metric parse error: field value has unrecognized type at 2:11: "cpu value=invalid"`, + }, + { + name: "handler error", + input: []byte("cpu value=9223372036854775808i\ncpu value=42"), + errString: `metric parse error: cannot parse value for field key "value": line-protocol value out of range at 1:11: "cpu value=9223372036854775808i"`, + }, + { + name: "buffer too long", + input: []byte("cpu " + strings.Repeat("ab", maxErrorBufferSize) + "=invalid\ncpu value=42"), + errString: "metric parse error: field value has unrecognized type at 1:2054: \"...b" + strings.Repeat("ab", maxErrorBufferSize/2-1) + "=<-- here\"", + }, + { + name: "multiple line error", + input: []byte("cpu value=42\ncpu value=invalid\ncpu value=42\ncpu value=invalid"), + errString: `metric parse error: field value has unrecognized type at 2:11: "cpu value=invalid"`, + }, + } + + for _, tt := range ptests { + t.Run(tt.name, func(t *testing.T) { + parser := NewParser() + + _, err := parser.Parse(tt.input) + require.Equal(t, tt.errString, err.Error()) + }) + } +} + +func TestStreamParserErrorString(t *testing.T) { + var ptests = []struct { + name string + input []byte + errs []string + }{ + { + name: "multiple line error", + input: []byte("cpu value=42\ncpu value=invalid\ncpu value=42"), + errs: []string{ + `metric parse error: field value has unrecognized type at 2:11`, + }, + }, + { + name: "handler error", + input: []byte("cpu value=9223372036854775808i\ncpu value=42"), + errs: []string{ + `metric parse error: cannot parse value for field key "value": line-protocol value out of range at 1:11`, + }, + }, + { + name: "buffer too long", + input: []byte("cpu " + strings.Repeat("ab", maxErrorBufferSize) + "=invalid\ncpu value=42"), + errs: []string{ + "metric parse error: field value has unrecognized type at 1:2054", + }, + }, + { + name: "multiple errors", + input: []byte("foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4.0"), + errs: []string{ + `metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11`, + `metric parse error: cannot parse value for field key "value": invalid float value syntax at 3:11`, + }, + }, + } + + for _, tt := range ptests { + t.Run(tt.name, func(t *testing.T) { + parser := NewStreamParser(bytes.NewBuffer(tt.input)) + + var errs []error + for i := 0; i < 20; i++ { + _, err := parser.Next() + if err == ErrEOF { + break + } + + if err != nil { + errs = append(errs, err) + } + } + + require.Equal(t, len(tt.errs), len(errs)) + for i, err 
:= range errs { + require.Equal(t, tt.errs[i], err.Error()) + } + }) + } +} + +type MockReader struct { + ReadF func(p []byte) (int, error) +} + +func (r *MockReader) Read(p []byte) (int, error) { + return r.ReadF(p) +} + +// Errors from the Reader are returned from the Parser +func TestStreamParserReaderError(t *testing.T) { + readerErr := errors.New("error but not eof") + + parser := NewStreamParser(&MockReader{ + ReadF: func(p []byte) (int, error) { + return 0, readerErr + }, + }) + _, err := parser.Next() + require.Error(t, err) + require.Equal(t, err, readerErr) + + _, err = parser.Next() + require.Equal(t, err, ErrEOF) +} + +func TestStreamParserProducesAllAvailableMetrics(t *testing.T) { + r, w := io.Pipe() + + parser := NewStreamParser(r) + parser.SetTimeFunc(DefaultTime) + + go func() { + _, err := w.Write([]byte("metric value=1\nmetric2 value=1\n")) + require.NoError(t, err) + }() + + _, err := parser.Next() + require.NoError(t, err) + + // should not block on second read + _, err = parser.Next() + require.NoError(t, err) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 985d2ab0d..5514b818c 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/graphite" "github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json_v2" "github.com/influxdata/telegraf/plugins/parsers/logfmt" @@ -190,6 +191,9 @@ type Config struct { // JSONPath configuration JSONV2Config []JSONV2Config `toml:"json_v2"` + + // Influx configuration + InfluxParserType string `toml:"influx_parser_type"` } type XPathConfig xpath.Config @@ -222,7 +226,11 @@ func NewParser(config *Config) (Parser, error) { parser, err = NewValueParser(config.MetricName, config.DataType, config.ValueFieldName, config.DefaultTags) case "influx": - parser, err = NewInfluxParser() + if config.InfluxParserType == "upstream" { + parser, err = NewInfluxUpstreamParser() + } else { + parser, err = NewInfluxParser() + } case "nagios": parser, err = NewNagiosParser() case "graphite": @@ -323,6 +331,10 @@ func NewInfluxParser() (Parser, error) { return influx.NewParser(handler), nil } +func NewInfluxUpstreamParser() (Parser, error) { + return influx_upstream.NewParser(), nil +} + func NewGraphiteParser( separator string, templates []string,
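
For reference, a minimal sketch (not part of the patch itself) of how the new
`influx_parser_type` branch in `parsers.NewParser` is exercised from code; it
assumes only the APIs visible in this diff plus the pre-existing
`parsers.Config` fields:

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers"
)

func main() {
	// "upstream" routes to influx_upstream.NewParser(); any other value
	// (including empty) keeps the long-standing internal parser.
	parser, err := parsers.NewParser(&parsers.Config{
		DataFormat:       "influx",
		InfluxParserType: "upstream",
	})
	if err != nil {
		panic(err)
	}

	metrics, err := parser.Parse([]byte("cpu,host=server01 value=12 1422568543000000000\n"))
	if err != nil {
		panic(err)
	}
	fmt.Println(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
}
```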