feat(processors.enum): Allow mapping to be applied to multiple fields (#16030)
This commit is contained in:
parent
b100c3a185
commit
8400b6a640
|
|
@ -24,8 +24,8 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
# Map enum values according to given table.
|
||||
[[processors.enum]]
|
||||
[[processors.enum.mapping]]
|
||||
## Name of the field to map. Globs accepted.
|
||||
field = "status"
|
||||
## Names of the fields to map. Globs accepted.
|
||||
fields = ["status"]
|
||||
|
||||
## Name of the tag to map. Globs accepted.
|
||||
# tag = "status"
|
||||
|
|
|
|||
|
|
@ -15,17 +15,19 @@ import (
|
|||
var sampleConfig string
|
||||
|
||||
type EnumMapper struct {
|
||||
Mappings []Mapping `toml:"mapping"`
|
||||
|
||||
FieldFilters map[string]filter.Filter
|
||||
TagFilters map[string]filter.Filter
|
||||
Mappings []*Mapping `toml:"mapping"`
|
||||
}
|
||||
|
||||
type Mapping struct {
|
||||
Tag string
|
||||
Field string
|
||||
Dest string
|
||||
Default interface{}
|
||||
Tag string `toml:"tag"`
|
||||
Field string `toml:"field" deprecated:"1.35.0;1.40.0;use 'fields' instead"`
|
||||
Fields []string `toml:"fields"`
|
||||
Dest string `toml:"dest"`
|
||||
Default interface{} `toml:"default"`
|
||||
|
||||
fieldFilter filter.Filter
|
||||
tagFilter filter.Filter
|
||||
|
||||
ValueMappings map[string]interface{}
|
||||
}
|
||||
|
||||
|
|
@ -34,22 +36,24 @@ func (*EnumMapper) SampleConfig() string {
|
|||
}
|
||||
|
||||
func (mapper *EnumMapper) Init() error {
|
||||
mapper.FieldFilters = make(map[string]filter.Filter)
|
||||
mapper.TagFilters = make(map[string]filter.Filter)
|
||||
for _, mapping := range mapper.Mappings {
|
||||
// Handle deprecated field option
|
||||
if mapping.Field != "" {
|
||||
fieldFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Field}, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new field filter: %w", err)
|
||||
}
|
||||
mapper.FieldFilters[mapping.Field] = fieldFilter
|
||||
mapping.Fields = append(mapping.Fields, mapping.Field)
|
||||
}
|
||||
|
||||
fieldFilter, err := filter.Compile(mapping.Fields)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new field filter: %w", err)
|
||||
}
|
||||
mapping.fieldFilter = fieldFilter
|
||||
|
||||
if mapping.Tag != "" {
|
||||
tagFilter, err := filter.NewIncludeExcludeFilter([]string{mapping.Tag}, nil)
|
||||
tagFilter, err := filter.Compile([]string{mapping.Tag})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new tag filter: %w", err)
|
||||
}
|
||||
mapper.TagFilters[mapping.Tag] = tagFilter
|
||||
mapping.tagFilter = tagFilter
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -68,11 +72,11 @@ func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric
|
|||
newTags := make(map[string]string)
|
||||
|
||||
for _, mapping := range mapper.Mappings {
|
||||
if mapping.Field != "" {
|
||||
mapper.fieldMapping(metric, mapping, newFields)
|
||||
if mapping.fieldFilter != nil {
|
||||
fieldMapping(metric, mapping, newFields)
|
||||
}
|
||||
if mapping.Tag != "" {
|
||||
mapper.tagMapping(metric, mapping, newTags)
|
||||
if mapping.tagFilter != nil {
|
||||
tagMapping(metric, mapping, newTags)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -87,30 +91,32 @@ func (mapper *EnumMapper) applyMappings(metric telegraf.Metric) telegraf.Metric
|
|||
return metric
|
||||
}
|
||||
|
||||
func (mapper *EnumMapper) fieldMapping(metric telegraf.Metric, mapping Mapping, newFields map[string]interface{}) {
|
||||
func fieldMapping(metric telegraf.Metric, mapping *Mapping, newFields map[string]interface{}) {
|
||||
fields := metric.FieldList()
|
||||
for _, f := range fields {
|
||||
if mapper.FieldFilters[mapping.Field].Match(f.Key) {
|
||||
if adjustedValue, isString := adjustValue(f.Value).(string); isString {
|
||||
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent {
|
||||
newFields[mapping.getDestination(f.Key)] = mappedValue
|
||||
}
|
||||
if !mapping.fieldFilter.Match(f.Key) {
|
||||
continue
|
||||
}
|
||||
if adjustedValue, isString := adjustValue(f.Value).(string); isString {
|
||||
if mappedValue, isMappedValuePresent := mapping.mapValue(adjustedValue); isMappedValuePresent {
|
||||
newFields[mapping.getDestination(f.Key)] = mappedValue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mapper *EnumMapper) tagMapping(metric telegraf.Metric, mapping Mapping, newTags map[string]string) {
|
||||
func tagMapping(metric telegraf.Metric, mapping *Mapping, newTags map[string]string) {
|
||||
tags := metric.TagList()
|
||||
for _, t := range tags {
|
||||
if mapper.TagFilters[mapping.Tag].Match(t.Key) {
|
||||
if mappedValue, isMappedValuePresent := mapping.mapValue(t.Value); isMappedValuePresent {
|
||||
switch val := mappedValue.(type) {
|
||||
case string:
|
||||
newTags[mapping.getDestination(t.Key)] = val
|
||||
default:
|
||||
newTags[mapping.getDestination(t.Key)] = fmt.Sprintf("%v", val)
|
||||
}
|
||||
if !mapping.tagFilter.Match(t.Key) {
|
||||
continue
|
||||
}
|
||||
if mappedValue, isMappedValuePresent := mapping.mapValue(t.Value); isMappedValuePresent {
|
||||
switch val := mappedValue.(type) {
|
||||
case string:
|
||||
newTags[mapping.getDestination(t.Key)] = val
|
||||
default:
|
||||
newTags[mapping.getDestination(t.Key)] = fmt.Sprintf("%v", val)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,7 +71,10 @@ func TestRetainsMetric(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMapsSingleStringValueTag(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Tag: "tag", ValueMappings: map[string]interface{}{"tag_value": "valuable"}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Tag: "tag",
|
||||
ValueMappings: map[string]interface{}{"tag_value": "valuable"},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
tags := calculateProcessedTags(mapper, createTestMetric())
|
||||
|
|
@ -117,8 +120,13 @@ func TestMappings(t *testing.T) {
|
|||
fieldName := mapping["field_name"][0].(string)
|
||||
for index := range mapping["target_value"] {
|
||||
mapper := EnumMapper{
|
||||
Mappings: []Mapping{
|
||||
{Field: fieldName, ValueMappings: map[string]interface{}{mapping["target_value"][index].(string): mapping["mapped_value"][index]}},
|
||||
Mappings: []*Mapping{
|
||||
{
|
||||
Fields: []string{fieldName},
|
||||
ValueMappings: map[string]interface{}{
|
||||
mapping["target_value"][index].(string): mapping["mapped_value"][index],
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := mapper.Init()
|
||||
|
|
@ -130,7 +138,11 @@ func TestMappings(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Fields: []string{"string_value"},
|
||||
Default: int64(42),
|
||||
ValueMappings: map[string]interface{}{"other": int64(1)},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
|
@ -139,7 +151,11 @@ func TestMapsToDefaultValueOnUnknownSourceValue(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Default: int64(42), ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Fields: []string{"string_value"},
|
||||
Default: int64(42),
|
||||
ValueMappings: map[string]interface{}{"test": int64(1)},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
|
@ -148,7 +164,10 @@ func TestDoNotMapToDefaultValueKnownSourceValue(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Fields: []string{"string_value"},
|
||||
ValueMappings: map[string]interface{}{"other": int64(1)},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
|
@ -157,7 +176,11 @@ func TestNoMappingWithoutDefaultOrDefinedMappingValue(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWritesToDestination(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: "string_code", ValueMappings: map[string]interface{}{"test": int64(1)}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Fields: []string{"string_value"},
|
||||
Dest: "string_code",
|
||||
ValueMappings: map[string]interface{}{"test": int64(1)},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
|
@ -168,7 +191,11 @@ func TestWritesToDestination(t *testing.T) {
|
|||
|
||||
func TestDoNotWriteToDestinationWithoutDefaultOrDefinedMapping(t *testing.T) {
|
||||
field := "string_code"
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Field: "string_value", Dest: field, ValueMappings: map[string]interface{}{"other": int64(1)}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Fields: []string{"string_value"},
|
||||
Dest: field,
|
||||
ValueMappings: map[string]interface{}{"other": int64(1)},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
|
@ -178,8 +205,23 @@ func TestDoNotWriteToDestinationWithoutDefaultOrDefinedMapping(t *testing.T) {
|
|||
require.False(t, present, "value of field '"+field+"' was present")
|
||||
}
|
||||
|
||||
func TestMultipleFields(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Fields: []string{"string_value", "duplicate_string_value"},
|
||||
ValueMappings: map[string]interface{}{"test": "multiple"},
|
||||
}}}
|
||||
require.NoError(t, mapper.Init())
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
||||
assertFieldValue(t, "multiple", "string_value", fields)
|
||||
assertFieldValue(t, "multiple", "duplicate_string_value", fields)
|
||||
}
|
||||
|
||||
func TestFieldGlobMatching(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Field: "*", ValueMappings: map[string]interface{}{"test": "glob"}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Fields: []string{"*"},
|
||||
ValueMappings: map[string]interface{}{"test": "glob"},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
fields := calculateProcessedValues(mapper, createTestMetric())
|
||||
|
|
@ -189,7 +231,10 @@ func TestFieldGlobMatching(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTagGlobMatching(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Tag: "*", ValueMappings: map[string]interface{}{"tag_value": "glob"}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Tag: "*",
|
||||
ValueMappings: map[string]interface{}{"tag_value": "glob"},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
tags := calculateProcessedTags(mapper, createTestMetric())
|
||||
|
|
@ -197,6 +242,36 @@ func TestTagGlobMatching(t *testing.T) {
|
|||
assertTagValue(t, "glob", "tag", tags)
|
||||
}
|
||||
|
||||
func TestCollidingValueMappings(t *testing.T) {
|
||||
mapper := EnumMapper{Mappings: []*Mapping{
|
||||
{
|
||||
Fields: []string{"status"},
|
||||
ValueMappings: map[string]interface{}{"green": 1, "amber": 2, "red": 3},
|
||||
},
|
||||
{
|
||||
Fields: []string{"status_reverse"},
|
||||
ValueMappings: map[string]interface{}{"green": 3, "amber": 2, "red": 1},
|
||||
},
|
||||
}}
|
||||
require.NoError(t, mapper.Init())
|
||||
|
||||
input := metric.New("m1",
|
||||
map[string]string{
|
||||
"tag": "tag_value",
|
||||
},
|
||||
map[string]interface{}{
|
||||
"status": "green",
|
||||
"status_reverse": "green",
|
||||
},
|
||||
time.Now(),
|
||||
)
|
||||
|
||||
output := mapper.Apply(input)[0]
|
||||
fields := output.Fields()
|
||||
assertFieldValue(t, int64(1), "status", fields)
|
||||
assertFieldValue(t, int64(3), "status_reverse", fields)
|
||||
}
|
||||
|
||||
func TestTracking(t *testing.T) {
|
||||
m := createTestMetric()
|
||||
var delivered bool
|
||||
|
|
@ -205,7 +280,10 @@ func TestTracking(t *testing.T) {
|
|||
}
|
||||
m, _ = metric.WithTracking(m, notify)
|
||||
|
||||
mapper := EnumMapper{Mappings: []Mapping{{Tag: "*", ValueMappings: map[string]interface{}{"tag_value": "glob"}}}}
|
||||
mapper := EnumMapper{Mappings: []*Mapping{{
|
||||
Tag: "*",
|
||||
ValueMappings: map[string]interface{}{"tag_value": "glob"},
|
||||
}}}
|
||||
err := mapper.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
# Map enum values according to given table.
|
||||
[[processors.enum]]
|
||||
[[processors.enum.mapping]]
|
||||
## Name of the field to map. Globs accepted.
|
||||
field = "status"
|
||||
## Names of the fields to map. Globs accepted.
|
||||
fields = ["status"]
|
||||
|
||||
## Name of the tag to map. Globs accepted.
|
||||
# tag = "status"
|
||||
|
|
|
|||
Loading…
Reference in New Issue