[output.wavefront] Introduced "immediate_flush" flag (#8165)

This commit is contained in:
Pontus Rydin 2020-11-02 23:12:48 -05:00 committed by GitHub
parent 38796f035b
commit 748af7f5d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 2 deletions

View File

@ -49,6 +49,12 @@ This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefro
## Truncate metric tags to a total of 254 characters for the tag name value. Wavefront will reject any
## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility.
#truncate_tags = false
## Flush the internal buffers after each batch. This effectively bypasses the background sending of metrics
## normally done by the Wavefront SDK. This can be used if you are experiencing buffer overruns. The sending
## of metrics will block for a longer time, but this will be handled gracefully by the internal buffering in
## Telegraf.
#immediate_flush = true
```

View File

@ -25,6 +25,7 @@ type Wavefront struct {
UseRegex bool
UseStrict bool
TruncateTags bool
ImmediateFlush bool
SourceOverride []string
StringToNumber map[string][]map[string]float64
@ -101,6 +102,12 @@ var sampleConfig = `
## data point exceeding this limit if not truncated. Defaults to 'false' to provide backwards compatibility.
#truncate_tags = false
## Flush the internal buffers after each batch. This effectively bypasses the background sending of metrics
## normally done by the Wavefront SDK. This can be used if you are experiencing buffer overruns. The sending
## of metrics will block for a longer time, but this will be handled gracefully by the internal buffering in
## Telegraf.
#immediate_flush = true
## Define a mapping, namespaced by metric prefix, from string values to numeric values
## deprecated in 1.9; use the enum processor plugin
#[[outputs.wavefront.string_to_number.elasticsearch]]
@ -123,12 +130,16 @@ func (w *Wavefront) Connect() error {
w.Log.Warn("The string_to_number option is deprecated; please use the enum processor instead")
}
flushSeconds := 5
if w.ImmediateFlush {
flushSeconds = 86400 // Set a very long flush interval if we're flushing directly
}
if w.Url != "" {
w.Log.Debug("connecting over http/https using Url: %s", w.Url)
sender, err := wavefront.NewDirectSender(&wavefront.DirectConfiguration{
Server: w.Url,
Token: w.Token,
FlushIntervalSeconds: 5,
FlushIntervalSeconds: flushSeconds,
})
if err != nil {
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Url: %s", w.Url)
@ -139,7 +150,7 @@ func (w *Wavefront) Connect() error {
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
Host: w.Host,
MetricsPort: w.Port,
FlushIntervalSeconds: 5,
FlushIntervalSeconds: flushSeconds,
})
if err != nil {
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
@ -166,6 +177,10 @@ func (w *Wavefront) Write(metrics []telegraf.Metric) error {
}
}
}
if w.ImmediateFlush {
w.Log.Debugf("Flushing batch of %d points", len(metrics))
return w.sender.Flush()
}
return nil
}
@ -336,6 +351,7 @@ func init() {
ConvertPaths: true,
ConvertBool: true,
TruncateTags: false,
ImmediateFlush: true,
}
})
}