feat(parsers.avro): Support multiple modes for union handling (#13945)
This commit is contained in:
parent
705176b8d5
commit
cb13577c95
|
|
@ -107,6 +107,12 @@ the error will be rised and the message will not be parsed.
|
||||||
## If this were set to "_", then it would be a_0="a", a_1="b".
|
## If this were set to "_", then it would be a_0="a", a_1="b".
|
||||||
# avro_field_separator = "_"
|
# avro_field_separator = "_"
|
||||||
|
|
||||||
|
## Define handling of union types. Possible values are:
|
||||||
|
## flatten -- add type suffix to field name (default)
|
||||||
|
## nullable -- do not modify field name but discard "null" field values
|
||||||
|
## any -- do not modify field name and set field value to the received type
|
||||||
|
# avro_union_mode = "flatten"
|
||||||
|
|
||||||
## Default values for given tags: optional
|
## Default values for given tags: optional
|
||||||
# tags = { "application": "hermes", "region": "central" }
|
# tags = { "application": "hermes", "region": "central" }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -36,10 +36,10 @@ type Parser struct {
|
||||||
Timestamp string `toml:"avro_timestamp"`
|
Timestamp string `toml:"avro_timestamp"`
|
||||||
TimestampFormat string `toml:"avro_timestamp_format"`
|
TimestampFormat string `toml:"avro_timestamp_format"`
|
||||||
FieldSeparator string `toml:"avro_field_separator"`
|
FieldSeparator string `toml:"avro_field_separator"`
|
||||||
|
UnionMode string `toml:"avro_union_mode"`
|
||||||
DefaultTags map[string]string `toml:"tags"`
|
DefaultTags map[string]string `toml:"tags"`
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
Log telegraf.Logger `toml:"-"`
|
registryObj *schemaRegistry
|
||||||
registryObj *schemaRegistry
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Parser) Init() error {
|
func (p *Parser) Init() error {
|
||||||
|
|
@ -51,6 +51,14 @@ func (p *Parser) Init() error {
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unknown 'avro_format' %q", p.Format)
|
return fmt.Errorf("unknown 'avro_format' %q", p.Format)
|
||||||
}
|
}
|
||||||
|
switch p.UnionMode {
|
||||||
|
case "":
|
||||||
|
p.UnionMode = "flatten"
|
||||||
|
case "flatten", "nullable", "any":
|
||||||
|
// Do nothing as those are valid settings
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown avro_union_mode %q", p.Format)
|
||||||
|
}
|
||||||
|
|
||||||
if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") {
|
if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") {
|
||||||
return errors.New("exactly one of 'schema_registry' or 'schema' must be specified")
|
return errors.New("exactly one of 'schema_registry' or 'schema' must be specified")
|
||||||
|
|
@ -153,6 +161,25 @@ func (p *Parser) SetDefaultTags(tags map[string]string) {
|
||||||
p.DefaultTags = tags
|
p.DefaultTags = tags
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Parser) flattenField(fldName string, fldVal map[string]interface{}) map[string]interface{} {
|
||||||
|
// Helper function for the "nullable" and "any" p.UnionModes
|
||||||
|
// fldVal is a one-item map of string-to-something
|
||||||
|
ret := make(map[string]interface{})
|
||||||
|
if p.UnionMode == "nullable" {
|
||||||
|
_, ok := fldVal["null"]
|
||||||
|
if ok {
|
||||||
|
return ret // Return the empty map
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Otherwise, we just return the value in the fieldname.
|
||||||
|
// See README.md for an important warning about "any" and "nullable".
|
||||||
|
for _, v := range fldVal {
|
||||||
|
ret[fldName] = v
|
||||||
|
break // Not really needed, since it's a one-item map
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Parser) createMetric(data map[string]interface{}, schema string) (telegraf.Metric, error) {
|
func (p *Parser) createMetric(data map[string]interface{}, schema string) (telegraf.Metric, error) {
|
||||||
// Tags differ from fields, in that tags are inherently strings.
|
// Tags differ from fields, in that tags are inherently strings.
|
||||||
// fields can be of any type.
|
// fields can be of any type.
|
||||||
|
|
@ -196,7 +223,29 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg
|
||||||
for _, fld := range fieldList {
|
for _, fld := range fieldList {
|
||||||
candidate := make(map[string]interface{})
|
candidate := make(map[string]interface{})
|
||||||
candidate[fld] = data[fld] // 1-item map
|
candidate[fld] = data[fld] // 1-item map
|
||||||
flat, err := flatten.Flatten(candidate, "", sep)
|
var flat map[string]interface{}
|
||||||
|
var err error
|
||||||
|
// Exactly how we flatten is decided by p.UnionMode
|
||||||
|
if p.UnionMode == "flatten" {
|
||||||
|
flat, err = flatten.Flatten(candidate, "", sep)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// "nullable" or "any"
|
||||||
|
typedVal, ok := candidate[fld].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
// the "key" is not a string, so ...
|
||||||
|
// most likely an array? Do the default thing
|
||||||
|
// and flatten the candidate.
|
||||||
|
flat, err = flatten.Flatten(candidate, "", sep)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
flat = p.flattenField(fld, typedVal)
|
||||||
|
}
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("flatten field %q failed: %w", fld, err)
|
return nil, fmt.Errorf("flatten field %q failed: %w", fld, err)
|
||||||
}
|
}
|
||||||
|
|
@ -204,7 +253,6 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg
|
||||||
fields[k] = v
|
fields[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var schemaObj map[string]interface{}
|
var schemaObj map[string]interface{}
|
||||||
if err := json.Unmarshal([]byte(schema), &schemaObj); err != nil {
|
if err := json.Unmarshal([]byte(schema), &schemaObj); err != nil {
|
||||||
return nil, fmt.Errorf("unmarshaling schema failed: %w", err)
|
return nil, fmt.Errorf("unmarshaling schema failed: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
array,name=pi data_0=3,data_1=3.0999999046325684,data_2=3.140000104904175,data_3=3.1410000324249268 1682509200092000
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
{
|
||||||
|
"statistics_collection_time": 1682509200092,
|
||||||
|
"data": [ 3, 3.1, 3.14, 3.141 ],
|
||||||
|
"name": "pi"
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testdata/json-array/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "array"
|
||||||
|
avro_tags = ["name"]
|
||||||
|
avro_timestamp = "statistics_collection_time"
|
||||||
|
avro_timestamp_format = "unix_ms"
|
||||||
|
avro_fields = ["data"]
|
||||||
|
avro_field_separator = "_"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "constants",
|
||||||
|
"name": "classical",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "name", "type": "string"},
|
||||||
|
{"name": "data", "type": "array", "items": "float"},
|
||||||
|
{"name": "statistics_collection_time", "type": "long"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
|
@ -1 +1 @@
|
||||||
Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,cpu_utilization=14.0,memory_utilization=20.0 1682509200092000
|
Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A up_time=1166984904i,cpu_utilization=14.0,memory_utilization=20.0 1682509200092000
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
avro_format = "json"
|
avro_format = "json"
|
||||||
avro_measurement = "Switch"
|
avro_measurement = "Switch"
|
||||||
avro_tags = ["switch_wwn"]
|
avro_tags = ["switch_wwn"]
|
||||||
avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"]
|
avro_fields = ["up_time", "cpu_utilization", "memory_utilization"]
|
||||||
avro_timestamp = "statistics_collection_time"
|
avro_timestamp = "statistics_collection_time"
|
||||||
avro_timestamp_format = "unix_ms"
|
avro_timestamp_format = "unix_ms"
|
||||||
avro_schema = '''
|
avro_schema = '''
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,cpu_utilization=11i,memory_utilization=20.0 1682509200092000
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"switch_wwn": "10:00:50:EB:1A:0B:84:3A",
|
||||||
|
"statistics_collection_time": 1682509200092,
|
||||||
|
"up_time": 1166984904,
|
||||||
|
"cpu_utilization": {
|
||||||
|
"int": 11
|
||||||
|
},
|
||||||
|
"memory_utilization": {
|
||||||
|
"float": 20.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,26 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testdata/union-any/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "Switch"
|
||||||
|
avro_tags = ["switch_wwn"]
|
||||||
|
avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"]
|
||||||
|
avro_timestamp = "statistics_collection_time"
|
||||||
|
avro_timestamp_format = "unix_ms"
|
||||||
|
avro_union_mode = "any"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "com.brocade.streaming",
|
||||||
|
"name": "fibrechannel_switch_statistics",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."},
|
||||||
|
{"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."},
|
||||||
|
{"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"},
|
||||||
|
{"name": "cpu_utilization", "type": ["null", "float", "int"], "default": null, "doc": "CPU Utilization in %"},
|
||||||
|
{"name": "memory_utilization", "type": ["null", "float"], "doc": "Memory Utilization in %"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
array,name=pi data_0=3,data_1=3.0999999046325684,data_2=3.140000104904175,data_3=3.1410000324249268 1682509200092000
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
{
|
||||||
|
"statistics_collection_time": 1682509200092,
|
||||||
|
"data": [ 3, 3.1, 3.14, 3.141 ],
|
||||||
|
"name": "pi"
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testdata/union-array/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "array"
|
||||||
|
avro_tags = ["name"]
|
||||||
|
avro_timestamp = "statistics_collection_time"
|
||||||
|
avro_timestamp_format = "unix_ms"
|
||||||
|
avro_fields = ["data"]
|
||||||
|
avro_union_mode = "any"
|
||||||
|
avro_field_separator = "_"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "constants",
|
||||||
|
"name": "classical",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "name", "type": "string"},
|
||||||
|
{"name": "data", "type": "array", "items": "float"},
|
||||||
|
{"name": "statistics_collection_time", "type": "long"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,memory_utilization=20.0 1682509200092000
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"switch_wwn": "10:00:50:EB:1A:0B:84:3A",
|
||||||
|
"statistics_collection_time": 1682509200092,
|
||||||
|
"up_time": 1166984904,
|
||||||
|
"cpu_utilization": {
|
||||||
|
"null": null
|
||||||
|
},
|
||||||
|
"memory_utilization": {
|
||||||
|
"float": 20.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,26 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testdata/union-nullable/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "Switch"
|
||||||
|
avro_tags = ["switch_wwn"]
|
||||||
|
avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"]
|
||||||
|
avro_timestamp = "statistics_collection_time"
|
||||||
|
avro_timestamp_format = "unix_ms"
|
||||||
|
avro_union_mode = "nullable"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "com.brocade.streaming",
|
||||||
|
"name": "fibrechannel_switch_statistics",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."},
|
||||||
|
{"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."},
|
||||||
|
{"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"},
|
||||||
|
{"name": "cpu_utilization", "type": ["null","float"], "default": null, "doc": "CPU Utilization in %"},
|
||||||
|
{"name": "memory_utilization", "type": ["null", "float"], "default": null, "doc": "Memory Utilization in %"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,memory_utilization_float=20.0 1682509200092000
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
{
|
||||||
|
"switch_wwn": "10:00:50:EB:1A:0B:84:3A",
|
||||||
|
"statistics_collection_time": 1682509200092,
|
||||||
|
"up_time": 1166984904,
|
||||||
|
"cpu_utilization": {
|
||||||
|
"null": null
|
||||||
|
},
|
||||||
|
"memory_utilization": {
|
||||||
|
"float": 20.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,26 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testdata/union/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "Switch"
|
||||||
|
avro_tags = ["switch_wwn"]
|
||||||
|
avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"]
|
||||||
|
avro_timestamp = "statistics_collection_time"
|
||||||
|
avro_timestamp_format = "unix_ms"
|
||||||
|
avro_field_separator = "_"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "com.brocade.streaming",
|
||||||
|
"name": "fibrechannel_switch_statistics",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."},
|
||||||
|
{"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."},
|
||||||
|
{"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"},
|
||||||
|
{"name": "cpu_utilization", "type": ["null", "float"], "default": null, "doc": "CPU Utilization in %"},
|
||||||
|
{"name": "memory_utilization", "type": ["null", "float"], "default": null, "doc": "Memory Utilization in %"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
Loading…
Reference in New Issue