From 5d669e27247f57c683d320b777e80132fc62bf94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A9=98=E5=AD=90?= <70060096+citrusreticulata@users.noreply.github.com> Date: Wed, 10 Aug 2022 12:00:41 -0700 Subject: [PATCH] feat(outputs.iotdb): Add new output plugin to support Apache IoTDB (#11557) --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 3 +- go.sum | 7 +- plugins/outputs/all/all.go | 1 + plugins/outputs/iotdb/README.md | 125 ++++++ plugins/outputs/iotdb/iotdb.go | 276 +++++++++++++ plugins/outputs/iotdb/iotdb_test.go | 577 ++++++++++++++++++++++++++++ plugins/outputs/iotdb/sample.conf | 45 +++ 8 files changed, 1032 insertions(+), 3 deletions(-) create mode 100644 plugins/outputs/iotdb/README.md create mode 100644 plugins/outputs/iotdb/iotdb.go create mode 100644 plugins/outputs/iotdb/iotdb_test.go create mode 100644 plugins/outputs/iotdb/sample.conf diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index ec960d729..a9c23efe0 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -36,6 +36,7 @@ following works: - github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE) - github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE) - github.com/apache/arrow/go/arrow [Apache License 2.0](https://github.com/apache/arrow/blob/master/LICENSE.txt) +- github.com/apache/iotdb-client-go [Apache License 2.0](https://github.com/apache/iotdb-client-go/blob/main/LICENSE) - github.com/apache/thrift [Apache License 2.0](https://github.com/apache/thrift/blob/master/LICENSE) - github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE) - github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING) diff --git a/go.mod b/go.mod index ca06a9834..7c091f2b2 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/Shopify/sarama v1.35.0 github.com/aerospike/aerospike-client-go/v5 v5.7.0 github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 - github.com/aliyun/alibaba-cloud-sdk-go v1.61.1529 + github.com/aliyun/alibaba-cloud-sdk-go v1.61.1695 github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 github.com/antchfx/jsonquery v1.3.0 github.com/antchfx/xmlquery v1.3.12 @@ -211,6 +211,7 @@ require ( github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/alecthomas/participle v0.4.1 // indirect github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4 // indirect + github.com/apache/iotdb-client-go v0.12.2-0.20220722111104-cd17da295b46 github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect github.com/armon/go-metrics v0.3.3 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3 // indirect diff --git a/go.sum b/go.sum index e9207b837..77567969e 100644 --- a/go.sum +++ b/go.sum @@ -261,8 +261,8 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C github.com/alexkohler/prealloc v1.0.0/go.mod h1:VetnK3dIgFBBKmg0YnD9F9x6Icjd+9cvfHR56wJVlKE= github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= -github.com/aliyun/alibaba-cloud-sdk-go v1.61.1529 h1:qAt5MZ3Ukwx/JMAiaagQhNQMBZLcmJbnweBoK3EeHxI= -github.com/aliyun/alibaba-cloud-sdk-go v1.61.1529/go.mod h1:RcDobYh8k5VP6TNybz9m++gL3ijVI5wueVr0EM10VsU= +github.com/aliyun/alibaba-cloud-sdk-go v1.61.1695 h1:ISpCPksXFyDOLO/8OQAdO6pKkC31NKldG6q+lPxM/ao= +github.com/aliyun/alibaba-cloud-sdk-go v1.61.1695/go.mod h1:RcDobYh8k5VP6TNybz9m++gL3ijVI5wueVr0EM10VsU= github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl7KPmdmIbVh/OjelJ8/vgMRzcQ= github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= @@ -282,10 +282,13 @@ github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VT github.com/apache/arrow/go/arrow v0.0.0-20210818145353-234c94e4ce64/go.mod h1:2qMFB56yOP3KzkB3PbYZ4AlUFg3a88F67TIx5lB/WwY= github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4 h1:nPUln5QTzhftSpmld3xcXw/GOJ3z1E8fR8tUrrc0YWk= github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= +github.com/apache/iotdb-client-go v0.12.2-0.20220722111104-cd17da295b46 h1:28HyUQcr8ZCyCAatR0gkf9PuLr52U2T+66tx5Th0nxI= +github.com/apache/iotdb-client-go v0.12.2-0.20220722111104-cd17da295b46/go.mod h1:1z89VPGCUGHGqxkPW8p2Haq6WJwrRBKZN+WOjDBiQQM= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.14.1/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY= diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 4d5e9b56b..86a8472ce 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -28,6 +28,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/influxdb" _ "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2" _ "github.com/influxdata/telegraf/plugins/outputs/instrumental" + _ "github.com/influxdata/telegraf/plugins/outputs/iotdb" _ "github.com/influxdata/telegraf/plugins/outputs/kafka" _ "github.com/influxdata/telegraf/plugins/outputs/kinesis" _ "github.com/influxdata/telegraf/plugins/outputs/librato" diff --git a/plugins/outputs/iotdb/README.md b/plugins/outputs/iotdb/README.md new file mode 100644 index 000000000..0021b905d --- /dev/null +++ b/plugins/outputs/iotdb/README.md @@ -0,0 +1,125 @@ +# IoTDB Output Plugin + +This output plugin saves Telegraf metrics to an Apache IoTDB backend, +supporting session connection and data insertion. + +## Apache IoTDB + +Apache IoTDB (Database for Internet of Things) is an IoT native database with +high performance for data management and analysis, deployable on the edge and +the cloud. Due to its light-weight architecture, high performance and rich +feature set together with its deep integration with Apache Hadoop, Spark and +Flink, Apache IoTDB can meet the requirements of massive data storage, +high-speed data ingestion and complex data analysis in the IoT industrial +fields. + +For more details consult the [Apache IoTDB website](https://iotdb.apache.org) +or the [Apache IoTDB GitHub page](https://github.com/apache/iotdb). + +## Getting started + +Before using this plugin, please configure the IP address, port number, +user name, password and other information of the database server, +as well as some data type conversion, time unit and other configurations. + +Please see the [configuration section](#Configuration) for an example +configuration. + +## Metric Translation + +IoTDB uses a different data format for metric data than telegraf. It is +important to note that depending on the metrics being written, the translation +may be lossy. This plugin translates to IoTDB format in the following ways: + +### Unsigned Integers + +IoTDB currently **DOES NOT support unsigned integer**. +There are three available options of converting uint64, which are specified by +setting `uint64_conversion`. + +- `int64_clip`, default option. If an unsigned integer is greater than +`math.MaxInt64`, save it as `int64`; else save `math.MaxInt64` +(9223372036854775807). +- `int64`, force converting an unsigned integer to a`int64`,no mater +what the value it is. This option may lead to exception if the value is +greater than `int64`. +- `text`force converting an unsigned integer to a string, no mater what the +value it is. + +### Time Precision + +IoTDB supports a variety of time precision. You can specify which precision +you want using the `timestamp_precision` setting. Default is `nanosecond`. +Other options are `second`, `millisecond`, `microsecond`. + +### Metadata (tags) + +IoTDB uses a tree model for metadata while Telegraf uses a tag model. +(See [InfluxDB-Protocol Adapter]( +https://iotdb.apache.org/UserGuide/Master/API/InfluxDB-Protocol.html) +There are two available options of converting tags, which are specified by +setting `convert_tags_to`: + +- `fields`. Treat Tags as measurements. For each Key:Value in Tag, +convert them into Measurement, Value, DataType, which are supported in IoTDB. +- `device_id`, default option. Treat Tags as part of device id. Tags +constitute a subtree of `Name`. + +For example, there is a metric: + +```markdown +Name="root.sg.device", Tags={tag1="private", tag2="working"}, Fields={s1=100, s2="hello"} +``` + +- `fields`, result: `root.sg.device, s1=100, s2="hello", tag1="private", tag2="working"` +- `device_id`, result: `root.sg.device.private.working, s1=100, s2="hello"` + +## Configuration + +```toml @sample.conf +# Save metrics to an IoTDB Database +[[outputs.iotdb]] + ## Configuration of IoTDB server connection + host = "127.0.0.1" + # port = "6667" + + ## Configuration of authentication + # user = "root" + # password = "root" + + ## Timeout to open a new session. + ## A value of zero means no timeout. + # timeout = "5s" + + ## Configuration of type conversion for 64-bit unsigned int + ## IoTDB currently DOES NOT support unsigned integers (version 13.x). + ## 32-bit unsigned integers are safely converted into 64-bit signed integers by the plugin, + ## however, this is not true for 64-bit values in general as overflows may occur. + ## The following setting allows to specify the handling of 64-bit unsigned integers. + ## Available values are: + ## - "int64" -- convert to 64-bit signed integers and accept overflows + ## - "int64_clip" -- convert to 64-bit signed integers and clip the values on overflow to 9,223,372,036,854,775,807 + ## - "text" -- convert to the string representation of the value + # uint64_conversion = "int64_clip" + + ## Configuration of TimeStamp + ## TimeStamp is always saved in 64bits int. timestamp_precision specifies the unit of timestamp. + ## Available value: + ## "second", "millisecond", "microsecond", "nanosecond"(default) + # timestamp_precision = "nanosecond" + + ## Handling of tags + ## Tags are not fully supported by IoTDB. + ## A guide with suggestions on how to handle tags can be found here: + ## https://iotdb.apache.org/UserGuide/Master/API/InfluxDB-Protocol.html + ## + ## Available values are: + ## - "fields" -- convert tags to fields in the measurement + ## - "device_id" -- attach tags to the device ID + ## + ## For Example, a metric named "root.sg.device" with the tags `tag1: "private"` and `tag2: "working"` and + ## fields `s1: 100` and `s2: "hello"` will result in the following representations in IoTDB + ## - "fields" -- root.sg.device, s1=100, s2="hello", tag1="private", tag2="working" + ## - "device_id" -- root.sg.device.private.working, s1=100, s2="hello" + # convert_tags_to = "device_id" +``` diff --git a/plugins/outputs/iotdb/iotdb.go b/plugins/outputs/iotdb/iotdb.go new file mode 100644 index 000000000..50a5e0509 --- /dev/null +++ b/plugins/outputs/iotdb/iotdb.go @@ -0,0 +1,276 @@ +//go:generate ../../../tools/readme_config_includer/generator +package iotdb + +import ( + _ "embed" + "errors" + "fmt" + "math" + "strconv" + "strings" + "time" + + "github.com/apache/iotdb-client-go/client" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/plugins/outputs" +) + +// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data. +//go:embed sample.conf +var sampleConfig string + +type IoTDB struct { + Host string `toml:"host"` + Port string `toml:"port"` + User string `toml:"user"` + Password string `toml:"password"` + Timeout config.Duration `toml:"timeout"` + ConvertUint64To string `toml:"uint64_conversion"` + TimeStampUnit string `toml:"timestamp_precision"` + TreatTagsAs string `toml:"convert_tags_to"` + Log telegraf.Logger `toml:"-"` + + session *client.Session +} + +type recordsWithTags struct { + // IoTDB Records basic data struct + DeviceIDList []string + MeasurementsList [][]string + ValuesList [][]interface{} + DataTypesList [][]client.TSDataType + TimestampList []int64 + // extra tags + TagsList [][]*telegraf.Tag +} + +func (*IoTDB) SampleConfig() string { + return sampleConfig +} + +// Init is for setup, and validating config. +func (s *IoTDB) Init() error { + if s.Timeout < 0 { + return errors.New("negative timeout") + } + if !choice.Contains(s.ConvertUint64To, []string{"int64", "int64_clip", "text"}) { + return fmt.Errorf("unknown 'uint64_conversion' method %q", s.ConvertUint64To) + } + if !choice.Contains(s.TimeStampUnit, []string{"second", "millisecond", "microsecond", "nanosecond"}) { + return fmt.Errorf("unknown 'timestamp_precision' method %q", s.TimeStampUnit) + } + if !choice.Contains(s.TreatTagsAs, []string{"fields", "device_id"}) { + return fmt.Errorf("unknown 'convert_tags_to' method %q", s.TreatTagsAs) + } + s.Log.Info("Initialization completed.") + return nil +} + +func (s *IoTDB) Connect() error { + sessionConf := &client.Config{ + Host: s.Host, + Port: s.Port, + UserName: s.User, + Password: s.Password, + } + var ss = client.NewSession(sessionConf) + s.session = &ss + timeoutInMs := int(time.Duration(s.Timeout).Milliseconds()) + if err := s.session.Open(false, timeoutInMs); err != nil { + return fmt.Errorf("connecting to %s:%s failed: %w", s.Host, s.Port, err) + } + return nil +} + +func (s *IoTDB) Close() error { + _, err := s.session.Close() + return err +} + +// Write should write immediately to the output, and not buffer writes +// (Telegraf manages the buffer for you). Returning an error will fail this +// batch of writes and the entire batch will be retried automatically. +func (s *IoTDB) Write(metrics []telegraf.Metric) error { + // Convert Metrics to Records with Tags + rwt, err := s.convertMetricsToRecordsWithTags(metrics) + if err != nil { + return err + } + // Write to client. + // If first writing fails, the client will automatically retry three times. If all fail, it returns an error. + if err := s.writeRecordsWithTags(rwt); err != nil { + return fmt.Errorf("write failed: %w", err) + } + return nil +} + +// Find out data type of the value and return it's id in TSDataType, and convert it if necessary. +func (s *IoTDB) getDataTypeAndValue(value interface{}) (client.TSDataType, interface{}) { + switch v := value.(type) { + case int32: + return client.INT32, v + case int64: + return client.INT64, v + case uint32: + return client.INT64, int64(v) + case uint64: + switch s.ConvertUint64To { + case "int64_clip": + if v <= uint64(math.MaxInt64) { + return client.INT64, int64(v) + } + return client.INT64, int64(math.MaxInt64) + case "int64": + return client.INT64, int64(v) + case "text": + return client.TEXT, strconv.FormatUint(v, 10) + default: + return client.UNKNOW, int64(0) + } + case float64: + return client.DOUBLE, v + case string: + return client.TEXT, v + case bool: + return client.BOOLEAN, v + default: + return client.UNKNOW, int64(0) + } +} + +// convert Timestamp Unit according to config +func (s *IoTDB) convertTimestampOfMetric(m telegraf.Metric) (int64, error) { + switch s.TimeStampUnit { + case "second": + return m.Time().Unix(), nil + case "millisecond": + return m.Time().UnixMilli(), nil + case "microsecond": + return m.Time().UnixMicro(), nil + case "nanosecond": + return m.Time().UnixNano(), nil + default: + return 0, fmt.Errorf("unknown timestamp_precision %q", s.TimeStampUnit) + } +} + +// convert Metrics to Records with tags +func (s *IoTDB) convertMetricsToRecordsWithTags(metrics []telegraf.Metric) (*recordsWithTags, error) { + var deviceidList []string + var measurementsList [][]string + var valuesList [][]interface{} + var dataTypesList [][]client.TSDataType + var timestampList []int64 + var tagsList [][]*telegraf.Tag + + for _, metric := range metrics { + // write `metric` to the output sink here + var tags []*telegraf.Tag + tags = append(tags, metric.TagList()...) + // deal with basic parameter + var keys []string + var values []interface{} + var dataTypes []client.TSDataType + for _, field := range metric.FieldList() { + datatype, value := s.getDataTypeAndValue(field.Value) + if datatype == client.UNKNOW { + return nil, fmt.Errorf("datatype of %q is unknown, values: %v", field.Key, field.Value) + } + keys = append(keys, field.Key) + values = append(values, value) + dataTypes = append(dataTypes, datatype) + } + // Convert timestamp into specified unit + ts, err := s.convertTimestampOfMetric(metric) + if err != nil { + return nil, err + } + timestampList = append(timestampList, ts) + // append all metric data of this record to lists + deviceidList = append(deviceidList, metric.Name()) + measurementsList = append(measurementsList, keys) + valuesList = append(valuesList, values) + dataTypesList = append(dataTypesList, dataTypes) + tagsList = append(tagsList, tags) + } + rwt := &recordsWithTags{ + DeviceIDList: deviceidList, + MeasurementsList: measurementsList, + ValuesList: valuesList, + DataTypesList: dataTypesList, + TimestampList: timestampList, + TagsList: tagsList, + } + return rwt, nil +} + +// modify recordsWithTags according to 'TreatTagsAs' Configuration +func (s *IoTDB) modifyRecordsWithTags(rwt *recordsWithTags) error { + switch s.TreatTagsAs { + case "fields": + // method 1: treat Tag(Key:Value) as measurement + for index, tags := range rwt.TagsList { // for each record + for _, tag := range tags { // for each tag of this record, append it's Key:Value to measurements + datatype, value := s.getDataTypeAndValue(tag.Value) + if datatype == client.UNKNOW { + return fmt.Errorf("datatype of %q is unknown, values: %v", tag.Key, value) + } + rwt.MeasurementsList[index] = append(rwt.MeasurementsList[index], tag.Key) + rwt.ValuesList[index] = append(rwt.ValuesList[index], value) + rwt.DataTypesList[index] = append(rwt.DataTypesList[index], datatype) + } + } + return nil + case "device_id": + // method 2: treat Tag(Key:Value) as subtree of device id + for index, tags := range rwt.TagsList { // for each record + topic := []string{rwt.DeviceIDList[index]} + for _, tag := range tags { // for each tag, append it's Value + topic = append(topic, tag.Value) + } + rwt.DeviceIDList[index] = strings.Join(topic, ".") + } + return nil + default: + // something go wrong. This configuration should have been checked in func Init(). + return fmt.Errorf("unknown 'convert_tags_to' method: %q", s.TreatTagsAs) + } +} + +// Write records with tags to IoTDB server +func (s *IoTDB) writeRecordsWithTags(rwt *recordsWithTags) error { + // deal with tags + if err := s.modifyRecordsWithTags(rwt); err != nil { + return err + } + // write to IoTDB server + status, err := s.session.InsertRecords(rwt.DeviceIDList, rwt.MeasurementsList, + rwt.DataTypesList, rwt.ValuesList, rwt.TimestampList) + if status != nil { + if verifyResult := client.VerifySuccess(status); verifyResult != nil { + s.Log.Debug(verifyResult) + } + } + return err +} + +func init() { + outputs.Add("iotdb", func() telegraf.Output { return newIoTDB() }) +} + +// create a new IoTDB struct with default values. +func newIoTDB() *IoTDB { + return &IoTDB{ + Host: "localhost", + Port: "6667", + User: "root", + Password: "root", + Timeout: config.Duration(time.Second * 5), + ConvertUint64To: "int64_clip", + TimeStampUnit: "nanosecond", + TreatTagsAs: "device_id", + } +} diff --git a/plugins/outputs/iotdb/iotdb_test.go b/plugins/outputs/iotdb/iotdb_test.go new file mode 100644 index 000000000..d70c7e08b --- /dev/null +++ b/plugins/outputs/iotdb/iotdb_test.go @@ -0,0 +1,577 @@ +package iotdb + +import ( + "math" + "strconv" + "testing" + "time" + + "github.com/apache/iotdb-client-go/client" + "github.com/docker/go-connections/nat" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" +) + +// newMetricWithOrderedFields creates new Metric and makes sure fields are in +// order. This is required to define the expected output where the field order +// needs to be defines. +func newMetricWithOrderedFields( + name string, + tags []telegraf.Tag, + fields []telegraf.Field, + timestamp time.Time, +) telegraf.Metric { + m := metric.New(name, map[string]string{}, map[string]interface{}{}, timestamp) + for _, tag := range tags { + m.AddTag(tag.Key, tag.Value) + } + for _, field := range fields { + m.AddField(field.Key, field.Value) + } + return m +} + +func TestInitInvalid(t *testing.T) { + tests := []struct { + name string + plugin *IoTDB + expected string + }{ + { + name: "empty tag-conversion", + plugin: func() *IoTDB { + s := newIoTDB() + s.TreatTagsAs = "" + s.Log = &testutil.Logger{} + return s + }(), + expected: `unknown 'convert_tags_to' method ""`, + }, + { + name: "empty uint-conversion", + plugin: func() *IoTDB { + s := newIoTDB() + s.ConvertUint64To = "" + s.Log = &testutil.Logger{} + return s + }(), + expected: `unknown 'uint64_conversion' method ""`, + }, + { + name: "empty timestamp precision", + plugin: func() *IoTDB { + s := newIoTDB() + s.TimeStampUnit = "" + s.Log = &testutil.Logger{} + return s + }(), + expected: `unknown 'timestamp_precision' method ""`, + }, + { + name: "invalid tag-conversion", + plugin: func() *IoTDB { + s := newIoTDB() + s.TreatTagsAs = "garbage" + s.Log = &testutil.Logger{} + return s + }(), + expected: `unknown 'convert_tags_to' method "garbage"`, + }, + { + name: "invalid uint-conversion", + plugin: func() *IoTDB { + s := newIoTDB() + s.ConvertUint64To = "garbage" + s.Log = &testutil.Logger{} + return s + }(), + expected: `unknown 'uint64_conversion' method "garbage"`, + }, + { + name: "invalid timestamp precision", + plugin: func() *IoTDB { + s := newIoTDB() + s.TimeStampUnit = "garbage" + s.Log = &testutil.Logger{} + return s + }(), + expected: `unknown 'timestamp_precision' method "garbage"`, + }, + { + name: "negative timeout", + plugin: func() *IoTDB { + s := newIoTDB() + s.Timeout = config.Duration(time.Second * -5) + s.Log = &testutil.Logger{} + return s + }(), + expected: `negative timeout`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.EqualError(t, tt.plugin.Init(), tt.expected) + }) + } +} + +// Test Metric conversion, which means testing function `convertMetricsToRecordsWithTags` +func TestMetricConversionToRecordsWithTags(t *testing.T) { + var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC) + + tests := []struct { + name string + plugin *IoTDB + expected recordsWithTags + metrics []telegraf.Metric + }{ + { + name: "default config", + plugin: func() *IoTDB { s := newIoTDB(); return s }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.fan", "root.computer.fan", "root.computer.keyboard"}, + MeasurementsList: [][]string{ + {"temperature", "counter"}, + {"counter", "temperature"}, + {"temperature", "counter", "unsigned_big", "string", "bool", "int_text"}, + }, + ValuesList: [][]interface{}{ + {float64(42.55), int64(987654321)}, + {int64(123456789), float64(56.24)}, + {float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64}, + {client.INT64, client.DOUBLE}, + {client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT}, + }, + TimestampList: []int64{testTimestamp.UnixNano(), testTimestamp.UnixNano(), testTimestamp.UnixNano()}, + }, + metrics: []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.fan", + []telegraf.Tag{ + {Key: "price", Value: "expensive"}, + {Key: "owner", Value: "cpu"}, + }, + []telegraf.Field{ + {Key: "temperature", Value: float64(42.55)}, + {Key: "counter", Value: int64(987654321)}, + }, + testTimestamp, + ), + newMetricWithOrderedFields( + "root.computer.fan", + []telegraf.Tag{ // same keys in different order + {Key: "owner", Value: "gpu"}, + {Key: "price", Value: "cheap"}, + }, + []telegraf.Field{ // same keys in different order + {Key: "counter", Value: int64(123456789)}, + {Key: "temperature", Value: float64(56.24)}, + }, + testTimestamp, + ), + newMetricWithOrderedFields( + "root.computer.keyboard", + []telegraf.Tag{}, + []telegraf.Field{ + {Key: "temperature", Value: float64(30.33)}, + {Key: "counter", Value: int64(123456789)}, + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + {Key: "string", Value: "Made in China."}, + {Key: "bool", Value: bool(false)}, + {Key: "int_text", Value: "123456789011"}, + }, + testTimestamp, + ), + }, + }, + { + name: "unsigned int to text", + plugin: func() *IoTDB { cli002 := newIoTDB(); cli002.ConvertUint64To = "text"; return cli002 }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.uint_to_text"}, + MeasurementsList: [][]string{{"unsigned_big"}}, + ValuesList: [][]interface{}{{strconv.FormatUint(uint64(math.MaxInt64+1000), 10)}}, + DataTypesList: [][]client.TSDataType{{client.TEXT}}, + TimestampList: []int64{testTimestamp.UnixNano()}, + }, + metrics: []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.uint_to_text", + []telegraf.Tag{}, + []telegraf.Field{ + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + }, + testTimestamp, + ), + }, + }, + { + name: "unsigned int to int with overflow", + plugin: func() *IoTDB { cli002 := newIoTDB(); cli002.ConvertUint64To = "int64"; return cli002 }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.overflow"}, + MeasurementsList: [][]string{{"unsigned_big"}}, + ValuesList: [][]interface{}{{int64(-9223372036854774809)}}, + DataTypesList: [][]client.TSDataType{{client.INT64}}, + TimestampList: []int64{testTimestamp.UnixNano()}, + }, + metrics: []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.overflow", + []telegraf.Tag{}, + []telegraf.Field{ + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + }, + testTimestamp, + ), + }, + }, + { + name: "second timestamp precision", + plugin: func() *IoTDB { s := newIoTDB(); s.TimeStampUnit = "second"; return s }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.second"}, + MeasurementsList: [][]string{{"unsigned_big"}}, + ValuesList: [][]interface{}{{int64(math.MaxInt64)}}, + DataTypesList: [][]client.TSDataType{{client.INT64}}, + TimestampList: []int64{testTimestamp.Unix()}, + }, + metrics: []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.second", + []telegraf.Tag{}, + []telegraf.Field{ + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + }, + testTimestamp, + ), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.plugin.Log = &testutil.Logger{} + require.NoError(t, tt.plugin.Init()) + actual, err := tt.plugin.convertMetricsToRecordsWithTags(tt.metrics) + require.NoError(t, err) + // Ignore the tags-list for comparison + actual.TagsList = nil + require.EqualValues(t, &tt.expected, actual) + }) + } +} + +// Test tags handling, which means testing function `modifyRecordsWithTags` +func TestTagsHandling(t *testing.T) { + var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC) + + tests := []struct { + name string + plugin *IoTDB + expected recordsWithTags + input recordsWithTags + }{ + { //treat tags as fields. And input Tags are NOT in order. + name: "treat tags as fields", + plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "fields"; return s }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.fields"}, + MeasurementsList: [][]string{{"temperature", "counter", "owner", "price"}}, + ValuesList: [][]interface{}{ + {float64(42.55), int64(987654321), "cpu", "expensive"}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64, client.TEXT, client.TEXT}, + }, + TimestampList: []int64{testTimestamp.UnixNano()}, + }, + input: recordsWithTags{ + DeviceIDList: []string{"root.computer.fields"}, + MeasurementsList: [][]string{{"temperature", "counter"}}, + ValuesList: [][]interface{}{ + {float64(42.55), int64(987654321)}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64}, + }, + TimestampList: []int64{testTimestamp.UnixNano()}, + TagsList: [][]*telegraf.Tag{{ + {Key: "owner", Value: "cpu"}, + {Key: "price", Value: "expensive"}, + }}, + }, + }, + { //treat tags as device IDs. And input Tags are in order. + name: "treat tags as device IDs", + plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "device_id"; return s }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.deviceID.cpu.expensive"}, + MeasurementsList: [][]string{{"temperature", "counter"}}, + ValuesList: [][]interface{}{ + {float64(42.55), int64(987654321)}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64}, + }, + TimestampList: []int64{testTimestamp.UnixNano()}, + }, + input: recordsWithTags{ + DeviceIDList: []string{"root.computer.deviceID"}, + MeasurementsList: [][]string{{"temperature", "counter"}}, + ValuesList: [][]interface{}{ + {float64(42.55), int64(987654321)}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64}, + }, + TimestampList: []int64{testTimestamp.UnixNano()}, + TagsList: [][]*telegraf.Tag{{ + {Key: "owner", Value: "cpu"}, + {Key: "price", Value: "expensive"}, + }}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.plugin.Log = &testutil.Logger{} + require.NoError(t, tt.plugin.Init()) + require.NoError(t, tt.plugin.modifyRecordsWithTags(&tt.input)) + // Ignore the tags-list for comparison + tt.input.TagsList = nil + require.EqualValues(t, tt.expected, tt.input) + }) + } +} + +// Test entire Metric conversion, from metrics to records which are ready to insert +func TestEntireMetricConversion(t *testing.T) { + var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC) + + tests := []struct { + name string + plugin *IoTDB + expected recordsWithTags + metrics []telegraf.Metric + requireEqual bool + }{ + { + name: "default config", + plugin: func() *IoTDB { s := newIoTDB(); return s }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.screen.high.LED"}, + MeasurementsList: [][]string{ + {"temperature", "counter", "unsigned_big", "string", "bool", "int_text"}, + }, + ValuesList: [][]interface{}{ + {float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT}, + }, + TimestampList: []int64{testTimestamp.UnixNano()}, + }, + metrics: []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.screen", + []telegraf.Tag{ + {Key: "brightness", Value: "high"}, + {Key: "type", Value: "LED"}, + }, + []telegraf.Field{ + {Key: "temperature", Value: float64(30.33)}, + {Key: "counter", Value: int64(123456789)}, + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + {Key: "string", Value: "Made in China."}, + {Key: "bool", Value: bool(false)}, + {Key: "int_text", Value: "123456789011"}, + }, + testTimestamp, + ), + }, + requireEqual: true, + }, + { + name: "wrong order of tags", + plugin: func() *IoTDB { s := newIoTDB(); return s }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.screen.LED.high"}, + MeasurementsList: [][]string{ + {"temperature", "counter", "unsigned_big", "string", "bool", "int_text"}, + }, + ValuesList: [][]interface{}{ + {float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT}, + }, + TimestampList: []int64{testTimestamp.UnixNano()}, + }, + metrics: []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.screen", + []telegraf.Tag{ + {Key: "brightness", Value: "high"}, + {Key: "type", Value: "LED"}, + }, + []telegraf.Field{ + {Key: "temperature", Value: float64(30.33)}, + {Key: "counter", Value: int64(123456789)}, + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + {Key: "string", Value: "Made in China."}, + {Key: "bool", Value: bool(false)}, + {Key: "int_text", Value: "123456789011"}, + }, + testTimestamp, + ), + }, + requireEqual: false, + }, + { + name: "wrong order of tags", + plugin: func() *IoTDB { s := newIoTDB(); return s }(), + expected: recordsWithTags{ + DeviceIDList: []string{"root.computer.screen.LED.high"}, + MeasurementsList: [][]string{ + {"temperature", "counter"}, + }, + ValuesList: [][]interface{}{ + {float64(30.33), int64(123456789)}, + }, + DataTypesList: [][]client.TSDataType{ + {client.DOUBLE, client.INT64}, + }, + TimestampList: []int64{testTimestamp.UnixNano()}, + }, + metrics: []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.screen", + []telegraf.Tag{ + {Key: "brightness", Value: "high"}, + {Key: "type", Value: "LED"}, + }, + []telegraf.Field{ + {Key: "temperature", Value: float64(30.33)}, + {Key: "counter", Value: int64(123456789)}, + }, + testTimestamp, + ), + }, + requireEqual: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.plugin.Log = &testutil.Logger{} + require.NoError(t, tt.plugin.Init()) + actual, err := tt.plugin.convertMetricsToRecordsWithTags(tt.metrics) + require.NoError(t, err) + require.NoError(t, tt.plugin.modifyRecordsWithTags(actual)) + // Ignore the tags-list for comparison + actual.TagsList = nil + if tt.requireEqual { + require.EqualValues(t, &tt.expected, actual) + } else { + require.NotEqualValues(t, &tt.expected, actual) + } + }) + } +} + +// Start a container and do integration test. +func TestIntegrationInserts(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + const iotdbPort = "6667" + + container := testutil.Container{ + Image: "apache/iotdb:0.13.0-node", + ExposedPorts: []string{iotdbPort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(iotdbPort)), + wait.ForLog("IoTDB has started."), + ), + } + err := container.Start() + require.NoError(t, err, "failed to start IoTDB container") + defer func() { + require.NoError(t, container.Terminate(), "terminating IoTDB container failed") + }() + + t.Logf("Container Address:%q, ExposedPorts:[%v:%v]", container.Address, container.Ports[iotdbPort], iotdbPort) + // create a client and tests two groups of insertion + testClient := &IoTDB{ + Host: container.Address, + Port: container.Ports[iotdbPort], + User: "root", + Password: "root", + Timeout: config.Duration(time.Second * 5), + ConvertUint64To: "int64_clip", + TimeStampUnit: "nanosecond", + TreatTagsAs: "device_id", + } + testClient.Log = &testutil.Logger{} + + // generate Metrics to input + metrics := []telegraf.Metric{ + newMetricWithOrderedFields( + "root.computer.unsigned_big", + []telegraf.Tag{}, + []telegraf.Field{ + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + }, + time.Now(), + ), + newMetricWithOrderedFields( + "root.computer.fan", + []telegraf.Tag{ + {Key: "price", Value: "expensive"}, + {Key: "owner", Value: "cpu"}, + }, + []telegraf.Field{ + {Key: "temperature", Value: float64(42.55)}, + {Key: "counter", Value: int64(987654321)}, + }, + time.Now(), + ), + newMetricWithOrderedFields( + "root.computer.fan", + []telegraf.Tag{ // same keys in different order + {Key: "owner", Value: "gpu"}, + {Key: "price", Value: "cheap"}, + }, + []telegraf.Field{ // same keys in different order + {Key: "counter", Value: int64(123456789)}, + {Key: "temperature", Value: float64(56.24)}, + }, + time.Now(), + ), + newMetricWithOrderedFields( + "root.computer.keyboard", + []telegraf.Tag{}, + []telegraf.Field{ + {Key: "temperature", Value: float64(30.33)}, + {Key: "counter", Value: int64(123456789)}, + {Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)}, + {Key: "string", Value: "Made in China."}, + {Key: "bool", Value: bool(false)}, + {Key: "int_text", Value: "123456789011"}, + }, + time.Now(), + ), + } + + require.NoError(t, testClient.Connect()) + require.NoError(t, testClient.Write(metrics)) + require.NoError(t, testClient.Close()) +} diff --git a/plugins/outputs/iotdb/sample.conf b/plugins/outputs/iotdb/sample.conf new file mode 100644 index 000000000..13858fa0f --- /dev/null +++ b/plugins/outputs/iotdb/sample.conf @@ -0,0 +1,45 @@ +# Save metrics to an IoTDB Database +[[outputs.iotdb]] + ## Configuration of IoTDB server connection + host = "127.0.0.1" + # port = "6667" + + ## Configuration of authentication + # user = "root" + # password = "root" + + ## Timeout to open a new session. + ## A value of zero means no timeout. + # timeout = "5s" + + ## Configuration of type conversion for 64-bit unsigned int + ## IoTDB currently DOES NOT support unsigned integers (version 13.x). + ## 32-bit unsigned integers are safely converted into 64-bit signed integers by the plugin, + ## however, this is not true for 64-bit values in general as overflows may occur. + ## The following setting allows to specify the handling of 64-bit unsigned integers. + ## Available values are: + ## - "int64" -- convert to 64-bit signed integers and accept overflows + ## - "int64_clip" -- convert to 64-bit signed integers and clip the values on overflow to 9,223,372,036,854,775,807 + ## - "text" -- convert to the string representation of the value + # uint64_conversion = "int64_clip" + + ## Configuration of TimeStamp + ## TimeStamp is always saved in 64bits int. timestamp_precision specifies the unit of timestamp. + ## Available value: + ## "second", "millisecond", "microsecond", "nanosecond"(default) + # timestamp_precision = "nanosecond" + + ## Handling of tags + ## Tags are not fully supported by IoTDB. + ## A guide with suggestions on how to handle tags can be found here: + ## https://iotdb.apache.org/UserGuide/Master/API/InfluxDB-Protocol.html + ## + ## Available values are: + ## - "fields" -- convert tags to fields in the measurement + ## - "device_id" -- attach tags to the device ID + ## + ## For Example, a metric named "root.sg.device" with the tags `tag1: "private"` and `tag2: "working"` and + ## fields `s1: 100` and `s2: "hello"` will result in the following representations in IoTDB + ## - "fields" -- root.sg.device, s1=100, s2="hello", tag1="private", tag2="working" + ## - "device_id" -- root.sg.device.private.working, s1=100, s2="hello" + # convert_tags_to = "device_id"