feat(processors): Add processor to split a metric into multiple (#13785)
This commit is contained in:
parent
1e42262184
commit
dec4a90b07
|
|
@ -0,0 +1,5 @@
|
||||||
|
//go:build !custom || processors || processors.split
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import _ "github.com/influxdata/telegraf/plugins/processors/split" // register plugin
|
||||||
|
|
@ -0,0 +1,71 @@
|
||||||
|
# Split Processor Plugin
|
||||||
|
|
||||||
|
This plugin splits a metric up into one or more metrics based on a template
|
||||||
|
the user provides. The timestamp of the new metric is based on the source
|
||||||
|
metric. Templates can overlap, where a field or tag, is used across templates
|
||||||
|
and as a result end up in multiple metrics.
|
||||||
|
|
||||||
|
**NOTE**: If drop original is changed to true, then the plugin can result in
|
||||||
|
dropping all metrics when no match is found! Please ensure to test
|
||||||
|
templates before putting into production *and* use metric filtering to
|
||||||
|
avoid data loss.
|
||||||
|
|
||||||
|
Some outputs are sensitive to the number of metric series that are produced.
|
||||||
|
Multiple metrics of the same series (i.e. identical name, tag key-values and
|
||||||
|
field name) with the same timestamp might result in squashing those points
|
||||||
|
to the latest metric produced.
|
||||||
|
|
||||||
|
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
|
||||||
|
|
||||||
|
In addition to the plugin-specific configuration settings, plugins support
|
||||||
|
additional global and plugin configuration settings. These settings are used to
|
||||||
|
modify metrics, tags, and field or create aliases and configure ordering, etc.
|
||||||
|
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
|
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
```toml @sample.conf
|
||||||
|
# Split a metric into one or more metrics with the specified field(s)/tag(s)
|
||||||
|
[[processors.split]]
|
||||||
|
## Keeps the original metric by default
|
||||||
|
# drop_original = false
|
||||||
|
|
||||||
|
## Template for an output metric
|
||||||
|
## Users can define multiple templates to split the original metric into
|
||||||
|
## multiple, potentially overlapping, metrics.
|
||||||
|
[[processors.split.template]]
|
||||||
|
## New metric name
|
||||||
|
name = ""
|
||||||
|
|
||||||
|
## List of tag keys for this metric template, accepts globs, e.g. "*"
|
||||||
|
tags = []
|
||||||
|
|
||||||
|
## List of field keys for this metric template, accepts globs, e.g. "*"
|
||||||
|
fields = []
|
||||||
|
```
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
The following takes a single metric with data from two sensors and splits out
|
||||||
|
each sensor into its own metric. It also copies all tags from the original
|
||||||
|
metric to the new metric.
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[processors.split]]
|
||||||
|
[[processors.split.metric]]
|
||||||
|
name = "sensor1"
|
||||||
|
tags = [ "*" ]
|
||||||
|
fields = [ "sensor1*" ]
|
||||||
|
[[processors.split.metric]]
|
||||||
|
name = "sensor2"
|
||||||
|
tags = [ "*" ]
|
||||||
|
fields = [ "sensor2*" ]
|
||||||
|
```
|
||||||
|
|
||||||
|
```diff
|
||||||
|
-metric,status=active sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
+sensor1,status=active sensor1_channel1=4i,sensor1_channel2=2i 1684784689000000000
|
||||||
|
+sensor2,status=active sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
```
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
# Split a metric into one or more metrics with the specified field(s)/tag(s)
|
||||||
|
[[processors.split]]
|
||||||
|
## Keeps the original metric by default
|
||||||
|
# drop_original = false
|
||||||
|
|
||||||
|
## Template for an output metric
|
||||||
|
## Users can define multiple templates to split the original metric into
|
||||||
|
## multiple, potentially overlapping, metrics.
|
||||||
|
[[processors.split.template]]
|
||||||
|
## New metric name
|
||||||
|
name = ""
|
||||||
|
|
||||||
|
## List of tag keys for this metric template, accepts globs, e.g. "*"
|
||||||
|
tags = []
|
||||||
|
|
||||||
|
## List of field keys for this metric template, accepts globs, e.g. "*"
|
||||||
|
fields = []
|
||||||
|
|
@ -0,0 +1,111 @@
|
||||||
|
//go:generate ../../../tools/readme_config_includer/generator
|
||||||
|
package split
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "embed"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/filter"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed sample.conf
|
||||||
|
var sampleConfig string
|
||||||
|
|
||||||
|
type Split struct {
|
||||||
|
Templates []template `toml:"template"`
|
||||||
|
DropOriginal bool `toml:"drop_original"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type template struct {
|
||||||
|
Name string `toml:"name"`
|
||||||
|
Tags []string `toml:"tags"`
|
||||||
|
Fields []string `toml:"fields"`
|
||||||
|
|
||||||
|
fieldFilters filter.Filter
|
||||||
|
tagFilters filter.Filter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*Split) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Split) Init() error {
|
||||||
|
if len(s.Templates) == 0 {
|
||||||
|
return errors.New("at least one template required")
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, template := range s.Templates {
|
||||||
|
if template.Name == "" {
|
||||||
|
return errors.New("metric name cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(template.Fields) == 0 {
|
||||||
|
return errors.New("at least one field is required for a valid metric")
|
||||||
|
}
|
||||||
|
f, err := filter.Compile(template.Fields)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create new field filter: %w", err)
|
||||||
|
}
|
||||||
|
s.Templates[index].fieldFilters = f
|
||||||
|
|
||||||
|
if len(template.Tags) != 0 {
|
||||||
|
f, err := filter.Compile(template.Tags)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create new tag filter: %w", err)
|
||||||
|
}
|
||||||
|
s.Templates[index].tagFilters = f
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Split) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||||
|
newMetrics := []telegraf.Metric{}
|
||||||
|
|
||||||
|
for _, point := range in {
|
||||||
|
if s.DropOriginal {
|
||||||
|
point.Accept()
|
||||||
|
} else {
|
||||||
|
newMetrics = append(newMetrics, point)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, template := range s.Templates {
|
||||||
|
fields := make(map[string]any, len(point.FieldList()))
|
||||||
|
for _, field := range point.FieldList() {
|
||||||
|
if template.fieldFilters.Match(field.Key) {
|
||||||
|
fields[field.Key] = field.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tags := make(map[string]string, len(point.TagList()))
|
||||||
|
if len(template.Tags) != 0 {
|
||||||
|
for _, tag := range point.TagList() {
|
||||||
|
if template.tagFilters.Match(tag.Key) {
|
||||||
|
tags[tag.Key] = tag.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// metric with no fields should be skipped
|
||||||
|
if len(fields) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
m := metric.New(template.Name, tags, fields, point.Time())
|
||||||
|
newMetrics = append(newMetrics, m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return newMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
processors.Add("split", func() telegraf.Processor {
|
||||||
|
return &Split{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,67 @@
|
||||||
|
package split
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||||
|
"github.com/influxdata/telegraf/plugins/processors"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestCases(t *testing.T) {
|
||||||
|
folders, err := os.ReadDir("testcases")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, folders)
|
||||||
|
|
||||||
|
processors.Add("split", func() telegraf.Processor {
|
||||||
|
return &Split{}
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, f := range folders {
|
||||||
|
// Only handle folders
|
||||||
|
if !f.IsDir() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fname := f.Name()
|
||||||
|
testdataPath := filepath.Join("testcases", fname)
|
||||||
|
configFilename := filepath.Join(testdataPath, "config.toml")
|
||||||
|
inputFilename := filepath.Join(testdataPath, "input.influx")
|
||||||
|
expectedFilename := filepath.Join(testdataPath, "expected.out")
|
||||||
|
|
||||||
|
t.Run(fname, func(t *testing.T) {
|
||||||
|
// Get parser to parse input and expected output
|
||||||
|
parser := &influx.Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
|
||||||
|
input, err := testutil.ParseMetricsFromFile(inputFilename, parser)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var expected []telegraf.Metric
|
||||||
|
if _, err := os.Stat(expectedFilename); err == nil {
|
||||||
|
var err error
|
||||||
|
expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Configure the plugin
|
||||||
|
cfg := config.NewConfig()
|
||||||
|
require.NoError(t, cfg.LoadConfig(configFilename))
|
||||||
|
require.Len(t, cfg.Processors, 1, "wrong number of processors")
|
||||||
|
|
||||||
|
proc := cfg.Processors[0].Processor.(processors.HasUnwrap)
|
||||||
|
plugin := proc.Unwrap().(*Split)
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
|
// Process expected metrics and compare with resulting metrics
|
||||||
|
actual := plugin.Apply(input...)
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
[[processors.split]]
|
||||||
|
drop_original = true
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor1"
|
||||||
|
fields = ["sensor1_channel1", "sensor1_channel2"]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor2"
|
||||||
|
fields = ["sensor2_channel1", "sensor2_channel2"]
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
sensor1 sensor1_channel1=4i,sensor1_channel2=2i 1684784689000000000
|
||||||
|
sensor2 sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
metric sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
|
@ -0,0 +1,9 @@
|
||||||
|
[[processors.split]]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor1"
|
||||||
|
fields = ["sensor1*"]
|
||||||
|
tags = ["sensor1*", "host"]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor2"
|
||||||
|
fields = ["sensor2*"]
|
||||||
|
tags = ["sensor2*", "host"]
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
metric,sensor1_status=active,sensor2_status=failed,host=foobar sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
sensor1,sensor1_status=active,host=foobar sensor1_channel1=4i,sensor1_channel2=2i 1684784689000000000
|
||||||
|
sensor2,sensor2_status=failed,host=foobar sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
metric,sensor1_status=active,sensor2_status=failed,host=foobar sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
[[processors.split]]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor1"
|
||||||
|
fields = ["sensor1_channel1", "sensor1_channel2"]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor2"
|
||||||
|
fields = ["sensor2_channel1", "sensor2_channel2"]
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
metric,tag=fake value=42 1684784689000000000
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
metric,tag=fake value=42 1684784689000000000
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
[[processors.split]]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor1"
|
||||||
|
fields = ["sensor1_channel1", "sensor1_channel2"]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor2"
|
||||||
|
fields = ["sensor2_channel1", "sensor2_channel2"]
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
metric sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
sensor1 sensor1_channel1=4i,sensor1_channel2=2i 1684784689000000000
|
||||||
|
sensor2 sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
metric sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
|
@ -0,0 +1,9 @@
|
||||||
|
[[processors.split]]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor1"
|
||||||
|
fields = ["sensor1_channel1", "sensor1_channel2"]
|
||||||
|
tags = ["status", "hostname"]
|
||||||
|
[[processors.split.template]]
|
||||||
|
name = "sensor2"
|
||||||
|
fields = ["sensor2_channel1", "sensor2_channel2"]
|
||||||
|
tags = ["status", "hostname"]
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
metric,status=active,hostname=foobar sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
sensor1,status=active,hostname=foobar sensor1_channel1=4i,sensor1_channel2=2i 1684784689000000000
|
||||||
|
sensor2,status=active,hostname=foobar sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
metric,status=active,hostname=foobar sensor1_channel1=4i,sensor1_channel2=2i,sensor2_channel1=1i,sensor2_channel2=2i 1684784689000000000
|
||||||
Loading…
Reference in New Issue