From 8ddbab47a46e256281392ad0aac876715189c117 Mon Sep 17 00:00:00 2001 From: Nicolas Filotto Date: Mon, 1 Feb 2021 20:34:44 +0100 Subject: [PATCH] Allow to provide constants to a starlark script (#8772) --- plugins/processors/starlark/README.md | 30 ++++ plugins/processors/starlark/field_dict.go | 50 ++++-- plugins/processors/starlark/starlark.go | 24 ++- plugins/processors/starlark/starlark_test.go | 160 ++++++++++++++++++- 4 files changed, 249 insertions(+), 15 deletions(-) diff --git a/plugins/processors/starlark/README.md b/plugins/processors/starlark/README.md index a22296f48..03d9f7a93 100644 --- a/plugins/processors/starlark/README.md +++ b/plugins/processors/starlark/README.md @@ -30,6 +30,13 @@ def apply(metric): ## File containing a Starlark script. # script = "/usr/local/bin/myscript.star" + + ## The constants of the Starlark script. + # [processors.starlark.constants] + # max_size = 10 + # threshold = 0.75 + # default_name = "Julia" + # debug_mode = true ``` ### Usage @@ -182,6 +189,29 @@ def apply(metric): def failing(metric): json.decode("non-json-content") ``` +**How to reuse the same script but with different parameters?** + +In case you have a generic script that you would like to reuse for different instances of the plugin, you can use constants as input parameters of your script. + +So for example, assuming that you have the next configuration: + +```toml +[[processors.starlark]] + script = "/usr/local/bin/myscript.star" + + [processors.starlark.constants] + somecustomnum = 10 + somecustomstr = "mycustomfield" +``` + +Your script could then use the constants defined in the configuration as follows: + +```python +def apply(metric): + if metric.fields[somecustomstr] >= somecustomnum: + metric.fields.clear() + return metric +``` ### Examples diff --git a/plugins/processors/starlark/field_dict.go b/plugins/processors/starlark/field_dict.go index e0c0349b6..1e48ac7c0 100644 --- a/plugins/processors/starlark/field_dict.go +++ b/plugins/processors/starlark/field_dict.go @@ -3,6 +3,7 @@ package starlark import ( "errors" "fmt" + "reflect" "strings" "github.com/influxdata/telegraf" @@ -210,17 +211,44 @@ func (i *FieldIterator) Done() { // AsStarlarkValue converts a field value to a starlark.Value. func asStarlarkValue(value interface{}) (starlark.Value, error) { - switch v := value.(type) { - case float64: - return starlark.Float(v), nil - case int64: - return starlark.MakeInt64(v), nil - case uint64: - return starlark.MakeUint64(v), nil - case string: - return starlark.String(v), nil - case bool: - return starlark.Bool(v), nil + v := reflect.ValueOf(value) + switch v.Kind() { + case reflect.Slice: + length := v.Len() + array := make([]starlark.Value, length) + for i := 0; i < length; i++ { + sVal, err := asStarlarkValue(v.Index(i).Interface()) + if err != nil { + return starlark.None, err + } + array[i] = sVal + } + return starlark.NewList(array), nil + case reflect.Map: + dict := starlark.NewDict(v.Len()) + iter := v.MapRange() + for iter.Next() { + sKey, err := asStarlarkValue(iter.Key().Interface()) + if err != nil { + return starlark.None, err + } + sValue, err := asStarlarkValue(iter.Value().Interface()) + if err != nil { + return starlark.None, err + } + dict.SetKey(sKey, sValue) + } + return dict, nil + case reflect.Float32, reflect.Float64: + return starlark.Float(v.Float()), nil + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return starlark.MakeInt64(v.Int()), nil + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + return starlark.MakeUint64(v.Uint()), nil + case reflect.String: + return starlark.String(v.String()), nil + case reflect.Bool: + return starlark.Bool(v.Bool()), nil } return starlark.None, errors.New("invalid type") diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index 9a055ce56..64666398d 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -27,12 +27,20 @@ def apply(metric): ## File containing a Starlark script. # script = "/usr/local/bin/myscript.star" + + ## The constants of the Starlark script. + # [processors.starlark.constants] + # max_size = 10 + # threshold = 0.75 + # default_name = "Julia" + # debug_mode = true ` ) type Starlark struct { - Source string `toml:"source"` - Script string `toml:"script"` + Source string `toml:"source"` + Script string `toml:"script"` + Constants map[string]interface{} `toml:"constants"` Log telegraf.Logger `toml:"-"` @@ -61,6 +69,7 @@ func (s *Starlark) Init() error { builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) builtins["catch"] = starlark.NewBuiltin("catch", catch) + s.addConstants(&builtins) program, err := s.sourceProgram(builtins) if err != nil { @@ -197,6 +206,17 @@ func (s *Starlark) Stop() error { return nil } +// Add all the constants defined in the plugin as constants of the script +func (s *Starlark) addConstants(builtins *starlark.StringDict) { + for key, val := range s.Constants { + sVal, err := asStarlarkValue(val) + if err != nil { + s.Log.Errorf("Unsupported type: %T", val) + } + (*builtins)[key] = sVal + } +} + func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool { for _, m := range metrics { if m == metric { diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index afcb72102..f506e26ec 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -1,6 +1,7 @@ package starlark import ( + "errors" "fmt" "io/ioutil" "os" @@ -10,6 +11,7 @@ import ( "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -250,6 +252,7 @@ func TestMetric(t *testing.T) { var tests = []struct { name string source string + constants map[string]interface{} input []telegraf.Metric expected []telegraf.Metric expectedErrorStr string @@ -2418,13 +2421,64 @@ def process(metric): ), }, }, + { + name: "support constants", + source: ` +def apply(metric): + metric.fields["p1"] = max_size + metric.fields["p2"] = threshold + metric.fields["p3"] = default_name + metric.fields["p4"] = debug_mode + metric.fields["p5"] = supported_values[0] + metric.fields["p6"] = supported_values[1] + metric.fields["p7"] = supported_entries[2] + metric.fields["p8"] = supported_entries["3"] + return metric + `, + constants: map[string]interface{}{ + "max_size": 10, + "threshold": 0.75, + "default_name": "Julia", + "debug_mode": true, + "supported_values": []interface{}{2, "3"}, + "supported_entries": map[interface{}]interface{}{ + 2: "two", + "3": "three", + }, + "unsupported_type": time.Now(), + }, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "p1": 10, + "p2": 0.75, + "p3": "Julia", + "p4": true, + "p5": 2, + "p6": "3", + "p7": "two", + "p8": "three", + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { plugin := &Starlark{ - Source: tt.source, - Log: testutil.Logger{}, + Source: tt.source, + Log: testutil.Logger{}, + Constants: tt.constants, } err := plugin.Init() require.NoError(t, err) @@ -2451,6 +2505,108 @@ def process(metric): } } +// Tests the behavior of the plugin according the provided TOML configuration. +func TestConfig(t *testing.T) { + var tests = []struct { + name string + config string + input []telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "support constants from configuration", + config: ` +[[processors.starlark]] + source = ''' +def apply(metric): + metric.fields["p1"] = max_size + metric.fields["p2"] = threshold + metric.fields["p3"] = default_name + metric.fields["p4"] = debug_mode + metric.fields["p5"] = supported_values[0] + metric.fields["p6"] = supported_values[1] + metric.fields["p7"] = supported_entries["2"] + metric.fields["p8"] = supported_entries["3"] + return metric +''' + [processors.starlark.constants] + max_size = 10 + threshold = 0.75 + default_name = "Elsa" + debug_mode = true + supported_values = ["2", "3"] + supported_entries = { "2" = "two", "3" = "three" } + unsupported_type = 2009-06-12 + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "p1": 10, + "p2": 0.75, + "p3": "Elsa", + "p4": true, + "p5": "2", + "p6": "3", + "p7": "two", + "p8": "three", + }, + time.Unix(0, 0), + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin, err := buildPlugin(tt.config) + require.NoError(t, err) + err = plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + + err = plugin.Start(&acc) + require.NoError(t, err) + + for _, m := range tt.input { + err = plugin.Add(m, &acc) + require.NoError(t, err) + } + + err = plugin.Stop() + require.NoError(t, err) + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics()) + }) + } +} + +// Build a Starlark plugin from the provided configuration. +func buildPlugin(configContent string) (*Starlark, error) { + c := config.NewConfig() + err := c.LoadConfigData([]byte(configContent)) + if err != nil { + return nil, err + } + if len(c.Processors) != 1 { + return nil, errors.New("Only one processor was expected") + } + plugin, ok := (c.Processors[0].Processor).(*Starlark) + if !ok { + return nil, errors.New("Only a Starlark processor was expected") + } + plugin.Log = testutil.Logger{} + return plugin, nil +} + func TestScript(t *testing.T) { var tests = []struct { name string