Allow globs (wildcards) in config for tags/fields in enum processor (#8598)

* Allow glob in enum processor config

* change assert to require
This commit is contained in:
Sebastian Spaink 2020-12-18 15:41:39 -06:00 committed by GitHub
parent 7bf8cdb8e3
commit 50265d9023
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 123 additions and 42 deletions

View File

@ -14,10 +14,10 @@ source tag or field is overwritten.
```toml ```toml
[[processors.enum]] [[processors.enum]]
[[processors.enum.mapping]] [[processors.enum.mapping]]
## Name of the field to map ## Name of the field to map. Globs accepted.
field = "status" field = "status"
## Name of the tag to map ## Name of the tag to map. Globs accepted.
# tag = "status" # tag = "status"
## Destination tag or field to be used for the mapped value. By default the ## Destination tag or field to be used for the mapped value. By default the

View File

@ -5,15 +5,16 @@ import (
"strconv" "strconv"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/plugins/processors" "github.com/influxdata/telegraf/plugins/processors"
) )
var sampleConfig = ` var sampleConfig = `
[[processors.enum.mapping]] [[processors.enum.mapping]]
## Name of the field to map ## Name of the field to map. Globs accepted.
field = "status" field = "status"
## Name of the tag to map ## Name of the tag to map. Globs accepted.
# tag = "status" # tag = "status"
## Destination tag or field to be used for the mapped value. By default the ## Destination tag or field to be used for the mapped value. By default the
@ -34,6 +35,9 @@ var sampleConfig = `
type EnumMapper struct { type EnumMapper struct {
Mappings []Mapping `toml:"mapping"` Mappings []Mapping `toml:"mapping"`
FieldFilters map[string]filter.Filter
TagFilters map[string]filter.Filter
} }
type Mapping struct { type Mapping struct {
@ -44,6 +48,29 @@ type Mapping struct {
ValueMappings map[string]interface{} ValueMappings map[string]interface{}
} }
func (mapper *EnumMapper) Init() error {
mapper.FieldFilters = make(map[string]filter.Filter)
mapper.TagFilters = make(map[string]filter.Filter)
for _, mapping := range mapper.Mappings {
if mapping.Field != "" {
fieldFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Field}, nil)
if err != nil {
return fmt.Errorf("Failed to create new field filter: %w", err)
}
mapper.FieldFilters[mapping.Field] = fieldFilter
}
if mapping.Tag != "" {
tagFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Tag}, nil)
if err != nil {
return fmt.Errorf("Failed to create new tag filter: %s", err)
}
mapper.TagFilters[mapping.Tag] = tagFilter
}
}
return nil
}
func (mapper *EnumMapper) SampleConfig() string { func (mapper *EnumMapper) SampleConfig() string {
return sampleConfig return sampleConfig
} }
@ -60,30 +87,56 @@ func (mapper *EnumMapper) Apply(in ...telegraf.Metric) []telegraf.Metric {
} }
func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric { func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric {
newFields := make(map[string]interface{})
newTags := make(map[string]string)
for _, mapping := range mapper.Mappings { for _, mapping := range mapper.Mappings {
if mapping.Field != "" { if mapping.Field != "" {
if originalValue, isPresent := metric.GetField(mapping.Field); isPresent { mapper.fieldMapping(metric, mapping, newFields)
if adjustedValue, isString := adjustValue(originalValue).(string); isString {
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent {
writeField(metric, mapping.getDestination(), mappedValue)
}
}
}
} }
if mapping.Tag != "" { if mapping.Tag != "" {
if originalValue, isPresent := metric.GetTag(mapping.Tag); isPresent { mapper.tagMapping(metric, mapping, newTags)
if mappedValue, isMappedValuePresent := mapping.mapValue(originalValue); isMappedValuePresent { }
}
for k, v := range newFields {
writeField(metric, k, v)
}
for k, v := range newTags {
writeTag(metric, k, v)
}
return metric
}
func (mapper *EnumMapper) fieldMapping(metric telegraf.Metric, mapping Mapping, newFields map[string]interface{}) {
fields := metric.FieldList()
for _, f := range fields {
if mapper.FieldFilters[mapping.Field].Match(f.Key) {
if adjustedValue, isString := adjustValue(f.Value).(string); isString {
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent {
newFields[mapping.getDestination(f.Key)] = mappedValue
}
}
}
}
}
func (mapper *EnumMapper) tagMapping(metric telegraf.Metric, mapping Mapping, newTags map[string]string) {
tags := metric.TagList()
for _, t := range tags {
if mapper.TagFilters[mapping.Tag].Match(t.Key) {
if mappedValue, isMappedValuePresent := mapping.mapValue(t.Value); isMappedValuePresent {
switch val := mappedValue.(type) { switch val := mappedValue.(type) {
case string: case string:
writeTag(metric, mapping.getDestinationTag(), val) newTags[mapping.getDestination(t.Key)] = val
default: default:
writeTag(metric, mapping.getDestinationTag(), fmt.Sprintf("%v", val)) newTags[mapping.getDestination(t.Key)] = fmt.Sprintf("%v", val)
} }
} }
} }
} }
}
return metric
} }
func adjustValue(in interface{}) interface{} { func adjustValue(in interface{}) interface{} {
@ -109,18 +162,11 @@ func (mapping *Mapping) mapValue(original string) (interface{}, bool) {
return original, false return original, false
} }
func (mapping *Mapping) getDestination() string { func (mapping *Mapping) getDestination(defaultDest string) string {
if mapping.Dest != "" { if mapping.Dest != "" {
return mapping.Dest return mapping.Dest
} }
return mapping.Field return defaultDest
}
func (mapping *Mapping) getDestinationTag() string {
if mapping.Dest != "" {
return mapping.Dest
}
return mapping.Tag
} }
func writeField(metric telegraf.Metric, name string, value interface{}) { func writeField(metric telegraf.Metric, name string, value interface{}) {

View File

@ -7,13 +7,18 @@ import (
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/metric"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func createTestMetric() telegraf.Metric { func createTestMetric() telegraf.Metric {
metric, _ := metric.New("m1", metric, _ := metric.New("m1",
map[string]string{"tag": "tag_value"}, map[string]string{
"tag": "tag_value",
"duplicate_tag": "tag_value",
},
map[string]interface{}{ map[string]interface{}{
"string_value": "test", "string_value": "test",
"duplicate_string_value": "test",
"int_value": int(200), "int_value": int(200),
"uint_value": uint(500), "uint_value": uint(500),
"float_value": float64(3.14), "float_value": float64(3.14),
@ -48,6 +53,8 @@ func assertTagValue(t *testing.T, expected interface{}, tag string, tags map[str
func TestRetainsMetric(t *testing.T) { func TestRetainsMetric(t *testing.T) {
mapper := EnumMapper{} mapper := EnumMapper{}
err := mapper.Init()
require.Nil(t, err)
source := createTestMetric() source := createTestMetric()
target := mapper.Apply(source)[0] target := mapper.Apply(source)[0]
@ -64,7 +71,8 @@ func TestRetainsMetric(t *testing.T) {
func TestMapsSingleStringValueTag(t *testing.T) { func TestMapsSingleStringValueTag(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Tag: "tag", ValueMappings: map[string]interface{}{"tag_value": "valuable"}}}} mapper := EnumMapper{Mappings: []Mapping{{Tag: "tag", ValueMappings: map[string]interface{}{"tag_value": "valuable"}}}}
err := mapper.Init()
require.Nil(t, err)
tags := calculateProcessedTags(mapper, createTestMetric()) tags := calculateProcessedTags(mapper, createTestMetric())
assertTagValue(t, "valuable", "tag", tags) assertTagValue(t, "valuable", "tag", tags)
@ -72,7 +80,8 @@ func TestMapsSingleStringValueTag(t *testing.T) {
func TestNoFailureOnMappingsOnNonSupportedValuedFields(t *testing.T) { func TestNoFailureOnMappingsOnNonSupportedValuedFields(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "float_value", ValueMappings: map[string]interface{}{"3.14": "pi"}}}} mapper := EnumMapper{Mappings: []Mapping{{Field: "float_value", ValueMappings: map[string]interface{}{"3.14": "pi"}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric()) fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, float64(3.14), "float_value", fields) assertFieldValue(t, float64(3.14), "float_value", fields)
@ -110,6 +119,8 @@ func TestMappings(t *testing.T) {
field_name := mapping["field_name"][0].(string) field_name := mapping["field_name"][0].(string)
for index := range mapping["target_value"] { for index := range mapping["target_value"] {
mapper := EnumMapper{Mappings: []Mapping{{Field: field_name, ValueMappings: map[string]interface{}{mapping["target_value"][index].(string): mapping["mapped_value"][index]}}}} mapper := EnumMapper{Mappings: []Mapping{{Field: field_name, ValueMappings: map[string]interface{}{mapping["target_value"][index].(string): mapping["mapped_value"][index]}}}}
err := mapper.Init()
assert.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric()) fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, mapping["expected_value"][index], field_name, fields) assertFieldValue(t, mapping["expected_value"][index], field_name, fields)
} }
@ -118,7 +129,8 @@ func TestMappings(t *testing.T) {
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) { func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}} mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric()) fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 42, "string_value", fields) assertFieldValue(t, 42, "string_value", fields)
@ -126,7 +138,8 @@ func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) { func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}} mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric()) fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, 1, "string_value", fields) assertFieldValue(t, 1, "string_value", fields)
@ -134,7 +147,8 @@ func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) { func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}} mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric()) fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields) assertFieldValue(t, "test", "string_value", fields)
@ -142,7 +156,8 @@ func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
func TestWritesToDestination(t *testing.T) { func TestWritesToDestination(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}} mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric()) fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields) assertFieldValue(t, "test", "string_value", fields)
@ -152,10 +167,30 @@ func TestWritesToDestination(t *testing.T) {
func TestDoNotWriteToDestinationWithoutDefaultOrDefinedMapping(t *testing.T) { func TestDoNotWriteToDestinationWithoutDefaultOrDefinedMapping(t *testing.T) {
field := "string_code" field := "string_code"
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: field, ValueMappings: map[string]interface{}{"other": int64(1)}}}} mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: field, ValueMappings: map[string]interface{}{"other": int64(1)}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric()) fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "test", "string_value", fields) assertFieldValue(t, "test", "string_value", fields)
_, present := fields[field] _, present := fields[field]
assert.False(t, present, "value of field '"+field+"' was present") assert.False(t, present, "value of field '"+field+"' was present")
} }
func TestFieldGlobMatching(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Field: "*", ValueMappings: map[string]interface{}{"test": "glob"}}}}
err := mapper.Init()
require.Nil(t, err)
fields := calculateProcessedValues(mapper, createTestMetric())
assertFieldValue(t, "glob", "string_value", fields)
assertFieldValue(t, "glob", "duplicate_string_value", fields)
}
func TestTagGlobMatching(t *testing.T) {
mapper := EnumMapper{Mappings: []Mapping{{Tag: "*", ValueMappings: map[string]interface{}{"tag_value": "glob"}}}}
err := mapper.Init()
require.Nil(t, err)
tags := calculateProcessedTags(mapper, createTestMetric())
assertTagValue(t, "glob", "tag", tags)
}