Allow to provide constants to a starlark script (#8772)
This commit is contained in:
parent
c43de16bce
commit
8ddbab47a4
|
|
@ -30,6 +30,13 @@ def apply(metric):
|
||||||
|
|
||||||
## File containing a Starlark script.
|
## File containing a Starlark script.
|
||||||
# script = "/usr/local/bin/myscript.star"
|
# script = "/usr/local/bin/myscript.star"
|
||||||
|
|
||||||
|
## The constants of the Starlark script.
|
||||||
|
# [processors.starlark.constants]
|
||||||
|
# max_size = 10
|
||||||
|
# threshold = 0.75
|
||||||
|
# default_name = "Julia"
|
||||||
|
# debug_mode = true
|
||||||
```
|
```
|
||||||
|
|
||||||
### Usage
|
### Usage
|
||||||
|
|
@ -182,6 +189,29 @@ def apply(metric):
|
||||||
def failing(metric):
|
def failing(metric):
|
||||||
json.decode("non-json-content")
|
json.decode("non-json-content")
|
||||||
```
|
```
|
||||||
|
**How to reuse the same script but with different parameters?**
|
||||||
|
|
||||||
|
In case you have a generic script that you would like to reuse for different instances of the plugin, you can use constants as input parameters of your script.
|
||||||
|
|
||||||
|
So for example, assuming that you have the next configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.starlark]]
|
||||||
|
script = "/usr/local/bin/myscript.star"
|
||||||
|
|
||||||
|
[processors.starlark.constants]
|
||||||
|
somecustomnum = 10
|
||||||
|
somecustomstr = "mycustomfield"
|
||||||
|
```
|
||||||
|
|
||||||
|
Your script could then use the constants defined in the configuration as follows:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def apply(metric):
|
||||||
|
if metric.fields[somecustomstr] >= somecustomnum:
|
||||||
|
metric.fields.clear()
|
||||||
|
return metric
|
||||||
|
```
|
||||||
|
|
||||||
### Examples
|
### Examples
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package starlark
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
|
@ -210,17 +211,44 @@ func (i *FieldIterator) Done() {
|
||||||
|
|
||||||
// AsStarlarkValue converts a field value to a starlark.Value.
|
// AsStarlarkValue converts a field value to a starlark.Value.
|
||||||
func asStarlarkValue(value interface{}) (starlark.Value, error) {
|
func asStarlarkValue(value interface{}) (starlark.Value, error) {
|
||||||
switch v := value.(type) {
|
v := reflect.ValueOf(value)
|
||||||
case float64:
|
switch v.Kind() {
|
||||||
return starlark.Float(v), nil
|
case reflect.Slice:
|
||||||
case int64:
|
length := v.Len()
|
||||||
return starlark.MakeInt64(v), nil
|
array := make([]starlark.Value, length)
|
||||||
case uint64:
|
for i := 0; i < length; i++ {
|
||||||
return starlark.MakeUint64(v), nil
|
sVal, err := asStarlarkValue(v.Index(i).Interface())
|
||||||
case string:
|
if err != nil {
|
||||||
return starlark.String(v), nil
|
return starlark.None, err
|
||||||
case bool:
|
}
|
||||||
return starlark.Bool(v), nil
|
array[i] = sVal
|
||||||
|
}
|
||||||
|
return starlark.NewList(array), nil
|
||||||
|
case reflect.Map:
|
||||||
|
dict := starlark.NewDict(v.Len())
|
||||||
|
iter := v.MapRange()
|
||||||
|
for iter.Next() {
|
||||||
|
sKey, err := asStarlarkValue(iter.Key().Interface())
|
||||||
|
if err != nil {
|
||||||
|
return starlark.None, err
|
||||||
|
}
|
||||||
|
sValue, err := asStarlarkValue(iter.Value().Interface())
|
||||||
|
if err != nil {
|
||||||
|
return starlark.None, err
|
||||||
|
}
|
||||||
|
dict.SetKey(sKey, sValue)
|
||||||
|
}
|
||||||
|
return dict, nil
|
||||||
|
case reflect.Float32, reflect.Float64:
|
||||||
|
return starlark.Float(v.Float()), nil
|
||||||
|
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
|
||||||
|
return starlark.MakeInt64(v.Int()), nil
|
||||||
|
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
|
||||||
|
return starlark.MakeUint64(v.Uint()), nil
|
||||||
|
case reflect.String:
|
||||||
|
return starlark.String(v.String()), nil
|
||||||
|
case reflect.Bool:
|
||||||
|
return starlark.Bool(v.Bool()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return starlark.None, errors.New("invalid type")
|
return starlark.None, errors.New("invalid type")
|
||||||
|
|
|
||||||
|
|
@ -27,12 +27,20 @@ def apply(metric):
|
||||||
|
|
||||||
## File containing a Starlark script.
|
## File containing a Starlark script.
|
||||||
# script = "/usr/local/bin/myscript.star"
|
# script = "/usr/local/bin/myscript.star"
|
||||||
|
|
||||||
|
## The constants of the Starlark script.
|
||||||
|
# [processors.starlark.constants]
|
||||||
|
# max_size = 10
|
||||||
|
# threshold = 0.75
|
||||||
|
# default_name = "Julia"
|
||||||
|
# debug_mode = true
|
||||||
`
|
`
|
||||||
)
|
)
|
||||||
|
|
||||||
type Starlark struct {
|
type Starlark struct {
|
||||||
Source string `toml:"source"`
|
Source string `toml:"source"`
|
||||||
Script string `toml:"script"`
|
Script string `toml:"script"`
|
||||||
|
Constants map[string]interface{} `toml:"constants"`
|
||||||
|
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
|
|
@ -61,6 +69,7 @@ func (s *Starlark) Init() error {
|
||||||
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
|
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
|
||||||
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
|
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
|
||||||
builtins["catch"] = starlark.NewBuiltin("catch", catch)
|
builtins["catch"] = starlark.NewBuiltin("catch", catch)
|
||||||
|
s.addConstants(&builtins)
|
||||||
|
|
||||||
program, err := s.sourceProgram(builtins)
|
program, err := s.sourceProgram(builtins)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -197,6 +206,17 @@ func (s *Starlark) Stop() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add all the constants defined in the plugin as constants of the script
|
||||||
|
func (s *Starlark) addConstants(builtins *starlark.StringDict) {
|
||||||
|
for key, val := range s.Constants {
|
||||||
|
sVal, err := asStarlarkValue(val)
|
||||||
|
if err != nil {
|
||||||
|
s.Log.Errorf("Unsupported type: %T", val)
|
||||||
|
}
|
||||||
|
(*builtins)[key] = sVal
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool {
|
func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool {
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
if m == metric {
|
if m == metric {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package starlark
|
package starlark
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
|
@ -10,6 +11,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/plugins/parsers"
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
@ -250,6 +252,7 @@ func TestMetric(t *testing.T) {
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
name string
|
name string
|
||||||
source string
|
source string
|
||||||
|
constants map[string]interface{}
|
||||||
input []telegraf.Metric
|
input []telegraf.Metric
|
||||||
expected []telegraf.Metric
|
expected []telegraf.Metric
|
||||||
expectedErrorStr string
|
expectedErrorStr string
|
||||||
|
|
@ -2418,13 +2421,64 @@ def process(metric):
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "support constants",
|
||||||
|
source: `
|
||||||
|
def apply(metric):
|
||||||
|
metric.fields["p1"] = max_size
|
||||||
|
metric.fields["p2"] = threshold
|
||||||
|
metric.fields["p3"] = default_name
|
||||||
|
metric.fields["p4"] = debug_mode
|
||||||
|
metric.fields["p5"] = supported_values[0]
|
||||||
|
metric.fields["p6"] = supported_values[1]
|
||||||
|
metric.fields["p7"] = supported_entries[2]
|
||||||
|
metric.fields["p8"] = supported_entries["3"]
|
||||||
|
return metric
|
||||||
|
`,
|
||||||
|
constants: map[string]interface{}{
|
||||||
|
"max_size": 10,
|
||||||
|
"threshold": 0.75,
|
||||||
|
"default_name": "Julia",
|
||||||
|
"debug_mode": true,
|
||||||
|
"supported_values": []interface{}{2, "3"},
|
||||||
|
"supported_entries": map[interface{}]interface{}{
|
||||||
|
2: "two",
|
||||||
|
"3": "three",
|
||||||
|
},
|
||||||
|
"unsupported_type": time.Now(),
|
||||||
|
},
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"p1": 10,
|
||||||
|
"p2": 0.75,
|
||||||
|
"p3": "Julia",
|
||||||
|
"p4": true,
|
||||||
|
"p5": 2,
|
||||||
|
"p6": "3",
|
||||||
|
"p7": "two",
|
||||||
|
"p8": "three",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
plugin := &Starlark{
|
plugin := &Starlark{
|
||||||
Source: tt.source,
|
Source: tt.source,
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
|
Constants: tt.constants,
|
||||||
}
|
}
|
||||||
err := plugin.Init()
|
err := plugin.Init()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -2451,6 +2505,108 @@ def process(metric):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests the behavior of the plugin according the provided TOML configuration.
|
||||||
|
func TestConfig(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
name string
|
||||||
|
config string
|
||||||
|
input []telegraf.Metric
|
||||||
|
expected []telegraf.Metric
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "support constants from configuration",
|
||||||
|
config: `
|
||||||
|
[[processors.starlark]]
|
||||||
|
source = '''
|
||||||
|
def apply(metric):
|
||||||
|
metric.fields["p1"] = max_size
|
||||||
|
metric.fields["p2"] = threshold
|
||||||
|
metric.fields["p3"] = default_name
|
||||||
|
metric.fields["p4"] = debug_mode
|
||||||
|
metric.fields["p5"] = supported_values[0]
|
||||||
|
metric.fields["p6"] = supported_values[1]
|
||||||
|
metric.fields["p7"] = supported_entries["2"]
|
||||||
|
metric.fields["p8"] = supported_entries["3"]
|
||||||
|
return metric
|
||||||
|
'''
|
||||||
|
[processors.starlark.constants]
|
||||||
|
max_size = 10
|
||||||
|
threshold = 0.75
|
||||||
|
default_name = "Elsa"
|
||||||
|
debug_mode = true
|
||||||
|
supported_values = ["2", "3"]
|
||||||
|
supported_entries = { "2" = "two", "3" = "three" }
|
||||||
|
unsupported_type = 2009-06-12
|
||||||
|
`,
|
||||||
|
input: []telegraf.Metric{
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
expected: []telegraf.Metric{
|
||||||
|
testutil.MustMetric("cpu",
|
||||||
|
map[string]string{},
|
||||||
|
map[string]interface{}{
|
||||||
|
"p1": 10,
|
||||||
|
"p2": 0.75,
|
||||||
|
"p3": "Elsa",
|
||||||
|
"p4": true,
|
||||||
|
"p5": "2",
|
||||||
|
"p6": "3",
|
||||||
|
"p7": "two",
|
||||||
|
"p8": "three",
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
plugin, err := buildPlugin(tt.config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = plugin.Init()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
err = plugin.Start(&acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for _, m := range tt.input {
|
||||||
|
err = plugin.Add(m, &acc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = plugin.Stop()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build a Starlark plugin from the provided configuration.
|
||||||
|
func buildPlugin(configContent string) (*Starlark, error) {
|
||||||
|
c := config.NewConfig()
|
||||||
|
err := c.LoadConfigData([]byte(configContent))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(c.Processors) != 1 {
|
||||||
|
return nil, errors.New("Only one processor was expected")
|
||||||
|
}
|
||||||
|
plugin, ok := (c.Processors[0].Processor).(*Starlark)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("Only a Starlark processor was expected")
|
||||||
|
}
|
||||||
|
plugin.Log = testutil.Logger{}
|
||||||
|
return plugin, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestScript(t *testing.T) {
|
func TestScript(t *testing.T) {
|
||||||
var tests = []struct {
|
var tests = []struct {
|
||||||
name string
|
name string
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue