package starlark import ( "errors" "fmt" "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/processors" "go.starlark.net/resolve" "go.starlark.net/starlark" "go.starlark.net/starlarkjson" ) const ( description = "Process metrics using a Starlark script" sampleConfig = ` ## The Starlark source can be set as a string in this configuration file, or ## by referencing a file containing the script. Only one source or script ## should be set at once. ## ## Source of the Starlark script. source = ''' def apply(metric): return metric ''' ## File containing a Starlark script. # script = "/usr/local/bin/myscript.star" ## The constants of the Starlark script. # [processors.starlark.constants] # max_size = 10 # threshold = 0.75 # default_name = "Julia" # debug_mode = true ` ) type Starlark struct { Source string `toml:"source"` Script string `toml:"script"` Constants map[string]interface{} `toml:"constants"` Log telegraf.Logger `toml:"-"` thread *starlark.Thread applyFunc *starlark.Function args starlark.Tuple results []telegraf.Metric } func (s *Starlark) Init() error { if s.Source == "" && s.Script == "" { return errors.New("one of source or script must be set") } if s.Source != "" && s.Script != "" { return errors.New("both source or script cannot be set") } s.thread = &starlark.Thread{ Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) }, Load: func(thread *starlark.Thread, module string) (starlark.StringDict, error) { return loadFunc(thread, module, s.Log) }, } builtins := starlark.StringDict{} builtins["Metric"] = starlark.NewBuiltin("Metric", newMetric) builtins["deepcopy"] = starlark.NewBuiltin("deepcopy", deepcopy) builtins["catch"] = starlark.NewBuiltin("catch", catch) s.addConstants(&builtins) program, err := s.sourceProgram(builtins) if err != nil { return err } // Execute source globals, err := program.Init(s.thread, builtins) if err != nil { return err } // Make available a shared state to the apply function globals["state"] = starlark.NewDict(0) // Freeze the global state. This prevents modifications to the processor // state and prevents scripts from containing errors storing tracking // metrics. Tasks that require global state will not be possible due to // this, so maybe we should relax this in the future. globals.Freeze() // The source should define an apply function. apply := globals["apply"] if apply == nil { return errors.New("apply is not defined") } var ok bool if s.applyFunc, ok = apply.(*starlark.Function); !ok { return errors.New("apply is not a function") } if s.applyFunc.NumParams() != 1 { return errors.New("apply function must take one parameter") } // Reusing the same metric wrapper to skip an allocation. This will cause // any saved references to point to the new metric, but due to freezing the // globals none should exist. s.args = make(starlark.Tuple, 1) s.args[0] = &Metric{} // Preallocate a slice for return values. s.results = make([]telegraf.Metric, 0, 10) return nil } func (s *Starlark) sourceProgram(builtins starlark.StringDict) (*starlark.Program, error) { if s.Source != "" { _, program, err := starlark.SourceProgram("processor.starlark", s.Source, builtins.Has) return program, err } _, program, err := starlark.SourceProgram(s.Script, nil, builtins.Has) return program, err } func (s *Starlark) SampleConfig() string { return sampleConfig } func (s *Starlark) Description() string { return description } func (s *Starlark) Start(acc telegraf.Accumulator) error { return nil } func (s *Starlark) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { s.args[0].(*Metric).Wrap(metric) rv, err := starlark.Call(s.thread, s.applyFunc, s.args, nil) if err != nil { if err, ok := err.(*starlark.EvalError); ok { for _, line := range strings.Split(err.Backtrace(), "\n") { s.Log.Error(line) } } metric.Reject() return err } switch rv := rv.(type) { case *starlark.List: iter := rv.Iterate() defer iter.Done() var v starlark.Value for iter.Next(&v) { switch v := v.(type) { case *Metric: m := v.Unwrap() if containsMetric(s.results, m) { s.Log.Errorf("Duplicate metric reference detected") continue } s.results = append(s.results, m) acc.AddMetric(m) default: s.Log.Errorf("Invalid type returned in list: %s", v.Type()) } } // If the script didn't return the original metrics, mark it as // successfully handled. if !containsMetric(s.results, metric) { metric.Accept() } // clear results for i := range s.results { s.results[i] = nil } s.results = s.results[:0] case *Metric: m := rv.Unwrap() // If the script returned a different metric, mark this metric as // successfully handled. if m != metric { metric.Accept() } acc.AddMetric(m) case starlark.NoneType: metric.Drop() default: return fmt.Errorf("Invalid type returned: %T", rv) } return nil } func (s *Starlark) Stop() error { return nil } // Add all the constants defined in the plugin as constants of the script func (s *Starlark) addConstants(builtins *starlark.StringDict) { for key, val := range s.Constants { sVal, err := asStarlarkValue(val) if err != nil { s.Log.Errorf("Unsupported type: %T", val) } (*builtins)[key] = sVal } } func containsMetric(metrics []telegraf.Metric, metric telegraf.Metric) bool { for _, m := range metrics { if m == metric { return true } } return false } func init() { // https://github.com/bazelbuild/starlark/issues/20 resolve.AllowNestedDef = true resolve.AllowLambda = true resolve.AllowFloat = true resolve.AllowSet = true resolve.AllowGlobalReassign = true resolve.AllowRecursion = true } func init() { processors.AddStreaming("starlark", func() telegraf.StreamingProcessor { return &Starlark{} }) } func loadFunc(thread *starlark.Thread, module string, logger telegraf.Logger) (starlark.StringDict, error) { switch module { case "json.star": return starlark.StringDict{ "json": starlarkjson.Module, }, nil case "logging.star": return starlark.StringDict{ "log": LogModule(logger), }, nil default: return nil, errors.New("module " + module + " is not available") } }