feat(parsers.avro): Add support for JSON format (#13607)
This commit is contained in:
parent
3da80fdfc6
commit
714f3c187f
|
|
@ -27,6 +27,10 @@ The message is supposed to be encoded as follows:
|
||||||
## Avro data format settings
|
## Avro data format settings
|
||||||
data_format = "avro"
|
data_format = "avro"
|
||||||
|
|
||||||
|
## Avro message format
|
||||||
|
## Supported values are "binary" (default) and "json"
|
||||||
|
# avro_format = "binary"
|
||||||
|
|
||||||
## Url of the schema registry; exactly one of schema registry and
|
## Url of the schema registry; exactly one of schema registry and
|
||||||
## schema must be set
|
## schema must be set
|
||||||
avro_schema_registry = "http://localhost:8081"
|
avro_schema_registry = "http://localhost:8081"
|
||||||
|
|
@ -83,6 +87,11 @@ The message is supposed to be encoded as follows:
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### `avro_format`
|
||||||
|
|
||||||
|
This optional setting specifies the format of the Avro messages. Currently, the
|
||||||
|
parser supports the `binary` and `json` formats with `binary` being the default.
|
||||||
|
|
||||||
### `avro_timestamp` and `avro_timestamp_format`
|
### `avro_timestamp` and `avro_timestamp_format`
|
||||||
|
|
||||||
By default the current time at ingestion will be used for all created
|
By default the current time at ingestion will be used for all created
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ type Parser struct {
|
||||||
MetricName string `toml:"metric_name"`
|
MetricName string `toml:"metric_name"`
|
||||||
SchemaRegistry string `toml:"avro_schema_registry"`
|
SchemaRegistry string `toml:"avro_schema_registry"`
|
||||||
Schema string `toml:"avro_schema"`
|
Schema string `toml:"avro_schema"`
|
||||||
|
Format string `toml:"avro_format"`
|
||||||
Measurement string `toml:"avro_measurement"`
|
Measurement string `toml:"avro_measurement"`
|
||||||
Tags []string `toml:"avro_tags"`
|
Tags []string `toml:"avro_tags"`
|
||||||
Fields []string `toml:"avro_fields"`
|
Fields []string `toml:"avro_fields"`
|
||||||
|
|
@ -40,6 +41,15 @@ type Parser struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Parser) Init() error {
|
func (p *Parser) Init() error {
|
||||||
|
switch p.Format {
|
||||||
|
case "":
|
||||||
|
p.Format = "binary"
|
||||||
|
case "binary", "json":
|
||||||
|
// Do nothing as those are valid settings
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown 'avro_format' %q", p.Format)
|
||||||
|
}
|
||||||
|
|
||||||
if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") {
|
if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") {
|
||||||
return errors.New("exactly one of 'schema_registry' or 'schema' must be specified")
|
return errors.New("exactly one of 'schema_registry' or 'schema' must be specified")
|
||||||
}
|
}
|
||||||
|
|
@ -54,6 +64,7 @@ func (p *Parser) Init() error {
|
||||||
if p.SchemaRegistry != "" {
|
if p.SchemaRegistry != "" {
|
||||||
p.registryObj = newSchemaRegistry(p.SchemaRegistry)
|
p.registryObj = newSchemaRegistry(p.SchemaRegistry)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -93,7 +104,16 @@ func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
native, _, err := codec.NativeFromBinary(message)
|
|
||||||
|
var native interface{}
|
||||||
|
switch p.Format {
|
||||||
|
case "binary":
|
||||||
|
native, _, err = codec.NativeFromBinary(message)
|
||||||
|
case "json":
|
||||||
|
native, _, err = codec.NativeFromTextual(message)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown format %q", p.Format)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
Switch,switch_wwn=10:00:50:EB:1A:0B:84:3A statistics_collection_time=1682509200092i,up_time=1166984904i,cpu_utilization=14.0,memory_utilization=20.0 1682509200092000
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
{
|
||||||
|
"switch_wwn": "10:00:50:EB:1A:0B:84:3A",
|
||||||
|
"statistics_collection_time": 1682509200092,
|
||||||
|
"up_time": 1166984904,
|
||||||
|
"cpu_utilization": 14.0,
|
||||||
|
"memory_utilization": 20.0
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testdata/json-format/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "Switch"
|
||||||
|
avro_tags = ["switch_wwn"]
|
||||||
|
avro_fields = ["up_time", "cpu_utilization", "memory_utilization"]
|
||||||
|
avro_timestamp = "statistics_collection_time"
|
||||||
|
avro_timestamp_format = "unix_ms"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "com.brocade.streaming",
|
||||||
|
"name": "fibrechannel_switch_statistics",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "switch_wwn", "type": "string", "doc": "WWN of the Physical Switch."},
|
||||||
|
{"name": "statistics_collection_time", "type": "long", "doc": "Epoch time when statistics is collected."},
|
||||||
|
{"name": "up_time", "type": "long", "doc": "Switch Up Time (in hundredths of a second)"},
|
||||||
|
{"name": "cpu_utilization", "type": "float", "default": 0, "doc": "CPU Utilization in %"},
|
||||||
|
{"name": "memory_utilization", "type": "float", "default": 0, "doc": "Memory Utilization in %"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
Loading…
Reference in New Issue