feat(parsers.avro): Get metric name from the message field (#13914)

This commit is contained in:
Anton Belousov 2023-09-20 22:14:55 +04:00 committed by GitHub
parent ac4cb12db8
commit fbe2fb47eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 13 deletions

View File

@ -10,6 +10,20 @@ The message is supposed to be encoded as follows:
| 1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. |
| 5- | Data | Serialized data. |
The metric name will be set according to the following priority:
1. Try to get metric name from the message field if it is set in the
`avro_measurement_field` option.
2. If the name is not determined, then try to get it from the
`avro_measurement` option as a static value.
3. If the name is still not determined, then try to get it from the
schema definition in the following format `[schema_namespace.]schema_name`,
where schema namespace is optional and will be added only if it is specified
in the schema definition.
If the metric name cannot be determined by any of these steps,
an error will be raised and the message will not be parsed.
## Configuration
```toml
@ -63,6 +77,11 @@ The message is supposed to be encoded as follows:
# }
#'''
## Measurement field name; the measurement name will be taken
## from this field. If not set, determine measurement name
## from the following 'avro_measurement' option
# avro_measurement_field = "field_name"
## Measurement string; if not set, determine measurement name from
## schema (as "<namespace>.<name>")
# avro_measurement = "ratings"

View File

@ -24,18 +24,19 @@ import (
// an attached schema or schema fingerprint
type Parser struct {
MetricName string `toml:"metric_name"`
SchemaRegistry string `toml:"avro_schema_registry"`
CaCertPath string `toml:"avro_schema_registry_cert"`
Schema string `toml:"avro_schema"`
Format string `toml:"avro_format"`
Measurement string `toml:"avro_measurement"`
Tags []string `toml:"avro_tags"`
Fields []string `toml:"avro_fields"`
Timestamp string `toml:"avro_timestamp"`
TimestampFormat string `toml:"avro_timestamp_format"`
FieldSeparator string `toml:"avro_field_separator"`
DefaultTags map[string]string `toml:"tags"`
MetricName string `toml:"metric_name"`
SchemaRegistry string `toml:"avro_schema_registry"`
CaCertPath string `toml:"avro_schema_registry_cert"`
Schema string `toml:"avro_schema"`
Format string `toml:"avro_format"`
Measurement string `toml:"avro_measurement"`
MeasurementField string `toml:"avro_measurement_field"`
Tags []string `toml:"avro_tags"`
Fields []string `toml:"avro_fields"`
Timestamp string `toml:"avro_timestamp"`
TimestampFormat string `toml:"avro_timestamp_format"`
FieldSeparator string `toml:"avro_field_separator"`
DefaultTags map[string]string `toml:"tags"`
Log telegraf.Logger `toml:"-"`
registryObj *schemaRegistry
@ -212,9 +213,26 @@ func (p *Parser) createMetric(data map[string]interface{}, schema string) (teleg
// A telegraf metric needs at least one field.
return nil, errors.New("number of fields is 0; unable to create metric")
}
// If measurement field name is specified in the configuration
// take value from that field and do not include it into fields or tags
name := ""
if p.MeasurementField != "" {
sField := p.MeasurementField
sMetric, err := internal.ToString(data[sField])
if err != nil {
p.Log.Warnf("Could not convert %v to string for metric name %q: %w", data[sField], sField, err)
} else {
name = sMetric
}
}
// Now some fancy stuff to extract the measurement.
// If it's set in the configuration, use that.
name := p.Measurement
if name == "" {
// If the field name is not specified or the field does not exist, and
// the metric name is set in the configuration, use that.
name = p.Measurement
}
separator := "."
if name == "" {
// Try using the namespace defined in the schema. In case there

View File

@ -0,0 +1 @@
cpu_load,Server=test_server Value=18.7 1694526986671

View File

@ -0,0 +1 @@
ÞæîšÑbtest_server33333³2@cpu_load

View File

@ -0,0 +1,30 @@
[[ inputs.file ]]
files = ["./testdata/measurement_name_from_message/message.avro"]
data_format = "avro"
avro_measurement_field = "Measurement"
avro_tags = [ "Server" ]
avro_fields = [ "Value" ]
avro_schema = '''
{
"type": "record",
"name": "TestRecord",
"fields": [
{
"name": "ServerTs",
"type": "long"
},
{
"name": "Server",
"type": "string"
},
{
"name": "Value",
"type": "double"
},
{
"name": "Measurement",
"type": "string"
}
]
}
'''