diff --git a/config/config.go b/config/config.go index 55db82400..d0525da39 100644 --- a/config/config.go +++ b/config/config.go @@ -1386,6 +1386,8 @@ func (c *Config) buildFilter(tbl *ast.Table) (models.Filter, error) { c.getFieldStringSlice(tbl, "tagexclude", &f.TagExclude) c.getFieldStringSlice(tbl, "taginclude", &f.TagInclude) + c.getFieldString(tbl, "metricpass", &f.MetricPass) + if c.hasErrs() { return f, c.firstErr() } @@ -1536,7 +1538,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error { "grace", "interval", "lvm", // What is this used for? - "metric_batch_size", "metric_buffer_limit", + "metric_batch_size", "metric_buffer_limit", "metricpass", "name_override", "name_prefix", "name_suffix", "namedrop", "namepass", "order", "pass", "period", "precision", diff --git a/config/config_test.go b/config/config_test.go index 3cce7d411..fd892e204 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/persister" "github.com/influxdata/telegraf/plugins/common/tls" @@ -32,6 +33,7 @@ import ( "github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/serializers" _ "github.com/influxdata/telegraf/plugins/serializers/all" // Blank import to have all serializers for testing + "github.com/influxdata/telegraf/testutil" ) func TestReadBinaryFile(t *testing.T) { @@ -514,6 +516,66 @@ func TestConfig_URLLikeFileName(t *testing.T) { } } +func TestConfig_Filtering(t *testing.T) { + c := NewConfig() + require.NoError(t, c.LoadAll("./testdata/filter_metricpass.toml")) + require.Len(t, c.Processors, 1) + + in := []telegraf.Metric{ + metric.New( + "machine", + map[string]string{"state": "on"}, + map[string]interface{}{"value": 42.0}, + time.Date(2023, time.April, 23, 01, 15, 30, 0, time.UTC), + ), + metric.New( + "machine", + map[string]string{"state": "off"}, + map[string]interface{}{"value": 23.0}, + time.Date(2023, time.April, 23, 23, 59, 01, 0, time.UTC), + ), + metric.New( + "temperature", + map[string]string{}, + map[string]interface{}{"value": 23.5}, + time.Date(2023, time.April, 24, 02, 15, 30, 0, time.UTC), + ), + } + expected := []telegraf.Metric{ + metric.New( + "machine", + map[string]string{ + "state": "on", + "processed": "yes", + }, + map[string]interface{}{"value": 42.0}, + time.Date(2023, time.April, 23, 01, 15, 30, 0, time.UTC), + ), + metric.New( + "machine", + map[string]string{"state": "off"}, + map[string]interface{}{"value": 23.0}, + time.Date(2023, time.April, 23, 23, 59, 01, 0, time.UTC), + ), + metric.New( + "temperature", + map[string]string{ + "processed": "yes", + }, + map[string]interface{}{"value": 23.5}, + time.Date(2023, time.April, 24, 02, 15, 30, 0, time.UTC), + ), + } + + plugin := c.Processors[0] + var acc testutil.Accumulator + for _, m := range in { + require.NoError(t, plugin.Add(m, &acc)) + } + actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) +} + func TestConfig_SerializerInterfaceNewFormat(t *testing.T) { formats := []string{ "carbon2", @@ -1434,11 +1496,13 @@ func (m *MockupProcessorPlugin) Stop() { func (m *MockupProcessorPlugin) SampleConfig() string { return "Mockup test processor plugin with parser" } -func (m *MockupProcessorPlugin) Apply(_ ...telegraf.Metric) []telegraf.Metric { - return nil -} -func (m *MockupProcessorPlugin) Add(_ telegraf.Metric, _ telegraf.Accumulator) error { - return nil +func (m *MockupProcessorPlugin) Apply(in ...telegraf.Metric) []telegraf.Metric { + out := make([]telegraf.Metric, 0, len(in)) + for _, m := range in { + m.AddTag("processed", "yes") + out = append(out, m) + } + return out } func (m *MockupProcessorPlugin) GetState() interface{} { return m.state diff --git a/config/testdata/filter_metricpass.toml b/config/testdata/filter_metricpass.toml new file mode 100644 index 000000000..92d600d1d --- /dev/null +++ b/config/testdata/filter_metricpass.toml @@ -0,0 +1,2 @@ +[[processors.processor]] + metricpass = '("state" in tags and tags.state == "on") or time > timestamp("2023-04-24T00:00:00Z")' \ No newline at end of file diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index cb3f44af8..ed4fff6c1 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -595,11 +595,11 @@ excluded from a Processor or Aggregator plugin, it is skips the plugin and is sent onwards to the next stage of processing. - **namepass**: -An array of [glob pattern][] strings. Only metrics whose measurement name matches -a pattern in this list are emitted. +An array of [glob pattern][] strings. Only metrics whose measurement name +matches a pattern in this list are emitted. - **namedrop**: -The inverse of `namepass`. If a match is found the metric is discarded. This +The inverse of `namepass`. If a match is found the metric is discarded. This is tested on metrics after they have passed the `namepass` test. - **tagpass**: @@ -620,6 +620,25 @@ is tested on metrics after they have passed the `tagpass` test. > tables. This limitation does not apply when using the inline table > syntax (`{...}`). +- **metricpass**: +A ["Common Expression Language"][CEL] (CEL) expression with boolean result where +`true` will allow the metric to pass, otherwise the metric is discarded. This +filter expression is more general compared to e.g. `namepass` and also allows +for time-based filtering. An introduction to the CEL language can be found +[here][CEL intro]. Further details, such as available functions and expressions, +are provided in the [language definition][CEL lang] as well as in the +[extension documentation][CEL ext]. + +> NOTE: As CEL is an _interpreted_ languguage, this type of filtering is much +> slower compared to `namepass`/`namedrop` and friends. So consider to use the +> more restricted filter options where possible in case of high-throughput +> scenarios. + +[CEL]:https://github.com/google/cel-go/tree/master +[CEL intro]: https://codelabs.developers.google.com/codelabs/cel-go +[CEL lang]: https://github.com/google/cel-spec/blob/master/doc/langdef.md +[CEL ext]: https://github.com/google/cel-go/tree/master/ext#readme + ### Modifiers Modifier filters remove tags and fields from a metric. If all fields are diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index d3a9acac7..0a4ba3afc 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -40,6 +40,7 @@ following works: - github.com/antchfx/jsonquery [MIT License](https://github.com/antchfx/jsonquery/blob/master/LICENSE) - github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE) - github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE) +- github.com/antlr/antlr4/runtime/Go/antlr [BSD 3-Clause "New" or "Revised" License](https://github.com/antlr/antlr4/blob/master/LICENSE.txt) - github.com/apache/arrow/go/arrow [Apache License 2.0](https://github.com/apache/arrow/blob/master/LICENSE.txt) - github.com/apache/iotdb-client-go [Apache License 2.0](https://github.com/apache/iotdb-client-go/blob/main/LICENSE) - github.com/apache/thrift [Apache License 2.0](https://github.com/apache/thrift/blob/master/LICENSE) @@ -141,6 +142,7 @@ following works: - github.com/golang/groupcache [Apache License 2.0](https://github.com/golang/groupcache/blob/master/LICENSE) - github.com/golang/protobuf [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/protobuf/blob/master/LICENSE) - github.com/golang/snappy [BSD 3-Clause "New" or "Revised" License](https://github.com/golang/snappy/blob/master/LICENSE) +- github.com/google/cel-go [Apache License 2.0](https://github.com/google/cel-go/blob/master/LICENSE) - github.com/google/flatbuffers [Apache License 2.0](https://github.com/google/flatbuffers/blob/master/LICENSE) - github.com/google/gnostic [Apache License 2.0](https://github.com/google/gnostic/blob/master/LICENSE) - github.com/google/gnxi [Apache License 2.0](https://github.com/google/gnxi/blob/master/LICENSE) @@ -298,6 +300,7 @@ following works: - github.com/sleepinggenius2/gosmi [MIT License](https://github.com/sleepinggenius2/gosmi/blob/master/LICENSE) - github.com/snowflakedb/gosnowflake [Apache License 2.0](https://github.com/snowflakedb/gosnowflake/blob/master/LICENSE) - github.com/spf13/pflag [BSD 3-Clause "New" or "Revised" License](https://github.com/spf13/pflag/blob/master/LICENSE) +- github.com/stoewer/go-strcase [MIT License](https://github.com/stoewer/go-strcase/blob/master/LICENSE) - github.com/stretchr/objx [MIT License](https://github.com/stretchr/objx/blob/master/LICENSE) - github.com/stretchr/testify [MIT License](https://github.com/stretchr/testify/blob/master/LICENSE) - github.com/testcontainers/testcontainers-go [MIT License](https://github.com/testcontainers/testcontainers-go/blob/main/LICENSE) diff --git a/go.mod b/go.mod index 76cff65f1..a4218cedf 100644 --- a/go.mod +++ b/go.mod @@ -200,6 +200,12 @@ require ( modernc.org/sqlite v1.21.0 ) +require ( + github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df // indirect + github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc + github.com/stoewer/go-strcase v1.2.0 // indirect +) + require ( cloud.google.com/go v0.107.0 // indirect cloud.google.com/go/compute v1.15.1 // indirect diff --git a/go.sum b/go.sum index 51c02ae1e..8fa74e5ae 100644 --- a/go.sum +++ b/go.sum @@ -261,6 +261,8 @@ github.com/antchfx/xpath v1.2.4 h1:dW1HB/JxKvGtJ9WyVGJ0sIoEcqftV3SqIstujI+B9XY= github.com/antchfx/xpath v1.2.4/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18= +github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM= github.com/antonmedv/expr v1.9.0/go.mod h1:5qsM3oLGDND7sDmQGDXHkYfkjYMUX14qsgqmHhwGEk8= github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 h1:q4dksr6ICHXqG5hm0ZW5IHyeEJXoIJSOZeBLmWPNeIQ= @@ -1012,6 +1014,8 @@ github.com/google/addlicense v0.0.0-20200906110928-a0294312aa76/go.mod h1:EMjYTR github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= +github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc h1:jd+stC3Fqf9kaqgCLOdm4Da/AN3txPTlmLB6tStXAcU= +github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc/go.mod h1:YzWEoI07MC/a/wj9in8GeVatqfypkldgBlwXh9bCwqY= github.com/google/flatbuffers v1.11.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI= @@ -2146,6 +2150,7 @@ github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/spf13/viper v1.11.0/go.mod h1:djo0X/bA5+tYVoCn+C7cAYJGcVn/qYLFTG8gdUsX7Zk= github.com/spf13/viper v1.12.0/go.mod h1:b6COn30jlNxbm/V2IqWiNWkJ+vZNiMNksliPCiuKtSI= github.com/ssgreg/nlreturn/v2 v2.1.0/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRkkxBiELzh2I= +github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/models/filter.go b/models/filter.go index 8229dec49..d02778906 100644 --- a/models/filter.go +++ b/models/filter.go @@ -1,7 +1,16 @@ package models import ( + "errors" "fmt" + "regexp" + "time" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/checker/decls" + "github.com/google/cel-go/common/types" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/ext" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" @@ -43,81 +52,111 @@ type Filter struct { TagInclude []string tagIncludeFilter filter.Filter + // New metric-filtering interface + MetricPass string + metricFilter cel.Program + + selectActive bool + modifyActive bool + isActive bool } // Compile all Filter lists into filter.Filter objects. func (f *Filter) Compile() error { - if len(f.NameDrop) == 0 && - len(f.NamePass) == 0 && - len(f.FieldDrop) == 0 && - len(f.FieldPass) == 0 && - len(f.TagInclude) == 0 && - len(f.TagExclude) == 0 && - len(f.TagPassFilters) == 0 && - len(f.TagDropFilters) == 0 { + f.selectActive = len(f.NamePass) > 0 || len(f.NameDrop) > 0 + f.selectActive = f.selectActive || len(f.TagPassFilters) > 0 || len(f.TagDropFilters) > 0 + f.selectActive = f.selectActive || f.MetricPass != "" + + f.modifyActive = len(f.FieldPass) > 0 || len(f.FieldDrop) > 0 + f.modifyActive = f.modifyActive || len(f.TagInclude) > 0 || len(f.TagExclude) > 0 + + f.isActive = f.selectActive || f.modifyActive + + if !f.isActive { return nil } - f.isActive = true - var err error - f.nameDropFilter, err = filter.Compile(f.NameDrop) - if err != nil { - return fmt.Errorf("error compiling 'namedrop', %w", err) - } - f.namePassFilter, err = filter.Compile(f.NamePass) - if err != nil { - return fmt.Errorf("error compiling 'namepass', %w", err) - } + if f.selectActive { + var err error + f.nameDropFilter, err = filter.Compile(f.NameDrop) + if err != nil { + return fmt.Errorf("error compiling 'namedrop', %w", err) + } + f.namePassFilter, err = filter.Compile(f.NamePass) + if err != nil { + return fmt.Errorf("error compiling 'namepass', %w", err) + } - f.fieldDropFilter, err = filter.Compile(f.FieldDrop) - if err != nil { - return fmt.Errorf("error compiling 'fielddrop', %w", err) - } - f.fieldPassFilter, err = filter.Compile(f.FieldPass) - if err != nil { - return fmt.Errorf("error compiling 'fieldpass', %w", err) - } - - f.tagExcludeFilter, err = filter.Compile(f.TagExclude) - if err != nil { - return fmt.Errorf("error compiling 'tagexclude', %w", err) - } - f.tagIncludeFilter, err = filter.Compile(f.TagInclude) - if err != nil { - return fmt.Errorf("error compiling 'taginclude', %w", err) - } - - for i := 0; i < len(f.TagDropFilters); i++ { - if err := f.TagDropFilters[i].Compile(); err != nil { - return fmt.Errorf("error compiling 'tagdrop', %w", err) + for i := range f.TagPassFilters { + if err := f.TagPassFilters[i].Compile(); err != nil { + return fmt.Errorf("error compiling 'tagpass', %w", err) + } + } + for i := range f.TagDropFilters { + if err := f.TagDropFilters[i].Compile(); err != nil { + return fmt.Errorf("error compiling 'tagdrop', %w", err) + } } } - for i := 0; i < len(f.TagPassFilters); i++ { - if err := f.TagPassFilters[i].Compile(); err != nil { - return fmt.Errorf("error compiling 'tagpass', %w", err) + if f.modifyActive { + var err error + f.fieldDropFilter, err = filter.Compile(f.FieldDrop) + if err != nil { + return fmt.Errorf("error compiling 'fielddrop', %w", err) + } + f.fieldPassFilter, err = filter.Compile(f.FieldPass) + if err != nil { + return fmt.Errorf("error compiling 'fieldpass', %w", err) + } + + f.tagExcludeFilter, err = filter.Compile(f.TagExclude) + if err != nil { + return fmt.Errorf("error compiling 'tagexclude', %w", err) + } + f.tagIncludeFilter, err = filter.Compile(f.TagInclude) + if err != nil { + return fmt.Errorf("error compiling 'taginclude', %w", err) } } - return nil + + return f.compileMetricFilter() } // Select returns true if the metric matches according to the -// namepass/namedrop and tagpass/tagdrop filters. The metric is not modified. -func (f *Filter) Select(metric telegraf.Metric) bool { - if !f.isActive { - return true +// namepass/namedrop, tagpass/tagdrop and metric filters. +// The metric is not modified. +func (f *Filter) Select(metric telegraf.Metric) (bool, error) { + if !f.selectActive { + return true, nil } if !f.shouldNamePass(metric.Name()) { - return false + return false, nil } if !f.shouldTagsPass(metric.TagList()) { - return false + return false, nil } - return true + if f.metricFilter != nil { + result, _, err := f.metricFilter.Eval(map[string]interface{}{ + "name": metric.Name(), + "tags": metric.Tags(), + "fields": metric.Fields(), + "time": metric.Time(), + }) + if err != nil { + return true, err + } + if r, ok := result.Value().(bool); ok { + return r, nil + } + return true, fmt.Errorf("invalid result type %T", result.Value()) + } + + return true, nil } // Modify removes any tags and fields from the metric according to the @@ -210,6 +249,63 @@ func (f *Filter) filterTags(metric telegraf.Metric) { } } +// Compile the metric filter +func (f *Filter) compileMetricFilter() error { + // Reset internal state + f.metricFilter = nil + + // Initialize the expression + expression := f.MetricPass + + // Replace python-like logic-operators + expression = regexp.MustCompile(`\bnot\b`).ReplaceAllString(expression, "!") + expression = regexp.MustCompile(`\band\b`).ReplaceAllString(expression, "&&") + expression = regexp.MustCompile(`\bor\b`).ReplaceAllString(expression, "||") + + // Check if we need to call into CEL at all and quit early + if expression == "" { + return nil + } + + // Declare the computation environment for the filter including custom functions + env, err := cel.NewEnv( + cel.Declarations( + decls.NewVar("name", decls.String), + decls.NewVar("tags", decls.NewMapType(decls.String, decls.String)), + decls.NewVar("fields", decls.NewMapType(decls.String, decls.Dyn)), + decls.NewVar("time", decls.Timestamp), + ), + cel.Function( + "now", + cel.Overload("now", nil, cel.TimestampType), + cel.SingletonFunctionBinding(func(_ ...ref.Val) ref.Val { return types.Timestamp{Time: time.Now()} }), + ), + ext.Encoders(), + ext.Math(), + ext.Strings(), + ) + if err != nil { + return fmt.Errorf("creating environment failed: %w", err) + } + + // Compile the program + ast, issues := env.Compile(expression) + if issues.Err() != nil { + return issues.Err() + } + // Check if we got a boolean expression needed for filtering + if ast.OutputType() != cel.BoolType { + return errors.New("expression needs to return a boolean") + } + + // Get the final program + options := cel.EvalOptions( + cel.OptOptimize, + ) + f.metricFilter, err = env.Program(ast, options) + return err +} + func ShouldPassFilters(include filter.Filter, exclude filter.Filter, key string) bool { if include != nil && exclude != nil { return include.Match(key) && !exclude.Match(key) diff --git a/models/filter_test.go b/models/filter_test.go index 92e186887..f2adfd183 100644 --- a/models/filter_test.go +++ b/models/filter_test.go @@ -20,7 +20,9 @@ func TestFilter_ApplyEmpty(t *testing.T) { map[string]string{}, map[string]interface{}{"value": int64(1)}, time.Now()) - require.True(t, f.Select(m)) + selected, err := f.Select(m) + require.NoError(t, err) + require.True(t, selected) } func TestFilter_ApplyTagsDontPass(t *testing.T) { @@ -41,7 +43,9 @@ func TestFilter_ApplyTagsDontPass(t *testing.T) { map[string]string{"cpu": "cpu-total"}, map[string]interface{}{"value": int64(1)}, time.Now()) - require.False(t, f.Select(m)) + selected, err := f.Select(m) + require.NoError(t, err) + require.False(t, selected) } func TestFilter_ApplyDeleteFields(t *testing.T) { @@ -59,7 +63,9 @@ func TestFilter_ApplyDeleteFields(t *testing.T) { "value2": int64(2), }, time.Now()) - require.True(t, f.Select(m)) + selected, err := f.Select(m) + require.NoError(t, err) + require.True(t, selected) f.Modify(m) require.Equal(t, map[string]interface{}{"value2": int64(2)}, m.Fields()) } @@ -79,7 +85,9 @@ func TestFilter_ApplyDeleteAllFields(t *testing.T) { "value2": int64(2), }, time.Now()) - require.True(t, f.Select(m)) + selected, err := f.Select(m) + require.NoError(t, err) + require.True(t, selected) f.Modify(m) require.Len(t, m.FieldList(), 0) } @@ -465,7 +473,6 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) { TagDropFilters: filterDrop, TagPassFilters: filterPass, } - require.NoError(t, f.Compile()) for i, tag := range inputData { @@ -473,6 +480,118 @@ func TestFilter_FilterTagsPassAndDrop(t *testing.T) { } } +func TestFilter_MetricPass(t *testing.T) { + m := testutil.MustMetric("cpu", + map[string]string{ + "host": "Hugin", + "source": "myserver@mycompany.com", + "status": "ok", + }, + map[string]interface{}{ + "value": 15.0, + "id": "24cxnwr3480k", + "on": true, + "count": 18, + "errors": 29, + "total": 129, + }, + time.Date(2023, time.April, 24, 23, 30, 15, 42, time.UTC), + ) + + var tests = []struct { + name string + expression string + expected bool + }{ + { + name: "empty", + expected: true, + }, + { + name: "exact name match (pass)", + expression: `name == "cpu"`, + expected: true, + }, + { + name: "exact name match (fail)", + expression: `name == "test"`, + expected: false, + }, + { + name: "case-insensitive tag match", + expression: `tags.host.lowerAscii() == "hugin"`, + expected: true, + }, + { + name: "regexp tag match", + expression: `tags.source.matches("^[0-9a-zA-z-_]+@mycompany.com$")`, + expected: true, + }, + { + name: "match field value", + expression: `fields.count > 10`, + expected: true, + }, + { + name: "match timestamp year", + expression: `time.getFullYear() == 2023`, + expected: true, + }, + { + name: "now", + expression: `now() > time`, + expected: true, + }, + { + name: "arithmetics", + expression: `fields.count + fields.errors < fields.total`, + expected: true, + }, + { + name: "arithmetics", + expression: `fields.count + fields.errors < fields.total`, + expected: true, + }, + { + name: "logical expression", + expression: `(name.startsWith("t") || fields.on) && "id" in fields && fields.id.contains("nwr")`, + expected: true, + }, + { + name: "python-style logical expression", + expression: `(name.startsWith("t") or fields.on) and "id" in fields and fields.id.contains("nwr")`, + expected: true, + }, + { + name: "time arithmetics", + expression: `time >= timestamp("2023-04-25T00:00:00Z") - duration("24h")`, + expected: true, + }, + { + name: "complex field filtering", + expression: `fields.exists(f, type(fields[f]) in [int, uint, double] and fields[f] > 20.0)`, + expected: true, + }, + { + name: "complex field filtering (exactly one)", + expression: `fields.exists_one(f, type(fields[f]) in [int, uint, double] and fields[f] > 20.0)`, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := Filter{ + MetricPass: tt.expression, + } + require.NoError(t, f.Compile()) + selected, err := f.Select(m) + require.NoError(t, err) + require.Equal(t, tt.expected, selected) + }) + } +} + func BenchmarkFilter(b *testing.B) { tests := []struct { name string @@ -503,13 +622,68 @@ func BenchmarkFilter(b *testing.B) { time.Unix(0, 0), ), }, + { + name: "metric filter exact name", + filter: Filter{ + MetricPass: `name == "cpu"`, + }, + metric: testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + { + name: "metric filter regexp", + filter: Filter{ + MetricPass: `name.matches("^c[a-z]*$")`, + }, + metric: testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + { + name: "metric filter time", + filter: Filter{ + MetricPass: `time >= timestamp("2023-04-25T00:00:00Z") - duration("24h")`, + }, + metric: testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, + { + name: "metric filter complex", + filter: Filter{ + MetricPass: `"source" in tags` + + ` and fields.exists(f, type(fields[f]) in [int, uint, double] and fields[f] > 20.0)` + + ` and time >= timestamp("2023-04-25T00:00:00Z") - duration("24h")`, + }, + metric: testutil.MustMetric("cpu", + map[string]string{}, + map[string]interface{}{ + "value": 42, + }, + time.Unix(0, 0), + ), + }, } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { require.NoError(b, tt.filter.Compile()) for n := 0; n < b.N; n++ { - tt.filter.Select(tt.metric) + _, err := tt.filter.Select(tt.metric) + require.NoError(b, err) } }) } diff --git a/models/running_aggregator.go b/models/running_aggregator.go index 930906125..49d616718 100644 --- a/models/running_aggregator.go +++ b/models/running_aggregator.go @@ -133,7 +133,10 @@ func (r *RunningAggregator) MakeMetric(telegrafMetric telegraf.Metric) telegraf. // Add a metric to the aggregator and return true if the original metric // should be dropped. func (r *RunningAggregator) Add(m telegraf.Metric) bool { - if ok := r.Config.Filter.Select(m); !ok { + ok, err := r.Config.Filter.Select(m) + if err != nil { + r.log.Errorf("filtering failed: %v", err) + } else if !ok { return false } diff --git a/models/running_input.go b/models/running_input.go index 5684e3801..3d6e72f60 100644 --- a/models/running_input.go +++ b/models/running_input.go @@ -97,7 +97,10 @@ func (r *RunningInput) ID() string { } func (r *RunningInput) MakeMetric(metric telegraf.Metric) telegraf.Metric { - if ok := r.Config.Filter.Select(metric); !ok { + ok, err := r.Config.Filter.Select(metric) + if err != nil { + r.log.Errorf("filtering failed: %v", err) + } else if !ok { r.metricFiltered(metric) return nil } diff --git a/models/running_output.go b/models/running_output.go index 3f42b17b7..36bd53abe 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -139,7 +139,10 @@ func (r *RunningOutput) ID() string { // AddMetric adds a metric to the output. // Takes ownership of metric func (r *RunningOutput) AddMetric(metric telegraf.Metric) { - if ok := r.Config.Filter.Select(metric); !ok { + ok, err := r.Config.Filter.Select(metric) + if err != nil { + r.log.Errorf("filtering failed: %v", err) + } else if !ok { r.metricFiltered(metric) return } diff --git a/models/running_processor.go b/models/running_processor.go index 5053da587..9eed96be5 100644 --- a/models/running_processor.go +++ b/models/running_processor.go @@ -87,7 +87,10 @@ func (rp *RunningProcessor) Start(acc telegraf.Accumulator) error { } func (rp *RunningProcessor) Add(m telegraf.Metric, acc telegraf.Accumulator) error { - if ok := rp.Config.Filter.Select(m); !ok { + ok, err := rp.Config.Filter.Select(m) + if err != nil { + rp.log.Errorf("filtering failed: %v", err) + } else if !ok { // pass downstream acc.AddMetric(m) return nil