feat(inputs.opcua_listener): Allow to subscribe to OPCUA events (#16532)

Co-authored-by: Sven Rebhan <srebhan@influxdata.com>
This commit is contained in:
frmoschner 2025-04-16 15:15:12 +02:00 committed by GitHub
parent f033a75bc4
commit 508398d454
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 958 additions and 52 deletions

View File

@ -9,6 +9,7 @@ import (
"strings"
"time"
"github.com/gopcua/opcua/id"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
@ -75,6 +76,80 @@ type NodeGroupSettings struct {
SamplingInterval config.Duration `toml:"sampling_interval"` // Can be overridden by monitoring parameters
}
type EventNodeSettings struct {
Namespace string `toml:"namespace"`
IdentifierType string `toml:"identifier_type"`
Identifier string `toml:"identifier"`
}
func (e *EventNodeSettings) NodeID() string {
return "ns=" + e.Namespace + ";" + e.IdentifierType + "=" + e.Identifier
}
type EventGroupSettings struct {
SamplingInterval config.Duration `toml:"sampling_interval"`
QueueSize uint32 `toml:"queue_size"`
EventTypeNode EventNodeSettings `toml:"event_type_node"`
Namespace string `toml:"namespace"`
IdentifierType string `toml:"identifier_type"`
NodeIDSettings []EventNodeSettings `toml:"node_ids"`
SourceNames []string `toml:"source_names"`
Fields []string `toml:"fields"`
}
func (e *EventGroupSettings) UpdateNodeIDSettings() {
for i := range e.NodeIDSettings {
n := &e.NodeIDSettings[i]
if n.Namespace == "" {
n.Namespace = e.Namespace
}
if n.IdentifierType == "" {
n.IdentifierType = e.IdentifierType
}
}
}
func (e *EventGroupSettings) Validate() error {
if err := e.EventTypeNode.validateEventNodeSettings(); err != nil {
return fmt.Errorf("invalid event_type_node_settings: %w", err)
}
if len(e.NodeIDSettings) == 0 {
return errors.New("at least one node_id must be specified")
}
for _, node := range e.NodeIDSettings {
if err := node.validateEventNodeSettings(); err != nil {
return fmt.Errorf("invalid node_id_settings: %w", err)
}
}
if len(e.Fields) == 0 {
return errors.New("at least one Field must be specified")
}
for _, field := range e.Fields {
if field == "" {
return errors.New("empty field name in fields stanza")
}
}
return nil
}
func (e EventNodeSettings) validateEventNodeSettings() error {
var defaultNodeSettings EventNodeSettings
if e == defaultNodeSettings {
return errors.New("node settings can't be empty")
}
if e.Identifier == "" {
return errors.New("identifier must be set")
} else if e.IdentifierType == "" {
return errors.New("identifier_type must be set")
} else if e.Namespace == "" {
return errors.New("namespace must be set")
}
return nil
}
type TimestampSource string
const (
@ -91,6 +166,7 @@ type InputClientConfig struct {
TimestampFormat string `toml:"timestamp_format"`
RootNodes []NodeSettings `toml:"nodes"`
Groups []NodeGroupSettings `toml:"group"`
EventGroups []EventGroupSettings `toml:"events"`
}
func (o *InputClientConfig) Validate() error {
@ -107,8 +183,8 @@ func (o *InputClientConfig) Validate() error {
o.TimestampFormat = time.RFC3339Nano
}
if len(o.Groups) == 0 && len(o.RootNodes) == 0 {
return errors.New("no groups or root nodes provided to gather from")
if len(o.Groups) == 0 && len(o.RootNodes) == 0 && o.EventGroups == nil {
return errors.New("no groups, root nodes or events provided to gather from")
}
for _, group := range o.Groups {
if len(group.Nodes) == 0 {
@ -124,6 +200,15 @@ func (o *InputClientConfig) CreateInputClient(log telegraf.Logger) (*OpcUAInputC
return nil, err
}
if o.EventGroups != nil {
for _, eventGroup := range o.EventGroups {
eventGroup.UpdateNodeIDSettings()
if err := eventGroup.Validate(); err != nil {
return nil, fmt.Errorf("invalid event_settings: %w", err)
}
}
}
log.Debug("Initialising OpcUAInputClient")
opcClient, err := o.OpcUAClientConfig.CreateClient(log)
if err != nil {
@ -134,6 +219,7 @@ func (o *InputClientConfig) CreateInputClient(log telegraf.Logger) (*OpcUAInputC
OpcUAClient: opcClient,
Log: log,
Config: *o,
EventGroups: o.EventGroups,
}
log.Debug("Initialising node to metric mapping")
@ -185,6 +271,15 @@ func NewNodeMetricMapping(metricName string, node NodeSettings, groupTags map[st
}, nil
}
type EventNodeMetricMapping struct {
NodeID *ua.NodeID
SamplingInterval *config.Duration
QueueSize *uint32
EventTypeNode *ua.NodeID
SourceNames []string
Fields []string
}
// NodeValue The received value for a node
type NodeValue struct {
TagName string
@ -206,6 +301,8 @@ type OpcUAInputClient struct {
NodeMetricMapping []NodeMetricMapping
NodeIDs []*ua.NodeID
LastReceivedData []NodeValue
EventGroups []EventGroupSettings
EventNodeMetricMapping []EventNodeMetricMapping
}
// Stop the connection to the client
@ -381,6 +478,33 @@ func (o *OpcUAInputClient) InitNodeIDs() error {
return nil
}
func (o *OpcUAInputClient) InitEventNodeIDs() error {
for _, eventSetting := range o.EventGroups {
eid, err := ua.ParseNodeID(eventSetting.EventTypeNode.NodeID())
if err != nil {
return err
}
for _, node := range eventSetting.NodeIDSettings {
nid, err := ua.ParseNodeID(node.NodeID())
if err != nil {
return err
}
nmm := EventNodeMetricMapping{
NodeID: nid,
SamplingInterval: &eventSetting.SamplingInterval,
QueueSize: &eventSetting.QueueSize,
EventTypeNode: eid,
SourceNames: eventSetting.SourceNames,
Fields: eventSetting.Fields,
}
o.EventNodeMetricMapping = append(o.EventNodeMetricMapping, nmm)
}
}
return nil
}
func (o *OpcUAInputClient) initLastReceivedValues() {
o.LastReceivedData = make([]NodeValue, len(o.NodeMetricMapping))
for nodeIdx, nmm := range o.NodeMetricMapping {
@ -448,3 +572,151 @@ func (o *OpcUAInputClient) MetricForNode(nodeIdx int) telegraf.Metric {
return metric.New(nmm.metricName, tags, fields, t)
}
func (o *OpcUAInputClient) MetricForEvent(nodeIdx int, event *ua.EventFieldList) telegraf.Metric {
node := o.EventNodeMetricMapping[nodeIdx]
fields := make(map[string]interface{}, len(event.EventFields))
for i, field := range event.EventFields {
name := node.Fields[i]
value := field.Value()
if value == nil {
o.Log.Warnf("Field %s has no value", name)
continue
}
switch v := value.(type) {
case *ua.LocalizedText:
fields[name] = v.Text
case time.Time:
fields[name] = v.Format(time.RFC3339)
default:
fields[name] = v
}
}
tags := map[string]string{
"node_id": node.NodeID.String(),
"source": o.Config.Endpoint,
}
var t time.Time
switch o.Config.Timestamp {
case TimestampSourceServer:
t = o.LastReceivedData[nodeIdx].ServerTime
case TimestampSourceSource:
t = o.LastReceivedData[nodeIdx].SourceTime
default:
t = time.Now()
}
return metric.New("opcua_event", tags, fields, t)
}
// Creation of event filter for event streaming
func (node *EventNodeMetricMapping) CreateEventFilter() (*ua.ExtensionObject, error) {
selects, err := node.createSelectClauses()
if err != nil {
return nil, err
}
wheres, err := node.createWhereClauses()
if err != nil {
return nil, err
}
return &ua.ExtensionObject{
EncodingMask: ua.ExtensionObjectBinary,
TypeID: &ua.ExpandedNodeID{NodeID: ua.NewNumericNodeID(0, id.EventFilter_Encoding_DefaultBinary)},
Value: ua.EventFilter{
SelectClauses: selects,
WhereClause: wheres,
},
}, nil
}
func (node *EventNodeMetricMapping) createSelectClauses() ([]*ua.SimpleAttributeOperand, error) {
selects := make([]*ua.SimpleAttributeOperand, len(node.Fields))
typeDefinition, err := node.determineNodeIDType()
if err != nil {
return nil, err
}
for i, name := range node.Fields {
selects[i] = &ua.SimpleAttributeOperand{
TypeDefinitionID: typeDefinition,
BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: name}},
AttributeID: ua.AttributeIDValue,
}
}
return selects, nil
}
func (node *EventNodeMetricMapping) createWhereClauses() (*ua.ContentFilter, error) {
if len(node.SourceNames) == 0 {
return &ua.ContentFilter{
Elements: make([]*ua.ContentFilterElement, 0),
}, nil
}
operands := make([]*ua.ExtensionObject, 0)
for _, sourceName := range node.SourceNames {
literalOperand := &ua.ExtensionObject{
EncodingMask: 1,
TypeID: &ua.ExpandedNodeID{
NodeID: ua.NewNumericNodeID(0, id.LiteralOperand_Encoding_DefaultBinary),
},
Value: ua.LiteralOperand{
Value: ua.MustVariant(sourceName),
},
}
operands = append(operands, literalOperand)
}
typeDefinition, err := node.determineNodeIDType()
if err != nil {
return nil, err
}
attributeOperand := &ua.ExtensionObject{
EncodingMask: ua.ExtensionObjectBinary,
TypeID: &ua.ExpandedNodeID{
NodeID: ua.NewNumericNodeID(0, id.SimpleAttributeOperand_Encoding_DefaultBinary),
},
Value: &ua.SimpleAttributeOperand{
TypeDefinitionID: typeDefinition,
BrowsePath: []*ua.QualifiedName{
{NamespaceIndex: 0, Name: "SourceName"},
},
AttributeID: ua.AttributeIDValue,
},
}
filterElement := &ua.ContentFilterElement{
FilterOperator: ua.FilterOperatorInList,
FilterOperands: append([]*ua.ExtensionObject{attributeOperand}, operands...),
}
wheres := &ua.ContentFilter{
Elements: []*ua.ContentFilterElement{filterElement},
}
return wheres, nil
}
func (node *EventNodeMetricMapping) determineNodeIDType() (*ua.NodeID, error) {
switch node.EventTypeNode.Type() {
case ua.NodeIDTypeGUID:
return ua.NewGUIDNodeID(node.EventTypeNode.Namespace(), node.EventTypeNode.StringID()), nil
case ua.NodeIDTypeString:
return ua.NewStringNodeID(node.EventTypeNode.Namespace(), node.EventTypeNode.StringID()), nil
case ua.NodeIDTypeByteString:
return ua.NewByteStringNodeID(node.EventTypeNode.Namespace(), []byte(node.EventTypeNode.StringID())), nil
case ua.NodeIDTypeTwoByte:
nodeID := node.EventTypeNode.IntID()
if nodeID > 255 {
return nil, fmt.Errorf("twoByte EventType requires a value in the range 0-255, got %d", nodeID)
}
return ua.NewTwoByteNodeID(uint8(node.EventTypeNode.IntID())), nil
case ua.NodeIDTypeFourByte:
return ua.NewFourByteNodeID(uint8(node.EventTypeNode.Namespace()), uint16(node.EventTypeNode.IntID())), nil
case ua.NodeIDTypeNumeric:
return ua.NewNumericNodeID(node.EventTypeNode.Namespace(), node.EventTypeNode.IntID()), nil
default:
return nil, fmt.Errorf("unsupported NodeID type: %v", node.EventTypeNode.String())
}
}

View File

@ -1,9 +1,13 @@
# OPC UA Client Listener Input Plugin
The `opcua_listener` plugin subscribes to data from OPC UA Server devices.
This service plugin subscribes to data and events from an [OPC UA][opcua]
erver.
Telegraf minimum version: Telegraf 1.25
Plugin minimum tested version: 1.25
⭐ Telegraf v1.25.0
🏷️ network
💻 linux, windows
[opcua]: https://opcfoundation.org/
## Service Input <!-- @/docs/includes/service_input.md -->
@ -250,17 +254,42 @@ to use them.
# deadband_value = 0.0
#
## Multiple event groups are allowed.
# [[inputs.opcua_listener.events]]
# ## Polling interval for data collection
# # sampling_interval = "10s"
# ## Size of the notification queue
# # queue_size = 10
# ## Node parameter defaults for node definitions below
# # namespace = ""
# # identifier_type = ""
# ## Specifies OPCUA Event sources to filter on
# # source_names = ["SourceName1", "SourceName2"]
# ## Fields to capture from event notifications
# fields = ["Severity", "Message", "Time"]
#
# ## Type or level of events to capture from the monitored nodes.
# [inputs.opcua_listener.events.event_type_node]
# namespace = ""
# identifier_type = ""
# identifier = ""
#
# ## Nodes to monitor for event notifications associated with the defined
# ## event type
# [[inputs.opcua_listener.events.node_ids]]
# namespace = ""
# identifier_type = ""
# identifier = ""
## Enable workarounds required by some devices to work correctly
# [inputs.opcua_listener.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# [inputs.opcua_listener.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false
# ## Set additional valid status codes, StatusOK (0x0) is always considered valid
# # additional_valid_status_codes = ["0xC0"]
# ## Use unregistered reads instead of registered reads
# # use_unregistered_reads = false
```
## Node Configuration
### Node Configuration
An OPC UA node ID may resemble: "ns=3;s=Temperature". In this example:
@ -286,7 +315,7 @@ produces a metric like this:
opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)",DataType="Float" 1597820490000000000
```
## Group Configuration
#### Group Configuration
Groups can set default values for the namespace, identifier type, tags
settings and sampling interval. The default values apply to all the
@ -342,16 +371,68 @@ This example group configuration has three groups with two nodes each:
]
```
## Connection Service
### Event Configuration
This plugin subscribes to the specified nodes to receive data from
the OPC server. The updates are received at most as fast as the
`subscription_interval`.
Defining events allows subscribing to events with the specific node IDs and
filtering criteria based on the event type and source. The plugin subscribes to
the specified `event_type` Node-IDs and collects events that meet the defined
criteria. The `node_ids` parameter specifies the nodes to monitor for events
(monitored items). However, the actual subscription is based on the
`event_type_node` determining the events to capture.
#### Event Group Configuration
You can define multiple groups for the event streaming to subscribe to different
event types. Each group allows to specify defaults for `namespace` and
`identifier_type` being overwritten by settings in `node_ids`. The group
defaults for node information will not affected the `event_type_node` setting
and all paramters must be set in this section.
This example group configuration shows how to use group settings:
```toml
# Group 1
[[inputs.opcua_listener.events]]
sampling_interval = "10s"
queue_size = "100"
source_names = ["SourceName1", "SourceName2"]
fields = ["Severity", "Message", "Time"]
[inputs.opcua_listener.events.event_type_node]
namespace = "1"
identifier_type = "i"
identifier = "1234"
[[inputs.opcua_listener.events.node_ids]]
namespace = "2"
identifier_type = "i"
identifier = "2345"
# Group 2
[[inputs.opcua_listener.events]]
sampling_interval = "10s"
queue_size = "100"
namespace = "3"
identifier_type = "s"
source_names = ["SourceName1", "SourceName2"]
fields = ["Severity", "Message", "Time"]
[inputs.opcua_listener.events.event_type_node]
namespace = "1"
identifier_type = "i"
identifier = "5678"
node_ids = [
{identifier="Sensor1"}, // default values will be used for namespace and identifier_type
{namespace="2", identifier="TemperatureSensor"}, // default values will be used for identifier_type
{namespace="5", identifier_type="i", identifier="2002"} // no default values will be used
]
```
## Metrics
The metrics collected by this input plugin will depend on the
configured `nodes` and `group`.
The metrics collected by this input plugin will depend on the configured
`nodes`, `events` and the corresponding groups.
## Example Output

View File

@ -64,7 +64,7 @@ func (o *OpcUaListener) Stop() {
func (o *OpcUaListener) connect(acc telegraf.Accumulator) error {
ctx := context.Background()
ch, err := o.client.startStreamValues(ctx)
ch, err := o.client.startMonitoring(ctx)
if err != nil {
return err
}

View File

@ -193,7 +193,7 @@ func TestSubscribeClientIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), time.Second*10)
defer cancel()
res, err := o.startStreamValues(ctx)
res, err := o.startMonitoring(ctx)
require.Equal(t, opcua.Connected, o.State())
require.NoError(t, err)
@ -336,7 +336,7 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), time.Second*10)
defer cancel()
res, err := o.startStreamValues(ctx)
res, err := o.startMonitoring(ctx)
require.NoError(t, err)
for {
@ -814,3 +814,482 @@ func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) {
),
}, subClient.monitoredItemsReqs[0].RequestedParameters)
}
func TestSubscribeClientConfigValidMonitoringAndEventParams(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
var queueSize uint32 = 10
discardOldest := true
deadbandValue := 10.0
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{
FieldName: "foo",
Namespace: "3",
Identifier: "1",
IdentifierType: "i",
MonitoringParams: input.MonitoringParameters{
SamplingInterval: 50000000,
QueueSize: &queueSize,
DiscardOldest: &discardOldest,
DataChangeFilter: &input.DataChangeFilter{
Trigger: "Status",
DeadbandType: "Absolute",
DeadbandValue: &deadbandValue,
},
},
})
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
{
Namespace: "3",
IdentifierType: "i",
Identifier: "13",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
subClient, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.NoError(t, err)
require.Equal(t, &ua.MonitoringParameters{
SamplingInterval: 50,
QueueSize: queueSize,
DiscardOldest: discardOldest,
Filter: ua.NewExtensionObject(
&ua.DataChangeFilter{
Trigger: ua.DataChangeTriggerStatus,
DeadbandType: uint32(ua.DeadbandTypeAbsolute),
DeadbandValue: deadbandValue,
},
),
}, subClient.monitoredItemsReqs[0].RequestedParameters)
}
func TestSubscribeClientConfigValidEventStreamingParams(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
{
Namespace: "3",
IdentifierType: "i",
Identifier: "13",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.NoError(t, err)
}
func TestSubscribeClientConfigEventInputMissingSamplingInterval(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.NoError(t, err)
}
func TestSubscribeClientConfigEventInputMissingEventType(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "invalid event_type_node_settings")
}
func TestSubscribeClientConfigEventMissingEventTypeNamespace(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "namespace must be set")
}
func TestSubscribeClientConfigEventMissingEventTypeIdentifierType(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "identifier_type must be set")
}
func TestSubscribeClientConfigEventMissingEventTypeIdentifier(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "identifier must be set")
}
func TestSubscribeClientConfigEventInputMissingNodeIDs(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "at least one node_id must be specified")
}
func TestSubscribeClientConfigEventInputMissingFields(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "at least one Field must be specified")
}
func TestSubscribeClientConfigEventInputInvalidFields(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Namespace: "3",
IdentifierType: "i",
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"Fieldname", ""},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.ErrorContains(t, err, "empty field name in fields stanza")
}
func TestSubscribeClientConfigValidEventStreamingDefaultNodeParams(t *testing.T) {
subscribeConfig := subscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
EventGroups: make([]input.EventGroupSettings, 0),
},
SubscriptionInterval: 0,
}
subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{
SamplingInterval: 1.0,
EventTypeNode: input.EventNodeSettings{
Namespace: "3",
IdentifierType: "i",
Identifier: "1234",
},
Namespace: "3",
IdentifierType: "i",
NodeIDSettings: []input.EventNodeSettings{
{
Identifier: "12",
},
},
SourceNames: []string{"SensorXYZ"},
Fields: []string{"PressureValue"},
})
_, err := subscribeConfig.createSubscribeClient(testutil.Logger{})
require.NoError(t, err)
o := subscribeConfig.InputClientConfig.EventGroups[0].NodeIDSettings[0]
require.Equal(t, "i", o.IdentifierType)
require.Equal(t, "3", o.Namespace)
}

View File

@ -211,11 +211,36 @@
# deadband_value = 0.0
#
## Multiple event groups are allowed.
# [[inputs.opcua_listener.events]]
# ## Polling interval for data collection
# # sampling_interval = "10s"
# ## Size of the notification queue
# # queue_size = 10
# ## Node parameter defaults for node definitions below
# # namespace = ""
# # identifier_type = ""
# ## Specifies OPCUA Event sources to filter on
# # source_names = ["SourceName1", "SourceName2"]
# ## Fields to capture from event notifications
# fields = ["Severity", "Message", "Time"]
#
# ## Type or level of events to capture from the monitored nodes.
# [inputs.opcua_listener.events.event_type_node]
# namespace = ""
# identifier_type = ""
# identifier = ""
#
# ## Nodes to monitor for event notifications associated with the defined
# ## event type
# [[inputs.opcua_listener.events.node_ids]]
# namespace = ""
# identifier_type = ""
# identifier = ""
## Enable workarounds required by some devices to work correctly
# [inputs.opcua_listener.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# [inputs.opcua_listener.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false
# ## Set additional valid status codes, StatusOK (0x0) is always considered valid
# # additional_valid_status_codes = ["0xC0"]
# ## Use unregistered reads instead of registered reads
# # use_unregistered_reads = false

View File

@ -28,6 +28,7 @@ type subscribeClient struct {
sub *opcua.Subscription
monitoredItemsReqs []*ua.MonitoredItemCreateRequest
eventItemsReqs []*ua.MonitoredItemCreateRequest
dataNotifications chan *opcua.PublishNotificationData
metrics chan telegraf.Metric
@ -91,11 +92,17 @@ func (sc *subscribeClientConfig) createSubscribeClient(log telegraf.Logger) (*su
return nil, err
}
if err := client.InitEventNodeIDs(); err != nil {
return nil, err
}
processingCtx, processingCancel := context.WithCancel(context.Background())
subClient := &subscribeClient{
OpcUAInputClient: client,
Config: *sc,
monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)),
eventItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.EventNodeMetricMapping)),
// 100 was chosen to make sure that the channels will not block when multiple changes come in at the same time.
// The channel size should be increased if reports come in on Telegraf blocking when many changes come in at
// the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval.
@ -115,6 +122,23 @@ func (sc *subscribeClientConfig) createSubscribeClient(log telegraf.Logger) (*su
subClient.monitoredItemsReqs[i] = req
}
log.Debugf("Creating event streaming items")
for i, node := range client.EventNodeMetricMapping {
req := opcua.NewMonitoredItemCreateRequestWithDefaults(node.NodeID, ua.AttributeIDEventNotifier, uint32(i))
if node.SamplingInterval != nil {
req.RequestedParameters.SamplingInterval = float64(time.Duration(*node.SamplingInterval) / time.Millisecond)
}
if node.QueueSize != nil {
req.RequestedParameters.QueueSize = *node.QueueSize
}
filterExtObj, err := node.CreateEventFilter()
if err != nil {
return nil, fmt.Errorf("failed to create event filter: %w", err)
}
req.RequestedParameters.Filter = filterExtObj
subClient.eventItemsReqs[i] = req
}
return subClient, nil
}
@ -152,7 +176,7 @@ func (o *subscribeClient) stop(ctx context.Context) <-chan struct{} {
return closing
}
func (o *subscribeClient) startStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) {
func (o *subscribeClient) startMonitoring(ctx context.Context) (<-chan telegraf.Metric, error) {
err := o.connect()
if err != nil {
switch o.Config.ConnectFailBehavior {
@ -166,6 +190,7 @@ func (o *subscribeClient) startStreamValues(ctx context.Context) (<-chan telegra
return nil, err
}
if len(o.monitoredItemsReqs) != 0 {
resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...)
if err != nil {
return nil, fmt.Errorf("failed to start monitoring items: %w", err)
@ -181,10 +206,24 @@ func (o *subscribeClient) startStreamValues(ctx context.Context) (<-chan telegra
} else {
o.Log.Debugf("Failed to create monitored item for node %v (%v)", o.OpcUAInputClient.NodeMetricMapping[idx].Tag.FieldName, '?')
}
return nil, fmt.Errorf("creating monitored item failed with status code: %w", res.StatusCode)
}
}
}
if len(o.eventItemsReqs) != 0 {
resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.eventItemsReqs...)
if err != nil {
return nil, fmt.Errorf("failed to start monitoring event stream: %w", err)
}
o.Log.Debug("Monitoring events")
for _, res := range resp.Results {
if !o.StatusCodeOK(res.StatusCode) {
return nil, fmt.Errorf("creating monitored event streaming item failed with status code: %w", res.StatusCode)
}
}
}
go o.processReceivedNotifications()
@ -207,6 +246,10 @@ func (o *subscribeClient) processReceivedNotifications() {
o.Log.Error(res.Error)
continue
}
if res.Value == nil {
o.Log.Error("Received nil notification")
return
}
switch notif := res.Value.(type) {
case *ua.DataChangeNotification:
@ -220,7 +263,13 @@ func (o *subscribeClient) processReceivedNotifications() {
o.NodeIDs[i].String(), oldValue, o.LastReceivedData[i].Value)
o.metrics <- o.MetricForNode(i)
}
case *ua.EventNotificationList:
o.Log.Debugf("Processing event notification with %d events", len(notif.Events))
// It is assumed the events are ordered chronologically
for _, event := range notif.Events {
i := int(event.ClientHandle)
o.metrics <- o.MetricForEvent(i, event)
}
default:
o.Log.Warnf("Received notification has unexpected type %s", reflect.TypeOf(res.Value))
}