diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 668138d89..c86b72637 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -2,9 +2,10 @@ Telegraf contains many general purpose plugins that support parsing input data using a configurable parser into [metrics][]. This allows, for example, the -`kafka_consumer` input plugin to process messages in either InfluxDB Line -Protocol or in JSON format. +`kafka_consumer` input plugin to process messages in any of InfluxDB Line +Protocol, JSON format, or Apache Avro format. +- [Avro](/plugins/parsers/avro) - [Binary](/plugins/parsers/binary) - [Collectd](/plugins/parsers/collectd) - [CSV](/plugins/parsers/csv) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 9a63ed7ce..a3dec1465 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -198,6 +198,7 @@ following works: - github.com/jcmturner/gofork [BSD 3-Clause "New" or "Revised" License](https://github.com/jcmturner/gofork/blob/master/LICENSE) - github.com/jcmturner/gokrb5 [Apache License 2.0](https://github.com/jcmturner/gokrb5/blob/master/LICENSE) - github.com/jcmturner/rpc [Apache License 2.0](https://github.com/jcmturner/rpc/blob/master/LICENSE) +- github.com/jeremywohl/flatten [MIT License](https://github.com/jeremywohl/flatten/blob/master/LICENSE) - github.com/jhump/protoreflect [Apache License 2.0](https://github.com/jhump/protoreflect/blob/master/LICENSE) - github.com/jmespath/go-jmespath [Apache License 2.0](https://github.com/jmespath/go-jmespath/blob/master/LICENSE) - github.com/josharian/intern [MIT License](https://github.com/josharian/intern/blob/master/LICENSE.md) @@ -212,6 +213,7 @@ following works: - github.com/kolo/xmlrpc [MIT License](https://github.com/kolo/xmlrpc/blob/master/LICENSE) - github.com/kylelemons/godebug [Apache License 2.0](https://github.com/kylelemons/godebug/blob/master/LICENSE) - github.com/leodido/ragel-machinery [MIT License](https://github.com/leodido/ragel-machinery/blob/develop/LICENSE) +- github.com/linkedin/goavro [Apache License 2.0](https://github.com/linkedin/goavro/blob/master/LICENSE) - github.com/logzio/azure-monitor-metrics-receiver [MIT License](https://github.com/logzio/azure-monitor-metrics-receiver/blob/master/LICENSE) - github.com/magiconair/properties [BSD 2-Clause "Simplified" License](https://github.com/magiconair/properties/blob/main/LICENSE.md) - github.com/mailru/easyjson [MIT License](https://github.com/mailru/easyjson/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 84bf39b81..f243f82c6 100644 --- a/go.mod +++ b/go.mod @@ -108,12 +108,14 @@ require ( github.com/jackc/pgtype v1.12.0 github.com/jackc/pgx/v4 v4.17.1 github.com/james4k/rcon v0.0.0-20120923215419-8fbb8268b60a + github.com/jeremywohl/flatten/v2 v2.0.0-20211013061545-07e4a09fb8e4 github.com/jhump/protoreflect v1.8.3-0.20210616212123-6cc1efa697ca github.com/jmespath/go-jmespath v0.4.0 github.com/kardianos/service v1.2.2 github.com/karrick/godirwalk v1.17.0 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kolo/xmlrpc v0.0.0-20201022064351-38db28db192b + github.com/linkedin/goavro/v2 v2.12.0 github.com/logzio/azure-monitor-metrics-receiver v1.0.0 github.com/lxc/lxd v0.0.0-20220920163450-e9b4b514106a github.com/matttproud/golang_protobuf_extensions v1.0.4 diff --git a/go.sum b/go.sum index 56213b308..3d97ce8ee 100644 --- a/go.sum +++ b/go.sum @@ -1383,6 +1383,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.3 
h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jeremywohl/flatten/v2 v2.0.0-20211013061545-07e4a09fb8e4 h1:eA9wi6ZzpIRobvXkn/S2Lyw1hr2pc71zxzOPl7Xjs4w= +github.com/jeremywohl/flatten/v2 v2.0.0-20211013061545-07e4a09fb8e4/go.mod h1:s9g9Dfls+aEgucKXKW+i8MRZuLXT2MrD/WjYpMnWfOw= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jgautheron/goconst v1.4.0/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= github.com/jhump/protoreflect v1.6.0/go.mod h1:eaTn3RZAmMBcV0fifFvlm6VHNz3wSkYyXYWUh7ymB74= @@ -1530,6 +1532,8 @@ github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= +github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBWXslg= +github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/logzio/azure-monitor-metrics-receiver v1.0.0 h1:TAzhIZL2ueyyc81qIw8FGg4nUbts4Hvc3oOxSobY1IA= github.com/logzio/azure-monitor-metrics-receiver v1.0.0/go.mod h1:UIaQ7UgxZ8jO3L0JB2hctsHFBbZqL6mbxYscQAeFpl4= diff --git a/plugins/parsers/all/avro.go b/plugins/parsers/all/avro.go new file mode 100644 index 000000000..4dfca9a0a --- /dev/null +++ b/plugins/parsers/all/avro.go @@ -0,0 +1,5 @@ +//go:build !custom || parsers || parsers.avro + +package all + +import _ "github.com/influxdata/telegraf/plugins/parsers/avro" // register plugin diff --git a/plugins/parsers/avro/README.md b/plugins/parsers/avro/README.md new file mode 100644 index 000000000..9e5d6e7e6 --- /dev/null +++ b/plugins/parsers/avro/README.md @@ -0,0 +1,108 @@ +# Avro Parser Plugin + +The `Avro` parser creates metrics from a message serialized with Avro. + +The message is supposed to be encoded as follows: + +| Bytes | Area | Description | +| ----- | ---------- | ------------------------------------------------ | +| 0 | Magic Byte | Confluent serialization format version number. | +| 1-4 | Schema ID | 4-byte schema ID as returned by Schema Registry. | +| 5- | Data | Serialized data. | + +## Configuration + +```toml +[[inputs.kafka_consumer]] + ## Kafka brokers. + brokers = ["localhost:9092"] + + ## Topics to consume. 
+ topics = ["telegraf"] + + ## Maximum length of a message to consume, in bytes (default 0/unlimited); + ## larger messages are dropped + max_message_len = 1000000 + + ## Avro data format settings + data_format = "avro" + + ## Url of the schema registry; exactly one of schema registry and + ## schema must be set + avro_schema_registry = "http://localhost:8081" + + ## Schema string; exactly one of schema registry and schema must be set + #avro_schema = """ + # { + # "type":"record", + # "name":"Value", + # "namespace":"com.example", + # "fields":[ + # { + # "name":"tag", + # "type":"string" + # }, + # { + # "name":"field", + # "type":"long" + # }, + # { + # "name":"timestamp", + # "type":"long" + # } + # ] + # } + #""" + + ## Measurement string; if not set, determine measurement name from + ## schema (as ".") + # avro_measurement = "ratings" + + ## Avro fields to be used as tags; optional. + # avro_tags = ["CHANNEL", "CLUB_STATUS"] + + ## Avro fields to be used as fields; if empty, any Avro fields + ## detected from the schema, not used as tags, will be used as + ## measurement fields. + # avro_fields = ["STARS"] + + ## Avro fields to be used as timestamp; if empty, current time will + ## be used for the measurement timestamp. + # avro_timestamp = "" + ## If avro_timestamp is specified, avro_timestamp_format must be set + ## to one of 'unix', 'unix_ms', 'unix_us', or 'unix_ns' + # avro_timestamp_format = "unix" + + ## Used to separate parts of array structures. As above, the default + ## is the empty string, so a=["a", "b"] becomes a0="a", a1="b". + ## If this were set to "_", then it would be a_0="a", a_1="b". + # avro_field_separator = "_" + + ## Default values for given tags: optional + # tags = { "application": "hermes", "region": "central" } + +``` + +### `avro_timestamp` and `avro_timestamp_format` + +By default the current time at ingestion will be used for all created +metrics; to set the time using the Avro message you can use the +`avro_timestamp` and `avro_timestamp_format` options together to set the +time to a value in the parsed document. + +The `avro_timestamp` option specifies the field containing the time +value. If it is not set, the time of record ingestion is used. If it +is set, the field may be any numerical type: notably, it is *not* +constrained to an Avro `long` (int64) (which Avro uses for timestamps in +millisecond or microsecond resolution). However, it must represent the +number of time increments since the Unix epoch (00:00 UTC 1 Jan 1970). + +The `avro_timestamp_format` option specifies the precision of the timestamp +field, and, if set, must be one of `unix`, `unix_ms`, `unix_us`, or +`unix_ns`. If `avro_timestamp` is set, `avro_timestamp_format` must be +as well. + +## Metrics + +One metric is created for each message. The type of each field is +automatically determined based on the schema. 
diff --git a/plugins/parsers/avro/parser.go b/plugins/parsers/avro/parser.go
new file mode 100644
index 000000000..99a0fbbd6
--- /dev/null
+++ b/plugins/parsers/avro/parser.go
@@ -0,0 +1,255 @@
+package avro
+
+import (
+	"encoding/binary"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/jeremywohl/flatten/v2"
+	"github.com/linkedin/goavro/v2"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/metric"
+	"github.com/influxdata/telegraf/plugins/parsers"
+)
+
+// If SchemaRegistry is set, we assume that our input will be in
+// Confluent Wire Format
+// (https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format)
+// and we will load the schema from the registry.
+//
+// If Schema is set, we assume the input will be Avro binary format,
+// without an attached schema or schema fingerprint.
+
+type Parser struct {
+	MetricName      string            `toml:"metric_name"`
+	SchemaRegistry  string            `toml:"avro_schema_registry"`
+	Schema          string            `toml:"avro_schema"`
+	Measurement     string            `toml:"avro_measurement"`
+	Tags            []string          `toml:"avro_tags"`
+	Fields          []string          `toml:"avro_fields"`
+	Timestamp       string            `toml:"avro_timestamp"`
+	TimestampFormat string            `toml:"avro_timestamp_format"`
+	FieldSeparator  string            `toml:"avro_field_separator"`
+	DefaultTags     map[string]string `toml:"tags"`
+
+	Log         telegraf.Logger `toml:"-"`
+	registryObj *schemaRegistry
+}
+
+func (p *Parser) Init() error {
+	if (p.Schema == "" && p.SchemaRegistry == "") || (p.Schema != "" && p.SchemaRegistry != "") {
+		return errors.New("exactly one of 'schema_registry' or 'schema' must be specified")
+	}
+	if p.TimestampFormat == "" {
+		if p.Timestamp != "" {
+			return errors.New("if 'timestamp' field is specified, 'timestamp_format' must be as well")
+		}
+	} else if p.TimestampFormat != "unix" && p.TimestampFormat != "unix_us" && p.TimestampFormat != "unix_ms" && p.TimestampFormat != "unix_ns" {
+		return fmt.Errorf("invalid timestamp format '%v'", p.TimestampFormat)
+	}
+	if p.SchemaRegistry != "" {
+		p.registryObj = newSchemaRegistry(p.SchemaRegistry)
+	}
+	return nil
+}
+
+func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
+	var schema string
+	var codec *goavro.Codec
+	var err error
+
+	message := buf
+
+	if p.registryObj != nil {
+		// The input must be Confluent Wire Protocol
+		if buf[0] != 0 {
+			return nil, errors.New("first byte is not 0: not Confluent Wire Protocol")
+		}
+		schemaID := int(binary.BigEndian.Uint32(buf[1:5]))
+		schemastruct, err := p.registryObj.getSchemaAndCodec(schemaID)
+		if err != nil {
+			return nil, err
+		}
+		schema = schemastruct.Schema
+		codec = schemastruct.Codec
+		message = buf[5:]
+	} else {
+		// Check for single-object encoding, whose header starts with the
+		// magic bytes 0xC3 0x01.
+		magicBytes := int(binary.BigEndian.Uint16(buf[:2]))
+		expectedMagic := int(binary.BigEndian.Uint16([]byte{0xc3, 0x01}))
+		if magicBytes == expectedMagic {
+			message = buf[10:]
+			// We could in theory validate the fingerprint against
+			// the schema. Maybe later.
+ // We would get the fingerprint as int(binary.LittleEndian.Uint64(buf[2:10])) + } // Otherwise we assume bare Avro binary + schema = p.Schema + codec, err = goavro.NewCodec(schema) + if err != nil { + return nil, err + } + } + native, _, err := codec.NativeFromBinary(message) + if err != nil { + return nil, err + } + // Cast to string-to-interface + codecSchema, ok := native.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("native is of unsupported type %T", native) + } + m, err := p.createMetric(codecSchema, schema) + if err != nil { + return nil, err + } + + return []telegraf.Metric{m}, nil +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + if len(metrics) != 1 { + return nil, errors.New("line contains multiple metrics") + } + + return metrics[0], nil +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} + +func (p *Parser) createMetric(data map[string]interface{}, schema string) (telegraf.Metric, error) { + // Tags differ from fields, in that tags are inherently strings. + // fields can be of any type. + fields := make(map[string]interface{}) + tags := make(map[string]string) + + // Set default tag values + for k, v := range p.DefaultTags { + tags[k] = v + } + // Avro doesn't have a Tag/Field distinction, so we have to tell + // Telegraf which items are our tags. + for _, tag := range p.Tags { + sTag, err := internal.ToString(data[tag]) + if err != nil { + p.Log.Warnf("Could not convert %v to string for tag %q: %v", data[tag], tag, err) + continue + } + tags[tag] = sTag + } + var fieldList []string + if len(p.Fields) != 0 { + // If you have specified your fields in the config, you + // get what you asked for. + fieldList = p.Fields + + // Except...if you specify the timestamp field, and it's + // not listed in your fields, you'll get it anyway. + // This will randomize your field ordering, which isn't + // ideal. If you care, list the timestamp field. + if p.Timestamp != "" { + // quick list-to-set-to-list implementation + fieldSet := make(map[string]bool) + for k := range fieldList { + fieldSet[fieldList[k]] = true + } + fieldSet[p.Timestamp] = true + var newList []string + for s := range fieldSet { + newList = append(newList, s) + } + fieldList = newList + } + } else { + for k := range data { + // Otherwise, that which is not a tag is a field + if _, ok := tags[k]; !ok { + fieldList = append(fieldList, k) + } + } + } + // We need to flatten out our fields. The default (the separator + // string is empty) is equivalent to what streamreactor does. + sep := flatten.SeparatorStyle{ + Before: "", + Middle: p.FieldSeparator, + After: "", + } + for _, fld := range fieldList { + candidate := make(map[string]interface{}) + candidate[fld] = data[fld] // 1-item map + flat, err := flatten.Flatten(candidate, "", sep) + if err != nil { + return nil, fmt.Errorf("flatten field %q failed: %w", fld, err) + } + for k, v := range flat { + fields[k] = v + } + } + + var schemaObj map[string]interface{} + if err := json.Unmarshal([]byte(schema), &schemaObj); err != nil { + return nil, fmt.Errorf("unmarshaling schema failed: %w", err) + } + if len(fields) == 0 { + // A telegraf metric needs at least one field. + return nil, errors.New("number of fields is 0; unable to create metric") + } + // Now some fancy stuff to extract the measurement. + // If it's set in the configuration, use that. + name := p.Measurement + separator := "." 
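+	// Name resolution order: the configured avro_measurement, then
+	// "<namespace>.<name>" from the schema (just "<name>" when the schema
+	// has no namespace), and finally the parser's default metric name.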
+ if name == "" { + // Try using the namespace defined in the schema. In case there + // is none, just use the schema's name definition. + nsStr, ok := schemaObj["namespace"].(string) + // namespace is optional + if !ok { + separator = "" + } + + nStr, ok := schemaObj["name"].(string) + if !ok { + return nil, fmt.Errorf("could not determine name from schema %s", schema) + } + name = nsStr + separator + nStr + } + // Still don't have a name? Guess we should use the metric name if + // it's set. + if name == "" { + name = p.MetricName + } + // Nothing? Give up. + if name == "" { + return nil, errors.New("could not determine measurement name") + } + var timestamp time.Time + if p.Timestamp != "" { + rawTime := fmt.Sprintf("%v", fields[p.Timestamp]) + var err error + timestamp, err = internal.ParseTimestamp(p.TimestampFormat, rawTime, "") + if err != nil { + return nil, fmt.Errorf("could not parse '%s' to '%s'", rawTime, p.TimestampFormat) + } + } else { + timestamp = time.Now() + } + return metric.New(name, tags, fields, timestamp), nil +} + +func init() { + parsers.Add("avro", + func(defaultMetricName string) telegraf.Parser { + return &Parser{MetricName: defaultMetricName} + }) +} diff --git a/plugins/parsers/avro/parser_test.go b/plugins/parsers/avro/parser_test.go new file mode 100644 index 000000000..0fc599bdc --- /dev/null +++ b/plugins/parsers/avro/parser_test.go @@ -0,0 +1,82 @@ +package avro + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/file" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" +) + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testdata") + require.NoError(t, err) + // Make sure testdata contains data + require.NotEmpty(t, folders) + + // Set up for file inputs + inputs.Add("file", func() telegraf.Input { + return &file.File{} + }) + + for _, f := range folders { + fname := f.Name() + testdataPath := filepath.Join("testdata", fname) + configFilename := filepath.Join(testdataPath, "telegraf.conf") + expectedFilename := filepath.Join(testdataPath, "expected.out") + expectedErrorFilename := filepath.Join(testdataPath, "expected.err") + + t.Run(fname, func(t *testing.T) { + // Get parser to parse expected output + testdataParser := &influx.Parser{} + require.NoError(t, testdataParser.Init()) + + var expected []telegraf.Metric + if _, err := os.Stat(expectedFilename); err == nil { + var err error + expected, err = testutil.ParseMetricsFromFile(expectedFilename, testdataParser) + require.NoError(t, err) + } + + // Read the expected errors if any + var expectedErrors []string + + if _, err := os.Stat(expectedErrorFilename); err == nil { + var err error + expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename) + require.NoError(t, err) + require.NotEmpty(t, expectedErrors) + } + + // Set up error catching + var acc testutil.Accumulator + var actualErrors []string + var actual []telegraf.Metric + + // Configure the plugin + cfg := config.NewConfig() + err := cfg.LoadConfig(configFilename) + require.NoError(t, err) + + for _, input := range cfg.Inputs { + require.NoError(t, input.Init()) + + if err := input.Gather(&acc); err != nil { + actualErrors = append(actualErrors, err.Error()) + } + } + require.ElementsMatch(t, actualErrors, 
expectedErrors)
+			actual = acc.GetTelegrafMetrics()
+			// Process expected metrics and compare with resulting metrics
+			testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime())
+		})
+	}
+}
diff --git a/plugins/parsers/avro/schema_registry.go b/plugins/parsers/avro/schema_registry.go
new file mode 100644
index 000000000..8b352b7ab
--- /dev/null
+++ b/plugins/parsers/avro/schema_registry.go
@@ -0,0 +1,58 @@
+package avro
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/linkedin/goavro/v2"
+)
+
+type schemaAndCodec struct {
+	Schema string
+	Codec  *goavro.Codec
+}
+
+type schemaRegistry struct {
+	url   string
+	cache map[int]*schemaAndCodec
+}
+
+const schemaByID = "%s/schemas/ids/%d"
+
+func newSchemaRegistry(url string) *schemaRegistry {
+	return &schemaRegistry{url: url, cache: make(map[int]*schemaAndCodec)}
+}
+
+func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
+	if v, ok := sr.cache[id]; ok {
+		return v, nil
+	}
+	resp, err := http.Get(fmt.Sprintf(schemaByID, sr.url, id))
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	var jsonResponse map[string]interface{}
+	if err := json.NewDecoder(resp.Body).Decode(&jsonResponse); err != nil {
+		return nil, err
+	}
+
+	schema, ok := jsonResponse["schema"]
+	if !ok {
+		return nil, fmt.Errorf("malformed response from schema registry: no 'schema' key")
+	}
+
+	schemaValue, ok := schema.(string)
+	if !ok {
+		return nil, fmt.Errorf("malformed response from schema registry: %v cannot be cast to string", schema)
+	}
+	codec, err := goavro.NewCodec(schemaValue)
+	if err != nil {
+		return nil, err
+	}
+	retval := &schemaAndCodec{Schema: schemaValue, Codec: codec}
+	sr.cache[id] = retval
+	return retval, nil
+}
diff --git a/plugins/parsers/avro/testdata/config-both/expected.err b/plugins/parsers/avro/testdata/config-both/expected.err
new file mode 100644
index 000000000..e465188d9
--- /dev/null
+++ b/plugins/parsers/avro/testdata/config-both/expected.err
@@ -0,0 +1,4 @@
+could not instantiate parser: exactly one of 'schema_registry' or 'schema' must be specified
+
+
+
diff --git a/plugins/parsers/avro/testdata/config-both/expected.out b/plugins/parsers/avro/testdata/config-both/expected.out
new file mode 100644
index 000000000..e69de29bb
diff --git a/plugins/parsers/avro/testdata/config-both/message.avro b/plugins/parsers/avro/testdata/config-both/message.avro
new file mode 100644
index 000000000..e69de29bb
diff --git a/plugins/parsers/avro/testdata/config-both/telegraf.conf b/plugins/parsers/avro/testdata/config-both/telegraf.conf
new file mode 100644
index 000000000..61cba90f2
--- /dev/null
+++ b/plugins/parsers/avro/testdata/config-both/telegraf.conf
@@ -0,0 +1,28 @@
+[[ inputs.file ]]
+  files = ["./testdata/config-both/message.avro"]
+  data_format = "avro"
+
+  avro_measurement = "measurement"
+  avro_tags = [ "tag" ]
+  avro_schema_registry = "https://localhost:8081"
+  avro_schema = '''
+{
+    "type":"record",
+    "name":"Value",
+    "namespace":"com.example",
+    "fields":[
+        {
+            "name":"tag",
+            "type":"string"
+        },
+        {
+            "name":"field",
+            "type":"long"
+        },
+        {
+            "name":"timestamp",
+            "type":"long"
+        }
+    ]
+}
+'''
diff --git a/plugins/parsers/avro/testdata/config-neither/expected.err b/plugins/parsers/avro/testdata/config-neither/expected.err
new file mode 100644
index 000000000..a2b4d5f86
--- /dev/null
+++ b/plugins/parsers/avro/testdata/config-neither/expected.err
@@ -0,0 +1,2 @@
+could not instantiate parser: exactly one of 'schema_registry' or 'schema' must be specified
+
diff
--git a/plugins/parsers/avro/testdata/config-neither/expected.out b/plugins/parsers/avro/testdata/config-neither/expected.out new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/parsers/avro/testdata/config-neither/message.avro b/plugins/parsers/avro/testdata/config-neither/message.avro new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/parsers/avro/testdata/config-neither/telegraf.conf b/plugins/parsers/avro/testdata/config-neither/telegraf.conf new file mode 100644 index 000000000..e52128df6 --- /dev/null +++ b/plugins/parsers/avro/testdata/config-neither/telegraf.conf @@ -0,0 +1,5 @@ +[[ inputs.file ]] + files = ["./testdata/config-neither/message.avro"] + data_format = "avro" + avro_measurement = "measurement" + avro_tags = [ "tag" ] diff --git a/plugins/parsers/avro/testdata/no-timestamp-format/expected.err b/plugins/parsers/avro/testdata/no-timestamp-format/expected.err new file mode 100644 index 000000000..8bebe6fb3 --- /dev/null +++ b/plugins/parsers/avro/testdata/no-timestamp-format/expected.err @@ -0,0 +1 @@ +could not instantiate parser: if 'timestamp' field is specified, 'timestamp_format' must be as well diff --git a/plugins/parsers/avro/testdata/no-timestamp-format/expected.out b/plugins/parsers/avro/testdata/no-timestamp-format/expected.out new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/parsers/avro/testdata/no-timestamp-format/message.avro b/plugins/parsers/avro/testdata/no-timestamp-format/message.avro new file mode 100644 index 000000000..e69de29bb diff --git a/plugins/parsers/avro/testdata/no-timestamp-format/telegraf.conf b/plugins/parsers/avro/testdata/no-timestamp-format/telegraf.conf new file mode 100644 index 000000000..a5d21090f --- /dev/null +++ b/plugins/parsers/avro/testdata/no-timestamp-format/telegraf.conf @@ -0,0 +1,28 @@ +[[ inputs.file ]] + files = ["./testdata/no-timestamp-format/message.avro"] + data_format = "avro" + + avro_measurement = "measurement" + avro_tags = [ "tag" ] + avro_timestamp = "timestamp" + avro_schema = ''' +{ + "type":"record", + "name":"Value", + "namespace":"com.example", + "fields":[ + { + "name":"tag", + "type":"string" + }, + { + "name":"field", + "type":"long" + }, + { + "name":"timestamp", + "type":"long" + } + ] +} +''' diff --git a/plugins/parsers/avro/testdata/supplied_timestamp/expected.out b/plugins/parsers/avro/testdata/supplied_timestamp/expected.out new file mode 100644 index 000000000..c831d4ad4 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp/expected.out @@ -0,0 +1 @@ +measurement,tag=test_tag field=19i,timestamp=1664296121000000i 1664296121000000 diff --git a/plugins/parsers/avro/testdata/supplied_timestamp/message.avro b/plugins/parsers/avro/testdata/supplied_timestamp/message.avro new file mode 100644 index 000000000..5e4d4d9b0 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp/message.avro @@ -0,0 +1 @@ +test_tag& \ No newline at end of file diff --git a/plugins/parsers/avro/testdata/supplied_timestamp/telegraf.conf b/plugins/parsers/avro/testdata/supplied_timestamp/telegraf.conf new file mode 100644 index 000000000..ee711eae3 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp/telegraf.conf @@ -0,0 +1,28 @@ +[[ inputs.file ]] + files = ["./testdata/supplied_timestamp/message.avro"] + data_format = "avro" + avro_measurement = "measurement" + avro_tags = [ "tag" ] + avro_timestamp = "timestamp" + avro_timestamp_format = "unix_us" + avro_schema = ''' +{ + "type":"record", + "name":"Value", + 
"namespace":"com.example", + "fields":[ + { + "name":"tag", + "type":"string" + }, + { + "name":"field", + "type":"long" + }, + { + "name":"timestamp", + "type":"long" + } + ] +} +''' diff --git a/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/expected.out b/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/expected.out new file mode 100644 index 000000000..c831d4ad4 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/expected.out @@ -0,0 +1 @@ +measurement,tag=test_tag field=19i,timestamp=1664296121000000i 1664296121000000 diff --git a/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/message.avro b/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/message.avro new file mode 100644 index 000000000..5e4d4d9b0 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/message.avro @@ -0,0 +1 @@ +test_tag& \ No newline at end of file diff --git a/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/telegraf.conf b/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/telegraf.conf new file mode 100644 index 000000000..9ae72b530 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp_fields_specified/telegraf.conf @@ -0,0 +1,29 @@ +[[ inputs.file ]] + files = ["./testdata/supplied_timestamp_fields_specified/message.avro"] + data_format = "avro" + avro_measurement = "measurement" + avro_tags = [ "tag" ] + avro_fields = [ "field", "timestamp"] + avro_timestamp = "timestamp" + avro_timestamp_format = "unix_us" + avro_schema = ''' +{ + "type":"record", + "name":"Value", + "namespace":"com.example", + "fields":[ + { + "name":"tag", + "type":"string" + }, + { + "name":"field", + "type":"long" + }, + { + "name":"timestamp", + "type":"long" + } + ] +} +''' diff --git a/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/expected.out b/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/expected.out new file mode 100644 index 000000000..c831d4ad4 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/expected.out @@ -0,0 +1 @@ +measurement,tag=test_tag field=19i,timestamp=1664296121000000i 1664296121000000 diff --git a/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/message.avro b/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/message.avro new file mode 100644 index 000000000..5e4d4d9b0 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/message.avro @@ -0,0 +1 @@ +test_tag& \ No newline at end of file diff --git a/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/telegraf.conf b/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/telegraf.conf new file mode 100644 index 000000000..257a9ea69 --- /dev/null +++ b/plugins/parsers/avro/testdata/supplied_timestamp_fields_unspecified/telegraf.conf @@ -0,0 +1,29 @@ +[[ inputs.file ]] + files = ["./testdata/supplied_timestamp_fields_unspecified/message.avro"] + data_format = "avro" + avro_measurement = "measurement" + avro_tags = [ "tag" ] + avro_fields = [ "field" ] + avro_timestamp = "timestamp" + avro_timestamp_format = "unix_us" + avro_schema = ''' +{ + "type":"record", + "name":"Value", + "namespace":"com.example", + "fields":[ + { + "name":"tag", + "type":"string" + }, + { + "name":"field", + "type":"long" + }, + { + "name":"timestamp", + "type":"long" + } + ] +} +''' diff --git a/plugins/parsers/registry.go 
b/plugins/parsers/registry.go index b53b9ec7e..e6365eb03 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -68,7 +68,7 @@ type ParserCompatibility interface { // Config is a struct that covers the data types needed for all parser types, // and can be used to instantiate _any_ of the parsers. type Config struct { - // DataFormat can be one of: json, influx, graphite, value, nagios + // DataFormat can be one of: avro, json, influx, graphite, value, nagios DataFormat string `toml:"data_format"` // Separator only applied to Graphite data. diff --git a/plugins/parsers/registry_test.go b/plugins/parsers/registry_test.go index 50b08906e..30802daaa 100644 --- a/plugins/parsers/registry_test.go +++ b/plugins/parsers/registry_test.go @@ -45,8 +45,7 @@ func TestRegistry_BackwardCompatibility(t *testing.T) { } // Define parsers that do not have an old-school init - newStyleOnly := []string{"binary"} - + newStyleOnly := []string{"binary", "avro"} for name, creator := range parsers.Parsers { if choice.Contains(name, newStyleOnly) { t.Logf("skipping new-style-only %q...", name)