chore(agent.buffer): Add experimental warnings and increased logging (#15638)

Dane Strandboge 2024-07-19 08:47:30 -05:00 committed by GitHub
parent a6270b71f9
commit f11ead9980
GPG Key ID: B5690EEEBB952194
3 changed files with 19 additions and 1 deletion

View File

@@ -279,7 +279,12 @@ type AgentConfig struct {
// startup. Set to -1 for unlimited attempts.
ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"`
// BufferStrategy is the metric buffer type to use for a given output plugin.
// Supported types currently are "memory" and "disk".
BufferStrategy string `toml:"buffer_strategy"`
// BufferDirectory is the directory in which to store buffer files for
// metrics serialized to disk when using the "disk" buffer strategy.
BufferDirectory string `toml:"buffer_directory"`
}
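
For orientation, here is a minimal, self-contained sketch of how an agent might choose a buffer implementation from these two settings. It is not Telegraf's actual factory code; `Buffer`, `newBuffer`, and the concrete types are hypothetical stand-ins, and the point is simply that `buffer_directory` only matters once `buffer_strategy` is set to `disk`, which is also where the experimental warning belongs.

```go
package main

import (
	"fmt"
	"log"
)

// Buffer is a stand-in for Telegraf's metric buffer interface.
type Buffer interface {
	Name() string
}

type memoryBuffer struct{}

func (memoryBuffer) Name() string { return "memory" }

type diskBuffer struct{ dir string }

func (d diskBuffer) Name() string { return "disk:" + d.dir }

// newBuffer is a hypothetical selector keyed on the two agent settings above.
func newBuffer(strategy, directory string) (Buffer, error) {
	switch strategy {
	case "", "memory":
		return memoryBuffer{}, nil
	case "disk":
		if directory == "" {
			return nil, fmt.Errorf("buffer_directory must be set when buffer_strategy is %q", strategy)
		}
		log.Printf("W! disk buffer strategy is experimental")
		return diskBuffer{dir: directory}, nil
	default:
		return nil, fmt.Errorf("invalid buffer_strategy %q", strategy)
	}
}

func main() {
	b, err := newBuffer("disk", "/var/lib/telegraf/buffer")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(b.Name())
}
```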
@@ -1531,6 +1536,10 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
return nil, c.firstErr()
}
if oc.BufferStrategy == "disk" {
log.Printf("W! Using disk buffer strategy for plugin outputs.%s, this is an experimental feature", name)
}
// Generate an ID for the plugin
oc.ID, err = generatePluginID("outputs."+name, tbl)
return oc, err

View File

@@ -352,6 +352,12 @@ The agent table configures Telegraf and the defaults used across all plugins.
By default, processors are run a second time after aggregators. Changing
this setting to true will skip the second run of processors.
- **buffer_strategy**:
  The type of buffer to use for Telegraf output plugins. Supported modes are
  `memory`, the default and original buffer type, and `disk`, an experimental
  disk-backed buffer that serializes all metrics to disk as needed to improve
  data durability and reduce the chance of data loss. A sketch of how this
  setting maps onto the agent configuration is shown below.
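
As a rough illustration of how the documented key maps onto the struct tags in the first hunk, the snippet below decodes the two agent-level keys into a trimmed-down config struct. Telegraf uses its own TOML parser; `github.com/BurntSushi/toml` is used here only because it is a common, self-contained choice, and `agentConfig` is a hypothetical stand-in.

```go
package main

import (
	"fmt"
	"log"

	"github.com/BurntSushi/toml"
)

// agentConfig mirrors only the two fields discussed above.
type agentConfig struct {
	BufferStrategy  string `toml:"buffer_strategy"`
	BufferDirectory string `toml:"buffer_directory"`
}

const example = `
buffer_strategy = "disk"
buffer_directory = "/var/lib/telegraf/buffer"
`

func main() {
	var cfg agentConfig
	if _, err := toml.Decode(example, &cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("strategy=%s directory=%s\n", cfg.BufferStrategy, cfg.BufferDirectory)
}
```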
## Plugins
Telegraf plugins are divided into 4 types: [inputs][], [outputs][],

View File

@@ -3,6 +3,7 @@ package models
import (
"errors"
"fmt"
"log"
"os"
"path/filepath"
"sync"
@@ -141,6 +142,7 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric {
}
if err != nil {
// non-recoverable error in deserialization, abort
log.Printf("E! raw metric data: %v", data)
panic(err)
}
if _, ok := m.(telegraf.TrackingMetric); ok && readIndex < b.originalEnd {
@@ -171,6 +173,7 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) {
} else {
err := b.file.TruncateFront(b.batchFirst + uint64(len(batch)))
if err != nil {
log.Printf("E! batch length: %d, batchFirst: %d, batchSize: %d", len(batch), b.batchFirst, b.batchSize)
panic(err)
}
}
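
To make the logged fields easier to interpret, here is a toy, in-memory analogue of the batch/accept cycle: a batch hands out the oldest serialized entries, and accepting that batch drops them from the front, which is what `TruncateFront` does against the on-disk log. The `walBuffer` type and its methods are illustrative only, not the real `DiskBuffer`.

```go
package main

import (
	"fmt"
	"log"
)

// walBuffer is a toy stand-in for the disk buffer's write-ahead log: entries
// are kept in order and removed from the front once a batch has been accepted.
type walBuffer struct {
	first   uint64   // index of the oldest stored entry (batchFirst analogue)
	entries [][]byte // serialized metrics
}

// batch returns up to n serialized entries starting at the front.
func (b *walBuffer) batch(n int) [][]byte {
	if n > len(b.entries) {
		n = len(b.entries)
	}
	return b.entries[:n]
}

// accept drops the first len(batch) entries, mirroring TruncateFront.
func (b *walBuffer) accept(batch [][]byte) {
	if len(batch) > len(b.entries) {
		// Analogue of the panic path above: log enough context to debug.
		log.Printf("E! batch length: %d, first: %d, stored: %d", len(batch), b.first, len(b.entries))
		panic("accepting more entries than stored")
	}
	b.entries = b.entries[len(batch):]
	b.first += uint64(len(batch))
}

func main() {
	b := &walBuffer{entries: [][]byte{[]byte("m1"), []byte("m2"), []byte("m3")}}
	out := b.batch(2)
	b.accept(out)
	fmt.Println(len(b.entries), b.first) // 1 2
}
```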