feat: Add common expression language metric filtering (#13144)
This commit is contained in:
parent
a868add749
commit
129d8eb98e
|
|
@ -1386,6 +1386,8 @@ func (c *Config) buildFilter(tbl *ast.Table) (models.Filter, error) {
|
|||
c.getFieldStringSlice(tbl, "tagexclude", &f.TagExclude)
|
||||
c.getFieldStringSlice(tbl, "taginclude", &f.TagInclude)
|
||||
|
||||
c.getFieldString(tbl, "metricpass", &f.MetricPass)
|
||||
|
||||
if c.hasErrs() {
|
||||
return f, c.firstErr()
|
||||
}
|
||||
|
|
@ -1536,7 +1538,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
|
|||
"grace",
|
||||
"interval",
|
||||
"lvm", // What is this used for?
|
||||
"metric_batch_size", "metric_buffer_limit",
|
||||
"metric_batch_size", "metric_buffer_limit", "metricpass",
|
||||
"name_override", "name_prefix", "name_suffix", "namedrop", "namepass",
|
||||
"order",
|
||||
"pass", "period", "precision",
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/models"
|
||||
"github.com/influxdata/telegraf/persister"
|
||||
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||
|
|
@ -32,6 +33,7 @@ import (
|
|||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
"github.com/influxdata/telegraf/plugins/serializers"
|
||||
_ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
func TestReadBinaryFile(t *testing.T) {
|
||||
|
|
@ -514,6 +516,66 @@ func TestConfig_URLLikeFileName(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestConfig_Filtering(t *testing.T) {
|
||||
c := NewConfig()
|
||||
require.NoError(t, c.LoadAll("./testdata/filter_metricpass.toml"))
|
||||
require.Len(t, c.Processors, 1)
|
||||
|
||||
in := []telegraf.Metric{
|
||||
metric.New(
|
||||
"machine",
|
||||
map[string]string{"state": "on"},
|
||||
map[string]interface{}{"value": 42.0},
|
||||
time.Date(2023, time.April, 23, 01, 15, 30, 0, time.UTC),
|
||||
),
|
||||
metric.New(
|
||||
"machine",
|
||||
map[string]string{"state": "off"},
|
||||
map[string]interface{}{"value": 23.0},
|
||||
time.Date(2023, time.April, 23, 23, 59, 01, 0, time.UTC),
|
||||
),
|
||||
metric.New(
|
||||
"temperature",
|
||||
map[string]string{},
|
||||
map[string]interface{}{"value": 23.5},
|
||||
time.Date(2023, time.April, 24, 02, 15, 30, 0, time.UTC),
|
||||
),
|
||||
}
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"machine",
|
||||
map[string]string{
|
||||
"state": "on",
|
||||
"processed": "yes",
|
||||
},
|
||||
map[string]interface{}{"value": 42.0},
|
||||
time.Date(2023, time.April, 23, 01, 15, 30, 0, time.UTC),
|
||||
),
|
||||
metric.New(
|
||||
"machine",
|
||||
map[string]string{"state": "off"},
|
||||
map[string]interface{}{"value": 23.0},
|
||||
time.Date(2023, time.April, 23, 23, 59, 01, 0, time.UTC),
|
||||
),
|
||||
metric.New(
|
||||
"temperature",
|
||||
map[string]string{
|
||||
"processed": "yes",
|
||||
},
|
||||
map[string]interface{}{"value": 23.5},
|
||||
time.Date(2023, time.April, 24, 02, 15, 30, 0, time.UTC),
|
||||
),
|
||||
}
|
||||
|
||||
plugin := c.Processors[0]
|
||||
var acc testutil.Accumulator
|
||||
for _, m := range in {
|
||||
require.NoError(t, plugin.Add(m, &acc))
|
||||
}
|
||||
actual := acc.GetTelegrafMetrics()
|
||||
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
|
||||
}
|
||||
|
||||
func TestConfig_SerializerInterfaceNewFormat(t *testing.T) {
|
||||
formats := []string{
|
||||
"carbon2",
|
||||
|
|
@ -1434,11 +1496,13 @@ func (m *MockupProcessorPlugin) Stop() {
|
|||
func (m *MockupProcessorPlugin) SampleConfig() string {
|
||||
return "Mockup test processor plugin with parser"
|
||||
}
|
||||
func (m *MockupProcessorPlugin) Apply(_ ...telegraf.Metric) []telegraf.Metric {
|
||||
return nil
|
||||
}
|
||||
func (m *MockupProcessorPlugin) Add(_ telegraf.Metric, _ telegraf.Accumulator) error {
|
||||
return nil
|
||||
func (m *MockupProcessorPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||
out := make([]telegraf.Metric, 0, len(in))
|
||||
for _, m := range in {
|
||||
m.AddTag("processed", "yes")
|
||||
out = append(out, m)
|
||||
}
|
||||
return out
|
||||
}
|
||||
func (m *MockupProcessorPlugin) GetState() interface{} {
|
||||
return m.state
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
[[processors.processor]]
|
||||
metricpass = '("state" in tags and tags.state == "on") or time > timestamp("2023-04-24T00:00:00Z")'
|
||||
|
|
@ -595,11 +595,11 @@ excluded from a Processor or Aggregator plugin, it is skips the plugin and is
|
|||
sent onwards to the next stage of processing.
|
||||
|
||||
- **namepass**:
|
||||
An array of [glob pattern][] strings. Only metrics whose measurement name matches
|
||||
a pattern in this list are emitted.
|
||||
An array of [glob pattern][] strings. Only metrics whose measurement name
|
||||
matches a pattern in this list are emitted.
|
||||
|
||||
- **namedrop**:
|
||||
The inverse of `namepass`. If a match is found the metric is discarded. This
|
||||
The inverse of `namepass`. If a match is found the metric is discarded. This
|
||||
is tested on metrics after they have passed the `namepass` test.
|
||||
|
||||
- **tagpass**:
|
||||
|
|
@ -620,6 +620,25 @@ is tested on metrics after they have passed the `tagpass` test.
|
|||
> tables. This limitation does not apply when using the inline table
|
||||
> syntax (`{...}`).
|
||||
|
||||
- **metricpass**:
|
||||
A ["Common Expression Language"][CEL] (CEL) expression with boolean result where
|
||||
`true` will allow the metric to pass, otherwise the metric is discarded. This
|
||||
filter expression is more general compared to e.g. `namepass` and also allows
|
||||
for time-based filtering. An introduction to the CEL language can be found
|
||||
[here][CEL intro]. Further details, such as available functions and expressions,
|
||||
are provided in the [language definition][CEL lang] as well as in the
|
||||
[extension documentation][CEL ext].
|
||||
|
||||
> NOTE: As CEL is an _interpreted_ languguage, this type of filtering is much
|
||||
> slower compared to `namepass`/`namedrop` and friends. So consider to use the
|
||||
> more restricted filter options where possible in case of high-throughput
|
||||
> scenarios.
|
||||
|
||||
[CEL]:https://github.com/google/cel-go/tree/master
|
||||
[CEL intro]: https://codelabs.developers.google.com/codelabs/cel-go
|
||||
[CEL lang]: https://github.com/google/cel-spec/blob/master/doc/langdef.md
|
||||
[CEL ext]: https://github.com/google/cel-go/tree/master/ext#readme
|
||||
|
||||
### Modifiers
|
||||
|
||||
Modifier filters remove tags and fields from a metric. If all fields are
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ following works:
|
|||
- github.com/antchfx/jsonquery [MIT License](https://github.com/antchfx/jsonquery/blob/master/LICENSE)
|
||||
- github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE)
|
||||
- github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE)
|
||||
- github.com/antlr/antlr4/runtime/Go/antlr [BSD 3-Clause "New" or "Revised" License](https://github.com/antlr/antlr4/blob/master/LICENSE.txt)
|
||||
- github.com/apache/arrow/go/arrow [Apache License 2.0](https://github.com/apache/arrow/blob/master/LICENSE.txt)
|
||||
- github.com/apache/iotdb-client-go [Apache License 2.0](https://github.com/apache/iotdb-client-go/blob/main/LICENSE)
|
||||
- github.com/apache/thrift [Apache License 2.0](https://github.com/apache/thrift/blob/master/LICENSE)
|
||||
|
|
@ -141,6 +142,7 @@ following works:
|
|||
- github.com/golang/groupcache [Apache License 2.0](https://github.com/golang/groupcache/blob/master/LICENSE)
|
||||
- github.com/golang/protobuf [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/protobuf/blob/master/LICENSE)
|
||||
- github.com/golang/snappy [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/snappy/blob/master/LICENSE)
|
||||
- github.com/google/cel-go [Apache License 2.0](https://github.com/google/cel-go/blob/master/LICENSE)
|
||||
- github.com/google/flatbuffers [Apache License 2.0](https://github.com/google/flatbuffers/blob/master/LICENSE)
|
||||
- github.com/google/gnostic [Apache License 2.0](https://github.com/google/gnostic/blob/master/LICENSE)
|
||||
- github.com/google/gnxi [Apache License 2.0](https://github.com/google/gnxi/blob/master/LICENSE)
|
||||
|
|
@ -298,6 +300,7 @@ following works:
|
|||
- github.com/sleepinggenius2/gosmi [MIT License](https://github.com/sleepinggenius2/gosmi/blob/master/LICENSE)
|
||||
- github.com/snowflakedb/gosnowflake [Apache License 2.0](https://github.com/snowflakedb/gosnowflake/blob/master/LICENSE)
|
||||
- github.com/spf13/pflag [BSD 3-Clause "New" or "Revised" License](https://github.com/spf13/pflag/blob/master/LICENSE)
|
||||
- github.com/stoewer/go-strcase [MIT License](https://github.com/stoewer/go-strcase/blob/master/LICENSE)
|
||||
- github.com/stretchr/objx [MIT License](https://github.com/stretchr/objx/blob/master/LICENSE)
|
||||
- github.com/stretchr/testify [MIT License](https://github.com/stretchr/testify/blob/master/LICENSE)
|
||||
- github.com/testcontainers/testcontainers-go [MIT License](https://github.com/testcontainers/testcontainers-go/blob/main/LICENSE)
|
||||
|
|
|
|||
6
go.mod
6
go.mod
|
|
@ -200,6 +200,12 @@ require (
|
|||
modernc.org/sqlite v1.21.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect
|
||||
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc
|
||||
github.com/stoewer/go-strcase v1.2.0 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.107.0 // indirect
|
||||
cloud.google.com/go/compute v1.15.1 // indirect
|
||||
|
|
|
|||
5
go.sum
5
go.sum
|
|
@ -261,6 +261,8 @@ github.com/antchfx/xpath v1.2.4 h1:dW1HB/JxKvGtJ9WyVGJ0sIoEcqftV3SqIstujI+B9XY=
|
|||
github.com/antchfx/xpath v1.2.4/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs=
|
||||
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
|
||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
|
||||
github.com/antonmedv/expr v1.9.0/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8=
|
||||
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0=
|
||||
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ=
|
||||
|
|
@ -1012,6 +1014,8 @@ github.com/google/addlicense v0.0.0-20200906110928-a0294312aa76/go.mod h1:EMjYTR
|
|||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
|
||||
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc h1:jd+stC3Fqf9kaqgCLOdm4Da/AN3txPTlmLB6tStXAcU=
|
||||
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc/go.mod h1:YzWEoI07MC/a/wj9in8GeVatqfypkldgBlwXh9bCwqY=
|
||||
github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||
github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||
github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI=
|
||||
|
|
@ -2146,6 +2150,7 @@ github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
|
|||
github.com/spf13/viper v1.11.0/go.mod h1:djo0X/bA5+tYVoCn+C7cAYJGcVn/qYLFTG8gdUsX7Zk=
|
||||
github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI=
|
||||
github.com/ssgreg/nlreturn/v2 v2.1.0/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I=
|
||||
github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
|
||||
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
|
||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
|
|
|
|||
196
models/filter.go
196
models/filter.go
|
|
@ -1,7 +1,16 @@
|
|||
package models
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/google/cel-go/cel"
|
||||
"github.com/google/cel-go/checker/decls"
|
||||
"github.com/google/cel-go/common/types"
|
||||
"github.com/google/cel-go/common/types/ref"
|
||||
"github.com/google/cel-go/ext"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
|
|
@ -43,81 +52,111 @@ type Filter struct {
|
|||
TagInclude []string
|
||||
tagIncludeFilter filter.Filter
|
||||
|
||||
// New metric-filtering interface
|
||||
MetricPass string
|
||||
metricFilter cel.Program
|
||||
|
||||
selectActive bool
|
||||
modifyActive bool
|
||||
|
||||
isActive bool
|
||||
}
|
||||
|
||||
// Compile all Filter lists into filter.Filter objects.
|
||||
func (f *Filter) Compile() error {
|
||||
if len(f.NameDrop) == 0 &&
|
||||
len(f.NamePass) == 0 &&
|
||||
len(f.FieldDrop) == 0 &&
|
||||
len(f.FieldPass) == 0 &&
|
||||
len(f.TagInclude) == 0 &&
|
||||
len(f.TagExclude) == 0 &&
|
||||
len(f.TagPassFilters) == 0 &&
|
||||
len(f.TagDropFilters) == 0 {
|
||||
f.selectActive = len(f.NamePass) > 0 || len(f.NameDrop) > 0
|
||||
f.selectActive = f.selectActive || len(f.TagPassFilters) > 0 || len(f.TagDropFilters) > 0
|
||||
f.selectActive = f.selectActive || f.MetricPass != ""
|
||||
|
||||
f.modifyActive = len(f.FieldPass) > 0 || len(f.FieldDrop) > 0
|
||||
f.modifyActive = f.modifyActive || len(f.TagInclude) > 0 || len(f.TagExclude) > 0
|
||||
|
||||
f.isActive = f.selectActive || f.modifyActive
|
||||
|
||||
if !f.isActive {
|
||||
return nil
|
||||
}
|
||||
|
||||
f.isActive = true
|
||||
var err error
|
||||
f.nameDropFilter, err = filter.Compile(f.NameDrop)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'namedrop', %w", err)
|
||||
}
|
||||
f.namePassFilter, err = filter.Compile(f.NamePass)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'namepass', %w", err)
|
||||
}
|
||||
if f.selectActive {
|
||||
var err error
|
||||
f.nameDropFilter, err = filter.Compile(f.NameDrop)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'namedrop', %w", err)
|
||||
}
|
||||
f.namePassFilter, err = filter.Compile(f.NamePass)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'namepass', %w", err)
|
||||
}
|
||||
|
||||
f.fieldDropFilter, err = filter.Compile(f.FieldDrop)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'fielddrop', %w", err)
|
||||
}
|
||||
f.fieldPassFilter, err = filter.Compile(f.FieldPass)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'fieldpass', %w", err)
|
||||
}
|
||||
|
||||
f.tagExcludeFilter, err = filter.Compile(f.TagExclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'tagexclude', %w", err)
|
||||
}
|
||||
f.tagIncludeFilter, err = filter.Compile(f.TagInclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'taginclude', %w", err)
|
||||
}
|
||||
|
||||
for i := 0; i < len(f.TagDropFilters); i++ {
|
||||
if err := f.TagDropFilters[i].Compile(); err != nil {
|
||||
return fmt.Errorf("error compiling 'tagdrop', %w", err)
|
||||
for i := range f.TagPassFilters {
|
||||
if err := f.TagPassFilters[i].Compile(); err != nil {
|
||||
return fmt.Errorf("error compiling 'tagpass', %w", err)
|
||||
}
|
||||
}
|
||||
for i := range f.TagDropFilters {
|
||||
if err := f.TagDropFilters[i].Compile(); err != nil {
|
||||
return fmt.Errorf("error compiling 'tagdrop', %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(f.TagPassFilters); i++ {
|
||||
if err := f.TagPassFilters[i].Compile(); err != nil {
|
||||
return fmt.Errorf("error compiling 'tagpass', %w", err)
|
||||
if f.modifyActive {
|
||||
var err error
|
||||
f.fieldDropFilter, err = filter.Compile(f.FieldDrop)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'fielddrop', %w", err)
|
||||
}
|
||||
f.fieldPassFilter, err = filter.Compile(f.FieldPass)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'fieldpass', %w", err)
|
||||
}
|
||||
|
||||
f.tagExcludeFilter, err = filter.Compile(f.TagExclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'tagexclude', %w", err)
|
||||
}
|
||||
f.tagIncludeFilter, err = filter.Compile(f.TagInclude)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error compiling 'taginclude', %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
return f.compileMetricFilter()
|
||||
}
|
||||
|
||||
// Select returns true if the metric matches according to the
|
||||
// namepass/namedrop and tagpass/tagdrop filters. The metric is not modified.
|
||||
func (f *Filter) Select(metric telegraf.Metric) bool {
|
||||
if !f.isActive {
|
||||
return true
|
||||
// namepass/namedrop, tagpass/tagdrop and metric filters.
|
||||
// The metric is not modified.
|
||||
func (f *Filter) Select(metric telegraf.Metric) (bool, error) {
|
||||
if !f.selectActive {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if !f.shouldNamePass(metric.Name()) {
|
||||
return false
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if !f.shouldTagsPass(metric.TagList()) {
|
||||
return false
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true
|
||||
if f.metricFilter != nil {
|
||||
result, _, err := f.metricFilter.Eval(map[string]interface{}{
|
||||
"name": metric.Name(),
|
||||
"tags": metric.Tags(),
|
||||
"fields": metric.Fields(),
|
||||
"time": metric.Time(),
|
||||
})
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if r, ok := result.Value().(bool); ok {
|
||||
return r, nil
|
||||
}
|
||||
return true, fmt.Errorf("invalid result type %T", result.Value())
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Modify removes any tags and fields from the metric according to the
|
||||
|
|
@ -210,6 +249,63 @@ func (f *Filter) filterTags(metric telegraf.Metric) {
|
|||
}
|
||||
}
|
||||
|
||||
// Compile the metric filter
|
||||
func (f *Filter) compileMetricFilter() error {
|
||||
// Reset internal state
|
||||
f.metricFilter = nil
|
||||
|
||||
// Initialize the expression
|
||||
expression := f.MetricPass
|
||||
|
||||
// Replace python-like logic-operators
|
||||
expression = regexp.MustCompile(`\bnot\b`).ReplaceAllString(expression, "!")
|
||||
expression = regexp.MustCompile(`\band\b`).ReplaceAllString(expression, "&&")
|
||||
expression = regexp.MustCompile(`\bor\b`).ReplaceAllString(expression, "||")
|
||||
|
||||
// Check if we need to call into CEL at all and quit early
|
||||
if expression == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Declare the computation environment for the filter including custom functions
|
||||
env, err := cel.NewEnv(
|
||||
cel.Declarations(
|
||||
decls.NewVar("name", decls.String),
|
||||
decls.NewVar("tags", decls.NewMapType(decls.String, decls.String)),
|
||||
decls.NewVar("fields", decls.NewMapType(decls.String, decls.Dyn)),
|
||||
decls.NewVar("time", decls.Timestamp),
|
||||
),
|
||||
cel.Function(
|
||||
"now",
|
||||
cel.Overload("now", nil, cel.TimestampType),
|
||||
cel.SingletonFunctionBinding(func(_ ...ref.Val) ref.Val { return types.Timestamp{Time: time.Now()} }),
|
||||
),
|
||||
ext.Encoders(),
|
||||
ext.Math(),
|
||||
ext.Strings(),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating environment failed: %w", err)
|
||||
}
|
||||
|
||||
// Compile the program
|
||||
ast, issues := env.Compile(expression)
|
||||
if issues.Err() != nil {
|
||||
return issues.Err()
|
||||
}
|
||||
// Check if we got a boolean expression needed for filtering
|
||||
if ast.OutputType() != cel.BoolType {
|
||||
return errors.New("expression needs to return a boolean")
|
||||
}
|
||||
|
||||
// Get the final program
|
||||
options := cel.EvalOptions(
|
||||
cel.OptOptimize,
|
||||
)
|
||||
f.metricFilter, err = env.Program(ast, options)
|
||||
return err
|
||||
}
|
||||
|
||||
func ShouldPassFilters(include filter.Filter, exclude filter.Filter, key string) bool {
|
||||
if include != nil && exclude != nil {
|
||||
return include.Match(key) && !exclude.Match(key)
|
||||
|
|
|
|||
|
|
@ -20,7 +20,9 @@ func TestFilter_ApplyEmpty(t *testing.T) {
|
|||
map[string]string{},
|
||||
map[string]interface{}{"value": int64(1)},
|
||||
time.Now())
|
||||
require.True(t, f.Select(m))
|
||||
selected, err := f.Select(m)
|
||||
require.NoError(t, err)
|
||||
require.True(t, selected)
|
||||
}
|
||||
|
||||
func TestFilter_ApplyTagsDontPass(t *testing.T) {
|
||||
|
|
@ -41,7 +43,9 @@ func TestFilter_ApplyTagsDontPass(t *testing.T) {
|
|||
map[string]string{"cpu": "cpu-total"},
|
||||
map[string]interface{}{"value": int64(1)},
|
||||
time.Now())
|
||||
require.False(t, f.Select(m))
|
||||
selected, err := f.Select(m)
|
||||
require.NoError(t, err)
|
||||
require.False(t, selected)
|
||||
}
|
||||
|
||||
func TestFilter_ApplyDeleteFields(t *testing.T) {
|
||||
|
|
@ -59,7 +63,9 @@ func TestFilter_ApplyDeleteFields(t *testing.T) {
|
|||
"value2": int64(2),
|
||||
},
|
||||
time.Now())
|
||||
require.True(t, f.Select(m))
|
||||
selected, err := f.Select(m)
|
||||
require.NoError(t, err)
|
||||
require.True(t, selected)
|
||||
f.Modify(m)
|
||||
require.Equal(t, map[string]interface{}{"value2": int64(2)}, m.Fields())
|
||||
}
|
||||
|
|
@ -79,7 +85,9 @@ func TestFilter_ApplyDeleteAllFields(t *testing.T) {
|
|||
"value2": int64(2),
|
||||
},
|
||||
time.Now())
|
||||
require.True(t, f.Select(m))
|
||||
selected, err := f.Select(m)
|
||||
require.NoError(t, err)
|
||||
require.True(t, selected)
|
||||
f.Modify(m)
|
||||
require.Len(t, m.FieldList(), 0)
|
||||
}
|
||||
|
|
@ -465,7 +473,6 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) {
|
|||
TagDropFilters: filterDrop,
|
||||
TagPassFilters: filterPass,
|
||||
}
|
||||
|
||||
require.NoError(t, f.Compile())
|
||||
|
||||
for i, tag := range inputData {
|
||||
|
|
@ -473,6 +480,118 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFilter_MetricPass(t *testing.T) {
|
||||
m := testutil.MustMetric("cpu",
|
||||
map[string]string{
|
||||
"host": "Hugin",
|
||||
"source": "myserver@mycompany.com",
|
||||
"status": "ok",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"value": 15.0,
|
||||
"id": "24cxnwr3480k",
|
||||
"on": true,
|
||||
"count": 18,
|
||||
"errors": 29,
|
||||
"total": 129,
|
||||
},
|
||||
time.Date(2023, time.April, 24, 23, 30, 15, 42, time.UTC),
|
||||
)
|
||||
|
||||
var tests = []struct {
|
||||
name string
|
||||
expression string
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "exact name match (pass)",
|
||||
expression: `name == "cpu"`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "exact name match (fail)",
|
||||
expression: `name == "test"`,
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "case-insensitive tag match",
|
||||
expression: `tags.host.lowerAscii() == "hugin"`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "regexp tag match",
|
||||
expression: `tags.source.matches("^[0-9a-zA-z-_]+@mycompany.com$")`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "match field value",
|
||||
expression: `fields.count > 10`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "match timestamp year",
|
||||
expression: `time.getFullYear() == 2023`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "now",
|
||||
expression: `now() > time`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "arithmetics",
|
||||
expression: `fields.count + fields.errors < fields.total`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "arithmetics",
|
||||
expression: `fields.count + fields.errors < fields.total`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "logical expression",
|
||||
expression: `(name.startsWith("t") || fields.on) && "id" in fields && fields.id.contains("nwr")`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "python-style logical expression",
|
||||
expression: `(name.startsWith("t") or fields.on) and "id" in fields and fields.id.contains("nwr")`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "time arithmetics",
|
||||
expression: `time >= timestamp("2023-04-25T00:00:00Z") - duration("24h")`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "complex field filtering",
|
||||
expression: `fields.exists(f, type(fields[f]) in [int, uint, double] and fields[f] > 20.0)`,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "complex field filtering (exactly one)",
|
||||
expression: `fields.exists_one(f, type(fields[f]) in [int, uint, double] and fields[f] > 20.0)`,
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
f := Filter{
|
||||
MetricPass: tt.expression,
|
||||
}
|
||||
require.NoError(t, f.Compile())
|
||||
selected, err := f.Select(m)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expected, selected)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkFilter(b *testing.B) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
@ -503,13 +622,68 @@ func BenchmarkFilter(b *testing.B) {
|
|||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
{
|
||||
name: "metric filter exact name",
|
||||
filter: Filter{
|
||||
MetricPass: `name == "cpu"`,
|
||||
},
|
||||
metric: testutil.MustMetric("cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": 42,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
{
|
||||
name: "metric filter regexp",
|
||||
filter: Filter{
|
||||
MetricPass: `name.matches("^c[a-z]*$")`,
|
||||
},
|
||||
metric: testutil.MustMetric("cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": 42,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
{
|
||||
name: "metric filter time",
|
||||
filter: Filter{
|
||||
MetricPass: `time >= timestamp("2023-04-25T00:00:00Z") - duration("24h")`,
|
||||
},
|
||||
metric: testutil.MustMetric("cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": 42,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
{
|
||||
name: "metric filter complex",
|
||||
filter: Filter{
|
||||
MetricPass: `"source" in tags` +
|
||||
` and fields.exists(f, type(fields[f]) in [int, uint, double] and fields[f] > 20.0)` +
|
||||
` and time >= timestamp("2023-04-25T00:00:00Z") - duration("24h")`,
|
||||
},
|
||||
metric: testutil.MustMetric("cpu",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"value": 42,
|
||||
},
|
||||
time.Unix(0, 0),
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
b.Run(tt.name, func(b *testing.B) {
|
||||
require.NoError(b, tt.filter.Compile())
|
||||
for n := 0; n < b.N; n++ {
|
||||
tt.filter.Select(tt.metric)
|
||||
_, err := tt.filter.Select(tt.metric)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -133,7 +133,10 @@ func (r *RunningAggregator) MakeMetric(telegrafMetric telegraf.Metric) telegraf.
|
|||
// Add a metric to the aggregator and return true if the original metric
|
||||
// should be dropped.
|
||||
func (r *RunningAggregator) Add(m telegraf.Metric) bool {
|
||||
if ok := r.Config.Filter.Select(m); !ok {
|
||||
ok, err := r.Config.Filter.Select(m)
|
||||
if err != nil {
|
||||
r.log.Errorf("filtering failed: %v", err)
|
||||
} else if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -97,7 +97,10 @@ func (r *RunningInput) ID() string {
|
|||
}
|
||||
|
||||
func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric {
|
||||
if ok := r.Config.Filter.Select(metric); !ok {
|
||||
ok, err := r.Config.Filter.Select(metric)
|
||||
if err != nil {
|
||||
r.log.Errorf("filtering failed: %v", err)
|
||||
} else if !ok {
|
||||
r.metricFiltered(metric)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -139,7 +139,10 @@ func (r *RunningOutput) ID() string {
|
|||
// AddMetric adds a metric to the output.
|
||||
// Takes ownership of metric
|
||||
func (r *RunningOutput) AddMetric(metric telegraf.Metric) {
|
||||
if ok := r.Config.Filter.Select(metric); !ok {
|
||||
ok, err := r.Config.Filter.Select(metric)
|
||||
if err != nil {
|
||||
r.log.Errorf("filtering failed: %v", err)
|
||||
} else if !ok {
|
||||
r.metricFiltered(metric)
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -87,7 +87,10 @@ func (rp *RunningProcessor) Start(acc telegraf.Accumulator) error {
|
|||
}
|
||||
|
||||
func (rp *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error {
|
||||
if ok := rp.Config.Filter.Select(m); !ok {
|
||||
ok, err := rp.Config.Filter.Select(m)
|
||||
if err != nil {
|
||||
rp.log.Errorf("filtering failed: %v", err)
|
||||
} else if !ok {
|
||||
// pass downstream
|
||||
acc.AddMetric(m)
|
||||
return nil
|
||||
|
|
|
|||
Loading…
Reference in New Issue