feat(processors.regex): Allow batch transforms using named groups (#13971)
This commit is contained in:
parent
d636560483
commit
d07701f35f
|
|
@ -1,19 +1,12 @@
|
|||
# Regex Processor Plugin
|
||||
|
||||
The `regex` plugin transforms tag and field values with regex pattern. If
|
||||
`result_key` parameter is present, it can produce new tags and fields from
|
||||
existing ones.
|
||||
This plugin transforms tag and field _values_ as well as renaming tags, fields
|
||||
and metrics using regex patterns. Tag and field _values_ can be transformed
|
||||
using named-groups in a batch fashion.
|
||||
|
||||
The regex processor **only operates on string fields**. It will not work on
|
||||
any other data types, like an integer or float.
|
||||
|
||||
For tags transforms, if `append` is set to `true`, it will append the
|
||||
transformation to the existing tag value, instead of overwriting it.
|
||||
|
||||
For metrics transforms, `key` denotes the element that should be
|
||||
transformed. Furthermore, `result_key` allows control over the behavior applied
|
||||
in case the resulting `tag` or `field` name already exists.
|
||||
|
||||
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
|
||||
|
||||
In addition to the plugin-specific configuration settings, plugins support
|
||||
|
|
@ -30,74 +23,221 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
[[processors.regex]]
|
||||
namepass = ["nginx_requests"]
|
||||
|
||||
# Tag and field conversions defined in a separate sub-tables
|
||||
## Tag value conversion(s). Multiple instances are allowed.
|
||||
[[processors.regex.tags]]
|
||||
## Tag to change, "*" will change every tag
|
||||
## Tag(s) to process with optional glob expressions such as '*'.
|
||||
key = "resp_code"
|
||||
## Regular expression to match on a tag value
|
||||
## Regular expression to match the tag value. If the value doesn't
|
||||
## match the tag is ignored.
|
||||
pattern = "^(\\d)\\d\\d$"
|
||||
## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
## notation to use the text of the first submatch.
|
||||
## Replacement expression defining the value of the target tag. You can
|
||||
## use regexp groups or named groups e.g. ${1} references the first group.
|
||||
replacement = "${1}xx"
|
||||
## Name of the target tag defaulting to 'key' if not specified.
|
||||
## In case of wildcards being used in `key` the currently processed
|
||||
## tag-name is used as target.
|
||||
# result_key = "method"
|
||||
## Appends the replacement to the target tag instead of overwriting it when
|
||||
## set to true.
|
||||
# append = false
|
||||
|
||||
## Field value conversion(s). Multiple instances are allowed.
|
||||
[[processors.regex.fields]]
|
||||
## Field to change
|
||||
## Field(s) to process with optional glob expressions such as '*'.
|
||||
key = "request"
|
||||
## All the power of the Go regular expressions available here
|
||||
## For example, named subgroups
|
||||
## Regular expression to match the field value. If the value doesn't
|
||||
## match or the field doesn't contain a string the field is ignored.
|
||||
pattern = "^/api(?P<method>/[\\w/]+)\\S*"
|
||||
## Replacement expression defining the value of the target field. You can
|
||||
## use regexp groups or named groups e.g. ${method} references the group
|
||||
## named "method".
|
||||
replacement = "${method}"
|
||||
## If result_key is present, a new field will be created
|
||||
## instead of changing existing field
|
||||
result_key = "method"
|
||||
## Name of the target field defaulting to 'key' if not specified.
|
||||
## In case of wildcards being used in `key` the currently processed
|
||||
## field-name is used as target.
|
||||
# result_key = "method"
|
||||
|
||||
# Multiple conversions may be applied for one field sequentially
|
||||
# Let's extract one more value
|
||||
[[processors.regex.fields]]
|
||||
key = "request"
|
||||
pattern = ".*category=(\\w+).*"
|
||||
replacement = "${1}"
|
||||
result_key = "search_category"
|
||||
|
||||
# Rename metric fields
|
||||
## Rename metric fields
|
||||
[[processors.regex.field_rename]]
|
||||
## Regular expression to match on a field name
|
||||
## Regular expression to match on the field name
|
||||
pattern = "^search_(\\w+)d$"
|
||||
## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
## notation to use the text of the first submatch.
|
||||
## Replacement expression defining the name of the new field
|
||||
replacement = "${1}"
|
||||
## If the new field name already exists, you can either "overwrite" the
|
||||
## existing one with the value of the renamed field OR you can "keep"
|
||||
## both the existing and source field.
|
||||
# result_key = "keep"
|
||||
|
||||
# Rename metric tags
|
||||
# [[processors.regex.tag_rename]]
|
||||
# ## Regular expression to match on a tag name
|
||||
# pattern = "^search_(\\w+)d$"
|
||||
# ## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
# ## notation to use the text of the first submatch.
|
||||
# replacement = "${1}"
|
||||
# ## If the new tag name already exists, you can either "overwrite" the
|
||||
# ## existing one with the value of the renamed tag OR you can "keep"
|
||||
# ## both the existing and source tag.
|
||||
# # result_key = "keep"
|
||||
## Rename metric tags
|
||||
[[processors.regex.tag_rename]]
|
||||
## Regular expression to match on a tag name
|
||||
pattern = "^search_(\\w+)d$"
|
||||
## Replacement expression defining the name of the new tag
|
||||
replacement = "${1}"
|
||||
## If the new tag name already exists, you can either "overwrite" the
|
||||
## existing one with the value of the renamed tag OR you can "keep"
|
||||
## both the existing and source tag.
|
||||
# result_key = "keep"
|
||||
|
||||
# Rename metrics
|
||||
# [[processors.regex.metric_rename]]
|
||||
# ## Regular expression to match on an metric name
|
||||
# pattern = "^search_(\\w+)d$"
|
||||
# ## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
# ## notation to use the text of the first submatch.
|
||||
# replacement = "${1}"
|
||||
## Rename metrics
|
||||
[[processors.regex.metric_rename]]
|
||||
## Regular expression to match on an metric name
|
||||
pattern = "^search_(\\w+)d$"
|
||||
## Replacement expression defining the new name of the metric
|
||||
replacement = "${1}"
|
||||
```
|
||||
|
||||
Please note, you can use multiple `tags`, `fields`, `tag_rename`, `field_rename`
|
||||
and `metric_rename` sections in one processor. All of those are applied.
|
||||
|
||||
### Tag and field _value_ conversions
|
||||
|
||||
Conversions are only applied if a tag/field _name_ matches the `key` which can
|
||||
contain glob statements such as `*` (asterix) _and_ the `pattern` matches the
|
||||
tag/field _value_. For fields the field values has to be of type `string` to
|
||||
apply the conversion. If any of the given criteria does not apply the conversion
|
||||
is not applied to the metric.
|
||||
|
||||
The `replacement` option specifies the value of the resulting tag or field. It
|
||||
can reference capturing groups by index (e.g. `${1}` being the first group) or
|
||||
by name (e.g. `${mygroup}` being the group named `mygroup`).
|
||||
|
||||
By default, the currently processed tag or field is overwritten by the
|
||||
`replacement`. To create a new tag or field you can additionally specify the
|
||||
`result_key` option containing the new target tag or field name. In case the
|
||||
given tag or field already exists, its value is overwritten. For `tags` you
|
||||
might use the `append` flag to append the `replacement` value to an existing
|
||||
tag.
|
||||
|
||||
### Batch processing using named groups
|
||||
|
||||
In `tags` and `fields` sections it is possible to use named groups to create
|
||||
multiple new tags or fields respectively. To do so, _all_ capture groups have
|
||||
to be named in the `pattern`. Additional non-capturing ones or other
|
||||
expressions are allowed. Furthermore, neither `replacement` nor `result_key`
|
||||
can be set as the resulting tag/field name is the name of the group and the
|
||||
value corresponds to the group's content.
|
||||
|
||||
### Tag and field _name_ conversions
|
||||
|
||||
You can batch-rename tags and fields using the `tag_rename` and `field_rename`
|
||||
sections. Contrary to the `tags` and `fields` sections, the rename operates on
|
||||
the tag or field _name_, not its _value_.
|
||||
|
||||
A tag or field is renamed if the given `pattern` matches the name. The new name
|
||||
is specified via the `replacement` option. Optionally, the `result_key` can be
|
||||
set to either `overwrite` or `keep` (default) to control the behavior in case
|
||||
the target tag/field already exists. For `overwrite` the target tag/field is
|
||||
replaced by the source key. With this setting, the source tag/field
|
||||
is removed in any case. When using the `keep` setting (default), the target
|
||||
tag/field as well as the source is left unchanged and no renaming takes place.
|
||||
|
||||
### Metric _name_ conversions
|
||||
|
||||
Similar to the tag and field renaming, `metric_rename` section(s) can be used
|
||||
to rename metrics matching the given `pattern`. The resulting metric name is
|
||||
given via `replacement` option. If matching `pattern` the conversion is always
|
||||
applied. The `result_key` option has no effect on metric renaming and shall
|
||||
not be specified.
|
||||
|
||||
## Tags
|
||||
|
||||
No tags are applied by this processor.
|
||||
|
||||
## Example
|
||||
|
||||
In the following examples we are using this metric
|
||||
|
||||
```text
|
||||
nginx_requests,verb=GET,resp_code=2xx request="/api/search/?category=plugins&q=regex&sort=asc",method="/search/",category="plugins",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
nginx_requests,verb=GET,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
```
|
||||
|
||||
### Explicit specification
|
||||
|
||||
```toml
|
||||
[[processors.regex]]
|
||||
namepass = ["nginx_requests"]
|
||||
|
||||
[[processors.regex.tags]]
|
||||
key = "resp_code"
|
||||
pattern = "^(\\d)\\d\\d$"
|
||||
replacement = "${1}xx"
|
||||
|
||||
[[processors.regex.fields]]
|
||||
key = "request"
|
||||
pattern = "^/api(?P<method>/[\\w/]+)\\S*"
|
||||
replacement = "${method}"
|
||||
result_key = "method"
|
||||
|
||||
[[processors.regex.fields]]
|
||||
key = "request"
|
||||
pattern = ".*category=(\\w+).*"
|
||||
replacement = "${1}"
|
||||
result_key = "search_category"
|
||||
|
||||
[[processors.regex.field_rename]]
|
||||
pattern = "^client_(\\w+)$"
|
||||
replacement = "${1}"
|
||||
```
|
||||
|
||||
will result in
|
||||
|
||||
```diff
|
||||
-nginx_requests,verb=GET,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
+nginx_requests,verb=GET,resp_code=2xx request="/api/search/?category=plugins&q=regex&sort=asc",method="/search/",category="plugins",referrer="-",ident="-",http_version=1.1,agent="UserAgent",ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
```
|
||||
|
||||
### Appending
|
||||
|
||||
```toml
|
||||
[[processors.regex]]
|
||||
namepass = ["nginx_requests"]
|
||||
|
||||
[[processors.regex.tags]]
|
||||
key = "resp_code"
|
||||
pattern = '^2\d\d$'
|
||||
replacement = " OK"
|
||||
result_key = "verb"
|
||||
append = true
|
||||
```
|
||||
|
||||
will result in
|
||||
|
||||
```diff
|
||||
-nginx_requests,verb=GET,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
+nginx_requests,verb=GET\ OK,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
```
|
||||
|
||||
### Named groups
|
||||
|
||||
```toml
|
||||
[[processors.regex]]
|
||||
namepass = ["nginx_requests"]
|
||||
|
||||
[[processors.regex.fields]]
|
||||
key = "request"
|
||||
pattern = '^/api/(?P<method>\w+)[/?].*category=(?P<category>\w+)&(?:.*)'
|
||||
```
|
||||
|
||||
will result in
|
||||
|
||||
```diff
|
||||
-nginx_requests,verb=GET,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
+nginx_requests,verb=GET,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",method="search",category="plugins",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
```
|
||||
|
||||
### Metric renaming
|
||||
|
||||
```toml
|
||||
[[processors.regex]]
|
||||
[[processors.regex.metric_rename]]
|
||||
pattern = '^(\w+)_.*$'
|
||||
replacement = "${1}"
|
||||
```
|
||||
|
||||
will result in
|
||||
|
||||
```diff
|
||||
-nginx_requests,verb=GET,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
+nginx,verb=GET,resp_code=200 request="/api/search/?category=plugins&q=regex&sort=asc",referrer="-",ident="-",http_version=1.1,agent="UserAgent",client_ip="127.0.0.1",auth="-",resp_bytes=270i 1519652321000000000
|
||||
```
|
||||
|
|
|
|||
|
|
@ -0,0 +1,223 @@
|
|||
package regex
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
)
|
||||
|
||||
func (c *converter) setup(ct converterType) error {
|
||||
// Compile the pattern
|
||||
re, err := regexp.Compile(c.Pattern)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.re = re
|
||||
|
||||
switch ct {
|
||||
case convertTags, convertFields:
|
||||
if c.Key == "" {
|
||||
return errors.New("key required")
|
||||
}
|
||||
f, err := filter.Compile([]string{c.Key})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.filter = f
|
||||
|
||||
// Check for named groups
|
||||
if c.ResultKey == "" && c.Replacement == "" {
|
||||
groups := c.re.SubexpNames()
|
||||
allNamed := len(groups) > 1
|
||||
for _, g := range groups[1:] {
|
||||
if g == "" {
|
||||
allNamed = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if allNamed {
|
||||
c.groups = groups[1:]
|
||||
}
|
||||
}
|
||||
case convertTagRename, convertFieldRename:
|
||||
switch c.ResultKey {
|
||||
case "":
|
||||
c.ResultKey = "keep"
|
||||
case "overwrite", "keep":
|
||||
// Do nothing as those are valid choices
|
||||
default:
|
||||
return fmt.Errorf("invalid metrics result_key %q", c.ResultKey)
|
||||
}
|
||||
}
|
||||
|
||||
// Select the application function
|
||||
switch ct {
|
||||
case convertTags:
|
||||
c.apply = c.applyTags
|
||||
case convertFields:
|
||||
c.apply = c.applyFields
|
||||
case convertTagRename:
|
||||
c.apply = c.applyTagRename
|
||||
case convertFieldRename:
|
||||
c.apply = c.applyFieldRename
|
||||
case convertMetricRename:
|
||||
c.apply = c.applyMetricRename
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *converter) applyTags(m telegraf.Metric) {
|
||||
for _, tag := range m.TagList() {
|
||||
if !c.filter.Match(tag.Key) || !c.re.MatchString(tag.Value) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle named groups
|
||||
if len(c.groups) > 0 {
|
||||
matches := c.re.FindStringSubmatch(tag.Value)
|
||||
for i, match := range matches[1:] {
|
||||
if match == "" {
|
||||
continue
|
||||
}
|
||||
name := c.groups[i]
|
||||
if c.Append {
|
||||
if v, ok := m.GetTag(name); ok {
|
||||
match = v + match
|
||||
}
|
||||
}
|
||||
m.AddTag(name, match)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle explicit replacements
|
||||
newKey := tag.Key
|
||||
if c.ResultKey != "" {
|
||||
newKey = c.ResultKey
|
||||
}
|
||||
|
||||
newValue := c.re.ReplaceAllString(tag.Value, c.Replacement)
|
||||
if c.Append {
|
||||
if v, ok := m.GetTag(newKey); ok {
|
||||
newValue = v + newValue
|
||||
}
|
||||
}
|
||||
m.AddTag(newKey, newValue)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *converter) applyFields(m telegraf.Metric) {
|
||||
for _, field := range m.FieldList() {
|
||||
if !c.filter.Match(field.Key) {
|
||||
continue
|
||||
}
|
||||
|
||||
value, ok := field.Value.(string)
|
||||
if !ok || !c.re.MatchString(value) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle named groups
|
||||
if len(c.groups) > 0 {
|
||||
matches := c.re.FindStringSubmatch(value)
|
||||
for i, match := range matches[1:] {
|
||||
if match == "" {
|
||||
continue
|
||||
}
|
||||
name := c.groups[i]
|
||||
if c.Append {
|
||||
if v, ok := m.GetTag(name); ok {
|
||||
match = v + match
|
||||
}
|
||||
}
|
||||
m.AddField(name, match)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Handle explicit replacements
|
||||
newKey := field.Key
|
||||
if c.ResultKey != "" {
|
||||
newKey = c.ResultKey
|
||||
}
|
||||
|
||||
newValue := c.re.ReplaceAllString(value, c.Replacement)
|
||||
m.AddField(newKey, newValue)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *converter) applyTagRename(m telegraf.Metric) {
|
||||
replacements := make(map[string]string)
|
||||
for _, tag := range m.TagList() {
|
||||
name := tag.Key
|
||||
if c.re.MatchString(name) {
|
||||
newName := c.re.ReplaceAllString(name, c.Replacement)
|
||||
|
||||
if !m.HasTag(newName) {
|
||||
// There is no colliding tag, we can just change the name.
|
||||
tag.Key = newName
|
||||
continue
|
||||
}
|
||||
|
||||
if c.ResultKey == "overwrite" {
|
||||
// We got a colliding tag, remember the replacement and do it later
|
||||
replacements[name] = newName
|
||||
}
|
||||
}
|
||||
}
|
||||
// We needed to postpone the replacement as we cannot modify the tag-list
|
||||
// while iterating it as this will result in invalid memory dereference panic.
|
||||
for oldName, newName := range replacements {
|
||||
value, ok := m.GetTag(oldName)
|
||||
if !ok {
|
||||
// Just in case the tag got removed in the meantime
|
||||
continue
|
||||
}
|
||||
m.AddTag(newName, value)
|
||||
m.RemoveTag(oldName)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *converter) applyFieldRename(m telegraf.Metric) {
|
||||
replacements := make(map[string]string)
|
||||
for _, field := range m.FieldList() {
|
||||
name := field.Key
|
||||
if c.re.MatchString(name) {
|
||||
newName := c.re.ReplaceAllString(name, c.Replacement)
|
||||
|
||||
if !m.HasField(newName) {
|
||||
// There is no colliding field, we can just change the name.
|
||||
field.Key = newName
|
||||
continue
|
||||
}
|
||||
|
||||
if c.ResultKey == "overwrite" {
|
||||
// We got a colliding field, remember the replacement and do it later
|
||||
replacements[name] = newName
|
||||
}
|
||||
}
|
||||
}
|
||||
// We needed to postpone the replacement as we cannot modify the field-list
|
||||
// while iterating it as this will result in invalid memory dereference panic.
|
||||
for oldName, newName := range replacements {
|
||||
value, ok := m.GetField(oldName)
|
||||
if !ok {
|
||||
// Just in case the field got removed in the meantime
|
||||
continue
|
||||
}
|
||||
m.AddField(newName, value)
|
||||
m.RemoveField(oldName)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *converter) applyMetricRename(m telegraf.Metric) {
|
||||
value := m.Name()
|
||||
if c.re.MatchString(value) {
|
||||
newValue := c.re.ReplaceAllString(value, c.Replacement)
|
||||
m.SetName(newValue)
|
||||
}
|
||||
}
|
||||
|
|
@ -7,13 +7,23 @@ import (
|
|||
"regexp"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal/choice"
|
||||
"github.com/influxdata/telegraf/filter"
|
||||
"github.com/influxdata/telegraf/plugins/processors"
|
||||
)
|
||||
|
||||
//go:embed sample.conf
|
||||
var sampleConfig string
|
||||
|
||||
type converterType int
|
||||
|
||||
const (
|
||||
convertTags = iota
|
||||
convertFields
|
||||
convertTagRename
|
||||
convertFieldRename
|
||||
convertMetricRename
|
||||
)
|
||||
|
||||
type Regex struct {
|
||||
Tags []converter `toml:"tags"`
|
||||
Fields []converter `toml:"fields"`
|
||||
|
|
@ -21,7 +31,6 @@ type Regex struct {
|
|||
FieldRename []converter `toml:"field_rename"`
|
||||
MetricRename []converter `toml:"metric_rename"`
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
regexCache map[string]*regexp.Regexp
|
||||
}
|
||||
|
||||
type converter struct {
|
||||
|
|
@ -30,6 +39,11 @@ type converter struct {
|
|||
Replacement string `toml:"replacement"`
|
||||
ResultKey string `toml:"result_key"`
|
||||
Append bool `toml:"append"`
|
||||
|
||||
filter filter.Filter
|
||||
re *regexp.Regexp
|
||||
groups []string
|
||||
apply func(m telegraf.Metric)
|
||||
}
|
||||
|
||||
func (*Regex) SampleConfig() string {
|
||||
|
|
@ -37,56 +51,38 @@ func (*Regex) SampleConfig() string {
|
|||
}
|
||||
|
||||
func (r *Regex) Init() error {
|
||||
r.regexCache = make(map[string]*regexp.Regexp)
|
||||
|
||||
// Compile the regular expressions
|
||||
for _, c := range r.Tags {
|
||||
if _, compiled := r.regexCache[c.Pattern]; !compiled {
|
||||
r.regexCache[c.Pattern] = regexp.MustCompile(c.Pattern)
|
||||
for i := range r.Tags {
|
||||
if err := r.Tags[i].setup(convertTags); err != nil {
|
||||
return fmt.Errorf("'tags' %w", err)
|
||||
}
|
||||
}
|
||||
for _, c := range r.Fields {
|
||||
if _, compiled := r.regexCache[c.Pattern]; !compiled {
|
||||
r.regexCache[c.Pattern] = regexp.MustCompile(c.Pattern)
|
||||
for i := range r.Fields {
|
||||
if err := r.Fields[i].setup(convertFields); err != nil {
|
||||
return fmt.Errorf("'fields' %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
resultOptions := []string{"overwrite", "keep"}
|
||||
for _, c := range r.TagRename {
|
||||
for i, c := range r.TagRename {
|
||||
if c.Key != "" {
|
||||
r.Log.Info("'tag_rename' section contains a key which is ignored during processing")
|
||||
}
|
||||
|
||||
if c.ResultKey == "" {
|
||||
c.ResultKey = "keep"
|
||||
}
|
||||
if err := choice.Check(c.ResultKey, resultOptions); err != nil {
|
||||
return fmt.Errorf("invalid metrics result_key: %w", err)
|
||||
}
|
||||
|
||||
if _, compiled := r.regexCache[c.Pattern]; !compiled {
|
||||
r.regexCache[c.Pattern] = regexp.MustCompile(c.Pattern)
|
||||
if err := r.TagRename[i].setup(convertTagRename); err != nil {
|
||||
return fmt.Errorf("'tag_rename' %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, c := range r.FieldRename {
|
||||
for i, c := range r.FieldRename {
|
||||
if c.Key != "" {
|
||||
r.Log.Info("'field_rename' section contains a key which is ignored during processing")
|
||||
}
|
||||
|
||||
if c.ResultKey == "" {
|
||||
c.ResultKey = "keep"
|
||||
}
|
||||
if err := choice.Check(c.ResultKey, resultOptions); err != nil {
|
||||
return fmt.Errorf("invalid metrics result_key: %w", err)
|
||||
}
|
||||
|
||||
if _, compiled := r.regexCache[c.Pattern]; !compiled {
|
||||
r.regexCache[c.Pattern] = regexp.MustCompile(c.Pattern)
|
||||
if err := r.FieldRename[i].setup(convertFieldRename); err != nil {
|
||||
return fmt.Errorf("'field_rename' %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, c := range r.MetricRename {
|
||||
for i, c := range r.MetricRename {
|
||||
if c.Key != "" {
|
||||
r.Log.Info("'metric_rename' section contains a key which is ignored during processing")
|
||||
}
|
||||
|
|
@ -95,8 +91,8 @@ func (r *Regex) Init() error {
|
|||
r.Log.Info("'metric_rename' section contains a 'result_key' ignored during processing as metrics will ALWAYS the name")
|
||||
}
|
||||
|
||||
if _, compiled := r.regexCache[c.Pattern]; !compiled {
|
||||
r.regexCache[c.Pattern] = regexp.MustCompile(c.Pattern)
|
||||
if err := r.MetricRename[i].setup(convertMetricRename); err != nil {
|
||||
return fmt.Errorf("'metric_rename' %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -105,134 +101,30 @@ func (r *Regex) Init() error {
|
|||
|
||||
func (r *Regex) Apply(in ...telegraf.Metric) []telegraf.Metric {
|
||||
for _, metric := range in {
|
||||
for _, converter := range r.Tags {
|
||||
if converter.Key == "*" {
|
||||
for _, tag := range metric.TagList() {
|
||||
regex := r.regexCache[converter.Pattern]
|
||||
if regex.MatchString(tag.Value) {
|
||||
newValue := regex.ReplaceAllString(tag.Value, converter.Replacement)
|
||||
updateTag(converter, metric, tag.Key, newValue)
|
||||
}
|
||||
}
|
||||
} else if value, ok := metric.GetTag(converter.Key); ok {
|
||||
if key, newValue := r.convert(converter, value); newValue != "" {
|
||||
updateTag(converter, metric, key, newValue)
|
||||
}
|
||||
}
|
||||
for _, c := range r.Tags {
|
||||
c.apply(metric)
|
||||
}
|
||||
|
||||
for _, converter := range r.Fields {
|
||||
if value, ok := metric.GetField(converter.Key); ok {
|
||||
if v, ok := value.(string); ok {
|
||||
if key, newValue := r.convert(converter, v); newValue != "" {
|
||||
metric.AddField(key, newValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, c := range r.Fields {
|
||||
c.apply(metric)
|
||||
}
|
||||
|
||||
for _, converter := range r.TagRename {
|
||||
regex := r.regexCache[converter.Pattern]
|
||||
replacements := make(map[string]string)
|
||||
for _, tag := range metric.TagList() {
|
||||
name := tag.Key
|
||||
if regex.MatchString(name) {
|
||||
newName := regex.ReplaceAllString(name, converter.Replacement)
|
||||
|
||||
if !metric.HasTag(newName) {
|
||||
// There is no colliding tag, we can just change the name.
|
||||
tag.Key = newName
|
||||
continue
|
||||
}
|
||||
|
||||
if converter.ResultKey == "overwrite" {
|
||||
// We got a colliding tag, remember the replacement and do it later
|
||||
replacements[name] = newName
|
||||
}
|
||||
}
|
||||
}
|
||||
// We needed to postpone the replacement as we cannot modify the tag-list
|
||||
// while iterating it as this will result in invalid memory dereference panic.
|
||||
for oldName, newName := range replacements {
|
||||
value, ok := metric.GetTag(oldName)
|
||||
if !ok {
|
||||
// Just in case the tag got removed in the meantime
|
||||
continue
|
||||
}
|
||||
metric.AddTag(newName, value)
|
||||
metric.RemoveTag(oldName)
|
||||
}
|
||||
for _, c := range r.TagRename {
|
||||
c.apply(metric)
|
||||
}
|
||||
|
||||
for _, converter := range r.FieldRename {
|
||||
regex := r.regexCache[converter.Pattern]
|
||||
replacements := make(map[string]string)
|
||||
for _, field := range metric.FieldList() {
|
||||
name := field.Key
|
||||
if regex.MatchString(name) {
|
||||
newName := regex.ReplaceAllString(name, converter.Replacement)
|
||||
|
||||
if !metric.HasField(newName) {
|
||||
// There is no colliding field, we can just change the name.
|
||||
field.Key = newName
|
||||
continue
|
||||
}
|
||||
|
||||
if converter.ResultKey == "overwrite" {
|
||||
// We got a colliding field, remember the replacement and do it later
|
||||
replacements[name] = newName
|
||||
}
|
||||
}
|
||||
}
|
||||
// We needed to postpone the replacement as we cannot modify the field-list
|
||||
// while iterating it as this will result in invalid memory dereference panic.
|
||||
for oldName, newName := range replacements {
|
||||
value, ok := metric.GetField(oldName)
|
||||
if !ok {
|
||||
// Just in case the field got removed in the meantime
|
||||
continue
|
||||
}
|
||||
metric.AddField(newName, value)
|
||||
metric.RemoveField(oldName)
|
||||
}
|
||||
for _, c := range r.FieldRename {
|
||||
c.apply(metric)
|
||||
}
|
||||
|
||||
for _, converter := range r.MetricRename {
|
||||
regex := r.regexCache[converter.Pattern]
|
||||
value := metric.Name()
|
||||
if regex.MatchString(value) {
|
||||
newValue := regex.ReplaceAllString(value, converter.Replacement)
|
||||
metric.SetName(newValue)
|
||||
}
|
||||
for _, c := range r.MetricRename {
|
||||
c.apply(metric)
|
||||
}
|
||||
}
|
||||
|
||||
return in
|
||||
}
|
||||
|
||||
func (r *Regex) convert(c converter, src string) (key string, value string) {
|
||||
regex := r.regexCache[c.Pattern]
|
||||
|
||||
if c.ResultKey == "" || regex.MatchString(src) {
|
||||
value = regex.ReplaceAllString(src, c.Replacement)
|
||||
}
|
||||
|
||||
if c.ResultKey != "" {
|
||||
return c.ResultKey, value
|
||||
}
|
||||
|
||||
return c.Key, value
|
||||
}
|
||||
|
||||
func updateTag(converter converter, metric telegraf.Metric, key string, newValue string) {
|
||||
if converter.Append {
|
||||
if v, ok := metric.GetTag(key); ok {
|
||||
newValue = v + newValue
|
||||
}
|
||||
}
|
||||
metric.AddTag(key, newValue)
|
||||
}
|
||||
|
||||
func init() {
|
||||
processors.Add("regex", func() telegraf.Processor { return &Regex{} })
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,23 +88,25 @@ func TestFieldConversions(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
regex := Regex{
|
||||
Fields: []converter{test.converter},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, regex.Init())
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.message, func(t *testing.T) {
|
||||
regex := Regex{
|
||||
Fields: []converter{tt.converter},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, regex.Init())
|
||||
|
||||
processed := regex.Apply(newM1())
|
||||
processed := regex.Apply(newM1())
|
||||
|
||||
expectedTags := map[string]string{
|
||||
"verb": "GET",
|
||||
"resp_code": "200",
|
||||
}
|
||||
expectedTags := map[string]string{
|
||||
"verb": "GET",
|
||||
"resp_code": "200",
|
||||
}
|
||||
|
||||
require.Equal(t, test.expectedFields, processed[0].Fields(), test.message)
|
||||
require.Equal(t, expectedTags, processed[0].Tags(), "Should not change tags")
|
||||
require.Equal(t, "access_log", processed[0].Name(), "Should not change name")
|
||||
require.Equal(t, tt.expectedFields, processed[0].Fields(), tt.message)
|
||||
require.Equal(t, expectedTags, processed[0].Tags(), "Should not change tags")
|
||||
require.Equal(t, "access_log", processed[0].Name(), "Should not change name")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -763,6 +765,60 @@ func TestMultipleConversions(t *testing.T) {
|
|||
require.Equal(t, expectedTags, processed[0].Tags())
|
||||
}
|
||||
|
||||
func TestNamedGroups(t *testing.T) {
|
||||
regex := Regex{
|
||||
Tags: []converter{
|
||||
{
|
||||
Key: "resp_code",
|
||||
Pattern: "^(?P<resp_code_group>\\d)\\d\\d$",
|
||||
},
|
||||
},
|
||||
Fields: []converter{
|
||||
{
|
||||
Key: "request",
|
||||
Pattern: `^/api/(?P<method>\w+)[/?].*category=(?P<search_category>\w+)&(?:.*)`,
|
||||
},
|
||||
},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, regex.Init())
|
||||
|
||||
input := testutil.MustMetric(
|
||||
"access_log",
|
||||
map[string]string{
|
||||
"verb": "GET",
|
||||
"resp_code": "200",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"request": "/api/search/?category=plugins&q=regex&sort=asc",
|
||||
"ignore_number": int64(200),
|
||||
"ignore_bool": true,
|
||||
},
|
||||
time.Unix(1695243874, 0),
|
||||
)
|
||||
|
||||
expected := []telegraf.Metric{
|
||||
metric.New(
|
||||
"access_log",
|
||||
map[string]string{
|
||||
"verb": "GET",
|
||||
"resp_code": "200",
|
||||
"resp_code_group": "2",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"request": "/api/search/?category=plugins&q=regex&sort=asc",
|
||||
"method": "search",
|
||||
"search_category": "plugins",
|
||||
"ignore_number": int64(200),
|
||||
"ignore_bool": true,
|
||||
},
|
||||
time.Unix(1695243874, 0),
|
||||
),
|
||||
}
|
||||
actual := regex.Apply(input)
|
||||
testutil.RequireMetricsEqual(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestNoMatches(t *testing.T) {
|
||||
tests := []struct {
|
||||
message string
|
||||
|
|
@ -884,3 +940,53 @@ func TestAnyTagConversion(t *testing.T) {
|
|||
require.Equal(t, "access_log", processed[0].Name(), "Should not change name")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAnyFieldConversion(t *testing.T) {
|
||||
tests := []struct {
|
||||
message string
|
||||
converter converter
|
||||
expectedFields map[string]interface{}
|
||||
}{
|
||||
{
|
||||
message: "Should change existing fields",
|
||||
converter: converter{
|
||||
Key: "*",
|
||||
Pattern: "[0-9]{4}",
|
||||
Replacement: "{ID}",
|
||||
},
|
||||
expectedFields: map[string]interface{}{
|
||||
"counter": int64(42),
|
||||
"id": "{ID}",
|
||||
"user_id": "{ID}",
|
||||
"status": "1",
|
||||
"request": "/users/{ID}/",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
regex := Regex{
|
||||
Fields: []converter{test.converter},
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
require.NoError(t, regex.Init())
|
||||
|
||||
input := metric.New("access_log",
|
||||
map[string]string{},
|
||||
map[string]interface{}{
|
||||
"counter": int64(42),
|
||||
"id": "1234",
|
||||
"user_id": "2300",
|
||||
"status": "1",
|
||||
"request": "/users/2300/",
|
||||
},
|
||||
time.Now(),
|
||||
)
|
||||
|
||||
processed := regex.Apply(input)
|
||||
|
||||
require.Empty(t, processed[0].Tags(), test.message, "Should not change tags")
|
||||
require.Equal(t, test.expectedFields, processed[0].Fields(), test.message)
|
||||
require.Equal(t, "access_log", processed[0].Name(), "Should not change name")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,63 +2,65 @@
|
|||
[[processors.regex]]
|
||||
namepass = ["nginx_requests"]
|
||||
|
||||
# Tag and field conversions defined in a separate sub-tables
|
||||
## Tag value conversion(s). Multiple instances are allowed.
|
||||
[[processors.regex.tags]]
|
||||
## Tag to change, "*" will change every tag
|
||||
## Tag(s) to process with optional glob expressions such as '*'.
|
||||
key = "resp_code"
|
||||
## Regular expression to match on a tag value
|
||||
## Regular expression to match the tag value. If the value doesn't
|
||||
## match the tag is ignored.
|
||||
pattern = "^(\\d)\\d\\d$"
|
||||
## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
## notation to use the text of the first submatch.
|
||||
## Replacement expression defining the value of the target tag. You can
|
||||
## use regexp groups or named groups e.g. ${1} references the first group.
|
||||
replacement = "${1}xx"
|
||||
## Name of the target tag defaulting to 'key' if not specified.
|
||||
## In case of wildcards being used in `key` the currently processed
|
||||
## tag-name is used as target.
|
||||
# result_key = "method"
|
||||
## Appends the replacement to the target tag instead of overwriting it when
|
||||
## set to true.
|
||||
# append = false
|
||||
|
||||
## Field value conversion(s). Multiple instances are allowed.
|
||||
[[processors.regex.fields]]
|
||||
## Field to change
|
||||
## Field(s) to process with optional glob expressions such as '*'.
|
||||
key = "request"
|
||||
## All the power of the Go regular expressions available here
|
||||
## For example, named subgroups
|
||||
## Regular expression to match the field value. If the value doesn't
|
||||
## match or the field doesn't contain a string the field is ignored.
|
||||
pattern = "^/api(?P<method>/[\\w/]+)\\S*"
|
||||
## Replacement expression defining the value of the target field. You can
|
||||
## use regexp groups or named groups e.g. ${method} references the group
|
||||
## named "method".
|
||||
replacement = "${method}"
|
||||
## If result_key is present, a new field will be created
|
||||
## instead of changing existing field
|
||||
result_key = "method"
|
||||
## Name of the target field defaulting to 'key' if not specified.
|
||||
## In case of wildcards being used in `key` the currently processed
|
||||
## field-name is used as target.
|
||||
# result_key = "method"
|
||||
|
||||
# Multiple conversions may be applied for one field sequentially
|
||||
# Let's extract one more value
|
||||
[[processors.regex.fields]]
|
||||
key = "request"
|
||||
pattern = ".*category=(\\w+).*"
|
||||
replacement = "${1}"
|
||||
result_key = "search_category"
|
||||
|
||||
# Rename metric fields
|
||||
## Rename metric fields
|
||||
[[processors.regex.field_rename]]
|
||||
## Regular expression to match on a field name
|
||||
## Regular expression to match on the field name
|
||||
pattern = "^search_(\\w+)d$"
|
||||
## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
## notation to use the text of the first submatch.
|
||||
## Replacement expression defining the name of the new field
|
||||
replacement = "${1}"
|
||||
## If the new field name already exists, you can either "overwrite" the
|
||||
## existing one with the value of the renamed field OR you can "keep"
|
||||
## both the existing and source field.
|
||||
# result_key = "keep"
|
||||
|
||||
# Rename metric tags
|
||||
# [[processors.regex.tag_rename]]
|
||||
# ## Regular expression to match on a tag name
|
||||
# pattern = "^search_(\\w+)d$"
|
||||
# ## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
# ## notation to use the text of the first submatch.
|
||||
# replacement = "${1}"
|
||||
# ## If the new tag name already exists, you can either "overwrite" the
|
||||
# ## existing one with the value of the renamed tag OR you can "keep"
|
||||
# ## both the existing and source tag.
|
||||
# # result_key = "keep"
|
||||
## Rename metric tags
|
||||
[[processors.regex.tag_rename]]
|
||||
## Regular expression to match on a tag name
|
||||
pattern = "^search_(\\w+)d$"
|
||||
## Replacement expression defining the name of the new tag
|
||||
replacement = "${1}"
|
||||
## If the new tag name already exists, you can either "overwrite" the
|
||||
## existing one with the value of the renamed tag OR you can "keep"
|
||||
## both the existing and source tag.
|
||||
# result_key = "keep"
|
||||
|
||||
# Rename metrics
|
||||
# [[processors.regex.metric_rename]]
|
||||
# ## Regular expression to match on an metric name
|
||||
# pattern = "^search_(\\w+)d$"
|
||||
# ## Matches of the pattern will be replaced with this string. Use ${1}
|
||||
# ## notation to use the text of the first submatch.
|
||||
# replacement = "${1}"
|
||||
## Rename metrics
|
||||
[[processors.regex.metric_rename]]
|
||||
## Regular expression to match on an metric name
|
||||
pattern = "^search_(\\w+)d$"
|
||||
## Replacement expression defining the new name of the metric
|
||||
replacement = "${1}"
|
||||
|
|
|
|||
Loading…
Reference in New Issue