feat: Add support of aggregator as Starlark script (#9419)

This commit is contained in:
Nicolas Filotto 2021-11-18 23:37:59 +01:00 committed by GitHub
parent a439841015
commit 4f2ade5305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1124 additions and 258 deletions

View File

@ -9,5 +9,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/aggregators/merge"
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
_ "github.com/influxdata/telegraf/plugins/aggregators/quantile"
_ "github.com/influxdata/telegraf/plugins/aggregators/starlark"
_ "github.com/influxdata/telegraf/plugins/aggregators/valuecounter"
)

View File

@ -0,0 +1,103 @@
# Starlark Aggregator
The `starlark` aggregator lets you implement a custom aggregator plugin with a Starlark script. The Starlark
script needs to be composed of the three methods defined in the Aggregator plugin interface which are `add`, `push` and `reset`.
The Starlark Aggregator plugin calls the Starlark function `add` to add the metrics to the aggregator, then calls the Starlark function `push` to push the resulting metrics into the accumulator and finally calls the Starlark function `reset` to reset the entire state of the plugin.
The Starlark functions can use the global variable `state` to temporarily store the metrics to aggregate.
The Starlark language is a dialect of Python, and will be familiar to those who
have experience with the Python language. However, there are major [differences](#python-differences).
Existing Python code is unlikely to work unmodified. The execution environment
is sandboxed, and it is not possible to do I/O operations such as reading from
files or sockets.
The **[Starlark specification][]** has details about the syntax and available
functions.
## Configuration
```toml
[[aggregators.starlark]]
## The Starlark source can be set as a string in this configuration file, or
## by referencing a file containing the script. Only one source or script
## should be set at once.
##
## Source of the Starlark script.
source = '''
state = {}
def add(metric):
state["last"] = metric
def push():
return state.get("last")
def reset():
state.clear()
'''
## File containing a Starlark script.
# script = "/usr/local/bin/myscript.star"
## The constants of the Starlark script.
# [aggregators.starlark.constants]
# max_size = 10
# threshold = 0.75
# default_name = "Julia"
# debug_mode = true
```
## Usage
The Starlark code should contain a function called `add` that takes a metric as argument.
The function will be called with each metric to add, and doesn't return anything.
```python
def add(metric):
state["last"] = metric
```
The Starlark code should also contain a function called `push` that doesn't take any argument.
The function will be called to compute the aggregation, and returns the metrics to push to the accumulator.
```python
def push():
return state.get("last")
```
The Starlark code should also contain a function called `reset` that doesn't take any argument.
The function will be called to reset the plugin, and doesn't return anything.
```python
def reset():
state.clear()
```
For a list of available types and functions that can be used in the code, see
the [Starlark specification][].
## Python Differences
Refer to the section [Python Differences](/plugins/processors/starlark/README.md#python-differences) of the documentation about the Starlark processor.
## Libraries available
Refer to the section [Libraries available](/plugins/processors/starlark/README.md#libraries-available) of the documentation about the Starlark processor.
## Common Questions
Refer to the section [Common Questions](/plugins/processors/starlark/README.md#common-questions) of the documentation about the Starlark processor.
## Examples
- [minmax](/plugins/aggregators/starlark/testdata/min_max.star) - A minmax aggregator implemented with a Starlark script.
- [merge](/plugins/aggregators/starlark/testdata/merge.star) - A merge aggregator implemented with a Starlark script.
[All examples](/plugins/aggregators/starlark/testdata) are in the testdata folder.
Open a Pull Request to add any other useful Starlark examples.
[Starlark specification]: https://github.com/google/starlark-go/blob/master/doc/spec.md
[dict]: https://github.com/google/starlark-go/blob/master/doc/spec.md#dictionaries

View File

@ -0,0 +1,144 @@
package starlark //nolint - Needed to avoid getting import-shadowing: The name 'starlark' shadows an import name (revive)
import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
common "github.com/influxdata/telegraf/plugins/common/starlark"
"go.starlark.net/starlark"
)
const (
// description is the one-line plugin summary returned by Description.
description = "Aggregate metrics using a Starlark script"
// sampleConfig is the example TOML configuration returned by SampleConfig.
// Its script keeps only the last metric seen and pushes it.
sampleConfig = `
## The Starlark source can be set as a string in this configuration file, or
## by referencing a file containing the script. Only one source or script
## should be set at once.
##
## Source of the Starlark script.
source = '''
state = {}
def add(metric):
state["last"] = metric
def push():
return state.get("last")
def reset():
state.clear()
'''
## File containing a Starlark script.
# script = "/usr/local/bin/myscript.star"
## The constants of the Starlark script.
# [aggregators.starlark.constants]
# max_size = 10
# threshold = 0.75
# default_name = "Julia"
# debug_mode = true
`
)
// Starlark is an aggregator plugin whose add, push and reset steps are
// implemented by the functions of a user-supplied Starlark script.
type Starlark struct {
// StarlarkCommon provides the script configuration and the helpers used to
// register and call the script functions.
common.StarlarkCommon
}
// Init loads the script and registers the three functions it must define:
// add (one metric parameter), push and reset (no parameters). The first
// failure is returned as is.
func (s *Starlark) Init() error {
	// Execute source
	if err := s.StarlarkCommon.Init(); err != nil {
		return err
	}

	// The source should define an add function taking the metric to add.
	if err := s.AddFunction("add", &common.Metric{}); err != nil {
		return err
	}

	// The source should define a push function.
	if err := s.AddFunction("push"); err != nil {
		return err
	}

	// The source should define a reset function.
	return s.AddFunction("reset")
}
// SampleConfig returns the example configuration of the plugin.
func (s *Starlark) SampleConfig() string {
return sampleConfig
}
// Description returns the one-line summary of the plugin.
func (s *Starlark) Description() string {
return description
}
// Add hands metric to the script's add function. The metric is wrapped into
// the argument registered for "add" during Init; errors are logged, not
// returned.
func (s *Starlark) Add(metric telegraf.Metric) {
	args, ok := s.GetParameters("add")
	if !ok {
		s.Log.Errorf("The parameters of the add function could not be found")
		return
	}

	// The single registered argument is the metric wrapper seen by the script.
	wrapper := args[0].(*common.Metric)
	wrapper.Wrap(metric)

	if _, err := s.Call("add"); err != nil {
		s.LogError(err)
	}
}
// Push calls the script's push function and forwards what it returns to acc.
// The script may return None (nothing is pushed), a single metric, or a list
// of metrics; any other value, or list element, is logged as an error.
func (s *Starlark) Push(acc telegraf.Accumulator) {
	result, err := s.Call("push")
	if err != nil {
		s.LogError(err)
		acc.AddError(err)
		return
	}

	switch value := result.(type) {
	case starlark.NoneType:
		// Nothing to push.
	case *common.Metric:
		acc.AddMetric(value.Unwrap())
	case *starlark.List:
		it := value.Iterate()
		defer it.Done()

		var elem starlark.Value
		for it.Next(&elem) {
			wrapped, ok := elem.(*common.Metric)
			if !ok {
				s.Log.Errorf("Invalid type returned in list: %s", elem.Type())
				continue
			}
			acc.AddMetric(wrapped.Unwrap())
		}
	default:
		s.Log.Errorf("Invalid type returned: %T", value)
	}
}
// Reset calls the script's reset function; a failure is only logged.
func (s *Starlark) Reset() {
	if _, err := s.Call("reset"); err != nil {
		s.LogError(err)
	}
}
// init initializes starlark aggregator plugin
func init() {
aggregators.Add("starlark", func() telegraf.Aggregator {
return &Starlark{
StarlarkCommon: common.StarlarkCommon{
StarlarkLoadFunc: common.LoadFunc,
},
}
})
}

View File

@ -0,0 +1,432 @@
package starlark
import (
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
common "github.com/influxdata/telegraf/plugins/common/starlark"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
// m1 is the first sample metric of the min/max tests: measurement "m1",
// tag foo=bar and the integer fields a to j.
var m1 = metric.New("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int64(1),
"b": int64(1),
"c": int64(1),
"d": int64(1),
"e": int64(1),
"f": int64(2),
"g": int64(2),
"h": int64(2),
"i": int64(2),
"j": int64(3),
},
time.Now(),
)
// m2 is the second sample metric of the min/max tests. It has the same name
// and tags as m1, different values for b to j, the extra integer fields k and
// l, and a string and a boolean field that are absent from the expected
// min/max output.
var m2 = metric.New("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int64(1),
"b": int64(3),
"c": int64(3),
"d": int64(3),
"e": int64(3),
"f": int64(1),
"g": int64(1),
"h": int64(1),
"i": int64(1),
"j": int64(1),
"k": int64(200),
"l": int64(200),
"ignoreme": "string",
"andme": true,
},
time.Now(),
)
// BenchmarkApply measures the cost of adding the two sample metrics to the
// Starlark min/max aggregator.
func BenchmarkApply(b *testing.B) {
	minmax, err := newMinMax()
	// Fail fast: on error newMinMax returns a nil plugin and Add would panic.
	require.NoError(b, err)

	// Keep the script loading out of the measured region.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		minmax.Add(m1)
		minmax.Add(m2)
	}
}
// Test two metrics getting added.
// Both metrics share name and tags, so a single aggregate is expected with a
// <field>_min and <field>_max entry for every integer field of m1 and m2.
func TestMinMaxWithPeriod(t *testing.T) {
acc := testutil.Accumulator{}
minmax, err := newMinMax()
require.NoError(t, err)
minmax.Add(m1)
minmax.Add(m2)
minmax.Push(&acc)
// The string and boolean fields of m2 must not appear in the output.
expectedFields := map[string]interface{}{
"a_max": int64(1),
"a_min": int64(1),
"b_max": int64(3),
"b_min": int64(1),
"c_max": int64(3),
"c_min": int64(1),
"d_max": int64(3),
"d_min": int64(1),
"e_max": int64(3),
"e_min": int64(1),
"f_max": int64(2),
"f_min": int64(1),
"g_max": int64(2),
"g_min": int64(1),
"h_max": int64(2),
"h_min": int64(1),
"i_max": int64(2),
"i_min": int64(1),
"j_max": int64(3),
"j_min": int64(1),
"k_max": int64(200),
"k_min": int64(200),
"l_max": int64(200),
"l_min": int64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// Test two metrics getting added with a push/reset in between (simulates
// getting added in different periods.)
// Each period holds one metric, so min and max are equal for every field.
func TestMinMaxDifferentPeriods(t *testing.T) {
acc := testutil.Accumulator{}
minmax, err := newMinMax()
require.NoError(t, err)
// First period: only m1.
minmax.Add(m1)
minmax.Push(&acc)
expectedFields := map[string]interface{}{
"a_max": int64(1),
"a_min": int64(1),
"b_max": int64(1),
"b_min": int64(1),
"c_max": int64(1),
"c_min": int64(1),
"d_max": int64(1),
"d_min": int64(1),
"e_max": int64(1),
"e_min": int64(1),
"f_max": int64(2),
"f_min": int64(2),
"g_max": int64(2),
"g_min": int64(2),
"h_max": int64(2),
"h_min": int64(2),
"i_max": int64(2),
"i_min": int64(2),
"j_max": int64(3),
"j_min": int64(3),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
// Second period: clear both the accumulator and the aggregator, then add only m2.
acc.ClearMetrics()
minmax.Reset()
minmax.Add(m2)
minmax.Push(&acc)
expectedFields = map[string]interface{}{
"a_max": int64(1),
"a_min": int64(1),
"b_max": int64(3),
"b_min": int64(3),
"c_max": int64(3),
"c_min": int64(3),
"d_max": int64(3),
"d_min": int64(3),
"e_max": int64(3),
"e_min": int64(3),
"f_max": int64(1),
"f_min": int64(1),
"g_max": int64(1),
"g_min": int64(1),
"h_max": int64(1),
"h_min": int64(1),
"i_max": int64(1),
"i_min": int64(1),
"j_max": int64(1),
"j_min": int64(1),
"k_max": int64(200),
"k_min": int64(200),
"l_max": int64(200),
"l_min": int64(200),
}
expectedTags = map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}
// newMinMax builds a Starlark aggregator from the min/max example script.
func newMinMax() (*Starlark, error) {
return newStarlarkFromScript("testdata/min_max.star")
}
// TestSimple checks that two metrics with the same name, tags and timestamp
// are merged into a single metric carrying the fields of both.
func TestSimple(t *testing.T) {
	plugin, err := newMerge()
	require.NoError(t, err)

	// Add returns nothing, so there is no error to check after each call.
	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_idle": 42,
			},
			time.Unix(0, 0),
		),
	)
	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_guest": 42,
			},
			time.Unix(0, 0),
		),
	)

	var acc testutil.Accumulator
	plugin.Push(&acc)

	expected := []telegraf.Metric{
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_idle":  42,
				"time_guest": 42,
			},
			time.Unix(0, 0),
		),
	}
	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
// TestNanosecondPrecision checks that metrics with a nanosecond timestamp are
// merged and keep that timestamp even though the accumulator precision is set
// to one second.
func TestNanosecondPrecision(t *testing.T) {
	plugin, err := newMerge()
	require.NoError(t, err)

	// Add returns nothing, so there is no error to check after each call.
	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_idle": 42,
			},
			time.Unix(0, 1),
		),
	)
	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_guest": 42,
			},
			time.Unix(0, 1),
		),
	)

	var acc testutil.Accumulator
	acc.SetPrecision(time.Second)
	plugin.Push(&acc)

	expected := []telegraf.Metric{
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_idle":  42,
				"time_guest": 42,
			},
			time.Unix(0, 1),
		),
	}
	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
// TestReset checks that Reset clears the merge state: metrics added before and
// after the reset are pushed as two separate metrics instead of being merged.
func TestReset(t *testing.T) {
	plugin, err := newMerge()
	require.NoError(t, err)

	// Add returns nothing, so there is no error to check after each call.
	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_idle": 42,
			},
			time.Unix(0, 0),
		),
	)

	var acc testutil.Accumulator
	plugin.Push(&acc)
	plugin.Reset()

	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_guest": 42,
			},
			time.Unix(0, 0),
		),
	)
	plugin.Push(&acc)

	expected := []telegraf.Metric{
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_idle": 42,
			},
			time.Unix(0, 0),
		),
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_guest": 42,
			},
			time.Unix(0, 0),
		),
	}
	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}
// newMerge builds a Starlark aggregator from the merge example script.
func newMerge() (*Starlark, error) {
return newStarlarkFromScript("testdata/merge.star")
}
// TestLastFromSource runs an inline script that keeps only the last metric it
// was given and checks that this metric is the one pushed.
func TestLastFromSource(t *testing.T) {
	acc := testutil.Accumulator{}
	plugin, err := newStarlarkFromSource(`
state = {}
def add(metric):
  state["last"] = metric
def push():
  return state.get("last")
def reset():
  state.clear()
`)
	require.NoError(t, err)

	// Add returns nothing, so there is no error to check after each call.
	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu0",
			},
			map[string]interface{}{
				"time_idle": 42,
			},
			time.Unix(0, 0),
		),
	)
	plugin.Add(
		testutil.MustMetric(
			"cpu",
			map[string]string{
				"cpu": "cpu2",
			},
			map[string]interface{}{
				"time_idle": 31,
			},
			time.Unix(0, 0),
		),
	)
	plugin.Push(&acc)

	expectedFields := map[string]interface{}{
		"time_idle": int64(31),
	}
	expectedTags := map[string]string{
		"cpu": "cpu2",
	}
	acc.AssertContainsTaggedFields(t, "cpu", expectedFields, expectedTags)
	plugin.Reset()
}
// newStarlarkFromSource returns an initialized aggregator running the given
// inline Starlark source, or the error reported by Init.
func newStarlarkFromSource(source string) (*Starlark, error) {
	cfg := common.StarlarkCommon{
		StarlarkLoadFunc: common.LoadFunc,
		Log:              testutil.Logger{},
		Source:           source,
	}
	plugin := &Starlark{StarlarkCommon: cfg}
	if err := plugin.Init(); err != nil {
		return nil, err
	}
	return plugin, nil
}
// newStarlarkFromScript returns an initialized aggregator running the Starlark
// script stored at the given path, or the error reported by Init.
func newStarlarkFromScript(script string) (*Starlark, error) {
	cfg := common.StarlarkCommon{
		StarlarkLoadFunc: common.LoadFunc,
		Log:              testutil.Logger{},
		Script:           script,
	}
	plugin := &Starlark{StarlarkCommon: cfg}
	if err := plugin.Init(); err != nil {
		return nil, err
	}
	return plugin, nil
}

View File

@ -0,0 +1,31 @@
# Example of a merge aggregator implemented with a starlark script.
load('time.star', 'time')
# Aggregation state filled by add: "metrics" maps a group id to the merged
# metric of that group, "ordered" lists the merged metrics in first-seen order.
state = {}
def add(metric):
    # Merges the fields of metric into the metric already stored for its group,
    # or stores a copy of metric when the group is seen for the first time.
    if state.get("metrics") == None:
        state["metrics"] = {}
        state["ordered"] = []
    by_group = state["metrics"]
    group = groupID(metric)
    merged = by_group.get(group)
    if merged != None:
        # Known group: fields of the new metric overwrite or extend the stored ones.
        for name, value in metric.fields.items():
            merged.fields[name] = value
        return
    merged = deepcopy(metric)
    by_group[group] = merged
    state["ordered"].append(merged)
def push():
    # Returns the merged metrics in first-seen order, or None when nothing has
    # been added since the last reset.
    ordered = state.get("ordered")
    return ordered
# Drops every merged metric so that the next period starts from an empty state.
def reset():
state.clear()
def groupID(metric):
    # Hashes the metric name, every tag key/value pair and the timestamp, so
    # only metrics that agree on all three fall into the same group.
    parts = [metric.name + "-"]
    for k, v in metric.tags.items():
        parts.append(k + "-" + v + "-")
    parts.append("-" + str(metric.time))
    return hash("".join(parts))

View File

@ -0,0 +1,53 @@
# Example of a min_max aggregator implemented with a starlark script.
# Only fields whose Starlark type name is listed here are aggregated.
supported_types = (["int", "float"])
# Maps a group id (see groupID) to the running aggregate of that group.
state = {}
def add(metric):
    # Updates the running min/max of every supported field of metric within
    # the aggregate of its group, creating the aggregate on first sight.
    gId = groupID(metric)
    aggregate = state.get(gId)
    if aggregate == None:
        # First metric of the group: each supported field starts with min == max.
        aggregate = {
            "name": metric.name,
            "tags": metric.tags,
            "fields": {},
        }
        tracked = aggregate["fields"]
        for k, v in metric.fields.items():
            if type(v) in supported_types:
                tracked[k] = {"min": v, "max": v}
        state[gId] = aggregate
        return

    tracked = aggregate["fields"]
    for k, v in metric.fields.items():
        if type(v) not in supported_types:
            continue
        min_max = tracked.get(k)
        if min_max == None:
            # Field not seen before in this group.
            tracked[k] = {"min": v, "max": v}
        elif v < min_max["min"]:
            min_max["min"] = v
        elif v > min_max["max"]:
            min_max["max"] = v
def push():
    # Builds one metric per group with a <field>_min and a <field>_max entry
    # for every tracked field, and returns them as a list.
    metrics = []
    for gId in state:
        aggregate = state[gId]
        fields = {}
        for k in aggregate["fields"]:
            min_max = aggregate["fields"][k]
            fields[k + "_min"] = min_max["min"]
            fields[k + "_max"] = min_max["max"]
        metrics.append(Metric(aggregate["name"], aggregate["tags"], fields))
    return metrics
# Drops every aggregate so that the next period starts from an empty state.
def reset():
state.clear()
def groupID(metric):
    # Hashes the metric name and every tag key/value pair; metrics with the
    # same name and tags share one aggregate.
    key = metric.name + "-"
    for k, v in metric.tags.items():
        # Terminate each pair with a separator so that adjacent pairs cannot
        # run together and make two different tag sets produce the same key.
        key = key + k + "-" + v + "-"
    return hash(key)

View File

@ -10,16 +10,42 @@ import (
)
func newMetric(_ *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var name starlark.String
if err := starlark.UnpackPositionalArgs("Metric", args, kwargs, 1, &name); err != nil {
var (
name starlark.String
tags, fields starlark.Value
)
if err := starlark.UnpackArgs("Metric", args, kwargs, "name", &name, "tags?", &tags, "fields?", &fields); err != nil {
return nil, err
}
m := metric.New(string(name), nil, nil, time.Now())
allFields, err := toFields(fields)
if err != nil {
return nil, err
}
allTags, err := toTags(tags)
if err != nil {
return nil, err
}
m := metric.New(string(name), allTags, allFields, time.Now())
return &Metric{metric: m}, nil
}
// toString returns the Go string held by value when it is a Starlark string.
// Otherwise it returns an error built from errorMsg, a format string that
// receives value as its only argument.
func toString(value starlark.Value, errorMsg string) (string, error) {
	str, ok := value.(starlark.String)
	if !ok {
		return "", fmt.Errorf(errorMsg, value)
	}
	return string(str), nil
}
// items returns the key/value pairs of value when it is an iterable mapping.
// Otherwise it returns an error built from errorMsg, a format string that
// receives value as its only argument.
func items(value starlark.Value, errorMsg string) ([]starlark.Tuple, error) {
	mapping, ok := value.(starlark.IterableMapping)
	if !ok {
		return nil, fmt.Errorf(errorMsg, value)
	}
	return mapping.Items(), nil
}
func deepcopy(_ *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var sm *Metric
if err := starlark.UnpackPositionalArgs("deepcopy", args, kwargs, 1, &sm); err != nil {

View File

@ -274,3 +274,27 @@ func asGoValue(value interface{}) (interface{}, error) {
return nil, errors.New("invalid starlark type")
}
// toFields converts a starlark.Value to a map of field values. A nil value
// yields a nil map. The value must be an iterable mapping with string keys,
// and each entry value must be convertible by asGoValue.
func toFields(value starlark.Value) (map[string]interface{}, error) {
	if value == nil {
		return nil, nil
	}

	entries, err := items(value, "The type %T is unsupported as type of collection of fields")
	if err != nil {
		return nil, err
	}

	fields := make(map[string]interface{}, len(entries))
	for _, entry := range entries {
		name, err := toString(entry[0], "The type %T is unsupported as type of key for fields")
		if err != nil {
			return nil, err
		}
		fieldValue, err := asGoValue(entry[1])
		if err != nil {
			return nil, err
		}
		fields[name] = fieldValue
	}
	return fields, nil
}

View File

@ -0,0 +1,182 @@
package starlark //nolint - Needed to avoid getting import-shadowing: The name 'starlark' shadows an import name (revive)
import (
"errors"
"fmt"
"strings"
"github.com/influxdata/telegraf"
"go.starlark.net/lib/math"
"go.starlark.net/lib/time"
"go.starlark.net/resolve"
"go.starlark.net/starlark"
"go.starlark.net/starlarkjson"
)
// StarlarkCommon holds the configuration and the runtime state needed to load
// a Starlark script and to call the functions it defines.
type StarlarkCommon struct {
// Source is the inline Starlark source; Init requires exactly one of Source and Script.
Source string `toml:"source"`
// Script is the path of a file containing the Starlark source.
Script string `toml:"script"`
// Constants are made available to the script as predeclared names (see addConstants).
Constants map[string]interface{} `toml:"constants"`
// Log receives the script's print output (at debug level) and the reported errors.
Log telegraf.Logger `toml:"-"`
// StarlarkLoadFunc resolves the modules requested by the script's load statements.
StarlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error)
// thread is the Starlark thread used for every call.
thread *starlark.Thread
// globals are the script globals produced by Init.
globals starlark.StringDict
// functions maps a registered name to the script function (see AddFunction).
functions map[string]*starlark.Function
// parameters maps a registered name to the arguments passed on each Call.
parameters map[string]starlark.Tuple
}
// Init validates the configuration, compiles and executes the script, and
// prepares the empty function and parameter registries filled by AddFunction.
// It returns an error when neither or both of Source and Script are set, when
// a constant cannot be converted, or when the script fails to compile or run.
func (s *StarlarkCommon) Init() error {
if s.Source == "" && s.Script == "" {
return errors.New("one of source or script must be set")
}
if s.Source != "" && s.Script != "" {
return errors.New("both source or script cannot be set")
}
// print() output goes to the debug log; load() is delegated to StarlarkLoadFunc.
s.thread = &starlark.Thread{
Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) },
Load: func(thread *starlark.Thread, module string) (starlark.StringDict, error) {
return s.StarlarkLoadFunc(module, s.Log)
},
}
// Names predeclared for the script: the builtins below plus the user constants.
builtins := starlark.StringDict{}
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
builtins["catch"] = starlark.NewBuiltin("catch", catch)
err := s.addConstants(&builtins)
if err != nil {
return err
}
program, err := s.sourceProgram(builtins, "")
if err != nil {
return err
}
// Execute source
globals, err := program.Init(s.thread, builtins)
if err != nil {
return err
}
// Make available a shared state to the apply function
// NOTE(review): this replaces the "state" entry of the returned globals before
// they are frozen below; presumably this keeps a script-defined state dict
// mutable - confirm before reordering these statements.
globals["state"] = starlark.NewDict(0)
// Freeze the global state. This prevents modifications to the processor
// state and prevents scripts from containing errors storing tracking
// metrics. Tasks that require global state will not be possible due to
// this, so maybe we should relax this in the future.
globals.Freeze()
s.globals = globals
s.functions = make(map[string]*starlark.Function)
s.parameters = make(map[string]starlark.Tuple)
return nil
}
// GetParameters returns the arguments registered for the function called name
// and whether such a registration exists.
func (s *StarlarkCommon) GetParameters(name string) (starlark.Tuple, bool) {
	args, ok := s.parameters[name]
	return args, ok
}
// AddFunction registers the script function called name together with the
// arguments that Call will pass to it. It returns an error when name is not a
// global of the script, is not a function, or does not take exactly
// len(params) parameters.
func (s *StarlarkCommon) AddFunction(name string, params ...starlark.Value) error {
	globalFn, found := s.globals[name]
	if !found {
		return fmt.Errorf("%s is not defined", name)
	}
	fn, ok := globalFn.(*starlark.Function)
	if !ok {
		return fmt.Errorf("%s is not a function", name)
	}
	if fn.NumParams() != len(params) {
		return fmt.Errorf("%s function must take %d parameter(s)", name, len(params))
	}

	// Store a private copy so that a caller passing a slice with "..." cannot
	// change the registered arguments afterwards. The original built this copy
	// but then stored the caller's slice instead.
	args := make(starlark.Tuple, len(params))
	copy(args, params)

	s.functions[name] = fn
	s.parameters[name] = args
	return nil
}
// addConstants adds every configured constant to builtins so the script can
// refer to it by name. It stops at the first constant that cannot be
// converted to a Starlark value and returns an error naming its Go type.
func (s *StarlarkCommon) addConstants(builtins *starlark.StringDict) error {
	for name, raw := range s.Constants {
		converted, err := asStarlarkValue(raw)
		if err != nil {
			return fmt.Errorf("converting type %T failed: %v", raw, err)
		}
		(*builtins)[name] = converted
	}
	return nil
}
// sourceProgram compiles the configured script, treating the names present in
// builtins as predeclared. With an inline Source the text is compiled
// directly; otherwise a nil source is passed together with the Script path.
// The filename parameter is currently not used.
func (s *StarlarkCommon) sourceProgram(builtins starlark.StringDict, filename string) (*starlark.Program, error) {
	if s.Source == "" {
		_, program, err := starlark.SourceProgram(s.Script, nil, builtins.Has)
		return program, err
	}
	_, program, err := starlark.SourceProgram(s.Script, s.Source, builtins.Has)
	return program, err
}
// Call invokes the registered script function called name with the arguments
// stored for it by AddFunction and returns its result. An error is returned
// when the function or its arguments were never registered, or when the call
// itself fails.
func (s *StarlarkCommon) Call(name string) (starlark.Value, error) {
	fn, found := s.functions[name]
	if !found {
		return nil, fmt.Errorf("function %q does not exist", name)
	}
	args, found := s.parameters[name]
	if !found {
		return nil, fmt.Errorf("params for function %q do not exist", name)
	}
	return starlark.Call(s.thread, fn, args, nil)
}
// LogError writes err to the plugin logger. For a Starlark evaluation error
// the backtrace is logged line by line; any other error is logged as is.
func (s *StarlarkCommon) LogError(err error) {
	// Use a separate variable for the evaluation error: the original shadowed
	// err inside the if statement, so its else branch dereferenced a nil
	// *starlark.EvalError and panicked on every other kind of error.
	var evalErr *starlark.EvalError
	if errors.As(err, &evalErr) {
		for _, line := range strings.Split(evalErr.Backtrace(), "\n") {
			s.Log.Error(line)
		}
		return
	}
	s.Log.Error(err)
}
// LoadFunc resolves the modules a script may request with load():
// "json.star", "logging.star", "math.star" and "time.star". Any other module
// name yields an error. The logger is handed to the logging module.
func LoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) {
	var members starlark.StringDict
	switch module {
	case "json.star":
		members = starlark.StringDict{"json": starlarkjson.Module}
	case "logging.star":
		members = starlark.StringDict{"log": LogModule(logger)}
	case "math.star":
		members = starlark.StringDict{"math": math.Module}
	case "time.star":
		members = starlark.StringDict{"time": time.Module}
	default:
		return nil, errors.New("module " + module + " is not available")
	}
	return members, nil
}
// init enables the optional Starlark language features listed below through
// the package-level flags of the resolve package.
func init() {
// https://github.com/bazelbuild/starlark/issues/20
resolve.AllowNestedDef = true
resolve.AllowLambda = true
resolve.AllowFloat = true
resolve.AllowSet = true
resolve.AllowGlobalReassign = true
resolve.AllowRecursion = true
}

View File

@ -196,3 +196,27 @@ func (i *TagIterator) Next(p *starlark.Value) bool {
// Done ends the iteration by decrementing tagIterCount.
// NOTE(review): presumably the counter is incremented when the iterator is
// created, so that it tracks iterations in progress - not visible here.
func (i *TagIterator) Done() {
i.tagIterCount--
}
// toTags converts a starlark.Value to a map of tags. A nil value yields a nil
// map. The value must be an iterable mapping whose keys and values are all
// Starlark strings.
func toTags(value starlark.Value) (map[string]string, error) {
	if value == nil {
		return nil, nil
	}

	entries, err := items(value, "The type %T is unsupported as type of collection of tags")
	if err != nil {
		return nil, err
	}

	tags := make(map[string]string, len(entries))
	for _, entry := range entries {
		name, err := toString(entry[0], "The type %T is unsupported as type of key for tags")
		if err != nil {
			return nil, err
		}
		tagValue, err := toString(entry[1], "The type %T is unsupported as type of value for tags")
		if err != nil {
			return nil, err
		}
		tags[name] = tagValue
	}
	return tags, nil
}

View File

@ -1,17 +1,12 @@
package starlark
import (
"errors"
"fmt"
"strings"
"github.com/influxdata/telegraf"
common "github.com/influxdata/telegraf/plugins/common/starlark"
"github.com/influxdata/telegraf/plugins/processors"
"go.starlark.net/lib/math"
"go.starlark.net/lib/time"
"go.starlark.net/resolve"
"go.starlark.net/starlark"
"go.starlark.net/starlarkjson"
)
const (
@ -40,97 +35,29 @@ def apply(metric):
)
type Starlark struct {
Source string `toml:"source"`
Script string `toml:"script"`
Constants map[string]interface{} `toml:"constants"`
common.StarlarkCommon
Log telegraf.Logger `toml:"-"`
thread *starlark.Thread
applyFunc *starlark.Function
args starlark.Tuple
results []telegraf.Metric
starlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error)
results []telegraf.Metric
}
func (s *Starlark) Init() error {
if s.Source == "" && s.Script == "" {
return errors.New("one of source or script must be set")
}
if s.Source != "" && s.Script != "" {
return errors.New("both source or script cannot be set")
}
s.thread = &starlark.Thread{
Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) },
Load: func(thread *starlark.Thread, module string) (starlark.StringDict, error) {
return s.starlarkLoadFunc(module, s.Log)
},
}
builtins := starlark.StringDict{}
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
builtins["catch"] = starlark.NewBuiltin("catch", catch)
s.addConstants(&builtins)
program, err := s.sourceProgram(builtins)
err := s.StarlarkCommon.Init()
if err != nil {
return err
}
// Execute source
globals, err := program.Init(s.thread, builtins)
if err != nil {
return err
}
// Make available a shared state to the apply function
globals["state"] = starlark.NewDict(0)
// Freeze the global state. This prevents modifications to the processor
// state and prevents scripts from containing errors storing tracking
// metrics. Tasks that require global state will not be possible due to
// this, so maybe we should relax this in the future.
globals.Freeze()
// The source should define an apply function.
apply := globals["apply"]
if apply == nil {
return errors.New("apply is not defined")
err = s.AddFunction("apply", &common.Metric{})
if err != nil {
return err
}
var ok bool
if s.applyFunc, ok = apply.(*starlark.Function); !ok {
return errors.New("apply is not a function")
}
if s.applyFunc.NumParams() != 1 {
return errors.New("apply function must take one parameter")
}
// Reusing the same metric wrapper to skip an allocation. This will cause
// any saved references to point to the new metric, but due to freezing the
// globals none should exist.
s.args = make(starlark.Tuple, 1)
s.args[0] = &Metric{}
// Preallocate a slice for return values.
s.results = make([]telegraf.Metric, 0, 10)
return nil
}
func (s *Starlark) sourceProgram(builtins starlark.StringDict) (*starlark.Program, error) {
if s.Source != "" {
_, program, err := starlark.SourceProgram("processor.starlark", s.Source, builtins.Has)
return program, err
}
_, program, err := starlark.SourceProgram(s.Script, nil, builtins.Has)
return program, err
}
func (s *Starlark) SampleConfig() string {
return sampleConfig
}
@ -144,15 +71,15 @@ func (s *Starlark) Start(_ telegraf.Accumulator) error {
}
func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
s.args[0].(*Metric).Wrap(metric)
parameters, found := s.GetParameters("apply")
if !found {
return fmt.Errorf("The parameters of the apply function could not be found")
}
parameters[0].(*common.Metric).Wrap(metric)
rv, err := starlark.Call(s.thread, s.applyFunc, s.args, nil)
rv, err := s.Call("apply")
if err != nil {
if err, ok := err.(*starlark.EvalError); ok {
for _, line := range strings.Split(err.Backtrace(), "\n") {
s.Log.Error(line)
}
}
s.LogError(err)
metric.Reject()
return err
}
@ -164,7 +91,7 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
var v starlark.Value
for iter.Next(&v) {
switch v := v.(type) {
case *Metric:
case *common.Metric:
m := v.Unwrap()
if containsMetric(s.results, m) {
s.Log.Errorf("Duplicate metric reference detected")
@ -188,7 +115,7 @@ func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
s.results[i] = nil
}
s.results = s.results[:0]
case *Metric:
case *common.Metric:
m := rv.Unwrap()
// If the script returned a different metric, mark this metric as
@ -209,17 +136,6 @@ func (s *Starlark) Stop() error {
return nil
}
// Add all the constants defined in the plugin as constants of the script
func (s *Starlark) addConstants(builtins *starlark.StringDict) {
for key, val := range s.Constants {
sVal, err := asStarlarkValue(val)
if err != nil {
s.Log.Errorf("Unsupported type: %T", val)
}
(*builtins)[key] = sVal
}
}
func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool {
for _, m := range metrics {
if m == metric {
@ -229,43 +145,12 @@ func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool {
return false
}
func init() {
// https://github.com/bazelbuild/starlark/issues/20
resolve.AllowNestedDef = true
resolve.AllowLambda = true
resolve.AllowFloat = true
resolve.AllowSet = true
resolve.AllowGlobalReassign = true
resolve.AllowRecursion = true
}
func init() {
processors.AddStreaming("starlark", func() telegraf.StreamingProcessor {
return &Starlark{
starlarkLoadFunc: loadFunc,
StarlarkCommon: common.StarlarkCommon{
StarlarkLoadFunc: common.LoadFunc,
},
}
})
}
func loadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) {
switch module {
case "json.star":
return starlark.StringDict{
"json": starlarkjson.Module,
}, nil
case "logging.star":
return starlark.StringDict{
"log": LogModule(logger),
}, nil
case "math.star":
return starlark.StringDict{
"math": math.Module,
}, nil
case "time.star":
return starlark.StringDict{
"time": time.Module,
}, nil
default:
return nil, errors.New("module " + module + " is not available")
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
common "github.com/influxdata/telegraf/plugins/common/starlark"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
@ -22,78 +23,63 @@ import (
// Tests for runtime errors in the processors Init function.
func TestInitError(t *testing.T) {
tests := []struct {
name string
plugin *Starlark
name string
constants map[string]interface{}
plugin *Starlark
}{
{
name: "source must define apply",
plugin: &Starlark{
Source: "",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "source must define apply",
plugin: newStarlarkFromSource(""),
},
{
name: "apply must be a function",
plugin: &Starlark{
Source: `
plugin: newStarlarkFromSource(`
apply = 42
`,
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
`),
},
{
name: "apply function must take one arg",
plugin: &Starlark{
Source: `
plugin: newStarlarkFromSource(`
def apply():
pass
`,
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
`),
},
{
name: "package scope must have valid syntax",
plugin: &Starlark{
Source: `
plugin: newStarlarkFromSource(`
for
`,
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
`),
},
{
name: "no source no script",
plugin: &Starlark{
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "no source no script",
plugin: newStarlarkNoScript(),
},
{
name: "source and script",
plugin: &Starlark{
Source: `
plugin: newStarlarkFromSource(`
def apply():
pass
`,
Script: "testdata/ratio.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
`),
},
{
name: "script file not found",
plugin: &Starlark{
Script: "testdata/file_not_found.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
name: "script file not found",
plugin: newStarlarkFromScript("testdata/file_not_found.star"),
},
{
name: "source and script",
plugin: newStarlarkFromSource(`
def apply(metric):
metric.fields["p1"] = unsupported_type
return metric
`),
constants: map[string]interface{}{
"unsupported_type": time.Now(),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.plugin.Constants = tt.constants
err := tt.plugin.Init()
require.Error(t, err)
})
@ -227,11 +213,7 @@ def apply(metric):
for _, tt := range applyTests {
t.Run(tt.name, func(t *testing.T) {
plugin := &Starlark{
Source: tt.source,
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
}
plugin := newStarlarkFromSource(tt.source)
err := plugin.Init()
require.NoError(t, err)
@ -2545,7 +2527,6 @@ def apply(metric):
2: "two",
"3": "three",
},
"unsupported_type": time.Now(),
},
input: []telegraf.Metric{
testutil.MustMetric("cpu",
@ -2575,12 +2556,8 @@ def apply(metric):
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := &Starlark{
Source: tt.source,
Log: testutil.Logger{},
Constants: tt.constants,
starlarkLoadFunc: testLoadFunc,
}
plugin := newStarlarkFromSource(tt.source)
plugin.Constants = tt.constants
err := plugin.Init()
require.NoError(t, err)
@ -2637,7 +2614,6 @@ def apply(metric):
debug_mode = true
supported_values = ["2", "3"]
supported_entries = { "2" = "two", "3" = "three" }
unsupported_type = 2009-06-12
`,
input: []telegraf.Metric{
testutil.MustMetric("cpu",
@ -2717,12 +2693,8 @@ func TestScript(t *testing.T) {
expectedErrorStr string
}{
{
name: "rename",
plugin: &Starlark{
Script: "testdata/rename.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "rename",
plugin: newStarlarkFromScript("testdata/rename.star"),
input: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
@ -2745,12 +2717,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "drop fields by type",
plugin: &Starlark{
Script: "testdata/drop_string_fields.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "drop fields by type",
plugin: newStarlarkFromScript("testdata/drop_string_fields.star"),
input: []telegraf.Metric{
testutil.MustMetric("device",
map[string]string{},
@ -2777,12 +2745,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "drop fields with unexpected type",
plugin: &Starlark{
Script: "testdata/drop_fields_with_unexpected_type.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "drop fields with unexpected type",
plugin: newStarlarkFromScript("testdata/drop_fields_with_unexpected_type.star"),
input: []telegraf.Metric{
testutil.MustMetric("device",
map[string]string{},
@ -2812,12 +2776,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "scale",
plugin: &Starlark{
Script: "testdata/scale.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "scale",
plugin: newStarlarkFromScript("testdata/scale.star"),
input: []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{},
@ -2834,12 +2794,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "ratio",
plugin: &Starlark{
Script: "testdata/ratio.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "ratio",
plugin: newStarlarkFromScript("testdata/ratio.star"),
input: []telegraf.Metric{
testutil.MustMetric("mem",
map[string]string{},
@ -2863,12 +2819,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "logging",
plugin: &Starlark{
Script: "testdata/logging.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "logging",
plugin: newStarlarkFromScript("testdata/logging.star"),
input: []telegraf.Metric{
testutil.MustMetric("log",
map[string]string{},
@ -2889,12 +2841,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "multiple_metrics",
plugin: &Starlark{
Script: "testdata/multiple_metrics.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "multiple_metrics",
plugin: newStarlarkFromScript("testdata/multiple_metrics.star"),
input: []telegraf.Metric{
testutil.MustMetric("mm",
map[string]string{},
@ -2922,12 +2870,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "multiple_metrics_with_json",
plugin: &Starlark{
Script: "testdata/multiple_metrics_with_json.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "multiple_metrics_with_json",
plugin: newStarlarkFromScript("testdata/multiple_metrics_with_json.star"),
input: []telegraf.Metric{
testutil.MustMetric("json",
map[string]string{},
@ -2955,12 +2899,8 @@ func TestScript(t *testing.T) {
},
},
{
name: "fail",
plugin: &Starlark{
Script: "testdata/fail.star",
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
},
name: "fail",
plugin: newStarlarkFromScript("testdata/fail.star"),
input: []telegraf.Metric{
testutil.MustMetric("fail",
map[string]string{},
@ -3246,11 +3186,7 @@ def apply(metric):
for _, tt := range tests {
b.Run(tt.name, func(b *testing.B) {
plugin := &Starlark{
Source: tt.source,
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
}
plugin := newStarlarkFromSource(tt.source)
err := plugin.Init()
require.NoError(b, err)
@ -3292,11 +3228,7 @@ func TestAllScriptTestData(t *testing.T) {
if expectedErrorStr == "" {
outputMetrics = parseMetricsFrom(t, lines, "Example Output:")
}
plugin := &Starlark{
Script: fn,
Log: testutil.Logger{},
starlarkLoadFunc: testLoadFunc,
}
plugin := newStarlarkFromScript(fn)
require.NoError(t, plugin.Init())
acc := &testutil.Accumulator{}
@ -3370,7 +3302,7 @@ func parseErrorMessage(t *testing.T, lines []string, header string) string {
}
func testLoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) {
result, err := loadFunc(module, logger)
result, err := common.LoadFunc(module, logger)
if err != nil {
return nil, err
}
@ -3387,3 +3319,32 @@ func testLoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, e
// testNow has the shape of a Starlark builtin but ignores every argument and
// always returns the same instant: 2021-04-15 12:00:00.000000999 UTC.
// NOTE(review): presumably installed by testLoadFunc so that time-dependent
// scripts give deterministic results in tests — confirm against its body.
func testNow(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
	fixed := time.Date(2021, 4, 15, 12, 0, 0, 999, time.UTC)
	return starlarktime.Time(fixed), nil
}
// newStarlarkFromSource builds a Starlark plugin for tests from an inline
// script. The returned plugin uses testLoadFunc as its load function and a
// testutil.Logger as its logger; Script is left unset.
func newStarlarkFromSource(source string) *Starlark {
	settings := common.StarlarkCommon{
		StarlarkLoadFunc: testLoadFunc,
		Log:              testutil.Logger{},
	}
	settings.Source = source

	return &Starlark{StarlarkCommon: settings}
}
// newStarlarkFromScript builds a Starlark plugin for tests that reads its
// program from the given script path. The returned plugin uses testLoadFunc as
// its load function and a testutil.Logger as its logger; Source is left unset.
func newStarlarkFromScript(script string) *Starlark {
	settings := common.StarlarkCommon{
		StarlarkLoadFunc: testLoadFunc,
		Log:              testutil.Logger{},
	}
	settings.Script = script

	return &Starlark{StarlarkCommon: settings}
}
// newStarlarkNoScript builds a Starlark plugin for tests with neither Source
// nor Script set; only the test load function and logger are configured.
func newStarlarkNoScript() *Starlark {
	settings := common.StarlarkCommon{
		StarlarkLoadFunc: testLoadFunc,
		Log:              testutil.Logger{},
	}

	return &Starlark{StarlarkCommon: settings}
}