diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go index 20d5b5ea2..c3a6f274b 100644 --- a/plugins/aggregators/all/all.go +++ b/plugins/aggregators/all/all.go @@ -9,5 +9,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/aggregators/merge" _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" _ "github.com/influxdata/telegraf/plugins/aggregators/quantile" + _ "github.com/influxdata/telegraf/plugins/aggregators/starlark" _ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter" ) diff --git a/plugins/aggregators/starlark/README.md b/plugins/aggregators/starlark/README.md new file mode 100644 index 000000000..01bcf963c --- /dev/null +++ b/plugins/aggregators/starlark/README.md @@ -0,0 +1,103 @@ +# Starlark Aggregator + +The `starlark` aggregator allows to implement a custom aggregator plugin with a Starlark script. The Starlark +script needs to be composed of the three methods defined in the Aggregator plugin interface which are `add`, `push` and `reset`. + +The Starlark Aggregator plugin calls the Starlark function `add` to add the metrics to the aggregator, then calls the Starlark function `push` to push the resulting metrics into the accumulator and finally calls the Starlark function `reset` to reset the entire state of the plugin. + +The Starlark functions can use the global function `state` to keep temporary the metrics to aggregate. + +The Starlark language is a dialect of Python, and will be familiar to those who +have experience with the Python language. However, there are major [differences](#python-differences). +Existing Python code is unlikely to work unmodified. The execution environment +is sandboxed, and it is not possible to do I/O operations such as reading from +files or sockets. + +The **[Starlark specification][]** has details about the syntax and available +functions. + +## Configuration + +```toml +[[aggregators.starlark]] + ## The Starlark source can be set as a string in this configuration file, or + ## by referencing a file containing the script. Only one source or script + ## should be set at once. + ## + ## Source of the Starlark script. + source = ''' +state = {} + +def add(metric): + state["last"] = metric + +def push(): + return state.get("last") + +def reset(): + state.clear() +''' + + ## File containing a Starlark script. + # script = "/usr/local/bin/myscript.star" + + ## The constants of the Starlark script. + # [aggregators.starlark.constants] + # max_size = 10 + # threshold = 0.75 + # default_name = "Julia" + # debug_mode = true +``` + +## Usage + +The Starlark code should contain a function called `add` that takes a metric as argument. +The function will be called with each metric to add, and doesn't return anything. + +```python +def add(metric): + state["last"] = metric +``` + +The Starlark code should also contain a function called `push` that doesn't take any argument. +The function will be called to compute the aggregation, and returns the metrics to push to the accumulator. + +```python +def push(): + return state.get("last") +``` + +The Starlark code should also contain a function called `reset` that doesn't take any argument. +The function will be called to reset the plugin, and doesn't return anything. + +```python +def push(): + state.clear() +``` + +For a list of available types and functions that can be used in the code, see +the [Starlark specification][]. + +## Python Differences + +Refer to the section [Python Differences](plugins/processors/starlark/README.md#python-differences) of the documentation about the Starlark processor. + +## Libraries available + +Refer to the section [Libraries available](plugins/processors/starlark/README.md#libraries-available) of the documentation about the Starlark processor. + +## Common Questions + +Refer to the section [Common Questions](plugins/processors/starlark/README.md#common-questions) of the documentation about the Starlark processor. + +## Examples + +- [minmax](/plugins/aggregators/starlark/testdata/min_max.star) - A minmax aggregator implemented with a Starlark script. +- [merge](/plugins/aggregators/starlark/testdata/merge.star) - A merge aggregator implemented with a Starlark script. + +[All examples](/plugins/aggregators/starlark/testdata) are in the testdata folder. + +Open a Pull Request to add any other useful Starlark examples. + +[Starlark specification]: https://github.com/google/starlark-go/blob/master/doc/spec.md +[dict]: https://github.com/google/starlark-go/blob/master/doc/spec.md#dictionaries diff --git a/plugins/aggregators/starlark/starlark.go b/plugins/aggregators/starlark/starlark.go new file mode 100644 index 000000000..9fa7d9d62 --- /dev/null +++ b/plugins/aggregators/starlark/starlark.go @@ -0,0 +1,144 @@ +package starlark //nolint - Needed to avoid getting import-shadowing: The name 'starlark' shadows an import name (revive) + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/aggregators" + common "github.com/influxdata/telegraf/plugins/common/starlark" + "go.starlark.net/starlark" +) + +const ( + description = "Aggregate metrics using a Starlark script" + sampleConfig = ` + ## The Starlark source can be set as a string in this configuration file, or + ## by referencing a file containing the script. Only one source or script + ## should be set at once. + ## + ## Source of the Starlark script. + source = ''' +state = {} + +def add(metric): + state["last"] = metric + +def push(): + return state.get("last") + +def reset(): + state.clear() +''' + + ## File containing a Starlark script. + # script = "/usr/local/bin/myscript.star" + + ## The constants of the Starlark script. + # [aggregators.starlark.constants] + # max_size = 10 + # threshold = 0.75 + # default_name = "Julia" + # debug_mode = true +` +) + +type Starlark struct { + common.StarlarkCommon +} + +func (s *Starlark) Init() error { + // Execute source + err := s.StarlarkCommon.Init() + if err != nil { + return err + } + + // The source should define an add function. + err = s.AddFunction("add", &common.Metric{}) + if err != nil { + return err + } + + // The source should define a push function. + err = s.AddFunction("push") + if err != nil { + return err + } + + // The source should define a reset function. + err = s.AddFunction("reset") + if err != nil { + return err + } + + return nil +} + +func (s *Starlark) SampleConfig() string { + return sampleConfig +} + +func (s *Starlark) Description() string { + return description +} + +func (s *Starlark) Add(metric telegraf.Metric) { + parameters, found := s.GetParameters("add") + if !found { + s.Log.Errorf("The parameters of the add function could not be found") + return + } + parameters[0].(*common.Metric).Wrap(metric) + + _, err := s.Call("add") + if err != nil { + s.LogError(err) + } +} + +func (s *Starlark) Push(acc telegraf.Accumulator) { + rv, err := s.Call("push") + if err != nil { + s.LogError(err) + acc.AddError(err) + return + } + + switch rv := rv.(type) { + case *starlark.List: + iter := rv.Iterate() + defer iter.Done() + var v starlark.Value + for iter.Next(&v) { + switch v := v.(type) { + case *common.Metric: + m := v.Unwrap() + acc.AddMetric(m) + default: + s.Log.Errorf("Invalid type returned in list: %s", v.Type()) + } + } + case *common.Metric: + m := rv.Unwrap() + acc.AddMetric(m) + case starlark.NoneType: + default: + s.Log.Errorf("Invalid type returned: %T", rv) + } +} + +func (s *Starlark) Reset() { + _, err := s.Call("reset") + if err != nil { + s.LogError(err) + } +} + +// init initializes starlark aggregator plugin +func init() { + aggregators.Add("starlark", func() telegraf.Aggregator { + return &Starlark{ + StarlarkCommon: common.StarlarkCommon{ + StarlarkLoadFunc: common.LoadFunc, + }, + } + }) +} diff --git a/plugins/aggregators/starlark/starlark_test.go b/plugins/aggregators/starlark/starlark_test.go new file mode 100644 index 000000000..a45f9e84c --- /dev/null +++ b/plugins/aggregators/starlark/starlark_test.go @@ -0,0 +1,432 @@ +package starlark + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + common "github.com/influxdata/telegraf/plugins/common/starlark" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var m1 = metric.New("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(1), + "b": int64(1), + "c": int64(1), + "d": int64(1), + "e": int64(1), + "f": int64(2), + "g": int64(2), + "h": int64(2), + "i": int64(2), + "j": int64(3), + }, + time.Now(), +) +var m2 = metric.New("m1", + map[string]string{"foo": "bar"}, + map[string]interface{}{ + "a": int64(1), + "b": int64(3), + "c": int64(3), + "d": int64(3), + "e": int64(3), + "f": int64(1), + "g": int64(1), + "h": int64(1), + "i": int64(1), + "j": int64(1), + "k": int64(200), + "l": int64(200), + "ignoreme": "string", + "andme": true, + }, + time.Now(), +) + +func BenchmarkApply(b *testing.B) { + minmax, _ := newMinMax() + + for n := 0; n < b.N; n++ { + minmax.Add(m1) + minmax.Add(m2) + } +} + +// Test two metrics getting added. +func TestMinMaxWithPeriod(t *testing.T) { + acc := testutil.Accumulator{} + minmax, err := newMinMax() + require.NoError(t, err) + + minmax.Add(m1) + minmax.Add(m2) + minmax.Push(&acc) + + expectedFields := map[string]interface{}{ + "a_max": int64(1), + "a_min": int64(1), + "b_max": int64(3), + "b_min": int64(1), + "c_max": int64(3), + "c_min": int64(1), + "d_max": int64(3), + "d_min": int64(1), + "e_max": int64(3), + "e_min": int64(1), + "f_max": int64(2), + "f_min": int64(1), + "g_max": int64(2), + "g_min": int64(1), + "h_max": int64(2), + "h_min": int64(1), + "i_max": int64(2), + "i_min": int64(1), + "j_max": int64(3), + "j_min": int64(1), + "k_max": int64(200), + "k_min": int64(200), + "l_max": int64(200), + "l_min": int64(200), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +// Test two metrics getting added with a push/reset in between (simulates +// getting added in different periods.) +func TestMinMaxDifferentPeriods(t *testing.T) { + acc := testutil.Accumulator{} + minmax, err := newMinMax() + require.NoError(t, err) + minmax.Add(m1) + minmax.Push(&acc) + expectedFields := map[string]interface{}{ + "a_max": int64(1), + "a_min": int64(1), + "b_max": int64(1), + "b_min": int64(1), + "c_max": int64(1), + "c_min": int64(1), + "d_max": int64(1), + "d_min": int64(1), + "e_max": int64(1), + "e_min": int64(1), + "f_max": int64(2), + "f_min": int64(2), + "g_max": int64(2), + "g_min": int64(2), + "h_max": int64(2), + "h_min": int64(2), + "i_max": int64(2), + "i_min": int64(2), + "j_max": int64(3), + "j_min": int64(3), + } + expectedTags := map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) + + acc.ClearMetrics() + minmax.Reset() + minmax.Add(m2) + minmax.Push(&acc) + expectedFields = map[string]interface{}{ + "a_max": int64(1), + "a_min": int64(1), + "b_max": int64(3), + "b_min": int64(3), + "c_max": int64(3), + "c_min": int64(3), + "d_max": int64(3), + "d_min": int64(3), + "e_max": int64(3), + "e_min": int64(3), + "f_max": int64(1), + "f_min": int64(1), + "g_max": int64(1), + "g_min": int64(1), + "h_max": int64(1), + "h_min": int64(1), + "i_max": int64(1), + "i_min": int64(1), + "j_max": int64(1), + "j_min": int64(1), + "k_max": int64(200), + "k_min": int64(200), + "l_max": int64(200), + "l_min": int64(200), + } + expectedTags = map[string]string{ + "foo": "bar", + } + acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) +} + +func newMinMax() (*Starlark, error) { + return newStarlarkFromScript("testdata/min_max.star") +} + +func TestSimple(t *testing.T) { + plugin, err := newMerge() + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + var acc testutil.Accumulator + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + "time_guest": 42, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestNanosecondPrecision(t *testing.T) { + plugin, err := newMerge() + + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 1), + ), + ) + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 1), + ), + ) + require.NoError(t, err) + + var acc testutil.Accumulator + acc.SetPrecision(time.Second) + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + "time_guest": 42, + }, + time.Unix(0, 1), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func TestReset(t *testing.T) { + plugin, err := newMerge() + + require.NoError(t, err) + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + var acc testutil.Accumulator + plugin.Push(&acc) + + plugin.Reset() + + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + + plugin.Push(&acc) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_guest": 42, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics()) +} + +func newMerge() (*Starlark, error) { + return newStarlarkFromScript("testdata/merge.star") +} + +func TestLastFromSource(t *testing.T) { + acc := testutil.Accumulator{} + plugin, err := newStarlarkFromSource(` +state = {} +def add(metric): + state["last"] = metric + +def push(): + return state.get("last") + +def reset(): + state.clear() +`) + require.NoError(t, err) + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + plugin.Add( + testutil.MustMetric( + "cpu", + map[string]string{ + "cpu": "cpu2", + }, + map[string]interface{}{ + "time_idle": 31, + }, + time.Unix(0, 0), + ), + ) + require.NoError(t, err) + plugin.Push(&acc) + expectedFields := map[string]interface{}{ + "time_idle": int64(31), + } + expectedTags := map[string]string{ + "cpu": "cpu2", + } + acc.AssertContainsTaggedFields(t, "cpu", expectedFields, expectedTags) + plugin.Reset() +} + +func newStarlarkFromSource(source string) (*Starlark, error) { + plugin := &Starlark{ + StarlarkCommon: common.StarlarkCommon{ + StarlarkLoadFunc: common.LoadFunc, + Log: testutil.Logger{}, + Source: source, + }, + } + err := plugin.Init() + if err != nil { + return nil, err + } + return plugin, nil +} + +func newStarlarkFromScript(script string) (*Starlark, error) { + plugin := &Starlark{ + StarlarkCommon: common.StarlarkCommon{ + StarlarkLoadFunc: common.LoadFunc, + Log: testutil.Logger{}, + Script: script, + }, + } + err := plugin.Init() + if err != nil { + return nil, err + } + return plugin, nil +} diff --git a/plugins/aggregators/starlark/testdata/merge.star b/plugins/aggregators/starlark/testdata/merge.star new file mode 100644 index 000000000..77c5148ca --- /dev/null +++ b/plugins/aggregators/starlark/testdata/merge.star @@ -0,0 +1,31 @@ +# Example of a merge aggregator implemented with a starlark script. +load('time.star', 'time') +state = {} +def add(metric): + metrics = state.get("metrics") + if metrics == None: + metrics = {} + state["metrics"] = metrics + state["ordered"] = [] + gId = groupID(metric) + m = metrics.get(gId) + if m == None: + m = deepcopy(metric) + metrics[gId] = m + state["ordered"].append(m) + else: + for k, v in metric.fields.items(): + m.fields[k] = v + +def push(): + return state.get("ordered") + +def reset(): + state.clear() + +def groupID(metric): + key = metric.name + "-" + for k, v in metric.tags.items(): + key = key + k + "-" + v + "-" + key = key + "-" + str(metric.time) + return hash(key) \ No newline at end of file diff --git a/plugins/aggregators/starlark/testdata/min_max.star b/plugins/aggregators/starlark/testdata/min_max.star new file mode 100644 index 000000000..f8b23355c --- /dev/null +++ b/plugins/aggregators/starlark/testdata/min_max.star @@ -0,0 +1,53 @@ +# Example of a min_max aggregator implemented with a starlark script. + +supported_types = (["int", "float"]) +state = {} +def add(metric): + gId = groupID(metric) + aggregate = state.get(gId) + if aggregate == None: + aggregate = { + "name": metric.name, + "tags": metric.tags, + "fields": {} + } + for k, v in metric.fields.items(): + if type(v) in supported_types: + aggregate["fields"][k] = { + "min": v, + "max": v, + } + state[gId] = aggregate + else: + for k, v in metric.fields.items(): + if type(v) in supported_types: + min_max = aggregate["fields"].get(k) + if min_max == None: + aggregate["fields"][k] = { + "min": v, + "max": v, + } + elif v < min_max["min"]: + aggregate["fields"][k]["min"] = v + elif v > min_max["max"]: + aggregate["fields"][k]["max"] = v + +def push(): + metrics = [] + for a in state: + fields = {} + for k in state[a]["fields"]: + fields[k + "_min"] = state[a]["fields"][k]["min"] + fields[k + "_max"] = state[a]["fields"][k]["max"] + m = Metric(state[a]["name"], state[a]["tags"], fields) + metrics.append(m) + return metrics + +def reset(): + state.clear() + +def groupID(metric): + key = metric.name + "-" + for k, v in metric.tags.items(): + key = key + k + "-" + v + return hash(key) \ No newline at end of file diff --git a/plugins/processors/starlark/builtins.go b/plugins/common/starlark/builtins.go similarity index 91% rename from plugins/processors/starlark/builtins.go rename to plugins/common/starlark/builtins.go index 6876fe963..7adcd115d 100644 --- a/plugins/processors/starlark/builtins.go +++ b/plugins/common/starlark/builtins.go @@ -10,16 +10,42 @@ import ( ) func newMetric(_ *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { - var name starlark.String - if err := starlark.UnpackPositionalArgs("Metric", args, kwargs, 1, &name); err != nil { + var ( + name starlark.String + tags, fields starlark.Value + ) + if err := starlark.UnpackArgs("Metric", args, kwargs, "name", &name, "tags?", &tags, "fields?", &fields); err != nil { return nil, err } - m := metric.New(string(name), nil, nil, time.Now()) + allFields, err := toFields(fields) + if err != nil { + return nil, err + } + allTags, err := toTags(tags) + if err != nil { + return nil, err + } + + m := metric.New(string(name), allTags, allFields, time.Now()) return &Metric{metric: m}, nil } +func toString(value starlark.Value, errorMsg string) (string, error) { + if value, ok := value.(starlark.String); ok { + return string(value), nil + } + return "", fmt.Errorf(errorMsg, value) +} + +func items(value starlark.Value, errorMsg string) ([]starlark.Tuple, error) { + if iter, ok := value.(starlark.IterableMapping); ok { + return iter.Items(), nil + } + return nil, fmt.Errorf(errorMsg, value) +} + func deepcopy(_ *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { var sm *Metric if err := starlark.UnpackPositionalArgs("deepcopy", args, kwargs, 1, &sm); err != nil { diff --git a/plugins/processors/starlark/field_dict.go b/plugins/common/starlark/field_dict.go similarity index 91% rename from plugins/processors/starlark/field_dict.go rename to plugins/common/starlark/field_dict.go index 4a332b826..08f624902 100644 --- a/plugins/processors/starlark/field_dict.go +++ b/plugins/common/starlark/field_dict.go @@ -274,3 +274,27 @@ func asGoValue(value interface{}) (interface{}, error) { return nil, errors.New("invalid starlark type") } + +// ToFields converts a starlark.Value to a map of values. +func toFields(value starlark.Value) (map[string]interface{}, error) { + if value == nil { + return nil, nil + } + items, err := items(value, "The type %T is unsupported as type of collection of fields") + if err != nil { + return nil, err + } + result := make(map[string]interface{}, len(items)) + for _, item := range items { + key, err := toString(item[0], "The type %T is unsupported as type of key for fields") + if err != nil { + return nil, err + } + value, err := asGoValue(item[1]) + if err != nil { + return nil, err + } + result[key] = value + } + return result, nil +} diff --git a/plugins/processors/starlark/logging.go b/plugins/common/starlark/logging.go similarity index 100% rename from plugins/processors/starlark/logging.go rename to plugins/common/starlark/logging.go diff --git a/plugins/processors/starlark/metric.go b/plugins/common/starlark/metric.go similarity index 100% rename from plugins/processors/starlark/metric.go rename to plugins/common/starlark/metric.go diff --git a/plugins/common/starlark/starlark.go b/plugins/common/starlark/starlark.go new file mode 100644 index 000000000..5f3655198 --- /dev/null +++ b/plugins/common/starlark/starlark.go @@ -0,0 +1,182 @@ +package starlark //nolint - Needed to avoid getting import-shadowing: The name 'starlark' shadows an import name (revive) + +import ( + "errors" + "fmt" + "strings" + + "github.com/influxdata/telegraf" + "go.starlark.net/lib/math" + "go.starlark.net/lib/time" + "go.starlark.net/resolve" + "go.starlark.net/starlark" + "go.starlark.net/starlarkjson" +) + +type StarlarkCommon struct { + Source string `toml:"source"` + Script string `toml:"script"` + Constants map[string]interface{} `toml:"constants"` + + Log telegraf.Logger `toml:"-"` + StarlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error) + + thread *starlark.Thread + globals starlark.StringDict + functions map[string]*starlark.Function + parameters map[string]starlark.Tuple +} + +func (s *StarlarkCommon) Init() error { + if s.Source == "" && s.Script == "" { + return errors.New("one of source or script must be set") + } + if s.Source != "" && s.Script != "" { + return errors.New("both source or script cannot be set") + } + + s.thread = &starlark.Thread{ + Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) }, + Load: func(thread *starlark.Thread, module string) (starlark.StringDict, error) { + return s.StarlarkLoadFunc(module, s.Log) + }, + } + + builtins := starlark.StringDict{} + builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) + builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) + builtins["catch"] = starlark.NewBuiltin("catch", catch) + err := s.addConstants(&builtins) + if err != nil { + return err + } + + program, err := s.sourceProgram(builtins, "") + if err != nil { + return err + } + + // Execute source + globals, err := program.Init(s.thread, builtins) + if err != nil { + return err + } + // Make available a shared state to the apply function + globals["state"] = starlark.NewDict(0) + + // Freeze the global state. This prevents modifications to the processor + // state and prevents scripts from containing errors storing tracking + // metrics. Tasks that require global state will not be possible due to + // this, so maybe we should relax this in the future. + globals.Freeze() + + s.globals = globals + s.functions = make(map[string]*starlark.Function) + s.parameters = make(map[string]starlark.Tuple) + return nil +} + +func (s *StarlarkCommon) GetParameters(name string) (starlark.Tuple, bool) { + parameters, found := s.parameters[name] + return parameters, found +} + +func (s *StarlarkCommon) AddFunction(name string, params ...starlark.Value) error { + globalFn, found := s.globals[name] + if !found { + return fmt.Errorf("%s is not defined", name) + } + + fn, found := globalFn.(*starlark.Function) + if !found { + return fmt.Errorf("%s is not a function", name) + } + + if fn.NumParams() != len(params) { + return fmt.Errorf("%s function must take %d parameter(s)", name, len(params)) + } + p := make(starlark.Tuple, len(params)) + for i, param := range params { + p[i] = param + } + s.functions[name] = fn + s.parameters[name] = params + return nil +} + +// Add all the constants defined in the plugin as constants of the script +func (s *StarlarkCommon) addConstants(builtins *starlark.StringDict) error { + for key, val := range s.Constants { + sVal, err := asStarlarkValue(val) + if err != nil { + return fmt.Errorf("converting type %T failed: %v", val, err) + } + (*builtins)[key] = sVal + } + return nil +} + +func (s *StarlarkCommon) sourceProgram(builtins starlark.StringDict, filename string) (*starlark.Program, error) { + var src interface{} + if s.Source != "" { + src = s.Source + } + _, program, err := starlark.SourceProgram(s.Script, src, builtins.Has) + return program, err +} + +// Call calls the function corresponding to the given name. +func (s *StarlarkCommon) Call(name string) (starlark.Value, error) { + fn, ok := s.functions[name] + if !ok { + return nil, fmt.Errorf("function %q does not exist", name) + } + args, ok := s.parameters[name] + if !ok { + return nil, fmt.Errorf("params for function %q do not exist", name) + } + return starlark.Call(s.thread, fn, args, nil) +} + +func (s *StarlarkCommon) LogError(err error) { + if err, ok := err.(*starlark.EvalError); ok { + for _, line := range strings.Split(err.Backtrace(), "\n") { + s.Log.Error(line) + } + } else { + s.Log.Error(err.Msg) + } +} + +func LoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) { + switch module { + case "json.star": + return starlark.StringDict{ + "json": starlarkjson.Module, + }, nil + case "logging.star": + return starlark.StringDict{ + "log": LogModule(logger), + }, nil + case "math.star": + return starlark.StringDict{ + "math": math.Module, + }, nil + case "time.star": + return starlark.StringDict{ + "time": time.Module, + }, nil + default: + return nil, errors.New("module " + module + " is not available") + } +} + +func init() { + // https://github.com/bazelbuild/starlark/issues/20 + resolve.AllowNestedDef = true + resolve.AllowLambda = true + resolve.AllowFloat = true + resolve.AllowSet = true + resolve.AllowGlobalReassign = true + resolve.AllowRecursion = true +} diff --git a/plugins/processors/starlark/tag_dict.go b/plugins/common/starlark/tag_dict.go similarity index 87% rename from plugins/processors/starlark/tag_dict.go rename to plugins/common/starlark/tag_dict.go index 7dbb8c12d..999f87365 100644 --- a/plugins/processors/starlark/tag_dict.go +++ b/plugins/common/starlark/tag_dict.go @@ -196,3 +196,27 @@ func (i *TagIterator) Next(p *starlark.Value) bool { func (i *TagIterator) Done() { i.tagIterCount-- } + +// ToTags converts a starlark.Value to a map of string. +func toTags(value starlark.Value) (map[string]string, error) { + if value == nil { + return nil, nil + } + items, err := items(value, "The type %T is unsupported as type of collection of tags") + if err != nil { + return nil, err + } + result := make(map[string]string, len(items)) + for _, item := range items { + key, err := toString(item[0], "The type %T is unsupported as type of key for tags") + if err != nil { + return nil, err + } + value, err := toString(item[1], "The type %T is unsupported as type of value for tags") + if err != nil { + return nil, err + } + result[key] = value + } + return result, nil +} diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index 44f78fa6b..5bf441f2f 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -1,17 +1,12 @@ package starlark import ( - "errors" "fmt" - "strings" "github.com/influxdata/telegraf" + common "github.com/influxdata/telegraf/plugins/common/starlark" "github.com/influxdata/telegraf/plugins/processors" - "go.starlark.net/lib/math" - "go.starlark.net/lib/time" - "go.starlark.net/resolve" "go.starlark.net/starlark" - "go.starlark.net/starlarkjson" ) const ( @@ -40,97 +35,29 @@ def apply(metric): ) type Starlark struct { - Source string `toml:"source"` - Script string `toml:"script"` - Constants map[string]interface{} `toml:"constants"` + common.StarlarkCommon - Log telegraf.Logger `toml:"-"` - - thread *starlark.Thread - applyFunc *starlark.Function - args starlark.Tuple - results []telegraf.Metric - starlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error) + results []telegraf.Metric } func (s *Starlark) Init() error { - if s.Source == "" && s.Script == "" { - return errors.New("one of source or script must be set") - } - if s.Source != "" && s.Script != "" { - return errors.New("both source or script cannot be set") - } - - s.thread = &starlark.Thread{ - Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) }, - Load: func(thread *starlark.Thread, module string) (starlark.StringDict, error) { - return s.starlarkLoadFunc(module, s.Log) - }, - } - - builtins := starlark.StringDict{} - builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) - builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) - builtins["catch"] = starlark.NewBuiltin("catch", catch) - s.addConstants(&builtins) - - program, err := s.sourceProgram(builtins) + err := s.StarlarkCommon.Init() if err != nil { return err } - // Execute source - globals, err := program.Init(s.thread, builtins) - if err != nil { - return err - } - - // Make available a shared state to the apply function - globals["state"] = starlark.NewDict(0) - - // Freeze the global state. This prevents modifications to the processor - // state and prevents scripts from containing errors storing tracking - // metrics. Tasks that require global state will not be possible due to - // this, so maybe we should relax this in the future. - globals.Freeze() - // The source should define an apply function. - apply := globals["apply"] - - if apply == nil { - return errors.New("apply is not defined") + err = s.AddFunction("apply", &common.Metric{}) + if err != nil { + return err } - var ok bool - if s.applyFunc, ok = apply.(*starlark.Function); !ok { - return errors.New("apply is not a function") - } - - if s.applyFunc.NumParams() != 1 { - return errors.New("apply function must take one parameter") - } - - // Reusing the same metric wrapper to skip an allocation. This will cause - // any saved references to point to the new metric, but due to freezing the - // globals none should exist. - s.args = make(starlark.Tuple, 1) - s.args[0] = &Metric{} - // Preallocate a slice for return values. s.results = make([]telegraf.Metric, 0, 10) return nil } -func (s *Starlark) sourceProgram(builtins starlark.StringDict) (*starlark.Program, error) { - if s.Source != "" { - _, program, err := starlark.SourceProgram("processor.starlark", s.Source, builtins.Has) - return program, err - } - _, program, err := starlark.SourceProgram(s.Script, nil, builtins.Has) - return program, err -} - func (s *Starlark) SampleConfig() string { return sampleConfig } @@ -144,15 +71,15 @@ func (s *Starlark) Start(_ telegraf.Accumulator) error { } func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { - s.args[0].(*Metric).Wrap(metric) + parameters, found := s.GetParameters("apply") + if !found { + return fmt.Errorf("The parameters of the apply function could not be found") + } + parameters[0].(*common.Metric).Wrap(metric) - rv, err := starlark.Call(s.thread, s.applyFunc, s.args, nil) + rv, err := s.Call("apply") if err != nil { - if err, ok := err.(*starlark.EvalError); ok { - for _, line := range strings.Split(err.Backtrace(), "\n") { - s.Log.Error(line) - } - } + s.LogError(err) metric.Reject() return err } @@ -164,7 +91,7 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { var v starlark.Value for iter.Next(&v) { switch v := v.(type) { - case *Metric: + case *common.Metric: m := v.Unwrap() if containsMetric(s.results, m) { s.Log.Errorf("Duplicate metric reference detected") @@ -188,7 +115,7 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { s.results[i] = nil } s.results = s.results[:0] - case *Metric: + case *common.Metric: m := rv.Unwrap() // If the script returned a different metric, mark this metric as @@ -209,17 +136,6 @@ func (s *Starlark) Stop() error { return nil } -// Add all the constants defined in the plugin as constants of the script -func (s *Starlark) addConstants(builtins *starlark.StringDict) { - for key, val := range s.Constants { - sVal, err := asStarlarkValue(val) - if err != nil { - s.Log.Errorf("Unsupported type: %T", val) - } - (*builtins)[key] = sVal - } -} - func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool { for _, m := range metrics { if m == metric { @@ -229,43 +145,12 @@ func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool { return false } -func init() { - // https://github.com/bazelbuild/starlark/issues/20 - resolve.AllowNestedDef = true - resolve.AllowLambda = true - resolve.AllowFloat = true - resolve.AllowSet = true - resolve.AllowGlobalReassign = true - resolve.AllowRecursion = true -} - func init() { processors.AddStreaming("starlark", func() telegraf.StreamingProcessor { return &Starlark{ - starlarkLoadFunc: loadFunc, + StarlarkCommon: common.StarlarkCommon{ + StarlarkLoadFunc: common.LoadFunc, + }, } }) } - -func loadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) { - switch module { - case "json.star": - return starlark.StringDict{ - "json": starlarkjson.Module, - }, nil - case "logging.star": - return starlark.StringDict{ - "log": LogModule(logger), - }, nil - case "math.star": - return starlark.StringDict{ - "math": math.Module, - }, nil - case "time.star": - return starlark.StringDict{ - "time": time.Module, - }, nil - default: - return nil, errors.New("module " + module + " is not available") - } -} diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index 6ad169bbf..3a1f955a8 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + common "github.com/influxdata/telegraf/plugins/common/starlark" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" @@ -22,78 +23,63 @@ import ( // Tests for runtime errors in the processors Init function. func TestInitError(t *testing.T) { tests := []struct { - name string - plugin *Starlark + name string + constants map[string]interface{} + plugin *Starlark }{ { - name: "source must define apply", - plugin: &Starlark{ - Source: "", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "source must define apply", + plugin: newStarlarkFromSource(""), }, { name: "apply must be a function", - plugin: &Starlark{ - Source: ` + plugin: newStarlarkFromSource(` apply = 42 -`, - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, +`), }, { name: "apply function must take one arg", - plugin: &Starlark{ - Source: ` + plugin: newStarlarkFromSource(` def apply(): pass -`, - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, +`), }, { name: "package scope must have valid syntax", - plugin: &Starlark{ - Source: ` + plugin: newStarlarkFromSource(` for -`, - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, +`), }, { - name: "no source no script", - plugin: &Starlark{ - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "no source no script", + plugin: newStarlarkNoScript(), }, { name: "source and script", - plugin: &Starlark{ - Source: ` + plugin: newStarlarkFromSource(` def apply(): pass -`, - Script: "testdata/ratio.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, +`), }, { - name: "script file not found", - plugin: &Starlark{ - Script: "testdata/file_not_found.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, + name: "script file not found", + plugin: newStarlarkFromScript("testdata/file_not_found.star"), + }, + { + name: "source and script", + plugin: newStarlarkFromSource(` +def apply(metric): + metric.fields["p1"] = unsupported_type + return metric +`), + constants: map[string]interface{}{ + "unsupported_type": time.Now(), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + tt.plugin.Constants = tt.constants err := tt.plugin.Init() require.Error(t, err) }) @@ -227,11 +213,7 @@ def apply(metric): for _, tt := range applyTests { t.Run(tt.name, func(t *testing.T) { - plugin := &Starlark{ - Source: tt.source, - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - } + plugin := newStarlarkFromSource(tt.source) err := plugin.Init() require.NoError(t, err) @@ -2545,7 +2527,6 @@ def apply(metric): 2: "two", "3": "three", }, - "unsupported_type": time.Now(), }, input: []telegraf.Metric{ testutil.MustMetric("cpu", @@ -2575,12 +2556,8 @@ def apply(metric): for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - plugin := &Starlark{ - Source: tt.source, - Log: testutil.Logger{}, - Constants: tt.constants, - starlarkLoadFunc: testLoadFunc, - } + plugin := newStarlarkFromSource(tt.source) + plugin.Constants = tt.constants err := plugin.Init() require.NoError(t, err) @@ -2637,7 +2614,6 @@ def apply(metric): debug_mode = true supported_values = ["2", "3"] supported_entries = { "2" = "two", "3" = "three" } - unsupported_type = 2009-06-12 `, input: []telegraf.Metric{ testutil.MustMetric("cpu", @@ -2717,12 +2693,8 @@ func TestScript(t *testing.T) { expectedErrorStr string }{ { - name: "rename", - plugin: &Starlark{ - Script: "testdata/rename.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "rename", + plugin: newStarlarkFromScript("testdata/rename.star"), input: []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{ @@ -2745,12 +2717,8 @@ func TestScript(t *testing.T) { }, }, { - name: "drop fields by type", - plugin: &Starlark{ - Script: "testdata/drop_string_fields.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "drop fields by type", + plugin: newStarlarkFromScript("testdata/drop_string_fields.star"), input: []telegraf.Metric{ testutil.MustMetric("device", map[string]string{}, @@ -2777,12 +2745,8 @@ func TestScript(t *testing.T) { }, }, { - name: "drop fields with unexpected type", - plugin: &Starlark{ - Script: "testdata/drop_fields_with_unexpected_type.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "drop fields with unexpected type", + plugin: newStarlarkFromScript("testdata/drop_fields_with_unexpected_type.star"), input: []telegraf.Metric{ testutil.MustMetric("device", map[string]string{}, @@ -2812,12 +2776,8 @@ func TestScript(t *testing.T) { }, }, { - name: "scale", - plugin: &Starlark{ - Script: "testdata/scale.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "scale", + plugin: newStarlarkFromScript("testdata/scale.star"), input: []telegraf.Metric{ testutil.MustMetric("cpu", map[string]string{}, @@ -2834,12 +2794,8 @@ func TestScript(t *testing.T) { }, }, { - name: "ratio", - plugin: &Starlark{ - Script: "testdata/ratio.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "ratio", + plugin: newStarlarkFromScript("testdata/ratio.star"), input: []telegraf.Metric{ testutil.MustMetric("mem", map[string]string{}, @@ -2863,12 +2819,8 @@ func TestScript(t *testing.T) { }, }, { - name: "logging", - plugin: &Starlark{ - Script: "testdata/logging.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "logging", + plugin: newStarlarkFromScript("testdata/logging.star"), input: []telegraf.Metric{ testutil.MustMetric("log", map[string]string{}, @@ -2889,12 +2841,8 @@ func TestScript(t *testing.T) { }, }, { - name: "multiple_metrics", - plugin: &Starlark{ - Script: "testdata/multiple_metrics.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "multiple_metrics", + plugin: newStarlarkFromScript("testdata/multiple_metrics.star"), input: []telegraf.Metric{ testutil.MustMetric("mm", map[string]string{}, @@ -2922,12 +2870,8 @@ func TestScript(t *testing.T) { }, }, { - name: "multiple_metrics_with_json", - plugin: &Starlark{ - Script: "testdata/multiple_metrics_with_json.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "multiple_metrics_with_json", + plugin: newStarlarkFromScript("testdata/multiple_metrics_with_json.star"), input: []telegraf.Metric{ testutil.MustMetric("json", map[string]string{}, @@ -2955,12 +2899,8 @@ func TestScript(t *testing.T) { }, }, { - name: "fail", - plugin: &Starlark{ - Script: "testdata/fail.star", - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - }, + name: "fail", + plugin: newStarlarkFromScript("testdata/fail.star"), input: []telegraf.Metric{ testutil.MustMetric("fail", map[string]string{}, @@ -3246,11 +3186,7 @@ def apply(metric): for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - plugin := &Starlark{ - Source: tt.source, - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - } + plugin := newStarlarkFromSource(tt.source) err := plugin.Init() require.NoError(b, err) @@ -3292,11 +3228,7 @@ func TestAllScriptTestData(t *testing.T) { if expectedErrorStr == "" { outputMetrics = parseMetricsFrom(t, lines, "Example Output:") } - plugin := &Starlark{ - Script: fn, - Log: testutil.Logger{}, - starlarkLoadFunc: testLoadFunc, - } + plugin := newStarlarkFromScript(fn) require.NoError(t, plugin.Init()) acc := &testutil.Accumulator{} @@ -3370,7 +3302,7 @@ func parseErrorMessage(t *testing.T, lines []string, header string) string { } func testLoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) { - result, err := loadFunc(module, logger) + result, err := common.LoadFunc(module, logger) if err != nil { return nil, err } @@ -3387,3 +3319,32 @@ func testLoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, e func testNow(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { return starlarktime.Time(time.Date(2021, 4, 15, 12, 0, 0, 999, time.UTC)), nil } + +func newStarlarkFromSource(source string) *Starlark { + return &Starlark{ + StarlarkCommon: common.StarlarkCommon{ + StarlarkLoadFunc: testLoadFunc, + Log: testutil.Logger{}, + Source: source, + }, + } +} + +func newStarlarkFromScript(script string) *Starlark { + return &Starlark{ + StarlarkCommon: common.StarlarkCommon{ + StarlarkLoadFunc: testLoadFunc, + Log: testutil.Logger{}, + Script: script, + }, + } +} + +func newStarlarkNoScript() *Starlark { + return &Starlark{ + StarlarkCommon: common.StarlarkCommon{ + StarlarkLoadFunc: testLoadFunc, + Log: testutil.Logger{}, + }, + } +}