From b7152376061311fb353f51f4c4c564e480272cf1 Mon Sep 17 00:00:00 2001 From: Evgenii Kuznetsov Date: Wed, 23 Apr 2025 16:47:39 +0300 Subject: [PATCH] feat(common.shim): Add batch to shim (#16148) Co-authored-by: Thomas Casteleyn Co-authored-by: Sven Rebhan --- plugins/common/shim/goshim.go | 15 ++-- plugins/common/shim/output.go | 85 +++++++++++++++++-- plugins/common/shim/output_test.go | 130 +++++++++++++++++++++++++++-- 3 files changed, 209 insertions(+), 21 deletions(-) diff --git a/plugins/common/shim/goshim.go b/plugins/common/shim/goshim.go index 8fdeb2f43..ea0c337db 100644 --- a/plugins/common/shim/goshim.go +++ b/plugins/common/shim/goshim.go @@ -39,6 +39,9 @@ type Shim struct { Processor telegraf.StreamingProcessor Output telegraf.Output + BatchSize int + BatchTimeout time.Duration + log telegraf.Logger // streams @@ -56,11 +59,13 @@ type Shim struct { // New creates a new shim interface func New() *Shim { return &Shim{ - metricCh: make(chan telegraf.Metric, 1), - stdin: os.Stdin, - stdout: os.Stdout, - stderr: os.Stderr, - log: logger.New("", "", ""), + BatchSize: 1, + BatchTimeout: 10 * time.Second, + metricCh: make(chan telegraf.Metric, 1), + stdin: os.Stdin, + stdout: os.Stdout, + stderr: os.Stderr, + log: logger.New("", "", ""), } } diff --git a/plugins/common/shim/output.go b/plugins/common/shim/output.go index 88108673a..e1e542d43 100644 --- a/plugins/common/shim/output.go +++ b/plugins/common/shim/output.go @@ -3,6 +3,9 @@ package shim import ( "bufio" "fmt" + "os" + "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/models" @@ -24,31 +27,97 @@ func (s *Shim) AddOutput(output telegraf.Output) error { } func (s *Shim) RunOutput() error { + // Create a parser for receiving the metrics in line-protocol format parser := influx.Parser{} - err := parser.Init() - if err != nil { + if err := parser.Init(); err != nil { return fmt.Errorf("failed to create new parser: %w", err) } - err = s.Output.Connect() - if err != nil { + // Connect the output + if err := s.Output.Connect(); err != nil { return fmt.Errorf("failed to start processor: %w", err) } defer s.Output.Close() - var m telegraf.Metric + // Collect the metrics from stdin. Note, we need to flush the metrics + // when the batch is full or after the configured time, whatever comes + // first. We need to lock the batch as we run into race conditions + // otherwise. + var mu sync.Mutex + metrics := make([]telegraf.Metric, 0, s.BatchSize) + // Prepare the flush timer... + flush := func(whole bool) { + mu.Lock() + defer mu.Unlock() + + // Exit early if there is nothing to do + if len(metrics) == 0 { + return + } + + // Determine the threshold on when to stop flushing depending on the + // given flag. + var threshold int + if whole { + threshold = s.BatchSize + } + + // Flush out the metrics in batches of the configured size until we + // got all of them out or if there is less than a whole batch left. + for len(metrics) > 0 && len(metrics) >= threshold { + // Write the metrics and remove the batch + batch := metrics[:min(len(metrics), s.BatchSize)] + if err := s.Output.Write(batch); err != nil { + fmt.Fprintf(os.Stderr, "Failed to write metrics: %s\n", err) + } + metrics = metrics[len(batch):] + } + } + + // Setup the time-based flush + var timer *time.Timer + if s.BatchTimeout > 0 { + timer = time.AfterFunc(s.BatchTimeout, func() { flush(false) }) + defer func() { + if timer != nil { + timer.Stop() + } + }() + } + + // Start the processing loop scanner := bufio.NewScanner(s.stdin) for scanner.Scan() { - m, err = parser.ParseLine(scanner.Text()) + // Read metrics from stdin + m, err := parser.ParseLine(scanner.Text()) if err != nil { fmt.Fprintf(s.stderr, "Failed to parse metric: %s\n", err) continue } - if err = s.Output.Write([]telegraf.Metric{m}); err != nil { - fmt.Fprintf(s.stderr, "Failed to write metric: %s\n", err) + mu.Lock() + metrics = append(metrics, m) + shouldFlush := len(metrics) >= s.BatchSize + mu.Unlock() + + // If we got more enough metrics to fill the batch flush it out and + // reset the time-based guard. + if shouldFlush { + if timer != nil { + timer.Stop() + } + flush(true) + if s.BatchTimeout > 0 { + timer = time.AfterFunc(s.BatchTimeout, func() { flush(false) }) + } } } + // Output all remaining metrics + if timer != nil { + timer.Stop() + } + flush(false) + return nil } diff --git a/plugins/common/shim/output_test.go b/plugins/common/shim/output_test.go index 247496f7c..2522a4659 100644 --- a/plugins/common/shim/output_test.go +++ b/plugins/common/shim/output_test.go @@ -3,6 +3,7 @@ package shim import ( "io" "sync" + "sync/atomic" "testing" "time" @@ -21,11 +22,9 @@ func TestOutputShim(t *testing.T) { s := New() s.stdin = stdinReader - err := s.AddOutput(o) - require.NoError(t, err) - - wg := sync.WaitGroup{} + require.NoError(t, s.AddOutput(o)) + var wg sync.WaitGroup wg.Add(1) go func() { if err := s.RunOutput(); err != nil { @@ -50,19 +49,133 @@ func TestOutputShim(t *testing.T) { require.NoError(t, err) _, err = stdinWriter.Write(b) require.NoError(t, err) - err = stdinWriter.Close() - require.NoError(t, err) + require.NoError(t, stdinWriter.Close()) wg.Wait() require.Len(t, o.MetricsWritten, 1) - mOut := o.MetricsWritten[0] + testutil.RequireMetricEqual(t, m, o.MetricsWritten[0]) +} - testutil.RequireMetricEqual(t, m, mOut) +func TestOutputShimWithBatchSize(t *testing.T) { + o := &testOutput{} + + stdinReader, stdinWriter := io.Pipe() + + // Setup a shim with a batch size but no timeout + s := New() + s.stdin = stdinReader + s.BatchSize = 5 + s.BatchTimeout = 0 + require.NoError(t, s.AddOutput(o)) + + // Start the output processing + var wg sync.WaitGroup + wg.Add(1) + go func() { + if err := s.RunOutput(); err != nil { + t.Error(err) + } + wg.Done() + }() + + // Serialize the test metric + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) + m := metric.New("thing", + map[string]string{ + "a": "b", + }, + map[string]interface{}{ + "v": 1, + }, + time.Now(), + ) + payload, err := serializer.Serialize(m) + require.NoError(t, err) + + // Write a few more metrics than the batch-size and check that we only get + // a full batch before closing the input stream. + expected := make([]telegraf.Metric, 0, s.BatchSize+3) + for range cap(expected) { + _, err := stdinWriter.Write(payload) + require.NoError(t, err) + expected = append(expected, m) + } + + // Wait for the metrics to arrive + require.Eventually(t, func() bool { + return o.Count.Load() >= uint32(s.BatchSize) + }, 3*time.Second, 100*time.Millisecond) + testutil.RequireMetricsEqual(t, expected[:s.BatchSize], o.MetricsWritten) + + // Closing the input should force the remaining metrics to be written + require.NoError(t, stdinWriter.Close()) + wg.Wait() + testutil.RequireMetricsEqual(t, expected, o.MetricsWritten) +} + +func TestOutputShimWithFlushTimeout(t *testing.T) { + o := &testOutput{} + + stdinReader, stdinWriter := io.Pipe() + + // Setup a shim with a batch size and a short timeout + s := New() + s.stdin = stdinReader + s.BatchSize = 5 + s.BatchTimeout = 500 * time.Millisecond + require.NoError(t, s.AddOutput(o)) + + // Start the output processing + var wg sync.WaitGroup + wg.Add(1) + go func() { + if err := s.RunOutput(); err != nil { + t.Error(err) + } + wg.Done() + }() + + // Serialize the test metric + serializer := &influx.Serializer{} + require.NoError(t, serializer.Init()) + m := metric.New("thing", + map[string]string{ + "a": "b", + }, + map[string]interface{}{ + "v": 1, + }, + time.Now(), + ) + payload, err := serializer.Serialize(m) + require.NoError(t, err) + + // Write less metrics than the batch-size and check if the flush timeout + // triggers.. + expected := make([]telegraf.Metric, 0, s.BatchSize-1) + for range cap(expected) { + _, err := stdinWriter.Write(payload) + require.NoError(t, err) + expected = append(expected, m) + } + // Wait for the batch to be flushed + require.Eventually(t, func() bool { + return o.Count.Load() >= uint32(len(expected)) + }, 3*time.Second, 100*time.Millisecond) + + testutil.RequireMetricsEqual(t, expected, o.MetricsWritten) + + // Closing the input should not change anything + require.NoError(t, stdinWriter.Close()) + wg.Wait() + testutil.RequireMetricsEqual(t, expected, o.MetricsWritten) } type testOutput struct { MetricsWritten []telegraf.Metric + Count atomic.Uint32 } func (*testOutput) Connect() error { @@ -73,6 +186,7 @@ func (*testOutput) Close() error { } func (o *testOutput) Write(metrics []telegraf.Metric) error { o.MetricsWritten = append(o.MetricsWritten, metrics...) + o.Count.Store(uint32(len(o.MetricsWritten))) return nil }