feat(outputs.iotdb): Add new output plugin to support Apache IoTDB (#11557)

This commit is contained in:
橘子 2022-08-10 12:00:41 -07:00 committed by GitHub
parent 4458e22f04
commit 5d669e2724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1032 additions and 3 deletions

View File

@ -36,6 +36,7 @@ following works:
- github.com/antchfx/xmlquery [MIT License](https://github.com/antchfx/xmlquery/blob/master/LICENSE)
- github.com/antchfx/xpath [MIT License](https://github.com/antchfx/xpath/blob/master/LICENSE)
- github.com/apache/arrow/go/arrow [Apache License 2.0](https://github.com/apache/arrow/blob/master/LICENSE.txt)
- github.com/apache/iotdb-client-go [Apache License 2.0](https://github.com/apache/iotdb-client-go/blob/main/LICENSE)
- github.com/apache/thrift [Apache License 2.0](https://github.com/apache/thrift/blob/master/LICENSE)
- github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE)
- github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING)

3
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/Shopify/sarama v1.35.0
github.com/aerospike/aerospike-client-go/v5 v5.7.0
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1529
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1695
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9
github.com/antchfx/jsonquery v1.3.0
github.com/antchfx/xmlquery v1.3.12
@ -211,6 +211,7 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/alecthomas/participle v0.4.1 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4 // indirect
github.com/apache/iotdb-client-go v0.12.2-0.20220722111104-cd17da295b46
github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3 // indirect
github.com/armon/go-metrics v0.3.3 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.3 // indirect

7
go.sum
View File

@ -261,8 +261,8 @@ github.com/alexflint/go-filemutex v0.0.0-20171022225611-72bdc8eae2ae/go.mod h1:C
github.com/alexkohler/prealloc v1.0.0/go.mod h1:VetnK3dIgFBBKmg0YnD9F9x6Icjd+9cvfHR56wJVlKE=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1529 h1:qAt5MZ3Ukwx/JMAiaagQhNQMBZLcmJbnweBoK3EeHxI=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1529/go.mod h1:RcDobYh8k5VP6TNybz9m++gL3ijVI5wueVr0EM10VsU=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1695 h1:ISpCPksXFyDOLO/8OQAdO6pKkC31NKldG6q+lPxM/ao=
github.com/aliyun/alibaba-cloud-sdk-go v1.61.1695/go.mod h1:RcDobYh8k5VP6TNybz9m++gL3ijVI5wueVr0EM10VsU=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9 h1:FXrPTd8Rdlc94dKccl7KPmdmIbVh/OjelJ8/vgMRzcQ=
github.com/amir/raidman v0.0.0-20170415203553-1ccc43bfb9c9/go.mod h1:eliMa/PW+RDr2QLWRmLH1R1ZA4RInpmvOzDDXtaIZkc=
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
@ -282,10 +282,13 @@ github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VT
github.com/apache/arrow/go/arrow v0.0.0-20210818145353-234c94e4ce64/go.mod h1:2qMFB56yOP3KzkB3PbYZ4AlUFg3a88F67TIx5lB/WwY=
github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4 h1:nPUln5QTzhftSpmld3xcXw/GOJ3z1E8fR8tUrrc0YWk=
github.com/apache/arrow/go/arrow v0.0.0-20211006091945-a69884db78f4/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
github.com/apache/iotdb-client-go v0.12.2-0.20220722111104-cd17da295b46 h1:28HyUQcr8ZCyCAatR0gkf9PuLr52U2T+66tx5Th0nxI=
github.com/apache/iotdb-client-go v0.12.2-0.20220722111104-cd17da295b46/go.mod h1:1z89VPGCUGHGqxkPW8p2Haq6WJwrRBKZN+WOjDBiQQM=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.14.1/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.14.2/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/apex/log v1.6.0/go.mod h1:x7s+P9VtvFBXge9Vbn+8TrqKmuzmD35TTkeBHul8UtY=

View File

@ -28,6 +28,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb_v2"
_ "github.com/influxdata/telegraf/plugins/outputs/instrumental"
_ "github.com/influxdata/telegraf/plugins/outputs/iotdb"
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"
_ "github.com/influxdata/telegraf/plugins/outputs/kinesis"
_ "github.com/influxdata/telegraf/plugins/outputs/librato"

View File

@ -0,0 +1,125 @@
# IoTDB Output Plugin
This output plugin saves Telegraf metrics to an Apache IoTDB backend,
supporting session connection and data insertion.
## Apache IoTDB
Apache IoTDB (Database for Internet of Things) is an IoT native database with
high performance for data management and analysis, deployable on the edge and
the cloud. Due to its light-weight architecture, high performance and rich
feature set together with its deep integration with Apache Hadoop, Spark and
Flink, Apache IoTDB can meet the requirements of massive data storage,
high-speed data ingestion and complex data analysis in the IoT industrial
fields.
For more details consult the [Apache IoTDB website](https://iotdb.apache.org)
or the [Apache IoTDB GitHub page](https://github.com/apache/iotdb).
## Getting started
Before using this plugin, please configure the IP address, port number,
user name, password and other information of the database server,
as well as some data type conversion, time unit and other configurations.
Please see the [configuration section](#Configuration) for an example
configuration.
## Metric Translation
IoTDB uses a different data format for metric data than telegraf. It is
important to note that depending on the metrics being written, the translation
may be lossy. This plugin translates to IoTDB format in the following ways:
### Unsigned Integers
IoTDB currently **DOES NOT support unsigned integer**.
There are three available options of converting uint64, which are specified by
setting `uint64_conversion`.
- `int64_clip`, default option. If an unsigned integer is greater than
`math.MaxInt64`, save it as `int64`; else save `math.MaxInt64`
(9223372036854775807).
- `int64`, force converting an unsigned integer to a`int64`,no mater
what the value it is. This option may lead to exception if the value is
greater than `int64`.
- `text`force converting an unsigned integer to a string, no mater what the
value it is.
### Time Precision
IoTDB supports a variety of time precision. You can specify which precision
you want using the `timestamp_precision` setting. Default is `nanosecond`.
Other options are `second`, `millisecond`, `microsecond`.
### Metadata (tags)
IoTDB uses a tree model for metadata while Telegraf uses a tag model.
(See [InfluxDB-Protocol Adapter](
https://iotdb.apache.org/UserGuide/Master/API/InfluxDB-Protocol.html)
There are two available options of converting tags, which are specified by
setting `convert_tags_to`:
- `fields`. Treat Tags as measurements. For each Key:Value in Tag,
convert them into Measurement, Value, DataType, which are supported in IoTDB.
- `device_id`, default option. Treat Tags as part of device id. Tags
constitute a subtree of `Name`.
For example, there is a metric:
```markdown
Name="root.sg.device", Tags={tag1="private", tag2="working"}, Fields={s1=100, s2="hello"}
```
- `fields`, result: `root.sg.device, s1=100, s2="hello", tag1="private", tag2="working"`
- `device_id`, result: `root.sg.device.private.working, s1=100, s2="hello"`
## Configuration
```toml @sample.conf
# Save metrics to an IoTDB Database
[[outputs.iotdb]]
## Configuration of IoTDB server connection
host = "127.0.0.1"
# port = "6667"
## Configuration of authentication
# user = "root"
# password = "root"
## Timeout to open a new session.
## A value of zero means no timeout.
# timeout = "5s"
## Configuration of type conversion for 64-bit unsigned int
## IoTDB currently DOES NOT support unsigned integers (version 13.x).
## 32-bit unsigned integers are safely converted into 64-bit signed integers by the plugin,
## however, this is not true for 64-bit values in general as overflows may occur.
## The following setting allows to specify the handling of 64-bit unsigned integers.
## Available values are:
## - "int64" -- convert to 64-bit signed integers and accept overflows
## - "int64_clip" -- convert to 64-bit signed integers and clip the values on overflow to 9,223,372,036,854,775,807
## - "text" -- convert to the string representation of the value
# uint64_conversion = "int64_clip"
## Configuration of TimeStamp
## TimeStamp is always saved in 64bits int. timestamp_precision specifies the unit of timestamp.
## Available value:
## "second", "millisecond", "microsecond", "nanosecond"(default)
# timestamp_precision = "nanosecond"
## Handling of tags
## Tags are not fully supported by IoTDB.
## A guide with suggestions on how to handle tags can be found here:
## https://iotdb.apache.org/UserGuide/Master/API/InfluxDB-Protocol.html
##
## Available values are:
## - "fields" -- convert tags to fields in the measurement
## - "device_id" -- attach tags to the device ID
##
## For Example, a metric named "root.sg.device" with the tags `tag1: "private"` and `tag2: "working"` and
## fields `s1: 100` and `s2: "hello"` will result in the following representations in IoTDB
## - "fields" -- root.sg.device, s1=100, s2="hello", tag1="private", tag2="working"
## - "device_id" -- root.sg.device.private.working, s1=100, s2="hello"
# convert_tags_to = "device_id"
```

View File

@ -0,0 +1,276 @@
//go:generate ../../../tools/readme_config_includer/generator
package iotdb
import (
_ "embed"
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/apache/iotdb-client-go/client"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/outputs"
)
// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
//go:embed sample.conf
var sampleConfig string
type IoTDB struct {
Host string `toml:"host"`
Port string `toml:"port"`
User string `toml:"user"`
Password string `toml:"password"`
Timeout config.Duration `toml:"timeout"`
ConvertUint64To string `toml:"uint64_conversion"`
TimeStampUnit string `toml:"timestamp_precision"`
TreatTagsAs string `toml:"convert_tags_to"`
Log telegraf.Logger `toml:"-"`
session *client.Session
}
type recordsWithTags struct {
// IoTDB Records basic data struct
DeviceIDList []string
MeasurementsList [][]string
ValuesList [][]interface{}
DataTypesList [][]client.TSDataType
TimestampList []int64
// extra tags
TagsList [][]*telegraf.Tag
}
func (*IoTDB) SampleConfig() string {
return sampleConfig
}
// Init is for setup, and validating config.
func (s *IoTDB) Init() error {
if s.Timeout < 0 {
return errors.New("negative timeout")
}
if !choice.Contains(s.ConvertUint64To, []string{"int64", "int64_clip", "text"}) {
return fmt.Errorf("unknown 'uint64_conversion' method %q", s.ConvertUint64To)
}
if !choice.Contains(s.TimeStampUnit, []string{"second", "millisecond", "microsecond", "nanosecond"}) {
return fmt.Errorf("unknown 'timestamp_precision' method %q", s.TimeStampUnit)
}
if !choice.Contains(s.TreatTagsAs, []string{"fields", "device_id"}) {
return fmt.Errorf("unknown 'convert_tags_to' method %q", s.TreatTagsAs)
}
s.Log.Info("Initialization completed.")
return nil
}
func (s *IoTDB) Connect() error {
sessionConf := &client.Config{
Host: s.Host,
Port: s.Port,
UserName: s.User,
Password: s.Password,
}
var ss = client.NewSession(sessionConf)
s.session = &ss
timeoutInMs := int(time.Duration(s.Timeout).Milliseconds())
if err := s.session.Open(false, timeoutInMs); err != nil {
return fmt.Errorf("connecting to %s:%s failed: %w", s.Host, s.Port, err)
}
return nil
}
func (s *IoTDB) Close() error {
_, err := s.session.Close()
return err
}
// Write should write immediately to the output, and not buffer writes
// (Telegraf manages the buffer for you). Returning an error will fail this
// batch of writes and the entire batch will be retried automatically.
func (s *IoTDB) Write(metrics []telegraf.Metric) error {
// Convert Metrics to Records with Tags
rwt, err := s.convertMetricsToRecordsWithTags(metrics)
if err != nil {
return err
}
// Write to client.
// If first writing fails, the client will automatically retry three times. If all fail, it returns an error.
if err := s.writeRecordsWithTags(rwt); err != nil {
return fmt.Errorf("write failed: %w", err)
}
return nil
}
// Find out data type of the value and return it's id in TSDataType, and convert it if necessary.
func (s *IoTDB) getDataTypeAndValue(value interface{}) (client.TSDataType, interface{}) {
switch v := value.(type) {
case int32:
return client.INT32, v
case int64:
return client.INT64, v
case uint32:
return client.INT64, int64(v)
case uint64:
switch s.ConvertUint64To {
case "int64_clip":
if v <= uint64(math.MaxInt64) {
return client.INT64, int64(v)
}
return client.INT64, int64(math.MaxInt64)
case "int64":
return client.INT64, int64(v)
case "text":
return client.TEXT, strconv.FormatUint(v, 10)
default:
return client.UNKNOW, int64(0)
}
case float64:
return client.DOUBLE, v
case string:
return client.TEXT, v
case bool:
return client.BOOLEAN, v
default:
return client.UNKNOW, int64(0)
}
}
// convert Timestamp Unit according to config
func (s *IoTDB) convertTimestampOfMetric(m telegraf.Metric) (int64, error) {
switch s.TimeStampUnit {
case "second":
return m.Time().Unix(), nil
case "millisecond":
return m.Time().UnixMilli(), nil
case "microsecond":
return m.Time().UnixMicro(), nil
case "nanosecond":
return m.Time().UnixNano(), nil
default:
return 0, fmt.Errorf("unknown timestamp_precision %q", s.TimeStampUnit)
}
}
// convert Metrics to Records with tags
func (s *IoTDB) convertMetricsToRecordsWithTags(metrics []telegraf.Metric) (*recordsWithTags, error) {
var deviceidList []string
var measurementsList [][]string
var valuesList [][]interface{}
var dataTypesList [][]client.TSDataType
var timestampList []int64
var tagsList [][]*telegraf.Tag
for _, metric := range metrics {
// write `metric` to the output sink here
var tags []*telegraf.Tag
tags = append(tags, metric.TagList()...)
// deal with basic parameter
var keys []string
var values []interface{}
var dataTypes []client.TSDataType
for _, field := range metric.FieldList() {
datatype, value := s.getDataTypeAndValue(field.Value)
if datatype == client.UNKNOW {
return nil, fmt.Errorf("datatype of %q is unknown, values: %v", field.Key, field.Value)
}
keys = append(keys, field.Key)
values = append(values, value)
dataTypes = append(dataTypes, datatype)
}
// Convert timestamp into specified unit
ts, err := s.convertTimestampOfMetric(metric)
if err != nil {
return nil, err
}
timestampList = append(timestampList, ts)
// append all metric data of this record to lists
deviceidList = append(deviceidList, metric.Name())
measurementsList = append(measurementsList, keys)
valuesList = append(valuesList, values)
dataTypesList = append(dataTypesList, dataTypes)
tagsList = append(tagsList, tags)
}
rwt := &recordsWithTags{
DeviceIDList: deviceidList,
MeasurementsList: measurementsList,
ValuesList: valuesList,
DataTypesList: dataTypesList,
TimestampList: timestampList,
TagsList: tagsList,
}
return rwt, nil
}
// modify recordsWithTags according to 'TreatTagsAs' Configuration
func (s *IoTDB) modifyRecordsWithTags(rwt *recordsWithTags) error {
switch s.TreatTagsAs {
case "fields":
// method 1: treat Tag(Key:Value) as measurement
for index, tags := range rwt.TagsList { // for each record
for _, tag := range tags { // for each tag of this record, append it's Key:Value to measurements
datatype, value := s.getDataTypeAndValue(tag.Value)
if datatype == client.UNKNOW {
return fmt.Errorf("datatype of %q is unknown, values: %v", tag.Key, value)
}
rwt.MeasurementsList[index] = append(rwt.MeasurementsList[index], tag.Key)
rwt.ValuesList[index] = append(rwt.ValuesList[index], value)
rwt.DataTypesList[index] = append(rwt.DataTypesList[index], datatype)
}
}
return nil
case "device_id":
// method 2: treat Tag(Key:Value) as subtree of device id
for index, tags := range rwt.TagsList { // for each record
topic := []string{rwt.DeviceIDList[index]}
for _, tag := range tags { // for each tag, append it's Value
topic = append(topic, tag.Value)
}
rwt.DeviceIDList[index] = strings.Join(topic, ".")
}
return nil
default:
// something go wrong. This configuration should have been checked in func Init().
return fmt.Errorf("unknown 'convert_tags_to' method: %q", s.TreatTagsAs)
}
}
// Write records with tags to IoTDB server
func (s *IoTDB) writeRecordsWithTags(rwt *recordsWithTags) error {
// deal with tags
if err := s.modifyRecordsWithTags(rwt); err != nil {
return err
}
// write to IoTDB server
status, err := s.session.InsertRecords(rwt.DeviceIDList, rwt.MeasurementsList,
rwt.DataTypesList, rwt.ValuesList, rwt.TimestampList)
if status != nil {
if verifyResult := client.VerifySuccess(status); verifyResult != nil {
s.Log.Debug(verifyResult)
}
}
return err
}
func init() {
outputs.Add("iotdb", func() telegraf.Output { return newIoTDB() })
}
// create a new IoTDB struct with default values.
func newIoTDB() *IoTDB {
return &IoTDB{
Host: "localhost",
Port: "6667",
User: "root",
Password: "root",
Timeout: config.Duration(time.Second * 5),
ConvertUint64To: "int64_clip",
TimeStampUnit: "nanosecond",
TreatTagsAs: "device_id",
}
}

View File

@ -0,0 +1,577 @@
package iotdb
import (
"math"
"strconv"
"testing"
"time"
"github.com/apache/iotdb-client-go/client"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)
// newMetricWithOrderedFields creates new Metric and makes sure fields are in
// order. This is required to define the expected output where the field order
// needs to be defines.
func newMetricWithOrderedFields(
name string,
tags []telegraf.Tag,
fields []telegraf.Field,
timestamp time.Time,
) telegraf.Metric {
m := metric.New(name, map[string]string{}, map[string]interface{}{}, timestamp)
for _, tag := range tags {
m.AddTag(tag.Key, tag.Value)
}
for _, field := range fields {
m.AddField(field.Key, field.Value)
}
return m
}
func TestInitInvalid(t *testing.T) {
tests := []struct {
name string
plugin *IoTDB
expected string
}{
{
name: "empty tag-conversion",
plugin: func() *IoTDB {
s := newIoTDB()
s.TreatTagsAs = ""
s.Log = &testutil.Logger{}
return s
}(),
expected: `unknown 'convert_tags_to' method ""`,
},
{
name: "empty uint-conversion",
plugin: func() *IoTDB {
s := newIoTDB()
s.ConvertUint64To = ""
s.Log = &testutil.Logger{}
return s
}(),
expected: `unknown 'uint64_conversion' method ""`,
},
{
name: "empty timestamp precision",
plugin: func() *IoTDB {
s := newIoTDB()
s.TimeStampUnit = ""
s.Log = &testutil.Logger{}
return s
}(),
expected: `unknown 'timestamp_precision' method ""`,
},
{
name: "invalid tag-conversion",
plugin: func() *IoTDB {
s := newIoTDB()
s.TreatTagsAs = "garbage"
s.Log = &testutil.Logger{}
return s
}(),
expected: `unknown 'convert_tags_to' method "garbage"`,
},
{
name: "invalid uint-conversion",
plugin: func() *IoTDB {
s := newIoTDB()
s.ConvertUint64To = "garbage"
s.Log = &testutil.Logger{}
return s
}(),
expected: `unknown 'uint64_conversion' method "garbage"`,
},
{
name: "invalid timestamp precision",
plugin: func() *IoTDB {
s := newIoTDB()
s.TimeStampUnit = "garbage"
s.Log = &testutil.Logger{}
return s
}(),
expected: `unknown 'timestamp_precision' method "garbage"`,
},
{
name: "negative timeout",
plugin: func() *IoTDB {
s := newIoTDB()
s.Timeout = config.Duration(time.Second * -5)
s.Log = &testutil.Logger{}
return s
}(),
expected: `negative timeout`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.EqualError(t, tt.plugin.Init(), tt.expected)
})
}
}
// Test Metric conversion, which means testing function `convertMetricsToRecordsWithTags`
func TestMetricConversionToRecordsWithTags(t *testing.T) {
var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)
tests := []struct {
name string
plugin *IoTDB
expected recordsWithTags
metrics []telegraf.Metric
}{
{
name: "default config",
plugin: func() *IoTDB { s := newIoTDB(); return s }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.fan", "root.computer.fan", "root.computer.keyboard"},
MeasurementsList: [][]string{
{"temperature", "counter"},
{"counter", "temperature"},
{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
},
ValuesList: [][]interface{}{
{float64(42.55), int64(987654321)},
{int64(123456789), float64(56.24)},
{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64},
{client.INT64, client.DOUBLE},
{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
},
TimestampList: []int64{testTimestamp.UnixNano(), testTimestamp.UnixNano(), testTimestamp.UnixNano()},
},
metrics: []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.fan",
[]telegraf.Tag{
{Key: "price", Value: "expensive"},
{Key: "owner", Value: "cpu"},
},
[]telegraf.Field{
{Key: "temperature", Value: float64(42.55)},
{Key: "counter", Value: int64(987654321)},
},
testTimestamp,
),
newMetricWithOrderedFields(
"root.computer.fan",
[]telegraf.Tag{ // same keys in different order
{Key: "owner", Value: "gpu"},
{Key: "price", Value: "cheap"},
},
[]telegraf.Field{ // same keys in different order
{Key: "counter", Value: int64(123456789)},
{Key: "temperature", Value: float64(56.24)},
},
testTimestamp,
),
newMetricWithOrderedFields(
"root.computer.keyboard",
[]telegraf.Tag{},
[]telegraf.Field{
{Key: "temperature", Value: float64(30.33)},
{Key: "counter", Value: int64(123456789)},
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
{Key: "string", Value: "Made in China."},
{Key: "bool", Value: bool(false)},
{Key: "int_text", Value: "123456789011"},
},
testTimestamp,
),
},
},
{
name: "unsigned int to text",
plugin: func() *IoTDB { cli002 := newIoTDB(); cli002.ConvertUint64To = "text"; return cli002 }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.uint_to_text"},
MeasurementsList: [][]string{{"unsigned_big"}},
ValuesList: [][]interface{}{{strconv.FormatUint(uint64(math.MaxInt64+1000), 10)}},
DataTypesList: [][]client.TSDataType{{client.TEXT}},
TimestampList: []int64{testTimestamp.UnixNano()},
},
metrics: []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.uint_to_text",
[]telegraf.Tag{},
[]telegraf.Field{
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
},
testTimestamp,
),
},
},
{
name: "unsigned int to int with overflow",
plugin: func() *IoTDB { cli002 := newIoTDB(); cli002.ConvertUint64To = "int64"; return cli002 }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.overflow"},
MeasurementsList: [][]string{{"unsigned_big"}},
ValuesList: [][]interface{}{{int64(-9223372036854774809)}},
DataTypesList: [][]client.TSDataType{{client.INT64}},
TimestampList: []int64{testTimestamp.UnixNano()},
},
metrics: []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.overflow",
[]telegraf.Tag{},
[]telegraf.Field{
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
},
testTimestamp,
),
},
},
{
name: "second timestamp precision",
plugin: func() *IoTDB { s := newIoTDB(); s.TimeStampUnit = "second"; return s }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.second"},
MeasurementsList: [][]string{{"unsigned_big"}},
ValuesList: [][]interface{}{{int64(math.MaxInt64)}},
DataTypesList: [][]client.TSDataType{{client.INT64}},
TimestampList: []int64{testTimestamp.Unix()},
},
metrics: []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.second",
[]telegraf.Tag{},
[]telegraf.Field{
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
},
testTimestamp,
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.plugin.Log = &testutil.Logger{}
require.NoError(t, tt.plugin.Init())
actual, err := tt.plugin.convertMetricsToRecordsWithTags(tt.metrics)
require.NoError(t, err)
// Ignore the tags-list for comparison
actual.TagsList = nil
require.EqualValues(t, &tt.expected, actual)
})
}
}
// Test tags handling, which means testing function `modifyRecordsWithTags`
func TestTagsHandling(t *testing.T) {
var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)
tests := []struct {
name string
plugin *IoTDB
expected recordsWithTags
input recordsWithTags
}{
{ //treat tags as fields. And input Tags are NOT in order.
name: "treat tags as fields",
plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "fields"; return s }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.fields"},
MeasurementsList: [][]string{{"temperature", "counter", "owner", "price"}},
ValuesList: [][]interface{}{
{float64(42.55), int64(987654321), "cpu", "expensive"},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64, client.TEXT, client.TEXT},
},
TimestampList: []int64{testTimestamp.UnixNano()},
},
input: recordsWithTags{
DeviceIDList: []string{"root.computer.fields"},
MeasurementsList: [][]string{{"temperature", "counter"}},
ValuesList: [][]interface{}{
{float64(42.55), int64(987654321)},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64},
},
TimestampList: []int64{testTimestamp.UnixNano()},
TagsList: [][]*telegraf.Tag{{
{Key: "owner", Value: "cpu"},
{Key: "price", Value: "expensive"},
}},
},
},
{ //treat tags as device IDs. And input Tags are in order.
name: "treat tags as device IDs",
plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "device_id"; return s }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.deviceID.cpu.expensive"},
MeasurementsList: [][]string{{"temperature", "counter"}},
ValuesList: [][]interface{}{
{float64(42.55), int64(987654321)},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64},
},
TimestampList: []int64{testTimestamp.UnixNano()},
},
input: recordsWithTags{
DeviceIDList: []string{"root.computer.deviceID"},
MeasurementsList: [][]string{{"temperature", "counter"}},
ValuesList: [][]interface{}{
{float64(42.55), int64(987654321)},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64},
},
TimestampList: []int64{testTimestamp.UnixNano()},
TagsList: [][]*telegraf.Tag{{
{Key: "owner", Value: "cpu"},
{Key: "price", Value: "expensive"},
}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.plugin.Log = &testutil.Logger{}
require.NoError(t, tt.plugin.Init())
require.NoError(t, tt.plugin.modifyRecordsWithTags(&tt.input))
// Ignore the tags-list for comparison
tt.input.TagsList = nil
require.EqualValues(t, tt.expected, tt.input)
})
}
}
// Test entire Metric conversion, from metrics to records which are ready to insert
func TestEntireMetricConversion(t *testing.T) {
var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)
tests := []struct {
name string
plugin *IoTDB
expected recordsWithTags
metrics []telegraf.Metric
requireEqual bool
}{
{
name: "default config",
plugin: func() *IoTDB { s := newIoTDB(); return s }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.screen.high.LED"},
MeasurementsList: [][]string{
{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
},
ValuesList: [][]interface{}{
{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
},
TimestampList: []int64{testTimestamp.UnixNano()},
},
metrics: []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.screen",
[]telegraf.Tag{
{Key: "brightness", Value: "high"},
{Key: "type", Value: "LED"},
},
[]telegraf.Field{
{Key: "temperature", Value: float64(30.33)},
{Key: "counter", Value: int64(123456789)},
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
{Key: "string", Value: "Made in China."},
{Key: "bool", Value: bool(false)},
{Key: "int_text", Value: "123456789011"},
},
testTimestamp,
),
},
requireEqual: true,
},
{
name: "wrong order of tags",
plugin: func() *IoTDB { s := newIoTDB(); return s }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.screen.LED.high"},
MeasurementsList: [][]string{
{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
},
ValuesList: [][]interface{}{
{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
},
TimestampList: []int64{testTimestamp.UnixNano()},
},
metrics: []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.screen",
[]telegraf.Tag{
{Key: "brightness", Value: "high"},
{Key: "type", Value: "LED"},
},
[]telegraf.Field{
{Key: "temperature", Value: float64(30.33)},
{Key: "counter", Value: int64(123456789)},
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
{Key: "string", Value: "Made in China."},
{Key: "bool", Value: bool(false)},
{Key: "int_text", Value: "123456789011"},
},
testTimestamp,
),
},
requireEqual: false,
},
{
name: "wrong order of tags",
plugin: func() *IoTDB { s := newIoTDB(); return s }(),
expected: recordsWithTags{
DeviceIDList: []string{"root.computer.screen.LED.high"},
MeasurementsList: [][]string{
{"temperature", "counter"},
},
ValuesList: [][]interface{}{
{float64(30.33), int64(123456789)},
},
DataTypesList: [][]client.TSDataType{
{client.DOUBLE, client.INT64},
},
TimestampList: []int64{testTimestamp.UnixNano()},
},
metrics: []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.screen",
[]telegraf.Tag{
{Key: "brightness", Value: "high"},
{Key: "type", Value: "LED"},
},
[]telegraf.Field{
{Key: "temperature", Value: float64(30.33)},
{Key: "counter", Value: int64(123456789)},
},
testTimestamp,
),
},
requireEqual: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.plugin.Log = &testutil.Logger{}
require.NoError(t, tt.plugin.Init())
actual, err := tt.plugin.convertMetricsToRecordsWithTags(tt.metrics)
require.NoError(t, err)
require.NoError(t, tt.plugin.modifyRecordsWithTags(actual))
// Ignore the tags-list for comparison
actual.TagsList = nil
if tt.requireEqual {
require.EqualValues(t, &tt.expected, actual)
} else {
require.NotEqualValues(t, &tt.expected, actual)
}
})
}
}
// Start a container and do integration test.
func TestIntegrationInserts(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
const iotdbPort = "6667"
container := testutil.Container{
Image: "apache/iotdb:0.13.0-node",
ExposedPorts: []string{iotdbPort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(iotdbPort)),
wait.ForLog("IoTDB has started."),
),
}
err := container.Start()
require.NoError(t, err, "failed to start IoTDB container")
defer func() {
require.NoError(t, container.Terminate(), "terminating IoTDB container failed")
}()
t.Logf("Container Address:%q, ExposedPorts:[%v:%v]", container.Address, container.Ports[iotdbPort], iotdbPort)
// create a client and tests two groups of insertion
testClient := &IoTDB{
Host: container.Address,
Port: container.Ports[iotdbPort],
User: "root",
Password: "root",
Timeout: config.Duration(time.Second * 5),
ConvertUint64To: "int64_clip",
TimeStampUnit: "nanosecond",
TreatTagsAs: "device_id",
}
testClient.Log = &testutil.Logger{}
// generate Metrics to input
metrics := []telegraf.Metric{
newMetricWithOrderedFields(
"root.computer.unsigned_big",
[]telegraf.Tag{},
[]telegraf.Field{
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
},
time.Now(),
),
newMetricWithOrderedFields(
"root.computer.fan",
[]telegraf.Tag{
{Key: "price", Value: "expensive"},
{Key: "owner", Value: "cpu"},
},
[]telegraf.Field{
{Key: "temperature", Value: float64(42.55)},
{Key: "counter", Value: int64(987654321)},
},
time.Now(),
),
newMetricWithOrderedFields(
"root.computer.fan",
[]telegraf.Tag{ // same keys in different order
{Key: "owner", Value: "gpu"},
{Key: "price", Value: "cheap"},
},
[]telegraf.Field{ // same keys in different order
{Key: "counter", Value: int64(123456789)},
{Key: "temperature", Value: float64(56.24)},
},
time.Now(),
),
newMetricWithOrderedFields(
"root.computer.keyboard",
[]telegraf.Tag{},
[]telegraf.Field{
{Key: "temperature", Value: float64(30.33)},
{Key: "counter", Value: int64(123456789)},
{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
{Key: "string", Value: "Made in China."},
{Key: "bool", Value: bool(false)},
{Key: "int_text", Value: "123456789011"},
},
time.Now(),
),
}
require.NoError(t, testClient.Connect())
require.NoError(t, testClient.Write(metrics))
require.NoError(t, testClient.Close())
}

View File

@ -0,0 +1,45 @@
# Save metrics to an IoTDB Database
[[outputs.iotdb]]
## Configuration of IoTDB server connection
host = "127.0.0.1"
# port = "6667"
## Configuration of authentication
# user = "root"
# password = "root"
## Timeout to open a new session.
## A value of zero means no timeout.
# timeout = "5s"
## Configuration of type conversion for 64-bit unsigned int
## IoTDB currently DOES NOT support unsigned integers (version 13.x).
## 32-bit unsigned integers are safely converted into 64-bit signed integers by the plugin,
## however, this is not true for 64-bit values in general as overflows may occur.
## The following setting allows to specify the handling of 64-bit unsigned integers.
## Available values are:
## - "int64" -- convert to 64-bit signed integers and accept overflows
## - "int64_clip" -- convert to 64-bit signed integers and clip the values on overflow to 9,223,372,036,854,775,807
## - "text" -- convert to the string representation of the value
# uint64_conversion = "int64_clip"
## Configuration of TimeStamp
## TimeStamp is always saved in 64bits int. timestamp_precision specifies the unit of timestamp.
## Available value:
## "second", "millisecond", "microsecond", "nanosecond"(default)
# timestamp_precision = "nanosecond"
## Handling of tags
## Tags are not fully supported by IoTDB.
## A guide with suggestions on how to handle tags can be found here:
## https://iotdb.apache.org/UserGuide/Master/API/InfluxDB-Protocol.html
##
## Available values are:
## - "fields" -- convert tags to fields in the measurement
## - "device_id" -- attach tags to the device ID
##
## For Example, a metric named "root.sg.device" with the tags `tag1: "private"` and `tag2: "working"` and
## fields `s1: 100` and `s2: "hello"` will result in the following representations in IoTDB
## - "fields" -- root.sg.device, s1=100, s2="hello", tag1="private", tag2="working"
## - "device_id" -- root.sg.device.private.working, s1=100, s2="hello"
# convert_tags_to = "device_id"