From ac8f4c1e15af245fd89d0d664e1f9d2057d5482e Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Wed, 14 Oct 2020 11:12:41 -0400 Subject: [PATCH] add plugin documentation --- docs/INPUTS.md | 4 +++- docs/OUTPUTS.md | 18 ++++++++++++------ docs/PROCESSORS.md | 23 +++++++++++++++++++++++ input.go | 7 +++++-- output.go | 7 +++++-- plugin.go | 4 +++- plugins/inputs/http_response/README.md | 8 +++----- processor.go | 24 +++++++++++++++++------- 8 files changed, 71 insertions(+), 24 deletions(-) diff --git a/docs/INPUTS.md b/docs/INPUTS.md index f8e906f31..179b67444 100644 --- a/docs/INPUTS.md +++ b/docs/INPUTS.md @@ -38,7 +38,8 @@ import ( ) type Simple struct { - Ok bool `toml:"ok"` + Ok bool `toml:"ok"` + Log telegraf.Logger `toml:"-"` } func (s *Simple) Description() string { @@ -52,6 +53,7 @@ func (s *Simple) SampleConfig() string { ` } +// Init is for setup, and validating config. func (s *Simple) Init() error { return nil } diff --git a/docs/OUTPUTS.md b/docs/OUTPUTS.md index c60cd96ba..1a27ca515 100644 --- a/docs/OUTPUTS.md +++ b/docs/OUTPUTS.md @@ -30,7 +30,8 @@ import ( ) type Simple struct { - Ok bool `toml:"ok"` + Ok bool `toml:"ok"` + Log telegraf.Logger `toml:"-"` } func (s *Simple) Description() string { @@ -43,20 +44,25 @@ func (s *Simple) SampleConfig() string { ` } +// Init is for setup, and validating config. func (s *Simple) Init() error { return nil } func (s *Simple) Connect() error { - // Make a connection to the URL here + // Make any connection required here return nil } func (s *Simple) Close() error { - // Close connection to the URL here + // Close any connections here. + // Write will not be called once Close is called, so there is no need to synchronize. return nil } +// Write should write immediately to the output, and not buffer writes +// (Telegraf manages the buffer for you). Returning an error will fail this +// batch of writes and the entire batch will be retried automatically. func (s *Simple) Write(metrics []telegraf.Metric) error { for _, metric := range metrics { // write `metric` to the output sink here @@ -102,9 +108,9 @@ Metrics are flushed to outputs when any of the following events happen: - The telegraf process has received a SIGUSR1 signal Note that if the flush takes longer than the `agent.interval` to write the metrics -to the output, you'll see a message saying the output `did not complete within its -flush interval`. This may mean your output is not keeping up with the flow of metrics, -and you may want to look into enabling compression, reducing the size of your metrics, +to the output, you'll see a message saying the output `did not complete within its +flush interval`. This may mean your output is not keeping up with the flow of metrics, +and you may want to look into enabling compression, reducing the size of your metrics, or investigate other reasons why the writes might be taking longer than expected. [file]: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/file diff --git a/docs/PROCESSORS.md b/docs/PROCESSORS.md index 47f29a416..25566fe32 100644 --- a/docs/PROCESSORS.md +++ b/docs/PROCESSORS.md @@ -33,6 +33,7 @@ import ( ) type Printer struct { + Log telegraf.Logger `toml:"-"` } var sampleConfig = ` @@ -46,6 +47,7 @@ func (p *Printer) Description() string { return "Print all metrics that pass through this filter." } +// Init is for setup, and validating config. func (p *Printer) Init() error { return nil } @@ -97,6 +99,7 @@ import ( ) type Printer struct { + Log telegraf.Logger `toml:"-"` } var sampleConfig = ` @@ -110,13 +113,27 @@ func (p *Printer) Description() string { return "Print all metrics that pass through this filter." } +// Init is for setup, and validating config. func (p *Printer) Init() error { return nil } +// Start is called once when the plugin starts; it is only called once per +// plugin instance, and never in parallel. +// Start should return once it is ready to receive metrics. +// The passed in accumulator is the same as the one passed to Add(), so you +// can choose to save it in the plugin, or use the one received from Add(). func (p *Printer) Start(acc telegraf.Accumulator) error { } +// Add is called for each metric to be processed. The Add() function does not +// need to wait for the metric to be processed before returning, and it may +// be acceptable to let background goroutine(s) handle the processing if you +// have slow processing you need to do in parallel. +// Keep in mind Add() should not spawn unbounded goroutines, so you may need +// to use a semaphore or pool of workers (eg: reverse_dns plugin does this). +// Metrics you don't want to pass downstream should have metric.Drop() called, +// rather than simply omitting the acc.AddMetric() call func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { // print! fmt.Println(metric.String()) @@ -127,6 +144,12 @@ func (p *Printer) Add(metric telegraf.Metric, acc telegraf.Accumulator) error { return nil } +// Stop gives you an opportunity to gracefully shut down the processor. +// Once Stop() is called, Add() will not be called any more. If you are using +// goroutines, you should wait for any in-progress metrics to be processed +// before returning from Stop(). +// When stop returns, you should no longer be writing metrics to the +// accumulator. func (p *Printer) Stop() error { } diff --git a/input.go b/input.go index 08cfd75b9..0f2dac2a3 100644 --- a/input.go +++ b/input.go @@ -4,7 +4,7 @@ type Input interface { PluginDescriber // Gather takes in an accumulator and adds the metrics that the Input - // gathers. This is called every "interval" + // gathers. This is called every agent.interval Gather(Accumulator) error } @@ -15,6 +15,9 @@ type ServiceInput interface { // Stop returns. Start(Accumulator) error - // Stop stops the services and closes any necessary channels and connections + // Stop stops the services and closes any necessary channels and connections. + // Metrics should not be written out to the accumulator once stop returns, so + // Stop() should stop reading and wait for any in-flight metrics to write out + // to the accumulator before returning. Stop() } diff --git a/output.go b/output.go index 0045b2ca6..52755b5da 100644 --- a/output.go +++ b/output.go @@ -3,9 +3,12 @@ package telegraf type Output interface { PluginDescriber - // Connect to the Output + // Connect to the Output; connect is only called once when the plugin starts Connect() error - // Close any connections to the Output + // Close any connections to the Output. Close is called once when the output + // is shutting down. Close will not be called until all writes have finished, + // and Write() will not be called once Close() has been, so locking is not + // necessary. Close() error // Write takes in group of points to be written to the Output Write(metrics []Metric) error diff --git a/plugin.go b/plugin.go index 29e8bb683..0793fbb06 100644 --- a/plugin.go +++ b/plugin.go @@ -10,7 +10,9 @@ type Initializer interface { } // PluginDescriber contains the functions all plugins must implement to describe -// themselves to Telegraf +// themselves to Telegraf. Note that all plugins may define a logger that is +// not part of the interface, but will receive an injected logger if it's set. +// eg: Log telegraf.Logger `toml:"-"` type PluginDescriber interface { // SampleConfig returns the default configuration of the Processor SampleConfig() string diff --git a/plugins/inputs/http_response/README.md b/plugins/inputs/http_response/README.md index 889b6f4f3..67d0dc067 100644 --- a/plugins/inputs/http_response/README.md +++ b/plugins/inputs/http_response/README.md @@ -7,9 +7,7 @@ This input plugin checks HTTP/HTTPS connections. ```toml # HTTP/HTTPS request given an address a method and a timeout [[inputs.http_response]] - ## Deprecated in 1.12, use 'urls' - ## Server address (default http://localhost) - # address = "http://localhost" + ## address is Deprecated in 1.12, use 'urls' ## List of urls to query. # urls = ["http://localhost"] @@ -39,8 +37,8 @@ This input plugin checks HTTP/HTTPS connections. # {'fake':'data'} # ''' - ## Optional name of the field that will contain the body of the response. - ## By default it is set to an empty String indicating that the body's content won't be added + ## Optional name of the field that will contain the body of the response. + ## By default it is set to an empty String indicating that the body's content won't be added # response_body_field = '' ## Maximum allowed HTTP response body size in bytes. diff --git a/processor.go b/processor.go index 15d67eb40..128a426c3 100644 --- a/processor.go +++ b/processor.go @@ -14,17 +14,27 @@ type Processor interface { type StreamingProcessor interface { PluginDescriber - // Start is the initializer for the processor - // Start is only called once per plugin instance, and never in parallel. - // Start should exit immediately after setup + // Start is called once when the plugin starts; it is only called once per + // plugin instance, and never in parallel. + // Start should return once it is ready to receive metrics. + // The passed in accumulator is the same as the one passed to Add(), so you + // can choose to save it in the plugin, or use the one received from Add(). Start(acc Accumulator) error - // Add is called for each metric to be processed. + // Add is called for each metric to be processed. The Add() function does not + // need to wait for the metric to be processed before returning, and it may + // be acceptable to let background goroutine(s) handle the processing if you + // have slow processing you need to do in parallel. + // Keep in mind Add() should not spawn unbounded goroutines, so you may need + // to use a semaphore or pool of workers (eg: reverse_dns plugin does this) + // Metrics you don't want to pass downstream should have metric.Drop() called, + // rather than simply omitting the acc.AddMetric() call Add(metric Metric, acc Accumulator) error - // Stop gives you a callback to free resources. - // by the time Stop is called, the input stream will have already been closed - // and Add will not be called anymore. + // Stop gives you an opportunity to gracefully shut down the processor. + // Once Stop() is called, Add() will not be called any more. If you are using + // goroutines, you should wait for any in-progress metrics to be processed + // before returning from Stop(). // When stop returns, you should no longer be writing metrics to the // accumulator. Stop() error