feat(inputs.opcua_listener): Add monitoring params (#13923)

Co-authored-by: Tobias Reindl <tobias.reindl@s7-rail.com>
This commit is contained in:
reindlt 2023-10-10 13:06:17 +02:00 committed by GitHub
parent 963616540d
commit 474aff588e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 551 additions and 29 deletions

View File

@ -12,21 +12,51 @@ import (
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/opcua"
)
type Trigger string
const (
Status Trigger = "Status"
StatusValue Trigger = "StatusValue"
StatusValueTimestamp Trigger = "StatusValueTimestamp"
)
type DeadbandType string
const (
Absolute DeadbandType = "Absolute"
Percent DeadbandType = "Percent"
)
type DataChangeFilter struct {
Trigger Trigger `toml:"trigger"`
DeadbandType DeadbandType `toml:"deadband_type"`
DeadbandValue *float64 `toml:"deadband_value"`
}
type MonitoringParameters struct {
SamplingInterval config.Duration `toml:"sampling_interval"`
QueueSize *uint32 `toml:"queue_size"`
DiscardOldest *bool `toml:"discard_oldest"`
DataChangeFilter *DataChangeFilter `toml:"data_change_filter"`
}
// NodeSettings describes how to map from a OPC UA node to a Metric
type NodeSettings struct {
FieldName string `toml:"name"`
Namespace string `toml:"namespace"`
IdentifierType string `toml:"identifier_type"`
Identifier string `toml:"identifier"`
DataType string `toml:"data_type" deprecated:"1.17.0;option is ignored"`
Description string `toml:"description" deprecated:"1.17.0;option is ignored"`
TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"`
DefaultTags map[string]string `toml:"default_tags"`
FieldName string `toml:"name"`
Namespace string `toml:"namespace"`
IdentifierType string `toml:"identifier_type"`
Identifier string `toml:"identifier"`
DataType string `toml:"data_type" deprecated:"1.17.0;option is ignored"`
Description string `toml:"description" deprecated:"1.17.0;option is ignored"`
TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"`
DefaultTags map[string]string `toml:"default_tags"`
MonitoringParams MonitoringParameters `toml:"monitoring_params"`
}
// NodeID returns the OPC UA node id
@ -36,12 +66,13 @@ func (tag *NodeSettings) NodeID() string {
// NodeGroupSettings describes a mapping of group of nodes to Metrics
type NodeGroupSettings struct {
MetricName string `toml:"name"` // Overrides plugin's setting
Namespace string `toml:"namespace"` // Can be overridden by node setting
IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting
Nodes []NodeSettings `toml:"nodes"`
TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"`
DefaultTags map[string]string `toml:"default_tags"`
MetricName string `toml:"name"` // Overrides plugin's setting
Namespace string `toml:"namespace"` // Can be overridden by node setting
IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting
Nodes []NodeSettings `toml:"nodes"`
TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"`
DefaultTags map[string]string `toml:"default_tags"`
SamplingInterval config.Duration `toml:"sampling_interval"` // Can be overridden by monitoring parameters
}
type TimestampSource string
@ -318,6 +349,9 @@ func (o *OpcUAInputClient) InitNodeMetricMapping() error {
if node.IdentifierType == "" {
node.IdentifierType = group.IdentifierType
}
if node.MonitoringParams.SamplingInterval == 0 {
node.MonitoringParams.SamplingInterval = group.SamplingInterval
}
nmm, err := NewNodeMetricMapping(group.MetricName, node, groupTags)
if err != nil {

View File

@ -97,13 +97,44 @@ to use them.
## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque)
## identifier - OPC UA ID (tag as shown in opcua browser)
## default_tags - extra tags to be added to the output metric (optional)
## monitoring_params - additional settings for the monitored node (optional)
##
## Monitoring parameters
## sampling_interval - interval at which the server should check for data
## changes (default: 0s)
## queue_size - size of the notification queue (default: 10)
## discard_oldest - how notifications should be handled in case of full
## notification queues, possible values:
## true: oldest value added to queue gets replaced with new
## (default)
## false: last value added to queue gets replaced with new
## data_change_filter - defines the condition under which a notification should
## be reported
##
## Data change filter
## trigger - specify the conditions under which a data change notification
## should be reported, possible values:
## "Status": only report notifications if the status changes
## (default if parameter is omitted)
## "StatusValue": report notifications if either status or value
## changes
## "StatusValueTimestamp": report notifications if either status,
## value or timestamp changes
## deadband_type - type of the deadband filter to be applied, possible values:
## "Absolute": absolute change in a data value to report a notification
## "Percent": works only with nodes that have an EURange property set
## and is defined as: send notification if
## (last value - current value) >
## (deadband_value/100.0) * ((highlow) of EURange)
## deadband_value - value to deadband_type, must be a float value, no filter is set
## for negative values
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
# {name="node1", namespace="", identifier_type="", identifier="",}
# {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}},
# ]
#
## Bracketed notation
@ -120,6 +151,16 @@ to use them.
# identifier_type = ""
# identifier = ""
#
# [inputs.opcua_listener.nodes.monitoring_params]
# sampling_interval = "0s"
# queue_size = 10
# discard_oldest = true
#
# [inputs.opcua_listener.nodes.monitoring_params.data_change_filter]
# trigger = "Status"
# deadband_type = "Absolute"
# deadband_value = 0.0
#
## Node Group
## Sets defaults so they aren't required in every node.
## Default values can be set for:
@ -127,6 +168,7 @@ to use them.
## * OPC UA namespace
## * Identifier
## * Default tags
## * Sampling interval
##
## Multiple node groups are allowed
#[[inputs.opcua_listener.group]]
@ -147,13 +189,17 @@ to use them.
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Group default sampling interval. If a node in the group doesn't set its
## sampling interval, this is used.
# sampling_interval = "0s"
#
## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
# {name="node1", namespace="", identifier_type="", identifier="",}
# {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}},
#]
#
## Bracketed notation
@ -169,7 +215,17 @@ to use them.
# namespace = ""
# identifier_type = ""
# identifier = ""
#
# [inputs.opcua_listener.group.nodes.monitoring_params]
# sampling_interval = "0s"
# queue_size = 10
# discard_oldest = true
#
# [inputs.opcua_listener.group.nodes.monitoring_params.data_change_filter]
# trigger = "Status"
# deadband_type = "Absolute"
# deadband_value = 0.0
#
## Enable workarounds required by some devices to work correctly
# [inputs.opcua_listener.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
@ -201,11 +257,11 @@ opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000
## Group Configuration
Groups can set default values for the namespace, identifier type, and
tags settings. The default values apply to all the nodes in the
group. If a default is set, a node may omit the setting altogether.
This simplifies node configuration, especially when many nodes share
the same namespace or identifier type.
Groups can set default values for the namespace, identifier type, tags
settings and sampling interval. The default values apply to all the
nodes in the group. If a default is set, a node may omit the setting
altogether. This simplifies node configuration, especially when many
nodes share the same namespace or identifier type.
The output metric will include tags set in the group and the node. If
a tag with the same name is set in both places, the tag value from the

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/docker/go-connections/nat"
"github.com/gopcua/opcua/ua"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
@ -247,3 +248,329 @@ additional_valid_status_codes = ["0xC0"]
}, o.SubscribeClientConfig.Groups)
require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds)
}
func TestSubscribeClientConfigWithMonitoringParams(t *testing.T) {
toml := `
[[inputs.opcua_listener]]
name = "localhost"
endpoint = "opc.tcp://localhost:4840"
subscription_interval = "200ms"
[[inputs.opcua_listener.group]]
name = "foo"
namespace = "3"
identifier_type = "i"
tags = [["tag1", "val1"], ["tag2", "val2"]]
nodes = [{name="name3", identifier="3000", tags=[["tag3", "val3"]]}]
[inputs.opcua_listener.group.nodes.monitoring_params]
sampling_interval = "50ms"
queue_size = 10
discard_oldest = true
[inputs.opcua_listener.group.nodes.monitoring_params.data_change_filter]
trigger = "StatusValue"
deadband_type = "Absolute"
deadband_value = 100.0
`
c := config.NewConfig()
err := c.LoadConfigData([]byte(toml))
require.NoError(t, err)
require.Len(t, c.Inputs, 1)
o, ok := c.Inputs[0].Input.(*OpcUaListener)
require.True(t, ok)
queueSize := uint32(10)
discardOldest := true
deadbandValue := 100.0
require.Equal(t, []input.NodeGroupSettings{
{
MetricName: "foo",
Namespace: "3",
IdentifierType: "i",
TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}},
Nodes: []input.NodeSettings{{
FieldName: "name3",
Identifier: "3000",
TagsSlice: [][]string{{"tag3", "val3"}},
MonitoringParams: input.MonitoringParameters{
SamplingInterval: 50000000,
QueueSize: &queueSize,
DiscardOldest: &discardOldest,
DataChangeFilter: &input.DataChangeFilter{
Trigger: "StatusValue",
DeadbandType: "Absolute",
DeadbandValue: &deadbandValue,
},
},
}},
},
}, o.SubscribeClientConfig.Groups)
}
func TestSubscribeClientConfigInvalidTrigger(t *testing.T) {
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
DataChangeFilter: &input.DataChangeFilter{
Trigger: "not_valid",
},
},
})
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "trigger 'not_valid' not supported, node 'ns=3;i=1'")
}
func TestSubscribeClientConfigMissingTrigger(t *testing.T) {
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
DataChangeFilter: &input.DataChangeFilter{
DeadbandType: "Absolute",
},
},
})
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "trigger '' not supported, node 'ns=3;i=1'")
}
func TestSubscribeClientConfigInvalidDeadbandType(t *testing.T) {
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
DataChangeFilter: &input.DataChangeFilter{
Trigger: "Status",
DeadbandType: "not_valid",
},
},
})
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "deadband_type 'not_valid' not supported, node 'ns=3;i=1'")
}
func TestSubscribeClientConfigMissingDeadbandType(t *testing.T) {
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
DataChangeFilter: &input.DataChangeFilter{
Trigger: "Status",
},
},
})
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "deadband_type '' not supported, node 'ns=3;i=1'")
}
func TestSubscribeClientConfigInvalidDeadbandValue(t *testing.T) {
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
deadbandValue := -1.0
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
DataChangeFilter: &input.DataChangeFilter{
Trigger: "Status",
DeadbandType: "Absolute",
DeadbandValue: &deadbandValue,
},
},
})
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "negative deadband_value not supported, node 'ns=3;i=1'")
}
func TestSubscribeClientConfigMissingDeadbandValue(t *testing.T) {
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
DataChangeFilter: &input.DataChangeFilter{
Trigger: "Status",
DeadbandType: "Absolute",
},
},
})
_, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "deadband_value was not set, node 'ns=3;i=1'")
}
func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) {
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
var queueSize uint32 = 10
discardOldest := true
deadbandValue := 10.0
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
SamplingInterval: 50000000,
QueueSize: &queueSize,
DiscardOldest: &discardOldest,
DataChangeFilter: &input.DataChangeFilter{
Trigger: "Status",
DeadbandType: "Absolute",
DeadbandValue: &deadbandValue,
},
},
})
subClient, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.NoError(t, err)
require.Equal(t, &ua.MonitoringParameters{
SamplingInterval: 50,
QueueSize: queueSize,
DiscardOldest: discardOldest,
Filter: ua.NewExtensionObject(
&ua.DataChangeFilter{
Trigger: ua.DataChangeTriggerStatus,
DeadbandType: uint32(ua.DeadbandTypeAbsolute),
DeadbandValue: deadbandValue,
},
),
}, subClient.monitoredItemsReqs[0].RequestedParameters)
}

View File

@ -58,13 +58,44 @@
## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque)
## identifier - OPC UA ID (tag as shown in opcua browser)
## default_tags - extra tags to be added to the output metric (optional)
## monitoring_params - additional settings for the monitored node (optional)
##
## Monitoring parameters
## sampling_interval - interval at which the server should check for data
## changes (default: 0s)
## queue_size - size of the notification queue (default: 10)
## discard_oldest - how notifications should be handled in case of full
## notification queues, possible values:
## true: oldest value added to queue gets replaced with new
## (default)
## false: last value added to queue gets replaced with new
## data_change_filter - defines the condition under which a notification should
## be reported
##
## Data change filter
## trigger - specify the conditions under which a data change notification
## should be reported, possible values:
## "Status": only report notifications if the status changes
## (default if parameter is omitted)
## "StatusValue": report notifications if either status or value
## changes
## "StatusValueTimestamp": report notifications if either status,
## value or timestamp changes
## deadband_type - type of the deadband filter to be applied, possible values:
## "Absolute": absolute change in a data value to report a notification
## "Percent": works only with nodes that have an EURange property set
## and is defined as: send notification if
## (last value - current value) >
## (deadband_value/100.0) * ((highlow) of EURange)
## deadband_value - value to deadband_type, must be a float value, no filter is set
## for negative values
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
# {name="node1", namespace="", identifier_type="", identifier="",}
# {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}},
# ]
#
## Bracketed notation
@ -81,6 +112,16 @@
# identifier_type = ""
# identifier = ""
#
# [inputs.opcua_listener.nodes.monitoring_params]
# sampling_interval = "0s"
# queue_size = 10
# discard_oldest = true
#
# [inputs.opcua_listener.nodes.monitoring_params.data_change_filter]
# trigger = "Status"
# deadband_type = "Absolute"
# deadband_value = 0.0
#
## Node Group
## Sets defaults so they aren't required in every node.
## Default values can be set for:
@ -88,6 +129,7 @@
## * OPC UA namespace
## * Identifier
## * Default tags
## * Sampling interval
##
## Multiple node groups are allowed
#[[inputs.opcua_listener.group]]
@ -108,13 +150,17 @@
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Group default sampling interval. If a node in the group doesn't set its
## sampling interval, this is used.
# sampling_interval = "0s"
#
## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
# {name="node1", namespace="", identifier_type="", identifier="",}
# {name="node2", namespace="", identifier_type="", identifier="", monitoring_params={sampling_interval="0s", queue_size=10, discard_oldest=true, data_change_filter={trigger="Status", deadband_type="Absolute", deadband_value=0.0}}},
#]
#
## Bracketed notation
@ -130,7 +176,17 @@
# namespace = ""
# identifier_type = ""
# identifier = ""
#
# [inputs.opcua_listener.group.nodes.monitoring_params]
# sampling_interval = "0s"
# queue_size = 10
# discard_oldest = true
#
# [inputs.opcua_listener.group.nodes.monitoring_params.data_change_filter]
# trigger = "Status"
# deadband_type = "Absolute"
# deadband_value = 0.0
#
## Enable workarounds required by some devices to work correctly
# [inputs.opcua_listener.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid

View File

@ -32,6 +32,52 @@ type SubscribeClient struct {
processingCancel context.CancelFunc
}
func checkDataChangeFilterParameters(params *input.DataChangeFilter) error {
switch {
case params.Trigger != input.Status &&
params.Trigger != input.StatusValue &&
params.Trigger != input.StatusValueTimestamp:
return fmt.Errorf("trigger '%s' not supported", params.Trigger)
case params.DeadbandType != input.Absolute &&
params.DeadbandType != input.Percent:
return fmt.Errorf("deadband_type '%s' not supported", params.DeadbandType)
case params.DeadbandValue == nil:
return fmt.Errorf("deadband_value was not set")
case *params.DeadbandValue < 0:
return fmt.Errorf("negative deadband_value not supported")
default:
return nil
}
}
func assignConfigValuesToRequest(req *ua.MonitoredItemCreateRequest, monParams *input.MonitoringParameters) error {
req.RequestedParameters.SamplingInterval = float64(time.Duration(monParams.SamplingInterval) / time.Millisecond)
if monParams.QueueSize != nil {
req.RequestedParameters.QueueSize = *monParams.QueueSize
}
if monParams.DiscardOldest != nil {
req.RequestedParameters.DiscardOldest = *monParams.DiscardOldest
}
if monParams.DataChangeFilter != nil {
if err := checkDataChangeFilterParameters(monParams.DataChangeFilter); err != nil {
return fmt.Errorf(err.Error()+", node '%s'", req.ItemToMonitor.NodeID)
}
req.RequestedParameters.Filter = ua.NewExtensionObject(
&ua.DataChangeFilter{
Trigger: ua.DataChangeTriggerFromString(string(monParams.DataChangeFilter.Trigger)),
DeadbandType: uint32(ua.DeadbandTypeFromString(string(monParams.DataChangeFilter.DeadbandType))),
DeadbandValue: *monParams.DataChangeFilter.DeadbandValue,
},
)
}
return nil
}
func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*SubscribeClient, error) {
client, err := sc.InputClientConfig.CreateInputClient(log)
if err != nil {
@ -57,6 +103,9 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
for i, nodeID := range client.NodeIDs {
// The node id index (i) is used as the handle for the monitored item
req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i))
if err := assignConfigValuesToRequest(req, &client.NodeMetricMapping[i].Tag.MonitoringParams); err != nil {
return nil, err
}
subClient.monitoredItemsReqs[i] = req
}