feat(processors.parser): Add base64 decode for fields (#15328)

This commit is contained in:
Dane Strandboge 2024-05-22 07:18:13 -05:00 committed by GitHub
parent 3f612ef857
commit 6f294220c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 192 additions and 31 deletions

View File

@ -20,6 +20,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## The name of the fields whose value will be parsed.
parse_fields = ["message"]
## Fields to base64 decode.
## These fields do not need to be specified in parse_fields.
## Fields specified here will have base64 decode applied to them.
# parse_fields_base64 = []
## The name of the tags whose value will be parsed.
# parse_tags = []

View File

@ -4,8 +4,10 @@ package parser
import (
"bytes"
_ "embed"
"encoding/base64"
gobin "encoding/binary"
"fmt"
"slices"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
@ -19,6 +21,7 @@ type Parser struct {
DropOriginal bool `toml:"drop_original"`
Merge string `toml:"merge"`
ParseFields []string `toml:"parse_fields"`
Base64Fields []string `toml:"parse_fields_base64"`
ParseTags []string `toml:"parse_tags"`
Log telegraf.Logger `toml:"-"`
parser telegraf.Parser
@ -53,39 +56,57 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
}
// parse fields
for _, key := range p.ParseFields {
for _, field := range metric.FieldList() {
if field.Key != key {
continue
}
value, err := p.toBytes(field.Value)
if err != nil {
p.Log.Errorf("could not convert field %s: %v; skipping", key, err)
continue
}
fromFieldMetric, err := p.parser.Parse(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", key, err)
continue
}
for _, field := range metric.FieldList() {
plain := slices.Contains(p.ParseFields, field.Key)
b64 := slices.Contains(p.Base64Fields, field.Key)
for _, m := range fromFieldMetric {
// The parser get the parent plugin's name as
// default measurement name. Thus, in case the
// parsed metric does not provide a name itself,
// the parser will return 'parser' as we are in
// processors.parser. In those cases we want to
// keep the original metric name.
if m.Name() == "" || m.Name() == "parser" {
m.SetName(metric.Name())
}
}
// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
if !plain && !b64 {
continue
}
if plain && b64 {
p.Log.Errorf("field %s is listed in both parse fields and base64 fields; skipping", field.Key)
continue
}
value, err := p.toBytes(field.Value)
if err != nil {
p.Log.Errorf("could not convert field %s: %v; skipping", field.Key, err)
continue
}
if b64 {
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(value)))
n, err := base64.StdEncoding.Decode(decoded, value)
if err != nil {
p.Log.Errorf("could not decode base64 field %s: %v; skipping", field.Key, err)
continue
}
value = decoded[:n]
}
fromFieldMetric, err := p.parser.Parse(value)
if err != nil {
p.Log.Errorf("could not parse field %s: %v", field.Key, err)
continue
}
for _, m := range fromFieldMetric {
// The parser get the parent plugin's name as
// default measurement name. Thus, in case the
// parsed metric does not provide a name itself,
// the parser will return 'parser' as we are in
// processors.parser. In those cases we want to
// keep the original metric name.
if m.Name() == "" || m.Name() == "parser" {
m.SetName(metric.Name())
}
}
// multiple parsed fields shouldn't create multiple
// metrics so we'll merge tags/fields down into one
// prior to returning.
newMetrics = append(newMetrics, fromFieldMetric...)
}
// parse tags

View File

@ -24,6 +24,7 @@ func TestApply(t *testing.T) {
name string
parseFields []string
parseTags []string
parseBase64 []string
parser telegraf.Parser
dropOriginal bool
merge string
@ -708,6 +709,103 @@ func TestApply(t *testing.T) {
time.Unix(1593287020, 0)),
},
},
{
name: "test base 64 field single",
parseBase64: []string{"sample"},
dropOriginal: true,
parser: &json.Parser{
TagKeys: []string{
"text",
},
},
input: metric.New(
"singleField",
map[string]string{
"some": "tag",
},
map[string]interface{}{
"sample": `eyJ0ZXh0IjogInRlc3QgYmFzZTY0In0=`,
},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"singleField",
map[string]string{
"text": "test base64",
},
map[string]interface{}{},
time.Unix(0, 0)),
},
},
{
name: "parse two base64 fields",
parseBase64: []string{"field_1", "field_2"},
dropOriginal: true,
parser: &json.Parser{
TagKeys: []string{"lvl", "msg", "err", "fatal"},
},
input: metric.New(
"bigMeasure",
map[string]string{},
map[string]interface{}{
"field_1": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`,
"field_2": `eyJlcnIiOiJmYXRhbCIsImZhdGFsIjoic2VjdXJpdHkgdGhyZWF0In0=`,
},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"bigMeasure",
map[string]string{
"lvl": "info",
"msg": "http request",
},
map[string]interface{}{},
time.Unix(0, 0)),
metric.New(
"bigMeasure",
map[string]string{
"err": "fatal",
"fatal": "security threat",
},
map[string]interface{}{},
time.Unix(0, 0)),
},
},
{
name: "parse two fields, one base64",
parseFields: []string{"field_2"},
parseBase64: []string{"field_1"},
dropOriginal: true,
parser: &json.Parser{
TagKeys: []string{"lvl", "msg", "err", "fatal"},
},
input: metric.New(
"bigMeasure",
map[string]string{},
map[string]interface{}{
"field_1": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`,
"field_2": `{"err":"fatal","fatal":"security threat"}`,
},
time.Unix(0, 0)),
expected: []telegraf.Metric{
metric.New(
"bigMeasure",
map[string]string{
"lvl": "info",
"msg": "http request",
},
map[string]interface{}{},
time.Unix(0, 0)),
metric.New(
"bigMeasure",
map[string]string{
"err": "fatal",
"fatal": "security threat",
},
map[string]interface{}{},
time.Unix(0, 0)),
},
},
}
for _, tt := range tests {
@ -718,6 +816,7 @@ func TestApply(t *testing.T) {
plugin := Parser{
ParseFields: tt.parseFields,
ParseTags: tt.parseTags,
Base64Fields: tt.parseBase64,
DropOriginal: tt.dropOriginal,
Merge: tt.merge,
Log: testutil.Logger{Name: "processor.parser"},
@ -812,6 +911,37 @@ func TestBadApply(t *testing.T) {
}
}
func TestBase64FieldValidation(t *testing.T) {
testMetric := metric.New(
"test",
map[string]string{},
map[string]interface{}{
"b": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`,
},
time.Unix(0, 0))
testLogger := &testutil.CaptureLogger{}
plugin := &Parser{
ParseFields: []string{"a"},
Base64Fields: []string{"b"},
Log: testLogger,
}
plugin.SetParser(&json.Parser{})
require.NoError(t, plugin.Init())
plugin.Apply(testMetric)
require.Empty(t, testLogger.Errors())
plugin = &Parser{
ParseFields: []string{"b"},
Base64Fields: []string{"b"},
Log: testLogger,
}
plugin.SetParser(&json.Parser{})
require.NoError(t, plugin.Init())
plugin.Apply(testMetric)
require.NotEmpty(t, testLogger.Errors())
}
func TestTracking(t *testing.T) {
var testCases = []struct {
name string

View File

@ -3,6 +3,11 @@
## The name of the fields whose value will be parsed.
parse_fields = ["message"]
## Fields to base64 decode.
## These fields do not need to be specified in parse_fields.
## Fields specified here will have base64 decode applied to them.
# parse_fields_base64 = []
## The name of the tags whose value will be parsed.
# parse_tags = []