feat: migrate wavefront parser to new style (#11374)

This commit is contained in:
Sebastian Spaink 2022-06-27 14:02:50 -05:00 committed by GitHub
parent 9177b274e4
commit 6b009f3072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 32 deletions

View File

@ -5,5 +5,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/parsers/csv" _ "github.com/influxdata/telegraf/plugins/parsers/csv"
_ "github.com/influxdata/telegraf/plugins/parsers/json" _ "github.com/influxdata/telegraf/plugins/parsers/json"
_ "github.com/influxdata/telegraf/plugins/parsers/json_v2" _ "github.com/influxdata/telegraf/plugins/parsers/json_v2"
_ "github.com/influxdata/telegraf/plugins/parsers/wavefront"
_ "github.com/influxdata/telegraf/plugins/parsers/xpath" _ "github.com/influxdata/telegraf/plugins/parsers/xpath"
) )

View File

@ -18,7 +18,6 @@ import (
"github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2" "github.com/influxdata/telegraf/plugins/parsers/temporary/json_v2"
"github.com/influxdata/telegraf/plugins/parsers/temporary/xpath" "github.com/influxdata/telegraf/plugins/parsers/temporary/xpath"
"github.com/influxdata/telegraf/plugins/parsers/value" "github.com/influxdata/telegraf/plugins/parsers/value"
"github.com/influxdata/telegraf/plugins/parsers/wavefront"
) )
// Creator is the function to create a new parser // Creator is the function to create a new parser
@ -232,8 +231,6 @@ func NewParser(config *Config) (Parser, error) {
config.DefaultTags, config.DefaultTags,
config.Separator, config.Separator,
config.Templates) config.Templates)
case "wavefront":
parser, err = NewWavefrontParser(config.DefaultTags)
case "grok": case "grok":
parser, err = newGrokParser( parser, err = newGrokParser(
config.MetricName, config.MetricName,
@ -365,10 +362,6 @@ func NewLogFmtParser(metricName string, defaultTags map[string]string, tagKeys [
return parser, err return parser, err
} }
func NewWavefrontParser(defaultTags map[string]string) (Parser, error) {
return wavefront.NewWavefrontParser(defaultTags), nil
}
func NewFormUrlencodedParser( func NewFormUrlencodedParser(
metricName string, metricName string,
defaultTags map[string]string, defaultTags map[string]string,

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
) )
const MaxBufferSize = 2 const MaxBufferSize = 2
@ -22,10 +23,10 @@ type Point struct {
Tags map[string]string Tags map[string]string
} }
type WavefrontParser struct { type Parser struct {
parsers *sync.Pool parsers *sync.Pool
defaultTags map[string]string DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
} }
// PointParser is a thread-unsafe parser and must be kept in a pool. // PointParser is a thread-unsafe parser and must be kept in a pool.
@ -39,7 +40,7 @@ type PointParser struct {
scanBuf bytes.Buffer // buffer reused for scanning tokens scanBuf bytes.Buffer // buffer reused for scanning tokens
writeBuf bytes.Buffer // buffer reused for parsing elements writeBuf bytes.Buffer // buffer reused for parsing elements
Elements []ElementParser Elements []ElementParser
parent *WavefrontParser parent *Parser
} }
// NewWavefrontElements returns a slice of ElementParser's for the Graphite format // NewWavefrontElements returns a slice of ElementParser's for the Graphite format
@ -53,22 +54,17 @@ func NewWavefrontElements() []ElementParser {
return elements return elements
} }
func NewWavefrontParser(defaultTags map[string]string) *WavefrontParser { func (p *Parser) Init() error {
wp := &WavefrontParser{defaultTags: defaultTags} p.parsers = &sync.Pool{
wp.parsers = &sync.Pool{
New: func() interface{} { New: func() interface{} {
return NewPointParser(wp) elements := NewWavefrontElements()
return &PointParser{Elements: elements, parent: p}
}, },
} }
return wp return nil
} }
func NewPointParser(parent *WavefrontParser) *PointParser { func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
elements := NewWavefrontElements()
return &PointParser{Elements: elements, parent: parent}
}
func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) {
buf := []byte(line) buf := []byte(line)
metrics, err := p.Parse(buf) metrics, err := p.Parse(buf)
@ -83,7 +79,7 @@ func (p *WavefrontParser) ParseLine(line string) (telegraf.Metric, error) {
return nil, nil return nil, nil
} }
func (p *WavefrontParser) Parse(buf []byte) ([]telegraf.Metric, error) { func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
pp := p.parsers.Get().(*PointParser) pp := p.parsers.Get().(*PointParser)
defer p.parsers.Put(pp) defer p.parsers.Put(pp)
return pp.Parse(buf) return pp.Parse(buf)
@ -127,8 +123,8 @@ func (p *PointParser) Parse(buf []byte) ([]telegraf.Metric, error) {
return metrics, nil return metrics, nil
} }
func (p *WavefrontParser) SetDefaultTags(tags map[string]string) { func (p *Parser) SetDefaultTags(tags map[string]string) {
p.defaultTags = tags p.DefaultTags = tags
} }
func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.Metric, error) { func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.Metric, error) {
@ -140,7 +136,7 @@ func (p *PointParser) convertPointToTelegrafMetric(points []Point) ([]telegraf.M
tags[k] = v tags[k] = v
} }
// apply default tags after parsed tags // apply default tags after parsed tags
for k, v := range p.parent.defaultTags { for k, v := range p.parent.DefaultTags {
tags[k] = v tags[k] = v
} }
@ -218,3 +214,14 @@ func (p *PointParser) reset(buf []byte) {
} }
p.buf.n = 0 p.buf.n = 0
} }
func (p *Parser) InitFromConfig(_ *parsers.Config) error {
return p.Init()
}
func init() {
parsers.Add("wavefront",
func(_ string) telegraf.Parser {
return &Parser{}
})
}

View File

@ -11,7 +11,8 @@ import (
) )
func TestParse(t *testing.T) { func TestParse(t *testing.T) {
parser := NewWavefrontParser(nil) parser := &Parser{}
require.NoError(t, parser.Init())
parsedMetrics, err := parser.Parse([]byte("test.metric 1")) parsedMetrics, err := parser.Parse([]byte("test.metric 1"))
require.NoError(t, err) require.NoError(t, err)
@ -78,7 +79,8 @@ func TestParse(t *testing.T) {
} }
func TestParseLine(t *testing.T) { func TestParseLine(t *testing.T) {
parser := NewWavefrontParser(nil) parser := &Parser{}
require.NoError(t, parser.Init())
parsedMetric, err := parser.ParseLine("test.metric 1") parsedMetric, err := parser.ParseLine("test.metric 1")
require.NoError(t, err) require.NoError(t, err)
@ -113,7 +115,8 @@ func TestParseLine(t *testing.T) {
} }
func TestParseMultiple(t *testing.T) { func TestParseMultiple(t *testing.T) {
parser := NewWavefrontParser(nil) parser := &Parser{}
require.NoError(t, parser.Init())
parsedMetrics, err := parser.Parse([]byte("test.metric 1\ntest.metric2 2 1530939936")) parsedMetrics, err := parser.Parse([]byte("test.metric 1\ntest.metric2 2 1530939936"))
require.NoError(t, err) require.NoError(t, err)
@ -148,7 +151,8 @@ func TestParseMultiple(t *testing.T) {
} }
func TestParseSpecial(t *testing.T) { func TestParseSpecial(t *testing.T) {
parser := NewWavefrontParser(nil) parser := &Parser{}
require.NoError(t, parser.Init())
parsedMetric, err := parser.ParseLine("\"test.metric\" 1 1530939936") parsedMetric, err := parser.ParseLine("\"test.metric\" 1 1530939936")
require.NoError(t, err) require.NoError(t, err)
@ -162,7 +166,8 @@ func TestParseSpecial(t *testing.T) {
} }
func TestParseInvalid(t *testing.T) { func TestParseInvalid(t *testing.T) {
parser := NewWavefrontParser(nil) parser := &Parser{}
require.NoError(t, parser.Init())
_, err := parser.Parse([]byte("test.metric")) _, err := parser.Parse([]byte("test.metric"))
require.Error(t, err) require.Error(t, err)
@ -193,7 +198,9 @@ func TestParseInvalid(t *testing.T) {
} }
func TestParseDefaultTags(t *testing.T) { func TestParseDefaultTags(t *testing.T) {
parser := NewWavefrontParser(map[string]string{"myDefault": "value1", "another": "test2"}) parser := &Parser{}
require.NoError(t, parser.Init())
parser.SetDefaultTags(map[string]string{"myDefault": "value1", "another": "test2"})
parsedMetrics, err := parser.Parse([]byte("test.metric 1 1530939936")) parsedMetrics, err := parser.Parse([]byte("test.metric 1 1530939936"))
require.NoError(t, err) require.NoError(t, err)