diff --git a/plugins/parsers/all/all.go b/plugins/parsers/all/all.go index 32821d732..24f433128 100644 --- a/plugins/parsers/all/all.go +++ b/plugins/parsers/all/all.go @@ -5,5 +5,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/parsers/csv" _ "github.com/influxdata/telegraf/plugins/parsers/json" _ "github.com/influxdata/telegraf/plugins/parsers/json_v2" + _ "github.com/influxdata/telegraf/plugins/parsers/wavefront" _ "github.com/influxdata/telegraf/plugins/parsers/xpath" ) diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index b97a045c6..79276282a 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -18,7 +18,6 @@ import ( "github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2" "github.com/influxdata/telegraf/plugins/parsers/temporary/xpath" "github.com/influxdata/telegraf/plugins/parsers/value" - "github.com/influxdata/telegraf/plugins/parsers/wavefront" ) // Creator is the function to create a new parser @@ -232,8 +231,6 @@ func NewParser(config *Config) (Parser, error) { config.DefaultTags, config.Separator, config.Templates) - case "wavefront": - parser, err = NewWavefrontParser(config.DefaultTags) case "grok": parser, err = newGrokParser( config.MetricName, @@ -365,10 +362,6 @@ func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys [ return parser, err } -func NewWavefrontParser(defaultTags map[string]string) (Parser, error) { - return wavefront.NewWavefrontParser(defaultTags), nil -} - func NewFormUrlencodedParser( metricName string, defaultTags map[string]string, diff --git a/plugins/parsers/wavefront/parser.go b/plugins/parsers/wavefront/parser.go index 6ef509cad..7bae2725f 100644 --- a/plugins/parsers/wavefront/parser.go +++ b/plugins/parsers/wavefront/parser.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers" ) const MaxBufferSize = 2 @@ -22,10 +23,10 @@ type Point struct { Tags map[string]string } -type WavefrontParser struct { +type Parser struct { parsers *sync.Pool - defaultTags map[string]string - Log telegraf.Logger `toml:"-"` + DefaultTags map[string]string `toml:"-"` + Log telegraf.Logger `toml:"-"` } // PointParser is a thread-unsafe parser and must be kept in a pool. @@ -39,7 +40,7 @@ type PointParser struct { scanBuf bytes.Buffer // buffer reused for scanning tokens writeBuf bytes.Buffer // buffer reused for parsing elements Elements []ElementParser - parent *WavefrontParser + parent *Parser } // NewWavefrontElements returns a slice of ElementParser's for the Graphite format @@ -53,22 +54,17 @@ func NewWavefrontElements() []ElementParser { return elements } -func NewWavefrontParser(defaultTags map[string]string) *WavefrontParser { - wp := &WavefrontParser{defaultTags: defaultTags} - wp.parsers = &sync.Pool{ +func (p *Parser) Init() error { + p.parsers = &sync.Pool{ New: func() interface{} { - return NewPointParser(wp) + elements := NewWavefrontElements() + return &PointParser{Elements: elements, parent: p} }, } - return wp + return nil } -func NewPointParser(parent *WavefrontParser) *PointParser { - elements := NewWavefrontElements() - return &PointParser{Elements: elements, parent: parent} -} - -func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) { +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { buf := []byte(line) metrics, err := p.Parse(buf) @@ -83,7 +79,7 @@ func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) { return nil, nil } -func (p *WavefrontParser) Parse(buf []byte) ([]telegraf.Metric, error) { +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { pp := p.parsers.Get().(*PointParser) defer p.parsers.Put(pp) return pp.Parse(buf) @@ -127,8 +123,8 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) { return metrics, nil } -func (p *WavefrontParser) SetDefaultTags(tags map[string]string) { - p.defaultTags = tags +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags } func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.Metric, error) { @@ -140,7 +136,7 @@ func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.M tags[k] = v } // apply default tags after parsed tags - for k, v := range p.parent.defaultTags { + for k, v := range p.parent.DefaultTags { tags[k] = v } @@ -218,3 +214,14 @@ func (p *PointParser) reset(buf []byte) { } p.buf.n = 0 } + +func (p *Parser) InitFromConfig(_ *parsers.Config) error { + return p.Init() +} + +func init() { + parsers.Add("wavefront", + func(_ string) telegraf.Parser { + return &Parser{} + }) +} diff --git a/plugins/parsers/wavefront/parser_test.go b/plugins/parsers/wavefront/parser_test.go index 5b655b73d..aadf140d1 100644 --- a/plugins/parsers/wavefront/parser_test.go +++ b/plugins/parsers/wavefront/parser_test.go @@ -11,7 +11,8 @@ import ( ) func TestParse(t *testing.T) { - parser := NewWavefrontParser(nil) + parser := &Parser{} + require.NoError(t, parser.Init()) parsedMetrics, err := parser.Parse([]byte("test.metric 1")) require.NoError(t, err) @@ -78,7 +79,8 @@ func TestParse(t *testing.T) { } func TestParseLine(t *testing.T) { - parser := NewWavefrontParser(nil) + parser := &Parser{} + require.NoError(t, parser.Init()) parsedMetric, err := parser.ParseLine("test.metric 1") require.NoError(t, err) @@ -113,7 +115,8 @@ func TestParseLine(t *testing.T) { } func TestParseMultiple(t *testing.T) { - parser := NewWavefrontParser(nil) + parser := &Parser{} + require.NoError(t, parser.Init()) parsedMetrics, err := parser.Parse([]byte("test.metric 1\ntest.metric2 2 1530939936")) require.NoError(t, err) @@ -148,7 +151,8 @@ func TestParseMultiple(t *testing.T) { } func TestParseSpecial(t *testing.T) { - parser := NewWavefrontParser(nil) + parser := &Parser{} + require.NoError(t, parser.Init()) parsedMetric, err := parser.ParseLine("\"test.metric\" 1 1530939936") require.NoError(t, err) @@ -162,7 +166,8 @@ func TestParseSpecial(t *testing.T) { } func TestParseInvalid(t *testing.T) { - parser := NewWavefrontParser(nil) + parser := &Parser{} + require.NoError(t, parser.Init()) _, err := parser.Parse([]byte("test.metric")) require.Error(t, err) @@ -193,7 +198,9 @@ func TestParseInvalid(t *testing.T) { } func TestParseDefaultTags(t *testing.T) { - parser := NewWavefrontParser(map[string]string{"myDefault": "value1", "another": "test2"}) + parser := &Parser{} + require.NoError(t, parser.Init()) + parser.SetDefaultTags(map[string]string{"myDefault": "value1", "another": "test2"}) parsedMetrics, err := parser.Parse([]byte("test.metric 1 1530939936")) require.NoError(t, err)