telegraf/models/running_output.go

369 lines
8.1 KiB
Go
Raw Normal View History

package models
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
logging "github.com/influxdata/telegraf/logger"
"github.com/influxdata/telegraf/selfstat"
)
const (
// DefaultMetricBatchSize is the batch size NewRunningOutput falls back to
// when neither the plugin configuration nor the caller supplies one.
DefaultMetricBatchSize = 1000
// DefaultMetricBufferLimit is the number of buffered metrics
// NewRunningOutput falls back to when neither the plugin configuration nor
// the caller supplies a limit. It should be a multiple of the batch size.
DefaultMetricBufferLimit = 10000
)
// OutputConfig holds the per-plugin settings a RunningOutput is built from.
type OutputConfig struct {
// Name is the plugin name. It becomes the "output" tag of the self-stats
// and is part of the logger name.
Name string
// Alias optionally distinguishes plugin instances. When non-empty it is
// added as the "alias" tag of the self-stats.
Alias string
// ID is returned by RunningOutput.ID when the plugin does not provide an ID
// of its own.
ID string
// StartupErrorBehavior selects how retry-able connect errors are handled.
// Accepted values are "", "error", "retry" and "ignore".
StartupErrorBehavior string
// Filter selects and modifies metrics before they are handed to the output.
Filter Filter
// FlushInterval and FlushJitter are not read in this file; presumably they
// drive the flush schedule elsewhere. TODO confirm against the agent code.
FlushInterval time.Duration
FlushJitter time.Duration
// MetricBufferLimit and MetricBatchSize, when greater than zero, take
// precedence over the corresponding arguments of NewRunningOutput.
MetricBufferLimit int
MetricBatchSize int
// NameOverride replaces the metric name; NamePrefix and NameSuffix are
// added around it. They are applied in AddMetric to metrics that go to the
// buffer, not to metrics handed to an aggregating output.
NameOverride string
NamePrefix string
NameSuffix string
// BufferStrategy and BufferDirectory are passed to NewBuffer. The strategy
// "disk" also changes the message written by LogBufferStatus.
BufferStrategy string
BufferDirectory string
// LogLevel is passed to the plugin logger's SetLogLevel.
LogLevel string
}
// RunningOutput wraps an output plugin together with its configuration,
// metric buffer, logger and self-monitoring statistics.
type RunningOutput struct {
// Must be 64-bit aligned: both counters are accessed through sync/atomic,
// so they are kept at the start of the struct.
// newMetricsCount counts metrics buffered since the last batch signal or
// Write call.
newMetricsCount int64
// droppedMetrics accumulates the drop counts returned by the buffer until
// writeMetrics reports and resets them.
droppedMetrics int64
// Output is the wrapped plugin.
Output telegraf.Output
// Config is the configuration the output was created with.
Config *OutputConfig
// MetricBufferLimit and MetricBatchSize are the effective values resolved
// by NewRunningOutput.
MetricBufferLimit int
MetricBatchSize int
// MetricsFiltered counts metrics rejected by the filter or left without
// fields after filtering.
MetricsFiltered selfstat.Stat
// WriteTime records the duration of each Output.Write call in nanoseconds.
WriteTime selfstat.Stat
// StartupErrors counts failed connection attempts.
StartupErrors selfstat.Stat
// BatchReady receives a timestamp (non-blocking send, capacity one) when
// newMetricsCount reaches MetricBatchSize.
BatchReady chan time.Time
// buffer holds the metrics waiting to be written.
buffer Buffer
// log is the plugin-specific logger.
log telegraf.Logger
// started is set once Output.Connect has succeeded.
started bool
// retries counts the connection attempts made from Write and WriteBatch.
retries uint64
// aggMutex guards Add, Push and Reset on an aggregating output.
aggMutex sync.Mutex
}
// NewRunningOutput builds the RunningOutput wrapper around output.
//
// It creates the plugin logger and installs it on the plugin, registers the
// "write" self-stats tagged with the plugin name (and alias, if set) and
// creates the metric buffer. Positive MetricBufferLimit / MetricBatchSize
// values in config take precedence over bufferLimit / batchSize; a remaining
// zero is replaced by DefaultMetricBufferLimit / DefaultMetricBatchSize.
//
// The function panics if the buffer cannot be created.
func NewRunningOutput(
	output telegraf.Output,
	config *OutputConfig,
	batchSize int,
	bufferLimit int,
) *RunningOutput {
	tags := map[string]string{"output": config.Name}
	if alias := config.Alias; alias != "" {
		tags["alias"] = alias
	}

	// The callback registered on the logger increments the write/errors stat.
	writeErrors := selfstat.Register("write", "errors", tags)
	pluginLog := logging.New("outputs", config.Name, config.Alias)
	pluginLog.RegisterErrorCallback(func() {
		writeErrors.Incr(1)
	})
	if levelErr := pluginLog.SetLogLevel(config.LogLevel); levelErr != nil {
		pluginLog.Error(levelErr)
	}
	SetLoggerOnPlugin(output, pluginLog)

	// Resolve the effective sizes: config value, then argument, then default.
	switch {
	case config.MetricBufferLimit > 0:
		bufferLimit = config.MetricBufferLimit
	case bufferLimit == 0:
		bufferLimit = DefaultMetricBufferLimit
	}
	switch {
	case config.MetricBatchSize > 0:
		batchSize = config.MetricBatchSize
	case batchSize == 0:
		batchSize = DefaultMetricBatchSize
	}

	metricBuffer, err := NewBuffer(config.Name, config.Alias, bufferLimit, config.BufferStrategy, config.BufferDirectory)
	if err != nil {
		panic(err)
	}

	ready := make(chan time.Time, 1)
	filtered := selfstat.Register("write", "metrics_filtered", tags)
	writeTime := selfstat.RegisterTiming("write", "write_time_ns", tags)
	startupErrors := selfstat.Register("write", "startup_errors", tags)

	return &RunningOutput{
		buffer:            metricBuffer,
		BatchReady:        ready,
		Output:            output,
		Config:            config,
		MetricBufferLimit: bufferLimit,
		MetricBatchSize:   batchSize,
		MetricsFiltered:   filtered,
		WriteTime:         writeTime,
		StartupErrors:     startupErrors,
		log:               pluginLog,
	}
}
// LogName returns the log name of this output, built by logName from the
// "outputs" category, the plugin name and the alias.
func (r *RunningOutput) LogName() string {
	cfg := r.Config
	return logName("outputs", cfg.Name, cfg.Alias)
}
// metricFiltered accounts for a metric that will not be written: it bumps the
// MetricsFiltered stat and then drops the metric.
func (r *RunningOutput) metricFiltered(metric telegraf.Metric) {
	filteredStat := r.MetricsFiltered
	filteredStat.Incr(1)

	metric.Drop()
}
// ID returns the plugin's own ID when the plugin implements
// telegraf.PluginWithID, and the configured ID otherwise.
func (r *RunningOutput) ID() string {
	withID, hasID := r.Output.(telegraf.PluginWithID)
	if !hasID {
		return r.Config.ID
	}
	return withID.ID()
}
// Init validates the configured startup-error behavior and then initializes
// the plugin if it implements telegraf.Initializer.
//
// It returns an error for an unknown 'startup_error_behavior' value, or the
// error reported by the plugin's Init.
func (r *RunningOutput) Init() error {
	behavior := r.Config.StartupErrorBehavior
	known := behavior == "" || behavior == "error" || behavior == "retry" || behavior == "ignore"
	if !known {
		return fmt.Errorf("invalid 'startup_error_behavior' setting %q", r.Config.StartupErrorBehavior)
	}

	initializer, ok := r.Output.(telegraf.Initializer)
	if !ok {
		return nil
	}
	return initializer.Init()
}
// Connect connects the output and marks it as started on success.
//
// On failure the StartupErrors stat is incremented. Errors that are not a
// retry-able internal.StartupError are returned unchanged. Retry-able errors
// are handled according to the configured startup-error behavior:
//   - "retry":  the failure is logged and nil is returned; the output stays
//     not-started.
//   - "ignore": the error is returned wrapped in an internal.FatalError.
//   - "" or "error": the original error is returned.
func (r *RunningOutput) Connect() error {
	err := r.Output.Connect()
	if err == nil {
		// Connected on the first attempt, nothing else to do.
		r.started = true
		return nil
	}
	r.StartupErrors.Incr(1)

	// Only retry-able startup errors are subject to the configured behavior.
	var startupErr *internal.StartupError
	if retryable := errors.As(err, &startupErr) && startupErr.Retry; !retryable {
		return err
	}

	switch behavior := r.Config.StartupErrorBehavior; behavior {
	case "retry":
		r.log.Infof("Connect failed: %v; retrying...", err)
		return nil
	case "ignore":
		return &internal.FatalError{Err: startupErr}
	case "", "error":
		// Fall through to returning the actual error below.
	default:
		r.log.Errorf("Invalid 'startup_error_behavior' setting %q", behavior)
	}
	return err
}
// Close closes the wrapped output. A failure is logged, not returned.
func (r *RunningOutput) Close() {
	err := r.Output.Close()
	if err == nil {
		return
	}
	r.log.Errorf("Error closing output: %v", err)
}
// AddMetric hands a metric to the output and takes ownership of it.
//
// The metric is run through the configured filter first; metrics that are not
// selected, or that have no fields left after modification, are counted as
// filtered and dropped. A filter error is logged and the metric is processed
// as if it had been selected. Aggregating outputs receive the metric directly.
// For all other outputs the name override, prefix and suffix are applied and
// the metric is buffered; when the number of newly buffered metrics reaches
// MetricBatchSize a timestamp is sent to BatchReady without blocking.
func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
	selected, err := r.Config.Filter.Select(metric)
	switch {
	case err != nil:
		r.log.Errorf("filtering failed: %v", err)
	case !selected:
		r.metricFiltered(metric)
		return
	}

	r.Config.Filter.Modify(metric)
	if len(metric.FieldList()) == 0 {
		r.metricFiltered(metric)
		return
	}

	if aggregator, isAggregating := r.Output.(telegraf.AggregatingOutput); isAggregating {
		r.aggMutex.Lock()
		aggregator.Add(metric)
		r.aggMutex.Unlock()
		return
	}

	cfg := r.Config
	if cfg.NameOverride != "" {
		metric.SetName(cfg.NameOverride)
	}
	if cfg.NamePrefix != "" {
		metric.AddPrefix(cfg.NamePrefix)
	}
	if cfg.NameSuffix != "" {
		metric.AddSuffix(cfg.NameSuffix)
	}

	// Remember how many metrics the buffer had to drop to make room.
	atomic.AddInt64(&r.droppedMetrics, int64(r.buffer.Add(metric)))

	if atomic.AddInt64(&r.newMetricsCount, 1) != int64(r.MetricBatchSize) {
		return
	}

	// A full batch is available: reset the counter and signal it, unless a
	// signal is already pending.
	atomic.StoreInt64(&r.newMetricsCount, 0)
	select {
	case r.BatchReady <- time.Now():
	default:
	}
}
// Write writes the buffered metrics to the output in batches of
// MetricBatchSize, stopping when the metrics that were buffered at the start
// of the call have been sent or on the first error.
//
// If the output has not been started yet, a connection attempt is made first.
// A retry-able startup error flagged as partial lets the write proceed without
// marking the output as started; any other connect error increments
// StartupErrors and yields internal.ErrNotConnected.
//
// For aggregating outputs the aggregated metrics are pushed into the buffer
// before writing. Metrics the buffer drops during that push are added to the
// dropped-metrics counter so the overflow is reported like any other.
//
// A batch whose write fails is handed back to the buffer via Reject and the
// error is returned; successfully written batches are confirmed via Accept.
func (r *RunningOutput) Write() error {
	// Try to connect if we are not yet started up
	if !r.started {
		r.retries++
		if err := r.Output.Connect(); err != nil {
			var serr *internal.StartupError
			if !errors.As(err, &serr) || !serr.Retry || !serr.Partial {
				r.StartupErrors.Incr(1)
				return internal.ErrNotConnected
			}
			r.log.Debugf("Partially connected after %d attempts", r.retries)
		} else {
			r.started = true
			r.log.Debugf("Successfully connected after %d attempts", r.retries)
		}
	}

	if output, ok := r.Output.(telegraf.AggregatingOutput); ok {
		r.aggMutex.Lock()
		metrics := output.Push()
		dropped := r.buffer.Add(metrics...)
		output.Reset()
		r.aggMutex.Unlock()

		// Account for overflow caused by the push, as AddMetric does.
		atomic.AddInt64(&r.droppedMetrics, int64(dropped))
	}

	atomic.StoreInt64(&r.newMetricsCount, 0)

	// Only process the metrics in the buffer now. Metrics added while we are
	// writing will be sent on the next call.
	nBuffer := r.buffer.Len()
	nBatches := nBuffer/r.MetricBatchSize + 1
	for i := 0; i < nBatches; i++ {
		batch := r.buffer.Batch(r.MetricBatchSize)
		if len(batch) == 0 {
			break
		}

		if err := r.writeMetrics(batch); err != nil {
			r.buffer.Reject(batch)
			return err
		}
		r.buffer.Accept(batch)
	}
	return nil
}
// WriteBatch writes a single batch of at most MetricBatchSize metrics to the
// output.
//
// If the output has not been started yet, a connection attempt is made first,
// with the same rules as Write: a retry-able startup error flagged as partial
// lets the write proceed without marking the output as started, while any
// other connect error increments StartupErrors and yields
// internal.ErrNotConnected.
//
// It returns nil when the buffer is empty. A batch whose write fails is handed
// back to the buffer via Reject and the error is returned; a successfully
// written batch is confirmed via Accept.
func (r *RunningOutput) WriteBatch() error {
	// Try to connect if we are not yet started up
	if !r.started {
		r.retries++
		if err := r.Output.Connect(); err != nil {
			var serr *internal.StartupError
			if !errors.As(err, &serr) || !serr.Retry || !serr.Partial {
				r.StartupErrors.Incr(1)
				return internal.ErrNotConnected
			}
			r.log.Debugf("Partially connected after %d attempts", r.retries)
		} else {
			r.started = true
			r.log.Debugf("Successfully connected after %d attempts", r.retries)
		}
	}

	batch := r.buffer.Batch(r.MetricBatchSize)
	if len(batch) == 0 {
		return nil
	}

	if err := r.writeMetrics(batch); err != nil {
		r.buffer.Reject(batch)
		return err
	}
	r.buffer.Accept(batch)
	return nil
}
// writeMetrics sends one batch to the output and records how long the write
// took in the WriteTime stat.
//
// Before writing, any pending dropped-metrics count is reported as a warning
// and reset. The counter is read and cleared in a single atomic swap so that
// drops recorded concurrently by AddMetric are not lost between the read and
// the reset.
//
// It returns the error of Output.Write; a debug message is logged on success.
func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error {
	if dropped := atomic.SwapInt64(&r.droppedMetrics, 0); dropped > 0 {
		r.log.Warnf("Metric buffer overflow; %d metrics have been dropped", dropped)
	}

	start := time.Now()
	err := r.Output.Write(metrics)
	elapsed := time.Since(start)
	r.WriteTime.Incr(elapsed.Nanoseconds())

	if err == nil {
		r.log.Debugf("Wrote batch of %d metrics in %s", len(metrics), elapsed)
	}
	return err
}
// LogBufferStatus writes the current buffer fill level to the debug log. With
// the "disk" buffer strategy only the number of buffered metrics is logged;
// otherwise the configured buffer limit is logged alongside it.
func (r *RunningOutput) LogBufferStatus() {
	buffered := r.buffer.Len()
	switch r.Config.BufferStrategy {
	case "disk":
		r.log.Debugf("Buffer fullness: %d metrics", buffered)
	default:
		r.log.Debugf("Buffer fullness: %d / %d metrics", buffered, r.MetricBufferLimit)
	}
}
// Log returns the logger used by this output.
func (r *RunningOutput) Log() telegraf.Logger {
	logger := r.log
	return logger
}
// BufferLength returns the number of metrics currently held in the buffer.
func (r *RunningOutput) BufferLength() int {
	buffered := r.buffer.Len()
	return buffered
}