diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 6fce9c446..c943e3f8a 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -82,6 +82,7 @@ following works: - github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE) - github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE) - github.com/bmatcuk/doublestar [MIT License](https://github.com/bmatcuk/doublestar/blob/master/LICENSE) +- github.com/boschrexroth/ctrlx-datalayer-golang [MIT License](https://github.com/boschrexroth/ctrlx-datalayer-golang/blob/main/LICENSE) - github.com/bufbuild/protocompile [Apache License 2.0](https://github.com/bufbuild/protocompile/blob/main/LICENSE) - github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE) - github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE) diff --git a/go.mod b/go.mod index d416faf3f..c453320ad 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/benbjohnson/clock v1.3.3 github.com/blues/jsonata-go v1.5.4 github.com/bmatcuk/doublestar/v3 v3.0.0 + github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0 github.com/caio/go-tdigest v3.1.0+incompatible github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df github.com/clarify/clarify-go v0.2.4 diff --git a/go.sum b/go.sum index a395b6e46..4c8dcfd1d 100644 --- a/go.sum +++ b/go.sum @@ -371,6 +371,8 @@ github.com/bmatcuk/doublestar/v3 v3.0.0/go.mod h1:6PcTVMw80pCY1RVuoqu3V++99uQB3v github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= +github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0 h1:rwOJNZEGwMGbKziTcGpcoMdK0lfZE78lxR+UzLw+pRM= +github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0/go.mod h1:i0ex6o3HhWHDSS0KEmRuHZOk3FVdJamzyk+tp3qmxkg= github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= @@ -692,6 +694,7 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc h1:jd+stC3Fqf9kaqgCLOdm4Da/AN3txPTlmLB6tStXAcU= github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc/go.mod h1:YzWEoI07MC/a/wj9in8GeVatqfypkldgBlwXh9bCwqY= +github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v23.3.3+incompatible h1:5PJI/WbJkaMTvpGxsHVKG/LurN/KnWXNyGpwSCDgen0= github.com/google/flatbuffers v23.3.3+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= @@ -1073,6 +1076,10 @@ github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBW github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk= github.com/logzio/azure-monitor-metrics-receiver v1.0.0 h1:TAzhIZL2ueyyc81qIw8FGg4nUbts4Hvc3oOxSobY1IA= github.com/logzio/azure-monitor-metrics-receiver v1.0.0/go.mod h1:UIaQ7UgxZ8jO3L0JB2hctsHFBbZqL6mbxYscQAeFpl4= +github.com/loov/hrtime v1.0.1/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0= +github.com/loov/hrtime v1.0.3/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0= +github.com/loov/hrtime/hrplot v1.0.2/go.mod h1:9t65xYn4d42ntjv40Wt5lbU72/VC5S0zGDgjC8kD5BU= +github.com/loov/plot v0.0.0-20200413101321-e09a6f01d2f5/go.mod h1:gSrhfSMoiPGG0CZ9E66kXjaHxFw0fzJhooyicOnz5z4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c h1:VtwQ41oftZwlMnOEbMWQtSEUgU64U4s+GHk7hZK+jtY= github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE= @@ -1235,6 +1242,7 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4= github.com/olivere/elastic v6.2.37+incompatible h1:UfSGJem5czY+x/LqxgeCBgjDn6St+z8OnsCuxwD3L0U= github.com/olivere/elastic v6.2.37+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8= github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo= diff --git a/plugins/inputs/all/ctrlx_datalayer.go b/plugins/inputs/all/ctrlx_datalayer.go new file mode 100644 index 000000000..f552374f1 --- /dev/null +++ b/plugins/inputs/all/ctrlx_datalayer.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.ctrlx_datalayer + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/ctrlx_datalayer" // register plugin diff --git a/plugins/inputs/ctrlx_datalayer/README.md b/plugins/inputs/ctrlx_datalayer/README.md new file mode 100644 index 000000000..fcd6c3866 --- /dev/null +++ b/plugins/inputs/ctrlx_datalayer/README.md @@ -0,0 +1,386 @@ +# ctrlX Data Layer Input Plugin + +The `ctrlx_datalayer` plugin gathers data from the ctrlX Data Layer, +a communication middleware runnning on +[ctrlX CORE devices](https://ctrlx-core.com) from +[Bosch Rexroth](https://boschrexroth.com). The platform is used for +professional automation applications like industrial automation, building +automation, robotics, IoT Gateways or as classical PLC. For more +information, see [ctrlX AUTOMATION](https://ctrlx-automation.com). + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# A ctrlX Data Layer server sent event input plugin +[[inputs.ctrlx_datalayer]] + ## Hostname or IP address of the ctrlX CORE Data Layer server + ## example: server = "localhost" # Telegraf is running directly on the device + ## server = "192.168.1.1" # Connect to ctrlX CORE remote via IP + ## server = "host.example.com" # Connect to ctrlX CORE remote via hostname + ## server = "10.0.2.2:8443" # Connect to ctrlX CORE Virtual from development environment + server = "localhost" + + ## Authentication credentials + username = "boschrexroth" + password = "boschrexroth" + + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Timeout for HTTP requests. (default: "10s") + # timeout = "10s" + + + ## Create a ctrlX Data Layer subscription. + ## It is possible to define multiple subscriptions per host. Each subscription can have its own + ## sampling properties and a list of nodes to subscribe to. + ## All subscriptions share the same credentials. + [[inputs.ctrlx_datalayer.subscription]] + ## The name of the measurement. (default: "ctrlx") + measurement = "memory" + + ## Configure the ctrlX Data Layer nodes which should be subscribed. + ## address - node address in ctrlX Data Layer (mandatory) + ## name - field name to use in the output (optional, default: base name of address) + ## tags - extra node tags to be added to the output metric (optional) + ## Note: + ## Use either the inline notation or the bracketed notation, not both. + ## The tags property is only supported in bracketed notation due to toml parser restrictions + ## Examples: + ## Inline notation + nodes=[ + {name="available", address="framework/metrics/system/memavailable-mb"}, + {name="used", address="framework/metrics/system/memused-mb"}, + ] + ## Bracketed notation + # [[inputs.ctrlx_datalayer.subscription.nodes]] + # name ="available" + # address="framework/metrics/system/memavailable-mb" + # ## Define extra tags related to node to be added to the output metric (optional) + # [inputs.ctrlx_datalayer.subscription.nodes.tags] + # node_tag1="node_tag1" + # node_tag2="node_tag2" + # [[inputs.ctrlx_datalayer.subscription.nodes]] + # name ="used" + # address="framework/metrics/system/memused-mb" + + ## The switch "output_json_string" enables output of the measurement as json. + ## That way it can be used in in a subsequent processor plugin, e.g. "Starlark Processor Plugin". + # output_json_string = false + + ## Define extra tags related to subscription to be added to the output metric (optional) + # [inputs.ctrlx_datalayer.subscription.tags] + # subscription_tag1 = "subscription_tag1" + # subscription_tag2 = "subscription_tag2" + + ## The interval in which messages shall be sent by the ctrlX Data Layer to this plugin. (default: 1s) + ## Higher values reduce load on network by queuing samples on server side and sending as a single TCP packet. + # publish_interval = "1s" + + ## The interval a "keepalive" message is sent if no change of data occurs. (default: 60s) + ## Only used internally to detect broken network connections. + # keep_alive_interval = "60s" + + ## The interval an "error" message is sent if an error was received from a node. (default: 10s) + ## Higher values reduce load on output target and network in case of errors by limiting frequency of error messages. + # error_interval = "10s" + + ## The interval that defines the fastest rate at which the node values should be sampled and values captured. (default: 1s) + ## The sampling frequency should be adjusted to the dynamics of the signal to be sampled. + ## Higher sampling frequence increases load on ctrlX Data Layer. + ## The sampling frequency can be higher, than the publish interval. Captured samples are put in a queue and sent in publish interval. + ## Note: The minimum sampling interval can be overruled by a global setting in the ctrlX Data Layer configuration ('datalayer/subscriptions/settings'). + # sampling_interval = "1s" + + ## The requested size of the node value queue. (default: 10) + ## Relevant if more values are captured than can be sent. + # queue_size = 10 + + ## The behaviour of the queue if it is full. (default: "DiscardOldest") + ## Possible values: + ## - "DiscardOldest" + ## The oldest value gets deleted from the queue when it is full. + ## - "DiscardNewest" + ## The newest value gets deleted from the queue when it is full. + # queue_behaviour = "DiscardOldest" + + ## The filter when a new value will be sampled. (default: 0.0) + ## Calculation rule: If (abs(lastCapturedValue - newValue) > dead_band_value) capture(newValue). + # dead_band_value = 0.0 + + ## The conditions on which a sample should be captured and thus will be sent as a message. (default: "StatusValue") + ## Possible values: + ## - "Status" + ## Capture the value only, when the state of the node changes from or to error state. Value changes are ignored. + ## - "StatusValue" + ## Capture when the value changes or the node changes from or to error state. + ## See also 'dead_band_value' for what is considered as a value change. + ## - "StatusValueTimestamp": + ## Capture even if the value is the same, but the timestamp of the value is newer. + ## Note: This might lead to high load on the network because every sample will be sent as a message + ## even if the value of the node did not change. + # value_change = "StatusValue" + +``` + +## Metrics + +All measurements are tagged with the server address of the device and the +corresponding node address as defined in the ctrlX Data Layer. + +- measurement name + - tags: + - `source` (ctrlX Data Layer server where the metrics are gathered from) + - `node` (Address of the ctrlX Data Layer node) + - fields: + - `{name}` (for nodes with simple data types) + - `{name}_{index}`(for nodes with array data types) + - `{name}_{jsonflat.key}` (for nodes with object data types) + +### Output Format + +The switch "output_json_string" determines the format of the output metric. + +#### Output default format + +With the output default format + +```toml +output_json_string=false +``` + +the output is formatted automatically as follows depending on the data type: + +##### Simple data type + +The value is passed 'as it is' to a metric with pattern: + +```text +{name}={value} +``` + +Simple data types of ctrlX Data Layer: + +```text +bool8,int8,uint8,int16,uint16,int32,uint32,int64,uint64,float,double,string,timestamp +``` + +##### Array data type + +Every value in the array is passed to a metric with pattern: + +```text +{name}_{index}={value[index]} +``` + +example: + +```text +myarray=[1,2,3] -> myarray_1=1, myarray_2=2, myarray_3=3 +``` + +Array data types of ctrlX Data Layer: + +```text +arbool8,arint8,aruint8,arint16,aruint16,arint32,aruint32,arint64,aruint64,arfloat,ardouble,arstring,artimestamp +``` + +##### Object data type (JSON) + +Every value of the flattened json is passed to a metric with pattern: + +```text +{name}_{jsonflat.key}={jsonflat.value} +``` + +example: + +```text +myobj={"a":1,"b":2,"c":{"d": 3}} -> myobj_a=1, myobj_b=2, myobj_c_d=3 +``` + +#### Output JSON format + +With the output JSON format + +```toml +output_json_string=true +``` + +the output is formatted as JSON string: + +```text +{name}="{value}" +``` + +examples: + +```text +input=true -> output="true" +``` + +```text +input=[1,2,3] -> output="[1,2,3]" +``` + +```text +input={"x":4720,"y":9440,"z":{"d": 14160}} -> output="{\"x\":4720,\"y\":9440,\"z\":14160}" +``` + +The JSON output string can be passed to a processor plugin for transformation +e.g. [Parser Processor Plugin][PARSER.md] +or [Starlark Processor Plugin][STARLARK.md] + +[PARSER.md]: ../../processors/parser/README.md +[STARLARK.md]: ../../processors/starlark/README.md + +example: + +```toml +[[inputs.ctrlx_datalayer.subscription]] + measurement = "osci" + nodes = [ + {address="oscilloscope/instances/Osci_PLC/rec-values/allsignals"}, + ] + output_json_string = true + +[[processors.starlark]] + namepass = [ + 'osci', + ] + script = "oscilloscope.star" +``` + +## Troubleshooting + +This plugin was contributed by +[Bosch Rexroth](https://www.boschrexroth.com). +For questions regarding ctrlX AUTOMATION and this plugin feel +free to check out and be part of the +[ctrlX AUTOMATION Community](https://ctrlx-automation.com/community) +to get additional support or leave some ideas and feedback. + +Also, join +[InfluxData Community Slack](https://influxdata.com/slack) or +[InfluxData Community Page](https://community.influxdata.com/) +if you have questions or comments for the telegraf engineering teams. + +## Example Output + +The plugin handles simple, array and object (JSON) data types. + +### Example with simple data type + +Configuration: + +```toml +[[inputs.ctrlx_datalayer.subscription]] + measurement="memory" + [inputs.ctrlx_datalayer.subscription.tags] + sub_tag1="memory_tag1" + sub_tag2="memory_tag2" + + [[inputs.ctrlx_datalayer.subscription.nodes]] + name ="available" + address="framework/metrics/system/memavailable-mb" + [inputs.ctrlx_datalayer.subscription.nodes.tags] + node_tag1="memory_available_tag1" + node_tag2="memory_available_tag2" + + [[inputs.ctrlx_datalayer.subscription.nodes]] + name ="used" + address="framework/metrics/system/memused-mb" + [inputs.ctrlx_datalayer.subscription.nodes.tags] + node_tag1="memory_used_node_tag1" + node_tag2="memory_used_node_tag2" +``` + +Source: + +```json +"framework/metrics/system/memavailable-mb" : 365.93359375 +"framework/metrics/system/memused-mb" : 567.67578125 +``` + +Metrics: + +```text +memory,source=192.168.1.1,host=host.example.com,node=framework/metrics/system/memavailable-mb,node_tag1=memory_available_tag1,node_tag2=memory_available_tag2,sub_tag1=memory2_tag1,sub_tag2=memory_tag2 available=365.93359375 1680093310249627400 +memory,source=192.168.1.1,host=host.example.com,node=framework/metrics/system/memused-mb,node_tag1=memory_used_node_tag1,node_tag2=memory_used_node_tag2,sub_tag1=memory2_tag1,sub_tag2=memory_tag2 used=567.67578125 1680093310249667600 +``` + +### Example with array data type + +Configuration: + +```toml +[[inputs.ctrlx_datalayer.subscription]] + measurement="array" + nodes=[ + { name="ar_uint8", address="alldata/dynamic/array-of-uint8"}, + { name="ar_bool8", address="alldata/dynamic/array-of-bool8"}, + ] +``` + +Source: + +```json +"alldata/dynamic/array-of-bool8" : [true, false, true] +"alldata/dynamic/array-of-uint8" : [0, 255] +``` + +Metrics: + +```text +array,source=192.168.1.1,host=host.example.com,node=alldata/dynamic/array-of-bool8 ar_bool8_0=true,ar_bool8_1=false,ar_bool8_2=true 1680095727347018800 +array,source=192.168.1.1,host=host.example.com,node=alldata/dynamic/array-of-uint8 ar_uint8_0=0,ar_uint8_1=255 1680095727347223300 +``` + +### Example with object data type (JSON) + +Configuration: + +```toml +[[inputs.ctrlx_datalayer.subscription]] + measurement="motion" + nodes=[ + {name="linear", address="motion/axs/Axis_1/state/values/actual"}, + {name="rotational", address="motion/axs/Axis_2/state/values/actual"}, + ] +``` + +Source: + +```json +"motion/axs/Axis_1/state/values/actual" : {"actualPos":65.249329860957,"actualVel":5,"actualAcc":0,"actualTorque":0,"distLeft":0,"actualPosUnit":"mm","actualVelUnit":"mm/min","actualAccUnit":"m/s^2","actualTorqueUnit":"Nm","distLeftUnit":"mm"} +"motion/axs/Axis_2/state/values/actual" : {"actualPos":120,"actualVel":0,"actualAcc":0,"actualTorque":0,"distLeft":0,"actualPosUnit":"deg","actualVelUnit":"rpm","actualAccUnit":"rad/s^2","actualTorqueUnit":"Nm","distLeftUnit":"deg"} +``` + +Metrics: + +```text +motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_1/state/values/actual linear_actualVel=5,linear_distLeftUnit="mm",linear_actualAcc=0,linear_distLeft=0,linear_actualPosUnit="mm",linear_actualAccUnit="m/s^2",linear_actualTorqueUnit="Nm",linear_actualPos=65.249329860957,linear_actualVelUnit="mm/min",linear_actualTorque=0 1680258290342523500 +motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_2/state/values/actual rotational_distLeft=0,rotational_actualVelUnit="rpm",rotational_actualAccUnit="rad/s^2",rotational_distLeftUnit="deg",rotational_actualPos=120,rotational_actualVel=0,rotational_actualAcc=0,rotational_actualPosUnit="deg",rotational_actualTorqueUnit="Nm",rotational_actualTorque=0 1680258290342538100 +``` + +If `output_json_string` is set in the configuration: + +```toml + output_json_string = true +``` + +then the metrics will be generated like this: + +```text +motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_1/state/values/actual linear="{\"actualAcc\":0,\"actualAccUnit\":\"m/s^2\",\"actualPos\":65.249329860957,\"actualPosUnit\":\"mm\",\"actualTorque\":0,\"actualTorqueUnit\":\"Nm\",\"actualVel\":5,\"actualVelUnit\":\"mm/min\",\"distLeft\":0,\"distLeftUnit\":\"mm\"}" 1680258290342523500 +motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_2/state/values/actual rotational="{\"actualAcc\":0,\"actualAccUnit\":\"rad/s^2\",\"actualPos\":120,\"actualPosUnit\":\"deg\",\"actualTorque\":0,\"actualTorqueUnit\":\"Nm\",\"actualVel\":0,\"actualVelUnit\":\"rpm\",\"distLeft\":0,\"distLeftUnit\":\"deg\"}" 1680258290342538100 +``` diff --git a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go new file mode 100644 index 000000000..08e4107df --- /dev/null +++ b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer.go @@ -0,0 +1,373 @@ +//go:generate ../../../tools/readme_config_includer/generator +package ctrlx_datalayer + +import ( + "bytes" + "context" + _ "embed" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "sync" + "time" + + "github.com/boschrexroth/ctrlx-datalayer-golang/pkg/sseclient" + "github.com/boschrexroth/ctrlx-datalayer-golang/pkg/token" + "github.com/google/uuid" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/metric" + httpconfig "github.com/influxdata/telegraf/plugins/common/http" + "github.com/influxdata/telegraf/plugins/inputs" + jsonParser "github.com/influxdata/telegraf/plugins/parsers/json" +) + +// This plugin is based on the official ctrlX CORE API. Documentation can be found in OpenAPI format at: +// https://boschrexroth.github.io/rest-api-description/ctrlx-automation/ctrlx-core/ +// Used APIs are: +// * ctrlX CORE - Authorization and Authentication API +// * ctrlX CORE - Data Layer API +// +// All communication between the device and this input plugin is based +// on https REST and HTML5 Server Sent Events (sse). + +//go:embed sample.conf +var sampleConfig string + +// CtrlXDataLayer encapsulated the configuration as well as the state of this plugin. +type CtrlXDataLayer struct { + Server string `toml:"server"` + Username config.Secret `toml:"username"` + Password config.Secret `toml:"password"` + + Log telegraf.Logger `toml:"-"` + Subscription []Subscription + + url string + wg sync.WaitGroup + cancel context.CancelFunc + + acc telegraf.Accumulator + connection *http.Client + tokenManager token.TokenManager + httpconfig.HTTPClientConfig +} + +// convertTimestamp2UnixTime converts the given Data Layer timestamp of the payload to UnixTime. +func convertTimestamp2UnixTime(t int64) time.Time { + // 1 sec=1000 milisec=1000000 microsec=1000000000 nanosec. + // Convert from FILETIME (100-nanosecond intervals since January 1, 1601 UTC) to + // seconds and nanoseconds since January 1, 1970 UTC. + // Between Jan 1, 1601 and Jan 1, 1970 there are 11644473600 seconds. + return time.Unix(0, (t-116444736000000000)*100) +} + +// createSubscription uses the official 'ctrlX Data Layer API' to create the sse subscription. +func (c *CtrlXDataLayer) createSubscription(sub *Subscription) (string, error) { + sseURL := c.url + subscriptionPath + + id := "telegraf_" + uuid.New().String() + request := sub.createRequest(id) + payload, err := json.Marshal(request) + if err != nil { + return "", fmt.Errorf("failed to create subscription %d payload: %w", sub.index, err) + } + + requestBody := bytes.NewBuffer(payload) + req, err := http.NewRequest("POST", sseURL, requestBody) + if err != nil { + return "", fmt.Errorf("failed to create subscription %d request: %w", sub.index, err) + } + + req.Header.Add("Authorization", c.tokenManager.Token.String()) + + resp, err := c.connection.Do(req) + if err != nil { + return "", fmt.Errorf("failed to do request to create sse subscription %d: %w", sub.index, err) + } + resp.Body.Close() + + if resp.StatusCode != 200 && resp.StatusCode != 201 { + return "", fmt.Errorf("failed to create sse subscription %d, status: %s", sub.index, resp.Status) + } + + return sseURL + "/" + id, nil +} + +// createSubscriptionAndSseClient creates a sse subscription on the server and +// initializes a sse client to receive sse events from the server. +func (c *CtrlXDataLayer) createSubscriptionAndSseClient(sub *Subscription) (*sseclient.SseClient, error) { + t, err := c.tokenManager.RequestAuthToken() + if err != nil { + return nil, err + } + + subURL, err := c.createSubscription(sub) + if err != nil { + return nil, err + } + + client := sseclient.NewSseClient(subURL, t.String(), c.InsecureSkipVerify) + + return client, nil +} + +// addMetric writes sse metric into accumulator. +func (c *CtrlXDataLayer) addMetric(se *sseclient.SseEvent, sub *Subscription) { + switch se.Event { + case "update": + // Received an updated value, that we translate into a metric + var d sseEventData + + if err := json.Unmarshal([]byte(se.Data), &d); err != nil { + c.acc.AddError(fmt.Errorf("received malformed data from 'update' event: %w", err)) + return + } + m, err := c.createMetric(&d, sub) + if err != nil { + c.acc.AddError(fmt.Errorf("failed to create metrics: %w", err)) + return + } + c.acc.AddMetric(m) + case "error": + // Received an error event, that we report to the accumulator + var e sseEventError + if err := json.Unmarshal([]byte(se.Data), &e); err != nil { + c.acc.AddError(fmt.Errorf("received malformed data from 'error' event: %w", err)) + return + } + c.acc.AddError(fmt.Errorf("received 'error' event for node: %q", e.Instance)) + case "keepalive": + // Keepalive events are ignored for the moment + c.Log.Debug("Received keepalive event") + default: + // Received a yet unsupported event type + c.Log.Debug("Received unsupported event: %q", se.Event) + } +} + +// createMetric - create metric depending on flag 'output_json' and data type +func (c *CtrlXDataLayer) createMetric(em *sseEventData, sub *Subscription) (telegraf.Metric, error) { + t := convertTimestamp2UnixTime(em.Timestamp) + node := sub.node(em.Node) + if node == nil { + return nil, errors.New("node not found") + } + + // default tags + tags := map[string]string{ + "node": em.Node, + "source": c.Server, + } + + // add tags of subscription if user has defined + for key, value := range sub.Tags { + tags[key] = value + } + + // add tags of node if user has defined + for key, value := range node.Tags { + tags[key] = value + } + + // set measurement of subscription + measurement := sub.Measurement + + // get field key from node properties + fieldKey := node.fieldKey() + + if fieldKey == "" { + return nil, errors.New("field key not valid") + } + + if sub.OutputJSONString { + b, err := json.Marshal(em.Value) + if err != nil { + return nil, err + } + fields := map[string]interface{}{fieldKey: string(b)} + m := metric.New(measurement, tags, fields, t) + return m, nil + } + + switch em.Type { + case "object": + flattener := jsonParser.JSONFlattener{} + err := flattener.FullFlattenJSON(fieldKey, em.Value, true, true) + if err != nil { + return nil, err + } + + m := metric.New(measurement, tags, flattener.Fields, t) + return m, nil + case "arbool8", + "arint8", "aruint8", + "arint16", "aruint16", + "arint32", "aruint32", + "arint64", "aruint64", + "arfloat", "ardouble", + "arstring", + "artimestamp": + fields := make(map[string]interface{}) + values := em.Value.([]interface{}) + for i := 0; i < len(values); i++ { + index := strconv.Itoa(i) + key := fieldKey + "_" + index + fields[key] = values[i] + } + m := metric.New(measurement, tags, fields, t) + return m, nil + case "bool8", + "int8", "uint8", + "int16", "uint16", + "int32", "uint32", + "int64", "uint64", + "float", "double", + "string", + "timestamp": + fields := map[string]interface{}{fieldKey: em.Value} + m := metric.New(measurement, tags, fields, t) + return m, nil + } + + return nil, fmt.Errorf("unsupported value type: %s", em.Type) +} + +// Init is for setup, and validating config +func (c *CtrlXDataLayer) Init() error { + // Check all configured subscriptions for valid settings + for i := range c.Subscription { + sub := &c.Subscription[i] + sub.applyDefaultSettings() + if !choice.Contains(sub.QueueBehaviour, queueBehaviours) { + c.Log.Infof("The right queue behaviour values are %v", queueBehaviours) + return fmt.Errorf("subscription %d: setting 'queue_behaviour' %q is invalid", i, sub.QueueBehaviour) + } + if !choice.Contains(sub.ValueChange, valueChanges) { + c.Log.Infof("The right value change values are %v", valueChanges) + return fmt.Errorf("subscription %d: setting 'value_change' %q is invalid", i, sub.ValueChange) + } + if len(sub.Nodes) == 0 { + c.Log.Warn("A configured subscription has no nodes configured") + } + sub.index = i + } + + // Generate valid communication url based on configured server address + u := url.URL{ + Scheme: "https", + Host: c.Server, + } + c.url = u.String() + if _, err := url.Parse(c.url); err != nil { + return errors.New("invalid server address") + } + + return nil +} + +// Start input as service, retain the accumulator, establish the connection. +func (c *CtrlXDataLayer) Start(acc telegraf.Accumulator) error { + var ctx context.Context + ctx, c.cancel = context.WithCancel(context.Background()) + + var err error + c.connection, err = c.HTTPClientConfig.CreateClient(ctx, c.Log) + if err != nil { + return fmt.Errorf("failed to create http client: %w", err) + } + + username, err := c.Username.Get() + if err != nil { + return fmt.Errorf("getting username failed: %w", err) + } + + password, err := c.Password.Get() + if err != nil { + return fmt.Errorf("getting password failed: %w", err) + } + + c.tokenManager = token.TokenManager{ + Url: c.url, + Username: string(username), + Password: string(password), + Connection: c.connection, + } + config.ReleaseSecret(username) + config.ReleaseSecret(password) + + c.acc = acc + + c.gatherLoop(ctx) + + return nil +} + +// gatherLoop creates sse subscriptions on the Data Layer and requests the sse data +// the connection will be restablished if the sse subscription is broken. +func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) { + for _, sub := range c.Subscription { + c.wg.Add(1) + go func(sub Subscription) { + defer c.wg.Done() + for { + select { + case <-ctx.Done(): + c.Log.Debugf("Gather loop for subscription %d stopped", sub.index) + return + default: + client, err := c.createSubscriptionAndSseClient(&sub) + if err != nil { + c.Log.Errorf("Creating sse client to subscription %d: %v", sub.index, err) + time.Sleep(time.Duration(defaultReconnectInterval)) + continue + } + c.Log.Debugf("Created sse client to subscription %d", sub.index) + + // Establish connection and handle events in a callback function. + err = client.Subscribe(ctx, func(event string, data string) { + c.addMetric(&sseclient.SseEvent{ + Event: event, + Data: data, + }, &sub) + }) + if errors.Is(err, context.Canceled) { + // Subscription cancelled + c.Log.Debugf("Requesting data of subscription %d cancelled", sub.index) + return + } + c.Log.Errorf("Requesting data of subscription %d failed: %v", sub.index, err) + } + } + }(sub) + } +} + +// Stop input as service. +func (c *CtrlXDataLayer) Stop() { + c.cancel() + c.wg.Wait() +} + +// Gather is called by telegraf to collect the metrics. +func (c *CtrlXDataLayer) Gather(_ telegraf.Accumulator) error { + // Metrics are sent to the accumulator asynchronously in worker thread. So nothing to do here. + return nil +} + +// SampleConfig returns the auto-inserted sample configuration to the telegraf. +func (*CtrlXDataLayer) SampleConfig() string { + return sampleConfig +} + +// init registers the plugin in telegraf. +func init() { + inputs.Add("ctrlx_datalayer", func() telegraf.Input { + return &CtrlXDataLayer{} + }) +} diff --git a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_payload_types.go b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_payload_types.go new file mode 100644 index 000000000..bc045763f --- /dev/null +++ b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_payload_types.go @@ -0,0 +1,19 @@ +package ctrlx_datalayer + +// Once a subscription is created, the server will send event notifications to this plugin. +// This file contains the different event types and the included event payload. + +// sseEventData represents the json structure send by the ctrlX CORE +// server on an "update" event. +type sseEventData struct { + Node string `json:"node"` + Timestamp int64 `json:"timestamp"` + Type string `json:"type"` + Value interface{} `json:"value"` +} + +// sseEventError represents the json structure send by the ctrlX CORE +// server on an "error" event. +type sseEventError struct { + Instance string `json:"instance"` +} diff --git a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_subscription.go b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_subscription.go new file mode 100644 index 000000000..eb6755114 --- /dev/null +++ b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_subscription.go @@ -0,0 +1,188 @@ +package ctrlx_datalayer + +import ( + "strings" + "time" + + "github.com/influxdata/telegraf/config" +) + +// A subscription can be used to watch multiple ctrlX Data Layer nodes for changes. +// Additional configuration settings can be given to tune the sampling and monitoring behaviour of the nodes. +// All nodes in a subscription share the same configuration. +// The plugin is able to create and manage multiple subscriptions. + +// The allowed values of the subscription property 'QueueBehaviour' +var queueBehaviours = []string{"DiscardOldest", "DiscardNewest"} + +// The allowed values of the subscription property 'ValueChange' +var valueChanges = []string{"Status", "StatusValue", "StatusValueTimestamp"} + +// The default subscription settings +const ( + defaultKeepaliveInterval = config.Duration(60 * time.Second) + defaultErrorInterval = config.Duration(10 * time.Second) + defaultReconnectInterval = config.Duration(10 * time.Second) + defaultPublishInterval = config.Duration(1 * time.Second) + defaultSamplingInterval = config.Duration(1 * time.Second) + defaultQueueSize = 10 + defaultQueueBehaviour = "DiscardOldest" + defaultValueChange = "StatusValue" + defaultMeasurementName = "ctrlx" + subscriptionPath = "/automation/api/v2/events" +) + +// Node contains all properties of a node configuration +type Node struct { + Name string `toml:"name"` + Address string `toml:"address"` + Tags map[string]string `toml:"tags"` +} + +// Subscription contains all properties of a subscription configuration +type Subscription struct { + index int + Nodes []Node `toml:"nodes"` + Tags map[string]string `toml:"tags"` + Measurement string `toml:"measurement"` + PublishInterval config.Duration `toml:"publish_interval"` + KeepaliveInterval config.Duration `toml:"keep_alive_interval"` + ErrorInterval config.Duration `toml:"error_interval"` + SamplingInterval config.Duration `toml:"sampling_interval"` + QueueSize uint `toml:"queue_size"` + QueueBehaviour string `toml:"queue_behaviour"` + DeadBandValue float64 `toml:"dead_band_value"` + ValueChange string `toml:"value_change"` + OutputJSONString bool `toml:"output_json_string"` +} + +// Rule can be used to override default rule settings. +type Rule struct { + RuleType string `json:"rule_type"` + Rule interface{} `json:"rule"` +} + +// Sampling can be used to override default sampling settings. +type Sampling struct { + SamplingInterval uint64 `json:"samplingInterval"` +} + +// Queueing can be used to override default queuing settings. +type Queueing struct { + QueueSize uint `json:"queueSize"` + Behaviour string `json:"behaviour"` +} + +// DataChangeFilter can be used to override default data change filter settings. +type DataChangeFilter struct { + DeadBandValue float64 `json:"deadBandValue"` +} + +// ChangeEvents can be used to override default change events settings. +type ChangeEvents struct { + ValueChange string `json:"valueChange"` + BrowselistChange bool `json:"browselistChange"` + MetadataChange bool `json:"metadataChange"` +} + +// SubscriptionProperties can be used to override default subscription settings. +type SubscriptionProperties struct { + KeepaliveInterval int64 `json:"keepaliveInterval"` + Rules []Rule `json:"rules"` + ID string `json:"id"` + PublishInterval int64 `json:"publishInterval"` + ErrorInterval int64 `json:"errorInterval"` +} + +// SubscriptionRequest can be used to to create a sse subscription at the ctrlX Data Layer. +type SubscriptionRequest struct { + Properties SubscriptionProperties `json:"properties"` + Nodes []string `json:"nodes"` +} + +// applyDefaultSettings applies the default settings if they are not configured in the config file. +func (s *Subscription) applyDefaultSettings() { + if s.Measurement == "" { + s.Measurement = defaultMeasurementName + } + if s.PublishInterval == 0 { + s.PublishInterval = defaultPublishInterval + } + if s.KeepaliveInterval == 0 { + s.KeepaliveInterval = defaultKeepaliveInterval + } + if s.ErrorInterval == 0 { + s.ErrorInterval = defaultErrorInterval + } + if s.SamplingInterval == 0 { + s.SamplingInterval = defaultSamplingInterval + } + if s.QueueSize == 0 { + s.QueueSize = defaultQueueSize + } + if s.QueueBehaviour == "" { + s.QueueBehaviour = defaultQueueBehaviour + } + if s.ValueChange == "" { + s.ValueChange = defaultValueChange + } +} + +// createRequestBody builds the request body for the sse subscription, based on the subscription configuration. +// The request body can be send to the server to create a new subscription. +func (s *Subscription) createRequest(id string) SubscriptionRequest { + pl := SubscriptionRequest{ + Properties: SubscriptionProperties{ + Rules: []Rule{ + {"Sampling", Sampling{uint64(time.Duration(s.SamplingInterval).Microseconds())}}, + {"Queueing", Queueing{s.QueueSize, s.QueueBehaviour}}, + {"DataChangeFilter", DataChangeFilter{s.DeadBandValue}}, + {"ChangeEvents", ChangeEvents{s.ValueChange, false, false}}, + }, + ID: id, + KeepaliveInterval: time.Duration(s.KeepaliveInterval).Milliseconds(), + PublishInterval: time.Duration(s.PublishInterval).Milliseconds(), + ErrorInterval: time.Duration(s.ErrorInterval).Milliseconds(), + }, + Nodes: s.addressList(), + } + + return pl +} + +// addressList lists all configured node addresses +func (s *Subscription) addressList() []string { + addressList := []string{} + for _, node := range s.Nodes { + addressList = append(addressList, node.Address) + } + return addressList +} + +// node finds the node according the node address +func (s *Subscription) node(address string) *Node { + for _, node := range s.Nodes { + if address == node.Address { + return &node + } + } + return nil +} + +// fieldKey determines the field key out of node name or address +func (n *Node) fieldKey() string { + if n.Name != "" { + // return user defined node name as field key + return n.Name + } + + // fallback: field key is extracted from mandatory node address + i := strings.LastIndex(n.Address, "/") + if i > 0 { + // return last part of node address as field key + return n.Address[i+1:] + } + + // return full node address as field key + return n.Address +} diff --git a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_subscription_test.go b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_subscription_test.go new file mode 100644 index 000000000..f053febf0 --- /dev/null +++ b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_subscription_test.go @@ -0,0 +1,248 @@ +package ctrlx_datalayer + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/config" + "github.com/stretchr/testify/require" +) + +func TestSubscription_createRequest(t *testing.T) { + tests := []struct { + name string + subscription Subscription + id string + wantBody SubscriptionRequest + wantErr bool + }{ + { + name: "Should_Return_Expected_Request", + subscription: Subscription{ + Nodes: []Node{ + { + Name: "node1", + Address: "path/to/node1", + Tags: map[string]string{}, + }, + { + Name: "node2", + Address: "path/to/node2", + Tags: map[string]string{}, + }, + }, + Tags: map[string]string{}, + Measurement: "", + PublishInterval: config.Duration(2 * time.Second), + KeepaliveInterval: config.Duration(10 * time.Second), + ErrorInterval: config.Duration(20 * time.Second), + SamplingInterval: config.Duration(100 * time.Millisecond), + QueueSize: 100, + QueueBehaviour: "DiscardNewest", + DeadBandValue: 1.12345, + ValueChange: "StatusValueTimestamp", + OutputJSONString: true, + }, + id: "sub_id", + wantBody: SubscriptionRequest{ + Properties: SubscriptionProperties{ + KeepaliveInterval: 10000, + Rules: []Rule{ + { + "Sampling", + Sampling{ + SamplingInterval: 100000, + }, + }, + { + "Queueing", + Queueing{ + QueueSize: 100, + Behaviour: "DiscardNewest", + }, + }, + { + "DataChangeFilter", + DataChangeFilter{ + DeadBandValue: 1.12345, + }, + }, + { + "ChangeEvents", + ChangeEvents{ + ValueChange: "StatusValueTimestamp", + }, + }, + }, + ID: "sub_id", + PublishInterval: 2000, + ErrorInterval: 20000, + }, + Nodes: []string{ + "path/to/node1", + "path/to/node2", + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.subscription.createRequest(tt.id) + require.Equal(t, tt.wantBody, got) + }) + } +} + +func TestSubscription_node(t *testing.T) { + tests := []struct { + name string + nodes []Node + address string + want *Node + }{ + { + name: "Should_Return_Node_Of_Given_Address", + nodes: []Node{ + { + Name: "node1", + Address: "path/to/node1", + Tags: map[string]string{}, + }, + { + Name: "node2", + Address: "path/to/node2", + Tags: map[string]string{}, + }, + { + Name: "", + Address: "path/to/node3", + Tags: map[string]string{}, + }, + }, + address: "path/to/node3", + want: &Node{ + Name: "", + Address: "path/to/node3", + Tags: map[string]string{}, + }, + }, + { + name: "Should_Return_Nil_If_Node_With_Given_Address_Not_Found", + nodes: []Node{ + { + Name: "Node1", + Address: "path/to/node1", + Tags: map[string]string{}, + }, + { + Name: "Node2", + Address: "path/to/node2", + Tags: map[string]string{}, + }, + { + Name: "", + Address: "path/to/node3", + Tags: map[string]string{}, + }, + }, + address: "path/to/node4", + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &Subscription{ + Nodes: tt.nodes, + } + require.Equal(t, tt.want, s.node(tt.address)) + }) + } +} + +func TestSubscription_addressList(t *testing.T) { + tests := []struct { + name string + nodes []Node + want []string + }{ + { + name: "Should_Return_AddressArray_Of_All_Nodes", + nodes: []Node{ + { + Address: "framework/metrics/system/memused-mb", + }, + { + Address: "framework/metrics/system/memavailable-mb", + }, + { + Address: "root", + }, + { + Address: "", + }, + }, + want: []string{ + "framework/metrics/system/memused-mb", + "framework/metrics/system/memavailable-mb", + "root", + "", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &Subscription{ + Nodes: tt.nodes, + } + require.Equal(t, tt.want, s.addressList()) + }) + } +} + +func TestNode_fieldKey(t *testing.T) { + tests := []struct { + name string + node Node + want string + }{ + { + name: "Should_Return_Name_When_Name_Is_Not_Empty", + node: Node{ + Name: "used", + Address: "framework/metrics/system/memused-mb", + }, + want: "used", + }, + { + name: "Should_Return_Address_Base_When_Name_Is_Empty_And_Address_Contains_Full_Path", + node: Node{ + Name: "", + Address: "framework/metrics/system/memused-mb", + }, + want: "memused-mb", + }, + { + name: "Should_Return_Address_Base_Root_When_Name_Is_Empty_And_Address_Contains_Root_Path", + node: Node{ + Name: "", + Address: "root", + }, + want: "root", + }, + { + name: "Should_Return_Empty_When_Name_and_Address_Are_Empty", + node: Node{ + Name: "", + Address: "", + }, + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.want, tt.node.fieldKey()) + }) + } +} diff --git a/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_test.go b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_test.go new file mode 100644 index 000000000..5485a7e1a --- /dev/null +++ b/plugins/inputs/ctrlx_datalayer/ctrlx_datalayer_test.go @@ -0,0 +1,256 @@ +package ctrlx_datalayer + +import ( + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/boschrexroth/ctrlx-datalayer-golang/pkg/token" + "github.com/influxdata/telegraf/config" + httpconfig "github.com/influxdata/telegraf/plugins/common/http" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +const path = "/automation/api/v2/events" + +var multiEntries = false +var mux sync.Mutex + +func setMultiEntries(m bool) { + mux.Lock() + defer mux.Unlock() + multiEntries = m +} + +func getMultiEntries() bool { + mux.Lock() + defer mux.Unlock() + return multiEntries +} + +func TestCtrlXCreateSubscriptionBasic(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) + _, err := w.Write([]byte("201 created")) + require.NoError(t, err) + })) + defer server.Close() + + subs := make([]Subscription, 0) + subs = append(subs, Subscription{ + index: 0, + Nodes: []Node{ + {Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"}, + {Name: "counterReverse", Address: "plc/app/Application/sym/PLC_PRG/counterReverse"}, + }, + }, + ) + s := &CtrlXDataLayer{ + connection: &http.Client{}, + url: server.URL, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("password")), + tokenManager: token.TokenManager{Url: server.URL, Username: "user", Password: "password", Connection: &http.Client{}}, + Subscription: subs, + Log: testutil.Logger{}, + } + + subID, err := s.createSubscription(&subs[0]) + + require.NoError(t, err) + require.NotEmpty(t, subID) +} + +func TestCtrlXCreateSubscriptionDriven(t *testing.T) { + var tests = []struct { + res string + status int + wantError bool + }{ + {res: "{\"status\":200}", status: 200, wantError: false}, + {res: "{\"status\":401}", status: 401, wantError: true}, + } + + for _, test := range tests { + t.Run(test.res, func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(test.status) + _, err := w.Write([]byte(test.res)) + require.NoError(t, err) + })) + defer server.Close() + subs := make([]Subscription, 0) + subs = append(subs, Subscription{ + Nodes: []Node{ + {Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"}, + {Name: "counterReverse", Address: "plc/app/Application/sym/PLC_PRG/counterReverse"}, + }, + }, + ) + s := &CtrlXDataLayer{ + connection: &http.Client{}, + url: server.URL, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("password")), + Subscription: subs, + tokenManager: token.TokenManager{Url: server.URL, Username: "user", Password: "password", Connection: &http.Client{}}, + Log: testutil.Logger{}, + } + subID, err := s.createSubscription(&subs[0]) + + if test.wantError { + require.EqualError(t, err, "failed to create sse subscription 0, status: 401 Unauthorized") + require.Empty(t, subID) + } else { + require.NoError(t, err) + require.NotEmpty(t, subID) + } + }) + } +} + +func newServer(t *testing.T) *httptest.Server { + mux := http.NewServeMux() + // Handle request to fetch token + mux.HandleFunc("/identity-manager/api/v2/auth/token", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("{\"access_token\": \"eyJhbGciOiJIU.xxx.xxx\", \"token_type\":\"Bearer\"}")) + require.NoError(t, err) + })) + // Handle request to validate token + mux.HandleFunc("/identity-manager/api/v2/auth/token/validity", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("{\"valid\": \"true\"}")) + require.NoError(t, err) + })) + // Handle request to create subscription + mux.HandleFunc(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) + _, err := w.Write([]byte("201 created")) + require.NoError(t, err) + })) + // Handle request to fetch sse data + mux.HandleFunc(path+"/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("event: update\n")) + require.NoError(t, err) + _, err = w.Write([]byte("id: 12345\n")) + require.NoError(t, err) + if getMultiEntries() { + data := "data: {\n" + _, err = w.Write([]byte(data)) + require.NoError(t, err) + data = "data: \"node\":\"plc/app/Application/sym/PLC_PRG/counter\", \"timestamp\":132669450604571037,\"type\":\"double\",\"value\":44.0\n" + _, err = w.Write([]byte(data)) + require.NoError(t, err) + data = "data: }\n" + _, err = w.Write([]byte(data)) + require.NoError(t, err) + } else { + data := "data: {\"node\":\"plc/app/Application/sym/PLC_PRG/counter\", \"timestamp\":132669450604571037,\"type\":\"double\",\"value\":43.0}\n" + _, err = w.Write([]byte(data)) + require.NoError(t, err) + } + _, err = w.Write([]byte("\n")) + require.NoError(t, err) + } + })) + return httptest.NewServer(mux) +} + +func cleanup(server *httptest.Server) { + server.CloseClientConnections() + server.Close() +} + +func initRunner(t *testing.T) (*CtrlXDataLayer, *httptest.Server) { + server := newServer(t) + + subs := make([]Subscription, 0) + subs = append(subs, Subscription{ + Measurement: "ctrlx", + Nodes: []Node{ + {Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"}, + }, + }, + ) + s := &CtrlXDataLayer{ + connection: &http.Client{}, + url: server.URL, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("password")), + HTTPClientConfig: httpconfig.HTTPClientConfig{ + ClientConfig: tls.ClientConfig{ + InsecureSkipVerify: true, + }, + }, + Subscription: subs, + tokenManager: token.TokenManager{Url: server.URL, Username: "user", Password: "password", Connection: &http.Client{}}, + Log: testutil.Logger{}, + } + return s, server +} + +func TestCtrlXMetricsField(t *testing.T) { + const measurement = "ctrlx" + const fieldName = "counter" + + s, server := initRunner(t) + defer cleanup(server) + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(s.Start)) + require.Eventually(t, func() bool { + if v, found := acc.FloatField(measurement, fieldName); found { + require.Equal(t, 43.0, v) + return true + } + return false + }, time.Second*10, time.Second) +} + +func TestCtrlXMetricsMulti(t *testing.T) { + const measurement = "ctrlx" + const fieldName = "counter" + + setMultiEntries(true) + s, server := initRunner(t) + defer cleanup(server) + + var acc testutil.Accumulator + + require.NoError(t, acc.GatherError(s.Start)) + require.Eventually(t, func() bool { + if v, found := acc.FloatField(measurement, fieldName); found { + require.Equal(t, 44.0, v) + return true + } + return false + }, time.Second*10, time.Second) + + setMultiEntries(false) +} + +func TestCtrlXCreateSseClient(t *testing.T) { + sub := Subscription{ + Measurement: "ctrlx", + Nodes: []Node{ + {Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"}, + {Name: "counterReverse", Address: "plc/app/Application/sym/PLC_PRG/counterReverse"}, + }, + } + s, server := initRunner(t) + defer cleanup(server) + client, err := s.createSubscriptionAndSseClient(&sub) + require.NoError(t, err) + require.NotEmpty(t, client) +} + +func TestConvertTimestamp2UnixTime(t *testing.T) { + expected := time.Date(2022, 02, 14, 14, 22, 38, 333552400, time.UTC) + actual := convertTimestamp2UnixTime(132893221583335524) + require.EqualValues(t, expected.UnixNano(), actual.UnixNano()) +} diff --git a/plugins/inputs/ctrlx_datalayer/sample.conf b/plugins/inputs/ctrlx_datalayer/sample.conf new file mode 100644 index 000000000..aeccc3b67 --- /dev/null +++ b/plugins/inputs/ctrlx_datalayer/sample.conf @@ -0,0 +1,110 @@ +# A ctrlX Data Layer server sent event input plugin +[[inputs.ctrlx_datalayer]] + ## Hostname or IP address of the ctrlX CORE Data Layer server + ## example: server = "localhost" # Telegraf is running directly on the device + ## server = "192.168.1.1" # Connect to ctrlX CORE remote via IP + ## server = "host.example.com" # Connect to ctrlX CORE remote via hostname + ## server = "10.0.2.2:8443" # Connect to ctrlX CORE Virtual from development environment + server = "localhost" + + ## Authentication credentials + username = "boschrexroth" + password = "boschrexroth" + + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Timeout for HTTP requests. (default: "10s") + # timeout = "10s" + + + ## Create a ctrlX Data Layer subscription. + ## It is possible to define multiple subscriptions per host. Each subscription can have its own + ## sampling properties and a list of nodes to subscribe to. + ## All subscriptions share the same credentials. + [[inputs.ctrlx_datalayer.subscription]] + ## The name of the measurement. (default: "ctrlx") + measurement = "memory" + + ## Configure the ctrlX Data Layer nodes which should be subscribed. + ## address - node address in ctrlX Data Layer (mandatory) + ## name - field name to use in the output (optional, default: base name of address) + ## tags - extra node tags to be added to the output metric (optional) + ## Note: + ## Use either the inline notation or the bracketed notation, not both. + ## The tags property is only supported in bracketed notation due to toml parser restrictions + ## Examples: + ## Inline notation + nodes=[ + {name="available", address="framework/metrics/system/memavailable-mb"}, + {name="used", address="framework/metrics/system/memused-mb"}, + ] + ## Bracketed notation + # [[inputs.ctrlx_datalayer.subscription.nodes]] + # name ="available" + # address="framework/metrics/system/memavailable-mb" + # ## Define extra tags related to node to be added to the output metric (optional) + # [inputs.ctrlx_datalayer.subscription.nodes.tags] + # node_tag1="node_tag1" + # node_tag2="node_tag2" + # [[inputs.ctrlx_datalayer.subscription.nodes]] + # name ="used" + # address="framework/metrics/system/memused-mb" + + ## The switch "output_json_string" enables output of the measurement as json. + ## That way it can be used in in a subsequent processor plugin, e.g. "Starlark Processor Plugin". + # output_json_string = false + + ## Define extra tags related to subscription to be added to the output metric (optional) + # [inputs.ctrlx_datalayer.subscription.tags] + # subscription_tag1 = "subscription_tag1" + # subscription_tag2 = "subscription_tag2" + + ## The interval in which messages shall be sent by the ctrlX Data Layer to this plugin. (default: 1s) + ## Higher values reduce load on network by queuing samples on server side and sending as a single TCP packet. + # publish_interval = "1s" + + ## The interval a "keepalive" message is sent if no change of data occurs. (default: 60s) + ## Only used internally to detect broken network connections. + # keep_alive_interval = "60s" + + ## The interval an "error" message is sent if an error was received from a node. (default: 10s) + ## Higher values reduce load on output target and network in case of errors by limiting frequency of error messages. + # error_interval = "10s" + + ## The interval that defines the fastest rate at which the node values should be sampled and values captured. (default: 1s) + ## The sampling frequency should be adjusted to the dynamics of the signal to be sampled. + ## Higher sampling frequence increases load on ctrlX Data Layer. + ## The sampling frequency can be higher, than the publish interval. Captured samples are put in a queue and sent in publish interval. + ## Note: The minimum sampling interval can be overruled by a global setting in the ctrlX Data Layer configuration ('datalayer/subscriptions/settings'). + # sampling_interval = "1s" + + ## The requested size of the node value queue. (default: 10) + ## Relevant if more values are captured than can be sent. + # queue_size = 10 + + ## The behaviour of the queue if it is full. (default: "DiscardOldest") + ## Possible values: + ## - "DiscardOldest" + ## The oldest value gets deleted from the queue when it is full. + ## - "DiscardNewest" + ## The newest value gets deleted from the queue when it is full. + # queue_behaviour = "DiscardOldest" + + ## The filter when a new value will be sampled. (default: 0.0) + ## Calculation rule: If (abs(lastCapturedValue - newValue) > dead_band_value) capture(newValue). + # dead_band_value = 0.0 + + ## The conditions on which a sample should be captured and thus will be sent as a message. (default: "StatusValue") + ## Possible values: + ## - "Status" + ## Capture the value only, when the state of the node changes from or to error state. Value changes are ignored. + ## - "StatusValue" + ## Capture when the value changes or the node changes from or to error state. + ## See also 'dead_band_value' for what is considered as a value change. + ## - "StatusValueTimestamp": + ## Capture even if the value is the same, but the timestamp of the value is newer. + ## Note: This might lead to high load on the network because every sample will be sent as a message + ## even if the value of the node did not change. + # value_change = "StatusValue" +