From f11ead9980a82b4b99033a315e8158ac63217990 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Fri, 19 Jul 2024 08:47:30 -0500 Subject: [PATCH] chore(agent.buffer): Add experimental warnings and increased logging (#15638) --- config/config.go | 11 ++++++++++- docs/CONFIGURATION.md | 6 ++++++ models/buffer_disk.go | 3 +++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/config/config.go b/config/config.go index 87605890c..e7a8ea2d3 100644 --- a/config/config.go +++ b/config/config.go @@ -279,7 +279,12 @@ type AgentConfig struct { // startup. Set to -1 for unlimited attempts. ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"` - BufferStrategy string `toml:"buffer_strategy"` + // BufferStrategy is the metric buffer type to use for a given output plugin. + // Supported types currently are "memory" and "disk". + BufferStrategy string `toml:"buffer_strategy"` + + // BufferDirectory is the directory to store buffer files for serialized + // to disk metrics when using the "disk" buffer strategy. BufferDirectory string `toml:"buffer_directory"` } @@ -1531,6 +1536,10 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, return nil, c.firstErr() } + if oc.BufferStrategy == "disk" { + log.Printf("W! Using disk buffer strategy for plugin outputs.%s, this is an experimental feature", name) + } + // Generate an ID for the plugin oc.ID, err = generatePluginID("outputs."+name, tbl) return oc, err diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index 1c3f5f7d8..58c6b58b1 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -352,6 +352,12 @@ The agent table configures Telegraf and the defaults used across all plugins. By default, processors are run a second time after aggregators. Changing this setting to true will skip the second run of processors. +- **buffer_strategy**: + The type of buffer to use for telegraf output plugins. Supported modes are + `memory`, the default and original buffer type, and `disk`, an experimental + disk-backed buffer which will serialize all metrics to disk as needed to + improve data durability and reduce the chance for data loss. + ## Plugins Telegraf plugins are divided into 4 types: [inputs][], [outputs][], diff --git a/models/buffer_disk.go b/models/buffer_disk.go index 5d0e687f1..e5fc1b219 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -3,6 +3,7 @@ package models import ( "errors" "fmt" + "log" "os" "path/filepath" "sync" @@ -141,6 +142,7 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { } if err != nil { // non-recoverable error in deserialization, abort + log.Printf("E! raw metric data: %v", data) panic(err) } if _, ok := m.(telegraf.TrackingMetric); ok && readIndex < b.originalEnd { @@ -171,6 +173,7 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) { } else { err := b.file.TruncateFront(b.batchFirst + uint64(len(batch))) if err != nil { + log.Printf("E! batch length: %d, batchFirst: %d, batchSize: %d", len(batch), b.batchFirst, b.batchSize) panic(err) } }