From 6f294220c20bb89e431fe5677870d89059c67aa9 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Wed, 22 May 2024 07:18:13 -0500 Subject: [PATCH] feat(processors.parser): Add base64 decode for fields (#15328) --- plugins/processors/parser/README.md | 5 + plugins/processors/parser/parser.go | 83 +++++++++------ plugins/processors/parser/parser_test.go | 130 +++++++++++++++++++++++ plugins/processors/parser/sample.conf | 5 + 4 files changed, 192 insertions(+), 31 deletions(-) diff --git a/plugins/processors/parser/README.md b/plugins/processors/parser/README.md index 529315075..6219ead0b 100644 --- a/plugins/processors/parser/README.md +++ b/plugins/processors/parser/README.md @@ -20,6 +20,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## The name of the fields whose value will be parsed. parse_fields = ["message"] + ## The name of the fields whose value will be base64-decoded and then parsed. + ## These fields do not need to be (and must not be) specified in parse_fields; + ## a field listed in both options is skipped and an error is logged. + # parse_fields_base64 = [] + ## The name of the tags whose value will be parsed. 
# parse_tags = [] diff --git a/plugins/processors/parser/parser.go b/plugins/processors/parser/parser.go index 350b4bcdb..ae1911f17 100644 --- a/plugins/processors/parser/parser.go +++ b/plugins/processors/parser/parser.go @@ -4,8 +4,10 @@ package parser import ( "bytes" _ "embed" + "encoding/base64" gobin "encoding/binary" "fmt" + "slices" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -19,6 +21,7 @@ type Parser struct { DropOriginal bool `toml:"drop_original"` Merge string `toml:"merge"` ParseFields []string `toml:"parse_fields"` + Base64Fields []string `toml:"parse_fields_base64"` ParseTags []string `toml:"parse_tags"` Log telegraf.Logger `toml:"-"` parser telegraf.Parser @@ -53,39 +56,57 @@ func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric { } // parse fields - for _, key := range p.ParseFields { - for _, field := range metric.FieldList() { - if field.Key != key { - continue - } - value, err := p.toBytes(field.Value) - if err != nil { - p.Log.Errorf("could not convert field %s: %v; skipping", key, err) - continue - } - fromFieldMetric, err := p.parser.Parse(value) - if err != nil { - p.Log.Errorf("could not parse field %s: %v", key, err) - continue - } + for _, field := range metric.FieldList() { + plain := slices.Contains(p.ParseFields, field.Key) + b64 := slices.Contains(p.Base64Fields, field.Key) - for _, m := range fromFieldMetric { - // The parser get the parent plugin's name as - // default measurement name. Thus, in case the - // parsed metric does not provide a name itself, - // the parser will return 'parser' as we are in - // processors.parser. In those cases we want to - // keep the original metric name. - if m.Name() == "" || m.Name() == "parser" { - m.SetName(metric.Name()) - } - } - - // multiple parsed fields shouldn't create multiple - // metrics so we'll merge tags/fields down into one - // prior to returning. - newMetrics = append(newMetrics, fromFieldMetric...) 
+ if !plain && !b64 { + continue } + + if plain && b64 { + p.Log.Errorf("field %s is listed in both parse fields and base64 fields; skipping", field.Key) + continue + } + + value, err := p.toBytes(field.Value) + if err != nil { + p.Log.Errorf("could not convert field %s: %v; skipping", field.Key, err) + continue + } + + if b64 { + decoded := make([]byte, base64.StdEncoding.DecodedLen(len(value))) + n, err := base64.StdEncoding.Decode(decoded, value) + if err != nil { + p.Log.Errorf("could not decode base64 field %s: %v; skipping", field.Key, err) + continue + } + value = decoded[:n] + } + + fromFieldMetric, err := p.parser.Parse(value) + if err != nil { + p.Log.Errorf("could not parse field %s: %v", field.Key, err) + continue + } + + for _, m := range fromFieldMetric { + // The parser get the parent plugin's name as + // default measurement name. Thus, in case the + // parsed metric does not provide a name itself, + // the parser will return 'parser' as we are in + // processors.parser. In those cases we want to + // keep the original metric name. + if m.Name() == "" || m.Name() == "parser" { + m.SetName(metric.Name()) + } + } + + // multiple parsed fields shouldn't create multiple + // metrics so we'll merge tags/fields down into one + // prior to returning. + newMetrics = append(newMetrics, fromFieldMetric...) 
} // parse tags diff --git a/plugins/processors/parser/parser_test.go b/plugins/processors/parser/parser_test.go index b662224f8..3f48a1013 100644 --- a/plugins/processors/parser/parser_test.go +++ b/plugins/processors/parser/parser_test.go @@ -24,6 +24,7 @@ func TestApply(t *testing.T) { name string parseFields []string parseTags []string + parseBase64 []string parser telegraf.Parser dropOriginal bool merge string @@ -708,6 +709,103 @@ func TestApply(t *testing.T) { time.Unix(1593287020, 0)), }, }, + { + name: "test base 64 field single", + parseBase64: []string{"sample"}, + dropOriginal: true, + parser: &json.Parser{ + TagKeys: []string{ + "text", + }, + }, + input: metric.New( + "singleField", + map[string]string{ + "some": "tag", + }, + map[string]interface{}{ + "sample": `eyJ0ZXh0IjogInRlc3QgYmFzZTY0In0=`, + }, + time.Unix(0, 0)), + expected: []telegraf.Metric{ + metric.New( + "singleField", + map[string]string{ + "text": "test base64", + }, + map[string]interface{}{}, + time.Unix(0, 0)), + }, + }, + { + name: "parse two base64 fields", + parseBase64: []string{"field_1", "field_2"}, + dropOriginal: true, + parser: &json.Parser{ + TagKeys: []string{"lvl", "msg", "err", "fatal"}, + }, + input: metric.New( + "bigMeasure", + map[string]string{}, + map[string]interface{}{ + "field_1": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`, + "field_2": `eyJlcnIiOiJmYXRhbCIsImZhdGFsIjoic2VjdXJpdHkgdGhyZWF0In0=`, + }, + time.Unix(0, 0)), + expected: []telegraf.Metric{ + metric.New( + "bigMeasure", + map[string]string{ + "lvl": "info", + "msg": "http request", + }, + map[string]interface{}{}, + time.Unix(0, 0)), + metric.New( + "bigMeasure", + map[string]string{ + "err": "fatal", + "fatal": "security threat", + }, + map[string]interface{}{}, + time.Unix(0, 0)), + }, + }, + { + name: "parse two fields, one base64", + parseFields: []string{"field_2"}, + parseBase64: []string{"field_1"}, + dropOriginal: true, + parser: &json.Parser{ + TagKeys: []string{"lvl", "msg", "err", 
"fatal"}, + }, + input: metric.New( + "bigMeasure", + map[string]string{}, + map[string]interface{}{ + "field_1": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`, + "field_2": `{"err":"fatal","fatal":"security threat"}`, + }, + time.Unix(0, 0)), + expected: []telegraf.Metric{ + metric.New( + "bigMeasure", + map[string]string{ + "lvl": "info", + "msg": "http request", + }, + map[string]interface{}{}, + time.Unix(0, 0)), + metric.New( + "bigMeasure", + map[string]string{ + "err": "fatal", + "fatal": "security threat", + }, + map[string]interface{}{}, + time.Unix(0, 0)), + }, + }, } for _, tt := range tests { @@ -718,6 +816,7 @@ func TestApply(t *testing.T) { plugin := Parser{ ParseFields: tt.parseFields, ParseTags: tt.parseTags, + Base64Fields: tt.parseBase64, DropOriginal: tt.dropOriginal, Merge: tt.merge, Log: testutil.Logger{Name: "processor.parser"}, @@ -812,6 +911,37 @@ func TestBadApply(t *testing.T) { } } +func TestBase64FieldValidation(t *testing.T) { + testMetric := metric.New( + "test", + map[string]string{}, + map[string]interface{}{ + "b": `eyJsdmwiOiJpbmZvIiwibXNnIjoiaHR0cCByZXF1ZXN0In0=`, + }, + time.Unix(0, 0)) + + testLogger := &testutil.CaptureLogger{} + plugin := &Parser{ + ParseFields: []string{"a"}, + Base64Fields: []string{"b"}, + Log: testLogger, + } + plugin.SetParser(&json.Parser{}) + require.NoError(t, plugin.Init()) + plugin.Apply(testMetric) + require.Empty(t, testLogger.Errors()) + + plugin = &Parser{ + ParseFields: []string{"b"}, + Base64Fields: []string{"b"}, + Log: testLogger, + } + plugin.SetParser(&json.Parser{}) + require.NoError(t, plugin.Init()) + plugin.Apply(testMetric) + require.NotEmpty(t, testLogger.Errors()) +} + func TestTracking(t *testing.T) { var testCases = []struct { name string diff --git a/plugins/processors/parser/sample.conf b/plugins/processors/parser/sample.conf index 5275f282a..7e8763e74 100644 --- a/plugins/processors/parser/sample.conf +++ b/plugins/processors/parser/sample.conf @@ -3,6 +3,11 @@ ## The name 
of the fields whose value will be parsed. parse_fields = ["message"] + ## The name of the fields whose value will be base64-decoded and then parsed. + ## These fields do not need to be (and must not be) specified in parse_fields; + ## a field listed in both options is skipped and an error is logged. + # parse_fields_base64 = [] + ## The name of the tags whose value will be parsed. # parse_tags = []