diff --git a/agent/agent.go b/agent/agent.go index 72e906a59..a48ed590a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -508,7 +508,9 @@ func (a *Agent) runProcessors( acc := NewAccumulator(unit.processor, unit.dst) for m := range unit.src { - unit.processor.Add(m, acc) + if err := unit.processor.Add(m, acc); err != nil { + acc.AddError(err) + } } unit.processor.Stop() close(unit.dst) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index a45b194b1..e3ec5c824 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -55,6 +55,7 @@ following works: - github.com/go-ole/go-ole [MIT License](https://github.com/go-ole/go-ole/blob/master/LICENSE) - github.com/go-redis/redis [BSD 2-Clause "Simplified" License](https://github.com/go-redis/redis/blob/master/LICENSE) - github.com/go-sql-driver/mysql [Mozilla Public License 2.0](https://github.com/go-sql-driver/mysql/blob/master/LICENSE) +- go.starlark.net [BSD 3-Clause "New" or "Revised" License](https://github.com/google/starlark-go/blob/master/LICENSE) - github.com/goburrow/modbus [BSD 3-Clause "New" or "Revised" License](https://github.com/goburrow/modbus/blob/master/LICENSE) - github.com/goburrow/serial [MIT License](https://github.com/goburrow/serial/LICENSE) - github.com/gobwas/glob [MIT License](https://github.com/gobwas/glob/blob/master/LICENSE) diff --git a/go.mod b/go.mod index ed4f292f4..e9c8d73c8 100644 --- a/go.mod +++ b/go.mod @@ -126,6 +126,7 @@ require ( github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect + go.starlark.net v0.0.0-20191227232015-caa3e9aa5008 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect golang.org/x/net v0.0.0-20200301022130-244492dfa37a diff --git a/go.sum b/go.sum index af6ee7e77..f5542ade3 100644 --- a/go.sum +++ b/go.sum @@ -581,6 +581,8 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.starlark.net v0.0.0-20191227232015-caa3e9aa5008 h1:PUpdYMZifLwPlUnFfT/2Hkqr7p0SSpOR7xrDiPaD52k= +go.starlark.net v0.0.0-20191227232015-caa3e9aa5008/go.mod h1:nmDLcffg48OtT/PSW0Hg7FvpRQsQh5OSqIylirxKC7o= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -701,6 +703,8 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191002063906-3421d5a6bb1c h1:Vco5b+cuG5NNfORVxZy6bYZQ7rsigisU1WQFkvQ0L5E= +golang.org/x/sys v0.0.0-20191002063906-3421d5a6bb1c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191003212358-c178f38b412c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/models/running_processor.go b/models/running_processor.go index 86b1887a1..40e573e70 100644 --- a/models/running_processor.go +++ b/models/running_processor.go @@ -78,21 +78,21 @@ func (r *RunningProcessor) Start(acc telegraf.Accumulator) error { return r.Processor.Start(acc) } -func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) { +func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error { if ok := r.Config.Filter.Select(m); !ok { // pass downstream acc.AddMetric(m) - return + return nil } r.Config.Filter.Modify(m) if len(m.FieldList()) == 0 { // drop metric r.metricFiltered(m) - return + return nil } - r.Processor.Add(m, acc) + return r.Processor.Add(m, acc) } func (r *RunningProcessor) Stop() { diff --git a/plugins/processors/all/all.go b/plugins/processors/all/all.go index dbf8a12e5..6dc2e2b0d 100644 --- a/plugins/processors/all/all.go +++ b/plugins/processors/all/all.go @@ -16,6 +16,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/processors/regex" _ "github.com/influxdata/telegraf/plugins/processors/rename" _ "github.com/influxdata/telegraf/plugins/processors/s2geo" + _ "github.com/influxdata/telegraf/plugins/processors/starlark" _ "github.com/influxdata/telegraf/plugins/processors/strings" _ "github.com/influxdata/telegraf/plugins/processors/tag_limit" _ "github.com/influxdata/telegraf/plugins/processors/template" diff --git a/plugins/processors/starlark/README.md b/plugins/processors/starlark/README.md new file mode 100644 index 000000000..2b25711e4 --- /dev/null +++ b/plugins/processors/starlark/README.md @@ -0,0 +1,150 @@ +# Starlark Processor + +The `starlark` processor calls a Starlark function for each matched metric, +allowing for custom programmatic metric processing. + +The Starlark language is a dialect of Python, and will be familiar to those who +have experience with the Python language. However, keep in mind that it is not +Python and that there are major syntax [differences][#Python Differences]. +Existing Python code is unlikely to work unmodified. The execution environment +is sandboxed, and it is not possible to do I/O operations such as reading from +files or sockets. + +The Starlark [specification][] has details about the syntax and available +functions. + +### Configuration + +```toml +[[processors.starlark]] + ## The Starlark source can be set as a string in this configuration file, or + ## by referencing a file containing the script. Only one source or script + ## should be set at once. + ## + ## Source of the Starlark script. + source = ''' +def apply(metric): + return metric +''' + + ## File containing a Starlark script. + # script = "/usr/local/bin/myscript.star" +``` + +### Usage + +The script should contain a function called `apply` that takes the metric as +its single argument. The function will be called with each metric, and can +return `None`, a single metric, or a list of metrics. +```python +def apply(metric): + return metric +``` + +Reference the Starlark [specification][] to see the list of available types and +functions that can be used in the script. In addition to these the following +types and functions are exposed to the script. + +**Metric(*name*)**: +Create a new metric with the given measurement name. The metric will have no +tags or fields and defaults to the current time. + +- **name**: +The name is a [string][string] containing the metric measurement name. + +- **tags**: +A [dict-like][dict] object containing the metric's tags. + + +- **fields**: +A [dict-like][dict] object containing the metric's fields. The values may be +of type int, float, string, or bool. + +- **time**: +The timestamp of the metric as an integer in nanoseconds since the Unix +epoch. + +**deepcopy(*metric*)**: Make a copy of an existing metric. + +### Python Differences + +While Starlark is similar to Python it is not the same. + +- Starlark has limited support for error handling and no exceptions. If an + error occurs the script will immediately end and Telegraf will drop the + metric. Check the Telegraf logfile for details about the error. + +- It is not possible to import other packages and the Python standard library + is not available. As such, it is not possible to open files or sockets. + +- These common keywords are **not supported** in the Starlark grammar: + ``` + as finally nonlocal + assert from raise + class global try + del import with + except is yield + ``` + +### Common Questions + +**How can I drop/delete a metric?** + +If you don't return the metric it will be deleted. Usually this means the +function should `return None`. + +**How should I make a copy of a metric?** + +Use `deepcopy(metric)` to create a copy of the metric. + +**How can I return multiple metrics?** + +You can return a list of metrics: +```python +def apply(metric): + m2 = deepcopy(metric) + return [metric, m2] +``` + +**What happens to a tracking metric if an error occurs in the script?** + +The metric is marked as undelivered. + +**How do I create a new metric?** + +Use the `Metric(name)` function and set at least one field. + +**What is the fastest way to iterate over tags/fields?** + +The fastest way to iterate is to use a for-loop on the tags or fields attribute: +```python +def apply(metric): + for k in metric.tags: + pass + return metric +``` + +When you use this form, it is not possible to modify the tags inside the loop, +if this is needed you should use the `.keys()`, `.values()`, or `.items()` forms: +```python +def apply(metric): + for k, v in metric.tags.items(): + pass + return metric +``` + +**How can I save values across multiple calls to the script?** + +Telegraf freezes the global scope, which prevents it from being modified. +Attempting to modify the global scope will fail with an error. + + +### Examples + +- [ratio](/plugins/processors/starlark/testdata/ratio.star) +- [rename](/plugins/processors/starlark/testdata/rename.star) +- [scale](/plugins/processors/starlark/testdata/scale.star) + +[specification]: https://github.com/google/starlark-go/blob/master/doc/spec.md +[string]: https://github.com/google/starlark-go/blob/master/doc/spec.md#strings +[dict]: https://github.com/google/starlark-go/blob/master/doc/spec.md#dictionaries diff --git a/plugins/processors/starlark/builtins.go b/plugins/processors/starlark/builtins.go new file mode 100644 index 000000000..4eda39b7d --- /dev/null +++ b/plugins/processors/starlark/builtins.go @@ -0,0 +1,261 @@ +package starlark + +import ( + "fmt" + "sort" + "time" + + "github.com/influxdata/telegraf/metric" + "go.starlark.net/starlark" +) + +func newMetric(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var name starlark.String + if err := starlark.UnpackPositionalArgs("Metric", args, kwargs, 1, &name); err != nil { + return nil, err + } + + m, err := metric.New(string(name), nil, nil, time.Now()) + if err != nil { + return nil, err + } + + return &Metric{metric: m}, nil +} + +func deepcopy(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var sm *Metric + if err := starlark.UnpackPositionalArgs("deepcopy", args, kwargs, 1, &sm); err != nil { + return nil, err + } + + dup := sm.metric.Copy() + dup.Drop() + return &Metric{metric: dup}, nil +} + +type builtinMethod func(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) + +func builtinAttr(recv starlark.Value, name string, methods map[string]builtinMethod) (starlark.Value, error) { + method := methods[name] + if method == nil { + return starlark.None, fmt.Errorf("no such method '%s'", name) + } + + // Allocate a closure over 'method'. + impl := func(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + return method(b, args, kwargs) + } + return starlark.NewBuiltin(name, impl).BindReceiver(recv), nil +} + +func builtinAttrNames(methods map[string]builtinMethod) []string { + names := make([]string, 0, len(methods)) + for name := range methods { + names = append(names, name) + } + sort.Strings(names) + return names +} + +// nameErr returns an error message of the form "name: msg" +// where name is b.Name() and msg is a string or error. +func nameErr(b *starlark.Builtin, msg interface{}) error { + return fmt.Errorf("%s: %v", b.Name(), msg) +} + +// --- dictionary methods --- + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·clear +func dict_clear(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + + type HasClear interface { + Clear() error + } + return starlark.None, b.Receiver().(HasClear).Clear() +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·pop +func dict_pop(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var k, d starlark.Value + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 1, &k, &d); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + + type HasDelete interface { + Delete(k starlark.Value) (starlark.Value, bool, error) + } + if v, found, err := b.Receiver().(HasDelete).Delete(k); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) // dict is frozen or key is unhashable + } else if found { + return v, nil + } else if d != nil { + return d, nil + } + return starlark.None, fmt.Errorf("%s: missing key", b.Name()) +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·popitem +func dict_popitem(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + + type HasPopItem interface { + PopItem() (starlark.Value, error) + } + return b.Receiver().(HasPopItem).PopItem() +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·get +func dict_get(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var key, dflt starlark.Value + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 1, &key, &dflt); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + if v, ok, err := b.Receiver().(starlark.Mapping).Get(key); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } else if ok { + return v, nil + } else if dflt != nil { + return dflt, nil + } + return starlark.None, nil +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·setdefault +func dict_setdefault(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + var key, dflt starlark.Value = nil, starlark.None + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 1, &key, &dflt); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + + recv := b.Receiver().(starlark.HasSetKey) + v, found, err := recv.Get(key) + if err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + if !found { + v = dflt + if err := recv.SetKey(key, dflt); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + } + return v, nil +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·update +func dict_update(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + // Unpack the arguments + if len(args) > 1 { + return nil, fmt.Errorf("update: got %d arguments, want at most 1", len(args)) + } + + // Get the target + dict := b.Receiver().(starlark.HasSetKey) + + if len(args) == 1 { + switch updates := args[0].(type) { + case starlark.IterableMapping: + // Iterate over dict's key/value pairs, not just keys. + for _, item := range updates.Items() { + if err := dict.SetKey(item[0], item[1]); err != nil { + return nil, err // dict is frozen + } + } + default: + // all other sequences + iter := starlark.Iterate(updates) + if iter == nil { + return nil, fmt.Errorf("got %s, want iterable", updates.Type()) + } + defer iter.Done() + var pair starlark.Value + for i := 0; iter.Next(&pair); i++ { + iter2 := starlark.Iterate(pair) + if iter2 == nil { + return nil, fmt.Errorf("dictionary update sequence element #%d is not iterable (%s)", i, pair.Type()) + + } + defer iter2.Done() + len := starlark.Len(pair) + if len < 0 { + return nil, fmt.Errorf("dictionary update sequence element #%d has unknown length (%s)", i, pair.Type()) + } else if len != 2 { + return nil, fmt.Errorf("dictionary update sequence element #%d has length %d, want 2", i, len) + } + var k, v starlark.Value + iter2.Next(&k) + iter2.Next(&v) + if err := dict.SetKey(k, v); err != nil { + return nil, err + } + } + } + } + + // Then add the kwargs. + before := starlark.Len(dict) + for _, pair := range kwargs { + if err := dict.SetKey(pair[0], pair[1]); err != nil { + return nil, err // dict is frozen + } + } + // In the common case, each kwarg will add another dict entry. + // If that's not so, check whether it is because there was a duplicate kwarg. + if starlark.Len(dict) < before+len(kwargs) { + keys := make(map[starlark.String]bool, len(kwargs)) + for _, kv := range kwargs { + k := kv[0].(starlark.String) + if keys[k] { + return nil, fmt.Errorf("duplicate keyword arg: %v", k) + } + keys[k] = true + } + } + + return starlark.None, nil +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·items +func dict_items(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + items := b.Receiver().(starlark.IterableMapping).Items() + res := make([]starlark.Value, len(items)) + for i, item := range items { + res[i] = item // convert [2]starlark.Value to starlark.Value + } + return starlark.NewList(res), nil +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·keys +func dict_keys(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + + items := b.Receiver().(starlark.IterableMapping).Items() + res := make([]starlark.Value, len(items)) + for i, item := range items { + res[i] = item[0] + } + return starlark.NewList(res), nil +} + +// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·update +func dict_values(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil { + return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) + } + items := b.Receiver().(starlark.IterableMapping).Items() + res := make([]starlark.Value, len(items)) + for i, item := range items { + res[i] = item[1] + } + return starlark.NewList(res), nil +} diff --git a/plugins/processors/starlark/field_dict.go b/plugins/processors/starlark/field_dict.go new file mode 100644 index 000000000..e0c0349b6 --- /dev/null +++ b/plugins/processors/starlark/field_dict.go @@ -0,0 +1,247 @@ +package starlark + +import ( + "errors" + "fmt" + "strings" + + "github.com/influxdata/telegraf" + "go.starlark.net/starlark" +) + +// FieldDict is a starlark.Value for the metric fields. It is heavily based on the +// starlark.Dict. +type FieldDict struct { + *Metric +} + +func (d FieldDict) String() string { + buf := new(strings.Builder) + buf.WriteString("{") + sep := "" + for _, item := range d.Items() { + k, v := item[0], item[1] + buf.WriteString(sep) + buf.WriteString(k.String()) + buf.WriteString(": ") + buf.WriteString(v.String()) + sep = ", " + } + buf.WriteString("}") + return buf.String() +} + +func (d FieldDict) Type() string { + return "Fields" +} + +func (d FieldDict) Freeze() { + d.frozen = true +} + +func (d FieldDict) Truth() starlark.Bool { + return len(d.metric.FieldList()) != 0 +} + +func (d FieldDict) Hash() (uint32, error) { + return 0, errors.New("not hashable") +} + +// AttrNames implements the starlark.HasAttrs interface. +func (d FieldDict) AttrNames() []string { + return builtinAttrNames(FieldDictMethods) +} + +// Attr implements the starlark.HasAttrs interface. +func (d FieldDict) Attr(name string) (starlark.Value, error) { + return builtinAttr(d, name, FieldDictMethods) +} + +var FieldDictMethods = map[string]builtinMethod{ + "clear": dict_clear, + "get": dict_get, + "items": dict_items, + "keys": dict_keys, + "pop": dict_pop, + "popitem": dict_popitem, + "setdefault": dict_setdefault, + "update": dict_update, + "values": dict_values, +} + +// Get implements the starlark.Mapping interface. +func (d FieldDict) Get(key starlark.Value) (v starlark.Value, found bool, err error) { + if k, ok := key.(starlark.String); ok { + gv, found := d.metric.GetField(k.GoString()) + if !found { + return starlark.None, false, nil + } + + v, err := asStarlarkValue(gv) + if err != nil { + return starlark.None, false, err + } + return v, true, nil + } + + return starlark.None, false, errors.New("key must be of type 'str'") +} + +// SetKey implements the starlark.HasSetKey interface to support map update +// using x[k]=v syntax, like a dictionary. +func (d FieldDict) SetKey(k, v starlark.Value) error { + if d.fieldIterCount > 0 { + return fmt.Errorf("cannot insert during iteration") + } + + key, ok := k.(starlark.String) + if !ok { + return errors.New("field key must be of type 'str'") + } + + gv, err := asGoValue(v) + if err != nil { + return err + } + + d.metric.AddField(key.GoString(), gv) + return nil +} + +// Items implements the starlark.IterableMapping interface. +func (d FieldDict) Items() []starlark.Tuple { + items := make([]starlark.Tuple, 0, len(d.metric.FieldList())) + for _, field := range d.metric.FieldList() { + key := starlark.String(field.Key) + sv, err := asStarlarkValue(field.Value) + if err != nil { + continue + } + pair := starlark.Tuple{key, sv} + items = append(items, pair) + } + return items +} + +func (d FieldDict) Clear() error { + if d.fieldIterCount > 0 { + return fmt.Errorf("cannot delete during iteration") + } + + keys := make([]string, 0, len(d.metric.FieldList())) + for _, field := range d.metric.FieldList() { + keys = append(keys, field.Key) + } + + for _, key := range keys { + d.metric.RemoveField(key) + } + return nil +} + +func (d FieldDict) PopItem() (v starlark.Value, err error) { + if d.fieldIterCount > 0 { + return nil, fmt.Errorf("cannot delete during iteration") + } + + for _, field := range d.metric.FieldList() { + k := field.Key + v := field.Value + + d.metric.RemoveField(k) + + sk := starlark.String(k) + sv, err := asStarlarkValue(v) + if err != nil { + return nil, fmt.Errorf("could not convert to starlark value") + } + + return starlark.Tuple{sk, sv}, nil + } + + return nil, errors.New("popitem(): field dictionary is empty") +} + +func (d FieldDict) Delete(k starlark.Value) (v starlark.Value, found bool, err error) { + if d.fieldIterCount > 0 { + return nil, false, fmt.Errorf("cannot delete during iteration") + } + + if key, ok := k.(starlark.String); ok { + value, ok := d.metric.GetField(key.GoString()) + if ok { + d.metric.RemoveField(key.GoString()) + sv, err := asStarlarkValue(value) + return sv, ok, err + } + } + + return starlark.None, false, errors.New("key must be of type 'str'") +} + +// Items implements the starlark.Mapping interface. +func (d FieldDict) Iterate() starlark.Iterator { + d.fieldIterCount++ + return &FieldIterator{Metric: d.Metric, fields: d.metric.FieldList()} +} + +type FieldIterator struct { + *Metric + fields []*telegraf.Field +} + +// Next implements the starlark.Iterator interface. +func (i *FieldIterator) Next(p *starlark.Value) bool { + if len(i.fields) == 0 { + return false + } + + field := i.fields[0] + i.fields = i.fields[1:] + *p = starlark.String(field.Key) + + return true +} + +// Done implements the starlark.Iterator interface. +func (i *FieldIterator) Done() { + i.fieldIterCount-- +} + +// AsStarlarkValue converts a field value to a starlark.Value. +func asStarlarkValue(value interface{}) (starlark.Value, error) { + switch v := value.(type) { + case float64: + return starlark.Float(v), nil + case int64: + return starlark.MakeInt64(v), nil + case uint64: + return starlark.MakeUint64(v), nil + case string: + return starlark.String(v), nil + case bool: + return starlark.Bool(v), nil + } + + return starlark.None, errors.New("invalid type") +} + +// AsGoValue converts a starlark.Value to a field value. +func asGoValue(value interface{}) (interface{}, error) { + switch v := value.(type) { + case starlark.Float: + return float64(v), nil + case starlark.Int: + n, ok := v.Int64() + if !ok { + return nil, errors.New("cannot represent integer as int64") + } + return n, nil + case starlark.String: + return string(v), nil + case starlark.Bool: + return bool(v), nil + } + + return nil, errors.New("invalid starlark type") +} diff --git a/plugins/processors/starlark/metric.go b/plugins/processors/starlark/metric.go new file mode 100644 index 000000000..031d24ad6 --- /dev/null +++ b/plugins/processors/starlark/metric.go @@ -0,0 +1,148 @@ +package starlark + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/influxdata/telegraf" + "go.starlark.net/starlark" +) + +type Metric struct { + metric telegraf.Metric + tagIterCount int + fieldIterCount int + frozen bool +} + +// Wrap updates the starlark.Metric to wrap a new telegraf.Metric. +func (m *Metric) Wrap(metric telegraf.Metric) { + m.metric = metric + m.tagIterCount = 0 + m.fieldIterCount = 0 + m.frozen = false +} + +// Unwrap removes the telegraf.Metric from the startlark.Metric. +func (m *Metric) Unwrap() telegraf.Metric { + return m.metric +} + +// String returns the starlark representation of the Metric. +// +// The String function is called by both the repr() and str() functions, and so +// it behaves more like the repr function would in Python. +func (m *Metric) String() string { + buf := new(strings.Builder) + buf.WriteString("Metric(") + buf.WriteString(m.Name().String()) + buf.WriteString(", tags=") + buf.WriteString(m.Tags().String()) + buf.WriteString(", fields=") + buf.WriteString(m.Fields().String()) + buf.WriteString(", time=") + buf.WriteString(m.Time().String()) + buf.WriteString(")") + return buf.String() +} + +func (m *Metric) Type() string { + return "Metric" +} + +func (m *Metric) Freeze() { + m.frozen = true +} + +func (m *Metric) Truth() starlark.Bool { + return true +} + +func (m *Metric) Hash() (uint32, error) { + return 0, errors.New("not hashable") +} + +// AttrNames implements the starlark.HasAttrs interface. +func (m *Metric) AttrNames() []string { + return []string{"name", "tags", "fields", "time"} +} + +// Attr implements the starlark.HasAttrs interface. +func (m *Metric) Attr(name string) (starlark.Value, error) { + switch name { + case "name": + return m.Name(), nil + case "tags": + return m.Tags(), nil + case "fields": + return m.Fields(), nil + case "time": + return m.Time(), nil + default: + // Returning nil, nil indicates "no such field or method" + return nil, nil + } +} + +// SetField implements the starlark.HasSetField interface. +func (m *Metric) SetField(name string, value starlark.Value) error { + if m.frozen { + return fmt.Errorf("cannot modify frozen metric") + } + + switch name { + case "name": + return m.SetName(value) + case "time": + return m.SetTime(value) + case "tags": + return errors.New("cannot set tags") + case "fields": + return errors.New("cannot set fields") + default: + return starlark.NoSuchAttrError( + fmt.Sprintf("cannot assign to field '%s'", name)) + } +} + +func (m *Metric) Name() starlark.String { + return starlark.String(m.metric.Name()) +} + +func (m *Metric) SetName(value starlark.Value) error { + if str, ok := value.(starlark.String); ok { + m.metric.SetName(str.GoString()) + return nil + } + + return errors.New("type error") +} + +func (m *Metric) Tags() TagDict { + return TagDict{m} +} + +func (m *Metric) Fields() FieldDict { + return FieldDict{m} +} + +func (m *Metric) Time() starlark.Int { + return starlark.MakeInt64(m.metric.Time().UnixNano()) +} + +func (m *Metric) SetTime(value starlark.Value) error { + switch v := value.(type) { + case starlark.Int: + ns, ok := v.Int64() + if !ok { + return errors.New("type error: unrepresentable time") + } + tm := time.Unix(0, ns) + m.metric.SetTime(tm) + return nil + default: + return errors.New("type error") + } +} diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go new file mode 100644 index 000000000..e2002a146 --- /dev/null +++ b/plugins/processors/starlark/starlark.go @@ -0,0 +1,215 @@ +package starlark + +import ( + "errors" + "fmt" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" + "go.starlark.net/resolve" + "go.starlark.net/starlark" +) + +const ( + description = "Process metrics using a Starlark script" + sampleConfig = ` + ## The Starlark source can be set as a string in this configuration file, or + ## by referencing a file containing the script. Only one source or script + ## should be set at once. + ## + ## Source of the Starlark script. + source = ''' +def apply(metric): + return metric +''' + + ## File containing a Starlark script. + # script = "/usr/local/bin/myscript.star" +` +) + +type Starlark struct { + Source string `toml:"source"` + Script string `toml:"script"` + + Log telegraf.Logger `toml:"-"` + + thread *starlark.Thread + applyFunc *starlark.Function + args starlark.Tuple + results []telegraf.Metric +} + +func (s *Starlark) Init() error { + if s.Source == "" && s.Script == "" { + return errors.New("one of source or script must be set") + } + if s.Source != "" && s.Script != "" { + return errors.New("both source or script cannot be set") + } + + s.thread = &starlark.Thread{ + Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) }, + } + + builtins := starlark.StringDict{} + builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) + builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) + + program, err := s.sourceProgram(builtins) + if err != nil { + return err + } + + // Execute source + globals, err := program.Init(s.thread, builtins) + if err != nil { + return err + } + + // Freeze the global state. This prevents modifications to the processor + // state and prevents scripts from containing errors storing tracking + // metrics. Tasks that require global state will not be possible due to + // this, so maybe we should relax this in the future. + globals.Freeze() + + // The source should define an apply function. + apply := globals["apply"] + + if apply == nil { + return errors.New("apply is not defined") + } + + var ok bool + if s.applyFunc, ok = apply.(*starlark.Function); !ok { + return errors.New("apply is not a function") + } + + if s.applyFunc.NumParams() != 1 { + return errors.New("apply function must take one parameter") + } + + // Reusing the same metric wrapper to skip an allocation. This will cause + // any saved references to point to the new metric, but due to freezing the + // globals none should exist. + s.args = make(starlark.Tuple, 1) + s.args[0] = &Metric{} + + // Preallocate a slice for return values. + s.results = make([]telegraf.Metric, 0, 10) + + return nil +} + +func (s *Starlark) sourceProgram(builtins starlark.StringDict) (*starlark.Program, error) { + if s.Source != "" { + _, program, err := starlark.SourceProgram("processor.starlark", s.Source, builtins.Has) + return program, err + } + _, program, err := starlark.SourceProgram(s.Script, nil, builtins.Has) + return program, err +} + +func (s *Starlark) SampleConfig() string { + return sampleConfig +} + +func (s *Starlark) Description() string { + return description +} + +func (s *Starlark) Start(acc telegraf.Accumulator) error { + return nil +} + +func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { + s.args[0].(*Metric).Wrap(metric) + + rv, err := starlark.Call(s.thread, s.applyFunc, s.args, nil) + if err != nil { + if err, ok := err.(*starlark.EvalError); ok { + for _, line := range strings.Split(err.Backtrace(), "\n") { + s.Log.Error(line) + } + } + metric.Reject() + return err + } + + switch rv := rv.(type) { + case *starlark.List: + iter := rv.Iterate() + defer iter.Done() + var v starlark.Value + for iter.Next(&v) { + switch v := v.(type) { + case *Metric: + m := v.Unwrap() + if containsMetric(s.results, m) { + s.Log.Errorf("Duplicate metric reference detected") + continue + } + s.results = append(s.results, m) + acc.AddMetric(m) + default: + s.Log.Errorf("Invalid type returned in list: %s", v.Type()) + } + } + + // If the script didn't return the original metrics, mark it as + // successfully handled. + if !containsMetric(s.results, metric) { + metric.Accept() + } + + // clear results + for i := range s.results { + s.results[i] = nil + } + s.results = s.results[:0] + case *Metric: + m := rv.Unwrap() + + // If the script returned a different metric, mark this metric as + // successfully handled. + if m != metric { + metric.Accept() + } + acc.AddMetric(m) + case starlark.NoneType: + metric.Drop() + default: + return fmt.Errorf("Invalid type returned: %T", rv) + } + return nil +} + +func (s *Starlark) Stop() error { + return nil +} + +func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool { + for _, m := range metrics { + if m == metric { + return true + } + } + return false +} + +func init() { + // https://github.com/bazelbuild/starlark/issues/20 + resolve.AllowNestedDef = true + resolve.AllowLambda = true + resolve.AllowFloat = true + resolve.AllowSet = true + resolve.AllowGlobalReassign = true + resolve.AllowRecursion = true +} + +func init() { + processors.AddStreaming("starlark", func() telegraf.StreamingProcessor { + return &Starlark{} + }) +} diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go new file mode 100644 index 000000000..1cdd10db0 --- /dev/null +++ b/plugins/processors/starlark/starlark_test.go @@ -0,0 +1,2789 @@ +package starlark + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +// Tests for runtime errors in the processors Init function. +func TestInitError(t *testing.T) { + tests := []struct { + name string + plugin *Starlark + }{ + { + name: "source must define apply", + plugin: &Starlark{ + Source: "", + Log: testutil.Logger{}, + }, + }, + { + name: "apply must be a function", + plugin: &Starlark{ + Source: ` +apply = 42 +`, + Log: testutil.Logger{}, + }, + }, + { + name: "apply function must take one arg", + plugin: &Starlark{ + Source: ` +def apply(): + pass +`, + Log: testutil.Logger{}, + }, + }, + { + name: "package scope must have valid syntax", + plugin: &Starlark{ + Source: ` +for +`, + Log: testutil.Logger{}, + }, + }, + { + name: "no source no script", + plugin: &Starlark{ + Log: testutil.Logger{}, + }, + }, + { + name: "source and script", + plugin: &Starlark{ + Source: ` +def apply(): + pass +`, + Script: "testdata/ratio.star", + Log: testutil.Logger{}, + }, + }, + { + name: "script file not found", + plugin: &Starlark{ + Script: "testdata/file_not_found.star", + Log: testutil.Logger{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.plugin.Init() + require.Error(t, err) + }) + } +} + +func TestApply(t *testing.T) { + // Tests for the behavior of the processors Apply function. + var applyTests = []struct { + name string + source string + input []telegraf.Metric + expected []telegraf.Metric + expectedErrorStr string + }{ + { + name: "drop metric", + source: ` +def apply(metric): + return None +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + }, + { + name: "passthrough", + source: ` +def apply(metric): + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "read value from global scope", + source: ` +names = { + 'cpu': 'cpu2', + 'mem': 'mem2', +} + +def apply(metric): + metric.name = names[metric.name] + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu2", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "cannot write to frozen global scope", + source: ` +cache = [] + +def apply(metric): + cache.append(deepcopy(metric)) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 1.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "append: cannot append to frozen list", + }, + { + name: "cannot return multiple references to same metric", + source: ` +def apply(metric): + # Should be return [metric, deepcopy(metric)] + return [metric, metric] +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + } + + for _, tt := range applyTests { + t.Run(tt.name, func(t *testing.T) { + plugin := &Starlark{ + Source: tt.source, + Log: testutil.Logger{}, + } + err := plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + + err = plugin.Start(&acc) + require.NoError(t, err) + + for _, m := range tt.input { + err = plugin.Add(m, &acc) + if tt.expectedErrorStr != "" { + require.EqualError(t, err, tt.expectedErrorStr) + } else { + require.NoError(t, err) + } + } + + err = plugin.Stop() + require.NoError(t, err) + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics()) + }) + } +} + +// Tests for the behavior of the Metric type. +func TestMetric(t *testing.T) { + var tests = []struct { + name string + source string + input []telegraf.Metric + expected []telegraf.Metric + expectedErrorStr string + }{ + { + name: "create new metric", + source: ` +def apply(metric): + m = Metric('cpu') + m.fields['time_guest'] = 2.0 + m.time = 0 + return m +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 2.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "deepcopy", + source: ` +def apply(metric): + return [metric, deepcopy(metric)] +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "set name", + source: ` +def apply(metric): + metric.name = "howdy" + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("howdy", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "set name wrong type", + source: ` +def apply(metric): + metric.name = 42 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "type error", + }, + { + name: "get name", + source: ` +def apply(metric): + metric.tags['measurement'] = metric.name + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "measurement": "cpu", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "getattr tags", + source: ` +def apply(metric): + metric.tags + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "setattr tags is not allowed", + source: ` +def apply(metric): + metric.tags = {} + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot set tags", + }, + { + name: "empty tags are false", + source: ` +def apply(metric): + if not metric.tags: + return metric + return None +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "non-empty tags are true", + source: ` +def apply(metric): + if metric.tags: + return metric + return None +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags in operator", + source: ` +def apply(metric): + if 'host' not in metric.tags: + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup tag", + source: ` +def apply(metric): + metric.tags['result'] = metric.tags['host'] + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "result": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup tag not set", + source: ` +def apply(metric): + metric.tags['foo'] + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: `key "foo" not in Tags`, + }, + { + name: "get tag", + source: ` +def apply(metric): + metric.tags['result'] = metric.tags.get('host') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "result": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "get tag default", + source: ` +def apply(metric): + metric.tags['result'] = metric.tags.get('foo', 'example.org') + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "result": "example.org", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "get tag not set returns none", + source: ` +def apply(metric): + if metric.tags.get('foo') != None: + return + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "set tag", + source: ` +def apply(metric): + metric.tags['host'] = 'example.org' + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "set tag type error", + source: ` +def apply(metric): + metric.tags['host'] = 42 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "tag value must be of type 'str'", + }, + { + name: "pop tag", + source: ` +def apply(metric): + metric.tags['host2'] = metric.tags.pop('host') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host2": "example.org", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "popitem tags", + source: ` +def apply(metric): + metric.tags['result'] = '='.join(metric.tags.popitem()) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "result": "host=example.org", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "popitem tags empty dict", + source: ` +def apply(metric): + metric.tags.popitem() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "popitem(): tag dictionary is empty", + }, + { + name: "tags setdefault key not set", + source: ` +def apply(metric): + metric.tags.setdefault('a', 'b') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags setdefault key already set", + source: ` +def apply(metric): + metric.tags.setdefault('a', 'c') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags update list of tuple", + source: ` +def apply(metric): + metric.tags.update([('b', 'y'), ('c', 'z')]) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + "b": "y", + "c": "z", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags update kwargs", + source: ` +def apply(metric): + metric.tags.update(b='y', c='z') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + "b": "y", + "c": "z", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags update dict", + source: ` +def apply(metric): + metric.tags.update({'b': 'y', 'c': 'z'}) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + "b": "y", + "c": "z", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags update list tuple and kwargs", + source: ` +def apply(metric): + metric.tags.update([('b', 'y'), ('c', 'z')], d='zz') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "x", + "b": "y", + "c": "z", + "d": "zz", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tags", + source: ` +def apply(metric): + for k in metric.tags: + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + "foo": "bar", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + "foo": "bar", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tags and copy to fields", + source: ` +def apply(metric): + for k in metric.tags: + metric.fields[k] = k + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{ + "host": "host", + "cpu": "cpu", + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tag keys", + source: ` +def apply(metric): + for k in metric.tags.keys(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + "foo": "bar", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + "foo": "bar", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tag keys and copy to fields", + source: ` +def apply(metric): + for k in metric.tags.keys(): + metric.fields[k] = k + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{ + "host": "host", + "cpu": "cpu", + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tag items", + source: ` +def apply(metric): + for k, v in metric.tags.items(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tag items and copy to fields", + source: ` +def apply(metric): + for k, v in metric.tags.items(): + metric.fields[k] = v + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + "host": "example.org", + "cpu": "cpu0", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tag values", + source: ` +def apply(metric): + for v in metric.tags.values(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tag values and copy to fields", + source: ` +def apply(metric): + for v in metric.tags.values(): + metric.fields[v] = v + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + "cpu": "cpu0", + }, + map[string]interface{}{ + "time_idle": 42, + "example.org": "example.org", + "cpu0": "cpu0", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "clear tags", + source: ` +def apply(metric): + metric.tags.clear() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tags cannot pop while iterating", + source: ` +def apply(metric): + for k in metric.tags: + metric.tags.pop(k) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "pop: cannot delete during iteration", + }, + { + name: "tags cannot popitem while iterating", + source: ` +def apply(metric): + for k in metric.tags: + metric.tags.popitem() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot delete during iteration", + }, + { + name: "tags cannot clear while iterating", + source: ` +def apply(metric): + for k in metric.tags: + metric.tags.clear() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot delete during iteration", + }, + { + name: "tags cannot insert while iterating", + source: ` +def apply(metric): + for k in metric.tags: + metric.tags['i'] = 'j' + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot insert during iteration", + }, + { + name: "tags can be cleared after iterating", + source: ` +def apply(metric): + for k in metric.tags: + pass + metric.tags.clear() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + }, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "getattr fields", + source: ` +def apply(metric): + metric.fields + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "setattr fields is not allowed", + source: ` +def apply(metric): + metric.fields = {} + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot set fields", + }, + { + name: "empty fields are false", + source: ` +def apply(metric): + if not metric.fields: + metric.fields["time_idle"] = 42 + return metric + return None +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "non-empty fields are true", + source: ` +def apply(metric): + if metric.fields: + return metric + return None +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "fields in operator", + source: ` +def apply(metric): + if 'time_idle' not in metric.fields: + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup string field", + source: ` +def apply(metric): + value = metric.fields['value'] + if value != "xyzzy" and type(value) != "str": + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": "xyzzy"}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": "xyzzy"}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup integer field", + source: ` +def apply(metric): + value = metric.fields['value'] + if value != 42 and type(value) != "int": + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup unsigned field", + source: ` +def apply(metric): + value = metric.fields['value'] + if value != 42 and type(value) != "int": + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": uint64(42)}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": uint64(42)}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup bool field", + source: ` +def apply(metric): + value = metric.fields['value'] + if value != True and type(value) != "bool": + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": true}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": true}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup float field", + source: ` +def apply(metric): + value = metric.fields['value'] + if value != 42.0 and type(value) != "float": + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"value": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "lookup field not set", + source: ` +def apply(metric): + metric.fields['foo'] + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: `key "foo" not in Fields`, + }, + { + name: "get field", + source: ` +def apply(metric): + metric.fields['result'] = metric.fields.get('time_idle') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + "result": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "get field default", + source: ` +def apply(metric): + metric.fields['result'] = metric.fields.get('foo', 'example.org') + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + "result": "example.org", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "get field not set returns none", + source: ` +def apply(metric): + if metric.fields.get('foo') != None: + return + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "set string field", + source: ` +def apply(metric): + metric.fields['host'] = 'example.org' + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "host": "example.org", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "set integer field", + source: ` +def apply(metric): + metric.fields['time_idle'] = 42 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "set float field", + source: ` +def apply(metric): + metric.fields['time_idle'] = 42.0 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "set bool field", + source: ` +def apply(metric): + metric.fields['time_idle'] = True + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": true, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "set field type error", + source: ` +def apply(metric): + metric.fields['time_idle'] = {} + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "invalid starlark type", + }, + { + name: "pop field", + source: ` +def apply(metric): + time_idle = metric.fields.pop('time_idle') + if time_idle != 0: + return + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 0, + "time_guest": 0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_guest": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "popitem field", + source: ` +def apply(metric): + item = metric.fields.popitem() + if item != ("time_idle", 0): + return + metric.fields['time_guest'] = 0 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_guest": 0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "popitem fields empty dict", + source: ` +def apply(metric): + metric.fields.popitem() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "popitem(): field dictionary is empty", + }, + { + name: "fields setdefault key not set", + source: ` +def apply(metric): + metric.fields.setdefault('a', 'b') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"a": "b"}, + time.Unix(0, 0), + ), + }, + }, + { + name: "fields setdefault key already set", + source: ` +def apply(metric): + metric.fields.setdefault('a', 'c') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"a": "b"}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"a": "b"}, + time.Unix(0, 0), + ), + }, + }, + { + name: "fields update list of tuple", + source: ` +def apply(metric): + metric.fields.update([('a', 'b'), ('c', 'd')]) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "fields update kwargs", + source: ` +def apply(metric): + metric.fields.update(a='b', c='d') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "fields update dict", + source: ` +def apply(metric): + metric.fields.update({'a': 'b', 'c': 'd'}) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "fields update list tuple and kwargs", + source: ` +def apply(metric): + metric.fields.update([('a', 'b'), ('c', 'd')], e='f') + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + "e": "f", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate fields", + source: ` +def apply(metric): + for k in metric.fields: + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate field keys", + source: ` +def apply(metric): + for k in metric.fields.keys(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate field keys and copy to tags", + source: ` +def apply(metric): + for k in metric.fields.keys(): + metric.tags[k] = k + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "time_guest": "time_guest", + "time_idle": "time_idle", + "time_system": "time_system", + }, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate field items", + source: ` +def apply(metric): + for k, v in metric.fields.items(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.0, + "time_idle": 2.0, + "time_system": 3.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate field items and copy to tags", + source: ` +def apply(metric): + for k, v in metric.fields.items(): + metric.tags[k] = str(v) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_guest": 1.1, + "time_idle": 2.1, + "time_system": 3.1, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "time_guest": "1.1", + "time_idle": "2.1", + "time_system": "3.1", + }, + map[string]interface{}{ + "time_guest": 1.1, + "time_idle": 2.1, + "time_system": 3.1, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate field values", + source: ` +def apply(metric): + for v in metric.fields.values(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + "e": "f", + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + "e": "f", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate field values and copy to tags", + source: ` +def apply(metric): + for v in metric.fields.values(): + metric.tags[str(v)] = str(v) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + "e": "f", + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "b": "b", + "d": "d", + "f": "f", + }, + map[string]interface{}{ + "a": "b", + "c": "d", + "e": "f", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "clear fields", + source: ` +def apply(metric): + metric.fields.clear() + metric.fields['notempty'] = 0 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 0, + "time_guest": 0, + "time_system": 0, + "time_user": 0, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "notempty": 0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "fields cannot pop while iterating", + source: ` +def apply(metric): + for k in metric.fields: + metric.fields.pop(k) + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "pop: cannot delete during iteration", + }, + { + name: "fields cannot popitem while iterating", + source: ` +def apply(metric): + for k in metric.fields: + metric.fields.popitem() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot delete during iteration", + }, + { + name: "fields cannot clear while iterating", + source: ` +def apply(metric): + for k in metric.fields: + metric.fields.clear() + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot delete during iteration", + }, + { + name: "fields cannot insert while iterating", + source: ` +def apply(metric): + for k in metric.fields: + metric.fields['time_guest'] = 0 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "cannot insert during iteration", + }, + { + name: "fields can be cleared after iterating", + source: ` +def apply(metric): + for k in metric.fields: + pass + metric.fields.clear() + metric.fields['notempty'] = 0 + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "notempty": 0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "set time", + source: ` +def apply(metric): + metric.time = 42 + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0).UTC(), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 42).UTC(), + ), + }, + }, + { + name: "set time wrong type", + source: ` +def apply(metric): + metric.time = 'howdy' + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0).UTC(), + ), + }, + expected: []telegraf.Metric{}, + expectedErrorStr: "type error", + }, + { + name: "get time", + source: ` +def apply(metric): + metric.time -= metric.time % 100000000 + return metric + `, + input: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(42, 11).UTC(), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(42, 0).UTC(), + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &Starlark{ + Source: tt.source, + Log: testutil.Logger{}, + } + err := plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + + err = plugin.Start(&acc) + require.NoError(t, err) + + for _, m := range tt.input { + err = plugin.Add(m, &acc) + if tt.expectedErrorStr != "" { + require.EqualError(t, err, tt.expectedErrorStr) + } else { + require.NoError(t, err) + } + } + + err = plugin.Stop() + require.NoError(t, err) + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics()) + }) + } +} + +func TestScript(t *testing.T) { + var tests = []struct { + name string + plugin *Starlark + input []telegraf.Metric + expected []telegraf.Metric + expectedErrorStr string + }{ + { + name: "rename", + plugin: &Starlark{ + Script: "testdata/rename.star", + Log: testutil.Logger{}, + }, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "lower": "0", + "upper": "10", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "min": "0", + "max": "10", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "scale", + plugin: &Starlark{ + Script: "testdata/scale.star", + Log: testutil.Logger{}, + }, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 10.0}, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 100.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "ratio", + plugin: &Starlark{ + Script: "testdata/ratio.star", + Log: testutil.Logger{}, + }, + input: []telegraf.Metric{ + testutil.MustMetric("mem", + map[string]string{}, + map[string]interface{}{ + "used": 2, + "total": 10, + }, + time.Unix(0, 0), + ), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("mem", + map[string]string{}, + map[string]interface{}{ + "used": 2, + "total": 10, + "usage": 20.0, + }, + time.Unix(0, 0), + ), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.plugin.Init() + require.NoError(t, err) + + var acc testutil.Accumulator + + err = tt.plugin.Start(&acc) + require.NoError(t, err) + + for _, m := range tt.input { + err = tt.plugin.Add(m, &acc) + if tt.expectedErrorStr != "" { + require.EqualError(t, err, tt.expectedErrorStr) + } else { + require.NoError(t, err) + } + } + + err = tt.plugin.Stop() + require.NoError(t, err) + + testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics()) + }) + } +} + +// Benchmarks modify the metric in place, so the scripts shouldn't modify the +// metric. +func Benchmark(b *testing.B) { + var tests = []struct { + name string + source string + input []telegraf.Metric + }{ + { + name: "passthrough", + source: ` +def apply(metric): + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "create new metric", + source: ` +def apply(metric): + m = Metric('cpu') + m.fields['time_guest'] = 2.0 + m.time = 0 + return m +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "set name", + source: ` +def apply(metric): + metric.name = "cpu" + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "set tag", + source: ` +def apply(metric): + metric.tags['host'] = 'example.org' + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "host": "example.org", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "tag in operator", + source: ` +def apply(metric): + if 'c' in metric.tags: + return metric + return None +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tags", + source: ` +def apply(metric): + for k in metric.tags: + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 42.0}, + time.Unix(0, 0), + ), + }, + }, + { + // This should be faster than calling items() + name: "iterate tags and get values", + source: ` +def apply(metric): + for k in metric.tags: + v = metric.tags[k] + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate tag items", + source: ` +def apply(metric): + for k, v in metric.tags.items(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + map[string]interface{}{"time_idle": 42}, + time.Unix(0, 0), + ), + }, + }, + { + name: "set string field", + source: ` +def apply(metric): + metric.fields['host'] = 'example.org' + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "host": "example.org", + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate fields", + source: ` +def apply(metric): + for k in metric.fields: + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + "time_user": 42.0, + "time_guest": 42.0, + "time_system": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + // This should be faster than calling items() + name: "iterate fields and get values", + source: ` +def apply(metric): + for k in metric.fields: + v = metric.fields[k] + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "time_idle": 42.0, + "time_user": 42.0, + "time_guest": 42.0, + "time_system": 42.0, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "iterate field items", + source: ` +def apply(metric): + for k, v in metric.fields.items(): + pass + return metric +`, + input: []telegraf.Metric{ + testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "a": "b", + "c": "d", + "e": "f", + "g": "h", + }, + time.Unix(0, 0), + ), + }, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + plugin := &Starlark{ + Source: tt.source, + Log: testutil.Logger{}, + } + + err := plugin.Init() + require.NoError(b, err) + + var acc testutil.NopAccumulator + + err = plugin.Start(&acc) + require.NoError(b, err) + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, m := range tt.input { + plugin.Add(m, &acc) + } + } + + err = plugin.Stop() + require.NoError(b, err) + }) + } +} diff --git a/plugins/processors/starlark/tag_dict.go b/plugins/processors/starlark/tag_dict.go new file mode 100644 index 000000000..3d9526438 --- /dev/null +++ b/plugins/processors/starlark/tag_dict.go @@ -0,0 +1,197 @@ +package starlark + +import ( + "errors" + "fmt" + "strings" + + "github.com/influxdata/telegraf" + "go.starlark.net/starlark" +) + +// TagDict is a starlark.Value for the metric tags. It is heavily based on the +// starlark.Dict. +type TagDict struct { + *Metric +} + +func (d TagDict) String() string { + buf := new(strings.Builder) + buf.WriteString("{") + sep := "" + for _, item := range d.Items() { + k, v := item[0], item[1] + buf.WriteString(sep) + buf.WriteString(k.String()) + buf.WriteString(": ") + buf.WriteString(v.String()) + sep = ", " + } + buf.WriteString("}") + return buf.String() +} + +func (d TagDict) Type() string { + return "Tags" +} + +func (d TagDict) Freeze() { + d.frozen = true +} + +func (d TagDict) Truth() starlark.Bool { + return len(d.metric.TagList()) != 0 +} + +func (d TagDict) Hash() (uint32, error) { + return 0, errors.New("not hashable") +} + +// AttrNames implements the starlark.HasAttrs interface. +func (d TagDict) AttrNames() []string { + return builtinAttrNames(TagDictMethods) +} + +// Attr implements the starlark.HasAttrs interface. +func (d TagDict) Attr(name string) (starlark.Value, error) { + return builtinAttr(d, name, TagDictMethods) +} + +var TagDictMethods = map[string]builtinMethod{ + "clear": dict_clear, + "get": dict_get, + "items": dict_items, + "keys": dict_keys, + "pop": dict_pop, + "popitem": dict_popitem, + "setdefault": dict_setdefault, + "update": dict_update, + "values": dict_values, +} + +// Get implements the starlark.Mapping interface. +func (d TagDict) Get(key starlark.Value) (v starlark.Value, found bool, err error) { + if k, ok := key.(starlark.String); ok { + gv, found := d.metric.GetTag(k.GoString()) + if !found { + return starlark.None, false, nil + } + return starlark.String(gv), true, err + } + + return starlark.None, false, errors.New("key must be of type 'str'") +} + +// SetKey implements the starlark.HasSetKey interface to support map update +// using x[k]=v syntax, like a dictionary. +func (d TagDict) SetKey(k, v starlark.Value) error { + if d.tagIterCount > 0 { + return fmt.Errorf("cannot insert during iteration") + } + + key, ok := k.(starlark.String) + if !ok { + return errors.New("tag key must be of type 'str'") + } + + value, ok := v.(starlark.String) + if !ok { + return errors.New("tag value must be of type 'str'") + } + + d.metric.AddTag(key.GoString(), value.GoString()) + return nil +} + +// Items implements the starlark.IterableMapping interface. +func (d TagDict) Items() []starlark.Tuple { + items := make([]starlark.Tuple, 0, len(d.metric.TagList())) + for _, tag := range d.metric.TagList() { + key := starlark.String(tag.Key) + value := starlark.String(tag.Value) + pair := starlark.Tuple{key, value} + items = append(items, pair) + } + return items +} + +func (d TagDict) Clear() error { + if d.tagIterCount > 0 { + return fmt.Errorf("cannot delete during iteration") + } + + keys := make([]string, 0, len(d.metric.TagList())) + for _, tag := range d.metric.TagList() { + keys = append(keys, tag.Key) + } + + for _, key := range keys { + d.metric.RemoveTag(key) + } + return nil +} + +func (d TagDict) PopItem() (v starlark.Value, err error) { + if d.tagIterCount > 0 { + return nil, fmt.Errorf("cannot delete during iteration") + } + + for _, tag := range d.metric.TagList() { + k := tag.Key + v := tag.Value + + d.metric.RemoveTag(k) + + sk := starlark.String(k) + sv := starlark.String(v) + return starlark.Tuple{sk, sv}, nil + } + + return nil, errors.New("popitem(): tag dictionary is empty") +} + +func (d TagDict) Delete(k starlark.Value) (v starlark.Value, found bool, err error) { + if d.tagIterCount > 0 { + return nil, false, fmt.Errorf("cannot delete during iteration") + } + + if key, ok := k.(starlark.String); ok { + value, ok := d.metric.GetTag(key.GoString()) + if ok { + d.metric.RemoveTag(key.GoString()) + v := starlark.String(value) + return v, ok, err + } + } + + return starlark.None, false, errors.New("key must be of type 'str'") +} + +// Items implements the starlark.Mapping interface. +func (d TagDict) Iterate() starlark.Iterator { + d.tagIterCount++ + return &TagIterator{Metric: d.Metric, tags: d.metric.TagList()} +} + +type TagIterator struct { + *Metric + tags []*telegraf.Tag +} + +// Next implements the starlark.Iterator interface. +func (i *TagIterator) Next(p *starlark.Value) bool { + if len(i.tags) == 0 { + return false + } + + tag := i.tags[0] + i.tags = i.tags[1:] + *p = starlark.String(tag.Key) + + return true +} + +// Done implements the starlark.Iterator interface. +func (i *TagIterator) Done() { + i.tagIterCount-- +} diff --git a/plugins/processors/starlark/testdata/ratio.star b/plugins/processors/starlark/testdata/ratio.star new file mode 100644 index 000000000..086cd2e69 --- /dev/null +++ b/plugins/processors/starlark/testdata/ratio.star @@ -0,0 +1,7 @@ +# Compute the ratio of two integer fields. + +def apply(metric): + used = float(metric.fields['used']) + total = float(metric.fields['total']) + metric.fields['usage'] = (used / total) * 100 + return metric diff --git a/plugins/processors/starlark/testdata/rename.star b/plugins/processors/starlark/testdata/rename.star new file mode 100644 index 000000000..51a372911 --- /dev/null +++ b/plugins/processors/starlark/testdata/rename.star @@ -0,0 +1,13 @@ +# Rename any tags using the mapping in the renames dict. + +renames = { + 'lower': 'min', + 'upper': 'max', +} + +def apply(metric): + for k, v in metric.tags.items(): + if k in renames: + metric.tags[renames[k]] = v + metric.tags.pop(k) + return metric diff --git a/plugins/processors/starlark/testdata/scale.star b/plugins/processors/starlark/testdata/scale.star new file mode 100644 index 000000000..502a13146 --- /dev/null +++ b/plugins/processors/starlark/testdata/scale.star @@ -0,0 +1,7 @@ +# Multiply any float fields by 10 + +def apply(metric): + for k, v in metric.fields.items(): + if type(v) == "float": + metric.fields[k] = v * 10 + return metric diff --git a/plugins/processors/streamingprocessor.go b/plugins/processors/streamingprocessor.go index 5b4f69844..95b2e0748 100644 --- a/plugins/processors/streamingprocessor.go +++ b/plugins/processors/streamingprocessor.go @@ -31,10 +31,11 @@ func (sp *streamingProcessor) Start(acc telegraf.Accumulator) error { return nil } -func (sp *streamingProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) { +func (sp *streamingProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error { for _, m := range sp.processor.Apply(m) { acc.AddMetric(m) } + return nil } func (sp *streamingProcessor) Stop() error { diff --git a/processor.go b/processor.go index 5e2d46914..15d67eb40 100644 --- a/processor.go +++ b/processor.go @@ -20,7 +20,7 @@ type StreamingProcessor interface { Start(acc Accumulator) error // Add is called for each metric to be processed. - Add(metric Metric, acc Accumulator) + Add(metric Metric, acc Accumulator) error // Stop gives you a callback to free resources. // by the time Stop is called, the input stream will have already been closed