feat(parsers.avro): Add Apache Avro parser (#11816)
This commit is contained in:
parent
ed91ca2bfd
commit
acd1500d2b
|
|
@ -2,9 +2,10 @@
|
|||
|
||||
Telegraf contains many general purpose plugins that support parsing input data
|
||||
using a configurable parser into [metrics][]. This allows, for example, the
|
||||
`kafka_consumer` input plugin to process messages in either InfluxDB Line
|
||||
Protocol or in JSON format.
|
||||
`kafka_consumer` input plugin to process messages in any of InfluxDB Line
|
||||
Protocol, JSON format, or Apache Avro format.
|
||||
|
||||
- [Avro](/plugins/parsers/avro)
|
||||
- [Binary](/plugins/parsers/binary)
|
||||
- [Collectd](/plugins/parsers/collectd)
|
||||
- [CSV](/plugins/parsers/csv)
|
||||
|
|
|
|||
|
|
@ -198,6 +198,7 @@ following works:
|
|||
- github.com/jcmturner/gofork [BSD 3-Clause "New" or "Revised" License](https://github.com/jcmturner/gofork/blob/master/LICENSE)
|
||||
- github.com/jcmturner/gokrb5 [Apache License 2.0](https://github.com/jcmturner/gokrb5/blob/master/LICENSE)
|
||||
- github.com/jcmturner/rpc [Apache License 2.0](https://github.com/jcmturner/rpc/blob/master/LICENSE)
|
||||
- github.com/jeremywohl/flatten [MIT License](https://github.com/jeremywohl/flatten/blob/master/LICENSE)
|
||||
- github.com/jhump/protoreflect [Apache License 2.0](https://github.com/jhump/protoreflect/blob/master/LICENSE)
|
||||
- github.com/jmespath/go-jmespath [Apache License 2.0](https://github.com/jmespath/go-jmespath/blob/master/LICENSE)
|
||||
- github.com/josharian/intern [MIT License](https://github.com/josharian/intern/blob/master/LICENSE.md)
|
||||
|
|
@ -212,6 +213,7 @@ following works:
|
|||
- github.com/kolo/xmlrpc [MIT License](https://github.com/kolo/xmlrpc/blob/master/LICENSE)
|
||||
- github.com/kylelemons/godebug [Apache License 2.0](https://github.com/kylelemons/godebug/blob/master/LICENSE)
|
||||
- github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE)
|
||||
- github.com/linkedin/goavro [Apache License 2.0](https://github.com/linkedin/goavro/blob/master/LICENSE)
|
||||
- github.com/logzio/azure-monitor-metrics-receiver [MIT License](https://github.com/logzio/azure-monitor-metrics-receiver/blob/master/LICENSE)
|
||||
- github.com/magiconair/properties [BSD 2-Clause "Simplified" License](https://github.com/magiconair/properties/blob/main/LICENSE.md)
|
||||
- github.com/mailru/easyjson [MIT License](https://github.com/mailru/easyjson/blob/master/LICENSE)
|
||||
|
|
|
|||
2
go.mod
2
go.mod
|
|
@ -108,12 +108,14 @@ require (
|
|||
github.com/jackc/pgtype v1.12.0
|
||||
github.com/jackc/pgx/v4 v4.17.1
|
||||
github.com/james4k/rcon v0.0.0-20120923215419-8fbb8268b60a
|
||||
github.com/jeremywohl/flatten/v2 v2.0.0-20211013061545-07e4a09fb8e4
|
||||
github.com/jhump/protoreflect v1.8.3-0.20210616212123-6cc1efa697ca
|
||||
github.com/jmespath/go-jmespath v0.4.0
|
||||
github.com/kardianos/service v1.2.2
|
||||
github.com/karrick/godirwalk v1.17.0
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
|
||||
github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b
|
||||
github.com/linkedin/goavro/v2 v2.12.0
|
||||
github.com/logzio/azure-monitor-metrics-receiver v1.0.0
|
||||
github.com/lxc/lxd v0.0.0-20220920163450-e9b4b514106a
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -1383,6 +1383,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE
|
|||
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
|
||||
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
|
||||
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
|
||||
github.com/jeremywohl/flatten/v2 v2.0.0-20211013061545-07e4a09fb8e4 h1:eA9wi6ZzpIRobvXkn/S2Lyw1hr2pc71zxzOPl7Xjs4w=
|
||||
github.com/jeremywohl/flatten/v2 v2.0.0-20211013061545-07e4a09fb8e4/go.mod h1:s9g9Dfls+aEgucKXKW+i8MRZuLXT2MrD/WjYpMnWfOw=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jgautheron/goconst v1.4.0/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4=
|
||||
github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74=
|
||||
|
|
@ -1530,6 +1532,8 @@ github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
|
|||
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
|
||||
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
||||
github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg=
|
||||
github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
|
||||
github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
|
||||
github.com/logzio/azure-monitor-metrics-receiver v1.0.0 h1:TAzhIZL2ueyyc81qIw8FGg4nUbts4Hvc3oOxSobY1IA=
|
||||
github.com/logzio/azure-monitor-metrics-receiver v1.0.0/go.mod h1:UIaQ7UgxZ8jO3L0JB2hctsHFBbZqL6mbxYscQAeFpl4=
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
//go:build !custom || parsers || parsers.avro
|
||||
|
||||
package all
|
||||
|
||||
import _ "github.com/influxdata/telegraf/plugins/parsers/avro" // register plugin
|
||||
|
|
@ -0,0 +1,108 @@
|
|||
# Avro Parser Plugin
|
||||
|
||||
The `Avro` parser creates metrics from a message serialized with Avro.
|
||||
|
||||
The message is expected to be encoded as follows:
|
||||
|
||||
| Bytes | Area | Description |
|
||||
| ----- | ---------- | ------------------------------------------------ |
|
||||
| 0 | Magic Byte | Confluent serialization format version number. |
|
||||
| 1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. |
|
||||
| 5- | Data | Serialized data. |
|
||||
|
||||
## Configuration
|
||||
|
||||
```toml
|
||||
[[inputs.kafka_consumer]]
|
||||
## Kafka brokers.
|
||||
brokers = ["localhost:9092"]
|
||||
|
||||
## Topics to consume.
|
||||
topics = ["telegraf"]
|
||||
|
||||
## Maximum length of a message to consume, in bytes (default 0/unlimited);
|
||||
## larger messages are dropped
|
||||
max_message_len = 1000000
|
||||
|
||||
## Avro data format settings
|
||||
data_format = "avro"
|
||||
|
||||
## URL of the schema registry; exactly one of schema registry and
|
||||
## schema must be set
|
||||
avro_schema_registry = "http://localhost:8081"
|
||||
|
||||
## Schema string; exactly one of schema registry and schema must be set
|
||||
#avro_schema = """
|
||||
# {
|
||||
# "type":"record",
|
||||
# "name":"Value",
|
||||
# "namespace":"com.example",
|
||||
# "fields":[
|
||||
# {
|
||||
# "name":"tag",
|
||||
# "type":"string"
|
||||
# },
|
||||
# {
|
||||
# "name":"field",
|
||||
# "type":"long"
|
||||
# },
|
||||
# {
|
||||
# "name":"timestamp",
|
||||
# "type":"long"
|
||||
# }
|
||||
# ]
|
||||
# }
|
||||
#"""
|
||||
|
||||
## Measurement string; if not set, determine measurement name from
|
||||
## schema (as "<namespace>.<name>")
|
||||
# avro_measurement = "ratings"
|
||||
|
||||
## Avro fields to be used as tags; optional.
|
||||
# avro_tags = ["CHANNEL", "CLUB_STATUS"]
|
||||
|
||||
## Avro fields to be used as fields; if empty, any Avro fields
|
||||
## detected from the schema, not used as tags, will be used as
|
||||
## measurement fields.
|
||||
# avro_fields = ["STARS"]
|
||||
|
||||
## Avro field to be used as the timestamp; if empty, the current time will
|
||||
## be used for the measurement timestamp.
|
||||
# avro_timestamp = ""
|
||||
## If avro_timestamp is specified, avro_timestamp_format must be set
|
||||
## to one of 'unix', 'unix_ms', 'unix_us', or 'unix_ns'
|
||||
# avro_timestamp_format = "unix"
|
||||
|
||||
## Used to separate parts of array structures. The default
|
||||
## is the empty string, so a=["a", "b"] becomes a0="a", a1="b".
|
||||
## If this were set to "_", then it would be a_0="a", a_1="b".
|
||||
# avro_field_separator = "_"
|
||||
|
||||
## Default values for given tags: optional
|
||||
# tags = { "application": "hermes", "region": "central" }
|
||||
|
||||
```
|
||||
|
||||
### `avro_timestamp` and `avro_timestamp_format`
|
||||
|
||||
By default the current time at ingestion will be used for all created
|
||||
metrics; to set the time using the Avro message you can use the
|
||||
`avro_timestamp` and `avro_timestamp_format` options together to set the
|
||||
time to a value in the parsed document.
|
||||
|
||||
The `avro_timestamp` option specifies the field containing the time
|
||||
value. If it is not set, the time of record ingestion is used. If it
|
||||
is set, the field may be any numerical type: notably, it is *not*
|
||||
constrained to an Avro `long` (int64) (which Avro uses for timestamps in
|
||||
millisecond or microsecond resolution). However, it must represent the
|
||||
number of time increments since the Unix epoch (00:00 UTC 1 Jan 1970).
|
||||
|
||||
The `avro_timestamp_format` option specifies the precision of the timestamp
|
||||
field, and, if set, must be one of `unix`, `unix_ms`, `unix_us`, or
|
||||
`unix_ns`. If `avro_timestamp` is set, `avro_timestamp_format` must be
|
||||
as well.
|
||||
|
||||
## Metrics
|
||||
|
||||
One metric is created for each message. The type of each field is
|
||||
automatically determined based on the schema.
|
||||
|
|
@ -0,0 +1,255 @@
|
|||
package avro
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jeremywohl/flatten/v2"
|
||||
"github.com/linkedin/goavro/v2"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
"github.com/influxdata/telegraf/plugins/parsers"
|
||||
)
|
||||
|
||||
// Parser is an Avro parser plugin configuration and state.
//
// If SchemaRegistry is set, we assume that our input will be in
// Confluent Wire Format
// (https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format)
// and we will load the schema from the registry.
//
// If Schema is set, we assume the input will be Avro binary format, without
// an attached schema or schema fingerprint.
type Parser struct {
	// MetricName is the fallback measurement name, supplied by the
	// parser registry when the plugin is instantiated.
	MetricName string `toml:"metric_name"`
	// SchemaRegistry is the base URL of a Confluent Schema Registry;
	// mutually exclusive with Schema (exactly one must be set).
	SchemaRegistry string `toml:"avro_schema_registry"`
	// Schema is an inline Avro schema (JSON text); mutually exclusive
	// with SchemaRegistry.
	Schema string `toml:"avro_schema"`
	// Measurement overrides the measurement name; when empty the name is
	// derived from the schema's namespace and name.
	Measurement string `toml:"avro_measurement"`
	// Tags lists the Avro record fields to emit as tags.
	Tags []string `toml:"avro_tags"`
	// Fields lists the Avro record fields to emit as fields; when empty,
	// every non-tag field is used.
	Fields []string `toml:"avro_fields"`
	// Timestamp names the Avro field holding the metric time; when empty
	// the ingestion time is used.
	Timestamp string `toml:"avro_timestamp"`
	// TimestampFormat is one of "unix", "unix_ms", "unix_us", "unix_ns";
	// required whenever Timestamp is set.
	TimestampFormat string `toml:"avro_timestamp_format"`
	// FieldSeparator joins nested-structure parts when flattening fields
	// (default empty string).
	FieldSeparator string `toml:"avro_field_separator"`
	// DefaultTags are tag values applied to every metric.
	DefaultTags map[string]string `toml:"tags"`

	Log telegraf.Logger `toml:"-"`
	// registryObj is the lazily-created schema-registry client; non-nil
	// only when SchemaRegistry is configured.
	registryObj *schemaRegistry
}
|
||||
|
||||
func (p *Parser) Init() error {
|
||||
if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") {
|
||||
return errors.New("exactly one of 'schema_registry' or 'schema' must be specified")
|
||||
}
|
||||
if p.TimestampFormat == "" {
|
||||
if p.Timestamp != "" {
|
||||
return errors.New("if 'timestamp' field is specified, 'timestamp_format' must be as well")
|
||||
}
|
||||
if p.TimestampFormat != "unix" && p.TimestampFormat != "unix_us" && p.TimestampFormat != "unix_ms" && p.TimestampFormat != "unix_ns" {
|
||||
return fmt.Errorf("invalid timestamp format '%v'", p.TimestampFormat)
|
||||
}
|
||||
}
|
||||
if p.SchemaRegistry != "" {
|
||||
p.registryObj = newSchemaRegistry(p.SchemaRegistry)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
|
||||
var schema string
|
||||
var codec *goavro.Codec
|
||||
var err error
|
||||
var message []byte
|
||||
message = buf[:]
|
||||
|
||||
if p.registryObj != nil {
|
||||
// The input must be Confluent Wire Protocol
|
||||
if buf[0] != 0 {
|
||||
return nil, errors.New("first byte is not 0: not Confluent Wire Protocol")
|
||||
}
|
||||
schemaID := int(binary.BigEndian.Uint32(buf[1:5]))
|
||||
schemastruct, err := p.registryObj.getSchemaAndCodec(schemaID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
schema = schemastruct.Schema
|
||||
codec = schemastruct.Codec
|
||||
message = buf[5:]
|
||||
} else {
|
||||
// Check for single-object encoding
|
||||
magicBytes := int(binary.BigEndian.Uint16(buf[:2]))
|
||||
expectedMagic := int(binary.BigEndian.Uint16([]byte("c301")))
|
||||
if magicBytes == expectedMagic {
|
||||
message = buf[10:]
|
||||
// We could in theory validate the fingerprint against
|
||||
// the schema. Maybe later.
|
||||
// We would get the fingerprint as int(binary.LittleEndian.Uint64(buf[2:10]))
|
||||
} // Otherwise we assume bare Avro binary
|
||||
schema = p.Schema
|
||||
codec, err = goavro.NewCodec(schema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
native, _, err := codec.NativeFromBinary(message)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Cast to string-to-interface
|
||||
codecSchema, ok := native.(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("native is of unsupported type %T", native)
|
||||
}
|
||||
m, err := p.createMetric(codecSchema, schema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return []telegraf.Metric{m}, nil
|
||||
}
|
||||
|
||||
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
|
||||
metrics, err := p.Parse([]byte(line))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(metrics) != 1 {
|
||||
return nil, errors.New("line contains multiple metrics")
|
||||
}
|
||||
|
||||
return metrics[0], nil
|
||||
}
|
||||
|
||||
func (p *Parser) SetDefaultTags(tags map[string]string) {
|
||||
p.DefaultTags = tags
|
||||
}
|
||||
|
||||
// createMetric builds a single Telegraf metric from a decoded Avro record
// (data) and the JSON text of its schema.
//
// Tags are selected per p.Tags; fields come from p.Fields or, when that is
// empty, from every non-tag record field. Nested field values are flattened
// using p.FieldSeparator. The measurement name is taken from p.Measurement,
// else derived from the schema's namespace/name, else p.MetricName; the
// metric time comes from the configured timestamp field or time.Now().
func (p *Parser) createMetric(data map[string]interface{}, schema string) (telegraf.Metric, error) {
	// Tags differ from fields, in that tags are inherently strings.
	// fields can be of any type.
	fields := make(map[string]interface{})
	tags := make(map[string]string)

	// Set default tag values first so record-derived tags can override them.
	for k, v := range p.DefaultTags {
		tags[k] = v
	}
	// Avro doesn't have a Tag/Field distinction, so we have to tell
	// Telegraf which items are our tags. Unconvertible tag values are
	// logged and skipped rather than failing the whole metric.
	for _, tag := range p.Tags {
		sTag, err := internal.ToString(data[tag])
		if err != nil {
			p.Log.Warnf("Could not convert %v to string for tag %q: %v", data[tag], tag, err)
			continue
		}
		tags[tag] = sTag
	}
	var fieldList []string
	if len(p.Fields) != 0 {
		// If you have specified your fields in the config, you
		// get what you asked for.
		fieldList = p.Fields

		// Except...if you specify the timestamp field, and it's
		// not listed in your fields, you'll get it anyway.
		// This will randomize your field ordering (map iteration order
		// is random), which isn't ideal. If you care, list the
		// timestamp field explicitly.
		if p.Timestamp != "" {
			// quick list-to-set-to-list implementation to deduplicate
			fieldSet := make(map[string]bool)
			for k := range fieldList {
				fieldSet[fieldList[k]] = true
			}
			fieldSet[p.Timestamp] = true
			var newList []string
			for s := range fieldSet {
				newList = append(newList, s)
			}
			fieldList = newList
		}
	} else {
		for k := range data {
			// Otherwise, that which is not a tag is a field
			if _, ok := tags[k]; !ok {
				fieldList = append(fieldList, k)
			}
		}
	}
	// We need to flatten out our fields. The default (the separator
	// string is empty) is equivalent to what streamreactor does.
	sep := flatten.SeparatorStyle{
		Before: "",
		Middle: p.FieldSeparator,
		After:  "",
	}
	for _, fld := range fieldList {
		// Flatten one field at a time so each flattened key keeps the
		// original field name as its prefix.
		candidate := make(map[string]interface{})
		candidate[fld] = data[fld] // 1-item map
		flat, err := flatten.Flatten(candidate, "", sep)
		if err != nil {
			return nil, fmt.Errorf("flatten field %q failed: %w", fld, err)
		}
		for k, v := range flat {
			fields[k] = v
		}
	}

	var schemaObj map[string]interface{}
	if err := json.Unmarshal([]byte(schema), &schemaObj); err != nil {
		return nil, fmt.Errorf("unmarshaling schema failed: %w", err)
	}
	if len(fields) == 0 {
		// A telegraf metric needs at least one field.
		return nil, errors.New("number of fields is 0; unable to create metric")
	}
	// Now some fancy stuff to extract the measurement.
	// If it's set in the configuration, use that.
	name := p.Measurement
	separator := "."
	if name == "" {
		// Try using the namespace defined in the schema. In case there
		// is none, just use the schema's name definition.
		nsStr, ok := schemaObj["namespace"].(string)
		// namespace is optional; nsStr is "" when absent, so drop the
		// separator to avoid a leading dot.
		if !ok {
			separator = ""
		}

		nStr, ok := schemaObj["name"].(string)
		if !ok {
			return nil, fmt.Errorf("could not determine name from schema %s", schema)
		}
		name = nsStr + separator + nStr
	}
	// Still don't have a name? Guess we should use the metric name if
	// it's set.
	if name == "" {
		name = p.MetricName
	}
	// Nothing? Give up.
	if name == "" {
		return nil, errors.New("could not determine measurement name")
	}
	var timestamp time.Time
	if p.Timestamp != "" {
		// Render the (numeric) timestamp field as text for ParseTimestamp;
		// Init has already guaranteed TimestampFormat is a unix variant.
		rawTime := fmt.Sprintf("%v", fields[p.Timestamp])
		var err error
		timestamp, err = internal.ParseTimestamp(p.TimestampFormat, rawTime, "")
		if err != nil {
			return nil, fmt.Errorf("could not parse '%s' to '%s'", rawTime, p.TimestampFormat)
		}
	} else {
		timestamp = time.Now()
	}
	return metric.New(name, tags, fields, timestamp), nil
}
|
||||
|
||||
// init registers the Avro parser with Telegraf's parser registry under the
// name "avro"; the registry supplies the default metric name at creation.
func init() {
	parsers.Add("avro",
		func(defaultMetricName string) telegraf.Parser {
			return &Parser{MetricName: defaultMetricName}
		})
}
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
package avro
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/config"
|
||||
"github.com/influxdata/telegraf/plugins/inputs"
|
||||
"github.com/influxdata/telegraf/plugins/inputs/file"
|
||||
"github.com/influxdata/telegraf/plugins/parsers/influx"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
)
|
||||
|
||||
// TestCases runs one table-driven test per directory under testdata/. Each
// directory provides a telegraf.conf (a file input configured with the Avro
// parser) plus optional expected.out (metrics in influx line protocol) and
// expected.err (expected error lines, one per line).
func TestCases(t *testing.T) {
	// Get all directories in testdata
	folders, err := os.ReadDir("testdata")
	require.NoError(t, err)
	// Make sure testdata contains data
	require.NotEmpty(t, folders)

	// Register the file input so the testcase configs can be loaded.
	inputs.Add("file", func() telegraf.Input {
		return &file.File{}
	})

	for _, f := range folders {
		fname := f.Name()
		testdataPath := filepath.Join("testdata", fname)
		configFilename := filepath.Join(testdataPath, "telegraf.conf")
		expectedFilename := filepath.Join(testdataPath, "expected.out")
		expectedErrorFilename := filepath.Join(testdataPath, "expected.err")

		t.Run(fname, func(t *testing.T) {
			// Get parser to parse expected output
			testdataParser := &influx.Parser{}
			require.NoError(t, testdataParser.Init())

			// expected.out is optional: error-only cases omit it.
			var expected []telegraf.Metric
			if _, err := os.Stat(expectedFilename); err == nil {
				var err error
				expected, err = testutil.ParseMetricsFromFile(expectedFilename, testdataParser)
				require.NoError(t, err)
			}

			// Read the expected errors if any
			var expectedErrors []string

			if _, err := os.Stat(expectedErrorFilename); err == nil {
				var err error
				expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
				require.NoError(t, err)
				require.NotEmpty(t, expectedErrors)
			}

			// Set up error catching
			var acc testutil.Accumulator
			var actualErrors []string
			var actual []telegraf.Metric

			// Configure the plugin
			cfg := config.NewConfig()
			err := cfg.LoadConfig(configFilename)
			require.NoError(t, err)

			// Gather from every configured input, collecting errors rather
			// than failing immediately so they can be matched below.
			for _, input := range cfg.Inputs {
				require.NoError(t, input.Init())

				if err := input.Gather(&acc); err != nil {
					actualErrors = append(actualErrors, err.Error())
				}
			}
			require.ElementsMatch(t, actualErrors, expectedErrors)
			actual = acc.GetTelegrafMetrics()
			// Process expected metrics and compare with resulting metrics
			testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
		})
	}
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
package avro
|
||||
|
||||
import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"

	"github.com/linkedin/goavro/v2"
)
|
||||
|
||||
// schemaAndCodec pairs an Avro schema (JSON text) with its compiled codec.
type schemaAndCodec struct {
	Schema string
	Codec  *goavro.Codec
}

// schemaRegistry is a minimal Confluent Schema Registry client that caches
// previously fetched schemas in memory, keyed by schema ID.
//
// NOTE(review): the cache map is not guarded by a lock; presumably each
// parser instance is used from a single goroutine — confirm before sharing.
type schemaRegistry struct {
	url   string
	cache map[int]*schemaAndCodec
}

// schemaByID is the registry REST endpoint format for fetching a schema by
// ID: "<base-url>/schemas/ids/<id>".
const schemaByID = "%s/schemas/ids/%d"

// newSchemaRegistry returns a registry client for the given base URL with an
// empty schema cache.
func newSchemaRegistry(url string) *schemaRegistry {
	return &schemaRegistry{url: url, cache: make(map[int]*schemaAndCodec)}
}
|
||||
|
||||
func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
|
||||
if v, ok := sr.cache[id]; ok {
|
||||
return v, nil
|
||||
}
|
||||
resp, err := http.Get(fmt.Sprintf(schemaByID, sr.url, id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var jsonResponse map[string]interface{}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&jsonResponse); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
schema, ok := jsonResponse["schema"]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("malformed respose from schema registry: no 'schema' key")
|
||||
}
|
||||
|
||||
schemaValue, ok := schema.(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("malformed respose from schema registry: %v cannot be cast to string", schema)
|
||||
}
|
||||
codec, err := goavro.NewCodec(schemaValue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
retval := &schemaAndCodec{Schema: schemaValue, Codec: codec}
|
||||
sr.cache[id] = retval
|
||||
return retval, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,4 @@
|
|||
could not instantiate parser: exactly one of 'schema_registry' or 'schema' must be specified
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
[[ inputs.file ]]
|
||||
files = ["./testdata/config-both/message.avro"]
|
||||
data_format = "avro"
|
||||
|
||||
avro_measurement = "measurement"
|
||||
avro_tags = [ "tag" ]
|
||||
avro_schema_registry = "https://localhost:8081"
|
||||
avro_schema = '''
|
||||
{
|
||||
"type":"record",
|
||||
"name":"Value",
|
||||
"namespace":"com.example",
|
||||
"fields":[
|
||||
{
|
||||
"name":"tag",
|
||||
"type":"string"
|
||||
},
|
||||
{
|
||||
"name":"field",
|
||||
"type":"long"
|
||||
},
|
||||
{
|
||||
"name":"timestamp",
|
||||
"type":"long"
|
||||
}
|
||||
]
|
||||
}
|
||||
'''
|
||||
|
|
@ -0,0 +1,2 @@
|
|||
could not instantiate parser: exactly one of 'schema_registry' or 'schema' must be specified
|
||||
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
[[ inputs.file ]]
|
||||
files = ["./testdata/config-neither/message.avro"]
|
||||
data_format = "avro"
|
||||
avro_measurement = "measurement"
|
||||
avro_tags = [ "tag" ]
|
||||
|
|
@ -0,0 +1 @@
|
|||
could not instantiate parser: if 'timestamp' field is specified, 'timestamp_format' must be as well
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
[[ inputs.file ]]
|
||||
files = ["./testdata/no-timestamp-format/message.avro"]
|
||||
data_format = "avro"
|
||||
|
||||
avro_measurement = "measurement"
|
||||
avro_tags = [ "tag" ]
|
||||
avro_timestamp = "timestamp"
|
||||
avro_schema = '''
|
||||
{
|
||||
"type":"record",
|
||||
"name":"Value",
|
||||
"namespace":"com.example",
|
||||
"fields":[
|
||||
{
|
||||
"name":"tag",
|
||||
"type":"string"
|
||||
},
|
||||
{
|
||||
"name":"field",
|
||||
"type":"long"
|
||||
},
|
||||
{
|
||||
"name":"timestamp",
|
||||
"type":"long"
|
||||
}
|
||||
]
|
||||
}
|
||||
'''
|
||||
|
|
@ -0,0 +1 @@
|
|||
measurement,tag=test_tag field=19i,timestamp=1664296121000000i 1664296121000000
|
||||
|
|
@ -0,0 +1 @@
|
|||
test_tag&€<>¿±äêô
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
[[ inputs.file ]]
|
||||
files = ["./testdata/supplied_timestamp/message.avro"]
|
||||
data_format = "avro"
|
||||
avro_measurement = "measurement"
|
||||
avro_tags = [ "tag" ]
|
||||
avro_timestamp = "timestamp"
|
||||
avro_timestamp_format = "unix_us"
|
||||
avro_schema = '''
|
||||
{
|
||||
"type":"record",
|
||||
"name":"Value",
|
||||
"namespace":"com.example",
|
||||
"fields":[
|
||||
{
|
||||
"name":"tag",
|
||||
"type":"string"
|
||||
},
|
||||
{
|
||||
"name":"field",
|
||||
"type":"long"
|
||||
},
|
||||
{
|
||||
"name":"timestamp",
|
||||
"type":"long"
|
||||
}
|
||||
]
|
||||
}
|
||||
'''
|
||||
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/expected.out
vendored
Normal file
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/expected.out
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
measurement,tag=test_tag field=19i,timestamp=1664296121000000i 1664296121000000
|
||||
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/message.avro
vendored
Normal file
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/message.avro
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
test_tag&€<>¿±äêô
|
||||
29
plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/telegraf.conf
vendored
Normal file
29
plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/telegraf.conf
vendored
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
[[ inputs.file ]]
|
||||
files = ["./testdata/supplied_timestamp_fields_specified/message.avro"]
|
||||
data_format = "avro"
|
||||
avro_measurement = "measurement"
|
||||
avro_tags = [ "tag" ]
|
||||
avro_fields = [ "field", "timestamp"]
|
||||
avro_timestamp = "timestamp"
|
||||
avro_timestamp_format = "unix_us"
|
||||
avro_schema = '''
|
||||
{
|
||||
"type":"record",
|
||||
"name":"Value",
|
||||
"namespace":"com.example",
|
||||
"fields":[
|
||||
{
|
||||
"name":"tag",
|
||||
"type":"string"
|
||||
},
|
||||
{
|
||||
"name":"field",
|
||||
"type":"long"
|
||||
},
|
||||
{
|
||||
"name":"timestamp",
|
||||
"type":"long"
|
||||
}
|
||||
]
|
||||
}
|
||||
'''
|
||||
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/expected.out
vendored
Normal file
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/expected.out
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
measurement,tag=test_tag field=19i,timestamp=1664296121000000i 1664296121000000
|
||||
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/message.avro
vendored
Normal file
1
plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/message.avro
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
test_tag&€<>¿±äêô
|
||||
29
plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/telegraf.conf
vendored
Normal file
29
plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/telegraf.conf
vendored
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
[[ inputs.file ]]
|
||||
files = ["./testdata/supplied_timestamp_fields_unspecified/message.avro"]
|
||||
data_format = "avro"
|
||||
avro_measurement = "measurement"
|
||||
avro_tags = [ "tag" ]
|
||||
avro_fields = [ "field" ]
|
||||
avro_timestamp = "timestamp"
|
||||
avro_timestamp_format = "unix_us"
|
||||
avro_schema = '''
|
||||
{
|
||||
"type":"record",
|
||||
"name":"Value",
|
||||
"namespace":"com.example",
|
||||
"fields":[
|
||||
{
|
||||
"name":"tag",
|
||||
"type":"string"
|
||||
},
|
||||
{
|
||||
"name":"field",
|
||||
"type":"long"
|
||||
},
|
||||
{
|
||||
"name":"timestamp",
|
||||
"type":"long"
|
||||
}
|
||||
]
|
||||
}
|
||||
'''
|
||||
|
|
@ -68,7 +68,7 @@ type ParserCompatibility interface {
|
|||
// Config is a struct that covers the data types needed for all parser types,
|
||||
// and can be used to instantiate _any_ of the parsers.
|
||||
type Config struct {
|
||||
// DataFormat can be one of: json, influx, graphite, value, nagios
|
||||
// DataFormat can be one of: avro, json, influx, graphite, value, nagios
|
||||
DataFormat string `toml:"data_format"`
|
||||
|
||||
// Separator only applied to Graphite data.
|
||||
|
|
|
|||
|
|
@ -45,8 +45,7 @@ func TestRegistry_BackwardCompatibility(t *testing.T) {
|
|||
}
|
||||
|
||||
// Define parsers that do not have an old-school init
|
||||
newStyleOnly := []string{"binary"}
|
||||
|
||||
newStyleOnly := []string{"binary", "avro"}
|
||||
for name, creator := range parsers.Parsers {
|
||||
if choice.Contains(name, newStyleOnly) {
|
||||
t.Logf("skipping new-style-only %q...", name)
|
||||
|
|
|
|||
Loading…
Reference in New Issue