diff --git a/plugins/parsers/avro/README.md b/plugins/parsers/avro/README.md index 0a56fa511..5f1f60249 100644 --- a/plugins/parsers/avro/README.md +++ b/plugins/parsers/avro/README.md @@ -107,6 +107,12 @@ the error will be rised and the message will not be parsed. ## If this were set to "_", then it would be a_0="a", a_1="b". # avro_field_separator = "_" + ## Define handling of union types. Possible values are: + ## flatten -- add type suffix to field name (default) + ## nullable -- do not modify field name but discard "null" field values + ## any -- do not modify field name and set field value to the received type + # avro_union_mode = "flatten" + ## Default values for given tags: optional # tags = { "application": "hermes", "region": "central" } diff --git a/plugins/parsers/avro/parser.go b/plugins/parsers/avro/parser.go index b43c0737a..2fbbd4888 100644 --- a/plugins/parsers/avro/parser.go +++ b/plugins/parsers/avro/parser.go @@ -36,10 +36,10 @@ type Parser struct { Timestamp string `toml:"avro_timestamp"` TimestampFormat string `toml:"avro_timestamp_format"` FieldSeparator string `toml:"avro_field_separator"` + UnionMode string `toml:"avro_union_mode"` DefaultTags map[string]string `toml:"tags"` - - Log telegraf.Logger `toml:"-"` - registryObj *schemaRegistry + Log telegraf.Logger `toml:"-"` + registryObj *schemaRegistry } func (p *Parser) Init() error { @@ -51,6 +51,14 @@ func (p *Parser) Init() error { default: return fmt.Errorf("unknown 'avro_format' %q", p.Format) } + switch p.UnionMode { + case "": + p.UnionMode = "flatten" + case "flatten", "nullable", "any": + // Do nothing as those are valid settings + default: + return fmt.Errorf("unknown avro_union_mode %q", p.Format) + } if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") { return errors.New("exactly one of 'schema_registry' or 'schema' must be specified") @@ -153,6 +161,25 @@ func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } +func (p *Parser) flattenField(fldName string, fldVal map[string]interface{}) map[string]interface{} { + // Helper function for the "nullable" and "any" p.UnionModes + // fldVal is a one-item map of string-to-something + ret := make(map[string]interface{}) + if p.UnionMode == "nullable" { + _, ok := fldVal["null"] + if ok { + return ret // Return the empty map + } + } + // Otherwise, we just return the value in the fieldname. + // See README.md for an important warning about "any" and "nullable". + for _, v := range fldVal { + ret[fldName] = v + break // Not really needed, since it's a one-item map + } + return ret +} + func (p *Parser) createMetric(data map[string]interface{}, schema string) (telegraf.Metric, error) { // Tags differ from fields, in that tags are inherently strings. // fields can be of any type. @@ -196,7 +223,29 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg for _, fld := range fieldList { candidate := make(map[string]interface{}) candidate[fld] = data[fld] // 1-item map - flat, err := flatten.Flatten(candidate, "", sep) + var flat map[string]interface{} + var err error + // Exactly how we flatten is decided by p.UnionMode + if p.UnionMode == "flatten" { + flat, err = flatten.Flatten(candidate, "", sep) + if err != nil { + return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err) + } + } else { + // "nullable" or "any" + typedVal, ok := candidate[fld].(map[string]interface{}) + if !ok { + // the "key" is not a string, so ... + // most likely an array? Do the default thing + // and flatten the candidate. + flat, err = flatten.Flatten(candidate, "", sep) + if err != nil { + return nil, fmt.Errorf("flatten candidate %q failed: %w", candidate, err) + } + } else { + flat = p.flattenField(fld, typedVal) + } + } if err != nil { return nil, fmt.Errorf("flatten field %q failed: %w", fld, err) } @@ -204,7 +253,6 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg fields[k] = v } } - var schemaObj map[string]interface{} if err := json.Unmarshal([]byte(schema), &schemaObj); err != nil { return nil, fmt.Errorf("unmarshaling schema failed: %w", err) diff --git a/plugins/parsers/avro/testdata/json-array/expected.out b/plugins/parsers/avro/testdata/json-array/expected.out new file mode 100644 index 000000000..7651a0067 --- /dev/null +++ b/plugins/parsers/avro/testdata/json-array/expected.out @@ -0,0 +1 @@ +array,name=pi data_0=3,data_1=3.0999999046325684,data_2=3.140000104904175,data_3=3.1410000324249268 1682509200092000 diff --git a/plugins/parsers/avro/testdata/json-array/message.json b/plugins/parsers/avro/testdata/json-array/message.json new file mode 100644 index 000000000..b1c265ca4 --- /dev/null +++ b/plugins/parsers/avro/testdata/json-array/message.json @@ -0,0 +1,5 @@ +{ + "statistics_collection_time": 1682509200092, + "data": [ 3, 3.1, 3.14, 3.141 ], + "name": "pi" +} diff --git a/plugins/parsers/avro/testdata/json-array/telegraf.conf b/plugins/parsers/avro/testdata/json-array/telegraf.conf new file mode 100644 index 000000000..1133f3849 --- /dev/null +++ b/plugins/parsers/avro/testdata/json-array/telegraf.conf @@ -0,0 +1,24 @@ +[[ inputs.file ]] + files = ["./testdata/json-array/message.json"] + data_format = "avro" + + avro_format = "json" + avro_measurement = "array" + avro_tags = ["name"] + avro_timestamp = "statistics_collection_time" + avro_timestamp_format = "unix_ms" + avro_fields = ["data"] + avro_field_separator = "_" + avro_schema = ''' + { + "namespace": "constants", + "name": "classical", + "type": "record", + "version": "1", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "data", "type": "array", "items": "float"}, + {"name": "statistics_collection_time", "type": "long"} + ] + } + ''' diff --git a/plugins/parsers/avro/testdata/json-format/expected.out b/plugins/parsers/avro/testdata/json-format/expected.out index dca80eb98..14e257caa 100644 --- a/plugins/parsers/avro/testdata/json-format/expected.out +++ b/plugins/parsers/avro/testdata/json-format/expected.out @@ -1 +1 @@ -Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,cpu_utilization=14.0,memory_utilization=20.0 1682509200092000 +Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A up_time=1166984904i,cpu_utilization=14.0,memory_utilization=20.0 1682509200092000 diff --git a/plugins/parsers/avro/testdata/json-format/telegraf.conf b/plugins/parsers/avro/testdata/json-format/telegraf.conf index ddf8ccff9..e2238f673 100644 --- a/plugins/parsers/avro/testdata/json-format/telegraf.conf +++ b/plugins/parsers/avro/testdata/json-format/telegraf.conf @@ -5,7 +5,7 @@ avro_format = "json" avro_measurement = "Switch" avro_tags = ["switch_wwn"] - avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"] + avro_fields = ["up_time", "cpu_utilization", "memory_utilization"] avro_timestamp = "statistics_collection_time" avro_timestamp_format = "unix_ms" avro_schema = ''' diff --git a/plugins/parsers/avro/testdata/union-any/expected.out b/plugins/parsers/avro/testdata/union-any/expected.out new file mode 100644 index 000000000..6f881b083 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-any/expected.out @@ -0,0 +1 @@ +Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,cpu_utilization=11i,memory_utilization=20.0 1682509200092000 diff --git a/plugins/parsers/avro/testdata/union-any/message.json b/plugins/parsers/avro/testdata/union-any/message.json new file mode 100644 index 000000000..65680595b --- /dev/null +++ b/plugins/parsers/avro/testdata/union-any/message.json @@ -0,0 +1,11 @@ +{ + "switch_wwn": "10:00:50:EB:1A:0B:84:3A", + "statistics_collection_time": 1682509200092, + "up_time": 1166984904, + "cpu_utilization": { + "int": 11 + }, + "memory_utilization": { + "float": 20.0 + } +} diff --git a/plugins/parsers/avro/testdata/union-any/telegraf.conf b/plugins/parsers/avro/testdata/union-any/telegraf.conf new file mode 100644 index 000000000..e4966aee8 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-any/telegraf.conf @@ -0,0 +1,26 @@ +[[ inputs.file ]] + files = ["./testdata/union-any/message.json"] + data_format = "avro" + + avro_format = "json" + avro_measurement = "Switch" + avro_tags = ["switch_wwn"] + avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"] + avro_timestamp = "statistics_collection_time" + avro_timestamp_format = "unix_ms" + avro_union_mode = "any" + avro_schema = ''' + { + "namespace": "com.brocade.streaming", + "name": "fibrechannel_switch_statistics", + "type": "record", + "version": "1", + "fields": [ + {"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."}, + {"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."}, + {"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"}, + {"name": "cpu_utilization", "type": ["null", "float", "int"], "default": null, "doc": "CPU Utilization in %"}, + {"name": "memory_utilization", "type": ["null", "float"], "doc": "Memory Utilization in %"} + ] + } + ''' diff --git a/plugins/parsers/avro/testdata/union-array/expected.out b/plugins/parsers/avro/testdata/union-array/expected.out new file mode 100644 index 000000000..7651a0067 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-array/expected.out @@ -0,0 +1 @@ +array,name=pi data_0=3,data_1=3.0999999046325684,data_2=3.140000104904175,data_3=3.1410000324249268 1682509200092000 diff --git a/plugins/parsers/avro/testdata/union-array/message.json b/plugins/parsers/avro/testdata/union-array/message.json new file mode 100644 index 000000000..b1c265ca4 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-array/message.json @@ -0,0 +1,5 @@ +{ + "statistics_collection_time": 1682509200092, + "data": [ 3, 3.1, 3.14, 3.141 ], + "name": "pi" +} diff --git a/plugins/parsers/avro/testdata/union-array/telegraf.conf b/plugins/parsers/avro/testdata/union-array/telegraf.conf new file mode 100644 index 000000000..75ef5cb40 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-array/telegraf.conf @@ -0,0 +1,25 @@ +[[ inputs.file ]] + files = ["./testdata/union-array/message.json"] + data_format = "avro" + + avro_format = "json" + avro_measurement = "array" + avro_tags = ["name"] + avro_timestamp = "statistics_collection_time" + avro_timestamp_format = "unix_ms" + avro_fields = ["data"] + avro_union_mode = "any" + avro_field_separator = "_" + avro_schema = ''' + { + "namespace": "constants", + "name": "classical", + "type": "record", + "version": "1", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "data", "type": "array", "items": "float"}, + {"name": "statistics_collection_time", "type": "long"} + ] + } + ''' diff --git a/plugins/parsers/avro/testdata/union-nullable/expected.out b/plugins/parsers/avro/testdata/union-nullable/expected.out new file mode 100644 index 000000000..7cb48b808 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-nullable/expected.out @@ -0,0 +1 @@ +Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,memory_utilization=20.0 1682509200092000 diff --git a/plugins/parsers/avro/testdata/union-nullable/message.json b/plugins/parsers/avro/testdata/union-nullable/message.json new file mode 100644 index 000000000..e65d34764 --- /dev/null +++ b/plugins/parsers/avro/testdata/union-nullable/message.json @@ -0,0 +1,11 @@ +{ + "switch_wwn": "10:00:50:EB:1A:0B:84:3A", + "statistics_collection_time": 1682509200092, + "up_time": 1166984904, + "cpu_utilization": { + "null": null + }, + "memory_utilization": { + "float": 20.0 + } +} diff --git a/plugins/parsers/avro/testdata/union-nullable/telegraf.conf b/plugins/parsers/avro/testdata/union-nullable/telegraf.conf new file mode 100644 index 000000000..a03a7e5dc --- /dev/null +++ b/plugins/parsers/avro/testdata/union-nullable/telegraf.conf @@ -0,0 +1,26 @@ +[[ inputs.file ]] + files = ["./testdata/union-nullable/message.json"] + data_format = "avro" + + avro_format = "json" + avro_measurement = "Switch" + avro_tags = ["switch_wwn"] + avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"] + avro_timestamp = "statistics_collection_time" + avro_timestamp_format = "unix_ms" + avro_union_mode = "nullable" + avro_schema = ''' + { + "namespace": "com.brocade.streaming", + "name": "fibrechannel_switch_statistics", + "type": "record", + "version": "1", + "fields": [ + {"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."}, + {"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."}, + {"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"}, + {"name": "cpu_utilization", "type": ["null","float"], "default": null, "doc": "CPU Utilization in %"}, + {"name": "memory_utilization", "type": ["null", "float"], "default": null, "doc": "Memory Utilization in %"} + ] + } + ''' diff --git a/plugins/parsers/avro/testdata/union/expected.out b/plugins/parsers/avro/testdata/union/expected.out new file mode 100644 index 000000000..184357b16 --- /dev/null +++ b/plugins/parsers/avro/testdata/union/expected.out @@ -0,0 +1 @@ +Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,memory_utilization_float=20.0 1682509200092000 diff --git a/plugins/parsers/avro/testdata/union/message.json b/plugins/parsers/avro/testdata/union/message.json new file mode 100644 index 000000000..e65d34764 --- /dev/null +++ b/plugins/parsers/avro/testdata/union/message.json @@ -0,0 +1,11 @@ +{ + "switch_wwn": "10:00:50:EB:1A:0B:84:3A", + "statistics_collection_time": 1682509200092, + "up_time": 1166984904, + "cpu_utilization": { + "null": null + }, + "memory_utilization": { + "float": 20.0 + } +} diff --git a/plugins/parsers/avro/testdata/union/telegraf.conf b/plugins/parsers/avro/testdata/union/telegraf.conf new file mode 100644 index 000000000..dad3fb0a2 --- /dev/null +++ b/plugins/parsers/avro/testdata/union/telegraf.conf @@ -0,0 +1,26 @@ +[[ inputs.file ]] + files = ["./testdata/union/message.json"] + data_format = "avro" + + avro_format = "json" + avro_measurement = "Switch" + avro_tags = ["switch_wwn"] + avro_fields = ["up_time", "cpu_utilization", "memory_utilization", "statistics_collection_time"] + avro_timestamp = "statistics_collection_time" + avro_timestamp_format = "unix_ms" + avro_field_separator = "_" + avro_schema = ''' + { + "namespace": "com.brocade.streaming", + "name": "fibrechannel_switch_statistics", + "type": "record", + "version": "1", + "fields": [ + {"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."}, + {"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."}, + {"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"}, + {"name": "cpu_utilization", "type": ["null", "float"], "default": null, "doc": "CPU Utilization in %"}, + {"name": "memory_utilization", "type": ["null", "float"], "default": null, "doc": "Memory Utilization in %"} + ] + } + '''