feat(processors.starlark): Allow persistence of global state (#15170)

This commit is contained in:
Sven Rebhan 2024-04-25 14:24:57 -04:00 committed by GitHub
parent e3d23229c3
commit 274333921f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 297 additions and 16 deletions

View File

@ -106,11 +106,6 @@ func (a *Agent) Run(ctx context.Context) error {
time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet,
a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))
log.Printf("D! [agent] Initializing plugins")
if err := a.initPlugins(); err != nil {
return err
}
if a.Config.Persister != nil {
log.Printf("D! [agent] Initializing plugin states")
if err := a.initPersister(); err != nil {
@ -124,6 +119,11 @@ func (a *Agent) Run(ctx context.Context) error {
}
}
log.Printf("D! [agent] Initializing plugins")
if err := a.initPlugins(); err != nil {
return err
}
startTime := time.Now()
log.Printf("D! [agent] Connecting outputs")

View File

@ -1,6 +1,8 @@
package starlark
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"strings"
@ -26,6 +28,77 @@ type Common struct {
globals starlark.StringDict
functions map[string]*starlark.Function
parameters map[string]starlark.Tuple
state *starlark.Dict
}
func (s *Common) GetState() interface{} {
// Return the actual byte-type instead of nil allowing the persister
// to guess instantiate variable of the appropriate type
if s.state == nil {
return []byte{}
}
// Convert the starlark dict into a golang dictionary for serialization
state := make(map[string]interface{}, s.state.Len())
items := s.state.Items()
for _, item := range items {
if len(item) != 2 {
// We do expect key-value pairs in the state so there should be
// two items.
s.Log.Errorf("state item %+v does not contain a key-value pair", item)
continue
}
k, ok := item.Index(0).(starlark.String)
if !ok {
s.Log.Errorf("state item %+v has invalid key type %T", item, item.Index(0))
continue
}
v, err := asGoValue(item.Index(1))
if err != nil {
s.Log.Errorf("state item %+v value cannot be converted: %v", item, err)
continue
}
state[k.GoString()] = v
}
// Do a binary GOB encoding to preserve types
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(state); err != nil {
s.Log.Errorf("encoding state failed: %v", err)
return []byte{}
}
return buf.Bytes()
}
func (s *Common) SetState(state interface{}) error {
data, ok := state.([]byte)
if !ok {
return fmt.Errorf("unexpected type %T for state", state)
}
if len(data) == 0 {
return nil
}
// Decode the binary GOB encoding
var dict map[string]interface{}
if err := gob.NewDecoder(bytes.NewBuffer(data)).Decode(&dict); err != nil {
return fmt.Errorf("decoding state failed: %w", err)
}
// Convert the golang dict back to starlark types
s.state = starlark.NewDict(len(dict))
for k, v := range dict {
sv, err := asStarlarkValue(v)
if err != nil {
return fmt.Errorf("value %v of state item %q cannot be set: %w", v, k, err)
}
if err := s.state.SetKey(starlark.String(k), sv); err != nil {
return fmt.Errorf("state item %q cannot be set: %w", k, err)
}
}
return nil
}
func (s *Common) Init() error {
@ -47,14 +120,29 @@ func (s *Common) Init() error {
builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric)
builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy)
builtins["catch"] = starlark.NewBuiltin("catch", catch)
err := s.addConstants(&builtins)
if err != nil {
if err := s.addConstants(&builtins); err != nil {
return err
}
// Insert the persisted state if any
if s.state != nil {
builtins["state"] = s.state
}
// Load the program. In case of an error we can try to insert the state
// which can be used implicitly e.g. when persisting states
program, err := s.sourceProgram(builtins)
if err != nil {
return err
// Try again with a declared state. This might be necessary for
// state persistence.
s.state = starlark.NewDict(0)
builtins["state"] = s.state
p, serr := s.sourceProgram(builtins)
if serr != nil {
return err
}
program = p
}
// Execute source
@ -62,12 +150,16 @@ func (s *Common) Init() error {
if err != nil {
return err
}
// Make available a shared state to the apply function
globals["state"] = starlark.NewDict(0)
// Freeze the global state. This prevents modifications to the processor
// In case the program declares a global "state" we should insert it to
// avoid warnings about inserting into a frozen variable
if _, found := globals["state"]; found {
globals["state"] = starlark.NewDict(0)
}
// Freeze the global state. This prevents modifications to the processor
// state and prevents scripts from containing errors storing tracking
// metrics. Tasks that require global state will not be possible due to
// metrics. Tasks that require global state will not be possible due to
// this, so maybe we should relax this in the future.
globals.Freeze()
@ -107,6 +199,9 @@ func (s *Common) AddFunction(name string, params ...starlark.Value) error {
// Add all the constants defined in the plugin as constants of the script
func (s *Common) addConstants(builtins *starlark.StringDict) error {
for key, val := range s.Constants {
if key == "state" {
return errors.New("'state' constant uses reserved name")
}
sVal, err := asStarlarkValue(val)
if err != nil {
return fmt.Errorf("converting type %T failed: %w", val, err)

View File

@ -27,14 +27,12 @@ func (*Starlark) SampleConfig() string {
}
func (s *Starlark) Init() error {
err := s.Common.Init()
if err != nil {
if err := s.Common.Init(); err != nil {
return err
}
// The source should define an apply function.
err = s.AddFunction("apply", &common.Metric{})
if err != nil {
if err := s.AddFunction("apply", &common.Metric{}); err != nil {
return err
}
@ -118,6 +116,7 @@ func (s *Starlark) Add(origMetric telegraf.Metric, acc telegraf.Accumulator) err
default:
return fmt.Errorf("invalid type returned: %T", rv)
}
return nil
}

View File

@ -1,6 +1,8 @@
package starlark
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"os"
@ -3563,6 +3565,191 @@ def apply(metric):
}
}
func TestGlobalState(t *testing.T) {
source := `
def apply(metric):
count = state.get("count", 0)
count += 1
state["count"] = count
metric.fields["count"] = count
return metric
`
// Define the metrics
input := []telegraf.Metric{
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1713188113, 10),
),
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1713188113, 20),
),
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1713188113, 30),
)}
expected := []telegraf.Metric{
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42, "count": 1},
time.Unix(1713188113, 10),
),
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42, "count": 2},
time.Unix(1713188113, 20),
),
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42, "count": 3},
time.Unix(1713188113, 30),
),
}
// Configure the plugin
plugin := &Starlark{
Common: common.Common{
StarlarkLoadFunc: testLoadFunc,
Source: source,
Log: testutil.Logger{},
},
}
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
// Do the processing
for _, m := range input {
require.NoError(t, plugin.Add(m, &acc))
}
plugin.Stop()
// Check
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual)
}
func TestStatePersistence(t *testing.T) {
source := `
def apply(metric):
count = state.get("count", 0)
count += 1
state["count"] = count
metric.fields["count"] = count
metric.tags["instance"] = state.get("instance", "unknown")
return metric
`
// Define the metrics
input := []telegraf.Metric{
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1713188113, 10),
),
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1713188113, 20),
),
metric.New(
"test",
map[string]string{},
map[string]interface{}{"value": 42},
time.Unix(1713188113, 30),
)}
expected := []telegraf.Metric{
metric.New(
"test",
map[string]string{"instance": "myhost"},
map[string]interface{}{"value": 42, "count": 1},
time.Unix(1713188113, 10),
),
metric.New(
"test",
map[string]string{"instance": "myhost"},
map[string]interface{}{"value": 42, "count": 2},
time.Unix(1713188113, 20),
),
metric.New(
"test",
map[string]string{"instance": "myhost"},
map[string]interface{}{"value": 42, "count": 3},
time.Unix(1713188113, 30),
),
}
// Configure the plugin
plugin := &Starlark{
Common: common.Common{
StarlarkLoadFunc: testLoadFunc,
Source: source,
Log: testutil.Logger{},
},
}
// Setup the "persisted" state
var pi telegraf.StatefulPlugin = plugin
var buf bytes.Buffer
require.NoError(t, gob.NewEncoder(&buf).Encode(map[string]interface{}{"instance": "myhost"}))
require.NoError(t, pi.SetState(buf.Bytes()))
require.NoError(t, plugin.Init())
var acc testutil.Accumulator
require.NoError(t, plugin.Start(&acc))
// Do the processing
for _, m := range input {
require.NoError(t, plugin.Add(m, &acc))
}
plugin.Stop()
// Check
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual)
// Check getting the persisted state
expectedState := map[string]interface{}{"instance": "myhost", "count": int64(3)}
var actualState map[string]interface{}
stateData, ok := pi.GetState().([]byte)
require.True(t, ok, "state is not a bytes array")
require.NoError(t, gob.NewDecoder(bytes.NewBuffer(stateData)).Decode(&actualState))
require.EqualValues(t, expectedState, actualState, "mismatch in state")
}
func TestUsePredefinedStateName(t *testing.T) {
source := `
def apply(metric):
return metric
`
// Configure the plugin
plugin := &Starlark{
Common: common.Common{
StarlarkLoadFunc: testLoadFunc,
Source: source,
Constants: map[string]interface{}{"state": "invalid"},
Log: testutil.Logger{},
},
}
require.ErrorContains(t, plugin.Init(), "'state' constant uses reserved name")
}
// parses metric lines out of line protocol following a header, with a trailing blank line
func parseMetricsFrom(t *testing.T, lines []string, header string) (metrics []telegraf.Metric) {
parser := &influx.Parser{}