From 338282be872fadd98f6c8d86897ed3cf5671c2ec Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Mon, 30 Sep 2024 16:22:42 +0200 Subject: [PATCH] feat(processors.batch): Add batch processor (#15869) --- plugins/processors/all/batch.go | 5 ++ plugins/processors/batch/README.md | 60 ++++++++++++++ plugins/processors/batch/batch.go | 48 +++++++++++ plugins/processors/batch/batch_test.go | 110 +++++++++++++++++++++++++ plugins/processors/batch/sample.conf | 11 +++ 5 files changed, 234 insertions(+) create mode 100644 plugins/processors/all/batch.go create mode 100644 plugins/processors/batch/README.md create mode 100644 plugins/processors/batch/batch.go create mode 100644 plugins/processors/batch/batch_test.go create mode 100644 plugins/processors/batch/sample.conf diff --git a/plugins/processors/all/batch.go b/plugins/processors/all/batch.go new file mode 100644 index 000000000..65d4e677b --- /dev/null +++ b/plugins/processors/all/batch.go @@ -0,0 +1,5 @@ +//go:build !custom || processors || processors.batch + +package all + +import _ "github.com/influxdata/telegraf/plugins/processors/batch" // register plugin diff --git a/plugins/processors/batch/README.md b/plugins/processors/batch/README.md new file mode 100644 index 000000000..95c5bba00 --- /dev/null +++ b/plugins/processors/batch/README.md @@ -0,0 +1,60 @@ +# Batch Processor Plugin + +This processor groups metrics into batches by adding a batch tag. This is +useful for parallel processing of metrics where downstream processors, +aggregators or outputs can then select a batch using `tagpass` or `metricpass`. + +Metrics are distributed across batches using the round-robin scheme. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +## Batch metrics into separate batches by adding a tag indicating the batch index. +[[processors.batch]] + ## The name of the tag to use for adding the batch index + batch_tag = "my_batch" + + ## The number of batches to create + batches = 16 + + ## Do not assign metrics with an existing batch assignment to a + ## different batch. + # skip_existing = false +``` + +## Example + +The example below uses these settings: + +```toml +[[processors.batch]] + ## The tag key to use for batching + batch_tag = "batch" + + ## The number of batches to create + batches = 3 +``` + +```diff +- temperature cpu=25 +- temperature cpu=50 +- temperature cpu=75 +- temperature cpu=25 +- temperature cpu=50 +- temperature cpu=75 ++ temperature,batch=0 cpu=25 ++ temperature,batch=1 cpu=50 ++ temperature,batch=2 cpu=75 ++ temperature,batch=0 cpu=25 ++ temperature,batch=1 cpu=50 ++ temperature,batch=2 cpu=75 +``` diff --git a/plugins/processors/batch/batch.go b/plugins/processors/batch/batch.go new file mode 100644 index 000000000..49215d26f --- /dev/null +++ b/plugins/processors/batch/batch.go @@ -0,0 +1,48 @@ +package batch + +import ( + _ "embed" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/processors" + "strconv" + "sync/atomic" +) + +//go:embed sample.conf +var sampleConfig string + +type Batch struct { + BatchTag string `toml:"batch_tag"` + NumBatches uint64 `toml:"batches"` + SkipExisting bool `toml:"skip_existing"` + + // the number of metrics that have been processed so far + count atomic.Uint64 +} + +func (*Batch) SampleConfig() string { + return sampleConfig +} + +func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric { + out := make([]telegraf.Metric, 0, len(in)) + for _, m := range in { + if b.SkipExisting && m.HasTag(b.BatchTag) { + out = append(out, m) + continue + } + + oldCount := b.count.Add(1) - 1 + batchID := oldCount % b.NumBatches + m.AddTag(b.BatchTag, strconv.FormatUint(batchID, 10)) + out = append(out, m) + } + + return out +} + +func init() { + processors.Add("batch", func() telegraf.Processor { + return &Batch{} + }) +} diff --git a/plugins/processors/batch/batch_test.go b/plugins/processors/batch/batch_test.go new file mode 100644 index 000000000..a3b6f9705 --- /dev/null +++ b/plugins/processors/batch/batch_test.go @@ -0,0 +1,110 @@ +package batch + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "testing" +) + +const batchTag = "?internal_batch_idx" + +func Test_SingleMetricPutInBatch0(t *testing.T) { + b := &Batch{ + BatchTag: batchTag, + NumBatches: 1, + } + m := testutil.MockMetricsWithValue(1) + expectedM := testutil.MockMetricsWithValue(1) + expectedM[0].AddTag(batchTag, "0") + + res := b.Apply(m...) + testutil.RequireMetricsEqual(t, expectedM, res) +} + +func Test_MetricsSmallerThanBatchSizeAreInDifferentBatches(t *testing.T) { + b := &Batch{ + BatchTag: batchTag, + NumBatches: 3, + } + + ms := make([]telegraf.Metric, 0, 2) + for range cap(ms) { + ms = append(ms, testutil.MockMetrics()...) + } + + res := b.Apply(ms...) + + batchTagValue, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) + + batchTagValue, ok = res[1].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "1", batchTagValue) +} + +func Test_MetricsEqualToBatchSizeInDifferentBatches(t *testing.T) { + b := &Batch{ + BatchTag: batchTag, + NumBatches: 3, + } + + ms := make([]telegraf.Metric, 0, 3) + for range cap(ms) { + ms = append(ms, testutil.MockMetrics()...) + } + + res := b.Apply(ms...) + batchTagValue, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) + + batchTagValue, ok = res[1].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "1", batchTagValue) + + batchTagValue, ok = res[2].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "2", batchTagValue) +} + +func Test_MetricsMoreThanBatchSizeInSameBatch(t *testing.T) { + b := &Batch{ + BatchTag: batchTag, + NumBatches: 2, + } + + ms := make([]telegraf.Metric, 0, 3) + for range cap(ms) { + ms = append(ms, testutil.MockMetrics()...) + } + + res := b.Apply(ms...) + batchTagValue, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) + + batchTagValue, ok = res[1].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "1", batchTagValue) + + batchTagValue, ok = res[2].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "0", batchTagValue) +} + +func Test_MetricWithExistingTagNotChanged(t *testing.T) { + b := &Batch{ + BatchTag: batchTag, + NumBatches: 2, + SkipExisting: true, + } + + m := testutil.MockMetricsWithValue(1) + m[0].AddTag(batchTag, "4") + res := b.Apply(m...) + tv, ok := res[0].GetTag(batchTag) + require.True(t, ok) + require.Equal(t, "4", tv) +} diff --git a/plugins/processors/batch/sample.conf b/plugins/processors/batch/sample.conf new file mode 100644 index 000000000..b4e987726 --- /dev/null +++ b/plugins/processors/batch/sample.conf @@ -0,0 +1,11 @@ +## Batch metrics into separate batches by adding a tag indicating the batch index. +[[processors.batch]] + ## The name of the tag to use for adding the batch index + batch_tag = "my_batch" + + ## The number of batches to create + batches = 16 + + ## Do not assign metrics with an existing batch assignment to a + ## different batch. + # skip_existing = false