From 508398d4542d2828f4641429ddbb0892c0e98454 Mon Sep 17 00:00:00 2001 From: frmoschner <155885364+frmoschner@users.noreply.github.com> Date: Wed, 16 Apr 2025 15:15:12 +0200 Subject: [PATCH] feat(inputs.opcua_listener): Allow to subscribe to OPCUA events (#16532) Co-authored-by: Sven Rebhan --- plugins/common/opcua/input/input_client.go | 292 ++++++++++- plugins/inputs/opcua_listener/README.md | 115 ++++- .../inputs/opcua_listener/opcua_listener.go | 2 +- .../opcua_listener/opcua_listener_test.go | 483 +++++++++++++++++- plugins/inputs/opcua_listener/sample.conf | 37 +- .../inputs/opcua_listener/subscribe_client.go | 81 ++- 6 files changed, 958 insertions(+), 52 deletions(-) diff --git a/plugins/common/opcua/input/input_client.go b/plugins/common/opcua/input/input_client.go index 7e741a999..411673945 100644 --- a/plugins/common/opcua/input/input_client.go +++ b/plugins/common/opcua/input/input_client.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "github.com/gopcua/opcua/id" "github.com/gopcua/opcua/ua" "github.com/influxdata/telegraf" @@ -75,6 +76,80 @@ type NodeGroupSettings struct { SamplingInterval config.Duration `toml:"sampling_interval"` // Can be overridden by monitoring parameters } +type EventNodeSettings struct { + Namespace string `toml:"namespace"` + IdentifierType string `toml:"identifier_type"` + Identifier string `toml:"identifier"` +} + +func (e *EventNodeSettings) NodeID() string { + return "ns=" + e.Namespace + ";" + e.IdentifierType + "=" + e.Identifier +} + +type EventGroupSettings struct { + SamplingInterval config.Duration `toml:"sampling_interval"` + QueueSize uint32 `toml:"queue_size"` + EventTypeNode EventNodeSettings `toml:"event_type_node"` + Namespace string `toml:"namespace"` + IdentifierType string `toml:"identifier_type"` + NodeIDSettings []EventNodeSettings `toml:"node_ids"` + SourceNames []string `toml:"source_names"` + Fields []string `toml:"fields"` +} + +func (e *EventGroupSettings) UpdateNodeIDSettings() { + for i := range e.NodeIDSettings { + n := &e.NodeIDSettings[i] + if n.Namespace == "" { + n.Namespace = e.Namespace + } + if n.IdentifierType == "" { + n.IdentifierType = e.IdentifierType + } + } +} + +func (e *EventGroupSettings) Validate() error { + if err := e.EventTypeNode.validateEventNodeSettings(); err != nil { + return fmt.Errorf("invalid event_type_node_settings: %w", err) + } + + if len(e.NodeIDSettings) == 0 { + return errors.New("at least one node_id must be specified") + } + + for _, node := range e.NodeIDSettings { + if err := node.validateEventNodeSettings(); err != nil { + return fmt.Errorf("invalid node_id_settings: %w", err) + } + } + + if len(e.Fields) == 0 { + return errors.New("at least one Field must be specified") + } + for _, field := range e.Fields { + if field == "" { + return errors.New("empty field name in fields stanza") + } + } + return nil +} + +func (e EventNodeSettings) validateEventNodeSettings() error { + var defaultNodeSettings EventNodeSettings + if e == defaultNodeSettings { + return errors.New("node settings can't be empty") + } + if e.Identifier == "" { + return errors.New("identifier must be set") + } else if e.IdentifierType == "" { + return errors.New("identifier_type must be set") + } else if e.Namespace == "" { + return errors.New("namespace must be set") + } + return nil +} + type TimestampSource string const ( @@ -86,11 +161,12 @@ const ( // InputClientConfig a configuration for the input client type InputClientConfig struct { opcua.OpcUAClientConfig - MetricName string `toml:"name"` - Timestamp TimestampSource `toml:"timestamp"` - TimestampFormat string `toml:"timestamp_format"` - RootNodes []NodeSettings `toml:"nodes"` - Groups []NodeGroupSettings `toml:"group"` + MetricName string `toml:"name"` + Timestamp TimestampSource `toml:"timestamp"` + TimestampFormat string `toml:"timestamp_format"` + RootNodes []NodeSettings `toml:"nodes"` + Groups []NodeGroupSettings `toml:"group"` + EventGroups []EventGroupSettings `toml:"events"` } func (o *InputClientConfig) Validate() error { @@ -107,8 +183,8 @@ func (o *InputClientConfig) Validate() error { o.TimestampFormat = time.RFC3339Nano } - if len(o.Groups) == 0 && len(o.RootNodes) == 0 { - return errors.New("no groups or root nodes provided to gather from") + if len(o.Groups) == 0 && len(o.RootNodes) == 0 && o.EventGroups == nil { + return errors.New("no groups, root nodes or events provided to gather from") } for _, group := range o.Groups { if len(group.Nodes) == 0 { @@ -124,6 +200,15 @@ func (o *InputClientConfig) CreateInputClient(log telegraf.Logger) (*OpcUAInputC return nil, err } + if o.EventGroups != nil { + for _, eventGroup := range o.EventGroups { + eventGroup.UpdateNodeIDSettings() + if err := eventGroup.Validate(); err != nil { + return nil, fmt.Errorf("invalid event_settings: %w", err) + } + } + } + log.Debug("Initialising OpcUAInputClient") opcClient, err := o.OpcUAClientConfig.CreateClient(log) if err != nil { @@ -134,6 +219,7 @@ func (o *InputClientConfig) CreateInputClient(log telegraf.Logger) (*OpcUAInputC OpcUAClient: opcClient, Log: log, Config: *o, + EventGroups: o.EventGroups, } log.Debug("Initialising node to metric mapping") @@ -185,6 +271,15 @@ func NewNodeMetricMapping(metricName string, node NodeSettings, groupTags map[st }, nil } +type EventNodeMetricMapping struct { + NodeID *ua.NodeID + SamplingInterval *config.Duration + QueueSize *uint32 + EventTypeNode *ua.NodeID + SourceNames []string + Fields []string +} + // NodeValue The received value for a node type NodeValue struct { TagName string @@ -203,9 +298,11 @@ type OpcUAInputClient struct { Config InputClientConfig Log telegraf.Logger - NodeMetricMapping []NodeMetricMapping - NodeIDs []*ua.NodeID - LastReceivedData []NodeValue + NodeMetricMapping []NodeMetricMapping + NodeIDs []*ua.NodeID + LastReceivedData []NodeValue + EventGroups []EventGroupSettings + EventNodeMetricMapping []EventNodeMetricMapping } // Stop the connection to the client @@ -381,6 +478,33 @@ func (o *OpcUAInputClient) InitNodeIDs() error { return nil } +func (o *OpcUAInputClient) InitEventNodeIDs() error { + for _, eventSetting := range o.EventGroups { + eid, err := ua.ParseNodeID(eventSetting.EventTypeNode.NodeID()) + if err != nil { + return err + } + for _, node := range eventSetting.NodeIDSettings { + nid, err := ua.ParseNodeID(node.NodeID()) + + if err != nil { + return err + } + nmm := EventNodeMetricMapping{ + NodeID: nid, + SamplingInterval: &eventSetting.SamplingInterval, + QueueSize: &eventSetting.QueueSize, + EventTypeNode: eid, + SourceNames: eventSetting.SourceNames, + Fields: eventSetting.Fields, + } + o.EventNodeMetricMapping = append(o.EventNodeMetricMapping, nmm) + } + } + + return nil +} + func (o *OpcUAInputClient) initLastReceivedValues() { o.LastReceivedData = make([]NodeValue, len(o.NodeMetricMapping)) for nodeIdx, nmm := range o.NodeMetricMapping { @@ -448,3 +572,151 @@ func (o *OpcUAInputClient) MetricForNode(nodeIdx int) telegraf.Metric { return metric.New(nmm.metricName, tags, fields, t) } + +func (o *OpcUAInputClient) MetricForEvent(nodeIdx int, event *ua.EventFieldList) telegraf.Metric { + node := o.EventNodeMetricMapping[nodeIdx] + fields := make(map[string]interface{}, len(event.EventFields)) + for i, field := range event.EventFields { + name := node.Fields[i] + value := field.Value() + + if value == nil { + o.Log.Warnf("Field %s has no value", name) + continue + } + + switch v := value.(type) { + case *ua.LocalizedText: + fields[name] = v.Text + case time.Time: + fields[name] = v.Format(time.RFC3339) + default: + fields[name] = v + } + } + tags := map[string]string{ + "node_id": node.NodeID.String(), + "source": o.Config.Endpoint, + } + var t time.Time + switch o.Config.Timestamp { + case TimestampSourceServer: + t = o.LastReceivedData[nodeIdx].ServerTime + case TimestampSourceSource: + t = o.LastReceivedData[nodeIdx].SourceTime + default: + t = time.Now() + } + + return metric.New("opcua_event", tags, fields, t) +} + +// Creation of event filter for event streaming +func (node *EventNodeMetricMapping) CreateEventFilter() (*ua.ExtensionObject, error) { + selects, err := node.createSelectClauses() + if err != nil { + return nil, err + } + wheres, err := node.createWhereClauses() + if err != nil { + return nil, err + } + return &ua.ExtensionObject{ + EncodingMask: ua.ExtensionObjectBinary, + TypeID: &ua.ExpandedNodeID{NodeID: ua.NewNumericNodeID(0, id.EventFilter_Encoding_DefaultBinary)}, + Value: ua.EventFilter{ + SelectClauses: selects, + WhereClause: wheres, + }, + }, nil +} + +func (node *EventNodeMetricMapping) createSelectClauses() ([]*ua.SimpleAttributeOperand, error) { + selects := make([]*ua.SimpleAttributeOperand, len(node.Fields)) + typeDefinition, err := node.determineNodeIDType() + if err != nil { + return nil, err + } + for i, name := range node.Fields { + selects[i] = &ua.SimpleAttributeOperand{ + TypeDefinitionID: typeDefinition, + BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: name}}, + AttributeID: ua.AttributeIDValue, + } + } + return selects, nil +} + +func (node *EventNodeMetricMapping) createWhereClauses() (*ua.ContentFilter, error) { + if len(node.SourceNames) == 0 { + return &ua.ContentFilter{ + Elements: make([]*ua.ContentFilterElement, 0), + }, nil + } + operands := make([]*ua.ExtensionObject, 0) + for _, sourceName := range node.SourceNames { + literalOperand := &ua.ExtensionObject{ + EncodingMask: 1, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.LiteralOperand_Encoding_DefaultBinary), + }, + Value: ua.LiteralOperand{ + Value: ua.MustVariant(sourceName), + }, + } + operands = append(operands, literalOperand) + } + + typeDefinition, err := node.determineNodeIDType() + if err != nil { + return nil, err + } + + attributeOperand := &ua.ExtensionObject{ + EncodingMask: ua.ExtensionObjectBinary, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.SimpleAttributeOperand_Encoding_DefaultBinary), + }, + Value: &ua.SimpleAttributeOperand{ + TypeDefinitionID: typeDefinition, + BrowsePath: []*ua.QualifiedName{ + {NamespaceIndex: 0, Name: "SourceName"}, + }, + AttributeID: ua.AttributeIDValue, + }, + } + + filterElement := &ua.ContentFilterElement{ + FilterOperator: ua.FilterOperatorInList, + FilterOperands: append([]*ua.ExtensionObject{attributeOperand}, operands...), + } + + wheres := &ua.ContentFilter{ + Elements: []*ua.ContentFilterElement{filterElement}, + } + + return wheres, nil +} + +func (node *EventNodeMetricMapping) determineNodeIDType() (*ua.NodeID, error) { + switch node.EventTypeNode.Type() { + case ua.NodeIDTypeGUID: + return ua.NewGUIDNodeID(node.EventTypeNode.Namespace(), node.EventTypeNode.StringID()), nil + case ua.NodeIDTypeString: + return ua.NewStringNodeID(node.EventTypeNode.Namespace(), node.EventTypeNode.StringID()), nil + case ua.NodeIDTypeByteString: + return ua.NewByteStringNodeID(node.EventTypeNode.Namespace(), []byte(node.EventTypeNode.StringID())), nil + case ua.NodeIDTypeTwoByte: + nodeID := node.EventTypeNode.IntID() + if nodeID > 255 { + return nil, fmt.Errorf("twoByte EventType requires a value in the range 0-255, got %d", nodeID) + } + return ua.NewTwoByteNodeID(uint8(node.EventTypeNode.IntID())), nil + case ua.NodeIDTypeFourByte: + return ua.NewFourByteNodeID(uint8(node.EventTypeNode.Namespace()), uint16(node.EventTypeNode.IntID())), nil + case ua.NodeIDTypeNumeric: + return ua.NewNumericNodeID(node.EventTypeNode.Namespace(), node.EventTypeNode.IntID()), nil + default: + return nil, fmt.Errorf("unsupported NodeID type: %v", node.EventTypeNode.String()) + } +} diff --git a/plugins/inputs/opcua_listener/README.md b/plugins/inputs/opcua_listener/README.md index 37bbef5e3..c4cfe7238 100644 --- a/plugins/inputs/opcua_listener/README.md +++ b/plugins/inputs/opcua_listener/README.md @@ -1,9 +1,13 @@ # OPC UA Client Listener Input Plugin -The `opcua_listener` plugin subscribes to data from OPC UA Server devices. +This service plugin subscribes to data and events from an [OPC UA][opcua] +erver. -Telegraf minimum version: Telegraf 1.25 -Plugin minimum tested version: 1.25 +⭐ Telegraf v1.25.0 +🏷️ network +💻 linux, windows + +[opcua]: https://opcfoundation.org/ ## Service Input @@ -250,17 +254,42 @@ to use them. # deadband_value = 0.0 # + ## Multiple event groups are allowed. + # [[inputs.opcua_listener.events]] + # ## Polling interval for data collection + # # sampling_interval = "10s" + # ## Size of the notification queue + # # queue_size = 10 + # ## Node parameter defaults for node definitions below + # # namespace = "" + # # identifier_type = "" + # ## Specifies OPCUA Event sources to filter on + # # source_names = ["SourceName1", "SourceName2"] + # ## Fields to capture from event notifications + # fields = ["Severity", "Message", "Time"] + # + # ## Type or level of events to capture from the monitored nodes. + # [inputs.opcua_listener.events.event_type_node] + # namespace = "" + # identifier_type = "" + # identifier = "" + # + # ## Nodes to monitor for event notifications associated with the defined + # ## event type + # [[inputs.opcua_listener.events.node_ids]] + # namespace = "" + # identifier_type = "" + # identifier = "" + ## Enable workarounds required by some devices to work correctly # [inputs.opcua_listener.workarounds] - ## Set additional valid status codes, StatusOK (0x0) is always considered valid - # additional_valid_status_codes = ["0xC0"] - - # [inputs.opcua_listener.request_workarounds] - ## Use unregistered reads instead of registered reads - # use_unregistered_reads = false + # ## Set additional valid status codes, StatusOK (0x0) is always considered valid + # # additional_valid_status_codes = ["0xC0"] + # ## Use unregistered reads instead of registered reads + # # use_unregistered_reads = false ``` -## Node Configuration +### Node Configuration An OPC UA node ID may resemble: "ns=3;s=Temperature". In this example: @@ -286,7 +315,7 @@ produces a metric like this: opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)",DataType="Float" 1597820490000000000 ``` -## Group Configuration +#### Group Configuration Groups can set default values for the namespace, identifier type, tags settings and sampling interval. The default values apply to all the @@ -342,16 +371,68 @@ This example group configuration has three groups with two nodes each: ] ``` -## Connection Service +### Event Configuration -This plugin subscribes to the specified nodes to receive data from -the OPC server. The updates are received at most as fast as the -`subscription_interval`. +Defining events allows subscribing to events with the specific node IDs and +filtering criteria based on the event type and source. The plugin subscribes to +the specified `event_type` Node-IDs and collects events that meet the defined +criteria. The `node_ids` parameter specifies the nodes to monitor for events +(monitored items). However, the actual subscription is based on the +`event_type_node` determining the events to capture. + +#### Event Group Configuration + +You can define multiple groups for the event streaming to subscribe to different +event types. Each group allows to specify defaults for `namespace` and +`identifier_type` being overwritten by settings in `node_ids`. The group +defaults for node information will not affected the `event_type_node` setting +and all paramters must be set in this section. + +This example group configuration shows how to use group settings: + +```toml +# Group 1 +[[inputs.opcua_listener.events]] + sampling_interval = "10s" + queue_size = "100" + source_names = ["SourceName1", "SourceName2"] + fields = ["Severity", "Message", "Time"] + + [inputs.opcua_listener.events.event_type_node] + namespace = "1" + identifier_type = "i" + identifier = "1234" + + [[inputs.opcua_listener.events.node_ids]] + namespace = "2" + identifier_type = "i" + identifier = "2345" + +# Group 2 +[[inputs.opcua_listener.events]] + sampling_interval = "10s" + queue_size = "100" + namespace = "3" + identifier_type = "s" + source_names = ["SourceName1", "SourceName2"] + fields = ["Severity", "Message", "Time"] + + [inputs.opcua_listener.events.event_type_node] + namespace = "1" + identifier_type = "i" + identifier = "5678" + + node_ids = [ + {identifier="Sensor1"}, // default values will be used for namespace and identifier_type + {namespace="2", identifier="TemperatureSensor"}, // default values will be used for identifier_type + {namespace="5", identifier_type="i", identifier="2002"} // no default values will be used + ] +``` ## Metrics -The metrics collected by this input plugin will depend on the -configured `nodes` and `group`. +The metrics collected by this input plugin will depend on the configured +`nodes`, `events` and the corresponding groups. ## Example Output diff --git a/plugins/inputs/opcua_listener/opcua_listener.go b/plugins/inputs/opcua_listener/opcua_listener.go index 6085c90c9..6fd9fca36 100644 --- a/plugins/inputs/opcua_listener/opcua_listener.go +++ b/plugins/inputs/opcua_listener/opcua_listener.go @@ -64,7 +64,7 @@ func (o *OpcUaListener) Stop() { func (o *OpcUaListener) connect(acc telegraf.Accumulator) error { ctx := context.Background() - ch, err := o.client.startStreamValues(ctx) + ch, err := o.client.startMonitoring(ctx) if err != nil { return err } diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go index 7fc29ae03..4937155f7 100644 --- a/plugins/inputs/opcua_listener/opcua_listener_test.go +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -193,7 +193,7 @@ func TestSubscribeClientIntegration(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), time.Second*10) defer cancel() - res, err := o.startStreamValues(ctx) + res, err := o.startMonitoring(ctx) require.Equal(t, opcua.Connected, o.State()) require.NoError(t, err) @@ -336,7 +336,7 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), time.Second*10) defer cancel() - res, err := o.startStreamValues(ctx) + res, err := o.startMonitoring(ctx) require.NoError(t, err) for { @@ -814,3 +814,482 @@ func TestSubscribeClientConfigValidMonitoringParams(t *testing.T) { ), }, subClient.monitoredItemsReqs[0].RequestedParameters) } + +func TestSubscribeClientConfigValidMonitoringAndEventParams(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + + var queueSize uint32 = 10 + discardOldest := true + deadbandValue := 10.0 + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, input.NodeSettings{ + FieldName: "foo", + Namespace: "3", + Identifier: "1", + IdentifierType: "i", + MonitoringParams: input.MonitoringParameters{ + SamplingInterval: 50000000, + QueueSize: &queueSize, + DiscardOldest: &discardOldest, + DataChangeFilter: &input.DataChangeFilter{ + Trigger: "Status", + DeadbandType: "Absolute", + DeadbandValue: &deadbandValue, + }, + }, + }) + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + { + Namespace: "3", + IdentifierType: "i", + Identifier: "13", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + subClient, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.NoError(t, err) + require.Equal(t, &ua.MonitoringParameters{ + SamplingInterval: 50, + QueueSize: queueSize, + DiscardOldest: discardOldest, + Filter: ua.NewExtensionObject( + &ua.DataChangeFilter{ + Trigger: ua.DataChangeTriggerStatus, + DeadbandType: uint32(ua.DeadbandTypeAbsolute), + DeadbandValue: deadbandValue, + }, + ), + }, subClient.monitoredItemsReqs[0].RequestedParameters) +} + +func TestSubscribeClientConfigValidEventStreamingParams(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + { + Namespace: "3", + IdentifierType: "i", + Identifier: "13", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.NoError(t, err) +} + +func TestSubscribeClientConfigEventInputMissingSamplingInterval(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.NoError(t, err) +} + +func TestSubscribeClientConfigEventInputMissingEventType(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "invalid event_type_node_settings") +} + +func TestSubscribeClientConfigEventMissingEventTypeNamespace(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "namespace must be set") +} + +func TestSubscribeClientConfigEventMissingEventTypeIdentifierType(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "identifier_type must be set") +} + +func TestSubscribeClientConfigEventMissingEventTypeIdentifier(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "identifier must be set") +} + +func TestSubscribeClientConfigEventInputMissingNodeIDs(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "at least one node_id must be specified") +} + +func TestSubscribeClientConfigEventInputMissingFields(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "at least one Field must be specified") +} + +func TestSubscribeClientConfigEventInputInvalidFields(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Namespace: "3", + IdentifierType: "i", + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"Fieldname", ""}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.ErrorContains(t, err, "empty field name in fields stanza") +} + +func TestSubscribeClientConfigValidEventStreamingDefaultNodeParams(t *testing.T) { + subscribeConfig := subscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://opcua.demo-this.com:62544/Quickstarts/AlarmConditionServer", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + EventGroups: make([]input.EventGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + subscribeConfig.EventGroups = append(subscribeConfig.EventGroups, input.EventGroupSettings{ + SamplingInterval: 1.0, + EventTypeNode: input.EventNodeSettings{ + Namespace: "3", + IdentifierType: "i", + Identifier: "1234", + }, + Namespace: "3", + IdentifierType: "i", + NodeIDSettings: []input.EventNodeSettings{ + { + Identifier: "12", + }, + }, + SourceNames: []string{"SensorXYZ"}, + Fields: []string{"PressureValue"}, + }) + + _, err := subscribeConfig.createSubscribeClient(testutil.Logger{}) + require.NoError(t, err) + + o := subscribeConfig.InputClientConfig.EventGroups[0].NodeIDSettings[0] + require.Equal(t, "i", o.IdentifierType) + require.Equal(t, "3", o.Namespace) +} diff --git a/plugins/inputs/opcua_listener/sample.conf b/plugins/inputs/opcua_listener/sample.conf index bc6ff6019..2b820c49f 100644 --- a/plugins/inputs/opcua_listener/sample.conf +++ b/plugins/inputs/opcua_listener/sample.conf @@ -211,11 +211,36 @@ # deadband_value = 0.0 # + ## Multiple event groups are allowed. + # [[inputs.opcua_listener.events]] + # ## Polling interval for data collection + # # sampling_interval = "10s" + # ## Size of the notification queue + # # queue_size = 10 + # ## Node parameter defaults for node definitions below + # # namespace = "" + # # identifier_type = "" + # ## Specifies OPCUA Event sources to filter on + # # source_names = ["SourceName1", "SourceName2"] + # ## Fields to capture from event notifications + # fields = ["Severity", "Message", "Time"] + # + # ## Type or level of events to capture from the monitored nodes. + # [inputs.opcua_listener.events.event_type_node] + # namespace = "" + # identifier_type = "" + # identifier = "" + # + # ## Nodes to monitor for event notifications associated with the defined + # ## event type + # [[inputs.opcua_listener.events.node_ids]] + # namespace = "" + # identifier_type = "" + # identifier = "" + ## Enable workarounds required by some devices to work correctly # [inputs.opcua_listener.workarounds] - ## Set additional valid status codes, StatusOK (0x0) is always considered valid - # additional_valid_status_codes = ["0xC0"] - - # [inputs.opcua_listener.request_workarounds] - ## Use unregistered reads instead of registered reads - # use_unregistered_reads = false + # ## Set additional valid status codes, StatusOK (0x0) is always considered valid + # # additional_valid_status_codes = ["0xC0"] + # ## Use unregistered reads instead of registered reads + # # use_unregistered_reads = false diff --git a/plugins/inputs/opcua_listener/subscribe_client.go b/plugins/inputs/opcua_listener/subscribe_client.go index 1f70f006e..572f17f94 100644 --- a/plugins/inputs/opcua_listener/subscribe_client.go +++ b/plugins/inputs/opcua_listener/subscribe_client.go @@ -28,6 +28,7 @@ type subscribeClient struct { sub *opcua.Subscription monitoredItemsReqs []*ua.MonitoredItemCreateRequest + eventItemsReqs []*ua.MonitoredItemCreateRequest dataNotifications chan *opcua.PublishNotificationData metrics chan telegraf.Metric @@ -91,11 +92,17 @@ func (sc *subscribeClientConfig) createSubscribeClient(log telegraf.Logger) (*su return nil, err } + if err := client.InitEventNodeIDs(); err != nil { + return nil, err + } + processingCtx, processingCancel := context.WithCancel(context.Background()) + subClient := &subscribeClient{ OpcUAInputClient: client, Config: *sc, monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)), + eventItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.EventNodeMetricMapping)), // 100 was chosen to make sure that the channels will not block when multiple changes come in at the same time. // The channel size should be increased if reports come in on Telegraf blocking when many changes come in at // the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval. @@ -115,6 +122,23 @@ func (sc *subscribeClientConfig) createSubscribeClient(log telegraf.Logger) (*su subClient.monitoredItemsReqs[i] = req } + log.Debugf("Creating event streaming items") + for i, node := range client.EventNodeMetricMapping { + req := opcua.NewMonitoredItemCreateRequestWithDefaults(node.NodeID, ua.AttributeIDEventNotifier, uint32(i)) + if node.SamplingInterval != nil { + req.RequestedParameters.SamplingInterval = float64(time.Duration(*node.SamplingInterval) / time.Millisecond) + } + if node.QueueSize != nil { + req.RequestedParameters.QueueSize = *node.QueueSize + } + + filterExtObj, err := node.CreateEventFilter() + if err != nil { + return nil, fmt.Errorf("failed to create event filter: %w", err) + } + req.RequestedParameters.Filter = filterExtObj + subClient.eventItemsReqs[i] = req + } return subClient, nil } @@ -152,7 +176,7 @@ func (o *subscribeClient) stop(ctx context.Context) <-chan struct{} { return closing } -func (o *subscribeClient) startStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) { +func (o *subscribeClient) startMonitoring(ctx context.Context) (<-chan telegraf.Metric, error) { err := o.connect() if err != nil { switch o.Config.ConnectFailBehavior { @@ -166,23 +190,38 @@ func (o *subscribeClient) startStreamValues(ctx context.Context) (<-chan telegra return nil, err } - resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...) - if err != nil { - return nil, fmt.Errorf("failed to start monitoring items: %w", err) - } - o.Log.Debug("Monitoring items") + if len(o.monitoredItemsReqs) != 0 { + resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...) + if err != nil { + return nil, fmt.Errorf("failed to start monitoring items: %w", err) + } + o.Log.Debug("Monitoring items") - for idx, res := range resp.Results { - if !o.StatusCodeOK(res.StatusCode) { - // Verify NodeIDs array has been built before trying to get item; otherwise show '?' for node id - if len(o.OpcUAInputClient.NodeIDs) > idx { - o.Log.Debugf("Failed to create monitored item for node %v (%v)", - o.OpcUAInputClient.NodeMetricMapping[idx].Tag.FieldName, o.OpcUAInputClient.NodeIDs[idx].String()) - } else { - o.Log.Debugf("Failed to create monitored item for node %v (%v)", o.OpcUAInputClient.NodeMetricMapping[idx].Tag.FieldName, '?') + for idx, res := range resp.Results { + if !o.StatusCodeOK(res.StatusCode) { + // Verify NodeIDs array has been built before trying to get item; otherwise show '?' for node id + if len(o.OpcUAInputClient.NodeIDs) > idx { + o.Log.Debugf("Failed to create monitored item for node %v (%v)", + o.OpcUAInputClient.NodeMetricMapping[idx].Tag.FieldName, o.OpcUAInputClient.NodeIDs[idx].String()) + } else { + o.Log.Debugf("Failed to create monitored item for node %v (%v)", o.OpcUAInputClient.NodeMetricMapping[idx].Tag.FieldName, '?') + } + return nil, fmt.Errorf("creating monitored item failed with status code: %w", res.StatusCode) } + } + } - return nil, fmt.Errorf("creating monitored item failed with status code: %w", res.StatusCode) + if len(o.eventItemsReqs) != 0 { + resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.eventItemsReqs...) + if err != nil { + return nil, fmt.Errorf("failed to start monitoring event stream: %w", err) + } + o.Log.Debug("Monitoring events") + + for _, res := range resp.Results { + if !o.StatusCodeOK(res.StatusCode) { + return nil, fmt.Errorf("creating monitored event streaming item failed with status code: %w", res.StatusCode) + } } } @@ -207,6 +246,10 @@ func (o *subscribeClient) processReceivedNotifications() { o.Log.Error(res.Error) continue } + if res.Value == nil { + o.Log.Error("Received nil notification") + return + } switch notif := res.Value.(type) { case *ua.DataChangeNotification: @@ -220,7 +263,13 @@ func (o *subscribeClient) processReceivedNotifications() { o.NodeIDs[i].String(), oldValue, o.LastReceivedData[i].Value) o.metrics <- o.MetricForNode(i) } - + case *ua.EventNotificationList: + o.Log.Debugf("Processing event notification with %d events", len(notif.Events)) + // It is assumed the events are ordered chronologically + for _, event := range notif.Events { + i := int(event.ClientHandle) + o.metrics <- o.MetricForEvent(i, event) + } default: o.Log.Warnf("Received notification has unexpected type %s", reflect.TypeOf(res.Value)) }