feat(processors.batch): Add batch processor (#15869)
This commit is contained in:
parent
66245c41b9
commit
338282be87
|
|
@ -0,0 +1,5 @@
|
||||||
|
//go:build !custom || processors || processors.batch
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import _ "github.com/influxdata/telegraf/plugins/processors/batch" // register plugin
|
||||||
|
|
@ -0,0 +1,60 @@
|
||||||
|
# Batch Processor Plugin
|
||||||
|
|
||||||
|
This processor groups metrics into batches by adding a batch tag. This is
|
||||||
|
useful for parallel processing of metrics where downstream processors,
|
||||||
|
aggregators or outputs can then select a batch using `tagpass` or `metricpass`.
|
||||||
|
|
||||||
|
Metrics are distributed across batches using the round-robin scheme.
|
||||||
|
|
||||||
|
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
|
||||||
|
|
||||||
|
In addition to the plugin-specific configuration settings, plugins support
|
||||||
|
additional global and plugin configuration settings. These settings are used to
|
||||||
|
modify metrics, tags, and field or create aliases and configure ordering, etc.
|
||||||
|
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
|
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
```toml @sample.conf
|
||||||
|
## Batch metrics into separate batches by adding a tag indicating the batch index.
|
||||||
|
[[processors.batch]]
|
||||||
|
## The name of the tag to use for adding the batch index
|
||||||
|
batch_tag = "my_batch"
|
||||||
|
|
||||||
|
## The number of batches to create
|
||||||
|
batches = 16
|
||||||
|
|
||||||
|
## Do not assign metrics with an existing batch assignment to a
|
||||||
|
## different batch.
|
||||||
|
# skip_existing = false
|
||||||
|
```
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
The example below uses these settings:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.batch]]
|
||||||
|
## The tag key to use for batching
|
||||||
|
batch_tag = "batch"
|
||||||
|
|
||||||
|
## The number of batches to create
|
||||||
|
batches = 3
|
||||||
|
```
|
||||||
|
|
||||||
|
```diff
|
||||||
|
- temperature cpu=25
|
||||||
|
- temperature cpu=50
|
||||||
|
- temperature cpu=75
|
||||||
|
- temperature cpu=25
|
||||||
|
- temperature cpu=50
|
||||||
|
- temperature cpu=75
|
||||||
|
+ temperature,batch=0 cpu=25
|
||||||
|
+ temperature,batch=1 cpu=50
|
||||||
|
+ temperature,batch=2 cpu=75
|
||||||
|
+ temperature,batch=0 cpu=25
|
||||||
|
+ temperature,batch=1 cpu=50
|
||||||
|
+ temperature,batch=2 cpu=75
|
||||||
|
```
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
package batch
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "embed"
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
"strconv"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed sample.conf
|
||||||
|
var sampleConfig string
|
||||||
|
|
||||||
|
type Batch struct {
|
||||||
|
BatchTag string `toml:"batch_tag"`
|
||||||
|
NumBatches uint64 `toml:"batches"`
|
||||||
|
SkipExisting bool `toml:"skip_existing"`
|
||||||
|
|
||||||
|
// the number of metrics that have been processed so far
|
||||||
|
count atomic.Uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*Batch) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batch) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
out := make([]telegraf.Metric, 0, len(in))
|
||||||
|
for _, m := range in {
|
||||||
|
if b.SkipExisting && m.HasTag(b.BatchTag) {
|
||||||
|
out = append(out, m)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
oldCount := b.count.Add(1) - 1
|
||||||
|
batchID := oldCount % b.NumBatches
|
||||||
|
m.AddTag(b.BatchTag, strconv.FormatUint(batchID, 10))
|
||||||
|
out = append(out, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("batch", func() telegraf.Processor {
|
||||||
|
return &Batch{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,110 @@
|
||||||
|
package batch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
const batchTag = "?internal_batch_idx"
|
||||||
|
|
||||||
|
func Test_SingleMetricPutInBatch0(t *testing.T) {
|
||||||
|
b := &Batch{
|
||||||
|
BatchTag: batchTag,
|
||||||
|
NumBatches: 1,
|
||||||
|
}
|
||||||
|
m := testutil.MockMetricsWithValue(1)
|
||||||
|
expectedM := testutil.MockMetricsWithValue(1)
|
||||||
|
expectedM[0].AddTag(batchTag, "0")
|
||||||
|
|
||||||
|
res := b.Apply(m...)
|
||||||
|
testutil.RequireMetricsEqual(t, expectedM, res)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_MetricsSmallerThanBatchSizeAreInDifferentBatches(t *testing.T) {
|
||||||
|
b := &Batch{
|
||||||
|
BatchTag: batchTag,
|
||||||
|
NumBatches: 3,
|
||||||
|
}
|
||||||
|
|
||||||
|
ms := make([]telegraf.Metric, 0, 2)
|
||||||
|
for range cap(ms) {
|
||||||
|
ms = append(ms, testutil.MockMetrics()...)
|
||||||
|
}
|
||||||
|
|
||||||
|
res := b.Apply(ms...)
|
||||||
|
|
||||||
|
batchTagValue, ok := res[0].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "0", batchTagValue)
|
||||||
|
|
||||||
|
batchTagValue, ok = res[1].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "1", batchTagValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_MetricsEqualToBatchSizeInDifferentBatches(t *testing.T) {
|
||||||
|
b := &Batch{
|
||||||
|
BatchTag: batchTag,
|
||||||
|
NumBatches: 3,
|
||||||
|
}
|
||||||
|
|
||||||
|
ms := make([]telegraf.Metric, 0, 3)
|
||||||
|
for range cap(ms) {
|
||||||
|
ms = append(ms, testutil.MockMetrics()...)
|
||||||
|
}
|
||||||
|
|
||||||
|
res := b.Apply(ms...)
|
||||||
|
batchTagValue, ok := res[0].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "0", batchTagValue)
|
||||||
|
|
||||||
|
batchTagValue, ok = res[1].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "1", batchTagValue)
|
||||||
|
|
||||||
|
batchTagValue, ok = res[2].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "2", batchTagValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_MetricsMoreThanBatchSizeInSameBatch(t *testing.T) {
|
||||||
|
b := &Batch{
|
||||||
|
BatchTag: batchTag,
|
||||||
|
NumBatches: 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
ms := make([]telegraf.Metric, 0, 3)
|
||||||
|
for range cap(ms) {
|
||||||
|
ms = append(ms, testutil.MockMetrics()...)
|
||||||
|
}
|
||||||
|
|
||||||
|
res := b.Apply(ms...)
|
||||||
|
batchTagValue, ok := res[0].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "0", batchTagValue)
|
||||||
|
|
||||||
|
batchTagValue, ok = res[1].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "1", batchTagValue)
|
||||||
|
|
||||||
|
batchTagValue, ok = res[2].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "0", batchTagValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_MetricWithExistingTagNotChanged(t *testing.T) {
|
||||||
|
b := &Batch{
|
||||||
|
BatchTag: batchTag,
|
||||||
|
NumBatches: 2,
|
||||||
|
SkipExisting: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
m := testutil.MockMetricsWithValue(1)
|
||||||
|
m[0].AddTag(batchTag, "4")
|
||||||
|
res := b.Apply(m...)
|
||||||
|
tv, ok := res[0].GetTag(batchTag)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, "4", tv)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
## Batch metrics into separate batches by adding a tag indicating the batch index.
|
||||||
|
[[processors.batch]]
|
||||||
|
## The name of the tag to use for adding the batch index
|
||||||
|
batch_tag = "my_batch"
|
||||||
|
|
||||||
|
## The number of batches to create
|
||||||
|
batches = 16
|
||||||
|
|
||||||
|
## Do not assign metrics with an existing batch assignment to a
|
||||||
|
## different batch.
|
||||||
|
# skip_existing = false
|
||||||
Loading…
Reference in New Issue