Add starlark processor (#7660)

This commit is contained in:
Daniel Nelson 2020-06-23 14:15:14 -07:00 committed by GitHub
parent d98153e7bc
commit c7cce961c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 4051 additions and 7 deletions

View File

@ -508,7 +508,9 @@ func (a *Agent) runProcessors(
acc := NewAccumulator(unit.processor, unit.dst)
for m := range unit.src {
unit.processor.Add(m, acc)
if err := unit.processor.Add(m, acc); err != nil {
acc.AddError(err)
}
}
unit.processor.Stop()
close(unit.dst)

View File

@ -55,6 +55,7 @@ following works:
- github.com/go-ole/go-ole [MIT License](https://github.com/go-ole/go-ole/blob/master/LICENSE)
- github.com/go-redis/redis [BSD 2-Clause "Simplified" License](https://github.com/go-redis/redis/blob/master/LICENSE)
- github.com/go-sql-driver/mysql [Mozilla Public License 2.0](https://github.com/go-sql-driver/mysql/blob/master/LICENSE)
- go.starlark.net [BSD 3-Clause "New" or "Revised" License](https://github.com/google/starlark-go/blob/master/LICENSE)
- github.com/goburrow/modbus [BSD 3-Clause "New" or "Revised" License](https://github.com/goburrow/modbus/blob/master/LICENSE)
- github.com/goburrow/serial [MIT License](https://github.com/goburrow/serial/LICENSE)
- github.com/gobwas/glob [MIT License](https://github.com/gobwas/glob/blob/master/LICENSE)

1
go.mod
View File

@ -126,6 +126,7 @@ require (
github.com/wvanbergen/kafka v0.0.0-20171203153745-e2edea948ddf
github.com/wvanbergen/kazoo-go v0.0.0-20180202103751-f72d8611297a // indirect
github.com/yuin/gopher-lua v0.0.0-20180630135845-46796da1b0b4 // indirect
go.starlark.net v0.0.0-20191227232015-caa3e9aa5008
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200301022130-244492dfa37a

4
go.sum
View File

@ -581,6 +581,8 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.starlark.net v0.0.0-20191227232015-caa3e9aa5008 h1:PUpdYMZifLwPlUnFfT/2Hkqr7p0SSpOR7xrDiPaD52k=
go.starlark.net v0.0.0-20191227232015-caa3e9aa5008/go.mod h1:nmDLcffg48OtT/PSW0Hg7FvpRQsQh5OSqIylirxKC7o=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@ -701,6 +703,8 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191002063906-3421d5a6bb1c h1:Vco5b+cuG5NNfORVxZy6bYZQ7rsigisU1WQFkvQ0L5E=
golang.org/x/sys v0.0.0-20191002063906-3421d5a6bb1c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191003212358-c178f38b412c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -78,21 +78,21 @@ func (r *RunningProcessor) Start(acc telegraf.Accumulator) error {
return r.Processor.Start(acc)
}
func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) {
func (r *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
if ok := r.Config.Filter.Select(m); !ok {
// pass downstream
acc.AddMetric(m)
return
return nil
}
r.Config.Filter.Modify(m)
if len(m.FieldList()) == 0 {
// drop metric
r.metricFiltered(m)
return
return nil
}
r.Processor.Add(m, acc)
return r.Processor.Add(m, acc)
}
func (r *RunningProcessor) Stop() {

View File

@ -16,6 +16,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/processors/regex"
_ "github.com/influxdata/telegraf/plugins/processors/rename"
_ "github.com/influxdata/telegraf/plugins/processors/s2geo"
_ "github.com/influxdata/telegraf/plugins/processors/starlark"
_ "github.com/influxdata/telegraf/plugins/processors/strings"
_ "github.com/influxdata/telegraf/plugins/processors/tag_limit"
_ "github.com/influxdata/telegraf/plugins/processors/template"

View File

@ -0,0 +1,150 @@
# Starlark Processor
The `starlark` processor calls a Starlark function for each matched metric,
allowing for custom programmatic metric processing.
The Starlark language is a dialect of Python, and will be familiar to those who
have experience with the Python language. However, keep in mind that it is not
Python and that there are major syntax [differences][#Python Differences].
Existing Python code is unlikely to work unmodified. The execution environment
is sandboxed, and it is not possible to do I/O operations such as reading from
files or sockets.
The Starlark [specification][] has details about the syntax and available
functions.
### Configuration
```toml
[[processors.starlark]]
## The Starlark source can be set as a string in this configuration file, or
## by referencing a file containing the script. Only one source or script
## should be set at once.
##
## Source of the Starlark script.
source = '''
def apply(metric):
return metric
'''
## File containing a Starlark script.
# script = "/usr/local/bin/myscript.star"
```
### Usage
The script should contain a function called `apply` that takes the metric as
its single argument. The function will be called with each metric, and can
return `None`, a single metric, or a list of metrics.
```python
def apply(metric):
return metric
```
Reference the Starlark [specification][] to see the list of available types and
functions that can be used in the script. In addition to these the following
types and functions are exposed to the script.
**Metric(*name*)**:
Create a new metric with the given measurement name. The metric will have no
tags or fields and defaults to the current time.
- **name**:
The name is a [string][string] containing the metric measurement name.
- **tags**:
A [dict-like][dict] object containing the metric's tags.
- **fields**:
A [dict-like][dict] object containing the metric's fields. The values may be
of type int, float, string, or bool.
- **time**:
The timestamp of the metric as an integer in nanoseconds since the Unix
epoch.
**deepcopy(*metric*)**: Make a copy of an existing metric.
### Python Differences
While Starlark is similar to Python it is not the same.
- Starlark has limited support for error handling and no exceptions. If an
error occurs the script will immediately end and Telegraf will drop the
metric. Check the Telegraf logfile for details about the error.
- It is not possible to import other packages and the Python standard library
is not available. As such, it is not possible to open files or sockets.
- These common keywords are **not supported** in the Starlark grammar:
```
as finally nonlocal
assert from raise
class global try
del import with
except is yield
```
### Common Questions
**How can I drop/delete a metric?**
If you don't return the metric it will be deleted. Usually this means the
function should `return None`.
**How should I make a copy of a metric?**
Use `deepcopy(metric)` to create a copy of the metric.
**How can I return multiple metrics?**
You can return a list of metrics:
```python
def apply(metric):
m2 = deepcopy(metric)
return [metric, m2]
```
**What happens to a tracking metric if an error occurs in the script?**
The metric is marked as undelivered.
**How do I create a new metric?**
Use the `Metric(name)` function and set at least one field.
**What is the fastest way to iterate over tags/fields?**
The fastest way to iterate is to use a for-loop on the tags or fields attribute:
```python
def apply(metric):
for k in metric.tags:
pass
return metric
```
When you use this form, it is not possible to modify the tags inside the loop,
if this is needed you should use the `.keys()`, `.values()`, or `.items()` forms:
```python
def apply(metric):
for k, v in metric.tags.items():
pass
return metric
```
**How can I save values across multiple calls to the script?**
Telegraf freezes the global scope, which prevents it from being modified.
Attempting to modify the global scope will fail with an error.
### Examples
- [ratio](/plugins/processors/starlark/testdata/ratio.star)
- [rename](/plugins/processors/starlark/testdata/rename.star)
- [scale](/plugins/processors/starlark/testdata/scale.star)
[specification]: https://github.com/google/starlark-go/blob/master/doc/spec.md
[string]: https://github.com/google/starlark-go/blob/master/doc/spec.md#strings
[dict]: https://github.com/google/starlark-go/blob/master/doc/spec.md#dictionaries

View File

@ -0,0 +1,261 @@
package starlark
import (
"fmt"
"sort"
"time"
"github.com/influxdata/telegraf/metric"
"go.starlark.net/starlark"
)
func newMetric(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var name starlark.String
if err := starlark.UnpackPositionalArgs("Metric", args, kwargs, 1, &name); err != nil {
return nil, err
}
m, err := metric.New(string(name), nil, nil, time.Now())
if err != nil {
return nil, err
}
return &Metric{metric: m}, nil
}
func deepcopy(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var sm *Metric
if err := starlark.UnpackPositionalArgs("deepcopy", args, kwargs, 1, &sm); err != nil {
return nil, err
}
dup := sm.metric.Copy()
dup.Drop()
return &Metric{metric: dup}, nil
}
type builtinMethod func(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error)
func builtinAttr(recv starlark.Value, name string, methods map[string]builtinMethod) (starlark.Value, error) {
method := methods[name]
if method == nil {
return starlark.None, fmt.Errorf("no such method '%s'", name)
}
// Allocate a closure over 'method'.
impl := func(thread *starlark.Thread, b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
return method(b, args, kwargs)
}
return starlark.NewBuiltin(name, impl).BindReceiver(recv), nil
}
func builtinAttrNames(methods map[string]builtinMethod) []string {
names := make([]string, 0, len(methods))
for name := range methods {
names = append(names, name)
}
sort.Strings(names)
return names
}
// nameErr returns an error message of the form "name: msg"
// where name is b.Name() and msg is a string or error.
func nameErr(b *starlark.Builtin, msg interface{}) error {
return fmt.Errorf("%s: %v", b.Name(), msg)
}
// --- dictionary methods ---
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·clear
func dict_clear(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
type HasClear interface {
Clear() error
}
return starlark.None, b.Receiver().(HasClear).Clear()
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·pop
func dict_pop(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var k, d starlark.Value
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 1, &k, &d); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
type HasDelete interface {
Delete(k starlark.Value) (starlark.Value, bool, error)
}
if v, found, err := b.Receiver().(HasDelete).Delete(k); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err) // dict is frozen or key is unhashable
} else if found {
return v, nil
} else if d != nil {
return d, nil
}
return starlark.None, fmt.Errorf("%s: missing key", b.Name())
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·popitem
func dict_popitem(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
type HasPopItem interface {
PopItem() (starlark.Value, error)
}
return b.Receiver().(HasPopItem).PopItem()
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·get
func dict_get(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var key, dflt starlark.Value
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 1, &key, &dflt); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
if v, ok, err := b.Receiver().(starlark.Mapping).Get(key); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
} else if ok {
return v, nil
} else if dflt != nil {
return dflt, nil
}
return starlark.None, nil
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·setdefault
func dict_setdefault(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
var key, dflt starlark.Value = nil, starlark.None
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 1, &key, &dflt); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
recv := b.Receiver().(starlark.HasSetKey)
v, found, err := recv.Get(key)
if err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
if !found {
v = dflt
if err := recv.SetKey(key, dflt); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
}
return v, nil
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·update
func dict_update(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
// Unpack the arguments
if len(args) > 1 {
return nil, fmt.Errorf("update: got %d arguments, want at most 1", len(args))
}
// Get the target
dict := b.Receiver().(starlark.HasSetKey)
if len(args) == 1 {
switch updates := args[0].(type) {
case starlark.IterableMapping:
// Iterate over dict's key/value pairs, not just keys.
for _, item := range updates.Items() {
if err := dict.SetKey(item[0], item[1]); err != nil {
return nil, err // dict is frozen
}
}
default:
// all other sequences
iter := starlark.Iterate(updates)
if iter == nil {
return nil, fmt.Errorf("got %s, want iterable", updates.Type())
}
defer iter.Done()
var pair starlark.Value
for i := 0; iter.Next(&pair); i++ {
iter2 := starlark.Iterate(pair)
if iter2 == nil {
return nil, fmt.Errorf("dictionary update sequence element #%d is not iterable (%s)", i, pair.Type())
}
defer iter2.Done()
len := starlark.Len(pair)
if len < 0 {
return nil, fmt.Errorf("dictionary update sequence element #%d has unknown length (%s)", i, pair.Type())
} else if len != 2 {
return nil, fmt.Errorf("dictionary update sequence element #%d has length %d, want 2", i, len)
}
var k, v starlark.Value
iter2.Next(&k)
iter2.Next(&v)
if err := dict.SetKey(k, v); err != nil {
return nil, err
}
}
}
}
// Then add the kwargs.
before := starlark.Len(dict)
for _, pair := range kwargs {
if err := dict.SetKey(pair[0], pair[1]); err != nil {
return nil, err // dict is frozen
}
}
// In the common case, each kwarg will add another dict entry.
// If that's not so, check whether it is because there was a duplicate kwarg.
if starlark.Len(dict) < before+len(kwargs) {
keys := make(map[starlark.String]bool, len(kwargs))
for _, kv := range kwargs {
k := kv[0].(starlark.String)
if keys[k] {
return nil, fmt.Errorf("duplicate keyword arg: %v", k)
}
keys[k] = true
}
}
return starlark.None, nil
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·items
func dict_items(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
items := b.Receiver().(starlark.IterableMapping).Items()
res := make([]starlark.Value, len(items))
for i, item := range items {
res[i] = item // convert [2]starlark.Value to starlark.Value
}
return starlark.NewList(res), nil
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·keys
func dict_keys(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
items := b.Receiver().(starlark.IterableMapping).Items()
res := make([]starlark.Value, len(items))
for i, item := range items {
res[i] = item[0]
}
return starlark.NewList(res), nil
}
// https://github.com/google/starlark-go/blob/master/doc/spec.md#dict·update
func dict_values(b *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
if err := starlark.UnpackPositionalArgs(b.Name(), args, kwargs, 0); err != nil {
return starlark.None, fmt.Errorf("%s: %v", b.Name(), err)
}
items := b.Receiver().(starlark.IterableMapping).Items()
res := make([]starlark.Value, len(items))
for i, item := range items {
res[i] = item[1]
}
return starlark.NewList(res), nil
}

View File

@ -0,0 +1,247 @@
package starlark
import (
"errors"
"fmt"
"strings"
"github.com/influxdata/telegraf"
"go.starlark.net/starlark"
)
// FieldDict is a starlark.Value for the metric fields. It is heavily based on the
// starlark.Dict.
type FieldDict struct {
*Metric
}
func (d FieldDict) String() string {
buf := new(strings.Builder)
buf.WriteString("{")
sep := ""
for _, item := range d.Items() {
k, v := item[0], item[1]
buf.WriteString(sep)
buf.WriteString(k.String())
buf.WriteString(": ")
buf.WriteString(v.String())
sep = ", "
}
buf.WriteString("}")
return buf.String()
}
func (d FieldDict) Type() string {
return "Fields"
}
func (d FieldDict) Freeze() {
d.frozen = true
}
func (d FieldDict) Truth() starlark.Bool {
return len(d.metric.FieldList()) != 0
}
func (d FieldDict) Hash() (uint32, error) {
return 0, errors.New("not hashable")
}
// AttrNames implements the starlark.HasAttrs interface.
func (d FieldDict) AttrNames() []string {
return builtinAttrNames(FieldDictMethods)
}
// Attr implements the starlark.HasAttrs interface.
func (d FieldDict) Attr(name string) (starlark.Value, error) {
return builtinAttr(d, name, FieldDictMethods)
}
var FieldDictMethods = map[string]builtinMethod{
"clear": dict_clear,
"get": dict_get,
"items": dict_items,
"keys": dict_keys,
"pop": dict_pop,
"popitem": dict_popitem,
"setdefault": dict_setdefault,
"update": dict_update,
"values": dict_values,
}
// Get implements the starlark.Mapping interface.
func (d FieldDict) Get(key starlark.Value) (v starlark.Value, found bool, err error) {
if k, ok := key.(starlark.String); ok {
gv, found := d.metric.GetField(k.GoString())
if !found {
return starlark.None, false, nil
}
v, err := asStarlarkValue(gv)
if err != nil {
return starlark.None, false, err
}
return v, true, nil
}
return starlark.None, false, errors.New("key must be of type 'str'")
}
// SetKey implements the starlark.HasSetKey interface to support map update
// using x[k]=v syntax, like a dictionary.
func (d FieldDict) SetKey(k, v starlark.Value) error {
if d.fieldIterCount > 0 {
return fmt.Errorf("cannot insert during iteration")
}
key, ok := k.(starlark.String)
if !ok {
return errors.New("field key must be of type 'str'")
}
gv, err := asGoValue(v)
if err != nil {
return err
}
d.metric.AddField(key.GoString(), gv)
return nil
}
// Items implements the starlark.IterableMapping interface.
func (d FieldDict) Items() []starlark.Tuple {
items := make([]starlark.Tuple, 0, len(d.metric.FieldList()))
for _, field := range d.metric.FieldList() {
key := starlark.String(field.Key)
sv, err := asStarlarkValue(field.Value)
if err != nil {
continue
}
pair := starlark.Tuple{key, sv}
items = append(items, pair)
}
return items
}
func (d FieldDict) Clear() error {
if d.fieldIterCount > 0 {
return fmt.Errorf("cannot delete during iteration")
}
keys := make([]string, 0, len(d.metric.FieldList()))
for _, field := range d.metric.FieldList() {
keys = append(keys, field.Key)
}
for _, key := range keys {
d.metric.RemoveField(key)
}
return nil
}
func (d FieldDict) PopItem() (v starlark.Value, err error) {
if d.fieldIterCount > 0 {
return nil, fmt.Errorf("cannot delete during iteration")
}
for _, field := range d.metric.FieldList() {
k := field.Key
v := field.Value
d.metric.RemoveField(k)
sk := starlark.String(k)
sv, err := asStarlarkValue(v)
if err != nil {
return nil, fmt.Errorf("could not convert to starlark value")
}
return starlark.Tuple{sk, sv}, nil
}
return nil, errors.New("popitem(): field dictionary is empty")
}
func (d FieldDict) Delete(k starlark.Value) (v starlark.Value, found bool, err error) {
if d.fieldIterCount > 0 {
return nil, false, fmt.Errorf("cannot delete during iteration")
}
if key, ok := k.(starlark.String); ok {
value, ok := d.metric.GetField(key.GoString())
if ok {
d.metric.RemoveField(key.GoString())
sv, err := asStarlarkValue(value)
return sv, ok, err
}
}
return starlark.None, false, errors.New("key must be of type 'str'")
}
// Items implements the starlark.Mapping interface.
func (d FieldDict) Iterate() starlark.Iterator {
d.fieldIterCount++
return &FieldIterator{Metric: d.Metric, fields: d.metric.FieldList()}
}
type FieldIterator struct {
*Metric
fields []*telegraf.Field
}
// Next implements the starlark.Iterator interface.
func (i *FieldIterator) Next(p *starlark.Value) bool {
if len(i.fields) == 0 {
return false
}
field := i.fields[0]
i.fields = i.fields[1:]
*p = starlark.String(field.Key)
return true
}
// Done implements the starlark.Iterator interface.
func (i *FieldIterator) Done() {
i.fieldIterCount--
}
// AsStarlarkValue converts a field value to a starlark.Value.
func asStarlarkValue(value interface{}) (starlark.Value, error) {
switch v := value.(type) {
case float64:
return starlark.Float(v), nil
case int64:
return starlark.MakeInt64(v), nil
case uint64:
return starlark.MakeUint64(v), nil
case string:
return starlark.String(v), nil
case bool:
return starlark.Bool(v), nil
}
return starlark.None, errors.New("invalid type")
}
// AsGoValue converts a starlark.Value to a field value.
func asGoValue(value interface{}) (interface{}, error) {
switch v := value.(type) {
case starlark.Float:
return float64(v), nil
case starlark.Int:
n, ok := v.Int64()
if !ok {
return nil, errors.New("cannot represent integer as int64")
}
return n, nil
case starlark.String:
return string(v), nil
case starlark.Bool:
return bool(v), nil
}
return nil, errors.New("invalid starlark type")
}

View File

@ -0,0 +1,148 @@
package starlark
import (
"errors"
"fmt"
"strings"
"time"
"github.com/influxdata/telegraf"
"go.starlark.net/starlark"
)
type Metric struct {
metric telegraf.Metric
tagIterCount int
fieldIterCount int
frozen bool
}
// Wrap updates the starlark.Metric to wrap a new telegraf.Metric.
func (m *Metric) Wrap(metric telegraf.Metric) {
m.metric = metric
m.tagIterCount = 0
m.fieldIterCount = 0
m.frozen = false
}
// Unwrap removes the telegraf.Metric from the startlark.Metric.
func (m *Metric) Unwrap() telegraf.Metric {
return m.metric
}
// String returns the starlark representation of the Metric.
//
// The String function is called by both the repr() and str() functions, and so
// it behaves more like the repr function would in Python.
func (m *Metric) String() string {
buf := new(strings.Builder)
buf.WriteString("Metric(")
buf.WriteString(m.Name().String())
buf.WriteString(", tags=")
buf.WriteString(m.Tags().String())
buf.WriteString(", fields=")
buf.WriteString(m.Fields().String())
buf.WriteString(", time=")
buf.WriteString(m.Time().String())
buf.WriteString(")")
return buf.String()
}
func (m *Metric) Type() string {
return "Metric"
}
func (m *Metric) Freeze() {
m.frozen = true
}
func (m *Metric) Truth() starlark.Bool {
return true
}
func (m *Metric) Hash() (uint32, error) {
return 0, errors.New("not hashable")
}
// AttrNames implements the starlark.HasAttrs interface.
func (m *Metric) AttrNames() []string {
return []string{"name", "tags", "fields", "time"}
}
// Attr implements the starlark.HasAttrs interface.
func (m *Metric) Attr(name string) (starlark.Value, error) {
switch name {
case "name":
return m.Name(), nil
case "tags":
return m.Tags(), nil
case "fields":
return m.Fields(), nil
case "time":
return m.Time(), nil
default:
// Returning nil, nil indicates "no such field or method"
return nil, nil
}
}
// SetField implements the starlark.HasSetField interface.
func (m *Metric) SetField(name string, value starlark.Value) error {
if m.frozen {
return fmt.Errorf("cannot modify frozen metric")
}
switch name {
case "name":
return m.SetName(value)
case "time":
return m.SetTime(value)
case "tags":
return errors.New("cannot set tags")
case "fields":
return errors.New("cannot set fields")
default:
return starlark.NoSuchAttrError(
fmt.Sprintf("cannot assign to field '%s'", name))
}
}
func (m *Metric) Name() starlark.String {
return starlark.String(m.metric.Name())
}
func (m *Metric) SetName(value starlark.Value) error {
if str, ok := value.(starlark.String); ok {
m.metric.SetName(str.GoString())
return nil
}
return errors.New("type error")
}
func (m *Metric) Tags() TagDict {
return TagDict{m}
}
func (m *Metric) Fields() FieldDict {
return FieldDict{m}
}
func (m *Metric) Time() starlark.Int {
return starlark.MakeInt64(m.metric.Time().UnixNano())
}
func (m *Metric) SetTime(value starlark.Value) error {
switch v := value.(type) {
case starlark.Int:
ns, ok := v.Int64()
if !ok {
return errors.New("type error: unrepresentable time")
}
tm := time.Unix(0, ns)
m.metric.SetTime(tm)
return nil
default:
return errors.New("type error")
}
}

View File

@ -0,0 +1,215 @@
package starlark
import (
"errors"
"fmt"
"strings"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/processors"
"go.starlark.net/resolve"
"go.starlark.net/starlark"
)
const (
description = "Process metrics using a Starlark script"
sampleConfig = `
## The Starlark source can be set as a string in this configuration file, or
## by referencing a file containing the script. Only one source or script
## should be set at once.
##
## Source of the Starlark script.
source = '''
def apply(metric):
return metric
'''
## File containing a Starlark script.
# script = "/usr/local/bin/myscript.star"
`
)
type Starlark struct {
Source string `toml:"source"`
Script string `toml:"script"`
Log telegraf.Logger `toml:"-"`
thread *starlark.Thread
applyFunc *starlark.Function
args starlark.Tuple
results []telegraf.Metric
}
func (s *Starlark) Init() error {
if s.Source == "" && s.Script == "" {
return errors.New("one of source or script must be set")
}
if s.Source != "" && s.Script != "" {
return errors.New("both source or script cannot be set")
}
s.thread = &starlark.Thread{
Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) },
}
builtins := starlark.StringDict{}
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
program, err := s.sourceProgram(builtins)
if err != nil {
return err
}
// Execute source
globals, err := program.Init(s.thread, builtins)
if err != nil {
return err
}
// Freeze the global state. This prevents modifications to the processor
// state and prevents scripts from containing errors storing tracking
// metrics. Tasks that require global state will not be possible due to
// this, so maybe we should relax this in the future.
globals.Freeze()
// The source should define an apply function.
apply := globals["apply"]
if apply == nil {
return errors.New("apply is not defined")
}
var ok bool
if s.applyFunc, ok = apply.(*starlark.Function); !ok {
return errors.New("apply is not a function")
}
if s.applyFunc.NumParams() != 1 {
return errors.New("apply function must take one parameter")
}
// Reusing the same metric wrapper to skip an allocation. This will cause
// any saved references to point to the new metric, but due to freezing the
// globals none should exist.
s.args = make(starlark.Tuple, 1)
s.args[0] = &Metric{}
// Preallocate a slice for return values.
s.results = make([]telegraf.Metric, 0, 10)
return nil
}
func (s *Starlark) sourceProgram(builtins starlark.StringDict) (*starlark.Program, error) {
if s.Source != "" {
_, program, err := starlark.SourceProgram("processor.starlark", s.Source, builtins.Has)
return program, err
}
_, program, err := starlark.SourceProgram(s.Script, nil, builtins.Has)
return program, err
}
func (s *Starlark) SampleConfig() string {
return sampleConfig
}
func (s *Starlark) Description() string {
return description
}
func (s *Starlark) Start(acc telegraf.Accumulator) error {
return nil
}
func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
s.args[0].(*Metric).Wrap(metric)
rv, err := starlark.Call(s.thread, s.applyFunc, s.args, nil)
if err != nil {
if err, ok := err.(*starlark.EvalError); ok {
for _, line := range strings.Split(err.Backtrace(), "\n") {
s.Log.Error(line)
}
}
metric.Reject()
return err
}
switch rv := rv.(type) {
case *starlark.List:
iter := rv.Iterate()
defer iter.Done()
var v starlark.Value
for iter.Next(&v) {
switch v := v.(type) {
case *Metric:
m := v.Unwrap()
if containsMetric(s.results, m) {
s.Log.Errorf("Duplicate metric reference detected")
continue
}
s.results = append(s.results, m)
acc.AddMetric(m)
default:
s.Log.Errorf("Invalid type returned in list: %s", v.Type())
}
}
// If the script didn't return the original metrics, mark it as
// successfully handled.
if !containsMetric(s.results, metric) {
metric.Accept()
}
// clear results
for i := range s.results {
s.results[i] = nil
}
s.results = s.results[:0]
case *Metric:
m := rv.Unwrap()
// If the script returned a different metric, mark this metric as
// successfully handled.
if m != metric {
metric.Accept()
}
acc.AddMetric(m)
case starlark.NoneType:
metric.Drop()
default:
return fmt.Errorf("Invalid type returned: %T", rv)
}
return nil
}
func (s *Starlark) Stop() error {
return nil
}
func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool {
for _, m := range metrics {
if m == metric {
return true
}
}
return false
}
func init() {
// https://github.com/bazelbuild/starlark/issues/20
resolve.AllowNestedDef = true
resolve.AllowLambda = true
resolve.AllowFloat = true
resolve.AllowSet = true
resolve.AllowGlobalReassign = true
resolve.AllowRecursion = true
}
func init() {
processors.AddStreaming("starlark", func() telegraf.StreamingProcessor {
return &Starlark{}
})
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,197 @@
package starlark
import (
"errors"
"fmt"
"strings"
"github.com/influxdata/telegraf"
"go.starlark.net/starlark"
)
// TagDict is a starlark.Value for the metric tags. It is heavily based on the
// starlark.Dict.
type TagDict struct {
*Metric
}
func (d TagDict) String() string {
buf := new(strings.Builder)
buf.WriteString("{")
sep := ""
for _, item := range d.Items() {
k, v := item[0], item[1]
buf.WriteString(sep)
buf.WriteString(k.String())
buf.WriteString(": ")
buf.WriteString(v.String())
sep = ", "
}
buf.WriteString("}")
return buf.String()
}
func (d TagDict) Type() string {
return "Tags"
}
func (d TagDict) Freeze() {
d.frozen = true
}
func (d TagDict) Truth() starlark.Bool {
return len(d.metric.TagList()) != 0
}
func (d TagDict) Hash() (uint32, error) {
return 0, errors.New("not hashable")
}
// AttrNames implements the starlark.HasAttrs interface.
func (d TagDict) AttrNames() []string {
return builtinAttrNames(TagDictMethods)
}
// Attr implements the starlark.HasAttrs interface.
func (d TagDict) Attr(name string) (starlark.Value, error) {
return builtinAttr(d, name, TagDictMethods)
}
var TagDictMethods = map[string]builtinMethod{
"clear": dict_clear,
"get": dict_get,
"items": dict_items,
"keys": dict_keys,
"pop": dict_pop,
"popitem": dict_popitem,
"setdefault": dict_setdefault,
"update": dict_update,
"values": dict_values,
}
// Get implements the starlark.Mapping interface.
func (d TagDict) Get(key starlark.Value) (v starlark.Value, found bool, err error) {
if k, ok := key.(starlark.String); ok {
gv, found := d.metric.GetTag(k.GoString())
if !found {
return starlark.None, false, nil
}
return starlark.String(gv), true, err
}
return starlark.None, false, errors.New("key must be of type 'str'")
}
// SetKey implements the starlark.HasSetKey interface to support map update
// using x[k]=v syntax, like a dictionary.
func (d TagDict) SetKey(k, v starlark.Value) error {
if d.tagIterCount > 0 {
return fmt.Errorf("cannot insert during iteration")
}
key, ok := k.(starlark.String)
if !ok {
return errors.New("tag key must be of type 'str'")
}
value, ok := v.(starlark.String)
if !ok {
return errors.New("tag value must be of type 'str'")
}
d.metric.AddTag(key.GoString(), value.GoString())
return nil
}
// Items implements the starlark.IterableMapping interface.
func (d TagDict) Items() []starlark.Tuple {
items := make([]starlark.Tuple, 0, len(d.metric.TagList()))
for _, tag := range d.metric.TagList() {
key := starlark.String(tag.Key)
value := starlark.String(tag.Value)
pair := starlark.Tuple{key, value}
items = append(items, pair)
}
return items
}
func (d TagDict) Clear() error {
if d.tagIterCount > 0 {
return fmt.Errorf("cannot delete during iteration")
}
keys := make([]string, 0, len(d.metric.TagList()))
for _, tag := range d.metric.TagList() {
keys = append(keys, tag.Key)
}
for _, key := range keys {
d.metric.RemoveTag(key)
}
return nil
}
func (d TagDict) PopItem() (v starlark.Value, err error) {
if d.tagIterCount > 0 {
return nil, fmt.Errorf("cannot delete during iteration")
}
for _, tag := range d.metric.TagList() {
k := tag.Key
v := tag.Value
d.metric.RemoveTag(k)
sk := starlark.String(k)
sv := starlark.String(v)
return starlark.Tuple{sk, sv}, nil
}
return nil, errors.New("popitem(): tag dictionary is empty")
}
func (d TagDict) Delete(k starlark.Value) (v starlark.Value, found bool, err error) {
if d.tagIterCount > 0 {
return nil, false, fmt.Errorf("cannot delete during iteration")
}
if key, ok := k.(starlark.String); ok {
value, ok := d.metric.GetTag(key.GoString())
if ok {
d.metric.RemoveTag(key.GoString())
v := starlark.String(value)
return v, ok, err
}
}
return starlark.None, false, errors.New("key must be of type 'str'")
}
// Items implements the starlark.Mapping interface.
func (d TagDict) Iterate() starlark.Iterator {
d.tagIterCount++
return &TagIterator{Metric: d.Metric, tags: d.metric.TagList()}
}
type TagIterator struct {
*Metric
tags []*telegraf.Tag
}
// Next implements the starlark.Iterator interface.
func (i *TagIterator) Next(p *starlark.Value) bool {
if len(i.tags) == 0 {
return false
}
tag := i.tags[0]
i.tags = i.tags[1:]
*p = starlark.String(tag.Key)
return true
}
// Done implements the starlark.Iterator interface.
func (i *TagIterator) Done() {
i.tagIterCount--
}

View File

@ -0,0 +1,7 @@
# Compute the ratio of two integer fields.
def apply(metric):
used = float(metric.fields['used'])
total = float(metric.fields['total'])
metric.fields['usage'] = (used / total) * 100
return metric

View File

@ -0,0 +1,13 @@
# Rename any tags using the mapping in the renames dict.
renames = {
'lower': 'min',
'upper': 'max',
}
def apply(metric):
for k, v in metric.tags.items():
if k in renames:
metric.tags[renames[k]] = v
metric.tags.pop(k)
return metric

View File

@ -0,0 +1,7 @@
# Multiply any float fields by 10
def apply(metric):
for k, v in metric.fields.items():
if type(v) == "float":
metric.fields[k] = v * 10
return metric

View File

@ -31,10 +31,11 @@ func (sp *streamingProcessor) Start(acc telegraf.Accumulator) error {
return nil
}
func (sp *streamingProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) {
func (sp *streamingProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
for _, m := range sp.processor.Apply(m) {
acc.AddMetric(m)
}
return nil
}
func (sp *streamingProcessor) Stop() error {

View File

@ -20,7 +20,7 @@ type StreamingProcessor interface {
Start(acc Accumulator) error
// Add is called for each metric to be processed.
Add(metric Metric, acc Accumulator)
Add(metric Metric, acc Accumulator) error
// Stop gives you a callback to free resources.
// by the time Stop is called, the input stream will have already been closed