feat(processors.parser): Allow also non-string fields (#13553)

This commit is contained in:
Sven Rebhan 2023-07-10 13:55:55 +02:00 committed by GitHub
parent c050c010bc
commit c28df89c8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 122 additions and 25 deletions

View File

@ -2,10 +2,13 @@
package parser package parser
import ( import (
"bytes"
_ "embed" _ "embed"
gobin "encoding/binary"
"fmt" "fmt"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/processors"
) )
@ -50,34 +53,36 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
// parse fields // parse fields
for _, key := range p.ParseFields { for _, key := range p.ParseFields {
for _, field := range metric.FieldList() { for _, field := range metric.FieldList() {
if field.Key == key { if field.Key != key {
switch value := field.Value.(type) { continue
case string: }
fromFieldMetric, err := p.parseValue(value) value, err := p.toBytes(field.Value)
if err != nil { if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err) p.Log.Errorf("could not convert field %s: %v; skipping", key, err)
} continue
}
fromFieldMetric, err := p.parser.Parse(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err)
continue
}
for _, m := range fromFieldMetric { for _, m := range fromFieldMetric {
// The parser get the parent plugin's name as // The parser get the parent plugin's name as
// default measurement name. Thus, in case the // default measurement name. Thus, in case the
// parsed metric does not provide a name itself, // parsed metric does not provide a name itself,
// the parser will return 'parser' as we are in // the parser will return 'parser' as we are in
// processors.parser. In those cases we want to // processors.parser. In those cases we want to
// keep the original metric name. // keep the original metric name.
if m.Name() == "" || m.Name() == "parser" { if m.Name() == "" || m.Name() == "parser" {
m.SetName(metric.Name()) m.SetName(metric.Name())
}
}
// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
default:
p.Log.Errorf("field %q not a string, skipping", key)
} }
} }
// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
} }
} }
@ -153,6 +158,19 @@ func (p *Parser) parseValue(value string) ([]telegraf.Metric, error) {
return p.parser.Parse([]byte(value)) return p.parser.Parse([]byte(value))
} }
func (p *Parser) toBytes(value interface{}) ([]byte, error) {
if v, ok := value.(string); ok {
return []byte(v), nil
}
var buf bytes.Buffer
if err := gobin.Write(&buf, internal.HostEndianess, value); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func init() { func init() {
processors.Add("parser", func() telegraf.Processor { processors.Add("parser", func() telegraf.Processor {
return &Parser{DropOriginal: false} return &Parser{DropOriginal: false}

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/binary"
"github.com/influxdata/telegraf/plugins/parsers/grok" "github.com/influxdata/telegraf/plugins/parsers/grok"
"github.com/influxdata/telegraf/plugins/parsers/influx" "github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/plugins/parsers/json" "github.com/influxdata/telegraf/plugins/parsers/json"
@ -628,6 +629,84 @@ func TestApply(t *testing.T) {
time.Unix(1593287020, 0)), time.Unix(1593287020, 0)),
}, },
}, },
{
name: "non-string field with binary parser",
parseFields: []string{"value"},
merge: "override",
parser: &binary.Parser{
Configs: []binary.Config{
{
MetricName: "parser",
Entries: []binary.Entry{
{
Name: "alarm_0",
Type: "bool",
Bits: 1,
},
{
Name: "alarm_1",
Type: "bool",
Bits: 1,
},
{
Name: "alarm_2",
Type: "bool",
Bits: 1,
},
{
Name: "alarm_3",
Type: "bool",
Bits: 1,
},
{
Name: "alarm_4",
Type: "bool",
Bits: 1,
},
{
Name: "alarm_5",
Type: "bool",
Bits: 1,
},
{
Name: "alarm_6",
Type: "bool",
Bits: 1,
},
{
Name: "alarm_7",
Type: "bool",
Bits: 1,
},
},
},
},
},
input: metric.New(
"myname",
map[string]string{},
map[string]interface{}{
"value": uint8(13),
},
time.Unix(1593287020, 0)),
expected: []telegraf.Metric{
metric.New(
"myname",
map[string]string{},
map[string]interface{}{
"value": uint8(13),
"alarm_0": false,
"alarm_1": false,
"alarm_2": false,
"alarm_3": false,
"alarm_4": true,
"alarm_5": true,
"alarm_6": false,
"alarm_7": true,
},
time.Unix(1593287020, 0)),
},
},
} }
for _, tt := range tests { for _, tt := range tests {