feat: Add input plugin for ctrlX Data Layer (#11155)
This commit is contained in:
parent
05aab954cf
commit
2a33f496c4
|
|
@ -82,6 +82,7 @@ following works:
|
||||||
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
|
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
|
||||||
- github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE)
|
- github.com/blues/jsonata-go [MIT License](https://github.com/blues/jsonata-go/blob/main/LICENSE)
|
||||||
- github.com/bmatcuk/doublestar [MIT License](https://github.com/bmatcuk/doublestar/blob/master/LICENSE)
|
- github.com/bmatcuk/doublestar [MIT License](https://github.com/bmatcuk/doublestar/blob/master/LICENSE)
|
||||||
|
- github.com/boschrexroth/ctrlx-datalayer-golang [MIT License](https://github.com/boschrexroth/ctrlx-datalayer-golang/blob/main/LICENSE)
|
||||||
- github.com/bufbuild/protocompile [Apache License 2.0](https://github.com/bufbuild/protocompile/blob/main/LICENSE)
|
- github.com/bufbuild/protocompile [Apache License 2.0](https://github.com/bufbuild/protocompile/blob/main/LICENSE)
|
||||||
- github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE)
|
- github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE)
|
||||||
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
|
- github.com/cenkalti/backoff [MIT License](https://github.com/cenkalti/backoff/blob/master/LICENSE)
|
||||||
|
|
|
||||||
1
go.mod
1
go.mod
|
|
@ -51,6 +51,7 @@ require (
|
||||||
github.com/benbjohnson/clock v1.3.3
|
github.com/benbjohnson/clock v1.3.3
|
||||||
github.com/blues/jsonata-go v1.5.4
|
github.com/blues/jsonata-go v1.5.4
|
||||||
github.com/bmatcuk/doublestar/v3 v3.0.0
|
github.com/bmatcuk/doublestar/v3 v3.0.0
|
||||||
|
github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0
|
||||||
github.com/caio/go-tdigest v3.1.0+incompatible
|
github.com/caio/go-tdigest v3.1.0+incompatible
|
||||||
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df
|
github.com/cisco-ie/nx-telemetry-proto v0.0.0-20230117155933-f64c045c77df
|
||||||
github.com/clarify/clarify-go v0.2.4
|
github.com/clarify/clarify-go v0.2.4
|
||||||
|
|
|
||||||
8
go.sum
8
go.sum
|
|
@ -371,6 +371,8 @@ github.com/bmatcuk/doublestar/v3 v3.0.0/go.mod h1:6PcTVMw80pCY1RVuoqu3V++99uQB3v
|
||||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
|
||||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||||
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
|
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
|
||||||
|
github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0 h1:rwOJNZEGwMGbKziTcGpcoMdK0lfZE78lxR+UzLw+pRM=
|
||||||
|
github.com/boschrexroth/ctrlx-datalayer-golang v1.3.0/go.mod h1:i0ex6o3HhWHDSS0KEmRuHZOk3FVdJamzyk+tp3qmxkg=
|
||||||
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
|
github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA=
|
||||||
github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
|
github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
|
||||||
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
||||||
|
|
@ -692,6 +694,7 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ
|
||||||
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
|
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
|
||||||
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc h1:jd+stC3Fqf9kaqgCLOdm4Da/AN3txPTlmLB6tStXAcU=
|
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc h1:jd+stC3Fqf9kaqgCLOdm4Da/AN3txPTlmLB6tStXAcU=
|
||||||
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc/go.mod h1:YzWEoI07MC/a/wj9in8GeVatqfypkldgBlwXh9bCwqY=
|
github.com/google/cel-go v0.14.1-0.20230424164844-d39523c445fc/go.mod h1:YzWEoI07MC/a/wj9in8GeVatqfypkldgBlwXh9bCwqY=
|
||||||
|
github.com/google/flatbuffers v1.12.0/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||||
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||||
github.com/google/flatbuffers v23.3.3+incompatible h1:5PJI/WbJkaMTvpGxsHVKG/LurN/KnWXNyGpwSCDgen0=
|
github.com/google/flatbuffers v23.3.3+incompatible h1:5PJI/WbJkaMTvpGxsHVKG/LurN/KnWXNyGpwSCDgen0=
|
||||||
github.com/google/flatbuffers v23.3.3+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
github.com/google/flatbuffers v23.3.3+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
|
||||||
|
|
@ -1073,6 +1076,10 @@ github.com/linkedin/goavro/v2 v2.12.0 h1:rIQQSj8jdAUlKQh6DttK8wCRv4t4QO09g1C4aBW
|
||||||
github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
|
github.com/linkedin/goavro/v2 v2.12.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
|
||||||
github.com/logzio/azure-monitor-metrics-receiver v1.0.0 h1:TAzhIZL2ueyyc81qIw8FGg4nUbts4Hvc3oOxSobY1IA=
|
github.com/logzio/azure-monitor-metrics-receiver v1.0.0 h1:TAzhIZL2ueyyc81qIw8FGg4nUbts4Hvc3oOxSobY1IA=
|
||||||
github.com/logzio/azure-monitor-metrics-receiver v1.0.0/go.mod h1:UIaQ7UgxZ8jO3L0JB2hctsHFBbZqL6mbxYscQAeFpl4=
|
github.com/logzio/azure-monitor-metrics-receiver v1.0.0/go.mod h1:UIaQ7UgxZ8jO3L0JB2hctsHFBbZqL6mbxYscQAeFpl4=
|
||||||
|
github.com/loov/hrtime v1.0.1/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0=
|
||||||
|
github.com/loov/hrtime v1.0.3/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0=
|
||||||
|
github.com/loov/hrtime/hrplot v1.0.2/go.mod h1:9t65xYn4d42ntjv40Wt5lbU72/VC5S0zGDgjC8kD5BU=
|
||||||
|
github.com/loov/plot v0.0.0-20200413101321-e09a6f01d2f5/go.mod h1:gSrhfSMoiPGG0CZ9E66kXjaHxFw0fzJhooyicOnz5z4=
|
||||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
||||||
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c h1:VtwQ41oftZwlMnOEbMWQtSEUgU64U4s+GHk7hZK+jtY=
|
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c h1:VtwQ41oftZwlMnOEbMWQtSEUgU64U4s+GHk7hZK+jtY=
|
||||||
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
|
github.com/lufia/plan9stats v0.0.0-20220913051719-115f729f3c8c/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
|
||||||
|
|
@ -1235,6 +1242,7 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||||
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
|
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
|
||||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||||
|
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
|
||||||
github.com/olivere/elastic v6.2.37+incompatible h1:UfSGJem5czY+x/LqxgeCBgjDn6St+z8OnsCuxwD3L0U=
|
github.com/olivere/elastic v6.2.37+incompatible h1:UfSGJem5czY+x/LqxgeCBgjDn6St+z8OnsCuxwD3L0U=
|
||||||
github.com/olivere/elastic v6.2.37+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
|
github.com/olivere/elastic v6.2.37+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
|
||||||
github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo=
|
github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo=
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
//go:build !custom || inputs || inputs.ctrlx_datalayer
|
||||||
|
|
||||||
|
package all
|
||||||
|
|
||||||
|
import _ "github.com/influxdata/telegraf/plugins/inputs/ctrlx_datalayer" // register plugin
|
||||||
|
|
@ -0,0 +1,386 @@
|
||||||
|
# ctrlX Data Layer Input Plugin
|
||||||
|
|
||||||
|
The `ctrlx_datalayer` plugin gathers data from the ctrlX Data Layer,
|
||||||
|
a communication middleware runnning on
|
||||||
|
[ctrlX CORE devices](https://ctrlx-core.com) from
|
||||||
|
[Bosch Rexroth](https://boschrexroth.com). The platform is used for
|
||||||
|
professional automation applications like industrial automation, building
|
||||||
|
automation, robotics, IoT Gateways or as classical PLC. For more
|
||||||
|
information, see [ctrlX AUTOMATION](https://ctrlx-automation.com).
|
||||||
|
|
||||||
|
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
|
||||||
|
|
||||||
|
In addition to the plugin-specific configuration settings, plugins support
|
||||||
|
additional global and plugin configuration settings. These settings are used to
|
||||||
|
modify metrics, tags, and field or create aliases and configure ordering, etc.
|
||||||
|
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
||||||
|
|
||||||
|
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
```toml @sample.conf
|
||||||
|
# A ctrlX Data Layer server sent event input plugin
|
||||||
|
[[inputs.ctrlx_datalayer]]
|
||||||
|
## Hostname or IP address of the ctrlX CORE Data Layer server
|
||||||
|
## example: server = "localhost" # Telegraf is running directly on the device
|
||||||
|
## server = "192.168.1.1" # Connect to ctrlX CORE remote via IP
|
||||||
|
## server = "host.example.com" # Connect to ctrlX CORE remote via hostname
|
||||||
|
## server = "10.0.2.2:8443" # Connect to ctrlX CORE Virtual from development environment
|
||||||
|
server = "localhost"
|
||||||
|
|
||||||
|
## Authentication credentials
|
||||||
|
username = "boschrexroth"
|
||||||
|
password = "boschrexroth"
|
||||||
|
|
||||||
|
## Use TLS but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Timeout for HTTP requests. (default: "10s")
|
||||||
|
# timeout = "10s"
|
||||||
|
|
||||||
|
|
||||||
|
## Create a ctrlX Data Layer subscription.
|
||||||
|
## It is possible to define multiple subscriptions per host. Each subscription can have its own
|
||||||
|
## sampling properties and a list of nodes to subscribe to.
|
||||||
|
## All subscriptions share the same credentials.
|
||||||
|
[[inputs.ctrlx_datalayer.subscription]]
|
||||||
|
## The name of the measurement. (default: "ctrlx")
|
||||||
|
measurement = "memory"
|
||||||
|
|
||||||
|
## Configure the ctrlX Data Layer nodes which should be subscribed.
|
||||||
|
## address - node address in ctrlX Data Layer (mandatory)
|
||||||
|
## name - field name to use in the output (optional, default: base name of address)
|
||||||
|
## tags - extra node tags to be added to the output metric (optional)
|
||||||
|
## Note:
|
||||||
|
## Use either the inline notation or the bracketed notation, not both.
|
||||||
|
## The tags property is only supported in bracketed notation due to toml parser restrictions
|
||||||
|
## Examples:
|
||||||
|
## Inline notation
|
||||||
|
nodes=[
|
||||||
|
{name="available", address="framework/metrics/system/memavailable-mb"},
|
||||||
|
{name="used", address="framework/metrics/system/memused-mb"},
|
||||||
|
]
|
||||||
|
## Bracketed notation
|
||||||
|
# [[inputs.ctrlx_datalayer.subscription.nodes]]
|
||||||
|
# name ="available"
|
||||||
|
# address="framework/metrics/system/memavailable-mb"
|
||||||
|
# ## Define extra tags related to node to be added to the output metric (optional)
|
||||||
|
# [inputs.ctrlx_datalayer.subscription.nodes.tags]
|
||||||
|
# node_tag1="node_tag1"
|
||||||
|
# node_tag2="node_tag2"
|
||||||
|
# [[inputs.ctrlx_datalayer.subscription.nodes]]
|
||||||
|
# name ="used"
|
||||||
|
# address="framework/metrics/system/memused-mb"
|
||||||
|
|
||||||
|
## The switch "output_json_string" enables output of the measurement as json.
|
||||||
|
## That way it can be used in in a subsequent processor plugin, e.g. "Starlark Processor Plugin".
|
||||||
|
# output_json_string = false
|
||||||
|
|
||||||
|
## Define extra tags related to subscription to be added to the output metric (optional)
|
||||||
|
# [inputs.ctrlx_datalayer.subscription.tags]
|
||||||
|
# subscription_tag1 = "subscription_tag1"
|
||||||
|
# subscription_tag2 = "subscription_tag2"
|
||||||
|
|
||||||
|
## The interval in which messages shall be sent by the ctrlX Data Layer to this plugin. (default: 1s)
|
||||||
|
## Higher values reduce load on network by queuing samples on server side and sending as a single TCP packet.
|
||||||
|
# publish_interval = "1s"
|
||||||
|
|
||||||
|
## The interval a "keepalive" message is sent if no change of data occurs. (default: 60s)
|
||||||
|
## Only used internally to detect broken network connections.
|
||||||
|
# keep_alive_interval = "60s"
|
||||||
|
|
||||||
|
## The interval an "error" message is sent if an error was received from a node. (default: 10s)
|
||||||
|
## Higher values reduce load on output target and network in case of errors by limiting frequency of error messages.
|
||||||
|
# error_interval = "10s"
|
||||||
|
|
||||||
|
## The interval that defines the fastest rate at which the node values should be sampled and values captured. (default: 1s)
|
||||||
|
## The sampling frequency should be adjusted to the dynamics of the signal to be sampled.
|
||||||
|
## Higher sampling frequence increases load on ctrlX Data Layer.
|
||||||
|
## The sampling frequency can be higher, than the publish interval. Captured samples are put in a queue and sent in publish interval.
|
||||||
|
## Note: The minimum sampling interval can be overruled by a global setting in the ctrlX Data Layer configuration ('datalayer/subscriptions/settings').
|
||||||
|
# sampling_interval = "1s"
|
||||||
|
|
||||||
|
## The requested size of the node value queue. (default: 10)
|
||||||
|
## Relevant if more values are captured than can be sent.
|
||||||
|
# queue_size = 10
|
||||||
|
|
||||||
|
## The behaviour of the queue if it is full. (default: "DiscardOldest")
|
||||||
|
## Possible values:
|
||||||
|
## - "DiscardOldest"
|
||||||
|
## The oldest value gets deleted from the queue when it is full.
|
||||||
|
## - "DiscardNewest"
|
||||||
|
## The newest value gets deleted from the queue when it is full.
|
||||||
|
# queue_behaviour = "DiscardOldest"
|
||||||
|
|
||||||
|
## The filter when a new value will be sampled. (default: 0.0)
|
||||||
|
## Calculation rule: If (abs(lastCapturedValue - newValue) > dead_band_value) capture(newValue).
|
||||||
|
# dead_band_value = 0.0
|
||||||
|
|
||||||
|
## The conditions on which a sample should be captured and thus will be sent as a message. (default: "StatusValue")
|
||||||
|
## Possible values:
|
||||||
|
## - "Status"
|
||||||
|
## Capture the value only, when the state of the node changes from or to error state. Value changes are ignored.
|
||||||
|
## - "StatusValue"
|
||||||
|
## Capture when the value changes or the node changes from or to error state.
|
||||||
|
## See also 'dead_band_value' for what is considered as a value change.
|
||||||
|
## - "StatusValueTimestamp":
|
||||||
|
## Capture even if the value is the same, but the timestamp of the value is newer.
|
||||||
|
## Note: This might lead to high load on the network because every sample will be sent as a message
|
||||||
|
## even if the value of the node did not change.
|
||||||
|
# value_change = "StatusValue"
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
## Metrics
|
||||||
|
|
||||||
|
All measurements are tagged with the server address of the device and the
|
||||||
|
corresponding node address as defined in the ctrlX Data Layer.
|
||||||
|
|
||||||
|
- measurement name
|
||||||
|
- tags:
|
||||||
|
- `source` (ctrlX Data Layer server where the metrics are gathered from)
|
||||||
|
- `node` (Address of the ctrlX Data Layer node)
|
||||||
|
- fields:
|
||||||
|
- `{name}` (for nodes with simple data types)
|
||||||
|
- `{name}_{index}`(for nodes with array data types)
|
||||||
|
- `{name}_{jsonflat.key}` (for nodes with object data types)
|
||||||
|
|
||||||
|
### Output Format
|
||||||
|
|
||||||
|
The switch "output_json_string" determines the format of the output metric.
|
||||||
|
|
||||||
|
#### Output default format
|
||||||
|
|
||||||
|
With the output default format
|
||||||
|
|
||||||
|
```toml
|
||||||
|
output_json_string=false
|
||||||
|
```
|
||||||
|
|
||||||
|
the output is formatted automatically as follows depending on the data type:
|
||||||
|
|
||||||
|
##### Simple data type
|
||||||
|
|
||||||
|
The value is passed 'as it is' to a metric with pattern:
|
||||||
|
|
||||||
|
```text
|
||||||
|
{name}={value}
|
||||||
|
```
|
||||||
|
|
||||||
|
Simple data types of ctrlX Data Layer:
|
||||||
|
|
||||||
|
```text
|
||||||
|
bool8,int8,uint8,int16,uint16,int32,uint32,int64,uint64,float,double,string,timestamp
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Array data type
|
||||||
|
|
||||||
|
Every value in the array is passed to a metric with pattern:
|
||||||
|
|
||||||
|
```text
|
||||||
|
{name}_{index}={value[index]}
|
||||||
|
```
|
||||||
|
|
||||||
|
example:
|
||||||
|
|
||||||
|
```text
|
||||||
|
myarray=[1,2,3] -> myarray_1=1, myarray_2=2, myarray_3=3
|
||||||
|
```
|
||||||
|
|
||||||
|
Array data types of ctrlX Data Layer:
|
||||||
|
|
||||||
|
```text
|
||||||
|
arbool8,arint8,aruint8,arint16,aruint16,arint32,aruint32,arint64,aruint64,arfloat,ardouble,arstring,artimestamp
|
||||||
|
```
|
||||||
|
|
||||||
|
##### Object data type (JSON)
|
||||||
|
|
||||||
|
Every value of the flattened json is passed to a metric with pattern:
|
||||||
|
|
||||||
|
```text
|
||||||
|
{name}_{jsonflat.key}={jsonflat.value}
|
||||||
|
```
|
||||||
|
|
||||||
|
example:
|
||||||
|
|
||||||
|
```text
|
||||||
|
myobj={"a":1,"b":2,"c":{"d": 3}} -> myobj_a=1, myobj_b=2, myobj_c_d=3
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Output JSON format
|
||||||
|
|
||||||
|
With the output JSON format
|
||||||
|
|
||||||
|
```toml
|
||||||
|
output_json_string=true
|
||||||
|
```
|
||||||
|
|
||||||
|
the output is formatted as JSON string:
|
||||||
|
|
||||||
|
```text
|
||||||
|
{name}="{value}"
|
||||||
|
```
|
||||||
|
|
||||||
|
examples:
|
||||||
|
|
||||||
|
```text
|
||||||
|
input=true -> output="true"
|
||||||
|
```
|
||||||
|
|
||||||
|
```text
|
||||||
|
input=[1,2,3] -> output="[1,2,3]"
|
||||||
|
```
|
||||||
|
|
||||||
|
```text
|
||||||
|
input={"x":4720,"y":9440,"z":{"d": 14160}} -> output="{\"x\":4720,\"y\":9440,\"z\":14160}"
|
||||||
|
```
|
||||||
|
|
||||||
|
The JSON output string can be passed to a processor plugin for transformation
|
||||||
|
e.g. [Parser Processor Plugin][PARSER.md]
|
||||||
|
or [Starlark Processor Plugin][STARLARK.md]
|
||||||
|
|
||||||
|
[PARSER.md]: ../../processors/parser/README.md
|
||||||
|
[STARLARK.md]: ../../processors/starlark/README.md
|
||||||
|
|
||||||
|
example:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.ctrlx_datalayer.subscription]]
|
||||||
|
measurement = "osci"
|
||||||
|
nodes = [
|
||||||
|
{address="oscilloscope/instances/Osci_PLC/rec-values/allsignals"},
|
||||||
|
]
|
||||||
|
output_json_string = true
|
||||||
|
|
||||||
|
[[processors.starlark]]
|
||||||
|
namepass = [
|
||||||
|
'osci',
|
||||||
|
]
|
||||||
|
script = "oscilloscope.star"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
This plugin was contributed by
|
||||||
|
[Bosch Rexroth](https://www.boschrexroth.com).
|
||||||
|
For questions regarding ctrlX AUTOMATION and this plugin feel
|
||||||
|
free to check out and be part of the
|
||||||
|
[ctrlX AUTOMATION Community](https://ctrlx-automation.com/community)
|
||||||
|
to get additional support or leave some ideas and feedback.
|
||||||
|
|
||||||
|
Also, join
|
||||||
|
[InfluxData Community Slack](https://influxdata.com/slack) or
|
||||||
|
[InfluxData Community Page](https://community.influxdata.com/)
|
||||||
|
if you have questions or comments for the telegraf engineering teams.
|
||||||
|
|
||||||
|
## Example Output
|
||||||
|
|
||||||
|
The plugin handles simple, array and object (JSON) data types.
|
||||||
|
|
||||||
|
### Example with simple data type
|
||||||
|
|
||||||
|
Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.ctrlx_datalayer.subscription]]
|
||||||
|
measurement="memory"
|
||||||
|
[inputs.ctrlx_datalayer.subscription.tags]
|
||||||
|
sub_tag1="memory_tag1"
|
||||||
|
sub_tag2="memory_tag2"
|
||||||
|
|
||||||
|
[[inputs.ctrlx_datalayer.subscription.nodes]]
|
||||||
|
name ="available"
|
||||||
|
address="framework/metrics/system/memavailable-mb"
|
||||||
|
[inputs.ctrlx_datalayer.subscription.nodes.tags]
|
||||||
|
node_tag1="memory_available_tag1"
|
||||||
|
node_tag2="memory_available_tag2"
|
||||||
|
|
||||||
|
[[inputs.ctrlx_datalayer.subscription.nodes]]
|
||||||
|
name ="used"
|
||||||
|
address="framework/metrics/system/memused-mb"
|
||||||
|
[inputs.ctrlx_datalayer.subscription.nodes.tags]
|
||||||
|
node_tag1="memory_used_node_tag1"
|
||||||
|
node_tag2="memory_used_node_tag2"
|
||||||
|
```
|
||||||
|
|
||||||
|
Source:
|
||||||
|
|
||||||
|
```json
|
||||||
|
"framework/metrics/system/memavailable-mb" : 365.93359375
|
||||||
|
"framework/metrics/system/memused-mb" : 567.67578125
|
||||||
|
```
|
||||||
|
|
||||||
|
Metrics:
|
||||||
|
|
||||||
|
```text
|
||||||
|
memory,source=192.168.1.1,host=host.example.com,node=framework/metrics/system/memavailable-mb,node_tag1=memory_available_tag1,node_tag2=memory_available_tag2,sub_tag1=memory2_tag1,sub_tag2=memory_tag2 available=365.93359375 1680093310249627400
|
||||||
|
memory,source=192.168.1.1,host=host.example.com,node=framework/metrics/system/memused-mb,node_tag1=memory_used_node_tag1,node_tag2=memory_used_node_tag2,sub_tag1=memory2_tag1,sub_tag2=memory_tag2 used=567.67578125 1680093310249667600
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example with array data type
|
||||||
|
|
||||||
|
Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.ctrlx_datalayer.subscription]]
|
||||||
|
measurement="array"
|
||||||
|
nodes=[
|
||||||
|
{ name="ar_uint8", address="alldata/dynamic/array-of-uint8"},
|
||||||
|
{ name="ar_bool8", address="alldata/dynamic/array-of-bool8"},
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
Source:
|
||||||
|
|
||||||
|
```json
|
||||||
|
"alldata/dynamic/array-of-bool8" : [true, false, true]
|
||||||
|
"alldata/dynamic/array-of-uint8" : [0, 255]
|
||||||
|
```
|
||||||
|
|
||||||
|
Metrics:
|
||||||
|
|
||||||
|
```text
|
||||||
|
array,source=192.168.1.1,host=host.example.com,node=alldata/dynamic/array-of-bool8 ar_bool8_0=true,ar_bool8_1=false,ar_bool8_2=true 1680095727347018800
|
||||||
|
array,source=192.168.1.1,host=host.example.com,node=alldata/dynamic/array-of-uint8 ar_uint8_0=0,ar_uint8_1=255 1680095727347223300
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example with object data type (JSON)
|
||||||
|
|
||||||
|
Configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[[inputs.ctrlx_datalayer.subscription]]
|
||||||
|
measurement="motion"
|
||||||
|
nodes=[
|
||||||
|
{name="linear", address="motion/axs/Axis_1/state/values/actual"},
|
||||||
|
{name="rotational", address="motion/axs/Axis_2/state/values/actual"},
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
Source:
|
||||||
|
|
||||||
|
```json
|
||||||
|
"motion/axs/Axis_1/state/values/actual" : {"actualPos":65.249329860957,"actualVel":5,"actualAcc":0,"actualTorque":0,"distLeft":0,"actualPosUnit":"mm","actualVelUnit":"mm/min","actualAccUnit":"m/s^2","actualTorqueUnit":"Nm","distLeftUnit":"mm"}
|
||||||
|
"motion/axs/Axis_2/state/values/actual" : {"actualPos":120,"actualVel":0,"actualAcc":0,"actualTorque":0,"distLeft":0,"actualPosUnit":"deg","actualVelUnit":"rpm","actualAccUnit":"rad/s^2","actualTorqueUnit":"Nm","distLeftUnit":"deg"}
|
||||||
|
```
|
||||||
|
|
||||||
|
Metrics:
|
||||||
|
|
||||||
|
```text
|
||||||
|
motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_1/state/values/actual linear_actualVel=5,linear_distLeftUnit="mm",linear_actualAcc=0,linear_distLeft=0,linear_actualPosUnit="mm",linear_actualAccUnit="m/s^2",linear_actualTorqueUnit="Nm",linear_actualPos=65.249329860957,linear_actualVelUnit="mm/min",linear_actualTorque=0 1680258290342523500
|
||||||
|
motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_2/state/values/actual rotational_distLeft=0,rotational_actualVelUnit="rpm",rotational_actualAccUnit="rad/s^2",rotational_distLeftUnit="deg",rotational_actualPos=120,rotational_actualVel=0,rotational_actualAcc=0,rotational_actualPosUnit="deg",rotational_actualTorqueUnit="Nm",rotational_actualTorque=0 1680258290342538100
|
||||||
|
```
|
||||||
|
|
||||||
|
If `output_json_string` is set in the configuration:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
output_json_string = true
|
||||||
|
```
|
||||||
|
|
||||||
|
then the metrics will be generated like this:
|
||||||
|
|
||||||
|
```text
|
||||||
|
motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_1/state/values/actual linear="{\"actualAcc\":0,\"actualAccUnit\":\"m/s^2\",\"actualPos\":65.249329860957,\"actualPosUnit\":\"mm\",\"actualTorque\":0,\"actualTorqueUnit\":\"Nm\",\"actualVel\":5,\"actualVelUnit\":\"mm/min\",\"distLeft\":0,\"distLeftUnit\":\"mm\"}" 1680258290342523500
|
||||||
|
motion,source=192.168.1.1,host=host.example.com,node=motion/axs/Axis_2/state/values/actual rotational="{\"actualAcc\":0,\"actualAccUnit\":\"rad/s^2\",\"actualPos\":120,\"actualPosUnit\":\"deg\",\"actualTorque\":0,\"actualTorqueUnit\":\"Nm\",\"actualVel\":0,\"actualVelUnit\":\"rpm\",\"distLeft\":0,\"distLeftUnit\":\"deg\"}" 1680258290342538100
|
||||||
|
```
|
||||||
|
|
@ -0,0 +1,373 @@
|
||||||
|
//go:generate ../../../tools/readme_config_includer/generator
|
||||||
|
package ctrlx_datalayer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
_ "embed"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/boschrexroth/ctrlx-datalayer-golang/pkg/sseclient"
|
||||||
|
"github.com/boschrexroth/ctrlx-datalayer-golang/pkg/token"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal/choice"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
|
||||||
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
|
jsonParser "github.com/influxdata/telegraf/plugins/parsers/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This plugin is based on the official ctrlX CORE API. Documentation can be found in OpenAPI format at:
|
||||||
|
// https://boschrexroth.github.io/rest-api-description/ctrlx-automation/ctrlx-core/
|
||||||
|
// Used APIs are:
|
||||||
|
// * ctrlX CORE - Authorization and Authentication API
|
||||||
|
// * ctrlX CORE - Data Layer API
|
||||||
|
//
|
||||||
|
// All communication between the device and this input plugin is based
|
||||||
|
// on https REST and HTML5 Server Sent Events (sse).
|
||||||
|
|
||||||
|
//go:embed sample.conf
|
||||||
|
var sampleConfig string
|
||||||
|
|
||||||
|
// CtrlXDataLayer encapsulated the configuration as well as the state of this plugin.
|
||||||
|
type CtrlXDataLayer struct {
|
||||||
|
Server string `toml:"server"`
|
||||||
|
Username config.Secret `toml:"username"`
|
||||||
|
Password config.Secret `toml:"password"`
|
||||||
|
|
||||||
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
Subscription []Subscription
|
||||||
|
|
||||||
|
url string
|
||||||
|
wg sync.WaitGroup
|
||||||
|
cancel context.CancelFunc
|
||||||
|
|
||||||
|
acc telegraf.Accumulator
|
||||||
|
connection *http.Client
|
||||||
|
tokenManager token.TokenManager
|
||||||
|
httpconfig.HTTPClientConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// convertTimestamp2UnixTime converts the given Data Layer timestamp of the payload to UnixTime.
|
||||||
|
func convertTimestamp2UnixTime(t int64) time.Time {
|
||||||
|
// 1 sec=1000 milisec=1000000 microsec=1000000000 nanosec.
|
||||||
|
// Convert from FILETIME (100-nanosecond intervals since January 1, 1601 UTC) to
|
||||||
|
// seconds and nanoseconds since January 1, 1970 UTC.
|
||||||
|
// Between Jan 1, 1601 and Jan 1, 1970 there are 11644473600 seconds.
|
||||||
|
return time.Unix(0, (t-116444736000000000)*100)
|
||||||
|
}
|
||||||
|
|
||||||
|
// createSubscription uses the official 'ctrlX Data Layer API' to create the sse subscription.
|
||||||
|
func (c *CtrlXDataLayer) createSubscription(sub *Subscription) (string, error) {
|
||||||
|
sseURL := c.url + subscriptionPath
|
||||||
|
|
||||||
|
id := "telegraf_" + uuid.New().String()
|
||||||
|
request := sub.createRequest(id)
|
||||||
|
payload, err := json.Marshal(request)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to create subscription %d payload: %w", sub.index, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
requestBody := bytes.NewBuffer(payload)
|
||||||
|
req, err := http.NewRequest("POST", sseURL, requestBody)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to create subscription %d request: %w", sub.index, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Add("Authorization", c.tokenManager.Token.String())
|
||||||
|
|
||||||
|
resp, err := c.connection.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to do request to create sse subscription %d: %w", sub.index, err)
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != 200 && resp.StatusCode != 201 {
|
||||||
|
return "", fmt.Errorf("failed to create sse subscription %d, status: %s", sub.index, resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sseURL + "/" + id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createSubscriptionAndSseClient creates a sse subscription on the server and
|
||||||
|
// initializes a sse client to receive sse events from the server.
|
||||||
|
func (c *CtrlXDataLayer) createSubscriptionAndSseClient(sub *Subscription) (*sseclient.SseClient, error) {
|
||||||
|
t, err := c.tokenManager.RequestAuthToken()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
subURL, err := c.createSubscription(sub)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
client := sseclient.NewSseClient(subURL, t.String(), c.InsecureSkipVerify)
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// addMetric writes sse metric into accumulator.
|
||||||
|
func (c *CtrlXDataLayer) addMetric(se *sseclient.SseEvent, sub *Subscription) {
|
||||||
|
switch se.Event {
|
||||||
|
case "update":
|
||||||
|
// Received an updated value, that we translate into a metric
|
||||||
|
var d sseEventData
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(se.Data), &d); err != nil {
|
||||||
|
c.acc.AddError(fmt.Errorf("received malformed data from 'update' event: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m, err := c.createMetric(&d, sub)
|
||||||
|
if err != nil {
|
||||||
|
c.acc.AddError(fmt.Errorf("failed to create metrics: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.acc.AddMetric(m)
|
||||||
|
case "error":
|
||||||
|
// Received an error event, that we report to the accumulator
|
||||||
|
var e sseEventError
|
||||||
|
if err := json.Unmarshal([]byte(se.Data), &e); err != nil {
|
||||||
|
c.acc.AddError(fmt.Errorf("received malformed data from 'error' event: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.acc.AddError(fmt.Errorf("received 'error' event for node: %q", e.Instance))
|
||||||
|
case "keepalive":
|
||||||
|
// Keepalive events are ignored for the moment
|
||||||
|
c.Log.Debug("Received keepalive event")
|
||||||
|
default:
|
||||||
|
// Received a yet unsupported event type
|
||||||
|
c.Log.Debug("Received unsupported event: %q", se.Event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createMetric - create metric depending on flag 'output_json' and data type
|
||||||
|
func (c *CtrlXDataLayer) createMetric(em *sseEventData, sub *Subscription) (telegraf.Metric, error) {
|
||||||
|
t := convertTimestamp2UnixTime(em.Timestamp)
|
||||||
|
node := sub.node(em.Node)
|
||||||
|
if node == nil {
|
||||||
|
return nil, errors.New("node not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
// default tags
|
||||||
|
tags := map[string]string{
|
||||||
|
"node": em.Node,
|
||||||
|
"source": c.Server,
|
||||||
|
}
|
||||||
|
|
||||||
|
// add tags of subscription if user has defined
|
||||||
|
for key, value := range sub.Tags {
|
||||||
|
tags[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
// add tags of node if user has defined
|
||||||
|
for key, value := range node.Tags {
|
||||||
|
tags[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
// set measurement of subscription
|
||||||
|
measurement := sub.Measurement
|
||||||
|
|
||||||
|
// get field key from node properties
|
||||||
|
fieldKey := node.fieldKey()
|
||||||
|
|
||||||
|
if fieldKey == "" {
|
||||||
|
return nil, errors.New("field key not valid")
|
||||||
|
}
|
||||||
|
|
||||||
|
if sub.OutputJSONString {
|
||||||
|
b, err := json.Marshal(em.Value)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
fields := map[string]interface{}{fieldKey: string(b)}
|
||||||
|
m := metric.New(measurement, tags, fields, t)
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch em.Type {
|
||||||
|
case "object":
|
||||||
|
flattener := jsonParser.JSONFlattener{}
|
||||||
|
err := flattener.FullFlattenJSON(fieldKey, em.Value, true, true)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m := metric.New(measurement, tags, flattener.Fields, t)
|
||||||
|
return m, nil
|
||||||
|
case "arbool8",
|
||||||
|
"arint8", "aruint8",
|
||||||
|
"arint16", "aruint16",
|
||||||
|
"arint32", "aruint32",
|
||||||
|
"arint64", "aruint64",
|
||||||
|
"arfloat", "ardouble",
|
||||||
|
"arstring",
|
||||||
|
"artimestamp":
|
||||||
|
fields := make(map[string]interface{})
|
||||||
|
values := em.Value.([]interface{})
|
||||||
|
for i := 0; i < len(values); i++ {
|
||||||
|
index := strconv.Itoa(i)
|
||||||
|
key := fieldKey + "_" + index
|
||||||
|
fields[key] = values[i]
|
||||||
|
}
|
||||||
|
m := metric.New(measurement, tags, fields, t)
|
||||||
|
return m, nil
|
||||||
|
case "bool8",
|
||||||
|
"int8", "uint8",
|
||||||
|
"int16", "uint16",
|
||||||
|
"int32", "uint32",
|
||||||
|
"int64", "uint64",
|
||||||
|
"float", "double",
|
||||||
|
"string",
|
||||||
|
"timestamp":
|
||||||
|
fields := map[string]interface{}{fieldKey: em.Value}
|
||||||
|
m := metric.New(measurement, tags, fields, t)
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("unsupported value type: %s", em.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init is for setup, and validating config
|
||||||
|
func (c *CtrlXDataLayer) Init() error {
|
||||||
|
// Check all configured subscriptions for valid settings
|
||||||
|
for i := range c.Subscription {
|
||||||
|
sub := &c.Subscription[i]
|
||||||
|
sub.applyDefaultSettings()
|
||||||
|
if !choice.Contains(sub.QueueBehaviour, queueBehaviours) {
|
||||||
|
c.Log.Infof("The right queue behaviour values are %v", queueBehaviours)
|
||||||
|
return fmt.Errorf("subscription %d: setting 'queue_behaviour' %q is invalid", i, sub.QueueBehaviour)
|
||||||
|
}
|
||||||
|
if !choice.Contains(sub.ValueChange, valueChanges) {
|
||||||
|
c.Log.Infof("The right value change values are %v", valueChanges)
|
||||||
|
return fmt.Errorf("subscription %d: setting 'value_change' %q is invalid", i, sub.ValueChange)
|
||||||
|
}
|
||||||
|
if len(sub.Nodes) == 0 {
|
||||||
|
c.Log.Warn("A configured subscription has no nodes configured")
|
||||||
|
}
|
||||||
|
sub.index = i
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate valid communication url based on configured server address
|
||||||
|
u := url.URL{
|
||||||
|
Scheme: "https",
|
||||||
|
Host: c.Server,
|
||||||
|
}
|
||||||
|
c.url = u.String()
|
||||||
|
if _, err := url.Parse(c.url); err != nil {
|
||||||
|
return errors.New("invalid server address")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start input as service, retain the accumulator, establish the connection.
|
||||||
|
func (c *CtrlXDataLayer) Start(acc telegraf.Accumulator) error {
|
||||||
|
var ctx context.Context
|
||||||
|
ctx, c.cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
var err error
|
||||||
|
c.connection, err = c.HTTPClientConfig.CreateClient(ctx, c.Log)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create http client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
username, err := c.Username.Get()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("getting username failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
password, err := c.Password.Get()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("getting password failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.tokenManager = token.TokenManager{
|
||||||
|
Url: c.url,
|
||||||
|
Username: string(username),
|
||||||
|
Password: string(password),
|
||||||
|
Connection: c.connection,
|
||||||
|
}
|
||||||
|
config.ReleaseSecret(username)
|
||||||
|
config.ReleaseSecret(password)
|
||||||
|
|
||||||
|
c.acc = acc
|
||||||
|
|
||||||
|
c.gatherLoop(ctx)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// gatherLoop creates sse subscriptions on the Data Layer and requests the sse data
|
||||||
|
// the connection will be restablished if the sse subscription is broken.
|
||||||
|
func (c *CtrlXDataLayer) gatherLoop(ctx context.Context) {
|
||||||
|
for _, sub := range c.Subscription {
|
||||||
|
c.wg.Add(1)
|
||||||
|
go func(sub Subscription) {
|
||||||
|
defer c.wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
c.Log.Debugf("Gather loop for subscription %d stopped", sub.index)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
client, err := c.createSubscriptionAndSseClient(&sub)
|
||||||
|
if err != nil {
|
||||||
|
c.Log.Errorf("Creating sse client to subscription %d: %v", sub.index, err)
|
||||||
|
time.Sleep(time.Duration(defaultReconnectInterval))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c.Log.Debugf("Created sse client to subscription %d", sub.index)
|
||||||
|
|
||||||
|
// Establish connection and handle events in a callback function.
|
||||||
|
err = client.Subscribe(ctx, func(event string, data string) {
|
||||||
|
c.addMetric(&sseclient.SseEvent{
|
||||||
|
Event: event,
|
||||||
|
Data: data,
|
||||||
|
}, &sub)
|
||||||
|
})
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
// Subscription cancelled
|
||||||
|
c.Log.Debugf("Requesting data of subscription %d cancelled", sub.index)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.Log.Errorf("Requesting data of subscription %d failed: %v", sub.index, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(sub)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop input as service.
|
||||||
|
func (c *CtrlXDataLayer) Stop() {
|
||||||
|
c.cancel()
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Gather is called by telegraf to collect the metrics.
|
||||||
|
func (c *CtrlXDataLayer) Gather(_ telegraf.Accumulator) error {
|
||||||
|
// Metrics are sent to the accumulator asynchronously in worker thread. So nothing to do here.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SampleConfig returns the auto-inserted sample configuration to the telegraf.
|
||||||
|
func (*CtrlXDataLayer) SampleConfig() string {
|
||||||
|
return sampleConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// init registers the plugin in telegraf.
|
||||||
|
func init() {
|
||||||
|
inputs.Add("ctrlx_datalayer", func() telegraf.Input {
|
||||||
|
return &CtrlXDataLayer{}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
package ctrlx_datalayer
|
||||||
|
|
||||||
|
// Once a subscription is created, the server will send event notifications to this plugin.
|
||||||
|
// This file contains the different event types and the included event payload.
|
||||||
|
|
||||||
|
// sseEventData represents the json structure send by the ctrlX CORE
|
||||||
|
// server on an "update" event.
|
||||||
|
type sseEventData struct {
|
||||||
|
Node string `json:"node"`
|
||||||
|
Timestamp int64 `json:"timestamp"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
Value interface{} `json:"value"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// sseEventError represents the json structure send by the ctrlX CORE
|
||||||
|
// server on an "error" event.
|
||||||
|
type sseEventError struct {
|
||||||
|
Instance string `json:"instance"`
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,188 @@
|
||||||
|
package ctrlx_datalayer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A subscription can be used to watch multiple ctrlX Data Layer nodes for changes.
|
||||||
|
// Additional configuration settings can be given to tune the sampling and monitoring behaviour of the nodes.
|
||||||
|
// All nodes in a subscription share the same configuration.
|
||||||
|
// The plugin is able to create and manage multiple subscriptions.
|
||||||
|
|
||||||
|
// The allowed values of the subscription property 'QueueBehaviour'
|
||||||
|
var queueBehaviours = []string{"DiscardOldest", "DiscardNewest"}
|
||||||
|
|
||||||
|
// The allowed values of the subscription property 'ValueChange'
|
||||||
|
var valueChanges = []string{"Status", "StatusValue", "StatusValueTimestamp"}
|
||||||
|
|
||||||
|
// The default subscription settings
|
||||||
|
const (
|
||||||
|
defaultKeepaliveInterval = config.Duration(60 * time.Second)
|
||||||
|
defaultErrorInterval = config.Duration(10 * time.Second)
|
||||||
|
defaultReconnectInterval = config.Duration(10 * time.Second)
|
||||||
|
defaultPublishInterval = config.Duration(1 * time.Second)
|
||||||
|
defaultSamplingInterval = config.Duration(1 * time.Second)
|
||||||
|
defaultQueueSize = 10
|
||||||
|
defaultQueueBehaviour = "DiscardOldest"
|
||||||
|
defaultValueChange = "StatusValue"
|
||||||
|
defaultMeasurementName = "ctrlx"
|
||||||
|
subscriptionPath = "/automation/api/v2/events"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Node contains all properties of a node configuration
|
||||||
|
type Node struct {
|
||||||
|
Name string `toml:"name"`
|
||||||
|
Address string `toml:"address"`
|
||||||
|
Tags map[string]string `toml:"tags"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscription contains all properties of a subscription configuration
|
||||||
|
type Subscription struct {
|
||||||
|
index int
|
||||||
|
Nodes []Node `toml:"nodes"`
|
||||||
|
Tags map[string]string `toml:"tags"`
|
||||||
|
Measurement string `toml:"measurement"`
|
||||||
|
PublishInterval config.Duration `toml:"publish_interval"`
|
||||||
|
KeepaliveInterval config.Duration `toml:"keep_alive_interval"`
|
||||||
|
ErrorInterval config.Duration `toml:"error_interval"`
|
||||||
|
SamplingInterval config.Duration `toml:"sampling_interval"`
|
||||||
|
QueueSize uint `toml:"queue_size"`
|
||||||
|
QueueBehaviour string `toml:"queue_behaviour"`
|
||||||
|
DeadBandValue float64 `toml:"dead_band_value"`
|
||||||
|
ValueChange string `toml:"value_change"`
|
||||||
|
OutputJSONString bool `toml:"output_json_string"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rule can be used to override default rule settings.
|
||||||
|
type Rule struct {
|
||||||
|
RuleType string `json:"rule_type"`
|
||||||
|
Rule interface{} `json:"rule"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sampling can be used to override default sampling settings.
|
||||||
|
type Sampling struct {
|
||||||
|
SamplingInterval uint64 `json:"samplingInterval"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queueing can be used to override default queuing settings.
|
||||||
|
type Queueing struct {
|
||||||
|
QueueSize uint `json:"queueSize"`
|
||||||
|
Behaviour string `json:"behaviour"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DataChangeFilter can be used to override default data change filter settings.
|
||||||
|
type DataChangeFilter struct {
|
||||||
|
DeadBandValue float64 `json:"deadBandValue"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChangeEvents can be used to override default change events settings.
|
||||||
|
type ChangeEvents struct {
|
||||||
|
ValueChange string `json:"valueChange"`
|
||||||
|
BrowselistChange bool `json:"browselistChange"`
|
||||||
|
MetadataChange bool `json:"metadataChange"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscriptionProperties can be used to override default subscription settings.
|
||||||
|
type SubscriptionProperties struct {
|
||||||
|
KeepaliveInterval int64 `json:"keepaliveInterval"`
|
||||||
|
Rules []Rule `json:"rules"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
PublishInterval int64 `json:"publishInterval"`
|
||||||
|
ErrorInterval int64 `json:"errorInterval"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscriptionRequest can be used to to create a sse subscription at the ctrlX Data Layer.
|
||||||
|
type SubscriptionRequest struct {
|
||||||
|
Properties SubscriptionProperties `json:"properties"`
|
||||||
|
Nodes []string `json:"nodes"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyDefaultSettings applies the default settings if they are not configured in the config file.
|
||||||
|
func (s *Subscription) applyDefaultSettings() {
|
||||||
|
if s.Measurement == "" {
|
||||||
|
s.Measurement = defaultMeasurementName
|
||||||
|
}
|
||||||
|
if s.PublishInterval == 0 {
|
||||||
|
s.PublishInterval = defaultPublishInterval
|
||||||
|
}
|
||||||
|
if s.KeepaliveInterval == 0 {
|
||||||
|
s.KeepaliveInterval = defaultKeepaliveInterval
|
||||||
|
}
|
||||||
|
if s.ErrorInterval == 0 {
|
||||||
|
s.ErrorInterval = defaultErrorInterval
|
||||||
|
}
|
||||||
|
if s.SamplingInterval == 0 {
|
||||||
|
s.SamplingInterval = defaultSamplingInterval
|
||||||
|
}
|
||||||
|
if s.QueueSize == 0 {
|
||||||
|
s.QueueSize = defaultQueueSize
|
||||||
|
}
|
||||||
|
if s.QueueBehaviour == "" {
|
||||||
|
s.QueueBehaviour = defaultQueueBehaviour
|
||||||
|
}
|
||||||
|
if s.ValueChange == "" {
|
||||||
|
s.ValueChange = defaultValueChange
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createRequestBody builds the request body for the sse subscription, based on the subscription configuration.
|
||||||
|
// The request body can be send to the server to create a new subscription.
|
||||||
|
func (s *Subscription) createRequest(id string) SubscriptionRequest {
|
||||||
|
pl := SubscriptionRequest{
|
||||||
|
Properties: SubscriptionProperties{
|
||||||
|
Rules: []Rule{
|
||||||
|
{"Sampling", Sampling{uint64(time.Duration(s.SamplingInterval).Microseconds())}},
|
||||||
|
{"Queueing", Queueing{s.QueueSize, s.QueueBehaviour}},
|
||||||
|
{"DataChangeFilter", DataChangeFilter{s.DeadBandValue}},
|
||||||
|
{"ChangeEvents", ChangeEvents{s.ValueChange, false, false}},
|
||||||
|
},
|
||||||
|
ID: id,
|
||||||
|
KeepaliveInterval: time.Duration(s.KeepaliveInterval).Milliseconds(),
|
||||||
|
PublishInterval: time.Duration(s.PublishInterval).Milliseconds(),
|
||||||
|
ErrorInterval: time.Duration(s.ErrorInterval).Milliseconds(),
|
||||||
|
},
|
||||||
|
Nodes: s.addressList(),
|
||||||
|
}
|
||||||
|
|
||||||
|
return pl
|
||||||
|
}
|
||||||
|
|
||||||
|
// addressList lists all configured node addresses
|
||||||
|
func (s *Subscription) addressList() []string {
|
||||||
|
addressList := []string{}
|
||||||
|
for _, node := range s.Nodes {
|
||||||
|
addressList = append(addressList, node.Address)
|
||||||
|
}
|
||||||
|
return addressList
|
||||||
|
}
|
||||||
|
|
||||||
|
// node finds the node according the node address
|
||||||
|
func (s *Subscription) node(address string) *Node {
|
||||||
|
for _, node := range s.Nodes {
|
||||||
|
if address == node.Address {
|
||||||
|
return &node
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fieldKey determines the field key out of node name or address
|
||||||
|
func (n *Node) fieldKey() string {
|
||||||
|
if n.Name != "" {
|
||||||
|
// return user defined node name as field key
|
||||||
|
return n.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
// fallback: field key is extracted from mandatory node address
|
||||||
|
i := strings.LastIndex(n.Address, "/")
|
||||||
|
if i > 0 {
|
||||||
|
// return last part of node address as field key
|
||||||
|
return n.Address[i+1:]
|
||||||
|
}
|
||||||
|
|
||||||
|
// return full node address as field key
|
||||||
|
return n.Address
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,248 @@
|
||||||
|
package ctrlx_datalayer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSubscription_createRequest(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
subscription Subscription
|
||||||
|
id string
|
||||||
|
wantBody SubscriptionRequest
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Should_Return_Expected_Request",
|
||||||
|
subscription: Subscription{
|
||||||
|
Nodes: []Node{
|
||||||
|
{
|
||||||
|
Name: "node1",
|
||||||
|
Address: "path/to/node1",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "node2",
|
||||||
|
Address: "path/to/node2",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Tags: map[string]string{},
|
||||||
|
Measurement: "",
|
||||||
|
PublishInterval: config.Duration(2 * time.Second),
|
||||||
|
KeepaliveInterval: config.Duration(10 * time.Second),
|
||||||
|
ErrorInterval: config.Duration(20 * time.Second),
|
||||||
|
SamplingInterval: config.Duration(100 * time.Millisecond),
|
||||||
|
QueueSize: 100,
|
||||||
|
QueueBehaviour: "DiscardNewest",
|
||||||
|
DeadBandValue: 1.12345,
|
||||||
|
ValueChange: "StatusValueTimestamp",
|
||||||
|
OutputJSONString: true,
|
||||||
|
},
|
||||||
|
id: "sub_id",
|
||||||
|
wantBody: SubscriptionRequest{
|
||||||
|
Properties: SubscriptionProperties{
|
||||||
|
KeepaliveInterval: 10000,
|
||||||
|
Rules: []Rule{
|
||||||
|
{
|
||||||
|
"Sampling",
|
||||||
|
Sampling{
|
||||||
|
SamplingInterval: 100000,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"Queueing",
|
||||||
|
Queueing{
|
||||||
|
QueueSize: 100,
|
||||||
|
Behaviour: "DiscardNewest",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"DataChangeFilter",
|
||||||
|
DataChangeFilter{
|
||||||
|
DeadBandValue: 1.12345,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"ChangeEvents",
|
||||||
|
ChangeEvents{
|
||||||
|
ValueChange: "StatusValueTimestamp",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ID: "sub_id",
|
||||||
|
PublishInterval: 2000,
|
||||||
|
ErrorInterval: 20000,
|
||||||
|
},
|
||||||
|
Nodes: []string{
|
||||||
|
"path/to/node1",
|
||||||
|
"path/to/node2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := tt.subscription.createRequest(tt.id)
|
||||||
|
require.Equal(t, tt.wantBody, got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscription_node(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
nodes []Node
|
||||||
|
address string
|
||||||
|
want *Node
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Should_Return_Node_Of_Given_Address",
|
||||||
|
nodes: []Node{
|
||||||
|
{
|
||||||
|
Name: "node1",
|
||||||
|
Address: "path/to/node1",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "node2",
|
||||||
|
Address: "path/to/node2",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "",
|
||||||
|
Address: "path/to/node3",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
address: "path/to/node3",
|
||||||
|
want: &Node{
|
||||||
|
Name: "",
|
||||||
|
Address: "path/to/node3",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Should_Return_Nil_If_Node_With_Given_Address_Not_Found",
|
||||||
|
nodes: []Node{
|
||||||
|
{
|
||||||
|
Name: "Node1",
|
||||||
|
Address: "path/to/node1",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Node2",
|
||||||
|
Address: "path/to/node2",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "",
|
||||||
|
Address: "path/to/node3",
|
||||||
|
Tags: map[string]string{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
address: "path/to/node4",
|
||||||
|
want: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
s := &Subscription{
|
||||||
|
Nodes: tt.nodes,
|
||||||
|
}
|
||||||
|
require.Equal(t, tt.want, s.node(tt.address))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSubscription_addressList(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
nodes []Node
|
||||||
|
want []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Should_Return_AddressArray_Of_All_Nodes",
|
||||||
|
nodes: []Node{
|
||||||
|
{
|
||||||
|
Address: "framework/metrics/system/memused-mb",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Address: "framework/metrics/system/memavailable-mb",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Address: "root",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Address: "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: []string{
|
||||||
|
"framework/metrics/system/memused-mb",
|
||||||
|
"framework/metrics/system/memavailable-mb",
|
||||||
|
"root",
|
||||||
|
"",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
s := &Subscription{
|
||||||
|
Nodes: tt.nodes,
|
||||||
|
}
|
||||||
|
require.Equal(t, tt.want, s.addressList())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNode_fieldKey(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
node Node
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Should_Return_Name_When_Name_Is_Not_Empty",
|
||||||
|
node: Node{
|
||||||
|
Name: "used",
|
||||||
|
Address: "framework/metrics/system/memused-mb",
|
||||||
|
},
|
||||||
|
want: "used",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Should_Return_Address_Base_When_Name_Is_Empty_And_Address_Contains_Full_Path",
|
||||||
|
node: Node{
|
||||||
|
Name: "",
|
||||||
|
Address: "framework/metrics/system/memused-mb",
|
||||||
|
},
|
||||||
|
want: "memused-mb",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Should_Return_Address_Base_Root_When_Name_Is_Empty_And_Address_Contains_Root_Path",
|
||||||
|
node: Node{
|
||||||
|
Name: "",
|
||||||
|
Address: "root",
|
||||||
|
},
|
||||||
|
want: "root",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Should_Return_Empty_When_Name_and_Address_Are_Empty",
|
||||||
|
node: Node{
|
||||||
|
Name: "",
|
||||||
|
Address: "",
|
||||||
|
},
|
||||||
|
want: "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
require.Equal(t, tt.want, tt.node.fieldKey())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,256 @@
|
||||||
|
package ctrlx_datalayer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/boschrexroth/ctrlx-datalayer-golang/pkg/token"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
|
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
|
||||||
|
"github.com/influxdata/telegraf/plugins/common/tls"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const path = "/automation/api/v2/events"
|
||||||
|
|
||||||
|
var multiEntries = false
|
||||||
|
var mux sync.Mutex
|
||||||
|
|
||||||
|
func setMultiEntries(m bool) {
|
||||||
|
mux.Lock()
|
||||||
|
defer mux.Unlock()
|
||||||
|
multiEntries = m
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMultiEntries() bool {
|
||||||
|
mux.Lock()
|
||||||
|
defer mux.Unlock()
|
||||||
|
return multiEntries
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCtrlXCreateSubscriptionBasic(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
_, err := w.Write([]byte("201 created"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
subs := make([]Subscription, 0)
|
||||||
|
subs = append(subs, Subscription{
|
||||||
|
index: 0,
|
||||||
|
Nodes: []Node{
|
||||||
|
{Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"},
|
||||||
|
{Name: "counterReverse", Address: "plc/app/Application/sym/PLC_PRG/counterReverse"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
s := &CtrlXDataLayer{
|
||||||
|
connection: &http.Client{},
|
||||||
|
url: server.URL,
|
||||||
|
Username: config.NewSecret([]byte("user")),
|
||||||
|
Password: config.NewSecret([]byte("password")),
|
||||||
|
tokenManager: token.TokenManager{Url: server.URL, Username: "user", Password: "password", Connection: &http.Client{}},
|
||||||
|
Subscription: subs,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
|
||||||
|
subID, err := s.createSubscription(&subs[0])
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, subID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCtrlXCreateSubscriptionDriven(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
res string
|
||||||
|
status int
|
||||||
|
wantError bool
|
||||||
|
}{
|
||||||
|
{res: "{\"status\":200}", status: 200, wantError: false},
|
||||||
|
{res: "{\"status\":401}", status: 401, wantError: true},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.res, func(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(test.status)
|
||||||
|
_, err := w.Write([]byte(test.res))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
subs := make([]Subscription, 0)
|
||||||
|
subs = append(subs, Subscription{
|
||||||
|
Nodes: []Node{
|
||||||
|
{Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"},
|
||||||
|
{Name: "counterReverse", Address: "plc/app/Application/sym/PLC_PRG/counterReverse"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
s := &CtrlXDataLayer{
|
||||||
|
connection: &http.Client{},
|
||||||
|
url: server.URL,
|
||||||
|
Username: config.NewSecret([]byte("user")),
|
||||||
|
Password: config.NewSecret([]byte("password")),
|
||||||
|
Subscription: subs,
|
||||||
|
tokenManager: token.TokenManager{Url: server.URL, Username: "user", Password: "password", Connection: &http.Client{}},
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
subID, err := s.createSubscription(&subs[0])
|
||||||
|
|
||||||
|
if test.wantError {
|
||||||
|
require.EqualError(t, err, "failed to create sse subscription 0, status: 401 Unauthorized")
|
||||||
|
require.Empty(t, subID)
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, subID)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newServer(t *testing.T) *httptest.Server {
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
// Handle request to fetch token
|
||||||
|
mux.HandleFunc("/identity-manager/api/v2/auth/token", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, err := w.Write([]byte("{\"access_token\": \"eyJhbGciOiJIU.xxx.xxx\", \"token_type\":\"Bearer\"}"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}))
|
||||||
|
// Handle request to validate token
|
||||||
|
mux.HandleFunc("/identity-manager/api/v2/auth/token/validity", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, err := w.Write([]byte("{\"valid\": \"true\"}"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}))
|
||||||
|
// Handle request to create subscription
|
||||||
|
mux.HandleFunc(path, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
_, err := w.Write([]byte("201 created"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}))
|
||||||
|
// Handle request to fetch sse data
|
||||||
|
mux.HandleFunc(path+"/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method == http.MethodGet {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, err := w.Write([]byte("event: update\n"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = w.Write([]byte("id: 12345\n"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
if getMultiEntries() {
|
||||||
|
data := "data: {\n"
|
||||||
|
_, err = w.Write([]byte(data))
|
||||||
|
require.NoError(t, err)
|
||||||
|
data = "data: \"node\":\"plc/app/Application/sym/PLC_PRG/counter\", \"timestamp\":132669450604571037,\"type\":\"double\",\"value\":44.0\n"
|
||||||
|
_, err = w.Write([]byte(data))
|
||||||
|
require.NoError(t, err)
|
||||||
|
data = "data: }\n"
|
||||||
|
_, err = w.Write([]byte(data))
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
data := "data: {\"node\":\"plc/app/Application/sym/PLC_PRG/counter\", \"timestamp\":132669450604571037,\"type\":\"double\",\"value\":43.0}\n"
|
||||||
|
_, err = w.Write([]byte(data))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
_, err = w.Write([]byte("\n"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
return httptest.NewServer(mux)
|
||||||
|
}
|
||||||
|
|
||||||
|
func cleanup(server *httptest.Server) {
|
||||||
|
server.CloseClientConnections()
|
||||||
|
server.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func initRunner(t *testing.T) (*CtrlXDataLayer, *httptest.Server) {
|
||||||
|
server := newServer(t)
|
||||||
|
|
||||||
|
subs := make([]Subscription, 0)
|
||||||
|
subs = append(subs, Subscription{
|
||||||
|
Measurement: "ctrlx",
|
||||||
|
Nodes: []Node{
|
||||||
|
{Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
s := &CtrlXDataLayer{
|
||||||
|
connection: &http.Client{},
|
||||||
|
url: server.URL,
|
||||||
|
Username: config.NewSecret([]byte("user")),
|
||||||
|
Password: config.NewSecret([]byte("password")),
|
||||||
|
HTTPClientConfig: httpconfig.HTTPClientConfig{
|
||||||
|
ClientConfig: tls.ClientConfig{
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Subscription: subs,
|
||||||
|
tokenManager: token.TokenManager{Url: server.URL, Username: "user", Password: "password", Connection: &http.Client{}},
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
return s, server
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCtrlXMetricsField(t *testing.T) {
|
||||||
|
const measurement = "ctrlx"
|
||||||
|
const fieldName = "counter"
|
||||||
|
|
||||||
|
s, server := initRunner(t)
|
||||||
|
defer cleanup(server)
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, acc.GatherError(s.Start))
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
if v, found := acc.FloatField(measurement, fieldName); found {
|
||||||
|
require.Equal(t, 43.0, v)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, time.Second*10, time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCtrlXMetricsMulti(t *testing.T) {
|
||||||
|
const measurement = "ctrlx"
|
||||||
|
const fieldName = "counter"
|
||||||
|
|
||||||
|
setMultiEntries(true)
|
||||||
|
s, server := initRunner(t)
|
||||||
|
defer cleanup(server)
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
|
||||||
|
require.NoError(t, acc.GatherError(s.Start))
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
if v, found := acc.FloatField(measurement, fieldName); found {
|
||||||
|
require.Equal(t, 44.0, v)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, time.Second*10, time.Second)
|
||||||
|
|
||||||
|
setMultiEntries(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCtrlXCreateSseClient(t *testing.T) {
|
||||||
|
sub := Subscription{
|
||||||
|
Measurement: "ctrlx",
|
||||||
|
Nodes: []Node{
|
||||||
|
{Name: "counter", Address: "plc/app/Application/sym/PLC_PRG/counter"},
|
||||||
|
{Name: "counterReverse", Address: "plc/app/Application/sym/PLC_PRG/counterReverse"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
s, server := initRunner(t)
|
||||||
|
defer cleanup(server)
|
||||||
|
client, err := s.createSubscriptionAndSseClient(&sub)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, client)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConvertTimestamp2UnixTime(t *testing.T) {
|
||||||
|
expected := time.Date(2022, 02, 14, 14, 22, 38, 333552400, time.UTC)
|
||||||
|
actual := convertTimestamp2UnixTime(132893221583335524)
|
||||||
|
require.EqualValues(t, expected.UnixNano(), actual.UnixNano())
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,110 @@
|
||||||
|
# A ctrlX Data Layer server sent event input plugin
|
||||||
|
[[inputs.ctrlx_datalayer]]
|
||||||
|
## Hostname or IP address of the ctrlX CORE Data Layer server
|
||||||
|
## example: server = "localhost" # Telegraf is running directly on the device
|
||||||
|
## server = "192.168.1.1" # Connect to ctrlX CORE remote via IP
|
||||||
|
## server = "host.example.com" # Connect to ctrlX CORE remote via hostname
|
||||||
|
## server = "10.0.2.2:8443" # Connect to ctrlX CORE Virtual from development environment
|
||||||
|
server = "localhost"
|
||||||
|
|
||||||
|
## Authentication credentials
|
||||||
|
username = "boschrexroth"
|
||||||
|
password = "boschrexroth"
|
||||||
|
|
||||||
|
## Use TLS but skip chain & host verification
|
||||||
|
# insecure_skip_verify = false
|
||||||
|
|
||||||
|
## Timeout for HTTP requests. (default: "10s")
|
||||||
|
# timeout = "10s"
|
||||||
|
|
||||||
|
|
||||||
|
## Create a ctrlX Data Layer subscription.
|
||||||
|
## It is possible to define multiple subscriptions per host. Each subscription can have its own
|
||||||
|
## sampling properties and a list of nodes to subscribe to.
|
||||||
|
## All subscriptions share the same credentials.
|
||||||
|
[[inputs.ctrlx_datalayer.subscription]]
|
||||||
|
## The name of the measurement. (default: "ctrlx")
|
||||||
|
measurement = "memory"
|
||||||
|
|
||||||
|
## Configure the ctrlX Data Layer nodes which should be subscribed.
|
||||||
|
## address - node address in ctrlX Data Layer (mandatory)
|
||||||
|
## name - field name to use in the output (optional, default: base name of address)
|
||||||
|
## tags - extra node tags to be added to the output metric (optional)
|
||||||
|
## Note:
|
||||||
|
## Use either the inline notation or the bracketed notation, not both.
|
||||||
|
## The tags property is only supported in bracketed notation due to toml parser restrictions
|
||||||
|
## Examples:
|
||||||
|
## Inline notation
|
||||||
|
nodes=[
|
||||||
|
{name="available", address="framework/metrics/system/memavailable-mb"},
|
||||||
|
{name="used", address="framework/metrics/system/memused-mb"},
|
||||||
|
]
|
||||||
|
## Bracketed notation
|
||||||
|
# [[inputs.ctrlx_datalayer.subscription.nodes]]
|
||||||
|
# name ="available"
|
||||||
|
# address="framework/metrics/system/memavailable-mb"
|
||||||
|
# ## Define extra tags related to node to be added to the output metric (optional)
|
||||||
|
# [inputs.ctrlx_datalayer.subscription.nodes.tags]
|
||||||
|
# node_tag1="node_tag1"
|
||||||
|
# node_tag2="node_tag2"
|
||||||
|
# [[inputs.ctrlx_datalayer.subscription.nodes]]
|
||||||
|
# name ="used"
|
||||||
|
# address="framework/metrics/system/memused-mb"
|
||||||
|
|
||||||
|
## The switch "output_json_string" enables output of the measurement as json.
|
||||||
|
## That way it can be used in in a subsequent processor plugin, e.g. "Starlark Processor Plugin".
|
||||||
|
# output_json_string = false
|
||||||
|
|
||||||
|
## Define extra tags related to subscription to be added to the output metric (optional)
|
||||||
|
# [inputs.ctrlx_datalayer.subscription.tags]
|
||||||
|
# subscription_tag1 = "subscription_tag1"
|
||||||
|
# subscription_tag2 = "subscription_tag2"
|
||||||
|
|
||||||
|
## The interval in which messages shall be sent by the ctrlX Data Layer to this plugin. (default: 1s)
|
||||||
|
## Higher values reduce load on network by queuing samples on server side and sending as a single TCP packet.
|
||||||
|
# publish_interval = "1s"
|
||||||
|
|
||||||
|
## The interval a "keepalive" message is sent if no change of data occurs. (default: 60s)
|
||||||
|
## Only used internally to detect broken network connections.
|
||||||
|
# keep_alive_interval = "60s"
|
||||||
|
|
||||||
|
## The interval an "error" message is sent if an error was received from a node. (default: 10s)
|
||||||
|
## Higher values reduce load on output target and network in case of errors by limiting frequency of error messages.
|
||||||
|
# error_interval = "10s"
|
||||||
|
|
||||||
|
## The interval that defines the fastest rate at which the node values should be sampled and values captured. (default: 1s)
|
||||||
|
## The sampling frequency should be adjusted to the dynamics of the signal to be sampled.
|
||||||
|
## Higher sampling frequence increases load on ctrlX Data Layer.
|
||||||
|
## The sampling frequency can be higher, than the publish interval. Captured samples are put in a queue and sent in publish interval.
|
||||||
|
## Note: The minimum sampling interval can be overruled by a global setting in the ctrlX Data Layer configuration ('datalayer/subscriptions/settings').
|
||||||
|
# sampling_interval = "1s"
|
||||||
|
|
||||||
|
## The requested size of the node value queue. (default: 10)
|
||||||
|
## Relevant if more values are captured than can be sent.
|
||||||
|
# queue_size = 10
|
||||||
|
|
||||||
|
## The behaviour of the queue if it is full. (default: "DiscardOldest")
|
||||||
|
## Possible values:
|
||||||
|
## - "DiscardOldest"
|
||||||
|
## The oldest value gets deleted from the queue when it is full.
|
||||||
|
## - "DiscardNewest"
|
||||||
|
## The newest value gets deleted from the queue when it is full.
|
||||||
|
# queue_behaviour = "DiscardOldest"
|
||||||
|
|
||||||
|
## The filter when a new value will be sampled. (default: 0.0)
|
||||||
|
## Calculation rule: If (abs(lastCapturedValue - newValue) > dead_band_value) capture(newValue).
|
||||||
|
# dead_band_value = 0.0
|
||||||
|
|
||||||
|
## The conditions on which a sample should be captured and thus will be sent as a message. (default: "StatusValue")
|
||||||
|
## Possible values:
|
||||||
|
## - "Status"
|
||||||
|
## Capture the value only, when the state of the node changes from or to error state. Value changes are ignored.
|
||||||
|
## - "StatusValue"
|
||||||
|
## Capture when the value changes or the node changes from or to error state.
|
||||||
|
## See also 'dead_band_value' for what is considered as a value change.
|
||||||
|
## - "StatusValueTimestamp":
|
||||||
|
## Capture even if the value is the same, but the timestamp of the value is newer.
|
||||||
|
## Note: This might lead to high load on the network because every sample will be sent as a message
|
||||||
|
## even if the value of the node did not change.
|
||||||
|
# value_change = "StatusValue"
|
||||||
|
|
||||||
Loading…
Reference in New Issue