feat(parsers.avro): Allow union fields to be specified as tags (#16272)
This commit is contained in:
parent
e01f5f77ce
commit
bcea9a28c0
|
|
@ -180,6 +180,41 @@ func (p *Parser) flattenField(fldName string, fldVal map[string]interface{}) map
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Parser) flattenItem(fld string, fldVal interface{}) (map[string]interface{}, error) {
|
||||||
|
sep := flatten.SeparatorStyle{
|
||||||
|
Before: "",
|
||||||
|
Middle: p.FieldSeparator,
|
||||||
|
After: "",
|
||||||
|
}
|
||||||
|
candidate := make(map[string]interface{})
|
||||||
|
candidate[fld] = fldVal
|
||||||
|
|
||||||
|
var flat map[string]interface{}
|
||||||
|
var err error
|
||||||
|
// Exactly how we flatten is decided by p.UnionMode
|
||||||
|
if p.UnionMode == "flatten" {
|
||||||
|
flat, err = flatten.Flatten(candidate, "", sep)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// "nullable" or "any"
|
||||||
|
typedVal, ok := candidate[fld].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
// the "key" is not a string, so ...
|
||||||
|
// most likely an array? Do the default thing
|
||||||
|
// and flatten the candidate.
|
||||||
|
flat, err = flatten.Flatten(candidate, "", sep)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
flat = p.flattenField(fld, typedVal)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return flat, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Parser) createMetric(data map[string]interface{}, schema string) (telegraf.Metric, error) {
|
func (p *Parser) createMetric(data map[string]interface{}, schema string) (telegraf.Metric, error) {
|
||||||
// Tags differ from fields, in that tags are inherently strings.
|
// Tags differ from fields, in that tags are inherently strings.
|
||||||
// fields can be of any type.
|
// fields can be of any type.
|
||||||
|
|
@ -193,12 +228,18 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg
|
||||||
// Avro doesn't have a Tag/Field distinction, so we have to tell
|
// Avro doesn't have a Tag/Field distinction, so we have to tell
|
||||||
// Telegraf which items are our tags.
|
// Telegraf which items are our tags.
|
||||||
for _, tag := range p.Tags {
|
for _, tag := range p.Tags {
|
||||||
sTag, err := internal.ToString(data[tag])
|
flat, flattenErr := p.flattenItem(tag, data[tag])
|
||||||
if err != nil {
|
if flattenErr != nil {
|
||||||
p.Log.Warnf("Could not convert %v to string for tag %q: %v", data[tag], tag, err)
|
return nil, fmt.Errorf("flatten tag %q failed: %w", tag, flattenErr)
|
||||||
continue
|
}
|
||||||
|
for k, v := range flat {
|
||||||
|
sTag, stringErr := internal.ToString(v)
|
||||||
|
if stringErr != nil {
|
||||||
|
p.Log.Warnf("Could not convert %v to string for tag %q: %v", data[tag], tag, stringErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tags[k] = sTag
|
||||||
}
|
}
|
||||||
tags[tag] = sTag
|
|
||||||
}
|
}
|
||||||
var fieldList []string
|
var fieldList []string
|
||||||
if len(p.Fields) != 0 {
|
if len(p.Fields) != 0 {
|
||||||
|
|
@ -215,37 +256,8 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg
|
||||||
}
|
}
|
||||||
// We need to flatten out our fields. The default (the separator
|
// We need to flatten out our fields. The default (the separator
|
||||||
// string is empty) is equivalent to what streamreactor does.
|
// string is empty) is equivalent to what streamreactor does.
|
||||||
sep := flatten.SeparatorStyle{
|
|
||||||
Before: "",
|
|
||||||
Middle: p.FieldSeparator,
|
|
||||||
After: "",
|
|
||||||
}
|
|
||||||
for _, fld := range fieldList {
|
for _, fld := range fieldList {
|
||||||
candidate := make(map[string]interface{})
|
flat, err := p.flattenItem(fld, data[fld])
|
||||||
candidate[fld] = data[fld] // 1-item map
|
|
||||||
var flat map[string]interface{}
|
|
||||||
var err error
|
|
||||||
// Exactly how we flatten is decided by p.UnionMode
|
|
||||||
if p.UnionMode == "flatten" {
|
|
||||||
flat, err = flatten.Flatten(candidate, "", sep)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// "nullable" or "any"
|
|
||||||
typedVal, ok := candidate[fld].(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
// the "key" is not a string, so ...
|
|
||||||
// most likely an array? Do the default thing
|
|
||||||
// and flatten the candidate.
|
|
||||||
flat, err = flatten.Flatten(candidate, "", sep)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
flat = p.flattenField(fld, typedVal)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("flatten field %q failed: %w", fld, err)
|
return nil, fmt.Errorf("flatten field %q failed: %w", fld, err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A,some_union_in_a_tag=some_value statistics_collection_time=1682509200092i,up_time=1166984904i,memory_utilization=20.0 1682509200092000
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
{
|
||||||
|
"some_union_in_a_tag": {
|
||||||
|
"string": "some_value"
|
||||||
|
},
|
||||||
|
"switch_wwn": "10:00:50:EB:1A:0B:84:3A",
|
||||||
|
"statistics_collection_time": 1682509200092,
|
||||||
|
"up_time": 1166984904,
|
||||||
|
"cpu_utilization": {
|
||||||
|
"null": null
|
||||||
|
},
|
||||||
|
"memory_utilization": {
|
||||||
|
"float": 20.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,27 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testcases/union-nullable-tag/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "Switch"
|
||||||
|
avro_tags = ["switch_wwn", "some_union_in_a_tag"]
|
||||||
|
avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"]
|
||||||
|
avro_timestamp = "statistics_collection_time"
|
||||||
|
avro_timestamp_format = "unix_ms"
|
||||||
|
avro_union_mode = "nullable"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "com.brocade.streaming",
|
||||||
|
"name": "fibrechannel_switch_statistics",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "some_union_in_a_tag", "type": ["null", "string"], "default": null, "doc": "Some union that is used in a tag"},
|
||||||
|
{"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."},
|
||||||
|
{"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."},
|
||||||
|
{"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"},
|
||||||
|
{"name": "cpu_utilization", "type": ["null","float"], "default": null, "doc": "CPU Utilization in %"},
|
||||||
|
{"name": "memory_utilization", "type": ["null", "float"], "default": null, "doc": "Memory Utilization in %"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
Loading…
Reference in New Issue