From be77fbab31e9f03a5e11a216e1c311b056472b24 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 7 Jul 2020 12:43:32 -0700 Subject: [PATCH] Support utf-16 in file and tail inputs (#7792) --- go.mod | 6 +- go.sum | 8 +- plugins/common/encoding/decoder.go | 36 ++ plugins/common/encoding/decoder_test.go | 78 ++++ plugins/inputs/file/file.go | 46 +- plugins/inputs/file/file_test.go | 160 +++++++ plugins/inputs/file/testdata/mtr-utf-16be.csv | Bin 0 -> 884 bytes plugins/inputs/file/testdata/mtr-utf-16le.csv | Bin 0 -> 884 bytes plugins/inputs/file/testdata/mtr-utf-8.csv | 5 + plugins/inputs/tail/README.md | 9 + plugins/inputs/tail/tail.go | 24 +- plugins/inputs/tail/tail_test.go | 243 ++++++---- .../inputs/tail/testdata/cpu-utf-16be.influx | Bin 0 -> 522 bytes .../inputs/tail/testdata/cpu-utf-16le.influx | Bin 0 -> 522 bytes plugins/inputs/tail/testdata/cpu-utf-8.influx | 5 + plugins/parsers/csv/parser.go | 87 +++- plugins/parsers/csv/parser_test.go | 428 ++++++++++++------ plugins/parsers/registry.go | 95 +--- 18 files changed, 886 insertions(+), 344 deletions(-) create mode 100644 plugins/common/encoding/decoder.go create mode 100644 plugins/common/encoding/decoder_test.go create mode 100644 plugins/inputs/file/testdata/mtr-utf-16be.csv create mode 100644 plugins/inputs/file/testdata/mtr-utf-16le.csv create mode 100644 plugins/inputs/file/testdata/mtr-utf-8.csv create mode 100644 plugins/inputs/tail/testdata/cpu-utf-16be.influx create mode 100644 plugins/inputs/tail/testdata/cpu-utf-16le.influx create mode 100644 plugins/inputs/tail/testdata/cpu-utf-8.influx diff --git a/go.mod b/go.mod index 8a4e23429..1f6bd6b82 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/influxdata/telegraf -go 1.12 +go 1.13 require ( cloud.google.com/go v0.53.0 @@ -38,6 +38,7 @@ require ( github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a // indirect github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4 github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/dimchansky/utfbom v1.1.0 github.com/docker/distribution v2.6.0-rc.1.0.20170726174610-edc3ab29cdff+incompatible // indirect github.com/docker/docker v1.4.2-0.20180327123150-ed7b6428c133 github.com/docker/go-connections v0.3.0 // indirect @@ -71,7 +72,7 @@ require ( github.com/hashicorp/memberlist v0.1.5 // indirect github.com/hashicorp/serf v0.8.1 // indirect github.com/influxdata/go-syslog/v2 v2.0.1 - github.com/influxdata/tail v1.0.1-0.20180327235535-c43482518d41 + github.com/influxdata/tail v1.0.1-0.20200707181643-03a791b270e4 github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65 github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect @@ -133,6 +134,7 @@ require ( golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 + golang.org/x/text v0.3.3 golang.org/x/tools v0.0.0-20200317043434-63da46f3035e // indirect golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4 gonum.org/v1/gonum v0.6.2 // indirect diff --git a/go.sum b/go.sum index 6f165c1fe..c4ff76cb3 100644 --- a/go.sum +++ b/go.sum @@ -335,8 +335,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/influxdata/go-syslog/v2 v2.0.1 h1:l44S4l4Q8MhGQcoOxJpbo+QQYxJqp0vdgIVHh4+DO0s= github.com/influxdata/go-syslog/v2 v2.0.1/go.mod h1:hjvie1UTaD5E1fTnDmxaCw8RRDrT4Ve+XHr5O2dKSCo= -github.com/influxdata/tail v1.0.1-0.20180327235535-c43482518d41 h1:HxQo1NpNXQDpvEBzthbQLmePvTLFTa5GzSFUjL03aEs= -github.com/influxdata/tail v1.0.1-0.20180327235535-c43482518d41/go.mod h1:xTFF2SILpIYc5N+Srb0d5qpx7d+f733nBrbasb13DtQ= +github.com/influxdata/tail v1.0.1-0.20200707181643-03a791b270e4 h1:K3A5vHPs/p8OjI4SL3l1+hs/98mhxTVDcV1Ap0c265E= +github.com/influxdata/tail v1.0.1-0.20200707181643-03a791b270e4/go.mod h1:VeiWgI3qaGdJWust2fP27a6J+koITo/1c/UhxeOxgaM= github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65 h1:vvyMtD5LTJc1W9sQKjDkAWdcg0478CszSdzlHtiAXCY= github.com/influxdata/toml v0.0.0-20190415235208-270119a8ce65/go.mod h1:zApaNFpP/bTpQItGZNNUMISDMDAnTXu9UqJ4yT3ocz8= github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 h1:W2IgzRCb0L9VzMujq/QuTaZUKcH8096jWwP519mHN6Q= @@ -720,6 +720,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -838,6 +840,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fatih/pool.v2 v2.0.0 h1:xIFeWtxifuQJGk/IEPKsTduEKcKvPmhoiVDGpC40nKg= gopkg.in/fatih/pool.v2 v2.0.0/go.mod h1:8xVGeu1/2jr2wm5V9SPuMht2H5AEmf5aFMGSQixtjTY= +gopkg.in/fsnotify.v1 v1.2.1/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gorethink/gorethink.v3 v3.0.5 h1:e2Uc/Xe+hpcVQFsj6MuHlYog3r0JYpnTzwDj/y2O4MU= @@ -861,6 +864,7 @@ gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/olivere/elastic.v5 v5.0.70 h1:DqFG2Odzs74JCz6SssgJjd6qpGnsOAzNc7+l5EnvsnE= gopkg.in/olivere/elastic.v5 v5.0.70/go.mod h1:FylZT6jQWtfHsicejzOm3jIMVPOAksa80i3o+6qtQRk= +gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/plugins/common/encoding/decoder.go b/plugins/common/encoding/decoder.go new file mode 100644 index 000000000..d1c282d2c --- /dev/null +++ b/plugins/common/encoding/decoder.go @@ -0,0 +1,36 @@ +package encoding + +import ( + "errors" + + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/unicode" +) + +type Decoder = encoding.Decoder + +// NewDecoder returns a x/text Decoder for the specified text encoding. The +// Decoder converts a character encoding into utf-8 bytes. If a BOM is found +// it will be converted into a utf-8 BOM, you can use +// github.com/dimchansky/utfbom to strip the BOM. +// +// The "none" or "" encoding will pass through bytes unchecked. Use the utf-8 +// encoding if you want invalid bytes replaced using the the unicode +// replacement character. +// +// Detection of utf-16 endianness using the BOM is not currently provided due +// to the tail input plugins requirement to be able to start at the middle or +// end of the file. +func NewDecoder(enc string) (*Decoder, error) { + switch enc { + case "utf-8": + return unicode.UTF8.NewDecoder(), nil + case "utf-16le": + return unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewDecoder(), nil + case "utf-16be": + return unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM).NewDecoder(), nil + case "none", "": + return encoding.Nop.NewDecoder(), nil + } + return nil, errors.New("unknown character encoding") +} diff --git a/plugins/common/encoding/decoder_test.go b/plugins/common/encoding/decoder_test.go new file mode 100644 index 000000000..87115318a --- /dev/null +++ b/plugins/common/encoding/decoder_test.go @@ -0,0 +1,78 @@ +package encoding + +import ( + "bytes" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDecoder(t *testing.T) { + tests := []struct { + name string + encoding string + input []byte + expected []byte + expectedErr bool + }{ + { + name: "no decoder utf-8", + encoding: "", + input: []byte("howdy"), + expected: []byte("howdy"), + }, + { + name: "utf-8 decoder", + encoding: "utf-8", + input: []byte("howdy"), + expected: []byte("howdy"), + }, + { + name: "utf-8 decoder invalid bytes replaced with replacement char", + encoding: "utf-8", + input: []byte("\xff\xfe"), + expected: []byte("\uFFFD\uFFFD"), + }, + { + name: "utf-16le decoder no BOM", + encoding: "utf-16le", + input: []byte("h\x00o\x00w\x00d\x00y\x00"), + expected: []byte("howdy"), + }, + { + name: "utf-16le decoder with BOM", + encoding: "utf-16le", + input: []byte("\xff\xfeh\x00o\x00w\x00d\x00y\x00"), + expected: []byte("\xef\xbb\xbfhowdy"), + }, + { + name: "utf-16be decoder no BOM", + encoding: "utf-16be", + input: []byte("\x00h\x00o\x00w\x00d\x00y"), + expected: []byte("howdy"), + }, + { + name: "utf-16be decoder with BOM", + encoding: "utf-16be", + input: []byte("\xfe\xff\x00h\x00o\x00w\x00d\x00y"), + expected: []byte("\xef\xbb\xbfhowdy"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + decoder, err := NewDecoder(tt.encoding) + require.NoError(t, err) + buf := bytes.NewBuffer(tt.input) + r := decoder.Reader(buf) + actual, err := ioutil.ReadAll(r) + if tt.expectedErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tt.expected, actual) + }) + } +} diff --git a/plugins/inputs/file/file.go b/plugins/inputs/file/file.go index fe2a840fa..e431bc6df 100644 --- a/plugins/inputs/file/file.go +++ b/plugins/inputs/file/file.go @@ -3,20 +3,25 @@ package file import ( "fmt" "io/ioutil" + "os" "path/filepath" + "github.com/dimchansky/utfbom" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" ) type File struct { - Files []string `toml:"files"` - FileTag string `toml:"file_tag"` - parser parsers.Parser + Files []string `toml:"files"` + FileTag string `toml:"file_tag"` + CharacterEncoding string `toml:"character_encoding"` + parser parsers.Parser filenames []string + decoder *encoding.Decoder } const sampleConfig = ` @@ -24,15 +29,24 @@ const sampleConfig = ` ## as well as ** to match recursive files and directories. files = ["/tmp/metrics.out"] + ## Name a tag containing the name of the file the data was parsed from. Leave empty + ## to disable. + # file_tag = "" + + ## Character encoding to use when interpreting the file contents. Invalid + ## characters are replaced using the unicode replacement character. When set + ## to the empty string the data is not decoded to text. + ## ex: character_encoding = "utf-8" + ## character_encoding = "utf-16le" + ## character_encoding = "utf-16be" + ## character_encoding = "" + # character_encoding = "" + ## The dataformat to be read from files ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" - - ## Name a tag containing the name of the file the data was parsed from. Leave empty - ## to disable. - # file_tag = "" ` // SampleConfig returns the default configuration of the Input @@ -44,6 +58,12 @@ func (f *File) Description() string { return "Parse a complete file each interval" } +func (f *File) Init() error { + var err error + f.decoder, err = encoding.NewDecoder(f.CharacterEncoding) + return err +} + func (f *File) Gather(acc telegraf.Accumulator) error { err := f.refreshFilePaths() if err != nil { @@ -59,7 +79,7 @@ func (f *File) Gather(acc telegraf.Accumulator) error { if f.FileTag != "" { m.AddTag(f.FileTag, filepath.Base(k)) } - acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + acc.AddMetric(m) } } return nil @@ -88,12 +108,18 @@ func (f *File) refreshFilePaths() error { } func (f *File) readMetric(filename string) ([]telegraf.Metric, error) { - fileContents, err := ioutil.ReadFile(filename) + file, err := os.Open(filename) + if err != nil { + return nil, err + } + defer file.Close() + + r, _ := utfbom.Skip(f.decoder.Reader(file)) + fileContents, err := ioutil.ReadAll(r) if err != nil { return nil, fmt.Errorf("E! Error file: %v could not be read, %s", filename, err) } return f.parser.Parse(fileContents) - } func init() { diff --git a/plugins/inputs/file/file_test.go b/plugins/inputs/file/file_test.go index 19341fc08..427ff25d8 100644 --- a/plugins/inputs/file/file_test.go +++ b/plugins/inputs/file/file_test.go @@ -4,8 +4,11 @@ import ( "os" "path/filepath" "testing" + "time" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -16,6 +19,8 @@ func TestRefreshFilePaths(t *testing.T) { r := File{ Files: []string{filepath.Join(wd, "dev/testfiles/**.log")}, } + err = r.Init() + require.NoError(t, err) err = r.refreshFilePaths() require.NoError(t, err) @@ -30,6 +35,8 @@ func TestFileTag(t *testing.T) { Files: []string{filepath.Join(wd, "dev/testfiles/json_a.log")}, FileTag: "filename", } + err = r.Init() + require.NoError(t, err) parserConfig := parsers.Config{ DataFormat: "json", @@ -55,6 +62,8 @@ func TestJSONParserCompile(t *testing.T) { r := File{ Files: []string{filepath.Join(wd, "dev/testfiles/json_a.log")}, } + err := r.Init() + require.NoError(t, err) parserConfig := parsers.Config{ DataFormat: "json", TagKeys: []string{"parent_ignored_child"}, @@ -74,6 +83,8 @@ func TestGrokParser(t *testing.T) { r := File{ Files: []string{filepath.Join(wd, "dev/testfiles/grok_a.log")}, } + err := r.Init() + require.NoError(t, err) parserConfig := parsers.Config{ DataFormat: "grok", @@ -87,3 +98,152 @@ func TestGrokParser(t *testing.T) { err = r.Gather(&acc) assert.Equal(t, len(acc.Metrics), 2) } + +func TestCharacterEncoding(t *testing.T) { + expected := []telegraf.Metric{ + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "1", + "ip": "12.122.114.5", + }, + map[string]interface{}{ + "avg": 21.55, + "best": 19.34, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 2.05, + "worst": 26.83, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "2", + "ip": "192.205.32.238", + }, + map[string]interface{}{ + "avg": 25.11, + "best": 20.8, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 6.03, + "worst": 38.85, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "3", + "ip": "152.195.85.133", + }, + map[string]interface{}{ + "avg": 20.18, + "best": 19.75, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 0.0, + "worst": 20.78, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("file", + map[string]string{ + "dest": "example.org", + "hop": "4", + "ip": "93.184.216.34", + }, + map[string]interface{}{ + "avg": 24.02, + "best": 19.75, + "loss": 0.0, + "snt": 10, + "status": "OK", + "stdev": 4.67, + "worst": 32.41, + }, + time.Unix(0, 0), + ), + } + + tests := []struct { + name string + plugin *File + csv *csv.Config + file string + }{ + { + name: "empty character_encoding with utf-8", + plugin: &File{ + Files: []string{"testdata/mtr-utf-8.csv"}, + CharacterEncoding: "", + }, + csv: &csv.Config{ + MetricName: "file", + SkipRows: 1, + ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, + TagColumns: []string{"dest", "hop", "ip"}, + }, + }, + { + name: "utf-8 character_encoding with utf-8", + plugin: &File{ + Files: []string{"testdata/mtr-utf-8.csv"}, + CharacterEncoding: "utf-8", + }, + csv: &csv.Config{ + MetricName: "file", + SkipRows: 1, + ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, + TagColumns: []string{"dest", "hop", "ip"}, + }, + }, + { + name: "utf-16le character_encoding with utf-16le", + plugin: &File{ + Files: []string{"testdata/mtr-utf-16le.csv"}, + CharacterEncoding: "utf-16le", + }, + csv: &csv.Config{ + MetricName: "file", + SkipRows: 1, + ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, + TagColumns: []string{"dest", "hop", "ip"}, + }, + }, + { + name: "utf-16be character_encoding with utf-16be", + plugin: &File{ + Files: []string{"testdata/mtr-utf-16be.csv"}, + CharacterEncoding: "utf-16be", + }, + csv: &csv.Config{ + MetricName: "file", + SkipRows: 1, + ColumnNames: []string{"", "", "status", "dest", "hop", "ip", "loss", "snt", "", "", "avg", "best", "worst", "stdev"}, + TagColumns: []string{"dest", "hop", "ip"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.plugin.Init() + require.NoError(t, err) + + parser, err := csv.NewParser(tt.csv) + require.NoError(t, err) + tt.plugin.SetParser(parser) + + var acc testutil.Accumulator + err = tt.plugin.Gather(&acc) + require.NoError(t, err) + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) + }) + } +} diff --git a/plugins/inputs/file/testdata/mtr-utf-16be.csv b/plugins/inputs/file/testdata/mtr-utf-16be.csv new file mode 100644 index 0000000000000000000000000000000000000000..c35596aa031fcd83333e36d7add5d4cf86da30a6 GIT binary patch literal 884 zcmb_a%Sr=55Ug{)A{TGVGBf+kQOO}eMKB>ALk>X%Vy_sm+HVZ+a6x-P zHhGK<-+cRD;)q8gS7ddD2a7IGMV=Br(jHOY68Fdv__b(=MO2ueK+V+sJQSz*KBksT@ZteOKZAGkq?aF*3W9N)m8eW@^;)^rT zV<^RmoQfshWXy`{e+6}&SM46HRU}c3=P7kwKV_!Z%1oI{e;_iHYE?r0@d{5&fg(vq zVXDs6+H2aFB3BwM!&AKZllf4p)k{s3RJJx1>b`2@eg0pwCMhs$?^({dRCT=@CAl(V I)x3_r0L|lWApigX literal 0 HcmV?d00001 diff --git a/plugins/inputs/file/testdata/mtr-utf-16le.csv b/plugins/inputs/file/testdata/mtr-utf-16le.csv new file mode 100644 index 0000000000000000000000000000000000000000..d82ea30719131e0040068afcf433dc622640f9bb GIT binary patch literal 884 zcmb_a%Sr=55Ug{)!i%?MnVEg&sCZCJ5E3y*$svS*QOT~suQyh8M?nnavMf8(Thm)r z-93k|DSGVij3=zPci7+!ugEYbz9iD~?2+h}=RatV4`yb#Wp2j|kKVc8LF>fu4wsDQ zWK+h4;mc$G74}#YxhAVKEKFUQQobR+XFS4`>NDgBd|NccA}WkgpytyMkBl=s;GXLW zpLD&YixQ)T*Fm8o+14@8D?txjk@-r=!1kdk~9 z=IT_hy{C;ia;MQM{Es((Dj&+VW~mz`m#y6jO<%q7IiL5en-o~J&n)M?RCj$ECAl(V I)qNe|8#)$lApigX literal 0 HcmV?d00001 diff --git a/plugins/inputs/file/testdata/mtr-utf-8.csv b/plugins/inputs/file/testdata/mtr-utf-8.csv new file mode 100644 index 000000000..f5db3cc1b --- /dev/null +++ b/plugins/inputs/file/testdata/mtr-utf-8.csv @@ -0,0 +1,5 @@ +Mtr_Version,Start_Time,Status,Host,Hop,Ip,Loss%,Snt, ,Last,Avg,Best,Wrst,StDev, +MTR.0.87,1593667013,OK,example.org,1,12.122.114.5,0.00,10,0,21.86,21.55,19.34,26.83,2.05 +MTR.0.87,1593667013,OK,example.org,2,192.205.32.238,0.00,10,0,32.83,25.11,20.80,38.85,6.03 +MTR.0.87,1593667013,OK,example.org,3,152.195.85.133,0.00,10,0,19.75,20.18,19.75,20.78,0.00 +MTR.0.87,1593667013,OK,example.org,4,93.184.216.34,0.00,10,0,19.75,24.02,19.75,32.41,4.67 diff --git a/plugins/inputs/tail/README.md b/plugins/inputs/tail/README.md index e9f9cc8cb..1be8a5e93 100644 --- a/plugins/inputs/tail/README.md +++ b/plugins/inputs/tail/README.md @@ -48,6 +48,15 @@ The plugin expects messages in one of the ## line and the size of the output's metric_batch_size. # max_undelivered_lines = 1000 + ## Character encoding to use when interpreting the file contents. Invalid + ## characters are replaced using the unicode replacement character. When set + ## to the empty string the data is not decoded to text. + ## ex: character_encoding = "utf-8" + ## character_encoding = "utf-16le" + ## character_encoding = "utf-16be" + ## character_encoding = "" + # character_encoding = "" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 02d35c95b..70dc09e98 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -5,12 +5,15 @@ package tail import ( "context" "errors" + "io" "strings" "sync" + "github.com/dimchansky/utfbom" "github.com/influxdata/tail" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/common/encoding" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" @@ -35,6 +38,7 @@ type Tail struct { Pipe bool `toml:"pipe"` WatchMethod string `toml:"watch_method"` MaxUndeliveredLines int `toml:"max_undelivered_lines"` + CharacterEncoding string `toml:"character_encoding"` Log telegraf.Logger `toml:"-"` tailers map[string]*tail.Tail @@ -45,6 +49,7 @@ type Tail struct { cancel context.CancelFunc acc telegraf.TrackingAccumulator sem semaphore + decoder *encoding.Decoder } func NewTail() *Tail { @@ -88,6 +93,15 @@ const sampleConfig = ` ## line and the size of the output's metric_batch_size. # max_undelivered_lines = 1000 + ## Character encoding to use when interpreting the file contents. Invalid + ## characters are replaced using the unicode replacement character. When set + ## to the empty string the data is not decoded to text. + ## ex: character_encoding = "utf-8" + ## character_encoding = "utf-16le" + ## character_encoding = "utf-16be" + ## character_encoding = "" + # character_encoding = "" + ## Data format to consume. ## Each data format has its own unique set of configuration options, read ## more about them here: @@ -108,7 +122,10 @@ func (t *Tail) Init() error { return errors.New("max_undelivered_lines must be positive") } t.sem = make(semaphore, t.MaxUndeliveredLines) - return nil + + var err error + t.decoder, err = encoding.NewDecoder(t.CharacterEncoding) + return err } func (t *Tail) Gather(acc telegraf.Accumulator) error { @@ -190,6 +207,10 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { Poll: poll, Pipe: t.Pipe, Logger: tail.DiscardingLogger, + OpenReaderFunc: func(rd io.Reader) io.Reader { + r, _ := utfbom.Skip(t.decoder.Reader(rd)) + return r + }, }) if err != nil { t.Log.Debugf("Failed to open file (%s): %v", file, err) @@ -201,6 +222,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { parser, err := t.parserFunc() if err != nil { t.Log.Errorf("Creating parser: %s", err.Error()) + continue } // create a goroutine for each "tailer" diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index e0b351f45..fb5bc17c1 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -5,108 +5,32 @@ import ( "io/ioutil" "log" "os" - "runtime" "testing" "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" + "github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestTailFromBeginning(t *testing.T) { - if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" { - t.Skip("Skipping CI testing due to race conditions") - } - - tmpfile, err := ioutil.TempFile("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) - defer tmpfile.Close() - _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n") - require.NoError(t, err) - - tt := NewTail() - tt.Log = testutil.Logger{} - tt.FromBeginning = true - tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(parsers.NewInfluxParser) - - err = tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} - require.NoError(t, tt.Start(&acc)) - defer tt.Stop() - require.NoError(t, acc.GatherError(tt.Gather)) - - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu", - map[string]interface{}{ - "usage_idle": float64(100), - }, - map[string]string{ - "mytag": "foo", - "path": tmpfile.Name(), - }) -} - -func TestTailFromEnd(t *testing.T) { - if os.Getenv("CIRCLE_PROJECT_REPONAME") != "" { - t.Skip("Skipping CI testing due to race conditions") - } - - tmpfile, err := ioutil.TempFile("", "") - require.NoError(t, err) - defer os.Remove(tmpfile.Name()) - defer tmpfile.Close() - _, err = tmpfile.WriteString("cpu,mytag=foo usage_idle=100\n") - require.NoError(t, err) - - tt := NewTail() - tt.Log = testutil.Logger{} - tt.Files = []string{tmpfile.Name()} - tt.SetParserFunc(parsers.NewInfluxParser) - - err = tt.Init() - require.NoError(t, err) - - acc := testutil.Accumulator{} - require.NoError(t, tt.Start(&acc)) - defer tt.Stop() - for _, tailer := range tt.tailers { - for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() { - // wait for tailer to jump to end - runtime.Gosched() - } - } - - _, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n") - require.NoError(t, err) - require.NoError(t, acc.GatherError(tt.Gather)) - - acc.Wait(1) - acc.AssertContainsTaggedFields(t, "cpu", - map[string]interface{}{ - "usage_idle": float64(100), - }, - map[string]string{ - "othertag": "foo", - "path": tmpfile.Name(), - }) - assert.Len(t, acc.Metrics, 1) -} - func TestTailBadLine(t *testing.T) { tmpfile, err := ioutil.TempFile("", "") require.NoError(t, err) defer os.Remove(tmpfile.Name()) defer tmpfile.Close() + _, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n") + require.NoError(t, err) + + // Write good metric so we can detect when processing is complete + _, err = tmpfile.WriteString("cpu usage_idle=100\n") + require.NoError(t, err) + tt := NewTail() tt.Log = testutil.Logger{} tt.FromBeginning = true @@ -124,10 +48,8 @@ func TestTailBadLine(t *testing.T) { require.NoError(t, acc.GatherError(tt.Gather)) - _, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n") - require.NoError(t, err) + acc.Wait(1) - time.Sleep(500 * time.Millisecond) tt.Stop() assert.Contains(t, buf.String(), "Malformed log line") } @@ -186,11 +108,11 @@ cpu,42 plugin.FromBeginning = true plugin.Files = []string{tmpfile.Name()} plugin.SetParserFunc(func() (parsers.Parser, error) { - return &csv.Parser{ + return csv.NewParser(&csv.Config{ MeasurementColumn: "measurement", HeaderRowCount: 1, TimeFunc: func() time.Time { return time.Unix(0, 0) }, - }, nil + }) }) err = plugin.Init() @@ -284,3 +206,146 @@ func TestMultipleMetricsOnFirstLine(t *testing.T) { testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) } + +func TestCharacterEncoding(t *testing.T) { + full := []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "usage_active": 11.9, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{ + "cpu": "cpu1", + }, + map[string]interface{}{ + "usage_active": 26.0, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{ + "cpu": "cpu2", + }, + map[string]interface{}{ + "usage_active": 14.0, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{ + "cpu": "cpu3", + }, + map[string]interface{}{ + "usage_active": 20.4, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{ + "cpu": "cpu-total", + }, + map[string]interface{}{ + "usage_active": 18.4, + }, + time.Unix(0, 0), + ), + } + + tests := []struct { + name string + plugin *Tail + offset int64 + expected []telegraf.Metric + }{ + { + name: "utf-8", + plugin: &Tail{ + Files: []string{"testdata/cpu-utf-8.influx"}, + FromBeginning: true, + MaxUndeliveredLines: 1000, + Log: testutil.Logger{}, + CharacterEncoding: "utf-8", + }, + expected: full, + }, + { + name: "utf-8 seek", + plugin: &Tail{ + Files: []string{"testdata/cpu-utf-8.influx"}, + MaxUndeliveredLines: 1000, + Log: testutil.Logger{}, + CharacterEncoding: "utf-8", + }, + offset: 0x33, + expected: full[1:], + }, + { + name: "utf-16le", + plugin: &Tail{ + Files: []string{"testdata/cpu-utf-16le.influx"}, + FromBeginning: true, + MaxUndeliveredLines: 1000, + Log: testutil.Logger{}, + CharacterEncoding: "utf-16le", + }, + expected: full, + }, + { + name: "utf-16le seek", + plugin: &Tail{ + Files: []string{"testdata/cpu-utf-16le.influx"}, + MaxUndeliveredLines: 1000, + Log: testutil.Logger{}, + CharacterEncoding: "utf-16le", + }, + offset: 0x68, + expected: full[1:], + }, + { + name: "utf-16be", + plugin: &Tail{ + Files: []string{"testdata/cpu-utf-16be.influx"}, + FromBeginning: true, + MaxUndeliveredLines: 1000, + Log: testutil.Logger{}, + CharacterEncoding: "utf-16be", + }, + expected: full, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.plugin.SetParserFunc(func() (parsers.Parser, error) { + handler := influx.NewMetricHandler() + return influx.NewParser(handler), nil + }) + + if tt.offset != 0 { + tt.plugin.offsets = map[string]int64{ + tt.plugin.Files[0]: tt.offset, + } + } + + err := tt.plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + err = tt.plugin.Start(&acc) + require.NoError(t, err) + acc.Wait(len(tt.expected)) + tt.plugin.Stop() + + actual := acc.GetTelegrafMetrics() + for _, m := range actual { + m.RemoveTag("path") + } + + testutil.RequireMetricsEqual(t, tt.expected, actual, testutil.IgnoreTime()) + }) + } +} diff --git a/plugins/inputs/tail/testdata/cpu-utf-16be.influx b/plugins/inputs/tail/testdata/cpu-utf-16be.influx new file mode 100644 index 0000000000000000000000000000000000000000..2ac4bb73af45294457f941306ef120e3dd376186 GIT binary patch literal 522 zcmb`EK?;LF3`O6%r|1FfFd1vD3(q1Vr4$4UtsdX@3+}{$G9@IRguweD^M{TnBiX9; zs-B~!FIw*0Xrvu)9!%Twd literal 0 HcmV?d00001 diff --git a/plugins/inputs/tail/testdata/cpu-utf-16le.influx b/plugins/inputs/tail/testdata/cpu-utf-16le.influx new file mode 100644 index 0000000000000000000000000000000000000000..0f78471507dec32a11f7b98cd1e9a5c39da35327 GIT binary patch literal 522 zcmb`E(F%f442Hk!o}vdZIXhO$O4{(^!Kf@?vXoLiMlL<$xGH88%H75e%hWZ+q7=_XlzEeV6d$FS_WlPY Zi&DJqNJZLv%Sm5o 1 { + return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", c.Delimiter) + } + } + + if c.Comment != "" { + runeStr := []rune(c.Comment) + if len(runeStr) > 1 { + return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", c.Comment) + } + } + + if len(c.ColumnNames) > 0 && len(c.ColumnTypes) > 0 && len(c.ColumnNames) != len(c.ColumnTypes) { + return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types") + } + + if c.TimeFunc == nil { + c.TimeFunc = time.Now + } + + return &Parser{Config: c}, nil } func (p *Parser) SetTimeFunc(fn TimeFunc) { p.TimeFunc = fn } -func (p *Parser) compile(r *bytes.Reader) (*csv.Reader, error) { +func (p *Parser) compile(r io.Reader) (*csv.Reader, error) { csvReader := csv.NewReader(r) // ensures that the reader reads records of different lengths without an error csvReader.FieldsPerRecord = -1 @@ -60,7 +97,10 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { } // skip first rows for i := 0; i < p.SkipRows; i++ { - csvReader.Read() + _, err := csvReader.Read() + if err != nil { + return nil, err + } } // if there is a header and nothing in DataColumns // set DataColumns to names extracted from the header @@ -88,7 +128,10 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { } else { // if columns are named, just skip header rows for i := 0; i < p.HeaderRowCount; i++ { - csvReader.Read() + _, err := csvReader.Read() + if err != nil { + return nil, err + } } } @@ -208,8 +251,10 @@ outer: // will default to plugin name measurementName := p.MetricName - if recordFields[p.MeasurementColumn] != nil && recordFields[p.MeasurementColumn] != "" { - measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) + if p.MeasurementColumn != "" { + if recordFields[p.MeasurementColumn] != nil && recordFields[p.MeasurementColumn] != "" { + measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) + } } metricTime, err := parseTimestamp(p.TimeFunc, recordFields, p.TimestampColumn, p.TimestampFormat, p.Timezone) diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index 28c8ef451..c0f489365 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -16,22 +16,28 @@ var DefaultTime = func() time.Time { } func TestBasicCSV(t *testing.T) { - p := Parser{ - ColumnNames: []string{"first", "second", "third"}, - TagColumns: []string{"third"}, - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + ColumnNames: []string{"first", "second", "third"}, + TagColumns: []string{"third"}, + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) - _, err := p.ParseLine("1.4,true,hi") + _, err = p.ParseLine("1.4,true,hi") require.NoError(t, err) } func TestHeaderConcatenationCSV(t *testing.T) { - p := Parser{ - HeaderRowCount: 2, - MeasurementColumn: "3", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 2, + MeasurementColumn: "3", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `first,second 1,2,3 3.4,70,test_name` @@ -42,12 +48,15 @@ func TestHeaderConcatenationCSV(t *testing.T) { } func TestHeaderOverride(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `line1,line2,line3 3.4,70,test_name` metrics, err := p.Parse([]byte(testCSV)) @@ -56,14 +65,16 @@ func TestHeaderOverride(t *testing.T) { } func TestTimestamp(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "02/01/06 03:04:05 PM", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "02/01/06 03:04:05 PM", + TimeFunc: DefaultTime, + }, + ) testCSV := `line1,line2,line3 23/05/09 04:05:06 PM,70,test_name 07/11/09 04:05:06 PM,80,test_name2` @@ -75,29 +86,35 @@ func TestTimestamp(t *testing.T) { } func TestTimestampError(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `line1,line2,line3 23/05/09 04:05:06 PM,70,test_name 07/11/09 04:05:06 PM,80,test_name2` - _, err := p.Parse([]byte(testCSV)) + _, err = p.Parse([]byte(testCSV)) require.Equal(t, fmt.Errorf("timestamp format must be specified"), err) } func TestTimestampUnixFormat(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "unix", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `line1,line2,line3 1243094706,70,test_name 1257609906,80,test_name2` @@ -108,14 +125,17 @@ func TestTimestampUnixFormat(t *testing.T) { } func TestTimestampUnixMSFormat(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "unix_ms", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "unix_ms", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `line1,line2,line3 1243094706123,70,test_name 1257609906123,80,test_name2` @@ -126,12 +146,15 @@ func TestTimestampUnixMSFormat(t *testing.T) { } func TestQuotedCharacter(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `line1,line2,line3 "3,4",70,test_name` @@ -141,13 +164,16 @@ func TestQuotedCharacter(t *testing.T) { } func TestDelimiter(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - Delimiter: "%", - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + Delimiter: "%", + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `line1%line2%line3 3,4%70%test_name` @@ -157,13 +183,16 @@ func TestDelimiter(t *testing.T) { } func TestValueConversion(t *testing.T) { - p := Parser{ - HeaderRowCount: 0, - Delimiter: ",", - ColumnNames: []string{"first", "second", "third", "fourth"}, - MetricName: "test_value", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 0, + Delimiter: ",", + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `3.3,4,true,hello` expectedTags := make(map[string]string) @@ -199,13 +228,16 @@ func TestValueConversion(t *testing.T) { } func TestSkipComment(t *testing.T) { - p := Parser{ - HeaderRowCount: 0, - Comment: "#", - ColumnNames: []string{"first", "second", "third", "fourth"}, - MetricName: "test_value", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 0, + Comment: "#", + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `#3.3,4,true,hello 4,9.9,true,name_this` @@ -222,13 +254,16 @@ func TestSkipComment(t *testing.T) { } func TestTrimSpace(t *testing.T) { - p := Parser{ - HeaderRowCount: 0, - TrimSpace: true, - ColumnNames: []string{"first", "second", "third", "fourth"}, - MetricName: "test_value", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 0, + TrimSpace: true, + ColumnNames: []string{"first", "second", "third", "fourth"}, + MetricName: "test_value", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := ` 3.3, 4, true,hello` expectedFields := map[string]interface{}{ @@ -244,12 +279,15 @@ func TestTrimSpace(t *testing.T) { } func TestTrimSpaceDelimitedBySpace(t *testing.T) { - p := Parser{ - Delimiter: " ", - HeaderRowCount: 1, - TrimSpace: true, - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + Delimiter: " ", + HeaderRowCount: 1, + TrimSpace: true, + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := ` first second third fourth abcdefgh 0 2 false abcdef 3.3 4 true @@ -268,13 +306,15 @@ abcdefgh 0 2 false } func TestSkipRows(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - SkipRows: 1, - TagColumns: []string{"line1"}, - MeasurementColumn: "line3", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + SkipRows: 1, + TagColumns: []string{"line1"}, + MeasurementColumn: "line3", + TimeFunc: DefaultTime, + }, + ) testCSV := `garbage nonsense line1,line2,line3 hello,80,test_name2` @@ -289,11 +329,14 @@ hello,80,test_name2` } func TestSkipColumns(t *testing.T) { - p := Parser{ - SkipColumns: 1, - ColumnNames: []string{"line1", "line2"}, - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + SkipColumns: 1, + ColumnNames: []string{"line1", "line2"}, + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `hello,80,test_name` expectedFields := map[string]interface{}{ @@ -306,11 +349,14 @@ func TestSkipColumns(t *testing.T) { } func TestSkipColumnsWithHeader(t *testing.T) { - p := Parser{ - SkipColumns: 1, - HeaderRowCount: 2, - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + SkipColumns: 1, + HeaderRowCount: 2, + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) testCSV := `col,col,col 1,2,3 trash,80,test_name` @@ -322,11 +368,14 @@ func TestSkipColumnsWithHeader(t *testing.T) { } func TestParseStream(t *testing.T) { - p := Parser{ - MetricName: "csv", - HeaderRowCount: 1, - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + MetricName: "csv", + HeaderRowCount: 1, + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) csvHeader := "a,b,c" csvBody := "1,2,3" @@ -349,13 +398,16 @@ func TestParseStream(t *testing.T) { } func TestTimestampUnixFloatPrecision(t *testing.T) { - p := Parser{ - MetricName: "csv", - ColumnNames: []string{"time", "value"}, - TimestampColumn: "time", - TimestampFormat: "unix", - TimeFunc: DefaultTime, - } + p, err := NewParser( + &Config{ + MetricName: "csv", + ColumnNames: []string{"time", "value"}, + TimestampColumn: "time", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + }, + ) + require.NoError(t, err) data := `1551129661.95456123352050781250,42` expected := []telegraf.Metric{ @@ -375,14 +427,17 @@ func TestTimestampUnixFloatPrecision(t *testing.T) { } func TestSkipMeasurementColumn(t *testing.T) { - p := Parser{ - MetricName: "csv", - HeaderRowCount: 1, - TimestampColumn: "timestamp", - TimestampFormat: "unix", - TimeFunc: DefaultTime, - TrimSpace: true, - } + p, err := NewParser( + &Config{ + MetricName: "csv", + HeaderRowCount: 1, + TimestampColumn: "timestamp", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + TrimSpace: true, + }, + ) + require.NoError(t, err) data := `id,value,timestamp 1,5,1551129661.954561233` @@ -404,14 +459,17 @@ func TestSkipMeasurementColumn(t *testing.T) { } func TestSkipTimestampColumn(t *testing.T) { - p := Parser{ - MetricName: "csv", - HeaderRowCount: 1, - TimestampColumn: "timestamp", - TimestampFormat: "unix", - TimeFunc: DefaultTime, - TrimSpace: true, - } + p, err := NewParser( + &Config{ + MetricName: "csv", + HeaderRowCount: 1, + TimestampColumn: "timestamp", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + TrimSpace: true, + }, + ) + require.NoError(t, err) data := `id,value,timestamp 1,5,1551129661.954561233` @@ -433,15 +491,18 @@ func TestSkipTimestampColumn(t *testing.T) { } func TestTimestampTimezone(t *testing.T) { - p := Parser{ - HeaderRowCount: 1, - ColumnNames: []string{"first", "second", "third"}, - MeasurementColumn: "third", - TimestampColumn: "first", - TimestampFormat: "02/01/06 03:04:05 PM", - TimeFunc: DefaultTime, - Timezone: "Asia/Jakarta", - } + p, err := NewParser( + &Config{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "02/01/06 03:04:05 PM", + TimeFunc: DefaultTime, + Timezone: "Asia/Jakarta", + }, + ) + require.NoError(t, err) testCSV := `line1,line2,line3 23/05/09 11:05:06 PM,70,test_name 07/11/09 11:05:06 PM,80,test_name2` @@ -451,3 +512,84 @@ func TestTimestampTimezone(t *testing.T) { require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000)) require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000)) } + +func TestEmptyMeasurementName(t *testing.T) { + p, err := NewParser( + &Config{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"", "b"}, + MeasurementColumn: "", + }, + ) + require.NoError(t, err) + testCSV := `,b +1,2` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric("csv", + map[string]string{}, + map[string]interface{}{ + "b": 2, + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime()) +} + +func TestNumericMeasurementName(t *testing.T) { + p, err := NewParser( + &Config{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"a", "b"}, + MeasurementColumn: "a", + }, + ) + require.NoError(t, err) + testCSV := `a,b +1,2` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric("1", + map[string]string{}, + map[string]interface{}{ + "b": 2, + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime()) +} + +func TestStaticMeasurementName(t *testing.T) { + p, err := NewParser( + &Config{ + MetricName: "csv", + HeaderRowCount: 1, + ColumnNames: []string{"a", "b"}, + }, + ) + require.NoError(t, err) + testCSV := `a,b +1,2` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric("csv", + map[string]string{}, + map[string]interface{}{ + "a": 1, + "b": 2, + }, + time.Unix(0, 0), + ), + } + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index ac8c74381..729ed048c 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -2,7 +2,6 @@ package parsers import ( "fmt" - "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/parsers/collectd" @@ -206,21 +205,25 @@ func NewParser(config *Config) (Parser, error) { config.GrokTimezone, config.GrokUniqueTimestamp) case "csv": - parser, err = newCSVParser(config.MetricName, - config.CSVHeaderRowCount, - config.CSVSkipRows, - config.CSVSkipColumns, - config.CSVDelimiter, - config.CSVComment, - config.CSVTrimSpace, - config.CSVColumnNames, - config.CSVColumnTypes, - config.CSVTagColumns, - config.CSVMeasurementColumn, - config.CSVTimestampColumn, - config.CSVTimestampFormat, - config.CSVTimezone, - config.DefaultTags) + config := &csv.Config{ + MetricName: config.MetricName, + HeaderRowCount: config.CSVHeaderRowCount, + SkipRows: config.CSVSkipRows, + SkipColumns: config.CSVSkipColumns, + Delimiter: config.CSVDelimiter, + Comment: config.CSVComment, + TrimSpace: config.CSVTrimSpace, + ColumnNames: config.CSVColumnNames, + ColumnTypes: config.CSVColumnTypes, + TagColumns: config.CSVTagColumns, + MeasurementColumn: config.CSVMeasurementColumn, + TimestampColumn: config.CSVTimestampColumn, + TimestampFormat: config.CSVTimestampFormat, + Timezone: config.CSVTimezone, + DefaultTags: config.DefaultTags, + } + + return csv.NewParser(config) case "logfmt": parser, err = NewLogFmtParser(config.MetricName, config.DefaultTags) case "form_urlencoded": @@ -235,66 +238,6 @@ func NewParser(config *Config) (Parser, error) { return parser, err } -func newCSVParser(metricName string, - headerRowCount int, - skipRows int, - skipColumns int, - delimiter string, - comment string, - trimSpace bool, - columnNames []string, - columnTypes []string, - tagColumns []string, - nameColumn string, - timestampColumn string, - timestampFormat string, - timezone string, - defaultTags map[string]string) (Parser, error) { - - if headerRowCount == 0 && len(columnNames) == 0 { - return nil, fmt.Errorf("`csv_header_row_count` must be defined if `csv_column_names` is not specified") - } - - if delimiter != "" { - runeStr := []rune(delimiter) - if len(runeStr) > 1 { - return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", delimiter) - } - } - - if comment != "" { - runeStr := []rune(comment) - if len(runeStr) > 1 { - return nil, fmt.Errorf("csv_delimiter must be a single character, got: %s", comment) - } - } - - if len(columnNames) > 0 && len(columnTypes) > 0 && len(columnNames) != len(columnTypes) { - return nil, fmt.Errorf("csv_column_names field count doesn't match with csv_column_types") - } - - parser := &csv.Parser{ - MetricName: metricName, - HeaderRowCount: headerRowCount, - SkipRows: skipRows, - SkipColumns: skipColumns, - Delimiter: delimiter, - Comment: comment, - TrimSpace: trimSpace, - ColumnNames: columnNames, - ColumnTypes: columnTypes, - TagColumns: tagColumns, - MeasurementColumn: nameColumn, - TimestampColumn: timestampColumn, - TimestampFormat: timestampFormat, - Timezone: timezone, - DefaultTags: defaultTags, - TimeFunc: time.Now, - } - - return parser, nil -} - func newGrokParser(metricName string, patterns []string, nPatterns []string, cPatterns string, cPatternFiles []string,