diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index cb04d3e00..668138d89 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -5,6 +5,7 @@ using a configurable parser into [metrics][]. This allows, for example, the `kafka_consumer` input plugin to process messages in either InfluxDB Line Protocol or in JSON format. +- [Binary](/plugins/parsers/binary) - [Collectd](/plugins/parsers/collectd) - [CSV](/plugins/parsers/csv) - [Dropwizard](/plugins/parsers/dropwizard) diff --git a/plugins/parsers/all/binary.go b/plugins/parsers/all/binary.go new file mode 100644 index 000000000..69227d3ef --- /dev/null +++ b/plugins/parsers/all/binary.go @@ -0,0 +1,5 @@ +//go:build !custom || parsers || parsers.binary + +package all + +import _ "github.com/influxdata/telegraf/plugins/parsers/binary" // register plugin diff --git a/plugins/parsers/binary/README.md b/plugins/parsers/binary/README.md new file mode 100644 index 000000000..b36cba6d0 --- /dev/null +++ b/plugins/parsers/binary/README.md @@ -0,0 +1,334 @@ +# Binary Parser Plugin + +The `binary` data format parser parses binary protocols into metrics using +user-specified configurations. + +## Configuration + +```toml +[[inputs.file]] + files = ["example.bin"] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "binary" + + ## Do not error-out if none of the filter expressions below matches. + # allow_no_match = false + + ## Specify the endianess of the data. + ## Available values are "be" (big-endian), "le" (little-endian) and "host", + ## where "host" means the same endianess as the machine running Telegraf. + # endianess = "host" + + ## Multiple parsing sections are allowed + [[inputs.file.binary]] + ## Optional: Metric (measurement) name to use if not extracted from the data. + # metric_name = "my_name" + + ## Definition of the message format and the extracted data. + ## Please note that you need to define all elements of the data in the + ## correct order with the correct length as the data is parsed in the order + ## given. + ## An entry can have the following properties: + ## name -- Name of the element (e.g. field or tag). Can be omitted + ## for special assignments (i.e. time & measurement) or if + ## entry is omitted. + ## type -- Data-type of the entry. Can be "int8/16/32/64", "uint8/16/32/64", + ## "float32/64", "bool" and "string". + ## In case of time, this can be any of "unix" (default), "unix_ms", "unix_us", + ## "unix_ns" or a valid Golang time format. + ## bits -- Length in bits for this entry. If omitted, the length derived from + ## the "type" property will be used. For "time" 64-bit will be used + ## as default. + ## assignment -- Assignment of the gathered data. Can be "measurement", "time", + ## "field" or "tag". If omitted "field" is assumed. + ## omit -- Omit the given data. If true, the data is skipped and not added + ## to the metric. Omitted entries only need a length definition + ## via "bits" or "type". + ## terminator -- Terminator for dynamic-length strings. Only used for "string" type. + ## Valid values are "fixed" (fixed length string given by "bits"), + ## "null" (null-terminated string) or a character sequence specified + ## as HEX values (e.g. "0x0D0A"). Defaults to "fixed" for strings. + ## timezone -- Timezone of "time" entries. Only applies to "time" assignments. 
+  ##                    Can be "utc", "local" or any valid Golang timezone (e.g. "Europe/Berlin")
+  entries = [
+    { type = "string", assignment = "measurement", terminator = "null" },
+    { name = "address", type = "uint16", assignment = "tag" },
+    { name = "value", type = "float64" },
+    { type = "unix", assignment = "time" },
+  ]
+
+  ## Optional: Filter evaluated before applying the configuration.
+  ## This option can be used to manage multiple configurations specific to
+  ## a certain message type. If no filter is given, the configuration is applied.
+  # [inputs.file.binary.filter]
+  #   ## Filter message by the exact length in bytes (default: N/A).
+  #   # length = 0
+  #   ## Filter the message by a minimum length in bytes.
+  #   ## Messages of equal or greater length will pass.
+  #   # length_min = 0
+  #   ## List of data parts to match.
+  #   ## Only if all selected parts match, the configuration will be
+  #   ## applied. The "offset" is the start of the data to match in bits,
+  #   ## "bits" is the length in bits and "match" is the value to match
+  #   ## against. Non-byte boundaries are supported, data is always right-aligned.
+  #   selection = [
+  #     { offset = 0, bits = 8, match = "0x1F" },
+  #   ]
+```
+
+In this configuration mode, you explicitly specify the fields and tags you want
+to scrape out of your data.
+
+A configuration can contain multiple `binary` subsections, allowing e.g. the file
+plugin to process the binary data multiple times. This can be useful
+(together with _filters_) to handle different message types.
+
+__Please note__: The `filter` section needs to be placed __after__ the `entries`
+definitions due to TOML constraints, as otherwise the entries will be assigned
+to the filter section.
+
+### General options and remarks
+
+#### `allow_no_match` (optional)
+
+By specifying `allow_no_match` you allow the parser to silently ignore data
+that does not match _any_ given configuration filter. This can be useful if
+you only want to collect a subset of the available messages.
+
+#### `endianess` (optional)
+
+This specifies the endianess of the data. If not specified, the parser will
+fall back to the "host" endianess, assuming that the message and the Telegraf
+machine share the same endianess.
+Alternatively, you can explicitly specify big-endian format (`"be"`) or
+little-endian format (`"le"`).
+
+### Non-byte aligned value extraction
+
+In both `filter` and `entries` definitions, values can be extracted at non-byte
+boundaries. You can, for example, extract 3 bits starting at bit-offset 8. In those
+cases, the result will be masked and shifted such that the resulting byte value
+is _right_ aligned. If those 3 bits are `101`, the resulting byte value is
+`0x05`.
+
+This is especially important when specifying the `match` value in the filter
+section.
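+
+As a sketch, assume the second byte of an incoming message is `0xBF`
+(`0b10111111`). The following hypothetical filter extracts the three
+most-significant bits of that byte and compares the right-aligned result
+against `match`:
+
+```toml
+[inputs.file.binary.filter]
+  ## Bits 8..10 contain "101", which is right-aligned to 0x05 before matching.
+  selection = [
+    { offset = 8, bits = 3, match = "0x05" },
+  ]
+```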
+
+### Entries definitions
+
+The entry array specifies how to dissect the message into the measurement name,
+the timestamp, tags and fields.
+
+#### `measurement` specification
+
+When setting the `assignment` to `"measurement"`, the extracted value
+will be used as the metric name, overriding other specifications.
+The `type` setting is assumed to be `"string"` and can be omitted similar
+to the `name` option. See [`string` type handling](#string-type-handling)
+for details and further options.
+
+#### `time` specification
+
+When setting the `assignment` to `"time"`, the extracted value
+will be used as the timestamp of the metric. If no time is extracted,
+the time of parsing will be used for all created metrics.
+
+The `type` setting here contains the time format and can be set to `unix`,
+`unix_ms`, `unix_us`, `unix_ns`, or an accepted
+[Go "reference time"][time const]. Consult the Go [time][time parse]
+package for details and additional examples on how to set the time format.
+If `type` is omitted the `unix` format is assumed.
+
+For the `unix` format and derivatives, the underlying value is assumed
+to be a 64-bit integer. The `bits` setting can be used to specify other
+lengths. All other time formats assume a fixed-length `string`
+value to be extracted. The length of the string is automatically
+determined from the format given in `type`.
+
+The `timezone` setting allows converting the extracted time to the
+given timezone. By default the time will be interpreted as `utc`.
+Other valid values are `local`, i.e. the local timezone configured for
+the machine, or a valid timezone specification such as `Europe/Berlin`.
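+
+For example, a timestamp transmitted as a 64-bit millisecond Unix epoch, or
+alternatively as a fixed-length date string, could be declared inside a
+`[[inputs.file.binary]]` section as follows (a sketch; the `value` field and
+the exact date format are illustrative):
+
+```toml
+entries = [
+  ## 64-bit millisecond Unix epoch, interpreted as UTC
+  { type = "unix_ms", assignment = "time" },
+  { name = "value", type = "float64" },
+]
+
+## ...or, for a 19-character string such as "2006-01-02 15:04:05"
+## interpreted in Berlin local time:
+# entries = [
+#   { type = "2006-01-02 15:04:05", assignment = "time", timezone = "Europe/Berlin" },
+#   { name = "value", type = "float64" },
+# ]
+```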
+
+#### `tag` specification
+
+When setting the `assignment` to `"tag"`, the extracted value
+will be used as a tag. The `name` setting will be the name of the tag
+and the `type` will default to `string`. When specifying other types,
+the extracted value will first be interpreted as the given type and
+then converted to `string`.
+
+The `bits` setting can be used to specify the length of the data to
+extract and is required for fixed-length `string` types.
+
+#### `field` specification
+
+When setting the `assignment` to `"field"` or omitting the `assignment`
+setting, the extracted value will be used as a field. The `name` setting
+is used as the name of the field and the `type` as the type of the field value.
+
+The `bits` setting can be used to specify the length of the data to
+extract. By default the length corresponding to `type` is used.
+Please see the [string](#string-type-handling) and [bool](#bool-type-handling)
+specific sections when using those types.
+
+#### `string` type handling
+
+Strings are assumed to be fixed-length strings by default. In this case, the
+`bits` setting is mandatory to specify the length of the string in _bits_.
+
+To handle dynamic strings, the `terminator` setting can be used to specify
+characters to terminate the string. The two named options, `fixed` and `null`,
+will specify fixed-length and null-terminated strings, respectively.
+Any other setting will be interpreted as a hexadecimal sequence of bytes
+matching the end of the string. The termination sequence is removed from
+the result.
+
+#### `bool` type handling
+
+By default `bool` types are assumed to be _one_ bit in length. You can
+specify any other length by using the `bits` setting.
+When interpreting values as booleans, any zero value will be `false`,
+while any non-zero value will result in `true`.
+
+#### Omitting data
+
+Parts of the data can be omitted by setting `omit = true`. In this case,
+you only need to specify the length of the chunk to omit by either using
+the `type` or `bits` setting. All other options can be skipped.
+
+### Filter definitions
+
+Filters can be used to match the length or the content of the data against
+a specified reference. See the [examples section](#examples) for details.
+You can also check multiple parts of the message by specifying multiple
+`selection` entries for a filter. Each `selection` is then matched separately.
+All have to match to apply the configuration.
+
+#### `length` and `length_min` options
+
+Using the `length` option, the filter will check if the data to parse has
+exactly the given number of _bytes_. Otherwise, the configuration will not
+be applied.
+Similarly, for `length_min` the data has to have _at least_ the given number
+of _bytes_ to generate a match.
+
+#### `selection` list
+
+Selections can be used with or without length constraints to match the content
+of the data. Here, the `offset` and `bits` properties specify the start
+and length of the data to check. Both values are in _bits_, allowing for non-byte
+aligned value extraction. The extracted data will then be checked against the
+given `match` value specified in HEX.
+
+If multiple `selection` entries are specified, _all_ of the selections must
+match for the configuration to be applied.
+
+## Examples
+
+In the following example, we use a binary protocol with three different messages
+in little-endian format.
+
+### Message A definition
+
+```text
++--------+------+------+--------+--------+------------+--------------------+--------------------+
+|   ID   | type | len  |  addr  | count  |  failure   |       value        |     timestamp      |
++--------+------+------+--------+--------+------------+--------------------+--------------------+
+| 0x0201 | 0x0A | 0x18 | 0x7F01 | 0x2A00 | 0x00000000 | 0x6F1283C0CA210940 | 0x10D4DF6200000000 |
++--------+------+------+--------+--------+------------+--------------------+--------------------+
+```
+
+### Message B definition
+
+```text
++--------+------+------+------------+
+|   ID   | type | len  |   value    |
++--------+------+------+------------+
+| 0x0201 | 0x0B | 0x04 | 0xDEADC0DE |
++--------+------+------+------------+
+```
+
+### Message C definition
+
+```text
++--------+------+------+------------+------------+--------------------+
+|   ID   | type | len  |  value x   |  value y   |     timestamp      |
++--------+------+------+------------+------------+--------------------+
+| 0x0201 | 0x0C | 0x10 | 0x4DF82D40 | 0x5F305C08 | 0x10D4DF6200000000 |
++--------+------+------+------------+------------+--------------------+
+```
+
+All messages consist of a 4-byte header containing the _message type_
+in the 3rd byte and a message-specific body. To parse those messages
+you can use the following configuration:
+
+```toml
+[[inputs.file]]
+  files = ["messageA.bin", "messageB.bin", "messageC.bin"]
+  data_format = "binary"
+  endianess = "le"
+
+  [[inputs.file.binary]]
+    metric_name = "metricA"
+
+    entries = [
+      { bits = 32, omit = true },
+      { name = "address", type = "uint16", assignment = "tag" },
+      { name = "count", type = "int16" },
+      { name = "failure", type = "bool", bits = 32, assignment = "tag" },
+      { name = "value", type = "float64" },
+      { type = "unix", assignment = "time" },
+    ]
+
+    [inputs.file.binary.filter]
+      selection = [{ offset = 16, bits = 8, match = "0x0A" }]
+
+  [[inputs.file.binary]]
+    metric_name = "metricB"
+
+    entries = [
+      { bits = 32, omit = true },
+      { name = "value", type = "uint32" },
+    ]
+
+    [inputs.file.binary.filter]
+      selection = [{ offset = 16, bits = 8, match = "0x0B" }]
+
+  [[inputs.file.binary]]
+    metric_name = "metricC"
+
+    entries = [
+      { bits = 32, omit = true },
+      { name = "x", type = "float32" },
+      { name = "y", type = "float32" },
+      { type = "unix", assignment = "time" },
+    ]
+
+    [inputs.file.binary.filter]
+      selection = [{ offset = 16, bits = 8, match = "0x0C" }]
+```
+
+The above configuration has one `[[inputs.file.binary]]` section per
+message type and uses a filter in each of those sections to apply
+the correct configuration by comparing the 3rd byte (containing
+the message type). 
This will lead to the following output + +```text +> metricA,address=383,failure=false count=42i,value=3.1415 1658835984000000000 +> metricB value=3737169374i 1658847037000000000 +> metricC x=2.718280076980591,y=0.0000000000000000000000000000000006626070178575745 1658835984000000000 +``` + +where `metricB` uses the parsing time as timestamp due to missing +information in the data. The other two metrics use the timestamp +derived from the data. + +[time const]: https://golang.org/pkg/time/#pkg-constants +[time parse]: https://golang.org/pkg/time/#Parse diff --git a/plugins/parsers/binary/config.go b/plugins/parsers/binary/config.go new file mode 100644 index 000000000..d3d707d0d --- /dev/null +++ b/plugins/parsers/binary/config.go @@ -0,0 +1,183 @@ +package binary + +import ( + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/metric" +) + +type BinaryPart struct { + Offset uint64 `toml:"offset"` + Bits uint64 `toml:"bits"` + Match string `toml:"match"` + + val []byte +} + +type Filter struct { + Selection []BinaryPart `toml:"selection"` + LengthMin uint64 `toml:"length_min"` + Length uint64 `toml:"length"` +} + +type Config struct { + MetricName string `toml:"metric_name"` + Filter *Filter `toml:"filter"` + Entries []Entry `toml:"entries"` +} + +func (c *Config) preprocess(defaultName string) error { + // Preprocess filter part + if c.Filter != nil { + if c.Filter.Length != 0 && c.Filter.LengthMin != 0 { + return errors.New("length and length_min cannot be used together") + } + + var length uint64 + for i, s := range c.Filter.Selection { + end := (s.Offset + s.Bits) / 8 + if (s.Offset+s.Bits)%8 != 0 { + end++ + } + if end > length { + length = end + } + var err error + s.val, err = hex.DecodeString(strings.TrimPrefix(s.Match, "0x")) + if err != nil { + return fmt.Errorf("decoding match %d failed: %w", i, err) + } + c.Filter.Selection[i] = s + } + + if c.Filter.Length != 0 && length > c.Filter.Length { + return fmt.Errorf("filter length (%d) larger than constraint (%d)", length, c.Filter.Length) + } + + if c.Filter.Length == 0 && length > c.Filter.LengthMin { + c.Filter.LengthMin = length + } + } + + // Preprocess entries part + var hasField, hasMeasurement bool + defined := make(map[string]bool) + for i, e := range c.Entries { + if err := e.check(); err != nil { + return fmt.Errorf("entry %q (%d): %w", e.Name, i, err) + } + // Store the normalized entry + c.Entries[i] = e + + if e.Omit { + continue + } + + // Check for duplicate entries + key := e.Assignment + "_" + e.Name + if defined[key] { + return fmt.Errorf("multiple definitions of %q", e.Name) + } + defined[key] = true + hasMeasurement = hasMeasurement || e.Assignment == "measurement" + hasField = hasField || e.Assignment == "field" + } + + if !hasMeasurement && c.MetricName == "" { + if defaultName == "" { + return errors.New("no metric name given") + } + c.MetricName = defaultName + } + if !hasField { + return errors.New("no field defined") + } + + return nil +} + +func (c *Config) matches(in []byte) bool { + // If no filter is given, just match everything + if c.Filter == nil { + return true + } + + // Checking length constraints + length := uint64(len(in)) + if c.Filter.Length != 0 && length != c.Filter.Length { + return false + } + if c.Filter.LengthMin != 0 && length < c.Filter.LengthMin { + return false + } + + // Matching elements + for _, s := range c.Filter.Selection { + data, err := 
extractPart(in, s.Offset, s.Bits) + if err != nil { + return false + } + if len(data) != len(s.val) { + return false + } + for i, v := range data { + if v != s.val[i] { + return false + } + } + } + + return true +} + +func (c *Config) collect(in []byte, order binary.ByteOrder, defaultTime time.Time) (telegraf.Metric, error) { + t := defaultTime + name := c.MetricName + tags := make(map[string]string) + fields := make(map[string]interface{}) + + var offset uint64 + for _, e := range c.Entries { + data, n, err := e.extract(in, offset) + if err != nil { + return nil, err + } + offset += n + + switch e.Assignment { + case "measurement": + name = convertStringType(data) + case "field": + v, err := e.convertType(data, order) + if err != nil { + return nil, fmt.Errorf("field %q failed: %w", e.Name, err) + } + fields[e.Name] = v + case "tag": + raw, err := e.convertType(data, order) + if err != nil { + return nil, fmt.Errorf("tag %q failed: %w", e.Name, err) + } + v, err := internal.ToString(raw) + if err != nil { + return nil, fmt.Errorf("tag %q failed: %w", e.Name, err) + } + tags[e.Name] = v + case "time": + var err error + t, err = e.convertTimeType(data, order) + if err != nil { + return nil, fmt.Errorf("time failed: %w", err) + } + } + } + + return metric.New(name, tags, fields, t), nil +} diff --git a/plugins/parsers/binary/entry.go b/plugins/parsers/binary/entry.go new file mode 100644 index 000000000..ebdac2124 --- /dev/null +++ b/plugins/parsers/binary/entry.go @@ -0,0 +1,308 @@ +package binary + +import ( + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "math" + "strings" + "time" +) + +type Entry struct { + Name string `toml:"name"` + Type string `toml:"type"` + Bits uint64 `toml:"bits"` + Omit bool `toml:"omit"` + Terminator string `toml:"terminator"` + Timezone string `toml:"timezone"` + Assignment string `toml:"assignment"` + + termination []byte + location *time.Location +} + +func (e *Entry) check() error { + // Normalize cases + e.Assignment = strings.ToLower(e.Assignment) + e.Terminator = strings.ToLower(e.Terminator) + if e.Assignment != "time" { + e.Type = strings.ToLower(e.Type) + } + + // Handle omitted fields + if e.Omit { + if e.Bits == 0 && e.Type == "" { + return errors.New("neither type nor bits given") + } + if e.Bits == 0 { + bits, err := bitsForType(e.Type) + if err != nil { + return err + } + e.Bits = bits + } + return nil + } + + // Set name for global options + if e.Assignment == "measurement" || e.Assignment == "time" { + e.Name = e.Assignment + } + + // Check the name + if e.Name == "" { + return errors.New("missing name") + } + + // Check the assignment + var defaultType string + switch e.Assignment { + case "measurement": + defaultType = "string" + if e.Type != "string" && e.Type != "" { + return errors.New("'measurement' type has to be 'string'") + } + case "time": + bits := uint64(64) + + switch e.Type { + // Make 'unix' the default + case "": + defaultType = "unix" + // Special plugin specific names + case "unix", "unix_ms", "unix_us", "unix_ns": + // Format-specification string formats + default: + bits = uint64(len(e.Type) * 8) + } + if e.Bits == 0 { + e.Bits = bits + } + + switch e.Timezone { + case "", "utc": + // Make UTC the default + e.location = time.UTC + case "local": + e.location = time.Local + default: + var err error + e.location, err = time.LoadLocation(e.Timezone) + if err != nil { + return err + } + } + case "tag": + defaultType = "string" + case "", "field": + e.Assignment = "field" + default: + return fmt.Errorf("no assignment for 
%q", e.Name) + } + + // Check type (special type for "time") + switch e.Type { + case "uint8", "int8", "uint16", "int16", "uint32", "int32", "uint64", "int64": + fallthrough + case "float32", "float64": + bits, err := bitsForType(e.Type) + if err != nil { + return err + } + if e.Bits == 0 { + e.Bits = bits + } + if bits < e.Bits { + return fmt.Errorf("type overflow for %q", e.Name) + } + case "bool": + if e.Bits == 0 { + e.Bits = 1 + } + case "string": + // Check termination + switch e.Terminator { + case "", "fixed": + e.Terminator = "fixed" + if e.Bits == 0 { + return fmt.Errorf("require 'bits' for fixed-length string for %q", e.Name) + } + case "null": + e.termination = []byte{0} + if e.Bits != 0 { + return fmt.Errorf("cannot use 'bits' and 'null' terminator together for %q", e.Name) + } + default: + if e.Bits != 0 { + return fmt.Errorf("cannot use 'bits' and terminator together for %q", e.Name) + } + var err error + e.termination, err = hex.DecodeString(strings.TrimPrefix(e.Terminator, "0x")) + if err != nil { + return fmt.Errorf("decoding terminator failed for %q: %w", e.Name, err) + } + } + + // We can only handle strings that adhere to byte-bounds + if e.Bits%8 != 0 { + return fmt.Errorf("non-byte length for string field %q", e.Name) + } + case "": + if defaultType == "" { + return fmt.Errorf("no type for %q", e.Name) + } + e.Type = defaultType + default: + if e.Assignment != "time" { + return fmt.Errorf("unknown type for %q", e.Name) + } + } + + return nil +} + +func (e *Entry) extract(in []byte, offset uint64) ([]byte, uint64, error) { + if e.Bits > 0 { + data, err := extractPart(in, offset, e.Bits) + return data, e.Bits, err + } + + if e.Type != "string" { + return nil, 0, fmt.Errorf("unexpected entry: %v", e) + } + + inbits := uint64(len(in)) * 8 + + // Read up to the termination + var found bool + var data []byte + var termOffset int + var n uint64 + for offset+n+8 <= inbits { + buf, err := extractPart(in, offset+n, 8) + if err != nil { + return nil, 0, err + } + if len(buf) != 1 { + return nil, 0, fmt.Errorf("unexpected length %d", len(buf)) + } + data = append(data, buf[0]) + n += 8 + + // Check for terminator + if buf[0] == e.termination[termOffset] { + termOffset++ + } + if termOffset == len(e.termination) { + found = true + break + } + } + if !found { + return nil, n, fmt.Errorf("terminator not found for %q", e.Name) + } + + // Strip the terminator + return data[:len(data)-len(e.termination)], n, nil +} + +func (e *Entry) convertType(in []byte, order binary.ByteOrder) (interface{}, error) { + switch e.Type { + case "uint8", "int8", "uint16", "int16", "uint32", "int32", "float32", "uint64", "int64", "float64": + return convertNumericType(in, e.Type, order) + case "bool": + return convertBoolType(in), nil + case "string": + return convertStringType(in), nil + } + + return nil, fmt.Errorf("cannot handle type %q", e.Type) +} + +func (e *Entry) convertTimeType(in []byte, order binary.ByteOrder) (time.Time, error) { + factor := int64(1) + + switch e.Type { + case "unix": + factor *= 1000 + fallthrough + case "unix_ms": + factor *= 1000 + fallthrough + case "unix_us": + factor *= 1000 + fallthrough + case "unix_ns": + raw, err := convertNumericType(in, "int64", order) + if err != nil { + return time.Unix(0, 0), err + } + v := raw.(int64) + return time.Unix(0, v*factor).In(e.location), nil + } + // We have a format specification (hopefully) + v := convertStringType(in) + return time.ParseInLocation(e.Type, v, e.location) +} + +func convertStringType(in []byte) string { + return 
string(in) +} + +func convertNumericType(in []byte, t string, order binary.ByteOrder) (interface{}, error) { + bits, err := bitsForType(t) + if err != nil { + return nil, err + } + + inlen := uint64(len(in)) + expected := bits / 8 + if inlen > expected { + // Should never happen + return 0, fmt.Errorf("too many bytes %d vs %d", len(in), expected) + } + + // Pad the data if shorter than the datatype length + buf := make([]byte, expected-inlen, expected) + buf = append(buf, in...) + + switch t { + case "uint8": + return buf[0], nil + case "int8": + return int8(buf[0]), nil + case "uint16": + return order.Uint16(buf), nil + case "int16": + v := order.Uint16(buf) + return int16(v), nil + case "uint32": + return order.Uint32(buf), nil + case "int32": + v := order.Uint32(buf) + return int32(v), nil + case "uint64": + return order.Uint64(buf), nil + case "int64": + v := order.Uint64(buf) + return int64(v), nil + case "float32": + v := order.Uint32(buf) + return math.Float32frombits(v), nil + case "float64": + v := order.Uint64(buf) + return math.Float64frombits(v), nil + } + return nil, fmt.Errorf("no numeric type %q", t) +} + +func convertBoolType(in []byte) bool { + for _, x := range in { + if x != 0 { + return true + } + } + return false +} diff --git a/plugins/parsers/binary/entry_test.go b/plugins/parsers/binary/entry_test.go new file mode 100644 index 000000000..5dff01eab --- /dev/null +++ b/plugins/parsers/binary/entry_test.go @@ -0,0 +1,42 @@ +package binary + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEntryExtract(t *testing.T) { + testdata := []byte{0x01, 0x02, 0x03, 0x04} + + e := &Entry{Type: "uint64"} + _, _, err := e.extract(testdata, 0) + require.EqualError(t, err, `unexpected entry: &{ uint64 0 false [] }`) +} + +func TestEntryConvertType(t *testing.T) { + testdata := []byte{0x01, 0x02, 0x03, 0x04} + + e := &Entry{Type: "garbage"} + _, err := e.convertType(testdata, hostEndianess) + require.EqualError(t, err, `cannot handle type "garbage"`) +} + +func TestEntryConvertTimeType(t *testing.T) { + testdata := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09} + + e := &Entry{Type: "unix_ns", location: time.UTC} + _, err := e.convertTimeType(testdata, hostEndianess) + require.EqualError(t, err, `too many bytes 9 vs 8`) +} + +func TestConvertNumericType(t *testing.T) { + testdata := []byte{0x01, 0x02, 0x03, 0x04} + + _, err := convertNumericType(testdata, "garbage", hostEndianess) + require.EqualError(t, err, `cannot determine length for type "garbage"`) + + _, err = convertNumericType(testdata, "uint8", hostEndianess) + require.EqualError(t, err, `too many bytes 4 vs 1`) +} diff --git a/plugins/parsers/binary/host_endianess_be.go b/plugins/parsers/binary/host_endianess_be.go new file mode 100644 index 000000000..0dc1929cd --- /dev/null +++ b/plugins/parsers/binary/host_endianess_be.go @@ -0,0 +1,8 @@ +//go:build arm64be +// +build arm64be + +package binary + +import "encoding/binary" + +var hostEndianess = binary.BigEndian diff --git a/plugins/parsers/binary/host_endianess_le.go b/plugins/parsers/binary/host_endianess_le.go new file mode 100644 index 000000000..2f64625a6 --- /dev/null +++ b/plugins/parsers/binary/host_endianess_le.go @@ -0,0 +1,8 @@ +//go:build !arm64be +// +build !arm64be + +package binary + +import "encoding/binary" + +var hostEndianess = binary.LittleEndian diff --git a/plugins/parsers/binary/parser.go b/plugins/parsers/binary/parser.go new file mode 100644 index 000000000..47015467f --- /dev/null +++ 
b/plugins/parsers/binary/parser.go @@ -0,0 +1,166 @@ +package binary + +import ( + "encoding/binary" + "errors" + "fmt" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" +) + +type Parser struct { + AllowNoMatch bool `toml:"allow_no_match"` + Endianess string `toml:"endianess"` + Configs []Config `toml:"binary"` + Log telegraf.Logger `toml:"-"` + + metricName string + defaultTags map[string]string + converter binary.ByteOrder +} + +func (p *Parser) Init() error { + switch p.Endianess { + case "le": + p.converter = binary.LittleEndian + case "be": + p.converter = binary.BigEndian + case "", "host": + p.converter = hostEndianess + default: + return fmt.Errorf("unknown endianess %q", p.Endianess) + } + + // Pre-process the configurations + if len(p.Configs) == 0 { + return errors.New("no configuration given") + } + for i, cfg := range p.Configs { + if err := cfg.preprocess(p.metricName); err != nil { + return fmt.Errorf("config %d invalid: %w", i, err) + } + p.Configs[i] = cfg + } + + return nil +} + +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + t := time.Now() + + matches := 0 + metrics := make([]telegraf.Metric, 0) + for i, cfg := range p.Configs { + // Apply the filter and see if we should match this + if !cfg.matches(buf) { + p.Log.Debugf("ignoring data in config %d", i) + continue + } + matches++ + + // Collect the metric + m, err := cfg.collect(buf, p.converter, t) + if err != nil { + return nil, err + } + metrics = append(metrics, m) + } + if matches == 0 && !p.AllowNoMatch { + return nil, errors.New("no matching configuration") + } + + return metrics, nil +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + metrics, err := p.Parse([]byte(line)) + if err != nil { + return nil, err + } + + switch len(metrics) { + case 0: + return nil, nil + case 1: + return metrics[0], nil + default: + return metrics[0], fmt.Errorf("cannot parse line with multiple (%d) metrics", len(metrics)) + } +} + +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.defaultTags = tags +} + +func init() { + // Register all variants + parsers.Add("binary", + func(defaultMetricName string) telegraf.Parser { + return &Parser{metricName: defaultMetricName} + }, + ) +} + +func extractPart(in []byte, offset, bits uint64) ([]byte, error) { + inLen := uint64(len(in)) + + start := offset / 8 + bitend := offset%8 + bits + length := bitend / 8 + if bitend%8 != 0 { + length++ + } + + if start+length > inLen { + return nil, fmt.Errorf("out-of-bounds @%d with %d bits", offset, bits) + } + + var out []byte + out = append(out, in[start:start+length]...) + + if offset%8 != 0 { + // Mask the start-byte with the non-aligned bit-mask + startmask := (byte(1) << (8 - offset%8)) - 1 + out[0] = out[0] & startmask + } + + if bitend%8 == 0 { + // The end is aligned to byte-boundaries + return out, nil + } + + shift := 8 - bitend%8 + carryshift := bitend % 8 + + // We need to shift right in case of not ending at a byte boundary + // to make the bits right aligned. + // Carry over the bits from the byte left to fill in... 
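+	// Example: for in=0x0102 with offset=4 and bits=8, the masked bytes
+	// 0x01 0x02 are shifted right by 4 to 0x00 0x10; the empty leading
+	// byte is then dropped below, leaving the right-aligned value 0x10.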
+ var carry byte + for i, x := range out { + out[i] = (x >> shift) | carry + carry = x << carryshift + } + + if bits%8 == 0 { + // Avoid an empty leading byte + return out[1:], nil + } + + return out, nil +} + +func bitsForType(t string) (uint64, error) { + switch t { + case "uint8", "int8": + return 8, nil + case "uint16", "int16": + return 16, nil + case "uint32", "int32", "float32": + return 32, nil + case "uint64", "int64", "float64": + return 64, nil + } + return 0, fmt.Errorf("cannot determine length for type %q", t) +} diff --git a/plugins/parsers/binary/parser_test.go b/plugins/parsers/binary/parser_test.go new file mode 100644 index 000000000..8e42ff221 --- /dev/null +++ b/plugins/parsers/binary/parser_test.go @@ -0,0 +1,1467 @@ +package binary + +import ( + "bytes" + "encoding/binary" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/inputs/file" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/require" +) + +var dummyEntry = Entry{ + Name: "dummy", + Type: "uint8", + Bits: 8, + Assignment: "field", +} + +func generateBinary(data []interface{}, order binary.ByteOrder) ([]byte, error) { + var buf bytes.Buffer + + for _, x := range data { + var err error + switch v := x.(type) { + case []byte: + _, err = buf.Write(v) + case string: + _, err = buf.WriteString(v) + default: + err = binary.Write(&buf, order, x) + } + if err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +func determineEndianess(endianess string) binary.ByteOrder { + switch endianess { + case "le": + return binary.LittleEndian + case "be": + return binary.BigEndian + case "host": + return hostEndianess + } + panic(fmt.Errorf("unknown endianess %q", endianess)) +} + +func TestInitInvalid(t *testing.T) { + var tests = []struct { + name string + metric string + config []Config + endianess string + expected string + }{ + { + name: "wrong endianess", + metric: "binary", + endianess: "garbage", + expected: `unknown endianess "garbage"`, + }, + { + name: "empty configuration", + metric: "binary", + endianess: "host", + expected: `no configuration given`, + }, + { + name: "no metric name", + config: []Config{{}}, + endianess: "host", + expected: `config 0 invalid: no metric name given`, + }, + { + name: "no field", + config: []Config{{}}, + metric: "binary", + expected: `config 0 invalid: no field defined`, + }, + { + name: "invalid entry", + config: []Config{{ + Entries: []Entry{ + { + Bits: 8, + }, + }, + }}, + metric: "binary", + expected: `config 0 invalid: entry "" (0): missing name`, + }, + { + name: "multiple measurements", + config: []Config{{ + Entries: []Entry{ + { + Bits: 8, + Assignment: "measurement", + }, + { + Bits: 8, + Assignment: "measurement", + }, + }, + }}, + metric: "binary", + expected: `config 0 invalid: multiple definitions of "measurement"`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := &Parser{ + Endianess: tt.endianess, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: tt.metric, + } + + parser.Configs = tt.config + require.EqualError(t, parser.Init(), tt.expected) + }) + } +} + +func TestFilterInvalid(t *testing.T) { + var tests = []struct { + name string + filter *Filter + expected string + }{ 
+ { + name: "both length and length-min", + filter: &Filter{Length: 35, LengthMin: 33}, + expected: `config 0 invalid: length and length_min cannot be used together`, + }, + { + name: "filter too long length", + filter: &Filter{Length: 3, Selection: []BinaryPart{{Offset: 16, Bits: 16}}}, + expected: `config 0 invalid: filter length (4) larger than constraint (3)`, + }, + { + name: "filter invalid match", + filter: &Filter{Selection: []BinaryPart{{Offset: 16, Bits: 16, Match: "XYZ"}}}, + expected: `config 0 invalid: decoding match 0 failed: encoding/hex: invalid byte: U+0058 'X'`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := &Parser{ + Configs: []Config{{Filter: tt.filter}}, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.EqualError(t, parser.Init(), tt.expected) + }) + } +} + +func TestFilterMatchInvalid(t *testing.T) { + testdata := []byte{0x01, 0x02} + + var tests = []struct { + name string + filter *Filter + expected string + }{ + { + name: "filter length mismatch", + filter: &Filter{Selection: []BinaryPart{{Offset: 0, Bits: 8, Match: "0x0102"}}}, + expected: `no matching configuration`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := &Parser{ + Configs: []Config{{Filter: tt.filter, Entries: []Entry{{Name: "test", Type: "uint8"}}}}, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + _, err := parser.Parse(testdata) + require.EqualError(t, err, tt.expected) + }) + } +} + +func TestFilterNoMatch(t *testing.T) { + testdata := []interface{}{uint16(0x0102)} + + t.Run("no match error", func(t *testing.T) { + parser := &Parser{ + Configs: []Config{ + { + Filter: &Filter{Length: 32}, + Entries: []Entry{dummyEntry}, + }, + }, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + data, err := generateBinary(testdata, hostEndianess) + require.NoError(t, err) + + _, err = parser.Parse(data) + require.EqualError(t, err, "no matching configuration") + }) + + t.Run("no match allow", func(t *testing.T) { + parser := &Parser{ + AllowNoMatch: true, + Configs: []Config{ + { + Filter: &Filter{Length: 32}, + Entries: []Entry{dummyEntry}, + }, + }, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + data, err := generateBinary(testdata, hostEndianess) + require.NoError(t, err) + + metrics, err := parser.Parse(data) + require.NoError(t, err) + require.Empty(t, metrics) + }) +} + +func TestFilterNone(t *testing.T) { + testdata := []interface{}{ + uint64(0x01020304050607), + uint64(0x08090A0B0C0D0E), + uint64(0x0F101213141516), + uint64(0x1718191A1B1C1D), + uint64(0x1E1F2021222324), + } + + var tests = []struct { + name string + data []interface{} + filter *Filter + endianess string + }{ + { + name: "no filter (BE)", + data: testdata, + filter: nil, + endianess: "be", + }, + { + name: "no filter (LE)", + data: testdata, + filter: nil, + endianess: "le", + }, + { + name: "no filter (host)", + data: testdata, + filter: nil, + endianess: "host", + }, + { + name: "empty filter (BE)", + data: testdata, + filter: &Filter{}, + endianess: "be", + }, + { + name: "empty filter (LE)", + data: testdata, + filter: &Filter{}, + endianess: "le", + }, + { + name: "empty filter (host)", + data: testdata, + filter: &Filter{}, + endianess: "host", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) 
{ + parser := &Parser{ + Endianess: tt.endianess, + Configs: []Config{ + { + Filter: tt.filter, + Entries: []Entry{dummyEntry}, + }, + }, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + order := determineEndianess(tt.endianess) + data, err := generateBinary(tt.data, order) + require.NoError(t, err) + + metrics, err := parser.Parse(data) + require.NoError(t, err) + require.NotEmpty(t, metrics) + }) + } +} + +func TestFilterLength(t *testing.T) { + testdata := []interface{}{ + uint64(0x01020304050607), + uint64(0x08090A0B0C0D0E), + uint64(0x0F101213141516), + uint64(0x1718191A1B1C1D), + uint64(0x1E1F2021222324), + } + + var tests = []struct { + name string + data []interface{} + filter *Filter + expected bool + }{ + { + name: "length match", + data: testdata, + filter: &Filter{Length: 40}, + expected: true, + }, + { + name: "length no match too short", + data: testdata, + filter: &Filter{Length: 41}, + expected: false, + }, + { + name: "length no match too long", + data: testdata, + filter: &Filter{Length: 39}, + expected: false, + }, + { + name: "length min match", + data: testdata, + filter: &Filter{LengthMin: 40}, + expected: true, + }, + { + name: "length min no match too short", + data: testdata, + filter: &Filter{LengthMin: 41}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := &Parser{ + AllowNoMatch: true, + Configs: []Config{ + { + Filter: tt.filter, + Entries: []Entry{dummyEntry}, + }, + }, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + data, err := generateBinary(tt.data, hostEndianess) + require.NoError(t, err) + + metrics, err := parser.Parse(data) + require.NoError(t, err) + if tt.expected { + require.NotEmpty(t, metrics) + } else { + require.Empty(t, metrics) + } + }) + } +} + +func TestFilterContent(t *testing.T) { + testdata := [][]byte{ + {0x01, 0x02, 0x03, 0xA4, 0x05, 0x06, 0x07, 0x08}, + {0x01, 0xA2, 0x03, 0x04, 0x15, 0x01, 0x07, 0x08}, + {0xF1, 0xB1, 0x03, 0xA4, 0x25, 0x06, 0x07, 0x08}, + {0xF1, 0xC2, 0x03, 0x04, 0x35, 0x01, 0x07, 0x08}, + {0x42, 0xD1, 0x03, 0xA4, 0x25, 0x06, 0x42, 0x08}, + {0x42, 0xE2, 0x03, 0x04, 0x35, 0x01, 0x42, 0x08}, + {0x01, 0x00, 0x00, 0xA4}, + } + var tests = []struct { + name string + filter *Filter + expected int + }{ + { + name: "first byte", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 0, + Bits: 8, + Match: "0xF1", + }, + }, + }, + expected: 2, + }, + { + name: "last byte", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 7 * 8, + Bits: 8, + Match: "0x08", + }, + }, + }, + expected: 6, + }, + { + name: "none-byte boundary begin", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 12, + Bits: 12, + Match: "0x0203", + }, + }, + }, + expected: 4, + }, + { + name: "none-byte boundary end", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 16, + Bits: 12, + Match: "0x003A", + }, + }, + }, + expected: 3, + }, + { + name: "none-byte boundary end", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 36, + Bits: 8, + Match: "0x50", + }, + }, + }, + expected: 6, + }, + { + name: "multiple elements", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 4, + Bits: 4, + Match: "0x01", + }, + { + Offset: 24, + Bits: 8, + Match: "0xA4", + }, + }, + }, + expected: 3, + }, + { + name: "multiple elements and length", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 4, + Bits: 4, + Match: "0x01", 
+ }, + { + Offset: 24, + Bits: 8, + Match: "0xA4", + }, + }, + Length: 4, + }, + expected: 1, + }, + { + name: "multiple elements and length-min", + filter: &Filter{ + Selection: []BinaryPart{ + { + Offset: 4, + Bits: 4, + Match: "0x01", + }, + { + Offset: 24, + Bits: 8, + Match: "0xA4", + }, + }, + LengthMin: 5, + }, + expected: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parser := &Parser{ + AllowNoMatch: true, + Configs: []Config{ + { + Filter: tt.filter, + Entries: []Entry{dummyEntry}, + }, + }, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + var metrics []telegraf.Metric + for _, data := range testdata { + m, err := parser.Parse(data) + require.NoError(t, err) + metrics = append(metrics, m...) + } + require.Len(t, metrics, tt.expected) + }) + } +} + +func TestParseLineInvalid(t *testing.T) { + var tests = []struct { + name string + data []interface{} + configs []Config + expected string + }{ + { + name: "out-of-bounds", + data: []interface{}{ + "2022-07-25T20:41:29+02:00", // time + uint16(0x0102), // address + float64(42.123), // value + }, + configs: []Config{ + { + Entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "UTC", + }, + { + Type: "uint32", + Omit: true, + }, + { + Name: "value", + Type: "float64", + }, + }, + }, + }, + expected: `out-of-bounds @232 with 64 bits`, + }, + { + name: "multiple matches", + data: []interface{}{ + "2022-07-25T20:41:29+02:00", // time + uint16(0x0102), // address + float64(42.123), // value + }, + configs: []Config{ + { + Entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "UTC", + }, + { + Type: "uint16", + Omit: true, + }, + { + Name: "value", + Type: "float64", + }, + }, + }, + { + Entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "UTC", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + }, + }, + expected: `cannot parse line with multiple (2) metrics`, + }, + } + + for _, tt := range tests { + for _, endianess := range []string{"be", "le", "host"} { + name := fmt.Sprintf("%s (%s)", tt.name, endianess) + t.Run(name, func(t *testing.T) { + parser := &Parser{ + Endianess: endianess, + Configs: tt.configs, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + order := determineEndianess(endianess) + data, err := generateBinary(tt.data, order) + require.NoError(t, err) + + _, err = parser.ParseLine(string(data)) + require.EqualError(t, err, tt.expected) + }) + } + } +} + +func TestParseLine(t *testing.T) { + var tests = []struct { + name string + data []interface{} + filter *Filter + entries []Entry + expected telegraf.Metric + }{ + { + name: "no match", + data: []interface{}{ + "2022-07-25T20:41:29+02:00", // time + uint16(0x0102), // address + float64(42.123), // value + }, + filter: &Filter{Length: 4}, + entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "UTC", + }, + { + Type: "uint16", + Omit: true, + }, + { + Name: "value", + Type: "float64", + }, + }, + }, + { + name: "single match", + data: []interface{}{ + "2022-07-25T20:41:29+02:00", // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "UTC", + }, + { + Type: "uint16", + Omit: 
true, + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: metric.New( + "binary", + map[string]string{}, + map[string]interface{}{"value": float64(42.123)}, + time.Unix(1658774489, 0), + ), + }, + } + + for _, tt := range tests { + for _, endianess := range []string{"be", "le", "host"} { + name := fmt.Sprintf("%s (%s)", tt.name, endianess) + t.Run(name, func(t *testing.T) { + parser := &Parser{ + AllowNoMatch: true, + Endianess: endianess, + Configs: []Config{{ + Filter: tt.filter, + Entries: tt.entries, + }}, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + order := determineEndianess(endianess) + data, err := generateBinary(tt.data, order) + require.NoError(t, err) + + m, err := parser.ParseLine(string(data)) + require.NoError(t, err) + + testutil.RequireMetricEqual(t, tt.expected, m) + }) + } + } +} + +func TestParseInvalid(t *testing.T) { + var tests = []struct { + name string + data []interface{} + entries []Entry + expected string + }{ + { + name: "message too short", + data: []interface{}{uint64(0x0102030405060708)}, + entries: []Entry{ + { + Name: "command", + Type: "uint32", + Assignment: "tag", + }, + { + Name: "version", + Type: "uint32", + Assignment: "tag", + }, + { + Name: "address", + Type: "uint32", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: `out-of-bounds @64 with 32 bits`, + }, + { + name: "non-terminated string", + data: []interface{}{ + uint16(0xAB42), // address + "testmetric", // metric + float64(42.23432243), // value + }, + entries: []Entry{ + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Type: "string", + Terminator: "null", + Assignment: "measurement", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: `terminator not found for "measurement"`, + }, + { + name: "invalid time", + data: []interface{}{ + "2022-07-25T18:41:XYZ", // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z", + Assignment: "time", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: `time failed: parsing time "2022-07-25T18:41:XYZ" as "2006-01-02T15:04:05Z": cannot parse "XYZ" as "05"`, + }, + } + + for _, tt := range tests { + for _, endianess := range []string{"be", "le", "host"} { + name := fmt.Sprintf("%s (%s)", tt.name, endianess) + t.Run(name, func(t *testing.T) { + parser := &Parser{ + Endianess: endianess, + Configs: []Config{{Entries: tt.entries}}, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + order := determineEndianess(endianess) + data, err := generateBinary(tt.data, order) + require.NoError(t, err) + + _, err = parser.Parse(data) + require.EqualError(t, err, tt.expected) + }) + } + } +} + +func TestParse(t *testing.T) { + timeBerlin, err := time.Parse(time.RFC3339, "2022-07-25T20:41:29+02:00") + require.NoError(t, err) + timeBerlinMilli, err := time.Parse(time.RFC3339Nano, "2022-07-25T20:41:29.123+02:00") + require.NoError(t, err) + + var tests = []struct { + name string + data []interface{} + entries []Entry + ignoreTime bool + expected []telegraf.Metric + }{ + { + name: "fixed numbers", + data: []interface{}{ + uint16(0xAB42), // command + uint8(0x02), // version + uint32(0x010000FF), // address + uint64(0x0102030405060708), // serial-number + int8(-25), // countdown as int32 + int16(-42), // 
overdue + int32(-65535), // batchleft + int64(12345678), // counter + float32(3.1415), // x + float32(99.471), // y + float64(0.23432243), // z + uint8(0xFF), // status + uint8(0x0F), // on/off bit-field + }, + entries: []Entry{ + { + Name: "command", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "version", + Type: "uint8", + Assignment: "tag", + }, + { + Name: "address", + Type: "uint32", + Assignment: "tag", + }, + { + Name: "serialnumber", + Type: "uint64", + Assignment: "tag", + }, + { + Name: "countdown", + Type: "int8", + }, + { + Name: "overdue", + Type: "int16", + }, + { + Name: "batchleft", + Type: "int32", + }, + { + Name: "counter", + Type: "int64", + }, + { + Name: "x", + Type: "float32", + }, + { + Name: "y", + Type: "float32", + }, + { + Name: "z", + Type: "float64", + }, + { + Name: "status", + Type: "bool", + Bits: 8, + }, + { + Name: "error_part", + Type: "bool", + Bits: 4, + Assignment: "tag", + }, + { + Name: "ok_part1", + Type: "bool", + Bits: 1, + Assignment: "tag", + }, + { + Name: "ok_part2", + Type: "bool", + Bits: 1, + Assignment: "tag", + }, + { + Name: "ok_part3", + Type: "bool", + Bits: 1, + Assignment: "tag", + }, + { + Name: "ok_part4", + Type: "bool", + Bits: 1, + Assignment: "tag", + }, + }, + ignoreTime: true, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{ + "command": "43842", + "version": "2", + "address": "16777471", + "serialnumber": "72623859790382856", + "error_part": "false", + "ok_part1": "true", + "ok_part2": "true", + "ok_part3": "true", + "ok_part4": "true", + }, + map[string]interface{}{ + "x": float32(3.1415), + "y": float32(99.471), + "z": float64(0.23432243), + "countdown": int8(-25), + "overdue": int16(-42), + "batchleft": int32(-65535), + "counter": int64(12345678), + + "status": true, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "fixed length string", + data: []interface{}{ + uint16(0xAB42), // address + "test", // metric + float64(0.23432243), // value + }, + entries: []Entry{ + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "app", + Type: "string", + Bits: 4 * 8, + Assignment: "field", + }, + { + Name: "value", + Type: "float64", + }, + }, + ignoreTime: true, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "43842"}, + map[string]interface{}{ + "app": "test", + "value": float64(0.23432243), + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "null-terminated string", + data: []interface{}{ + uint16(0xAB42), // address + append([]byte("testmetric"), 0x00), // metric + float64(42.23432243), // value + }, + entries: []Entry{ + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Type: "string", + Terminator: "null", + Assignment: "measurement", + }, + { + Name: "value", + Type: "float64", + }, + }, + ignoreTime: true, + expected: []telegraf.Metric{ + metric.New( + "testmetric", + map[string]string{"address": "43842"}, + map[string]interface{}{"value": float64(42.23432243)}, + time.Unix(0, 0), + ), + }, + }, + { + name: "char-terminated string", + data: []interface{}{ + uint16(0xAB42), // address + append([]byte("testmetric"), 0x0A, 0x0B), // metric + float64(42.23432243), // value + }, + entries: []Entry{ + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Type: "string", + Terminator: "0x0A0B", + Assignment: "measurement", + }, + { + Name: "value", + Type: "float64", + }, + }, + ignoreTime: true, + expected: []telegraf.Metric{ + metric.New( + "testmetric", + map[string]string{"address": 
"43842"}, + map[string]interface{}{"value": float64(42.23432243)}, + time.Unix(0, 0), + ), + }, + }, + { + name: "time (unix/UTC)", + data: []interface{}{ + uint64(1658774489), // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "unix", + Assignment: "time", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "258"}, + map[string]interface{}{"value": float64(42.123)}, + time.Unix(1658774489, 0), + ), + }, + }, + { + name: "time (unix/Berlin)", + data: []interface{}{ + uint64(1658774489), // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "unix", + Assignment: "time", + Timezone: "Europe/Berlin", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "258"}, + map[string]interface{}{"value": float64(42.123)}, + timeBerlin, + ), + }, + }, + { + name: "time (unix_ms/UTC)", + data: []interface{}{ + uint64(1658774489123), // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "unix_ms", + Assignment: "time", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "258"}, + map[string]interface{}{"value": float64(42.123)}, + time.Unix(0, 1658774489123*1_000_000), + ), + }, + }, + { + name: "time (unix_ms/Berlin)", + data: []interface{}{ + uint64(1658774489123), // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "unix_ms", + Assignment: "time", + Timezone: "Europe/Berlin", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "258"}, + map[string]interface{}{"value": float64(42.123)}, + timeBerlinMilli, + ), + }, + }, + { + name: "time (RFC3339/UTC)", + data: []interface{}{ + "2022-07-25T18:41:29Z", // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z", + Assignment: "time", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "258"}, + map[string]interface{}{"value": float64(42.123)}, + time.Unix(1658774489, 0), + ), + }, + }, + { + name: "time (RFC3339/Berlin)", + data: []interface{}{ + "2022-07-25T20:41:29+02:00", // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "Europe/Berlin", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "258"}, + map[string]interface{}{"value": float64(42.123)}, + timeBerlin, + ), + }, + }, + { + name: "time (RFC3339/Berlin->UTC)", + data: []interface{}{ + "2022-07-25T20:41:29+02:00", // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: 
[]Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "UTC", + }, + { + Name: "address", + Type: "uint16", + Assignment: "tag", + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{"address": "258"}, + map[string]interface{}{"value": float64(42.123)}, + time.Unix(1658774489, 0), + ), + }, + }, + { + name: "omit", + data: []interface{}{ + "2022-07-25T20:41:29+02:00", // time + uint16(0x0102), // address + float64(42.123), // value + }, + entries: []Entry{ + { + Type: "2006-01-02T15:04:05Z07:00", + Assignment: "time", + Timezone: "UTC", + }, + { + Type: "uint16", + Omit: true, + }, + { + Name: "value", + Type: "float64", + }, + }, + expected: []telegraf.Metric{ + metric.New( + "binary", + map[string]string{}, + map[string]interface{}{"value": float64(42.123)}, + time.Unix(1658774489, 0), + ), + }, + }, + } + + for _, tt := range tests { + for _, endianess := range []string{"be", "le", "host"} { + name := fmt.Sprintf("%s (%s)", tt.name, endianess) + t.Run(name, func(t *testing.T) { + parser := &Parser{ + Endianess: endianess, + Configs: []Config{{Entries: tt.entries}}, + Log: testutil.Logger{Name: "parsers.binary"}, + metricName: "binary", + } + require.NoError(t, parser.Init()) + + order := determineEndianess(endianess) + data, err := generateBinary(tt.data, order) + require.NoError(t, err) + + metrics, err := parser.Parse(data) + require.NoError(t, err) + + var options []cmp.Option + if tt.ignoreTime { + options = append(options, testutil.IgnoreTime()) + } + testutil.RequireMetricsEqual(t, tt.expected, metrics, options...) + }) + } + } +} + +func TestCases(t *testing.T) { + // Get all directories in testdata + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + require.NotEmpty(t, folders) + + // Register the plugin + inputs.Add("file", func() telegraf.Input { + return &file.File{} + }) + + // Prepare the influx parser for expectations + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + for _, f := range folders { + testcasePath := filepath.Join("testcases", f.Name()) + configFilename := filepath.Join(testcasePath, "telegraf.conf") + expectedFilename := filepath.Join(testcasePath, "expected.out") + expectedErrorFilename := filepath.Join(testcasePath, "expected.err") + + t.Run(f.Name(), func(t *testing.T) { + // Read the expected output if any + var expected []telegraf.Metric + if _, err := os.Stat(expectedFilename); err == nil { + var err error + expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser) + require.NoError(t, err) + } + + // Read the expected errors if any + var expectedErrors []string + if _, err := os.Stat(expectedErrorFilename); err == nil { + var err error + expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename) + require.NoError(t, err) + require.NotEmpty(t, expectedErrors) + } + + // Configure the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.NoError(t, err) + + // Gather the metrics from the input file configure + var acc testutil.Accumulator + var actualErrors []string + for _, input := range cfg.Inputs { + require.NoError(t, input.Init()) + if err := input.Gather(&acc); err != nil { + actualErrors = append(actualErrors, err.Error()) + } + } + + // Check for potential errors + require.ElementsMatch(t, actualErrors, expectedErrors) + + // Process expected metrics and compare with resulting metrics + actual := acc.GetTelegrafMetrics() + 
testutil.RequireMetricsEqual(t, expected, actual) + }) + } +} diff --git a/plugins/parsers/binary/testcases/multiple_messages/expected.out b/plugins/parsers/binary/testcases/multiple_messages/expected.out new file mode 100644 index 000000000..431c1592d --- /dev/null +++ b/plugins/parsers/binary/testcases/multiple_messages/expected.out @@ -0,0 +1,3 @@ +metricA,address=383,failure=false count=42i,value=3.1415 1658835984000000000 +metricB value=3737169374u 1658835984000000000 +metricC x=2.718280076980591,y=0.0000000000000000000000000000000006626070178575745 1658835984000000000 diff --git a/plugins/parsers/binary/testcases/multiple_messages/messageA.bin b/plugins/parsers/binary/testcases/multiple_messages/messageA.bin new file mode 100644 index 000000000..14f7c1016 Binary files /dev/null and b/plugins/parsers/binary/testcases/multiple_messages/messageA.bin differ diff --git a/plugins/parsers/binary/testcases/multiple_messages/messageB.bin b/plugins/parsers/binary/testcases/multiple_messages/messageB.bin new file mode 100644 index 000000000..421310aba Binary files /dev/null and b/plugins/parsers/binary/testcases/multiple_messages/messageB.bin differ diff --git a/plugins/parsers/binary/testcases/multiple_messages/messageC.bin b/plugins/parsers/binary/testcases/multiple_messages/messageC.bin new file mode 100644 index 000000000..bdf6e1710 Binary files /dev/null and b/plugins/parsers/binary/testcases/multiple_messages/messageC.bin differ diff --git a/plugins/parsers/binary/testcases/multiple_messages/telegraf.conf b/plugins/parsers/binary/testcases/multiple_messages/telegraf.conf new file mode 100644 index 000000000..65e56d150 --- /dev/null +++ b/plugins/parsers/binary/testcases/multiple_messages/telegraf.conf @@ -0,0 +1,46 @@ +[[inputs.file]] + files = ["./testcases/multiple_messages/messageA.bin", "./testcases/multiple_messages/messageB.bin", "./testcases/multiple_messages/messageC.bin"] + data_format = "binary" + endianess = "le" + + [[inputs.file.binary]] + metric_name = "metricA" + + entries = [ + { bits = 32, omit = true }, + { name = "address", type = "uint16", assignment = "tag" }, + { name = "count", type = "int16" }, + { name = "failure", type = "bool", bits = 32, assignment = "tag" }, + { name = "value", type = "float64" }, + { type = "unix", assignment = "time" }, + ] + + [inputs.file.binary.filter] + selection = [ + { offset = 16, bits = 8, match = "0x0A" }, + ] + + [[inputs.file.binary]] + metric_name = "metricB" + + entries = [ + { bits = 32, omit = true }, + { name = "value", type = "uint32" }, + { type = "unix", assignment = "time" }, + ] + + [inputs.file.binary.filter] + selection = [{ offset = 16, bits = 8, match = "0x0B" }] + + [[inputs.file.binary]] + metric_name = "metricC" + + entries = [ + { bits = 32, omit = true }, + { name = "x", type = "float32" }, + { name = "y", type = "float32" }, + { type = "unix", assignment = "time" }, + ] + + [inputs.file.binary.filter] + selection = [{ offset = 16, bits = 8, match = "0x0C" }] diff --git a/plugins/parsers/registry_test.go b/plugins/parsers/registry_test.go index 6623c720f..50b08906e 100644 --- a/plugins/parsers/registry_test.go +++ b/plugins/parsers/registry_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/parsers" _ "github.com/influxdata/telegraf/plugins/parsers/all" ) @@ -43,7 +44,14 @@ func TestRegistry_BackwardCompatibility(t *testing.T) { }, } + // Define parsers that do not 
have an old-school init + newStyleOnly := []string{"binary"} + for name, creator := range parsers.Parsers { + if choice.Contains(name, newStyleOnly) { + t.Logf("skipping new-style-only %q...", name) + continue + } t.Logf("testing %q...", name) cfg.DataFormat = name