Shim refactor to support processors and output

This commit is contained in:
Steven Soroka 2020-06-04 19:09:22 -04:00
parent decd656a93
commit df26b037cb
19 changed files with 1032 additions and 85 deletions

View File

@ -84,6 +84,7 @@
- [#7726](https://github.com/influxdata/telegraf/pull/7726): Add laundry to mem plugin on FreeBSD.
- [#7762](https://github.com/influxdata/telegraf/pull/7762): Allow per input overriding of collection_jitter and precision.
- [#7686](https://github.com/influxdata/telegraf/pull/7686): Improve performance of procstat: Up to 40/120x better performance.
- [#7677](https://github.com/influxdata/telegraf/pull/7677): Expand execd shim support for processor and outputs.
#### Bugfixes

View File

@ -0,0 +1,63 @@
# Telegraf Execd Go Shim
The goal of this _shim_ is to make it trivial to extract an internal input,
processor, or output plugin from the main Telegraf repo out to a stand-alone
repo. This allows anyone to build and run it as a separate app using one of the
execd plugins:
- [inputs.execd](/plugins/inputs/execd)
- [processors.execd](/plugins/processors/execd)
- [outputs.execd](/plugins/outputs/execd)
## Steps to externalize a plugin
1. Move the project to an external repo, it's recommended to preserve the path
structure, (but not strictly necessary). eg if your plugin was at
`plugins/inputs/cpu`, it's recommended that it also be under `plugins/inputs/cpu`
in the new repo. For a further example of what this might look like, take a
look at [ssoroka/rand](https://github.com/ssoroka/rand) or
[danielnelson/telegraf-plugins](https://github.com/danielnelson/telegraf-plugins)
1. Copy [main.go](./example/cmd/main.go) into your project under the `cmd` folder.
This will be the entrypoint to the plugin when run as a stand-alone program, and
it will call the shim code for you to make that happen. It's recommended to
have only one plugin per repo, as the shim is not designed to run multiple
plugins at the same time (it would vastly complicate things).
1. Edit the main.go file to import your plugin. Within Telegraf this would have
been done in an all.go file, but here we don't split the two apart, and the change
just goes in the top of main.go. If you skip this step, your plugin will do nothing.
eg: `_ "github.com/me/my-plugin-telegraf/plugins/inputs/cpu"`
1. Optionally add a [plugin.conf](./example/cmd/plugin.conf) for configuration
specific to your plugin. Note that this config file **must be separate from the
rest of the config for Telegraf, and must not be in a shared directory where
Telegraf is expecting to load all configs**. If Telegraf reads this config file
it will not know which plugin it relates to. Telegraf instead uses an execd config
block to look for this plugin.
## Steps to build and run your plugin
1. Build the cmd/main.go. For my rand project this looks like `go build -o rand cmd/main.go`
1. If you're building an input, you can test out the binary just by running it.
eg `./rand -config plugin.conf`
Depending on your polling settings and whether you implemented a service plugin or
an input gathering plugin, you may see data right away, or you may have to hit enter
first, or wait for your poll duration to elapse, but the metrics will be written to
STDOUT. Ctrl-C to end your test.
If you're testig a processor or output manually, you can still do this but you
will need to feed valid metrics in on STDIN to verify that it is doing what you
want. This can be a very valuable debugging technique before hooking it up to
Telegraf.
1. Configure Telegraf to call your new plugin binary. For an input, this would
look something like:
```
[[inputs.execd]]
command = ["/path/to/rand", "-config", "/path/to/plugin.conf"]
signal = "none"
```
Refer to the execd plugin readmes for more information.
## Congratulations!
You've done it! Consider publishing your plugin to github and open a Pull Request
back to the Telegraf repo letting us know about the availability of your
[external plugin](https://github.com/influxdata/telegraf/blob/master/EXTERNAL_PLUGINS.md).

View File

@ -0,0 +1,163 @@
package shim
import (
"errors"
"fmt"
"io/ioutil"
"os"
"github.com/BurntSushi/toml"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/processors"
)
type config struct {
Inputs map[string][]toml.Primitive
Processors map[string][]toml.Primitive
Outputs map[string][]toml.Primitive
}
type loadedConfig struct {
Input telegraf.Input
Processor telegraf.StreamingProcessor
Output telegraf.Output
}
// LoadConfig Adds plugins to the shim
func (s *Shim) LoadConfig(filePath *string) error {
conf, err := LoadConfig(filePath)
if err != nil {
return err
}
if conf.Input != nil {
if err = s.AddInput(conf.Input); err != nil {
return fmt.Errorf("Failed to add Input: %w", err)
}
} else if conf.Processor != nil {
if err = s.AddStreamingProcessor(conf.Processor); err != nil {
return fmt.Errorf("Failed to add Processor: %w", err)
}
} else if conf.Output != nil {
if err = s.AddOutput(conf.Output); err != nil {
return fmt.Errorf("Failed to add Output: %w", err)
}
}
return nil
}
// LoadConfig loads the config and returns inputs that later need to be loaded.
func LoadConfig(filePath *string) (loaded loadedConfig, err error) {
var data string
conf := config{}
if filePath != nil && *filePath != "" {
b, err := ioutil.ReadFile(*filePath)
if err != nil {
return loadedConfig{}, err
}
data = expandEnvVars(b)
} else {
conf, err = DefaultImportedPlugins()
if err != nil {
return loadedConfig{}, err
}
}
md, err := toml.Decode(data, &conf)
if err != nil {
return loadedConfig{}, err
}
return createPluginsWithTomlConfig(md, conf)
}
func expandEnvVars(contents []byte) string {
return os.Expand(string(contents), getEnv)
}
func getEnv(key string) string {
v := os.Getenv(key)
return envVarEscaper.Replace(v)
}
func createPluginsWithTomlConfig(md toml.MetaData, conf config) (loadedConfig, error) {
loadedConf := loadedConfig{}
for name, primitives := range conf.Inputs {
creator, ok := inputs.Inputs[name]
if !ok {
return loadedConf, errors.New("unknown input " + name)
}
plugin := creator()
if len(primitives) > 0 {
primitive := primitives[0]
if err := md.PrimitiveDecode(primitive, plugin); err != nil {
return loadedConf, err
}
}
loadedConf.Input = plugin
break
}
for name, primitives := range conf.Processors {
creator, ok := processors.Processors[name]
if !ok {
return loadedConf, errors.New("unknown processor " + name)
}
plugin := creator()
if len(primitives) > 0 {
primitive := primitives[0]
if err := md.PrimitiveDecode(primitive, plugin); err != nil {
return loadedConf, err
}
}
loadedConf.Processor = plugin
break
}
for name, primitives := range conf.Outputs {
creator, ok := outputs.Outputs[name]
if !ok {
return loadedConf, errors.New("unknown output " + name)
}
plugin := creator()
if len(primitives) > 0 {
primitive := primitives[0]
if err := md.PrimitiveDecode(primitive, plugin); err != nil {
return loadedConf, err
}
}
loadedConf.Output = plugin
break
}
return loadedConf, nil
}
// DefaultImportedPlugins defaults to whatever plugins happen to be loaded and
// have registered themselves with the registry. This makes loading plugins
// without having to define a config dead easy.
func DefaultImportedPlugins() (config, error) {
conf := config{}
for name := range inputs.Inputs {
conf.Inputs[name] = []toml.Primitive{}
return conf, nil
}
for name := range processors.Processors {
conf.Processors[name] = []toml.Primitive{}
return conf, nil
}
for name := range outputs.Outputs {
conf.Outputs[name] = []toml.Primitive{}
return conf, nil
}
return conf, nil
}

View File

@ -0,0 +1,29 @@
package shim
import (
"os"
"testing"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/stretchr/testify/require"
)
func TestLoadConfig(t *testing.T) {
os.Setenv("SECRET_TOKEN", "xxxxxxxxxx")
os.Setenv("SECRET_VALUE", `test"\test`)
inputs.Add("test", func() telegraf.Input {
return &serviceInput{}
})
c := "./testdata/plugin.conf"
conf, err := LoadConfig(&c)
require.NoError(t, err)
inp := conf.Input.(*serviceInput)
require.Equal(t, "awesome name", inp.ServiceName)
require.Equal(t, "xxxxxxxxxx", inp.SecretToken)
require.Equal(t, `test"\test`, inp.SecretValue)
}

View File

@ -0,0 +1,60 @@
package main
import (
"flag"
"fmt"
"os"
"time"
// TODO: import your plugins
// _ "github.com/my_github_user/my_plugin_repo/plugins/inputs/mypluginname"
"github.com/influxdata/telegraf/plugins/common/shim"
)
var pollInterval = flag.Duration("poll_interval", 1*time.Second, "how often to send metrics")
var pollIntervalDisabled = flag.Bool("poll_interval_disabled", false, "how often to send metrics")
var configFile = flag.String("config", "", "path to the config file for this plugin")
var err error
// This is designed to be simple; Just change the import above and you're good.
//
// However, if you want to do all your config in code, you can like so:
//
// // initialize your plugin with any settngs you want
// myInput := &mypluginname.MyPlugin{
// DefaultSettingHere: 3,
// }
//
// shim := shim.New()
//
// shim.AddInput(myInput)
//
// // now the shim.Run() call as below.
//
func main() {
// parse command line options
flag.Parse()
if *pollIntervalDisabled {
*pollInterval = shim.PollIntervalDisabled
}
// create the shim. This is what will run your plugins.
shim := shim.New()
// If no config is specified, all imported plugins are loaded.
// otherwise follow what the config asks for.
// Check for settings from a config toml file,
// (or just use whatever plugins were imported above)
err = shim.LoadConfig(configFile)
if err != nil {
fmt.Fprintf(os.Stderr, "Err loading input: %s\n", err)
os.Exit(1)
}
// run the input plugin(s) until stdin closes or we receive a termination signal
if err := shim.Run(*pollInterval); err != nil {
fmt.Fprintf(os.Stderr, "Err: %s\n", err)
os.Exit(1)
}
}

View File

@ -0,0 +1,2 @@
[[inputs.my_plugin_name]]
value_name = "value"

View File

@ -0,0 +1,131 @@
package shim
import (
"context"
"fmt"
"io"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/serializers/influx"
)
type empty struct{}
var (
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
`"`, `\"`,
`\`, `\\`,
)
)
const (
// PollIntervalDisabled is used to indicate that you want to disable polling,
// as opposed to duration 0 meaning poll constantly.
PollIntervalDisabled = time.Duration(0)
)
// Shim allows you to wrap your inputs and run them as if they were part of Telegraf,
// except built externally.
type Shim struct {
Input telegraf.Input
Processor telegraf.StreamingProcessor
Output telegraf.Output
// streams
stdin io.Reader
stdout io.Writer
stderr io.Writer
// outgoing metric channel
metricCh chan telegraf.Metric
// input only
gatherPromptCh chan empty
}
// New creates a new shim interface
func New() *Shim {
return &Shim{
metricCh: make(chan telegraf.Metric, 1),
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
}
}
func (s *Shim) watchForShutdown(cancel context.CancelFunc) {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-quit // user-triggered quit
// cancel, but keep looping until the metric channel closes.
cancel()
}()
}
// Run the input plugins..
func (s *Shim) Run(pollInterval time.Duration) error {
if s.Input != nil {
err := s.RunInput(pollInterval)
if err != nil {
return fmt.Errorf("RunInput error: %w", err)
}
} else if s.Processor != nil {
err := s.RunProcessor()
if err != nil {
return fmt.Errorf("RunProcessor error: %w", err)
}
} else if s.Output != nil {
err := s.RunOutput()
if err != nil {
return fmt.Errorf("RunOutput error: %w", err)
}
} else {
return fmt.Errorf("Nothing to run")
}
return nil
}
func hasQuit(ctx context.Context) bool {
return ctx.Err() != nil
}
func (s *Shim) writeProcessedMetrics() error {
serializer := influx.NewSerializer()
for {
select {
case m, open := <-s.metricCh:
if !open {
return nil
}
b, err := serializer.Serialize(m)
if err != nil {
return fmt.Errorf("failed to serialize metric: %s", err)
}
// Write this to stdout
fmt.Fprint(s.stdout, string(b))
}
}
}
// LogName satisfies the MetricMaker interface
func (s *Shim) LogName() string {
return ""
}
// MakeMetric satisfies the MetricMaker interface
func (s *Shim) MakeMetric(m telegraf.Metric) telegraf.Metric {
return m // don't need to do anything to it.
}
// Log satisfies the MetricMaker interface
func (s *Shim) Log() telegraf.Logger {
return nil
}

View File

@ -0,0 +1,108 @@
package shim
import (
"bufio"
"context"
"fmt"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent"
)
// AddInput adds the input to the shim. Later calls to Run() will run this input.
func (s *Shim) AddInput(input telegraf.Input) error {
if p, ok := input.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return fmt.Errorf("failed to init input: %s", err)
}
}
s.Input = input
return nil
}
func (s *Shim) RunInput(pollInterval time.Duration) error {
// context is used only to close the stdin reader. everything else cascades
// from that point and closes cleanly when it's done.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.watchForShutdown(cancel)
acc := agent.NewAccumulator(s, s.metricCh)
acc.SetPrecision(time.Nanosecond)
if serviceInput, ok := s.Input.(telegraf.ServiceInput); ok {
if err := serviceInput.Start(acc); err != nil {
return fmt.Errorf("failed to start input: %s", err)
}
}
s.gatherPromptCh = make(chan empty, 1)
go func() {
s.startGathering(ctx, s.Input, acc, pollInterval)
if serviceInput, ok := s.Input.(telegraf.ServiceInput); ok {
serviceInput.Stop()
}
// closing the metric channel gracefully stops writing to stdout
close(s.metricCh)
}()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.writeProcessedMetrics()
wg.Done()
}()
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// push a non-blocking message to trigger metric collection.
s.pushCollectMetricsRequest()
}
cancel() // cancel gracefully stops gathering
wg.Wait() // wait for writing to stdout to finish
return nil
}
func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, pollInterval time.Duration) {
if pollInterval == PollIntervalDisabled {
pollInterval = forever
}
t := time.NewTicker(pollInterval)
defer t.Stop()
for {
// give priority to stopping.
if hasQuit(ctx) {
return
}
// see what's up
select {
case <-ctx.Done():
return
case <-s.gatherPromptCh:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err)
}
case <-t.C:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(s.stderr, "failed to gather metrics: %s\n", err)
}
}
}
}
// pushCollectMetricsRequest pushes a non-blocking (nil) message to the
// gatherPromptCh channel to trigger metric collection.
// The channel is defined with a buffer of 1, so while it's full, subsequent
// requests are discarded.
func (s *Shim) pushCollectMetricsRequest() {
// push a message out to each channel to collect metrics. don't block.
select {
case s.gatherPromptCh <- empty{}:
default:
}
}

View File

@ -0,0 +1,141 @@
package shim
import (
"bufio"
"io"
"io/ioutil"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/influxdata/telegraf"
)
func TestInputShimTimer(t *testing.T) {
stdoutReader, stdoutWriter := io.Pipe()
stdin, _ := io.Pipe() // hold the stdin pipe open
metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond, stdin, stdoutWriter, nil)
<-metricProcessed
r := bufio.NewReader(stdoutReader)
out, err := r.ReadString('\n')
require.NoError(t, err)
require.Contains(t, out, "\n")
metricLine := strings.Split(out, "\n")[0]
require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine)
}
func TestInputShimStdinSignalingWorks(t *testing.T) {
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()
metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil)
stdinWriter.Write([]byte("\n"))
<-metricProcessed
r := bufio.NewReader(stdoutReader)
out, err := r.ReadString('\n')
require.NoError(t, err)
require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out)
stdinWriter.Close()
go ioutil.ReadAll(r)
// check that it exits cleanly
<-exited
}
func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) {
metricProcessed = make(chan bool, 1)
exited = make(chan bool, 1)
inp := &testInput{
metricProcessed: metricProcessed,
}
shim := New()
if stdin != nil {
shim.stdin = stdin
}
if stdout != nil {
shim.stdout = stdout
}
if stderr != nil {
shim.stderr = stderr
}
shim.AddInput(inp)
go func() {
err := shim.Run(interval)
require.NoError(t, err)
exited <- true
}()
return metricProcessed, exited
}
type testInput struct {
metricProcessed chan bool
}
func (i *testInput) SampleConfig() string {
return ""
}
func (i *testInput) Description() string {
return ""
}
func (i *testInput) Gather(acc telegraf.Accumulator) error {
acc.AddFields("measurement",
map[string]interface{}{
"field": 1,
},
map[string]string{
"tag": "tag",
}, time.Unix(1234, 5678))
i.metricProcessed <- true
return nil
}
func (i *testInput) Start(acc telegraf.Accumulator) error {
return nil
}
func (i *testInput) Stop() {
}
type serviceInput struct {
ServiceName string `toml:"service_name"`
SecretToken string `toml:"secret_token"`
SecretValue string `toml:"secret_value"`
}
func (i *serviceInput) SampleConfig() string {
return ""
}
func (i *serviceInput) Description() string {
return ""
}
func (i *serviceInput) Gather(acc telegraf.Accumulator) error {
acc.AddFields("measurement",
map[string]interface{}{
"field": 1,
},
map[string]string{
"tag": "tag",
}, time.Unix(1234, 5678))
return nil
}
func (i *serviceInput) Start(acc telegraf.Accumulator) error {
return nil
}
func (i *serviceInput) Stop() {
}

View File

@ -0,0 +1,51 @@
package shim
import (
"bufio"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
)
// AddOutput adds the input to the shim. Later calls to Run() will run this.
func (s *Shim) AddOutput(output telegraf.Output) error {
if p, ok := output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return fmt.Errorf("failed to init input: %s", err)
}
}
s.Output = output
return nil
}
func (s *Shim) RunOutput() error {
parser, err := parsers.NewInfluxParser()
if err != nil {
return fmt.Errorf("Failed to create new parser: %w", err)
}
err = s.Output.Connect()
if err != nil {
return fmt.Errorf("failed to start processor: %w", err)
}
defer s.Output.Close()
var m telegraf.Metric
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
m, err = parser.ParseLine(scanner.Text())
if err != nil {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\n", err)
continue
}
if err = s.Output.Write([]telegraf.Metric{m}); err != nil {
fmt.Fprintf(s.stderr, "Failed to write metric: %s\n", err)
}
}
return nil
}

View File

@ -0,0 +1,82 @@
package shim
import (
"io"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestOutputShim(t *testing.T) {
o := &testOutput{}
stdinReader, stdinWriter := io.Pipe()
s := New()
s.stdin = stdinReader
err := s.AddOutput(o)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err := s.RunOutput()
require.NoError(t, err)
wg.Done()
}()
serializer, _ := serializers.NewInfluxSerializer()
m, _ := metric.New("thing",
map[string]string{
"a": "b",
},
map[string]interface{}{
"v": 1,
},
time.Now(),
)
b, err := serializer.Serialize(m)
require.NoError(t, err)
_, err = stdinWriter.Write(b)
require.NoError(t, err)
err = stdinWriter.Close()
require.NoError(t, err)
wg.Wait()
require.Len(t, o.MetricsWritten, 1)
mOut := o.MetricsWritten[0]
testutil.RequireMetricEqual(t, m, mOut)
}
type testOutput struct {
MetricsWritten []telegraf.Metric
}
func (o *testOutput) Connect() error {
return nil
}
func (o *testOutput) Close() error {
return nil
}
func (o *testOutput) Write(metrics []telegraf.Metric) error {
o.MetricsWritten = append(o.MetricsWritten, metrics...)
return nil
}
func (o *testOutput) SampleConfig() string {
return ""
}
func (o *testOutput) Description() string {
return ""
}

View File

@ -0,0 +1,69 @@
package shim
import (
"bufio"
"fmt"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/agent"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/processors"
)
// AddProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddProcessor(processor telegraf.Processor) error {
p := processors.NewStreamingProcessorFromProcessor(processor)
return s.AddStreamingProcessor(p)
}
// AddStreamingProcessor adds the processor to the shim. Later calls to Run() will run this.
func (s *Shim) AddStreamingProcessor(processor telegraf.StreamingProcessor) error {
if p, ok := processor.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return fmt.Errorf("failed to init input: %s", err)
}
}
s.Processor = processor
return nil
}
func (s *Shim) RunProcessor() error {
acc := agent.NewAccumulator(s, s.metricCh)
acc.SetPrecision(time.Nanosecond)
parser, err := parsers.NewInfluxParser()
if err != nil {
return fmt.Errorf("Failed to create new parser: %w", err)
}
err = s.Processor.Start(acc)
if err != nil {
return fmt.Errorf("failed to start processor: %w", err)
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
s.writeProcessedMetrics()
wg.Done()
}()
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
m, err := parser.ParseLine(scanner.Text())
if err != nil {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\b", err)
continue
}
s.Processor.Add(m, acc)
}
close(s.metricCh)
s.Processor.Stop()
wg.Wait()
return nil
}

View File

@ -0,0 +1,88 @@
package shim
import (
"bufio"
"io"
"io/ioutil"
"sync"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/serializers"
"github.com/stretchr/testify/require"
)
func TestProcessorShim(t *testing.T) {
p := &testProcessor{}
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()
s := New()
// inject test into shim
s.stdin = stdinReader
s.stdout = stdoutWriter
err := s.AddProcessor(p)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err := s.RunProcessor()
require.NoError(t, err)
wg.Done()
}()
serializer, _ := serializers.NewInfluxSerializer()
parser, _ := parsers.NewInfluxParser()
m, _ := metric.New("thing",
map[string]string{
"a": "b",
},
map[string]interface{}{
"v": 1,
},
time.Now(),
)
b, err := serializer.Serialize(m)
require.NoError(t, err)
_, err = stdinWriter.Write(b)
require.NoError(t, err)
err = stdinWriter.Close()
require.NoError(t, err)
r := bufio.NewReader(stdoutReader)
out, err := r.ReadString('\n')
require.NoError(t, err)
mOut, err := parser.ParseLine(out)
require.NoError(t, err)
val, ok := mOut.GetTag("hi")
require.True(t, ok)
require.Equal(t, "mom", val)
go ioutil.ReadAll(r)
wg.Wait()
}
type testProcessor struct{}
func (p *testProcessor) Apply(in ...telegraf.Metric) []telegraf.Metric {
for _, metric := range in {
metric.AddTag("hi", "mom")
}
return in
}
func (p *testProcessor) SampleConfig() string {
return ""
}
func (p *testProcessor) Description() string {
return ""
}

View File

@ -0,0 +1,4 @@
[[inputs.test]]
service_name = "awesome name"
secret_token = "${SECRET_TOKEN}"
secret_value = "$SECRET_VALUE"

View File

@ -1,48 +1,3 @@
# Telegraf Execd Go Shim
The goal of this _shim_ is to make it trivial to extract an internal input plugin
out to a stand-alone repo for the purpose of compiling it as a separate app and
running it from the inputs.execd plugin.
The execd-shim is still experimental and the interface may change in the future.
Especially as the concept expands to processors, aggregators, and outputs.
## Steps to externalize a plugin
1. Move the project to an external repo, optionally preserving the
_plugins/inputs/plugin_name_ folder structure. For an example of what this might
look at, take a look at [ssoroka/rand](https://github.com/ssoroka/rand) or
[danielnelson/telegraf-plugins](https://github.com/danielnelson/telegraf-plugins)
1. Copy [main.go](./example/cmd/main.go) into your project under the cmd folder.
This will be the entrypoint to the plugin when run as a stand-alone program, and
it will call the shim code for you to make that happen.
1. Edit the main.go file to import your plugin. Within Telegraf this would have
been done in an all.go file, but here we don't split the two apart, and the change
just goes in the top of main.go. If you skip this step, your plugin will do nothing.
1. Optionally add a [plugin.conf](./example/cmd/plugin.conf) for configuration
specific to your plugin. Note that this config file **must be separate from the
rest of the config for Telegraf, and must not be in a shared directory where
Telegraf is expecting to load all configs**. If Telegraf reads this config file
it will not know which plugin it relates to.
## Steps to build and run your plugin
1. Build the cmd/main.go. For my rand project this looks like `go build -o rand cmd/main.go`
1. Test out the binary if you haven't done this yet. eg `./rand -config plugin.conf`
Depending on your polling settings and whether you implemented a service plugin or
an input gathering plugin, you may see data right away, or you may have to hit enter
first, or wait for your poll duration to elapse, but the metrics will be written to
STDOUT. Ctrl-C to end your test.
1. Configure Telegraf to call your new plugin binary. eg:
```
[[inputs.execd]]
command = ["/path/to/rand", "-config", "/path/to/plugin.conf"]
signal = "none"
```
## Congratulations!
You've done it! Consider publishing your plugin to github and open a Pull Request
back to the Telegraf repo letting us know about the availability of your
[external plugin](https://github.com/influxdata/telegraf/blob/master/EXTERNAL_PLUGINS.md).
This is deprecated. Please see (/plugins/common/shim/README.md)[https://github.com/influxdata/telegraf/tree/master/plugins/common/shim/README.md]

View File

@ -24,10 +24,8 @@ import (
type empty struct{}
var (
stdout io.Writer = os.Stdout
stdin io.Reader = os.Stdin
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
forever = 100 * 365 * 24 * time.Hour
envVarEscaper = strings.NewReplacer(
`"`, `\"`,
`\`, `\\`,
)
@ -45,11 +43,19 @@ type Shim struct {
Inputs []telegraf.Input
gatherPromptChans []chan empty
metricCh chan telegraf.Metric
stdin io.Reader
stdout io.Writer
stderr io.Writer
}
// New creates a new shim interface
func New() *Shim {
return &Shim{}
return &Shim{
stdin: os.Stdin,
stdout: os.Stdout,
stderr: os.Stderr,
}
}
// AddInput adds the input to the shim. Later calls to Run() will run this input.
@ -108,7 +114,7 @@ func (s *Shim) Run(pollInterval time.Duration) error {
s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh)
wg.Add(1) // one per input
go func(input telegraf.Input) {
startGathering(ctx, input, acc, gatherPromptCh, pollInterval)
s.startGathering(ctx, input, acc, gatherPromptCh, pollInterval)
if serviceInput, ok := input.(telegraf.ServiceInput); ok {
serviceInput.Stop()
}
@ -141,7 +147,7 @@ loop:
return fmt.Errorf("failed to serialize metric: %s", err)
}
// Write this to stdout
fmt.Fprint(stdout, string(b))
fmt.Fprint(s.stdout, string(b))
}
}
@ -163,7 +169,7 @@ func (s *Shim) stdinCollectMetricsPrompt(ctx context.Context, cancel context.Can
close(collectMetricsPrompt)
}()
scanner := bufio.NewScanner(stdin)
scanner := bufio.NewScanner(s.stdin)
// for every line read from stdin, make sure we're not supposed to quit,
// then push a message on to the collectMetricsPrompt
for scanner.Scan() {
@ -201,7 +207,7 @@ func (s *Shim) collectMetrics(ctx context.Context) {
}
}
func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) {
func (s *Shim) startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accumulator, gatherPromptCh <-chan empty, pollInterval time.Duration) {
if pollInterval == PollIntervalDisabled {
return // don't poll
}
@ -218,11 +224,11 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu
return
case <-gatherPromptCh:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err)
fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err)
}
case <-t.C:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err)
fmt.Fprintf(s.stderr, "failed to gather metrics: %s", err)
}
}
}
@ -269,12 +275,7 @@ func LoadConfig(filePath *string) ([]telegraf.Input, error) {
return nil, err
}
loadedInputs, err := loadConfigIntoInputs(md, conf.Inputs)
if len(md.Undecoded()) > 0 {
fmt.Fprintf(stdout, "Some plugins were loaded but not used: %q\n", md.Undecoded())
}
return loadedInputs, err
return loadConfigIntoInputs(md, conf.Inputs)
}
func expandEnvVars(contents []byte) string {

View File

@ -23,12 +23,9 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()
stdin = stdinReader
stdout = stdoutWriter
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
metricProcessed, exited := runInputPlugin(t, 20*time.Minute)
metricProcessed, exited := runInputPlugin(t, 20*time.Minute, stdinReader, stdoutWriter, nil)
// signal USR1 to yourself.
pid := os.Getpid()

View File

@ -2,7 +2,6 @@ package shim
import (
"bufio"
"bytes"
"io"
"os"
"strings"
@ -16,20 +15,16 @@ import (
)
func TestShimWorks(t *testing.T) {
stdoutBytes := bytes.NewBufferString("")
stdout = stdoutBytes
stdoutReader, stdoutWriter := io.Pipe()
stdin, _ = io.Pipe() // hold the stdin pipe open
stdin, _ := io.Pipe() // hold the stdin pipe open
metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond)
metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond, stdin, stdoutWriter, nil)
<-metricProcessed
for stdoutBytes.Len() == 0 {
t.Log("Waiting for bytes available in stdout")
time.Sleep(10 * time.Millisecond)
}
out := string(stdoutBytes.Bytes())
r := bufio.NewReader(stdoutReader)
out, err := r.ReadString('\n')
require.NoError(t, err)
require.Contains(t, out, "\n")
metricLine := strings.Split(out, "\n")[0]
require.Equal(t, "measurement,tag=tag field=1i 1234000005678", metricLine)
@ -39,10 +34,7 @@ func TestShimStdinSignalingWorks(t *testing.T) {
stdinReader, stdinWriter := io.Pipe()
stdoutReader, stdoutWriter := io.Pipe()
stdin = stdinReader
stdout = stdoutWriter
metricProcessed, exited := runInputPlugin(t, 40*time.Second)
metricProcessed, exited := runInputPlugin(t, 40*time.Second, stdinReader, stdoutWriter, nil)
stdinWriter.Write([]byte("\n"))
@ -61,14 +53,24 @@ func TestShimStdinSignalingWorks(t *testing.T) {
<-exited
}
func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) {
metricProcessed = make(chan bool, 10)
func runInputPlugin(t *testing.T, interval time.Duration, stdin io.Reader, stdout, stderr io.Writer) (metricProcessed chan bool, exited chan bool) {
metricProcessed = make(chan bool)
exited = make(chan bool)
inp := &testInput{
metricProcessed: metricProcessed,
}
shim := New()
if stdin != nil {
shim.stdin = stdin
}
if stdout != nil {
shim.stdout = stdout
}
if stderr != nil {
shim.stderr = stderr
}
shim.AddInput(inp)
go func() {
err := shim.Run(interval)

View File

@ -12,7 +12,7 @@ Program output on standard error is mirrored to the telegraf log.
- Metrics with tracking will be considered "delivered" as soon as they are passed
to the external process. There is currently no way to match up which metric
coming out of the execd process relates to which metric going in (keep in mind
that processors can add and drop metrics, and that this is all done
that processors can add and drop metrics, and that this is all done
asynchronously).
- it's not currently possible to use a data_format other than "influx", due to
the requirement that it is serialize-parse symmetrical and does not lose any