diff --git a/plugins/parsers/avro/README.md b/plugins/parsers/avro/README.md index 78d7bf587..36089f441 100644 --- a/plugins/parsers/avro/README.md +++ b/plugins/parsers/avro/README.md @@ -27,6 +27,10 @@ The message is supposed to be encoded as follows: ## Avro data format settings data_format = "avro" + ## Avro message format + ## Supported values are "binary" (default) and "json" + # avro_format = "binary" + ## Url of the schema registry; exactly one of schema registry and ## schema must be set avro_schema_registry = "http://localhost:8081" @@ -83,6 +87,11 @@ The message is supposed to be encoded as follows: ``` +### `avro_format` + +This optional setting specifies the format of the Avro messages. Currently, the +parser supports the `binary` and `json` formats with `binary` being the default. + ### `avro_timestamp` and `avro_timestamp_format` By default the current time at ingestion will be used for all created diff --git a/plugins/parsers/avro/parser.go b/plugins/parsers/avro/parser.go index c6d5ecbdd..44011581c 100644 --- a/plugins/parsers/avro/parser.go +++ b/plugins/parsers/avro/parser.go @@ -27,6 +27,7 @@ type Parser struct { MetricName string `toml:"metric_name"` SchemaRegistry string `toml:"avro_schema_registry"` Schema string `toml:"avro_schema"` + Format string `toml:"avro_format"` Measurement string `toml:"avro_measurement"` Tags []string `toml:"avro_tags"` Fields []string `toml:"avro_fields"` @@ -40,6 +41,15 @@ type Parser struct { } func (p *Parser) Init() error { + switch p.Format { + case "": + p.Format = "binary" + case "binary", "json": + // Do nothing as those are valid settings + default: + return fmt.Errorf("unknown 'avro_format' %q", p.Format) + } + if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") { return errors.New("exactly one of 'schema_registry' or 'schema' must be specified") } @@ -54,6 +64,7 @@ func (p *Parser) Init() error { if p.SchemaRegistry != "" { p.registryObj = newSchemaRegistry(p.SchemaRegistry) } + return nil } @@ -93,7 +104,16 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { return nil, err } } - native, _, err := codec.NativeFromBinary(message) + + var native interface{} + switch p.Format { + case "binary": + native, _, err = codec.NativeFromBinary(message) + case "json": + native, _, err = codec.NativeFromTextual(message) + default: + return nil, fmt.Errorf("unknown format %q", p.Format) + } if err != nil { return nil, err } diff --git a/plugins/parsers/avro/testdata/json-format/expected.out b/plugins/parsers/avro/testdata/json-format/expected.out new file mode 100644 index 000000000..dca80eb98 --- /dev/null +++ b/plugins/parsers/avro/testdata/json-format/expected.out @@ -0,0 +1 @@ +Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,cpu_utilization=14.0,memory_utilization=20.0 1682509200092000 diff --git a/plugins/parsers/avro/testdata/json-format/message.json b/plugins/parsers/avro/testdata/json-format/message.json new file mode 100644 index 000000000..5420b8718 --- /dev/null +++ b/plugins/parsers/avro/testdata/json-format/message.json @@ -0,0 +1,7 @@ +{ + "switch_wwn": "10:00:50:EB:1A:0B:84:3A", + "statistics_collection_time": 1682509200092, + "up_time": 1166984904, + "cpu_utilization": 14.0, + "memory_utilization": 20.0 +} \ No newline at end of file diff --git a/plugins/parsers/avro/testdata/json-format/telegraf.conf b/plugins/parsers/avro/testdata/json-format/telegraf.conf new file mode 100644 index 000000000..d61ece989 --- /dev/null +++ b/plugins/parsers/avro/testdata/json-format/telegraf.conf @@ -0,0 +1,25 @@ +[[ inputs.file ]] + files = ["./testdata/json-format/message.json"] + data_format = "avro" + + avro_format = "json" + avro_measurement = "Switch" + avro_tags = ["switch_wwn"] + avro_fields = ["up_time", "cpu_utilization", "memory_utilization"] + avro_timestamp = "statistics_collection_time" + avro_timestamp_format = "unix_ms" + avro_schema = ''' + { + "namespace": "com.brocade.streaming", + "name": "fibrechannel_switch_statistics", + "type": "record", + "version": "1", + "fields": [ + {"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."}, + {"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."}, + {"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"}, + {"name": "cpu_utilization", "type": "float", "default": 0, "doc": "CPU Utilization in %"}, + {"name": "memory_utilization", "type": "float", "default": 0, "doc": "Memory Utilization in %"} + ] + } + ''' \ No newline at end of file