From 748af7f5d17efdd21e8b9fa42e5d4beb3055f03b Mon Sep 17 00:00:00 2001 From: Pontus Rydin Date: Mon, 2 Nov 2020 23:12:48 -0500 Subject: [PATCH] [output.wavefront] Introduced "immediate_flush" flag (#8165) --- plugins/outputs/wavefront/README.md | 6 ++++++ plugins/outputs/wavefront/wavefront.go | 20 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/plugins/outputs/wavefront/README.md b/plugins/outputs/wavefront/README.md index 2daca328c..8439295bb 100644 --- a/plugins/outputs/wavefront/README.md +++ b/plugins/outputs/wavefront/README.md @@ -49,6 +49,12 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro ## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any ## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility. #truncate_tags = false + + ## Flush the internal buffers after each batch. This effectively bypasses the background sending of metrics + ## normally done by the Wavefront SDK. This can be used if you are experiencing buffer overruns. The sending + ## of metrics will block for a longer time, but this will be handled gracefully by the internal buffering in + ## Telegraf. + #immediate_flush = true ``` diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go index 523549fb1..6ba82ce5c 100644 --- a/plugins/outputs/wavefront/wavefront.go +++ b/plugins/outputs/wavefront/wavefront.go @@ -25,6 +25,7 @@ type Wavefront struct { UseRegex bool UseStrict bool TruncateTags bool + ImmediateFlush bool SourceOverride []string StringToNumber map[string][]map[string]float64 @@ -101,6 +102,12 @@ var sampleConfig = ` ## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility. #truncate_tags = false + ## Flush the internal buffers after each batch. This effectively bypasses the background sending of metrics + ## normally done by the Wavefront SDK. This can be used if you are experiencing buffer overruns. The sending + ## of metrics will block for a longer time, but this will be handled gracefully by the internal buffering in + ## Telegraf. + #immediate_flush = true + ## Define a mapping, namespaced by metric prefix, from string values to numeric values ## deprecated in 1.9; use the enum processor plugin #[[outputs.wavefront.string_to_number.elasticsearch]] @@ -123,12 +130,16 @@ func (w *Wavefront) Connect() error { w.Log.Warn("The string_to_number option is deprecated; please use the enum processor instead") } + flushSeconds := 5 + if w.ImmediateFlush { + flushSeconds = 86400 // Set a very long flush interval if we're flushing directly + } if w.Url != "" { w.Log.Debug("connecting over http/https using Url: %s", w.Url) sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{ Server: w.Url, Token: w.Token, - FlushIntervalSeconds: 5, + FlushIntervalSeconds: flushSeconds, }) if err != nil { return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Url: %s", w.Url) @@ -139,7 +150,7 @@ func (w *Wavefront) Connect() error { sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{ Host: w.Host, MetricsPort: w.Port, - FlushIntervalSeconds: 5, + FlushIntervalSeconds: flushSeconds, }) if err != nil { return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port) @@ -166,6 +177,10 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error { } } } + if w.ImmediateFlush { + w.Log.Debugf("Flushing batch of %d points", len(metrics)) + return w.sender.Flush() + } return nil } @@ -336,6 +351,7 @@ func init() { ConvertPaths: true, ConvertBool: true, TruncateTags: false, + ImmediateFlush: true, } }) }