Fix segfault in processors/parser (#9283)
This commit is contained in:
parent
908ad2f6ce
commit
6cc942fa6e
|
|
@ -1,19 +1,19 @@
|
|||
package parser
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
)
|
||||
|
||||
type Parser struct {
|
||||
parsers.Config
|
||||
DropOriginal bool `toml:"drop_original"`
|
||||
Merge string `toml:"merge"`
|
||||
ParseFields []string `toml:"parse_fields"`
|
||||
Parser parsers.Parser
|
||||
DropOriginal bool `toml:"drop_original"`
|
||||
Merge string `toml:"merge"`
|
||||
ParseFields []string `toml:"parse_fields"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
parser parsers.Parser
|
||||
}
|
||||
|
||||
var SampleConfig = `
|
||||
|
|
@ -43,13 +43,14 @@ func (p *Parser) Description() string {
|
|||
}
|
||||
|
||||
func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
||||
if p.Parser == nil {
|
||||
if p.parser == nil {
|
||||
var err error
|
||||
p.Parser, err = parsers.NewParser(&p.Config)
|
||||
p.parser, err = parsers.NewParser(&p.Config)
|
||||
if err != nil {
|
||||
log.Printf("E! [processors.parser] could not create parser: %v", err)
|
||||
p.Log.Errorf("could not create parser: %v", err)
|
||||
return metrics
|
||||
}
|
||||
models.SetLoggerOnPlugin(p.parser, p.Log)
|
||||
}
|
||||
|
||||
results := []telegraf.Metric{}
|
||||
|
|
@ -67,7 +68,7 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
|||
case string:
|
||||
fromFieldMetric, err := p.parseField(value)
|
||||
if err != nil {
|
||||
log.Printf("E! [processors.parser] could not parse field %s: %v", key, err)
|
||||
p.Log.Errorf("could not parse field %s: %v", key, err)
|
||||
}
|
||||
|
||||
for _, m := range fromFieldMetric {
|
||||
|
|
@ -81,7 +82,7 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
|
|||
// prior to returning.
|
||||
newMetrics = append(newMetrics, fromFieldMetric...)
|
||||
default:
|
||||
log.Printf("E! [processors.parser] field '%s' not a string, skipping", key)
|
||||
p.Log.Errorf("field '%s' not a string, skipping", key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -114,7 +115,7 @@ func merge(base telegraf.Metric, metrics []telegraf.Metric) telegraf.Metric {
|
|||
}
|
||||
|
||||
func (p *Parser) parseField(value string) ([]telegraf.Metric, error) {
|
||||
return p.Parser.Parse([]byte(value))
|
||||
return p.parser.Parse([]byte(value))
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
|
|
|||
|
|
@ -7,17 +7,19 @@ import (
|
|||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
//compares metrics without comparing time
|
||||
func compareMetrics(t *testing.T, expected, actual []telegraf.Metric) {
|
||||
assert.Equal(t, len(expected), len(actual))
|
||||
for i, metric := range actual {
|
||||
require.Equal(t, expected[i].Name(), metric.Name())
|
||||
require.Equal(t, expected[i].Fields(), metric.Fields())
|
||||
require.Equal(t, expected[i].Tags(), metric.Tags())
|
||||
require.Equal(t, len(expected), len(actual))
|
||||
for i, m := range actual {
|
||||
require.Equal(t, expected[i].Name(), m.Name())
|
||||
require.Equal(t, expected[i].Fields(), m.Fields())
|
||||
require.Equal(t, expected[i].Tags(), m.Tags())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -503,6 +505,7 @@ func TestApply(t *testing.T) {
|
|||
ParseFields: tt.parseFields,
|
||||
DropOriginal: tt.dropOriginal,
|
||||
Merge: tt.merge,
|
||||
Log: testutil.Logger{Name: "processor.parser"},
|
||||
}
|
||||
|
||||
output := parser.Apply(tt.input)
|
||||
|
|
@ -573,6 +576,7 @@ func TestBadApply(t *testing.T) {
|
|||
parser := Parser{
|
||||
Config: tt.config,
|
||||
ParseFields: tt.parseFields,
|
||||
Log: testutil.Logger{Name: "processor.parser"},
|
||||
}
|
||||
|
||||
output := parser.Apply(tt.input)
|
||||
|
|
@ -584,17 +588,17 @@ func TestBadApply(t *testing.T) {
|
|||
|
||||
// Benchmarks
|
||||
|
||||
func getMetricFields(metric telegraf.Metric) interface{} {
|
||||
func getMetricFields(m telegraf.Metric) interface{} {
|
||||
key := "field3"
|
||||
if value, ok := metric.Fields()[key]; ok {
|
||||
if value, ok := m.Fields()[key]; ok {
|
||||
return value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMetricFieldList(metric telegraf.Metric) interface{} {
|
||||
func getMetricFieldList(m telegraf.Metric) interface{} {
|
||||
key := "field3"
|
||||
fields := metric.FieldList()
|
||||
fields := m.FieldList()
|
||||
for _, field := range fields {
|
||||
if field.Key == key {
|
||||
return field.Value
|
||||
|
|
|
|||
Loading…
Reference in New Issue