telegraf/plugins/common/shim/output.go

124 lines
2.9 KiB
Go
Raw Normal View History

package shim
import (
"bufio"
"fmt"
"os"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/models"
"github.com/influxdata/telegraf/plugins/parsers/influx"
)
// AddOutput adds the input to the shim. Later calls to Run() will run this.
func (s *Shim) AddOutput(output telegraf.Output) error {
models.SetLoggerOnPlugin(output, s.Log())
if p, ok := output.(telegraf.Initializer); ok {
err := p.Init()
if err != nil {
return fmt.Errorf("failed to init input: %w", err)
}
}
s.Output = output
return nil
}
func (s *Shim) RunOutput() error {
// Create a parser for receiving the metrics in line-protocol format
parser := influx.Parser{}
if err := parser.Init(); err != nil {
return fmt.Errorf("failed to create new parser: %w", err)
}
// Connect the output
if err := s.Output.Connect(); err != nil {
return fmt.Errorf("failed to start processor: %w", err)
}
defer s.Output.Close()
// Collect the metrics from stdin. Note, we need to flush the metrics
// when the batch is full or after the configured time, whatever comes
// first. We need to lock the batch as we run into race conditions
// otherwise.
var mu sync.Mutex
metrics := make([]telegraf.Metric, 0, s.BatchSize)
// Prepare the flush timer...
flush := func(whole bool) {
mu.Lock()
defer mu.Unlock()
// Exit early if there is nothing to do
if len(metrics) == 0 {
return
}
// Determine the threshold on when to stop flushing depending on the
// given flag.
var threshold int
if whole {
threshold = s.BatchSize
}
// Flush out the metrics in batches of the configured size until we
// got all of them out or if there is less than a whole batch left.
for len(metrics) > 0 && len(metrics) >= threshold {
// Write the metrics and remove the batch
batch := metrics[:min(len(metrics), s.BatchSize)]
if err := s.Output.Write(batch); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write metrics: %s\n", err)
}
metrics = metrics[len(batch):]
}
}
// Setup the time-based flush
var timer *time.Timer
if s.BatchTimeout > 0 {
timer = time.AfterFunc(s.BatchTimeout, func() { flush(false) })
defer func() {
if timer != nil {
timer.Stop()
}
}()
}
// Start the processing loop
scanner := bufio.NewScanner(s.stdin)
for scanner.Scan() {
// Read metrics from stdin
m, err := parser.ParseLine(scanner.Text())
if err != nil {
fmt.Fprintf(s.stderr, "Failed to parse metric: %s\n", err)
continue
}
mu.Lock()
metrics = append(metrics, m)
shouldFlush := len(metrics) >= s.BatchSize
mu.Unlock()
// If we got more enough metrics to fill the batch flush it out and
// reset the time-based guard.
if shouldFlush {
if timer != nil {
timer.Stop()
}
flush(true)
if s.BatchTimeout > 0 {
timer = time.AfterFunc(s.BatchTimeout, func() { flush(false) })
}
}
}
// Output all remaining metrics
if timer != nil {
timer.Stop()
}
flush(false)
return nil
}