feat(parsers.influx): New influx line protocol via feature flag (#10749)

Joshua Powers 2022-03-10 15:09:58 -07:00 committed by GitHub
parent d8a8816ebc
commit 40ed7fb693
15 changed files with 2056 additions and 499 deletions


@ -1603,6 +1603,9 @@ func (c *Config) getParserConfig(name string, tbl *ast.Table) (*parsers.Config,
c.getFieldString(tbl, "value_field_name", &pc.ValueFieldName)
// for influx parser
c.getFieldString(tbl, "influx_parser_type", &pc.InfluxParserType)
//for XPath parser family
if choice.Contains(pc.DataFormat, []string{"xml", "xpath_json", "xpath_msgpack", "xpath_protobuf"}) {
c.getFieldString(tbl, "xpath_protobuf_file", &pc.XPathProtobufFile)
@ -1823,7 +1826,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys", "fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",
"grace", "graphite_separator", "graphite_tag_sanitize_mode", "graphite_tag_support", "grace", "graphite_separator", "graphite_tag_sanitize_mode", "graphite_tag_support",
"grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns", "grok_custom_pattern_files", "grok_custom_patterns", "grok_named_patterns", "grok_patterns",
"grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_sort_fields", "grok_timezone", "grok_unique_timestamp", "influx_max_line_bytes", "influx_parser_type", "influx_sort_fields",
"influx_uint_support", "interval", "json_name_key", "json_query", "json_strict", "influx_uint_support", "interval", "json_name_key", "json_query", "json_strict",
"json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2", "json_string_fields", "json_time_format", "json_time_key", "json_timestamp_format", "json_timestamp_units", "json_timezone", "json_v2",
"lvm", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix", "lvm", "metric_batch_size", "metric_buffer_limit", "name_override", "name_prefix",


@ -139,6 +139,7 @@ following works:
- github.com/influxdata/influxdb-observability/common [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE)
- github.com/influxdata/influxdb-observability/influx2otel [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE)
- github.com/influxdata/influxdb-observability/otel2influx [MIT License](https://github.com/influxdata/influxdb-observability/blob/main/LICENSE)
- github.com/influxdata/line-protocol [MIT License](https://github.com/influxdata/line-protocol/blob/v2/LICENSE)
- github.com/influxdata/tail [MIT License](https://github.com/influxdata/tail/blob/master/LICENSE.txt)
- github.com/influxdata/toml [MIT License](https://github.com/influxdata/toml/blob/master/LICENSE)
- github.com/influxdata/wlog [MIT License](https://github.com/influxdata/wlog/blob/master/LICENSE)

go.mod

@ -81,6 +81,7 @@ require (
github.com/influxdata/influxdb-observability/common v0.2.10
github.com/influxdata/influxdb-observability/influx2otel v0.2.10
github.com/influxdata/influxdb-observability/otel2influx v0.2.10
github.com/influxdata/line-protocol/v2 v2.2.1
github.com/influxdata/tail v1.0.1-0.20210707231403-b283181d1fa7
github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65
github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8

go.sum

@ -788,8 +788,11 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.7.3/go.mod h1:V1d2J5pfxYH6EjBAgSK7YNXcXlTWxUHdE1sVDXkjnig=
github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.13.0 h1:yNZif1OkDfNoDfb9zZa9aXIpejNR4F23Wely0c+Qdqk=
github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
@ -1299,7 +1302,15 @@ github.com/influxdata/influxdb-observability/otel2influx v0.2.10 h1:sNZCYUExwCWs
github.com/influxdata/influxdb-observability/otel2influx v0.2.10/go.mod h1:UOa19v6sU7EpL1dPK79Yt+mZ+1/iOwvMqcFu9yVXenw=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig=
github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo=
github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY=
github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY=
github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE=
github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM=
github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8=
github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE=
github.com/influxdata/tail v1.0.1-0.20210707231403-b283181d1fa7 h1:0rQOs1VHLVFpAAOIR0mJEvVOIaMYFgYdreeVbgI9sII=


@ -62,6 +62,11 @@ submits data to InfluxDB determines the destination database.
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
# basic_password = "barfoo"
## Influx line protocol parser
## 'internal' is the default. 'upstream' is a newer parser that is faster
## and more memory efficient.
# parser_type = "internal"
```
## Metrics


@ -16,6 +16,7 @@ import (
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
) )
@ -38,6 +39,7 @@ type InfluxDBListener struct {
BasicPassword string `toml:"basic_password"`
DatabaseTag string `toml:"database_tag"`
RetentionPolicyTag string `toml:"retention_policy_tag"`
ParserType string `toml:"parser_type"`
timeFunc influx.TimeFunc timeFunc influx.TimeFunc
@ -96,6 +98,11 @@ const sampleConfig = `
## You probably want to make sure you have TLS configured above for this.
# basic_username = "foobar"
# basic_password = "barfoo"
## Influx line protocol parser
## 'internal' is the default. 'upstream' is a newer parser that is faster
## and more memory efficient.
# parser_type = "internal"
`
func (h *InfluxDBListener) SampleConfig() string {
@ -254,6 +261,15 @@ func (h *InfluxDBListener) handleDefault() http.HandlerFunc {
func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
if h.ParserType == "upstream" {
h.handleWriteUpstreamParser(res, req)
return
}
h.handleWriteInternalParser(res, req)
}
}
func (h *InfluxDBListener) handleWriteInternalParser(res http.ResponseWriter, req *http.Request) {
defer h.writesServed.Incr(1)
// Check that the content length is not too large for us to handle.
if req.ContentLength > int64(h.MaxBodySize) {
@ -359,7 +375,114 @@ func (h *InfluxDBListener) handleWrite() http.HandlerFunc {
// http request success
res.WriteHeader(http.StatusNoContent)
}
func (h *InfluxDBListener) handleWriteUpstreamParser(res http.ResponseWriter, req *http.Request) {
defer h.writesServed.Incr(1)
// Check that the content length is not too large for us to handle.
if req.ContentLength > int64(h.MaxBodySize) {
if err := tooLarge(res); err != nil {
h.Log.Debugf("error in too-large: %v", err)
}
return
}
db := req.URL.Query().Get("db")
rp := req.URL.Query().Get("rp")
body := req.Body
body = http.MaxBytesReader(res, body, int64(h.MaxBodySize))
// Handle gzip request bodies
if req.Header.Get("Content-Encoding") == "gzip" {
var err error
body, err = gzip.NewReader(body)
if err != nil {
h.Log.Debugf("Error decompressing request body: %v", err.Error())
if err := badRequest(res, err.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
}
return
}
defer body.Close()
}
parser := influx_upstream.NewStreamParser(body)
parser.SetTimeFunc(influx_upstream.TimeFunc(h.timeFunc))
precisionStr := req.URL.Query().Get("precision")
if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
parser.SetTimePrecision(precision)
}
if req.ContentLength >= 0 {
h.bytesRecv.Incr(req.ContentLength)
}
var m telegraf.Metric
var err error
var parseErrorCount int
var firstParseErrorStr string
for {
select {
case <-req.Context().Done():
// Shutting down before parsing is finished.
res.WriteHeader(http.StatusServiceUnavailable)
return
default:
}
m, err = parser.Next()
// Continue parsing metrics even if some are malformed
if parseErr, ok := err.(*influx_upstream.ParseError); ok {
parseErrorCount++
errStr := parseErr.Error()
if firstParseErrorStr == "" {
firstParseErrorStr = errStr
}
continue
} else if err != nil {
// Either we're exiting cleanly (err ==
// influx_upstream.ErrEOF) or there's an unexpected error
break
}
if h.DatabaseTag != "" && db != "" {
m.AddTag(h.DatabaseTag, db)
}
if h.RetentionPolicyTag != "" && rp != "" {
m.AddTag(h.RetentionPolicyTag, rp)
}
h.acc.AddMetric(m)
}
if err != influx_upstream.ErrEOF {
h.Log.Debugf("Error parsing the request body: %v", err.Error())
if err := badRequest(res, err.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err)
}
return
}
if parseErrorCount > 0 {
var partialErrorString string
switch parseErrorCount {
case 1:
partialErrorString = firstParseErrorStr
case 2:
partialErrorString = fmt.Sprintf("%s (and 1 other parse error)", firstParseErrorStr)
default:
partialErrorString = fmt.Sprintf("%s (and %d other parse errors)", firstParseErrorStr, parseErrorCount-1)
}
if err := partialWrite(res, partialErrorString); err != nil {
h.Log.Debugf("error in partial-write: %v", err)
}
return
}
// http request success
res.WriteHeader(http.StatusNoContent)
}
func tooLarge(res http.ResponseWriter) error {


@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"fmt"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -45,6 +46,12 @@ cpu,host=c value1=1`
var (
pki = testutil.NewPKI("../../../testutil/pki")
parserTestCases = []struct {
parser string
}{
{"upstream"},
{"internal"},
}
)
func newTestListener() *InfluxDBListener {
@ -159,7 +166,10 @@ func TestWriteBasicAuth(t *testing.T) {
func TestWriteKeepDatabase(t *testing.T) {
testMsgWithDB := "cpu_load_short,host=server01,database=wrongdb value=12.0 1422568543702900257\n"
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
listener.DatabaseTag = "database"
acc := &testutil.Accumulator{}
@ -206,6 +216,8 @@ func TestWriteKeepDatabase(t *testing.T) {
map[string]string{"host": hostTag, "database": "mydb"}, map[string]string{"host": hostTag, "database": "mydb"},
) )
} }
})
}
}
func TestWriteRetentionPolicyTag(t *testing.T) {
@ -241,7 +253,10 @@ func TestWriteRetentionPolicyTag(t *testing.T) {
// http listener should add a newline at the end of the buffer if it's not there
func TestWriteNoNewline(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -259,10 +274,15 @@ func TestWriteNoNewline(t *testing.T) {
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
) )
})
}
}
func TestPartialWrite(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -284,12 +304,17 @@ func TestPartialWrite(t *testing.T) {
map[string]interface{}{"value1": float64(1)}, map[string]interface{}{"value1": float64(1)},
map[string]string{"host": "c"}, map[string]string{"host": "c"},
) )
})
}
}
func TestWriteMaxLineSizeIncrease(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := &InfluxDBListener{ listener := &InfluxDBListener{
Log: testutil.Logger{}, Log: testutil.Logger{},
ServiceAddress: "localhost:0", ServiceAddress: "localhost:0",
ParserType: tc.parser,
timeFunc: time.Now, timeFunc: time.Now,
} }
@ -303,13 +328,18 @@ func TestWriteMaxLineSizeIncrease(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
})
}
}
func TestWriteVerySmallMaxBody(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := &InfluxDBListener{ listener := &InfluxDBListener{
Log: testutil.Logger{}, Log: testutil.Logger{},
ServiceAddress: "localhost:0", ServiceAddress: "localhost:0",
MaxBodySize: config.Size(4096), MaxBodySize: config.Size(4096),
ParserType: tc.parser,
timeFunc: time.Now, timeFunc: time.Now,
} }
@ -318,16 +348,25 @@ func TestWriteVerySmallMaxBody(t *testing.T) {
require.NoError(t, listener.Start(acc))
defer listener.Stop()
resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(hugeMetric)))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 413, resp.StatusCode)
})
}
}
func TestWriteLargeLine(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := &InfluxDBListener{ listener := &InfluxDBListener{
Log: testutil.Logger{}, Log: testutil.Logger{},
ServiceAddress: "localhost:0", ServiceAddress: "localhost:0",
ParserType: tc.parser,
timeFunc: func() time.Time {
return time.Unix(123456789, 0)
},
@ -396,11 +435,16 @@ func TestWriteLargeLine(t *testing.T) {
map[string]string{"host": hostTag}, map[string]string{"host": hostTag},
) )
} }
})
}
}
// test that writing gzipped data works
func TestWriteGzippedData(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -429,6 +473,8 @@ func TestWriteGzippedData(t *testing.T) {
map[string]string{"host": hostTag}, map[string]string{"host": hostTag},
) )
} }
})
}
}
// writes 25,000 metrics to the listener with 10 different writers
@ -472,7 +518,10 @@ func TestWriteHighTraffic(t *testing.T) {
}
func TestReceive404ForInvalidEndpoint(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -484,10 +533,15 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 404, resp.StatusCode)
})
}
}
func TestWriteInvalid(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -499,10 +553,15 @@ func TestWriteInvalid(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 400, resp.StatusCode)
})
}
}
func TestWriteEmpty(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -514,10 +573,15 @@ func TestWriteEmpty(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
})
}
}
func TestQuery(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -530,10 +594,16 @@ func TestQuery(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 200, resp.StatusCode)
})
}
}
func TestPing(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
@ -546,10 +616,16 @@ func TestPing(t *testing.T) {
require.Len(t, resp.Header["Content-Type"], 0)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
})
}
}
func TestPingVerbose(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
@ -562,10 +638,15 @@ func TestPingVerbose(t *testing.T) {
require.Equal(t, "application/json", resp.Header["Content-Type"][0]) require.Equal(t, "application/json", resp.Header["Content-Type"][0])
require.NoError(t, resp.Body.Close()) require.NoError(t, resp.Body.Close())
require.EqualValues(t, 200, resp.StatusCode) require.EqualValues(t, 200, resp.StatusCode)
})
}
}
func TestWriteWithPrecision(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -584,10 +665,15 @@ func TestWriteWithPrecision(t *testing.T) {
// When timestamp is provided, the precision parameter is
// overloaded to specify the timestamp's unit
require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time)
})
}
}
func TestWriteWithPrecisionNoTimestamp(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
listener.timeFunc = func() time.Time {
return time.Unix(42, 123456789)
}
@ -610,6 +696,50 @@ func TestWriteWithPrecisionNoTimestamp(t *testing.T) {
// specifies the precision. The timestamp is set to the greatest
// integer unit less than the provided timestamp (floor).
require.Equal(t, time.Unix(42, 0), acc.Metrics[0].Time)
})
}
}
func TestWriteUpstreamParseErrors(t *testing.T) {
var tests = []struct {
name string
input string
expected string
}{
{
name: "one parse error",
input: "foo value=1.0\nfoo value=2asdf2.0\nfoo value=3.0\nfoo value=4.0",
expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 2:11`,
},
{
name: "two parse errors",
input: "foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4.0",
expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11 (and 1 other parse error)`,
},
{
name: "three or more parse errors",
input: "foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4asdf2.0",
expected: `metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11 (and 2 other parse errors)`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
listener := newTestListener()
listener.ParserType = "upstream"
acc := &testutil.NopAccumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()
// post single message to listener
resp, err := http.Post(createURL(listener, "http", "/write", ""), "", bytes.NewBuffer([]byte(tt.input)))
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 400, resp.StatusCode)
require.Equal(t, tt.expected, resp.Header["X-Influxdb-Error"][0])
})
}
}
func TestWriteParseErrors(t *testing.T) {


@ -40,6 +40,11 @@ Telegraf minimum version: Telegraf 1.16.0
## Optional token to accept for HTTP authentication.
## You probably want to make sure you have TLS configured above for this.
# token = "some-long-shared-secret-token"
## Influx line protocol parser
## 'internal' is the default. 'upstream' is a newer parser that is faster
## and more memory efficient.
# parser_type = "internal"
```
## Metrics


@ -5,6 +5,7 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -17,6 +18,7 @@ import (
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
) )
@ -26,6 +28,8 @@ const (
defaultMaxBodySize = 32 * 1024 * 1024
)
var ErrEOF = errors.New("EOF")
// The BadRequestCode constants keep standard error messages
// see: https://v2.docs.influxdata.com/v2.0/api/#operation/PostWrite
type BadRequestCode string
@ -43,6 +47,7 @@ type InfluxDBV2Listener struct {
MaxBodySize config.Size `toml:"max_body_size"`
Token string `toml:"token"`
BucketTag string `toml:"bucket_tag"`
ParserType string `toml:"parser_type"`
timeFunc influx.TimeFunc timeFunc influx.TimeFunc
@ -92,6 +97,11 @@ const sampleConfig = `
## Optional token to accept for HTTP authentication.
## You probably want to make sure you have TLS configured above for this.
# token = "some-long-shared-secret-token"
## Influx line protocol parser
## 'internal' is the default. 'upstream' is a newer parser that is faster
## and more memory efficient.
# parser_type = "internal"
`
func (h *InfluxDBV2Listener) SampleConfig() string {
@ -264,22 +274,35 @@ func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
}
return
}
precisionStr := req.URL.Query().Get("precision")
var metrics []telegraf.Metric
var err error
if h.ParserType == "upstream" {
parser := influx_upstream.NewParser()
parser.SetTimeFunc(influx_upstream.TimeFunc(h.timeFunc))
if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
parser.SetTimePrecision(precision)
}
metrics, err = parser.Parse(bytes)
} else {
metricHandler := influx.NewMetricHandler()
parser := influx.NewParser(metricHandler)
parser.SetTimeFunc(h.timeFunc)
if precisionStr != "" {
precision := getPrecisionMultiplier(precisionStr)
metricHandler.SetTimePrecision(precision)
}
metrics, err = parser.Parse(bytes)
}
if err != ErrEOF && err != nil {
h.Log.Debugf("Error parsing the request body: %v", err.Error())
if err := badRequest(res, Invalid, err.Error()); err != nil {
h.Log.Debugf("error in bad-request: %v", err)


@ -45,6 +45,12 @@ cpu,host=c value1=1`
var (
pki = testutil.NewPKI("../../../testutil/pki")
parserTestCases = []struct {
parser string
}{
{"upstream"},
{"internal"},
}
)
func newTestListener() *InfluxDBV2Listener {
@ -209,7 +215,10 @@ func TestWriteKeepBucket(t *testing.T) {
// http listener should add a newline at the end of the buffer if it's not there
func TestWriteNoNewline(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -227,28 +236,37 @@ func TestWriteNoNewline(t *testing.T) {
map[string]interface{}{"value": float64(12)}, map[string]interface{}{"value": float64(12)},
map[string]string{"host": "server01"}, map[string]string{"host": "server01"},
) )
})
}
}
func TestAllOrNothing(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
require.NoError(t, listener.Start(acc))
defer listener.Stop()
// post single message to listener
resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testPartial))) resp, err := http.Post(createURL(listener, "http", "/api/v2/write", "bucket=mybucket"), "", bytes.NewBuffer([]byte(testPartial)))
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, resp.Body.Close()) require.NoError(t, resp.Body.Close())
require.EqualValues(t, 400, resp.StatusCode) require.EqualValues(t, 400, resp.StatusCode)
})
}
}
func TestWriteMaxLineSizeIncrease(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := &InfluxDBV2Listener{ listener := &InfluxDBV2Listener{
Log: testutil.Logger{}, Log: testutil.Logger{},
ServiceAddress: "localhost:0", ServiceAddress: "localhost:0",
timeFunc: time.Now, timeFunc: time.Now,
ParserType: tc.parser,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -261,14 +279,19 @@ func TestWriteMaxLineSizeIncrease(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
})
}
}
func TestWriteVerySmallMaxBody(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := &InfluxDBV2Listener{ listener := &InfluxDBV2Listener{
Log: testutil.Logger{}, Log: testutil.Logger{},
ServiceAddress: "localhost:0", ServiceAddress: "localhost:0",
MaxBodySize: config.Size(4096), MaxBodySize: config.Size(4096),
timeFunc: time.Now, timeFunc: time.Now,
ParserType: tc.parser,
} }
acc := &testutil.Accumulator{} acc := &testutil.Accumulator{}
@ -280,6 +303,8 @@ func TestWriteVerySmallMaxBody(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 413, resp.StatusCode)
})
}
}
func TestWriteLargeLine(t *testing.T) {
@ -430,7 +455,10 @@ func TestWriteHighTraffic(t *testing.T) {
}
func TestReceive404ForInvalidEndpoint(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -442,10 +470,15 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 404, resp.StatusCode)
})
}
}
func TestWriteInvalid(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -457,10 +490,15 @@ func TestWriteInvalid(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 400, resp.StatusCode)
})
}
}
func TestWriteEmpty(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -472,6 +510,8 @@ func TestWriteEmpty(t *testing.T) {
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.EqualValues(t, 204, resp.StatusCode)
})
}
}
func TestReady(t *testing.T) {
@ -496,7 +536,10 @@ func TestReady(t *testing.T) {
}
func TestWriteWithPrecision(t *testing.T) {
for _, tc := range parserTestCases {
t.Run(fmt.Sprintf("parser %s", tc.parser), func(t *testing.T) {
listener := newTestListener()
listener.ParserType = tc.parser
acc := &testutil.Accumulator{}
require.NoError(t, listener.Init())
@ -511,10 +554,11 @@ func TestWriteWithPrecision(t *testing.T) {
require.EqualValues(t, 204, resp.StatusCode)
acc.Wait(1)
require.Equal(t, 1, len(acc.Metrics))
// When timestamp is provided, the precision parameter is
// overloaded to specify the timestamp's unit
require.Equal(t, time.Unix(0, 1422568543000000000), acc.Metrics[0].Time)
})
}
}
func TestWriteWithPrecisionNoTimestamp(t *testing.T) {


@ -1,9 +1,8 @@
# Influx Line Protocol
Parses metrics using the [Influx Line Protocol][].
[Influx Line Protocol]: https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
## Configuration
@ -16,4 +15,9 @@ metrics are parsed directly into Telegraf metrics.
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Influx line protocol parser
## 'internal' is the default. 'upstream' is a newer parser that is faster
## and more memory efficient.
## influx_parser_type = "internal"
```
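
For a concrete sense of the API behind this data format, here is a minimal sketch (editorial, not part of this change) that feeds one line of line protocol through the plugin-level internal parser the listeners in this commit also use; the measurement, tag, and field names are arbitrary examples:

```go
package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/influx"
)

func main() {
	// The internal parser is driven through a MetricHandler.
	handler := influx.NewMetricHandler()
	parser := influx.NewParser(handler)

	line := "weather,location=us-midwest temperature=82 1465839830100400200\n"
	metrics, err := parser.Parse([]byte(line))
	if err != nil {
		panic(err)
	}
	// Prints: weather map[location:us-midwest] map[temperature:82]
	fmt.Println(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
}
```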


@ -0,0 +1,4 @@
# Influx Line Protocol
This package implements the upstream Influx line protocol parser. See the
Influx README.md for more details.
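
The exported surface mirrors the internal parser, so callers can switch between the two behind the feature flag. A minimal usage sketch (editorial, not part of this change), based on the parser.go added below:

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
)

func main() {
	parser := influx_upstream.NewParser()
	// Metrics without a timestamp are stamped by this function
	// instead of time.Now.
	parser.SetTimeFunc(func() time.Time { return time.Unix(42, 0) })

	metrics, err := parser.Parse([]byte("cpu,host=localhost value=42i\n"))
	if err != nil {
		panic(err)
	}
	// Prints roughly: cpu map[host:localhost] map[value:42] 1970-01-01 00:00:42 +0000 UTC
	fmt.Println(metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields(), metrics[0].Time())
}
```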


@ -0,0 +1,299 @@
package influx_upstream
import (
"errors"
"fmt"
"io"
"strings"
"time"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
const (
maxErrorBufferSize = 1024
)
var (
ErrNoMetric = errors.New("no metric in line")
ErrEOF = errors.New("EOF")
)
type TimeFunc func() time.Time
// nthIndexAny finds the nth index of some unicode code point in a string or returns -1
func nthIndexAny(s, chars string, n int) int {
offset := 0
for found := 1; found <= n; found++ {
i := strings.IndexAny(s[offset:], chars)
if i < 0 {
break
}
offset += i
if found == n {
return offset
}
offset += len(chars)
}
return -1
}
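// Illustration (editorial note, not part of this commit):
// nthIndexAny("a\nb\nc", "\n", 2) == 3, the byte index of the second
// newline. The offset advances by len(chars) after each hit, which is
// only correct when chars is a single one-byte character, as with the
// "\n" lookups below.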
// ParseError indicates an error in the parsing of the text.
type ParseError struct {
*lineprotocol.DecodeError
buf string
}
func (e *ParseError) Error() string {
// When an error occurs within the stream decoder, we do not have access
// to the internal buffer, so we cannot display the contents of the invalid
// metric
if e.buf == "" {
return fmt.Sprintf("metric parse error: %s at %d:%d", e.Err, e.Line, e.Column)
}
lineStart := nthIndexAny(e.buf, "\n", int(e.Line-1)) + 1
buffer := e.buf[lineStart:]
eol := strings.IndexAny(buffer, "\n")
if eol >= 0 {
buffer = strings.TrimSuffix(buffer[:eol], "\r")
}
if len(buffer) > maxErrorBufferSize {
startEllipsis := true
offset := e.Column - 1 - lineStart
if offset > len(buffer) || offset < 0 {
offset = len(buffer)
}
start := offset - maxErrorBufferSize
if start < 0 {
startEllipsis = false
start = 0
}
// if we trimmed it the column won't line up. it'll always be the last character,
// because the parser doesn't continue past it, but point it out anyway so
// it's obvious where the issue is.
buffer = buffer[start:offset] + "<-- here"
if startEllipsis {
buffer = "..." + buffer
}
}
return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.Err, e.Line, e.Column, buffer)
}
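// Illustration (editorial note, not part of this commit): decoding
// "foo value=1.0\nfoo value=2asdf2.0" yields a ParseError whose Error()
// renders as:
//   metric parse error: cannot parse value for field key "value":
//   invalid float value syntax at 2:11: "foo value=2asdf2.0"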
// convertToParseError attempts to convert a lineprotocol.DecodeError to a ParseError
func convertToParseError(input []byte, rawErr error) error {
err, ok := rawErr.(*lineprotocol.DecodeError)
if !ok {
return rawErr
}
return &ParseError{
DecodeError: err,
buf: string(input),
}
}
// Parser is an InfluxDB Line Protocol parser that implements the
// parsers.Parser interface.
type Parser struct {
DefaultTags map[string]string
defaultTime TimeFunc
precision lineprotocol.Precision
allowPartial bool
}
// NewParser returns a Parser that accepts line protocol
func NewParser() *Parser {
return &Parser{
defaultTime: time.Now,
precision: lineprotocol.Nanosecond,
}
}
// NewSeriesParser returns a Parser that accepts a measurement and tagset
func NewSeriesParser() *Parser {
return &Parser{
defaultTime: time.Now,
precision: lineprotocol.Nanosecond,
allowPartial: true,
}
}
func (p *Parser) SetTimeFunc(f TimeFunc) {
p.defaultTime = f
}
func (p *Parser) Parse(input []byte) ([]telegraf.Metric, error) {
metrics := make([]telegraf.Metric, 0)
decoder := lineprotocol.NewDecoderWithBytes(input)
for decoder.Next() {
m, err := nextMetric(decoder, p.precision, p.defaultTime, p.allowPartial)
if err != nil {
return nil, convertToParseError(input, err)
}
metrics = append(metrics, m)
}
p.applyDefaultTags(metrics)
return metrics, nil
}
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, ErrNoMetric
}
return metrics[0], nil
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
func (p *Parser) SetTimePrecision(u time.Duration) {
switch u {
case time.Nanosecond:
p.precision = lineprotocol.Nanosecond
case time.Microsecond:
p.precision = lineprotocol.Microsecond
case time.Millisecond:
p.precision = lineprotocol.Millisecond
case time.Second:
p.precision = lineprotocol.Second
}
}
func (p *Parser) applyDefaultTags(metrics []telegraf.Metric) {
if len(p.DefaultTags) == 0 {
return
}
for _, m := range metrics {
p.applyDefaultTagsSingle(m)
}
}
func (p *Parser) applyDefaultTagsSingle(m telegraf.Metric) {
for k, v := range p.DefaultTags {
if !m.HasTag(k) {
m.AddTag(k, v)
}
}
}
// StreamParser is an InfluxDB Line Protocol parser. It is not safe for
// concurrent use in multiple goroutines.
type StreamParser struct {
decoder *lineprotocol.Decoder
defaultTime TimeFunc
precision lineprotocol.Precision
lastError error
}
func NewStreamParser(r io.Reader) *StreamParser {
return &StreamParser{
decoder: lineprotocol.NewDecoder(r),
defaultTime: time.Now,
precision: lineprotocol.Nanosecond,
}
}
// SetTimeFunc changes the function used to determine the time of metrics
// without a timestamp. The default TimeFunc is time.Now. Useful mostly for
// testing, or perhaps if you want all metrics to have the same timestamp.
func (sp *StreamParser) SetTimeFunc(f TimeFunc) {
sp.defaultTime = f
}
func (sp *StreamParser) SetTimePrecision(u time.Duration) {
switch u {
case time.Nanosecond:
sp.precision = lineprotocol.Nanosecond
case time.Microsecond:
sp.precision = lineprotocol.Microsecond
case time.Millisecond:
sp.precision = lineprotocol.Millisecond
case time.Second:
sp.precision = lineprotocol.Second
}
}
// Next parses the next item from the stream. If it returns a ParseError,
// it can be called again to get the next metric or error.
func (sp *StreamParser) Next() (telegraf.Metric, error) {
if !sp.decoder.Next() {
if err := sp.decoder.Err(); err != nil && err != sp.lastError {
sp.lastError = err
return nil, err
}
return nil, ErrEOF
}
m, err := nextMetric(sp.decoder, sp.precision, sp.defaultTime, false)
if err != nil {
return nil, convertToParseError([]byte{}, err)
}
return m, nil
}
func nextMetric(decoder *lineprotocol.Decoder, precision lineprotocol.Precision, defaultTime TimeFunc, allowPartial bool) (telegraf.Metric, error) {
measurement, err := decoder.Measurement()
if err != nil {
return nil, err
}
m := metric.New(string(measurement), nil, nil, time.Time{})
for {
key, value, err := decoder.NextTag()
if err != nil {
// Allow empty tags for series parser
if strings.Contains(err.Error(), "empty tag name") && allowPartial {
break
}
return nil, err
} else if key == nil {
break
}
m.AddTag(string(key), string(value))
}
for {
key, value, err := decoder.NextField()
if err != nil {
// Allow empty fields for series parser
if strings.Contains(err.Error(), "expected field key") && allowPartial {
break
}
return nil, err
} else if key == nil {
break
}
m.AddField(string(key), value.Interface())
}
t, err := decoder.Time(precision, defaultTime())
if err != nil && !allowPartial {
return nil, err
}
m.SetTime(t)
return m, nil
}
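
Taken together, the stream API is meant to be consumed in a loop like the one handleWriteUpstreamParser uses above; a condensed sketch (editorial, not part of this change):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
)

func main() {
	body := strings.NewReader("cpu value=1i\ncpu value=oops\ncpu value=3i\n")
	parser := influx_upstream.NewStreamParser(body)

	for {
		m, err := parser.Next()
		if err == influx_upstream.ErrEOF {
			break // clean end of input
		}
		if _, ok := err.(*influx_upstream.ParseError); ok {
			continue // malformed line; keep reading the rest
		}
		if err != nil {
			panic(err) // unexpected decoder failure
		}
		fmt.Println(m.Name(), m.Fields())
	}
}
```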


@ -0,0 +1,892 @@
package influx_upstream
import (
"bytes"
"errors"
"io"
"strings"
"testing"
"time"
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
var DefaultTime = func() time.Time {
return time.Unix(42, 0)
}
type parseTest struct {
name string
input []byte
timeFunc TimeFunc
metrics []telegraf.Metric
err error
}
func parseTests(stream bool) []parseTest {
// This is required as there is no way to get the internal buffer
// of the decoder to show where the error occurred. As such, the
// error `buf` will be empty when decoding from a stream.
var (
intOverflowBuf string
uintOverflowBuf string
invalidMeasurementBuf string
)
if stream {
intOverflowBuf = ""
uintOverflowBuf = ""
invalidMeasurementBuf = ""
} else {
intOverflowBuf = "cpu value=9223372036854775808i"
uintOverflowBuf = "cpu value=18446744073709551616u"
invalidMeasurementBuf = "cpu"
}
return []parseTest{
{
name: "minimal",
input: []byte("cpu value=42 0"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
err: nil,
},
{
name: "minimal with newline",
input: []byte("cpu value=42 0\n"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(0, 0),
),
},
err: nil,
},
{
name: "measurement escape space",
input: []byte(`c\ pu value=42`),
metrics: []telegraf.Metric{
metric.New(
"c pu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "measurement escape comma",
input: []byte(`c\,pu value=42`),
metrics: []telegraf.Metric{
metric.New(
"c,pu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "tags",
input: []byte(`cpu,cpu=cpu0,host=localhost value=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
"cpu": "cpu0",
"host": "localhost",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "tags escape unescapable",
input: []byte(`cpu,ho\st=localhost value=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
`ho\st`: "localhost",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "tags escape equals",
input: []byte(`cpu,ho\=st=localhost value=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
"ho=st": "localhost",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "tags escape comma",
input: []byte(`cpu,ho\,st=localhost value=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
"ho,st": "localhost",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "tag value escape space",
input: []byte(`cpu,host=two\ words value=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
"host": "two words",
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "tag value double escape space",
input: []byte(`cpu,host=two\\ words value=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
"host": `two\ words`,
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "tag value triple escape space",
input: []byte(`cpu,host=two\\\ words value=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
"host": `two\\ words`,
},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field key escape not escapable",
input: []byte(`cpu va\lue=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
`va\lue`: 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field key escape equals",
input: []byte(`cpu va\=lue=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
`va=lue`: 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field key escape comma",
input: []byte(`cpu va\,lue=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
`va,lue`: 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field key escape space",
input: []byte(`cpu va\ lue=42`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
`va lue`: 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field int",
input: []byte("cpu value=42i"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field int overflow",
input: []byte("cpu value=9223372036854775808i"),
metrics: nil,
err: &ParseError{
DecodeError: &lineprotocol.DecodeError{
Line: 1,
Column: 11,
Err: errors.New(`cannot parse value for field key "value": line-protocol value out of range`),
},
buf: intOverflowBuf,
},
},
{
name: "field int max value",
input: []byte("cpu value=9223372036854775807i"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": int64(9223372036854775807),
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field uint",
input: []byte("cpu value=42u"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": uint64(42),
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field uint overflow",
input: []byte("cpu value=18446744073709551616u"),
metrics: nil,
err: &ParseError{
DecodeError: &lineprotocol.DecodeError{
Line: 1,
Column: 11,
Err: errors.New(`cannot parse value for field key "value": line-protocol value out of range`),
},
buf: uintOverflowBuf,
},
},
{
name: "field uint max value",
input: []byte("cpu value=18446744073709551615u"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": uint64(18446744073709551615),
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field boolean",
input: []byte("cpu value=true"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": true,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field string",
input: []byte(`cpu value="42"`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": "42",
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field string escape quote",
input: []byte(`cpu value="how\"dy"`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
`value`: `how"dy`,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field string escape backslash",
input: []byte(`cpu value="how\\dy"`),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
`value`: `how\dy`,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "field string newline",
input: []byte("cpu value=\"4\n2\""),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": "4\n2",
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "no timestamp",
input: []byte("cpu value=42"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "no timestamp",
input: []byte("cpu value=42"),
timeFunc: func() time.Time {
return time.Unix(42, 123456789)
},
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 123456789),
),
},
err: nil,
},
{
name: "multiple lines",
input: []byte("cpu value=42\ncpu value=42"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{
"value": 42.0,
},
time.Unix(42, 0),
),
},
err: nil,
},
{
name: "invalid measurement only",
input: []byte("cpu"),
metrics: nil,
err: &ParseError{
DecodeError: &lineprotocol.DecodeError{
Line: 1,
Column: 4,
Err: errors.New("empty tag name"),
},
buf: invalidMeasurementBuf,
},
},
{
name: "procstat",
input: []byte("procstat,exe=bash,process_name=bash voluntary_context_switches=42i,memory_rss=5103616i,rlimit_memory_data_hard=2147483647i,cpu_time_user=0.02,rlimit_file_locks_soft=2147483647i,pid=29417i,cpu_time_nice=0,rlimit_memory_locked_soft=65536i,read_count=259i,rlimit_memory_vms_hard=2147483647i,memory_swap=0i,rlimit_num_fds_soft=1024i,rlimit_nice_priority_hard=0i,cpu_time_soft_irq=0,cpu_time=0i,rlimit_memory_locked_hard=65536i,realtime_priority=0i,signals_pending=0i,nice_priority=20i,cpu_time_idle=0,memory_stack=139264i,memory_locked=0i,rlimit_memory_stack_soft=8388608i,cpu_time_iowait=0,cpu_time_guest=0,cpu_time_guest_nice=0,rlimit_memory_data_soft=2147483647i,read_bytes=0i,rlimit_cpu_time_soft=2147483647i,involuntary_context_switches=2i,write_bytes=106496i,cpu_time_system=0,cpu_time_irq=0,cpu_usage=0,memory_vms=21659648i,memory_data=1576960i,rlimit_memory_stack_hard=2147483647i,num_threads=1i,rlimit_memory_rss_soft=2147483647i,rlimit_realtime_priority_soft=0i,num_fds=4i,write_count=35i,rlimit_signals_pending_soft=78994i,cpu_time_steal=0,rlimit_num_fds_hard=4096i,rlimit_file_locks_hard=2147483647i,rlimit_cpu_time_hard=2147483647i,rlimit_signals_pending_hard=78994i,rlimit_nice_priority_soft=0i,rlimit_memory_rss_hard=2147483647i,rlimit_memory_vms_soft=2147483647i,rlimit_realtime_priority_hard=0i 1517620624000000000"),
metrics: []telegraf.Metric{
metric.New(
"procstat",
map[string]string{
"exe": "bash",
"process_name": "bash",
},
map[string]interface{}{
"cpu_time": 0,
"cpu_time_guest": float64(0),
"cpu_time_guest_nice": float64(0),
"cpu_time_idle": float64(0),
"cpu_time_iowait": float64(0),
"cpu_time_irq": float64(0),
"cpu_time_nice": float64(0),
"cpu_time_soft_irq": float64(0),
"cpu_time_steal": float64(0),
"cpu_time_system": float64(0),
"cpu_time_user": float64(0.02),
"cpu_usage": float64(0),
"involuntary_context_switches": 2,
"memory_data": 1576960,
"memory_locked": 0,
"memory_rss": 5103616,
"memory_stack": 139264,
"memory_swap": 0,
"memory_vms": 21659648,
"nice_priority": 20,
"num_fds": 4,
"num_threads": 1,
"pid": 29417,
"read_bytes": 0,
"read_count": 259,
"realtime_priority": 0,
"rlimit_cpu_time_hard": 2147483647,
"rlimit_cpu_time_soft": 2147483647,
"rlimit_file_locks_hard": 2147483647,
"rlimit_file_locks_soft": 2147483647,
"rlimit_memory_data_hard": 2147483647,
"rlimit_memory_data_soft": 2147483647,
"rlimit_memory_locked_hard": 65536,
"rlimit_memory_locked_soft": 65536,
"rlimit_memory_rss_hard": 2147483647,
"rlimit_memory_rss_soft": 2147483647,
"rlimit_memory_stack_hard": 2147483647,
"rlimit_memory_stack_soft": 8388608,
"rlimit_memory_vms_hard": 2147483647,
"rlimit_memory_vms_soft": 2147483647,
"rlimit_nice_priority_hard": 0,
"rlimit_nice_priority_soft": 0,
"rlimit_num_fds_hard": 4096,
"rlimit_num_fds_soft": 1024,
"rlimit_realtime_priority_hard": 0,
"rlimit_realtime_priority_soft": 0,
"rlimit_signals_pending_hard": 78994,
"rlimit_signals_pending_soft": 78994,
"signals_pending": 0,
"voluntary_context_switches": 42,
"write_bytes": 106496,
"write_count": 35,
},
time.Unix(0, 1517620624000000000),
),
},
err: nil,
},
}
}

func TestParser(t *testing.T) {
for _, tt := range parseTests(false) {
t.Run(tt.name, func(t *testing.T) {
parser := NewParser()
parser.SetTimeFunc(DefaultTime)
if tt.timeFunc != nil {
parser.SetTimeFunc(tt.timeFunc)
}
metrics, err := parser.Parse(tt.input)
if tt.err == nil {
require.NoError(t, err)
} else {
require.Equal(t, tt.err.Error(), err.Error())
}
require.Equal(t, len(tt.metrics), len(metrics))
for i, expected := range tt.metrics {
require.Equal(t, expected.Name(), metrics[i].Name())
require.Equal(t, expected.Tags(), metrics[i].Tags())
require.Equal(t, expected.Fields(), metrics[i].Fields())
require.Equal(t, expected.Time(), metrics[i].Time())
}
})
}
}

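For orientation (not part of the commit): the parser exercised above follows the usual Telegraf contract of Parse([]byte) returning ([]telegraf.Metric, error). A minimal sketch of calling it from outside the package, assuming only the constructor and methods these tests use:

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
)

func main() {
	parser := influx_upstream.NewParser()
	metrics, err := parser.Parse([]byte("cpu,host=a usage=0.5 42000000000"))
	if err != nil {
		// Parse errors carry line:column context plus the offending buffer.
		fmt.Println("parse error:", err)
		return
	}
	for _, m := range metrics {
		fmt.Println(m.Name(), m.Tags(), m.Fields(), m.Time())
	}
}
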
func BenchmarkParser(b *testing.B) {
for _, tt := range parseTests(false) {
b.Run(tt.name, func(b *testing.B) {
parser := NewParser()
for n := 0; n < b.N; n++ {
metrics, err := parser.Parse(tt.input)
_ = err
_ = metrics
}
})
}
}

func TestStreamParser(t *testing.T) {
for _, tt := range parseTests(true) {
t.Run(tt.name, func(t *testing.T) {
r := bytes.NewBuffer(tt.input)
parser := NewStreamParser(r)
parser.SetTimeFunc(DefaultTime)
if tt.timeFunc != nil {
parser.SetTimeFunc(tt.timeFunc)
}
var i int
for {
m, err := parser.Next()
if err != nil {
if err == ErrEOF {
break
}
require.Equal(t, tt.err.Error(), err.Error())
break
}
testutil.RequireMetricEqual(t, tt.metrics[i], m)
i++
}
})
}
}

func TestSeriesParser(t *testing.T) {
var tests = []struct {
name string
input []byte
timeFunc func() time.Time
metrics []telegraf.Metric
err error
}{
{
name: "empty",
input: []byte(""),
metrics: []telegraf.Metric{},
},
{
name: "minimal",
input: []byte("cpu"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{},
map[string]interface{}{},
time.Unix(0, 0),
),
},
},
{
name: "tags",
input: []byte("cpu,a=x,b=y"),
metrics: []telegraf.Metric{
metric.New(
"cpu",
map[string]string{
"a": "x",
"b": "y",
},
map[string]interface{}{},
time.Unix(0, 0),
),
},
},
{
name: "missing tag value",
input: []byte("cpu,a="),
metrics: []telegraf.Metric{},
err: &ParseError{
DecodeError: &lineprotocol.DecodeError{
Line: 1,
Column: 7,
Err: errors.New(`expected tag value after tag key "a", but none found`),
},
buf: "cpu,a=",
},
},
{
name: "error with carriage return in long line",
input: []byte("cpu,a=" + strings.Repeat("x", maxErrorBufferSize) + "\rcd,b"),
metrics: []telegraf.Metric{},
err: &ParseError{
DecodeError: &lineprotocol.DecodeError{
Line: 1,
Column: 1031,
Err: errors.New(`expected tag key or field but found '\r' instead`),
},
buf: "cpu,a=" + strings.Repeat("x", maxErrorBufferSize) + "\rcd,b",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
parser := NewSeriesParser()
if tt.timeFunc != nil {
parser.SetTimeFunc(tt.timeFunc)
}
metrics, err := parser.Parse(tt.input)
require.Equal(t, tt.err, err)
if err != nil {
require.Equal(t, tt.err.Error(), err.Error())
}
require.Equal(t, len(tt.metrics), len(metrics))
for i, expected := range tt.metrics {
require.Equal(t, expected.Name(), metrics[i].Name())
require.Equal(t, expected.Tags(), metrics[i].Tags())
}
})
}
}

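A side note (not part of the commit): as the cases above show, the series parser accepts only the series-key portion of line protocol, i.e. measurement plus tags, with no fields or timestamp. A short sketch under the same assumptions as before:

package main

import (
	"fmt"

	"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
)

func main() {
	parser := influx_upstream.NewSeriesParser()
	series, err := parser.Parse([]byte("cpu,host=a,region=b"))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// A series key yields the measurement name and tags only; Fields() comes back empty.
	fmt.Println(series[0].Name(), series[0].Tags())
}
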
func TestParserErrorString(t *testing.T) {
var ptests = []struct {
name string
input []byte
errString string
}{
{
name: "multiple line error",
input: []byte("cpu value=42\ncpu value=invalid\ncpu value=42"),
errString: `metric parse error: field value has unrecognized type at 2:11: "cpu value=invalid"`,
},
{
name: "handler error",
input: []byte("cpu value=9223372036854775808i\ncpu value=42"),
errString: `metric parse error: cannot parse value for field key "value": line-protocol value out of range at 1:11: "cpu value=9223372036854775808i"`,
},
{
name: "buffer too long",
input: []byte("cpu " + strings.Repeat("ab", maxErrorBufferSize) + "=invalid\ncpu value=42"),
errString: "metric parse error: field value has unrecognized type at 1:2054: \"...b" + strings.Repeat("ab", maxErrorBufferSize/2-1) + "=<-- here\"",
},
{
name: "multiple line errors, first returned",
input: []byte("cpu value=42\ncpu value=invalid\ncpu value=42\ncpu value=invalid"),
errString: `metric parse error: field value has unrecognized type at 2:11: "cpu value=invalid"`,
},
}
for _, tt := range ptests {
t.Run(tt.name, func(t *testing.T) {
parser := NewParser()
_, err := parser.Parse(tt.input)
require.Equal(t, tt.errString, err.Error())
})
}
}

func TestStreamParserErrorString(t *testing.T) {
var ptests = []struct {
name string
input []byte
errs []string
}{
{
name: "multiple line error",
input: []byte("cpu value=42\ncpu value=invalid\ncpu value=42"),
errs: []string{
`metric parse error: field value has unrecognized type at 2:11`,
},
},
{
name: "handler error",
input: []byte("cpu value=9223372036854775808i\ncpu value=42"),
errs: []string{
`metric parse error: cannot parse value for field key "value": line-protocol value out of range at 1:11`,
},
},
{
name: "buffer too long",
input: []byte("cpu " + strings.Repeat("ab", maxErrorBufferSize) + "=invalid\ncpu value=42"),
errs: []string{
"metric parse error: field value has unrecognized type at 1:2054",
},
},
{
name: "multiple errors",
input: []byte("foo value=1asdf2.0\nfoo value=2.0\nfoo value=3asdf2.0\nfoo value=4.0"),
errs: []string{
`metric parse error: cannot parse value for field key "value": invalid float value syntax at 1:11`,
`metric parse error: cannot parse value for field key "value": invalid float value syntax at 3:11`,
},
},
}
for _, tt := range ptests {
t.Run(tt.name, func(t *testing.T) {
parser := NewStreamParser(bytes.NewBuffer(tt.input))
var errs []error
for i := 0; i < 20; i++ {
_, err := parser.Next()
if err == ErrEOF {
break
}
if err != nil {
errs = append(errs, err)
}
}
require.Equal(t, len(tt.errs), len(errs))
for i, err := range errs {
require.Equal(t, tt.errs[i], err.Error())
}
})
}
}

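// MockReader lets a test stub out Read to inject arbitrary results
// (including errors) into the stream parser.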
type MockReader struct {
ReadF func(p []byte) (int, error)
}

func (r *MockReader) Read(p []byte) (int, error) {
return r.ReadF(p)
}

// Errors from the Reader are returned from the Parser
func TestStreamParserReaderError(t *testing.T) {
readerErr := errors.New("error but not eof")
parser := NewStreamParser(&MockReader{
ReadF: func(p []byte) (int, error) {
return 0, readerErr
},
})
_, err := parser.Next()
require.Error(t, err)
require.Equal(t, readerErr, err)
_, err = parser.Next()
require.Equal(t, ErrEOF, err)
}

func TestStreamParserProducesAllAvailableMetrics(t *testing.T) {
r, w := io.Pipe()
parser := NewStreamParser(r)
parser.SetTimeFunc(DefaultTime)
go func() {
_, err := w.Write([]byte("metric value=1\nmetric2 value=1\n"))
require.NoError(t, err)
}()
_, err := parser.Next()
require.NoError(t, err)
// should not block on second read
_, err = parser.Next()
require.NoError(t, err)
}
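
To sum up the pattern the stream tests exercise (this sketch is not part of the commit): Next() is called until it returns ErrEOF, and per-line parse errors are non-fatal, so a single bad line does not stop the stream. A consumption loop under those assumptions; handleMetric is a hypothetical downstream handler:

package main

import (
	"log"
	"os"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
)

func handleMetric(m telegraf.Metric) {
	log.Printf("%s %v %v", m.Name(), m.Tags(), m.Fields())
}

func main() {
	parser := influx_upstream.NewStreamParser(os.Stdin)
	for {
		m, err := parser.Next()
		if err == influx_upstream.ErrEOF {
			break
		}
		if err != nil {
			// Report and keep reading; the stream parser resumes at the next line.
			log.Printf("skipping bad line: %v", err)
			continue
		}
		handleMetric(m)
	}
}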

View File

@@ -10,6 +10,7 @@ import (
 	"github.com/influxdata/telegraf/plugins/parsers/graphite"
 	"github.com/influxdata/telegraf/plugins/parsers/grok"
 	"github.com/influxdata/telegraf/plugins/parsers/influx"
+	"github.com/influxdata/telegraf/plugins/parsers/influx/influx_upstream"
 	"github.com/influxdata/telegraf/plugins/parsers/json"
 	"github.com/influxdata/telegraf/plugins/parsers/json_v2"
 	"github.com/influxdata/telegraf/plugins/parsers/logfmt"
@@ -190,6 +191,9 @@ type Config struct {

 	// JSONPath configuration
 	JSONV2Config []JSONV2Config `toml:"json_v2"`
+
+	// Influx configuration
+	InfluxParserType string `toml:"influx_parser_type"`
 }

 type XPathConfig xpath.Config
@@ -222,7 +226,11 @@ func NewParser(config *Config) (Parser, error) {
 		parser, err = NewValueParser(config.MetricName,
 			config.DataType, config.ValueFieldName, config.DefaultTags)
 	case "influx":
-		parser, err = NewInfluxParser()
+		if config.InfluxParserType == "upstream" {
+			parser, err = NewInfluxUpstreamParser()
+		} else {
+			parser, err = NewInfluxParser()
+		}
 	case "nagios":
 		parser, err = NewNagiosParser()
 	case "graphite":
@@ -323,6 +331,10 @@ func NewInfluxParser() (Parser, error) {
 	return influx.NewParser(handler), nil
 }

+func NewInfluxUpstreamParser() (Parser, error) {
+	return influx_upstream.NewParser(), nil
+}
+
 func NewGraphiteParser(
 	separator string,
 	templates []string,
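
For context (not part of this diff): a sketch of how a plugin configuration might opt in to the new parser once this lands. The inputs.file plugin and file name are only illustrative; influx_parser_type is the flag this commit adds.

[[inputs.file]]
  files = ["metrics.lp"]
  data_format = "influx"

  ## "upstream" selects the parser built on the influxdata/line-protocol
  ## module; any other value (or leaving this unset) keeps the existing
  ## internal parser.
  influx_parser_type = "upstream"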