From 474aff588e68b6fd583605661bfc92383e0ba977 Mon Sep 17 00:00:00 2001 From: reindlt <73192028+reindlt@users.noreply.github.com> Date: Tue, 10 Oct 2023 13:06:17 +0200 Subject: [PATCH] feat(inputs.opcua_listener): Add monitoring params (#13923) Co-authored-by: Tobias Reindl --- plugins/common/opcua/input/input_client.go | 62 +++- plugins/inputs/opcua_listener/README.md | 76 +++- .../opcua_listener/opcua_listener_test.go | 327 ++++++++++++++++++ plugins/inputs/opcua_listener/sample.conf | 66 +++- .../inputs/opcua_listener/subscribe_client.go | 49 +++ 5 files changed, 551 insertions(+), 29 deletions(-) diff --git a/plugins/common/opcua/input/input_client.go b/plugins/common/opcua/input/input_client.go index 29eb3c424..a33b31ea3 100644 --- a/plugins/common/opcua/input/input_client.go +++ b/plugins/common/opcua/input/input_client.go @@ -12,21 +12,51 @@ import ( "github.com/gopcua/opcua/ua" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/common/opcua" ) +type Trigger string + +const ( + Status Trigger = "Status" + StatusValue Trigger = "StatusValue" + StatusValueTimestamp Trigger = "StatusValueTimestamp" +) + +type DeadbandType string + +const ( + Absolute DeadbandType = "Absolute" + Percent DeadbandType = "Percent" +) + +type DataChangeFilter struct { + Trigger Trigger `toml:"trigger"` + DeadbandType DeadbandType `toml:"deadband_type"` + DeadbandValue *float64 `toml:"deadband_value"` +} + +type MonitoringParameters struct { + SamplingInterval config.Duration `toml:"sampling_interval"` + QueueSize *uint32 `toml:"queue_size"` + DiscardOldest *bool `toml:"discard_oldest"` + DataChangeFilter *DataChangeFilter `toml:"data_change_filter"` +} + // NodeSettings describes how to map from a OPC UA node to a Metric type NodeSettings struct { - FieldName string `toml:"name"` - Namespace string `toml:"namespace"` - IdentifierType string `toml:"identifier_type"` - Identifier string `toml:"identifier"` - DataType string `toml:"data_type" deprecated:"1.17.0;option is ignored"` - Description string `toml:"description" deprecated:"1.17.0;option is ignored"` - TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"` - DefaultTags map[string]string `toml:"default_tags"` + FieldName string `toml:"name"` + Namespace string `toml:"namespace"` + IdentifierType string `toml:"identifier_type"` + Identifier string `toml:"identifier"` + DataType string `toml:"data_type" deprecated:"1.17.0;option is ignored"` + Description string `toml:"description" deprecated:"1.17.0;option is ignored"` + TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"` + DefaultTags map[string]string `toml:"default_tags"` + MonitoringParams MonitoringParameters `toml:"monitoring_params"` } // NodeID returns the OPC UA node id @@ -36,12 +66,13 @@ func (tag *NodeSettings) NodeID() string { // NodeGroupSettings describes a mapping of group of nodes to Metrics type NodeGroupSettings struct { - MetricName string `toml:"name"` // Overrides plugin's setting - Namespace string `toml:"namespace"` // Can be overridden by node setting - IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting - Nodes []NodeSettings `toml:"nodes"` - TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"` - DefaultTags map[string]string `toml:"default_tags"` + MetricName string `toml:"name"` // Overrides plugin's setting + Namespace string `toml:"namespace"` // Can be overridden by node setting + IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting + Nodes []NodeSettings `toml:"nodes"` + TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"` + DefaultTags map[string]string `toml:"default_tags"` + SamplingInterval config.Duration `toml:"sampling_interval"` // Can be overridden by monitoring parameters } type TimestampSource string @@ -318,6 +349,9 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error { if node.IdentifierType == "" { node.IdentifierType = group.IdentifierType } + if node.MonitoringParams.SamplingInterval == 0 { + node.MonitoringParams.SamplingInterval = group.SamplingInterval + } nmm, err := NewNodeMetricMapping(group.MetricName, node, groupTags) if err != nil { diff --git a/plugins/inputs/opcua_listener/README.md b/plugins/inputs/opcua_listener/README.md index 64730433c..54be90daf 100644 --- a/plugins/inputs/opcua_listener/README.md +++ b/plugins/inputs/opcua_listener/README.md @@ -97,13 +97,44 @@ to use them. ## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque) ## identifier - OPC UA ID (tag as shown in opcua browser) ## default_tags - extra tags to be added to the output metric (optional) + ## monitoring_params - additional settings for the monitored node (optional) + ## + ## Monitoring parameters + ## sampling_interval - interval at which the server should check for data + ## changes (default: 0s) + ## queue_size - size of the notification queue (default: 10) + ## discard_oldest - how notifications should be handled in case of full + ## notification queues, possible values: + ## true: oldest value added to queue gets replaced with new + ## (default) + ## false: last value added to queue gets replaced with new + ## data_change_filter - defines the condition under which a notification should + ## be reported + ## + ## Data change filter + ## trigger - specify the conditions under which a data change notification + ## should be reported, possible values: + ## "Status": only report notifications if the status changes + ## (default if parameter is omitted) + ## "StatusValue": report notifications if either status or value + ## changes + ## "StatusValueTimestamp": report notifications if either status, + ## value or timestamp changes + ## deadband_type - type of the deadband filter to be applied, possible values: + ## "Absolute": absolute change in a data value to report a notification + ## "Percent": works only with nodes that have an EURange property set + ## and is defined as: send notification if + ## (last value - current value) > + ## (deadband_value/100.0) * ((high–low) of EURange) + ## deadband_value - value to deadband_type, must be a float value, no filter is set + ## for negative values ## ## Use either the inline notation or the bracketed notation, not both. # ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="", namespace="", identifier_type="", identifier=""}, - # {name="", namespace="", identifier_type="", identifier=""}, + # {name="node1", namespace="", identifier_type="", identifier="",} + # {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}}, # ] # ## Bracketed notation @@ -120,6 +151,16 @@ to use them. # identifier_type = "" # identifier = "" # + # [inputs.opcua_listener.nodes.monitoring_params] + # sampling_interval = "0s" + # queue_size = 10 + # discard_oldest = true + # + # [inputs.opcua_listener.nodes.monitoring_params.data_change_filter] + # trigger = "Status" + # deadband_type = "Absolute" + # deadband_value = 0.0 + # ## Node Group ## Sets defaults so they aren't required in every node. ## Default values can be set for: @@ -127,6 +168,7 @@ to use them. ## * OPC UA namespace ## * Identifier ## * Default tags + ## * Sampling interval ## ## Multiple node groups are allowed #[[inputs.opcua_listener.group]] @@ -147,13 +189,17 @@ to use them. ## example: default_tags = { tag1 = "value1" } # default_tags = {} # + ## Group default sampling interval. If a node in the group doesn't set its + ## sampling interval, this is used. + # sampling_interval = "0s" + # ## Node ID Configuration. Array of nodes with the same settings as above. ## Use either the inline notation or the bracketed notation, not both. # ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="node1", namespace="", identifier_type="", identifier=""}, - # {name="node2", namespace="", identifier_type="", identifier=""}, + # {name="node1", namespace="", identifier_type="", identifier="",} + # {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}}, #] # ## Bracketed notation @@ -169,7 +215,17 @@ to use them. # namespace = "" # identifier_type = "" # identifier = "" - + # + # [inputs.opcua_listener.group.nodes.monitoring_params] + # sampling_interval = "0s" + # queue_size = 10 + # discard_oldest = true + # + # [inputs.opcua_listener.group.nodes.monitoring_params.data_change_filter] + # trigger = "Status" + # deadband_type = "Absolute" + # deadband_value = 0.0 + # ## Enable workarounds required by some devices to work correctly # [inputs.opcua_listener.workarounds] ## Set additional valid status codes, StatusOK (0x0) is always considered valid @@ -201,11 +257,11 @@ opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000 ## Group Configuration -Groups can set default values for the namespace, identifier type, and -tags settings. The default values apply to all the nodes in the -group. If a default is set, a node may omit the setting altogether. -This simplifies node configuration, especially when many nodes share -the same namespace or identifier type. +Groups can set default values for the namespace, identifier type, tags +settings and sampling interval. The default values apply to all the +nodes in the group. If a default is set, a node may omit the setting +altogether. This simplifies node configuration, especially when many +nodes share the same namespace or identifier type. The output metric will include tags set in the group and the node. If a tag with the same name is set in both places, the tag value from the diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go index 96e8da413..06807f5e8 100644 --- a/plugins/inputs/opcua_listener/opcua_listener_test.go +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/docker/go-connections/nat" + "github.com/gopcua/opcua/ua" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" @@ -247,3 +248,329 @@ additional_valid_status_codes = ["0xC0"] }, o.SubscribeClientConfig.Groups) require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds) } + +func TestSubscribeClientConfigWithMonitoringParams(t *testing.T) { + toml := ` +[[inputs.opcua_listener]] +name = "localhost" +endpoint = "opc.tcp://localhost:4840" +subscription_interval = "200ms" + +[[inputs.opcua_listener.group]] +name = "foo" +namespace = "3" +identifier_type = "i" +tags = [["tag1", "val1"], ["tag2", "val2"]] +nodes = [{name="name3", identifier="3000", tags=[["tag3", "val3"]]}] + +[inputs.opcua_listener.group.nodes.monitoring_params] +sampling_interval = "50ms" +queue_size = 10 +discard_oldest = true + +[inputs.opcua_listener.group.nodes.monitoring_params.data_change_filter] +trigger = "StatusValue" +deadband_type = "Absolute" +deadband_value = 100.0 +` + + c := config.NewConfig() + err := c.LoadConfigData([]byte(toml)) + require.NoError(t, err) + + require.Len(t, c.Inputs, 1) + + o, ok := c.Inputs[0].Input.(*OpcUaListener) + require.True(t, ok) + + queueSize := uint32(10) + discardOldest := true + deadbandValue := 100.0 + require.Equal(t, []input.NodeGroupSettings{ + { + MetricName: "foo", + Namespace: "3", + IdentifierType: "i", + TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}}, + Nodes: []input.NodeSettings{{ + FieldName: "name3", + Identifier: "3000", + TagsSlice: [][]string{{"tag3", "val3"}}, + MonitoringParams: input.MonitoringParameters{ + SamplingInterval: 50000000, + QueueSize: &queueSize, + DiscardOldest: &discardOldest, + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "StatusValue", + DeadbandType: "Absolute", + DeadbandValue: &deadbandValue, + }, + }, + }}, + }, + }, o.SubscribeClientConfig.Groups) +} + +func TestSubscribeClientConfigInvalidTrigger(t *testing.T) { + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "not_valid", + }, + }, + }) + + _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "trigger 'not_valid' not supported, node 'ns=3;i=1'") +} + +func TestSubscribeClientConfigMissingTrigger(t *testing.T) { + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + DataChangeFilter: &input.DataChangeFilter{ + DeadbandType: "Absolute", + }, + }, + }) + + _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "trigger '' not supported, node 'ns=3;i=1'") +} + +func TestSubscribeClientConfigInvalidDeadbandType(t *testing.T) { + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "Status", + DeadbandType: "not_valid", + }, + }, + }) + + _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "deadband_type 'not_valid' not supported, node 'ns=3;i=1'") +} + +func TestSubscribeClientConfigMissingDeadbandType(t *testing.T) { + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "Status", + }, + }, + }) + + _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "deadband_type '' not supported, node 'ns=3;i=1'") +} + +func TestSubscribeClientConfigInvalidDeadbandValue(t *testing.T) { + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + deadbandValue := -1.0 + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "Status", + DeadbandType: "Absolute", + DeadbandValue: &deadbandValue, + }, + }, + }) + + _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "negative deadband_value not supported, node 'ns=3;i=1'") +} + +func TestSubscribeClientConfigMissingDeadbandValue(t *testing.T) { + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "Status", + DeadbandType: "Absolute", + }, + }, + }) + + _, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "deadband_value was not set, node 'ns=3;i=1'") +} + +func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) { + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + + var queueSize uint32 = 10 + discardOldest := true + deadbandValue := 10.0 + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + SamplingInterval: 50000000, + QueueSize: &queueSize, + DiscardOldest: &discardOldest, + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "Status", + DeadbandType: "Absolute", + DeadbandValue: &deadbandValue, + }, + }, + }) + + subClient, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.NoError(t, err) + require.Equal(t, &ua.MonitoringParameters{ + SamplingInterval: 50, + QueueSize: queueSize, + DiscardOldest: discardOldest, + Filter: ua.NewExtensionObject( + &ua.DataChangeFilter{ + Trigger: ua.DataChangeTriggerStatus, + DeadbandType: uint32(ua.DeadbandTypeAbsolute), + DeadbandValue: deadbandValue, + }, + ), + }, subClient.monitoredItemsReqs[0].RequestedParameters) +} diff --git a/plugins/inputs/opcua_listener/sample.conf b/plugins/inputs/opcua_listener/sample.conf index f6b9409f2..788502937 100644 --- a/plugins/inputs/opcua_listener/sample.conf +++ b/plugins/inputs/opcua_listener/sample.conf @@ -58,13 +58,44 @@ ## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque) ## identifier - OPC UA ID (tag as shown in opcua browser) ## default_tags - extra tags to be added to the output metric (optional) + ## monitoring_params - additional settings for the monitored node (optional) + ## + ## Monitoring parameters + ## sampling_interval - interval at which the server should check for data + ## changes (default: 0s) + ## queue_size - size of the notification queue (default: 10) + ## discard_oldest - how notifications should be handled in case of full + ## notification queues, possible values: + ## true: oldest value added to queue gets replaced with new + ## (default) + ## false: last value added to queue gets replaced with new + ## data_change_filter - defines the condition under which a notification should + ## be reported + ## + ## Data change filter + ## trigger - specify the conditions under which a data change notification + ## should be reported, possible values: + ## "Status": only report notifications if the status changes + ## (default if parameter is omitted) + ## "StatusValue": report notifications if either status or value + ## changes + ## "StatusValueTimestamp": report notifications if either status, + ## value or timestamp changes + ## deadband_type - type of the deadband filter to be applied, possible values: + ## "Absolute": absolute change in a data value to report a notification + ## "Percent": works only with nodes that have an EURange property set + ## and is defined as: send notification if + ## (last value - current value) > + ## (deadband_value/100.0) * ((high–low) of EURange) + ## deadband_value - value to deadband_type, must be a float value, no filter is set + ## for negative values ## ## Use either the inline notation or the bracketed notation, not both. # ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="", namespace="", identifier_type="", identifier=""}, - # {name="", namespace="", identifier_type="", identifier=""}, + # {name="node1", namespace="", identifier_type="", identifier="",} + # {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}}, # ] # ## Bracketed notation @@ -81,6 +112,16 @@ # identifier_type = "" # identifier = "" # + # [inputs.opcua_listener.nodes.monitoring_params] + # sampling_interval = "0s" + # queue_size = 10 + # discard_oldest = true + # + # [inputs.opcua_listener.nodes.monitoring_params.data_change_filter] + # trigger = "Status" + # deadband_type = "Absolute" + # deadband_value = 0.0 + # ## Node Group ## Sets defaults so they aren't required in every node. ## Default values can be set for: @@ -88,6 +129,7 @@ ## * OPC UA namespace ## * Identifier ## * Default tags + ## * Sampling interval ## ## Multiple node groups are allowed #[[inputs.opcua_listener.group]] @@ -108,13 +150,17 @@ ## example: default_tags = { tag1 = "value1" } # default_tags = {} # + ## Group default sampling interval. If a node in the group doesn't set its + ## sampling interval, this is used. + # sampling_interval = "0s" + # ## Node ID Configuration. Array of nodes with the same settings as above. ## Use either the inline notation or the bracketed notation, not both. # ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="node1", namespace="", identifier_type="", identifier=""}, - # {name="node2", namespace="", identifier_type="", identifier=""}, + # {name="node1", namespace="", identifier_type="", identifier="",} + # {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}}, #] # ## Bracketed notation @@ -130,7 +176,17 @@ # namespace = "" # identifier_type = "" # identifier = "" - + # + # [inputs.opcua_listener.group.nodes.monitoring_params] + # sampling_interval = "0s" + # queue_size = 10 + # discard_oldest = true + # + # [inputs.opcua_listener.group.nodes.monitoring_params.data_change_filter] + # trigger = "Status" + # deadband_type = "Absolute" + # deadband_value = 0.0 + # ## Enable workarounds required by some devices to work correctly # [inputs.opcua_listener.workarounds] ## Set additional valid status codes, StatusOK (0x0) is always considered valid diff --git a/plugins/inputs/opcua_listener/subscribe_client.go b/plugins/inputs/opcua_listener/subscribe_client.go index 872dffc08..20d479caa 100644 --- a/plugins/inputs/opcua_listener/subscribe_client.go +++ b/plugins/inputs/opcua_listener/subscribe_client.go @@ -32,6 +32,52 @@ type SubscribeClient struct { processingCancel context.CancelFunc } +func checkDataChangeFilterParameters(params *input.DataChangeFilter) error { + switch { + case params.Trigger != input.Status && + params.Trigger != input.StatusValue && + params.Trigger != input.StatusValueTimestamp: + return fmt.Errorf("trigger '%s' not supported", params.Trigger) + case params.DeadbandType != input.Absolute && + params.DeadbandType != input.Percent: + return fmt.Errorf("deadband_type '%s' not supported", params.DeadbandType) + case params.DeadbandValue == nil: + return fmt.Errorf("deadband_value was not set") + case *params.DeadbandValue < 0: + return fmt.Errorf("negative deadband_value not supported") + default: + return nil + } +} + +func assignConfigValuesToRequest(req *ua.MonitoredItemCreateRequest, monParams *input.MonitoringParameters) error { + req.RequestedParameters.SamplingInterval = float64(time.Duration(monParams.SamplingInterval) / time.Millisecond) + + if monParams.QueueSize != nil { + req.RequestedParameters.QueueSize = *monParams.QueueSize + } + + if monParams.DiscardOldest != nil { + req.RequestedParameters.DiscardOldest = *monParams.DiscardOldest + } + + if monParams.DataChangeFilter != nil { + if err := checkDataChangeFilterParameters(monParams.DataChangeFilter); err != nil { + return fmt.Errorf(err.Error()+", node '%s'", req.ItemToMonitor.NodeID) + } + + req.RequestedParameters.Filter = ua.NewExtensionObject( + &ua.DataChangeFilter{ + Trigger: ua.DataChangeTriggerFromString(string(monParams.DataChangeFilter.Trigger)), + DeadbandType: uint32(ua.DeadbandTypeFromString(string(monParams.DataChangeFilter.DeadbandType))), + DeadbandValue: *monParams.DataChangeFilter.DeadbandValue, + }, + ) + } + + return nil +} + func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*SubscribeClient, error) { client, err := sc.InputClientConfig.CreateInputClient(log) if err != nil { @@ -57,6 +103,9 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su for i, nodeID := range client.NodeIDs { // The node id index (i) is used as the handle for the monitored item req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i)) + if err := assignConfigValuesToRequest(req, &client.NodeMetricMapping[i].Tag.MonitoringParams); err != nil { + return nil, err + } subClient.monitoredItemsReqs[i] = req }