diff --git a/plugins/common/opcua/client.go b/plugins/common/opcua/client.go new file mode 100644 index 000000000..ea838b50e --- /dev/null +++ b/plugins/common/opcua/client.go @@ -0,0 +1,214 @@ +package opcua + +import ( + "context" + "fmt" + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "net/url" + "strconv" + "time" +) + +type OpcUAWorkarounds struct { + AdditionalValidStatusCodes []string `toml:"additional_valid_status_codes"` +} + +type OpcUAClientConfig struct { + Endpoint string `toml:"endpoint"` + SecurityPolicy string `toml:"security_policy"` + SecurityMode string `toml:"security_mode"` + Certificate string `toml:"certificate"` + PrivateKey string `toml:"private_key"` + Username string `toml:"username"` + Password string `toml:"password"` + AuthMethod string `toml:"auth_method"` + ConnectTimeout config.Duration `toml:"connect_timeout"` + RequestTimeout config.Duration `toml:"request_timeout"` + + Workarounds OpcUAWorkarounds `toml:"workarounds"` +} + +func (o *OpcUAClientConfig) Validate() error { + return o.validateEndpoint() +} + +func (o *OpcUAClientConfig) validateEndpoint() error { + if o.Endpoint == "" { + return fmt.Errorf("endpoint url is empty") + } + + _, err := url.Parse(o.Endpoint) + if err != nil { + return fmt.Errorf("endpoint url is invalid") + } + + switch o.SecurityPolicy { + case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto": + default: + return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityPolicy, o.Endpoint) + } + + switch o.SecurityMode { + case "None", "Sign", "SignAndEncrypt", "auto": + default: + return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityMode, o.Endpoint) + } + return nil +} + +func (o *OpcUAClientConfig) CreateClient(log telegraf.Logger) (*OpcUAClient, error) { + err := o.Validate() + if err != nil { + return nil, err + } + + c := &OpcUAClient{ + Config: o, + Log: log, + } + c.Log.Debug("Initialising OpcUAClient") + c.State = Disconnected + + err = c.setupWorkarounds() + return c, err +} + +// ConnectionState used for constants +type ConnectionState int + +const ( + // Disconnected constant State 0 + Disconnected ConnectionState = iota + // Connecting constant State 1 + Connecting + // Connected constant State 2 + Connected +) + +type OpcUAClient struct { + Config *OpcUAClientConfig + Log telegraf.Logger + + State ConnectionState + Client *opcua.Client + + opts []opcua.Option + codes []ua.StatusCode +} + +func (o *OpcUAClient) Init() error { + return o.setupOptions() +} + +// / setupOptions read the endpoints from the specified server and setup all authentication +func (o *OpcUAClient) setupOptions() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout)) + defer cancel() + // Get a list of the endpoints for our target server + endpoints, err := opcua.GetEndpoints(ctx, o.Config.Endpoint) + if err != nil { + return err + } + + if o.Config.Certificate == "" && o.Config.PrivateKey == "" { + if o.Config.SecurityPolicy != "None" || o.Config.SecurityMode != "None" { + o.Log.Debug("Generating self-signed certificate") + cert, privateKey, err := generateCert("urn:telegraf:gopcua:client", 2048, + o.Config.Certificate, o.Config.PrivateKey, 365*24*time.Hour) + if err != nil { + return err + } + + o.Config.Certificate = cert + o.Config.PrivateKey = privateKey + } + } + + o.Log.Debug("Configuring OPC UA connection options") + o.opts, err = o.generateClientOpts(endpoints) + + return err +} + +func (o *OpcUAClient) setupWorkarounds() error { + o.codes = []ua.StatusCode{ua.StatusOK} + for _, c := range o.Config.Workarounds.AdditionalValidStatusCodes { + val, err := strconv.ParseInt(c, 0, 32) // setting 32 bits to allow for safe conversion + if err != nil { + return err + } + o.codes = append(o.codes, ua.StatusCode(uint32(val))) + } + + return nil +} + +func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool { + for _, val := range o.codes { + if val == code { + return true + } + } + return false +} + +// Connect to an OPC UA device +func (o *OpcUAClient) Connect() error { + o.Log.Debug("Connecting OPC UA Client to server") + u, err := url.Parse(o.Config.Endpoint) + if err != nil { + return err + } + + switch u.Scheme { + case "opc.tcp": + o.State = Connecting + + if o.Client != nil { + o.Log.Warnf("Closing connection due to Connect called while already instantiated", u) + if err := o.Client.Close(); err != nil { + // Only log the error but to not bail-out here as this prevents + // reconnections for multiple parties (see e.g. #9523). + o.Log.Errorf("Closing connection failed: %v", err) + } + } + + o.Client = opcua.NewClient(o.Config.Endpoint, o.opts...) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout)) + defer cancel() + if err := o.Client.Connect(ctx); err != nil { + o.State = Disconnected + return fmt.Errorf("error in Client Connection: %s", err) + } + + o.State = Connected + o.Log.Debug("Connected to OPC UA Server") + + default: + return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme) + } + return nil +} + +func (o *OpcUAClient) Disconnect(ctx context.Context) error { + o.Log.Debug("Disconnecting from OPC UA Server") + u, err := url.Parse(o.Config.Endpoint) + if err != nil { + return err + } + + switch u.Scheme { + case "opc.tcp": + o.State = Disconnected + // We can't do anything about failing to close a connection + //nolint:errcheck,revive + err := o.Client.CloseWithContext(ctx) + o.Client = nil + return err + default: + return fmt.Errorf("invalid controller") + } +} diff --git a/plugins/common/opcua/client_test.go b/plugins/common/opcua/client_test.go new file mode 100644 index 000000000..cf6cd277f --- /dev/null +++ b/plugins/common/opcua/client_test.go @@ -0,0 +1,31 @@ +package opcua + +import ( + "github.com/gopcua/opcua/ua" + "github.com/stretchr/testify/require" + "testing" +) + +func TestSetupWorkarounds(t *testing.T) { + o := OpcUAClient{ + Config: &OpcUAClientConfig{ + Workarounds: OpcUAWorkarounds{ + AdditionalValidStatusCodes: []string{"0xC0", "0x00AA0000"}, + }, + }, + } + + err := o.setupWorkarounds() + require.NoError(t, err) + + require.Len(t, o.codes, 3) + require.Equal(t, o.codes[0], ua.StatusCode(0)) + require.Equal(t, o.codes[1], ua.StatusCode(192)) + require.Equal(t, o.codes[2], ua.StatusCode(11141120)) +} + +func TestCheckStatusCode(t *testing.T) { + var o OpcUAClient + o.codes = []ua.StatusCode{ua.StatusCode(0), ua.StatusCode(192), ua.StatusCode(11141120)} + require.Equal(t, o.StatusCodeOK(ua.StatusCode(192)), true) +} diff --git a/plugins/common/opcua/input/input_client.go b/plugins/common/opcua/input/input_client.go new file mode 100644 index 000000000..d8c900545 --- /dev/null +++ b/plugins/common/opcua/input/input_client.go @@ -0,0 +1,395 @@ +package input + +import ( + "context" + "fmt" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/common/opcua" + "sort" + "strconv" + "strings" + "time" +) + +// NodeSettings describes how to map from a OPC UA node to a Metric +type NodeSettings struct { + FieldName string `toml:"name"` + Namespace string `toml:"namespace"` + IdentifierType string `toml:"identifier_type"` + Identifier string `toml:"identifier"` + DataType string `toml:"data_type" deprecated:"1.17.0;option is ignored"` + Description string `toml:"description" deprecated:"1.17.0;option is ignored"` + TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"` + DefaultTags map[string]string `toml:"default_tags"` +} + +// NodeID returns the OPC UA node id +func (tag *NodeSettings) NodeID() string { + return "ns=" + tag.Namespace + ";" + tag.IdentifierType + "=" + tag.Identifier +} + +// NodeGroupSettings describes a mapping of group of nodes to Metrics +type NodeGroupSettings struct { + MetricName string `toml:"name"` // Overrides plugin's setting + Namespace string `toml:"namespace"` // Can be overridden by node setting + IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting + Nodes []NodeSettings `toml:"nodes"` + TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"` + DefaultTags map[string]string `toml:"default_tags"` +} + +type TimestampSource string + +const ( + TimestampSourceServer TimestampSource = "server" + TimestampSourceSource TimestampSource = "source" + TimestampSourceTelegraf TimestampSource = "gather" +) + +// InputClientConfig a configuration for the input client +type InputClientConfig struct { + opcua.OpcUAClientConfig + MetricName string `toml:"name"` + Timestamp TimestampSource `toml:"timestamp"` + RootNodes []NodeSettings `toml:"nodes"` + Groups []NodeGroupSettings `toml:"group"` +} + +func (o *InputClientConfig) Validate() error { + if o.MetricName == "" { + return fmt.Errorf("metric name is empty") + } + + err := choice.Check(string(o.Timestamp), []string{"", "gather", "server", "source"}) + if err != nil { + return err + } + + return nil +} + +func (o *InputClientConfig) CreateInputClient(log telegraf.Logger) (*OpcUAInputClient, error) { + err := o.Validate() + if err != nil { + return nil, err + } + + log.Debug("Initialising OpcUAInputClient") + opcClient, err := o.OpcUAClientConfig.CreateClient(log) + if err != nil { + return nil, err + } + + c := &OpcUAInputClient{ + OpcUAClient: opcClient, + Log: log, + Config: *o, + } + + log.Debug("Initialising node to metric mapping") + err = c.InitNodeMetricMapping() + if err != nil { + return nil, err + } + + c.initLastReceivedValues() + + err = c.initNodeIDs() + return c, err +} + +// NodeMetricMapping mapping from a single node to a metric +type NodeMetricMapping struct { + Tag NodeSettings + idStr string + metricName string + MetricTags map[string]string +} + +// NewNodeMetricMapping builds a new NodeMetricMapping from the given argument +func NewNodeMetricMapping(metricName string, node NodeSettings, groupTags map[string]string) (*NodeMetricMapping, error) { + mergedTags := make(map[string]string) + for n, t := range groupTags { + mergedTags[n] = t + } + + nodeTags := make(map[string]string) + if len(node.DefaultTags) > 0 { + nodeTags = node.DefaultTags + } else if len(node.TagsSlice) > 0 { + // fixme: once the TagsSlice has been removed (after deprecation), remove this if else logic + var err error + nodeTags, err = tagsSliceToMap(node.TagsSlice) + if err != nil { + return nil, err + } + } + + for n, t := range nodeTags { + mergedTags[n] = t + } + + return &NodeMetricMapping{ + Tag: node, + idStr: node.NodeID(), + metricName: metricName, + MetricTags: mergedTags, + }, nil +} + +// NodeValue The received value for a node +type NodeValue struct { + TagName string + Value interface{} + Quality ua.StatusCode + ServerTime time.Time + SourceTime time.Time + DataType ua.TypeID +} + +// OpcUAInputClient can receive data from an OPC UA server and map it to Metrics. This type does not contain +// logic for actually retrieving data from the server, but is used by other types like ReadClient and +// OpcUAInputSubscribeClient to store data needed to convert node ids to the corresponding metrics. +type OpcUAInputClient struct { + *opcua.OpcUAClient + Config InputClientConfig + Log telegraf.Logger + + NodeMetricMapping []NodeMetricMapping + NodeIDs []*ua.NodeID + LastReceivedData []NodeValue +} + +// Stop the connection to the client +func (o *OpcUAInputClient) Stop(ctx context.Context) <-chan struct{} { + ch := make(chan struct{}) + defer close(ch) + err := o.Disconnect(ctx) + if err != nil { + o.Log.Warn("Disconnecting from server failed with error ", err) + } + + return ch +} + +// metricParts is only used to ensure no duplicate metrics are created +type metricParts struct { + metricName string + fieldName string + tags string // sorted by tag name and in format tag1=value1, tag2=value2 +} + +func newMP(n *NodeMetricMapping) metricParts { + var keys []string + for key := range n.MetricTags { + keys = append(keys, key) + } + sort.Strings(keys) + var sb strings.Builder + for i, key := range keys { + if i != 0 { + // Writes to a string-builder will always succeed + //nolint:errcheck,revive + sb.WriteString(", ") + } + // Writes to a string-builder will always succeed + //nolint:errcheck,revive + sb.WriteString(key) + // Writes to a string-builder will always succeed + //nolint:errcheck,revive + sb.WriteString("=") + // Writes to a string-builder will always succeed + //nolint:errcheck,revive + sb.WriteString(n.MetricTags[key]) + } + x := metricParts{ + metricName: n.metricName, + fieldName: n.Tag.FieldName, + tags: sb.String(), + } + return x +} + +// fixme: once the TagsSlice has been removed (after deprecation), remove this +// tagsSliceToMap takes an array of pairs of strings and creates a map from it +func tagsSliceToMap(tags [][]string) (map[string]string, error) { + m := make(map[string]string) + for i, tag := range tags { + if len(tag) != 2 { + return nil, fmt.Errorf("tag %d needs 2 values, has %d: %v", i+1, len(tag), tag) + } + if tag[0] == "" { + return nil, fmt.Errorf("tag %d has empty name", i+1) + } + if tag[1] == "" { + return nil, fmt.Errorf("tag %d has empty value", i+1) + } + if _, ok := m[tag[0]]; ok { + return nil, fmt.Errorf("tag %d has duplicate key: %v", i+1, tag[0]) + } + m[tag[0]] = tag[1] + } + return m, nil +} + +func validateNodeToAdd(existing map[metricParts]struct{}, nmm *NodeMetricMapping) error { + if nmm.Tag.FieldName == "" { + return fmt.Errorf("empty name in '%s'", nmm.Tag.FieldName) + } + + if len(nmm.Tag.Namespace) == 0 { + return fmt.Errorf("empty node namespace not allowed") + } + + if len(nmm.Tag.Identifier) == 0 { + return fmt.Errorf("empty node identifier not allowed") + } + + mp := newMP(nmm) + if _, exists := existing[mp]; exists { + return fmt.Errorf("name '%s' is duplicated (metric name '%s', tags '%s')", + mp.fieldName, mp.metricName, mp.tags) + } + + switch nmm.Tag.IdentifierType { + case "i": + if _, err := strconv.Atoi(nmm.Tag.Identifier); err != nil { + return fmt.Errorf("identifier type '%s' does not match the type of identifier '%s'", nmm.Tag.IdentifierType, nmm.Tag.Identifier) + } + case "s", "g", "b": + // Valid identifier type - do nothing. + default: + return fmt.Errorf("invalid identifier type '%s' in '%s'", nmm.Tag.IdentifierType, nmm.Tag.FieldName) + } + + existing[mp] = struct{}{} + return nil +} + +// InitNodeMetricMapping builds nodes from the configuration +func (o *OpcUAInputClient) InitNodeMetricMapping() error { + existing := map[metricParts]struct{}{} + for _, node := range o.Config.RootNodes { + nmm, err := NewNodeMetricMapping(o.Config.MetricName, node, make(map[string]string)) + if err != nil { + return err + } + + if err := validateNodeToAdd(existing, nmm); err != nil { + return err + } + o.NodeMetricMapping = append(o.NodeMetricMapping, *nmm) + } + + for _, group := range o.Config.Groups { + if group.MetricName == "" { + group.MetricName = o.Config.MetricName + } + + if len(group.DefaultTags) > 0 && len(group.TagsSlice) > 0 { + o.Log.Warn("Tags found in both `tags` and `default_tags`, only using tags defined in `default_tags`") + } + + groupTags := make(map[string]string) + if len(group.DefaultTags) > 0 { + groupTags = group.DefaultTags + } else if len(group.TagsSlice) > 0 { + // fixme: once the TagsSlice has been removed (after deprecation), remove this if else logic + var err error + groupTags, err = tagsSliceToMap(group.TagsSlice) + if err != nil { + return err + } + } + + for _, node := range group.Nodes { + if node.Namespace == "" { + node.Namespace = group.Namespace + } + if node.IdentifierType == "" { + node.IdentifierType = group.IdentifierType + } + + nmm, err := NewNodeMetricMapping(group.MetricName, node, groupTags) + if err != nil { + return err + } + + if err := validateNodeToAdd(existing, nmm); err != nil { + return err + } + o.NodeMetricMapping = append(o.NodeMetricMapping, *nmm) + } + } + + return nil +} + +func (o *OpcUAInputClient) initNodeIDs() error { + o.NodeIDs = make([]*ua.NodeID, len(o.NodeMetricMapping)) + for i, node := range o.NodeMetricMapping { + nid, err := ua.ParseNodeID(node.Tag.NodeID()) + if err != nil { + return err + } + o.NodeIDs[i] = nid + } + + return nil +} + +func (o *OpcUAInputClient) initLastReceivedValues() { + o.LastReceivedData = make([]NodeValue, len(o.NodeMetricMapping)) + for nodeIdx, nmm := range o.NodeMetricMapping { + o.LastReceivedData[nodeIdx].TagName = nmm.Tag.FieldName + } +} + +func (o *OpcUAInputClient) UpdateNodeValue(nodeIdx int, d *ua.DataValue) { + o.LastReceivedData[nodeIdx].Quality = d.Status + if !o.StatusCodeOK(d.Status) { + o.Log.Errorf("status not OK for node %v: %v", o.NodeMetricMapping[nodeIdx].Tag.FieldName, d.Status) + return + } + + if d.Value != nil { + o.LastReceivedData[nodeIdx].Value = d.Value.Value() + o.LastReceivedData[nodeIdx].DataType = d.Value.Type() + } + o.LastReceivedData[nodeIdx].ServerTime = d.ServerTimestamp + o.LastReceivedData[nodeIdx].SourceTime = d.SourceTimestamp +} + +func (o *OpcUAInputClient) MetricForNode(nodeIdx int) telegraf.Metric { + nmm := &o.NodeMetricMapping[nodeIdx] + fields := make(map[string]interface{}) + tags := map[string]string{ + "id": nmm.idStr, + } + for k, v := range nmm.MetricTags { + tags[k] = v + } + + fields[nmm.Tag.FieldName] = o.LastReceivedData[nodeIdx].Value + fields["Quality"] = strings.TrimSpace(fmt.Sprint(o.LastReceivedData[nodeIdx].Quality)) + if !o.StatusCodeOK(o.LastReceivedData[nodeIdx].Quality) { + mp := newMP(nmm) + o.Log.Debugf("status not OK for node '%s'(metric name '%s', tags '%s')", + mp.fieldName, mp.metricName, mp.tags) + } + + var t time.Time + switch o.Config.Timestamp { + case TimestampSourceServer: + t = o.LastReceivedData[nodeIdx].ServerTime + case TimestampSourceSource: + t = o.LastReceivedData[nodeIdx].SourceTime + default: + t = time.Now() + } + + return metric.New(nmm.metricName, tags, fields, t) +} diff --git a/plugins/common/opcua/input/input_client_test.go b/plugins/common/opcua/input/input_client_test.go new file mode 100644 index 000000000..6d7e93c89 --- /dev/null +++ b/plugins/common/opcua/input/input_client_test.go @@ -0,0 +1,850 @@ +package input + +import ( + "fmt" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/common/opcua" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestTagsSliceToMap(t *testing.T) { + m, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"baz", "bat"}}) + require.NoError(t, err) + require.Len(t, m, 2) + require.Equal(t, m["foo"], "bar") + require.Equal(t, m["baz"], "bat") +} + +func TestTagsSliceToMap_twoStrings(t *testing.T) { + var err error + _, err = tagsSliceToMap([][]string{{"foo", "bar", "baz"}}) + require.Error(t, err) + _, err = tagsSliceToMap([][]string{{"foo"}}) + require.Error(t, err) +} + +func TestTagsSliceToMap_dupeKey(t *testing.T) { + _, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"foo", "bat"}}) + require.Error(t, err) +} + +func TestTagsSliceToMap_empty(t *testing.T) { + _, err := tagsSliceToMap([][]string{{"foo", ""}}) + require.Equal(t, fmt.Errorf("tag 1 has empty value"), err) + _, err = tagsSliceToMap([][]string{{"", "bar"}}) + require.Equal(t, fmt.Errorf("tag 1 has empty name"), err) +} + +func TestValidateOPCTags(t *testing.T) { + tests := []struct { + name string + config InputClientConfig + err error + }{ + { + "duplicates", + InputClientConfig{ + MetricName: "mn", + RootNodes: []NodeSettings{ + { + FieldName: "fn", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, + }, + Groups: []NodeGroupSettings{ + { + Nodes: []NodeSettings{ + { + FieldName: "fn", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + }, + }, + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, + }, + }, + fmt.Errorf("name 'fn' is duplicated (metric name 'mn', tags 't1=v1, t2=v2')"), + }, + { + "empty tag value not allowed", + InputClientConfig{ + MetricName: "mn", + RootNodes: []NodeSettings{ + { + FieldName: "fn", + IdentifierType: "s", + TagsSlice: [][]string{{"t1", ""}}, + }, + }, + }, + fmt.Errorf("tag 1 has empty value"), + }, + { + "empty tag name not allowed", + InputClientConfig{ + MetricName: "mn", + RootNodes: []NodeSettings{ + { + FieldName: "fn", + IdentifierType: "s", + TagsSlice: [][]string{{"", "1"}}, + }, + }, + }, + fmt.Errorf("tag 1 has empty name"), + }, + { + "different metric tag names", + InputClientConfig{ + MetricName: "mn", + RootNodes: []NodeSettings{ + { + FieldName: "fn", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, + { + FieldName: "fn", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "v1"}, {"t3", "v2"}}, + }, + }, + Groups: []NodeGroupSettings{}, + }, + nil, + }, + { + "different metric tag values", + InputClientConfig{ + MetricName: "mn", + RootNodes: []NodeSettings{ + { + FieldName: "fn", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "foo"}, {"t2", "v2"}}, + }, + { + FieldName: "fn", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "bar"}, {"t2", "v2"}}, + }, + }, + Groups: []NodeGroupSettings{}, + }, + nil, + }, + { + "different metric names", + InputClientConfig{ + MetricName: "mn", + RootNodes: []NodeSettings{}, + Groups: []NodeGroupSettings{ + { + MetricName: "mn", + Namespace: "2", + Nodes: []NodeSettings{ + { + FieldName: "fn", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, + }, + }, + { + MetricName: "mn2", + Namespace: "2", + Nodes: []NodeSettings{ + { + FieldName: "fn", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, + }, + }, + }, + }, + nil, + }, + { + "different field names", + InputClientConfig{ + MetricName: "mn", + RootNodes: []NodeSettings{ + { + FieldName: "fn", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, + { + FieldName: "fn2", + Namespace: "2", + IdentifierType: "s", + Identifier: "i1", + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, + }, + Groups: []NodeGroupSettings{}, + }, + nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + o := OpcUAInputClient{ + Config: tt.config, + Log: testutil.Logger{}, + } + require.Equal(t, tt.err, o.InitNodeMetricMapping()) + }) + } +} + +func TestNewNodeMetricMappingTags(t *testing.T) { + tests := []struct { + name string + settings NodeSettings + groupTags map[string]string + expectedTags map[string]string + err error + }{ + { + name: "empty tags", + settings: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "h", + TagsSlice: [][]string{}, + }, + groupTags: map[string]string{}, + expectedTags: map[string]string{}, + err: nil, + }, + { + name: "node tags only", + settings: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "h", + TagsSlice: [][]string{{"t1", "v1"}}, + }, + groupTags: map[string]string{}, + expectedTags: map[string]string{"t1": "v1"}, + err: nil, + }, + { + name: "group tags only", + settings: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "h", + TagsSlice: [][]string{}, + }, + groupTags: map[string]string{"t1": "v1"}, + expectedTags: map[string]string{"t1": "v1"}, + err: nil, + }, + { + name: "node tag overrides group tags", + settings: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "h", + TagsSlice: [][]string{{"t1", "v2"}}, + }, + groupTags: map[string]string{"t1": "v1"}, + expectedTags: map[string]string{"t1": "v2"}, + err: nil, + }, + { + name: "node tag merged with group tags", + settings: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "h", + TagsSlice: [][]string{{"t2", "v2"}}, + }, + groupTags: map[string]string{"t1": "v1"}, + expectedTags: map[string]string{"t1": "v1", "t2": "v2"}, + err: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nmm, err := NewNodeMetricMapping("testmetric", tt.settings, tt.groupTags) + require.Equal(t, tt.err, err) + require.Equal(t, tt.expectedTags, nmm.MetricTags) + }) + } +} + +func TestNewNodeMetricMappingIdStrInstantiated(t *testing.T) { + nmm, err := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "h", + TagsSlice: [][]string{}, + }, map[string]string{}) + require.NoError(t, err) + require.Equal(t, nmm.idStr, "ns=2;s=h") +} + +func TestValidateNodeToAdd(t *testing.T) { + tests := []struct { + name string + existing map[metricParts]struct{} + nmm *NodeMetricMapping + err error + }{ + { + name: "valid", + existing: map[metricParts]struct{}{}, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "hf", + TagsSlice: [][]string{}, + }, map[string]string{}) + return nmm + }(), + err: nil, + }, + { + name: "empty field name not allowed", + existing: map[metricParts]struct{}{}, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "", + Namespace: "2", + IdentifierType: "s", + Identifier: "hf", + TagsSlice: [][]string{}, + }, map[string]string{}) + return nmm + }(), + err: fmt.Errorf("empty name in ''"), + }, + { + name: "empty namespace not allowed", + existing: map[metricParts]struct{}{}, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "", + IdentifierType: "s", + Identifier: "hf", + TagsSlice: [][]string{}, + }, map[string]string{}) + return nmm + }(), + err: fmt.Errorf("empty node namespace not allowed"), + }, + { + name: "empty identifier type not allowed", + existing: map[metricParts]struct{}{}, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "", + Identifier: "hf", + TagsSlice: [][]string{}, + }, map[string]string{}) + return nmm + }(), + err: fmt.Errorf("invalid identifier type '' in 'f'"), + }, + { + name: "invalid identifier type not allowed", + existing: map[metricParts]struct{}{}, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "j", + Identifier: "hf", + TagsSlice: [][]string{}, + }, map[string]string{}) + return nmm + }(), + err: fmt.Errorf("invalid identifier type 'j' in 'f'"), + }, + { + name: "duplicate metric not allowed", + existing: map[metricParts]struct{}{ + {metricName: "testmetric", fieldName: "f", tags: "t1=v1, t2=v2"}: {}, + }, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "hf", + TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}}, + }, map[string]string{}) + return nmm + }(), + err: fmt.Errorf("name 'f' is duplicated (metric name 'testmetric', tags 't1=v1, t2=v2')"), + }, + { + name: "identifier type mismatch", + existing: map[metricParts]struct{}{}, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "i", + Identifier: "hf", + TagsSlice: [][]string{}, + }, map[string]string{}) + return nmm + }(), + err: fmt.Errorf("identifier type 'i' does not match the type of identifier 'hf'"), + }, + } + + for idT, idV := range map[string]string{ + "s": "hf", + "i": "1", + "g": "849683f0-ce92-4fa2-836f-a02cde61d75d", + "b": "aGVsbG8gSSBhbSBhIHRlc3QgaWRlbnRpZmllcg=="} { + tests = append(tests, struct { + name string + existing map[metricParts]struct{} + nmm *NodeMetricMapping + err error + }{ + name: "identifier type " + idT + " allowed", + existing: map[metricParts]struct{}{}, + nmm: func() *NodeMetricMapping { + nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: idT, + Identifier: idV, + TagsSlice: [][]string{}, + }, map[string]string{}) + return nmm + }(), + err: nil, + }) + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateNodeToAdd(tt.existing, tt.nmm) + require.Equal(t, tt.err, err) + }) + } +} + +func TestInitNodeMetricMapping(t *testing.T) { + tests := []struct { + testname string + config InputClientConfig + expected []NodeMetricMapping + err error + }{ + { + testname: "only root node", + config: InputClientConfig{ + MetricName: "testmetric", + Timestamp: TimestampSourceTelegraf, + RootNodes: []NodeSettings{ + { + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "id1", + TagsSlice: [][]string{{"t1", "v1"}}, + }, + }, + Groups: []NodeGroupSettings{}, + }, + expected: []NodeMetricMapping{ + { + Tag: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "id1", + TagsSlice: [][]string{{"t1", "v1"}}, + }, + idStr: "ns=2;s=id1", + metricName: "testmetric", + MetricTags: map[string]string{"t1": "v1"}, + }, + }, + err: nil, + }, + { + testname: "root node and group node", + config: InputClientConfig{ + MetricName: "testmetric", + Timestamp: TimestampSourceTelegraf, + RootNodes: []NodeSettings{ + { + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "id1", + TagsSlice: [][]string{{"t1", "v1"}}, + }, + }, + Groups: []NodeGroupSettings{ + { + MetricName: "groupmetric", + Namespace: "3", + IdentifierType: "s", + Nodes: []NodeSettings{ + { + FieldName: "f", + Identifier: "id2", + TagsSlice: [][]string{{"t2", "v2"}}, + }, + }, + TagsSlice: [][]string{}, + }, + }, + }, + expected: []NodeMetricMapping{ + { + Tag: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "id1", + TagsSlice: [][]string{{"t1", "v1"}}, + }, + idStr: "ns=2;s=id1", + metricName: "testmetric", + MetricTags: map[string]string{"t1": "v1"}, + }, + { + Tag: NodeSettings{ + FieldName: "f", + Namespace: "3", + IdentifierType: "s", + Identifier: "id2", + TagsSlice: [][]string{{"t2", "v2"}}, + }, + idStr: "ns=3;s=id2", + metricName: "groupmetric", + MetricTags: map[string]string{"t2": "v2"}, + }, + }, + err: nil, + }, + { + testname: "only group node", + config: InputClientConfig{ + MetricName: "testmetric", + Timestamp: TimestampSourceTelegraf, + RootNodes: []NodeSettings{}, + Groups: []NodeGroupSettings{ + { + MetricName: "groupmetric", + Namespace: "3", + IdentifierType: "s", + Nodes: []NodeSettings{ + { + FieldName: "f", + Identifier: "id2", + TagsSlice: [][]string{{"t2", "v2"}}, + }, + }, + TagsSlice: [][]string{}, + }, + }, + }, + expected: []NodeMetricMapping{ + { + Tag: NodeSettings{ + FieldName: "f", + Namespace: "3", + IdentifierType: "s", + Identifier: "id2", + TagsSlice: [][]string{{"t2", "v2"}}, + }, + idStr: "ns=3;s=id2", + metricName: "groupmetric", + MetricTags: map[string]string{"t2": "v2"}, + }, + }, + err: nil, + }, + { + testname: "tags and default only default tags used", + config: InputClientConfig{ + MetricName: "testmetric", + Timestamp: TimestampSourceTelegraf, + RootNodes: []NodeSettings{}, + Groups: []NodeGroupSettings{ + { + MetricName: "groupmetric", + Namespace: "3", + IdentifierType: "s", + Nodes: []NodeSettings{ + { + FieldName: "f", + Identifier: "id2", + TagsSlice: [][]string{{"t2", "v2"}}, + DefaultTags: map[string]string{"t3": "v3"}, + }, + }, + TagsSlice: [][]string{}, + }, + }, + }, + expected: []NodeMetricMapping{ + { + Tag: NodeSettings{ + FieldName: "f", + Namespace: "3", + IdentifierType: "s", + Identifier: "id2", + TagsSlice: [][]string{{"t2", "v2"}}, + DefaultTags: map[string]string{"t3": "v3"}, + }, + idStr: "ns=3;s=id2", + metricName: "groupmetric", + MetricTags: map[string]string{"t3": "v3"}, + }, + }, + err: nil, + }, + { + testname: "only root node default overrides slice", + config: InputClientConfig{ + MetricName: "testmetric", + Timestamp: TimestampSourceTelegraf, + RootNodes: []NodeSettings{ + { + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "id1", + TagsSlice: [][]string{{"t1", "v1"}}, + DefaultTags: map[string]string{"t3": "v3"}, + }, + }, + Groups: []NodeGroupSettings{}, + }, + expected: []NodeMetricMapping{ + { + Tag: NodeSettings{ + FieldName: "f", + Namespace: "2", + IdentifierType: "s", + Identifier: "id1", + TagsSlice: [][]string{{"t1", "v1"}}, + DefaultTags: map[string]string{"t3": "v3"}, + }, + idStr: "ns=2;s=id1", + metricName: "testmetric", + MetricTags: map[string]string{"t3": "v3"}, + }, + }, + err: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.testname, func(t *testing.T) { + o := OpcUAInputClient{Config: tt.config} + err := o.InitNodeMetricMapping() + require.NoError(t, err) + require.Equal(t, tt.expected, o.NodeMetricMapping) + }) + } +} + +func TestUpdateNodeValue(t *testing.T) { + type testStep struct { + nodeIdx int + value interface{} + status ua.StatusCode + expected interface{} + } + tests := []struct { + testname string + steps []testStep + }{ + { + "value should update when code ok", + []testStep{ + { + 0, + "Harmony", + ua.StatusOK, + "Harmony", + }, + }, + }, + { + "value should not update when code bad", + []testStep{ + { + 0, + "Harmony", + ua.StatusOK, + "Harmony", + }, + { + 0, + "Odium", + ua.StatusBad, + "Harmony", + }, + { + 0, + "Ati", + ua.StatusOK, + "Ati", + }, + }, + }, + } + + conf := &opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4930", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "", + ConnectTimeout: config.Duration(2 * time.Second), + RequestTimeout: config.Duration(2 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + } + c, err := conf.CreateClient(testutil.Logger{}) + require.NoError(t, err) + o := OpcUAInputClient{ + OpcUAClient: c, + Log: testutil.Logger{}, + NodeMetricMapping: []NodeMetricMapping{ + { + Tag: NodeSettings{ + FieldName: "f", + }, + }, + { + Tag: NodeSettings{ + FieldName: "f2", + }, + }, + }, + LastReceivedData: make([]NodeValue, 2), + } + + for _, tt := range tests { + t.Run(tt.testname, func(t *testing.T) { + o.LastReceivedData = make([]NodeValue, 2) + for i, step := range tt.steps { + v, _ := ua.NewVariant(step.value) + o.UpdateNodeValue(0, &ua.DataValue{ + Value: v, + Status: step.status, + SourceTimestamp: time.Date(2022, 03, 17, 8, 33, 00, 00, &time.Location{}).Add(time.Duration(i) * time.Second), + SourcePicoseconds: 0, + ServerTimestamp: time.Date(2022, 03, 17, 8, 33, 00, 500, &time.Location{}).Add(time.Duration(i) * time.Second), + ServerPicoseconds: 0, + }) + require.Equal(t, step.expected, o.LastReceivedData[0].Value) + } + }) + } +} + +func TestMetricForNode(t *testing.T) { + conf := &opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4930", + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "", + ConnectTimeout: config.Duration(2 * time.Second), + RequestTimeout: config.Duration(2 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + } + c, err := conf.CreateClient(testutil.Logger{}) + require.NoError(t, err) + o := OpcUAInputClient{ + Config: InputClientConfig{ + Timestamp: TimestampSourceSource, + }, + OpcUAClient: c, + Log: testutil.Logger{}, + LastReceivedData: make([]NodeValue, 2), + } + + tests := []struct { + testname string + nmm []NodeMetricMapping + v interface{} + time time.Time + status ua.StatusCode + expected telegraf.Metric + }{ + { + testname: "metric build correctly", + nmm: []NodeMetricMapping{ + { + Tag: NodeSettings{ + FieldName: "fn", + }, + idStr: "ns=3;s=hi", + metricName: "testingmetric", + MetricTags: map[string]string{"t1": "v1"}, + }, + }, + v: 16, + time: time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{}), + status: ua.StatusOK, + expected: metric.New("testingmetric", + map[string]string{"t1": "v1", "id": "ns=3;s=hi"}, + map[string]interface{}{"Quality": "OK (0x0)", "fn": 16}, + time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{})), + }, + } + + for _, tt := range tests { + t.Run(tt.testname, func(t *testing.T) { + o.NodeMetricMapping = tt.nmm + o.LastReceivedData[0].SourceTime = tt.time + o.LastReceivedData[0].Quality = tt.status + o.LastReceivedData[0].Value = tt.v + actual := o.MetricForNode(0) + require.Equal(t, tt.expected.Tags(), actual.Tags()) + require.Equal(t, tt.expected.Fields(), actual.Fields()) + require.Equal(t, tt.expected.Time(), actual.Time()) + }) + } +} diff --git a/plugins/inputs/opcua/opcua_util.go b/plugins/common/opcua/opcua_util.go similarity index 89% rename from plugins/inputs/opcua/opcua_util.go rename to plugins/common/opcua/opcua_util.go index e5335baba..1cc282aa6 100644 --- a/plugins/inputs/opcua/opcua_util.go +++ b/plugins/common/opcua/opcua_util.go @@ -62,7 +62,7 @@ func generateCert(host string, rsaBits int, certFile, keyFile string, dur time.D template := x509.Certificate{ SerialNumber: serialNumber, Subject: pkix.Name{ - Organization: []string{"Telegraf OPC UA client"}, + Organization: []string{"Telegraf OPC UA Client"}, }, NotBefore: notBefore, NotAfter: notAfter, @@ -144,8 +144,7 @@ func pemBlockForKey(priv interface{}) (*pem.Block, error) { } } -//revive:disable-next-line -func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua.Option, error) { +func (o *OpcUAClient) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua.Option, error) { opts := []opcua.Option{} appuri := "urn:telegraf:gopcua:client" appname := "Telegraf" @@ -153,12 +152,12 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua // ApplicationURI is automatically read from the cert so is not required if a cert if provided opts = append(opts, opcua.ApplicationURI(appuri)) opts = append(opts, opcua.ApplicationName(appname)) - opts = append(opts, opcua.RequestTimeout(time.Duration(o.RequestTimeout))) + opts = append(opts, opcua.RequestTimeout(time.Duration(o.Config.RequestTimeout))) - certFile := o.Certificate - keyFile := o.PrivateKey - policy := o.SecurityPolicy - mode := o.SecurityMode + certFile := o.Config.Certificate + keyFile := o.Config.PrivateKey + policy := o.Config.SecurityPolicy + mode := o.Config.SecurityMode var err error if certFile == "" && keyFile == "" { if policy != "None" || mode != "None" { @@ -199,8 +198,10 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua return nil, fmt.Errorf("invalid security policy: %s", policy) } + o.Log.Debugf("security policy from configuration %s", secPolicy) + // Select the most appropriate authentication mode from server capabilities and user input - authMode, authOption, err := o.generateAuth(o.AuthMethod, cert, o.Username, o.Password) + authMode, authOption, err := o.generateAuth(o.Config.AuthMethod, cert, o.Config.Username, o.Config.Password) if err != nil { return nil, err } @@ -254,9 +255,13 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua } default: // User cares about both + o.Log.Debugf("User cares about both the policy (%s) and security mode (%s)", secPolicy, secMode) + o.Log.Debugf("Server has %d endpoints", len(endpoints)) for _, e := range endpoints { + o.Log.Debugf("Evaluating endpoint %s, policy %s, mode %s, level %d", e.EndpointURL, e.SecurityPolicyURI, e.SecurityMode, e.SecurityLevel) if e.SecurityPolicyURI == secPolicy && e.SecurityMode == secMode && (serverEndpoint == nil || e.SecurityLevel >= serverEndpoint.SecurityLevel) { serverEndpoint = e + o.Log.Debugf("Security policy and mode found. Using server endpoint %s for security. Policy %s", serverEndpoint.EndpointURL, serverEndpoint.SecurityPolicyURI) } } } @@ -278,7 +283,7 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua return opts, nil } -func (o *OpcUA) generateAuth(a string, cert []byte, un, pw string) (ua.UserTokenType, opcua.Option, error) { +func (o *OpcUAClient) generateAuth(a string, cert []byte, un, pw string) (ua.UserTokenType, opcua.Option, error) { var err error var authMode ua.UserTokenType diff --git a/plugins/inputs/all/opcua_listener.go b/plugins/inputs/all/opcua_listener.go new file mode 100644 index 000000000..7c602ebff --- /dev/null +++ b/plugins/inputs/all/opcua_listener.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.opcua_listener + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/opcua_listener" // register plugin diff --git a/plugins/inputs/opcua/README.md b/plugins/inputs/opcua/README.md index da8114220..34dc3b3b5 100644 --- a/plugins/inputs/opcua/README.md +++ b/plugins/inputs/opcua/README.md @@ -1,6 +1,6 @@ -# OPC UA Client Input Plugin +# OPC UA Client Reader Input Plugin -The `opcua` plugin retrieves data from OPC UA client devices. +The `opcua` plugin retrieves data from OPC UA Server devices. Telegraf minimum version: Telegraf 1.16 Plugin minimum tested version: 1.16 @@ -19,7 +19,7 @@ Plugin minimum tested version: 1.16 ## Maximum time allowed to establish a connect to the endpoint. # connect_timeout = "10s" # - ## Maximum time allowed for a request over the estabilished connection. + ## Maximum time allowed for a request over the established connection. # request_timeout = "5s" # ## Security policy, one of "None", "Basic128Rsa15", "Basic256", @@ -58,18 +58,38 @@ Plugin minimum tested version: 1.16 ## namespace - OPC UA namespace of the node (integer value 0 thru 3) ## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque) ## identifier - OPC UA ID (tag as shown in opcua browser) - ## tags - extra tags to be added to the output metric (optional) - ## Example: - ## {name="ProductUri", namespace="0", identifier_type="i", identifier="2262", tags=[["tag1","value1"],["tag2","value2"]]} + ## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags + ## default_tags - extra tags to be added to the output metric (optional) + ## + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="", namespace="", identifier_type="", identifier=""}, - # {name="", namespace="", identifier_type="", identifier=""}, - #] + # {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]}, + # {name="", namespace="", identifier_type="", identifier=""}, + # ] + # + ## Bracketed notation + # [[inputs.opcua.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "value1", tag2 = "value2" } + # + # [[inputs.opcua.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" # ## Node Group - ## Sets defaults for OPC UA namespace and ID type so they aren't required in - ## every node. A group can also have a metric name that overrides the main - ## plugin metric name. + ## Sets defaults so they aren't required in every node. + ## Default values can be set for: + ## * Metric name + ## * OPC UA namespace + ## * Identifier + ## * Default tags ## ## Multiple node groups are allowed #[[inputs.opcua.group]] @@ -85,39 +105,61 @@ Plugin minimum tested version: 1.16 ## namespace, this is used. # identifier_type = # + ## Default tags that are applied to every node in this group. Can be + ## overwritten in a node by setting a different value for the tag name. + ## example: default_tags = { tag1 = "value1" } + # default_tags = {} + # ## Node ID Configuration. Array of nodes with the same settings as above. + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="", namespace="", identifier_type="", identifier=""}, - # {name="", namespace="", identifier_type="", identifier=""}, + # {name="node1", namespace="", identifier_type="", identifier=""}, + # {name="node2", namespace="", identifier_type="", identifier=""}, #] + # + ## Bracketed notation + # [[inputs.opcua.group.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "override1", tag2 = "value2" } + # + # [[inputs.opcua.group.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" ## Enable workarounds required by some devices to work correctly # [inputs.opcua.workarounds] ## Set additional valid status codes, StatusOK (0x0) is always considered valid # additional_valid_status_codes = ["0xC0"] + # [inputs.opcua.request_workarounds] ## Use unregistered reads instead of registered reads # use_unregistered_reads = false ``` ## Node Configuration -An OPC UA node ID may resemble: "n=3;s=Temperature". In this example: +An OPC UA node ID may resemble: "ns=3;s=Temperature". In this example: -- n=3 is indicating the `namespace` is 3 +- ns=3 is indicating the `namespace` is 3 - s=Temperature is indicting that the `identifier_type` is a string and `identifier` value is 'Temperature' - This example temperature node has a value of 79.0 To gather data from this node enter the following line into the 'nodes' property above: -```shell +```text {field_name="temp", namespace="3", identifier_type="s", identifier="Temperature"}, ``` This node configuration produces a metric like this: ```text -opcua,id=n\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000 - +opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000 ``` ## Group Configuration @@ -132,32 +174,59 @@ The output metric will include tags set in the group and the node. If a tag with the same name is set in both places, the tag value from the node is used. -This example group configuration has two groups with two nodes each: +This example group configuration has three groups with two nodes each: ```toml + # Group 1 [[inputs.opcua.group]] - name="group1_metric_name" - namespace="3" - identifier_type="i" - tags=[["group1_tag", "val1"]] - nodes = [ - {name="name", identifier="1001", tags=[["node1_tag", "val2"]]}, - {name="name", identifier="1002", tags=[["node1_tag", "val3"]]}, - ] + name = "group1_metric_name" + namespace = "3" + identifier_type = "i" + default_tags = { group1_tag = "val1" } + [[inputs.opcua.group.nodes]] + name = "name" + identifier = "1001" + default_tags = { node1_tag = "val2" } + [[inputs.opcua.group.nodes]] + name = "name" + identifier = "1002" + default_tags = {node1_tag = "val3"} + + # Group 2 [[inputs.opcua.group]] - name="group2_metric_name" - namespace="3" - identifier_type="i" - tags=[["group2_tag", "val3"]] - nodes = [ - {name="saw", identifier="1003", tags=[["node2_tag", "val4"]]}, - {name="sin", identifier="1004"}, - ] + name = "group2_metric_name" + namespace = "3" + identifier_type = "i" + default_tags = { group2_tag = "val3" } + [[inputs.opcua.group.nodes]] + name = "saw" + identifier = "1003" + default_tags = { node2_tag = "val4" } + [[inputs.opcua.group.nodes]] + name = "sin" + identifier = "1004" + + # Group 3 + [[inputs.opcua.group]] + name = "group3_metric_name" + namespace = "3" + identifier_type = "i" + default_tags = { group3_tag = "val5" } + nodes = [ + {name="name", identifier="1001"}, + {name="name", identifier="1002"}, + ] ``` +## Connection Service + +This plugin actively reads to retrieve data from the OPC server. +This is done every `interval`. + ## Metrics -Metrics are produced according to the defined node ID and group configuration. +The metrics collected by this input plugin will depend on the +configured `nodes` and `group`. ## Example Output diff --git a/plugins/inputs/opcua/opcua.go b/plugins/inputs/opcua/opcua.go index 4295c2db9..7fd9152f3 100644 --- a/plugins/inputs/opcua/opcua.go +++ b/plugins/inputs/opcua/opcua.go @@ -2,556 +2,48 @@ package opcua import ( - "context" _ "embed" - "fmt" - "net/url" - "sort" - "strconv" - "strings" - "time" - - "github.com/gopcua/opcua" "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf/plugins/common/opcua" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/internal/choice" + "github.com/influxdata/telegraf/plugins/common/opcua/input" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/influxdata/telegraf/selfstat" ) //go:embed sample.conf var sampleConfig string -type OpcuaWorkarounds struct { - AdditionalValidStatusCodes []string `toml:"additional_valid_status_codes"` - UseUnregisteredReads bool `toml:"use_unregistered_reads"` -} - -// OpcUA type type OpcUA struct { - MetricName string `toml:"name"` - Endpoint string `toml:"endpoint"` - SecurityPolicy string `toml:"security_policy"` - SecurityMode string `toml:"security_mode"` - Certificate string `toml:"certificate"` - PrivateKey string `toml:"private_key"` - Username string `toml:"username"` - Password string `toml:"password"` - Timestamp string `toml:"timestamp"` - AuthMethod string `toml:"auth_method"` - ConnectTimeout config.Duration `toml:"connect_timeout"` - RequestTimeout config.Duration `toml:"request_timeout"` - RootNodes []NodeSettings `toml:"nodes"` - Groups []GroupSettings `toml:"group"` - Workarounds OpcuaWorkarounds `toml:"workarounds"` - Log telegraf.Logger `toml:"-"` + ReadClientConfig + Log telegraf.Logger `toml:"-"` - nodes []Node - nodeData []OPCData - nodeIDs []*ua.NodeID - nodeIDerror []error - state ConnectionState - - // status - ReadSuccess selfstat.Stat `toml:"-"` - ReadError selfstat.Stat `toml:"-"` - - // internal values - client *opcua.Client - req *ua.ReadRequest - opts []opcua.Option + client *ReadClient codes []ua.StatusCode } -type NodeSettings struct { - FieldName string `toml:"name"` - Namespace string `toml:"namespace"` - IdentifierType string `toml:"identifier_type"` - Identifier string `toml:"identifier"` - DataType string `toml:"data_type"` // Kept for backward compatibility but was never used. - Description string `toml:"description"` // Kept for backward compatibility but was never used. - TagsSlice [][]string `toml:"tags"` -} - -type Node struct { - tag NodeSettings - idStr string - metricName string - metricTags map[string]string -} - -type GroupSettings struct { - MetricName string `toml:"name"` // Overrides plugin's setting - Namespace string `toml:"namespace"` // Can be overridden by node setting - IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting - Nodes []NodeSettings `toml:"nodes"` - TagsSlice [][]string `toml:"tags"` -} - -// OPCData type -type OPCData struct { - TagName string - Value interface{} - Quality ua.StatusCode - ServerTime time.Time - SourceTime time.Time - DataType ua.TypeID -} - -// ConnectionState used for constants -type ConnectionState int - -const ( - //Disconnected constant state 0 - Disconnected ConnectionState = iota - //Connecting constant state 1 - Connecting - //Connected constant state 2 - Connected -) - func (*OpcUA) SampleConfig() string { return sampleConfig } -// Init will initialize all tags -func (o *OpcUA) Init() error { - o.state = Disconnected - - err := choice.Check(o.Timestamp, []string{"", "gather", "server", "source"}) - if err != nil { - return err - } - - err = o.validateEndpoint() - if err != nil { - return err - } - - err = o.InitNodes() - if err != nil { - return err - } - - err = o.setupOptions() - if err != nil { - return err - } - - err = o.setupWorkarounds() - if err != nil { - return err - } - - tags := map[string]string{ - "endpoint": o.Endpoint, - } - o.ReadError = selfstat.Register("opcua", "read_error", tags) - o.ReadSuccess = selfstat.Register("opcua", "read_success", tags) - - return nil -} - -func (o *OpcUA) validateEndpoint() error { - if o.MetricName == "" { - return fmt.Errorf("device name is empty") - } - - if o.Endpoint == "" { - return fmt.Errorf("endpoint url is empty") - } - - _, err := url.Parse(o.Endpoint) - if err != nil { - return fmt.Errorf("endpoint url is invalid") - } - - //search security policy type - switch o.SecurityPolicy { - case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto": - // Valid security policy type - do nothing. - default: - return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityPolicy, o.MetricName) - } - //search security mode type - switch o.SecurityMode { - case "None", "Sign", "SignAndEncrypt", "auto": - // Valid security mode type - do nothing. - default: - return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityMode, o.MetricName) - } - return nil -} - -func tagsSliceToMap(tags [][]string) (map[string]string, error) { - m := make(map[string]string) - for i, tag := range tags { - if len(tag) != 2 { - return nil, fmt.Errorf("tag %d needs 2 values, has %d: %v", i+1, len(tag), tag) - } - if tag[0] == "" { - return nil, fmt.Errorf("tag %d has empty name", i+1) - } - if tag[1] == "" { - return nil, fmt.Errorf("tag %d has empty value", i+1) - } - if _, ok := m[tag[0]]; ok { - return nil, fmt.Errorf("tag %d has duplicate key: %v", i+1, tag[0]) - } - m[tag[0]] = tag[1] - } - return m, nil -} - -// InitNodes Method on OpcUA -func (o *OpcUA) InitNodes() error { - for _, node := range o.RootNodes { - nodeTags, err := tagsSliceToMap(node.TagsSlice) - - if err != nil { - return err - } - - o.nodes = append(o.nodes, Node{ - metricName: o.MetricName, - tag: node, - metricTags: nodeTags, - }) - } - - for _, group := range o.Groups { - if group.MetricName == "" { - group.MetricName = o.MetricName - } - groupTags, err := tagsSliceToMap(group.TagsSlice) - if err != nil { - return err - } - for _, node := range group.Nodes { - if node.Namespace == "" { - node.Namespace = group.Namespace - } - if node.IdentifierType == "" { - node.IdentifierType = group.IdentifierType - } - nodeTags, err := tagsSliceToMap(node.TagsSlice) - if err != nil { - return err - } - mergedTags := make(map[string]string) - for k, v := range groupTags { - mergedTags[k] = v - } - for k, v := range nodeTags { - mergedTags[k] = v - } - o.nodes = append(o.nodes, Node{ - metricName: group.MetricName, - tag: node, - metricTags: mergedTags, - }) - } - } - - err := o.validateOPCTags() - if err != nil { - return err - } - - return nil -} - -type metricParts struct { - metricName string - fieldName string - tags string // sorted by tag name and in format tag1=value1, tag2=value2 -} - -func newMP(n *Node) metricParts { - var keys []string - for key := range n.metricTags { - keys = append(keys, key) - } - sort.Strings(keys) - var sb strings.Builder - for i, key := range keys { - if i != 0 { - // Writes to a string-builder will always succeed - //nolint:errcheck,revive - sb.WriteString(", ") - } - // Writes to a string-builder will always succeed - //nolint:errcheck,revive - sb.WriteString(key) - // Writes to a string-builder will always succeed - //nolint:errcheck,revive - sb.WriteString("=") - // Writes to a string-builder will always succeed - //nolint:errcheck,revive - sb.WriteString(n.metricTags[key]) - } - x := metricParts{ - metricName: n.metricName, - fieldName: n.tag.FieldName, - tags: sb.String(), - } - return x -} - -func (o *OpcUA) validateOPCTags() error { - nameEncountered := map[metricParts]struct{}{} - for i, node := range o.nodes { - mp := newMP(&node) - //check empty name - if node.tag.FieldName == "" { - return fmt.Errorf("empty name in '%s'", node.tag.FieldName) - } - //search name duplicate - if _, ok := nameEncountered[mp]; ok { - return fmt.Errorf("name '%s' is duplicated (metric name '%s', tags '%s')", - mp.fieldName, mp.metricName, mp.tags) - } - - //add it to the set - nameEncountered[mp] = struct{}{} - - //search identifier type - switch node.tag.IdentifierType { - case "i": - if _, err := strconv.Atoi(node.tag.Identifier); err != nil { - return fmt.Errorf("identifier type '%s' does not match the type of identifier '%s'", node.tag.IdentifierType, node.tag.Identifier) - } - case "s", "g", "b": - // Valid identifier type - do nothing. - default: - return fmt.Errorf("invalid identifier type '%s' in '%s'", node.tag.IdentifierType, node.tag.FieldName) - } - - o.nodes[i].idStr = BuildNodeID(node.tag) - - //parse NodeIds and NodeIds errors - nid, niderr := ua.ParseNodeID(o.nodes[i].idStr) - // build NodeIds and Errors - o.nodeIDs = append(o.nodeIDs, nid) - o.nodeIDerror = append(o.nodeIDerror, niderr) - // Grow NodeData for later input - o.nodeData = append(o.nodeData, OPCData{}) - } - return nil -} - -// BuildNodeID build node ID from OPC tag -func BuildNodeID(tag NodeSettings) string { - return "ns=" + tag.Namespace + ";" + tag.IdentifierType + "=" + tag.Identifier -} - -// Connect to a OPCUA device -func Connect(o *OpcUA) error { - u, err := url.Parse(o.Endpoint) - if err != nil { - return err - } - - switch u.Scheme { - case "opc.tcp": - o.state = Connecting - - if o.client != nil { - if err := o.client.Close(); err != nil { - // Only log the error but to not bail-out here as this prevents - // reconnections for multiple parties (see e.g. #9523). - o.Log.Errorf("Closing connection failed: %v", err) - } - } - - o.client = opcua.NewClient(o.Endpoint, o.opts...) - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.ConnectTimeout)) - defer cancel() - if err := o.client.Connect(ctx); err != nil { - return fmt.Errorf("error in Client Connection: %s", err) - } - - if !o.Workarounds.UseUnregisteredReads { - regResp, err := o.client.RegisterNodes(&ua.RegisterNodesRequest{ - NodesToRegister: o.nodeIDs, - }) - if err != nil { - return fmt.Errorf("registerNodes failed: %v", err) - } - - o.req = &ua.ReadRequest{ - MaxAge: 2000, - TimestampsToReturn: ua.TimestampsToReturnBoth, - NodesToRead: readvalues(regResp.RegisteredNodeIDs), - } - } else { - var nodesToRead []*ua.ReadValueID - - for _, nid := range o.nodeIDs { - nodesToRead = append(nodesToRead, &ua.ReadValueID{NodeID: nid}) - } - - o.req = &ua.ReadRequest{ - MaxAge: 2000, - TimestampsToReturn: ua.TimestampsToReturnBoth, - NodesToRead: nodesToRead, - } - } - - err = o.getData() - if err != nil { - return fmt.Errorf("get Data Failed: %v", err) - } - - default: - return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme) - } - return nil -} - -func (o *OpcUA) setupOptions() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.ConnectTimeout)) - defer cancel() - // Get a list of the endpoints for our target server - endpoints, err := opcua.GetEndpoints(ctx, o.Endpoint) - if err != nil { - return err - } - - if o.Certificate == "" && o.PrivateKey == "" { - if o.SecurityPolicy != "None" || o.SecurityMode != "None" { - o.Certificate, o.PrivateKey, err = generateCert("urn:telegraf:gopcua:client", 2048, o.Certificate, o.PrivateKey, 365*24*time.Hour) - if err != nil { - return err - } - } - } - - o.opts, err = o.generateClientOpts(endpoints) - +// Init Initialise all required objects +func (o *OpcUA) Init() (err error) { + o.client, err = o.ReadClientConfig.CreateReadClient(o.Log) return err } -func (o *OpcUA) setupWorkarounds() error { - if len(o.Workarounds.AdditionalValidStatusCodes) != 0 { - for _, c := range o.Workarounds.AdditionalValidStatusCodes { - val, err := strconv.ParseInt(c, 0, 32) // setting 32 bits to allow for safe conversion - if err != nil { - return err - } - o.codes = append(o.codes, ua.StatusCode(uint32(val))) - } - } - return nil -} - -func (o *OpcUA) checkStatusCode(code ua.StatusCode) bool { - for _, val := range o.codes { - if val == code { - return true - } - } - return false -} - -func (o *OpcUA) getData() error { - resp, err := o.client.Read(o.req) - if err != nil { - o.ReadError.Incr(1) - return fmt.Errorf("Read failed: %w", err) - } - o.ReadSuccess.Incr(1) - for i, d := range resp.Results { - o.nodeData[i].Quality = d.Status - if !o.checkStatusCode(d.Status) { - mp := newMP(&o.nodes[i]) - o.Log.Errorf("status not OK for node '%s'(metric name '%s', tags '%s')", - mp.fieldName, mp.metricName, mp.tags) - continue - } - o.nodeData[i].TagName = o.nodes[i].tag.FieldName - if d.Value != nil { - o.nodeData[i].Value = d.Value.Value() - o.nodeData[i].DataType = d.Value.Type() - } - o.nodeData[i].Quality = d.Status - o.nodeData[i].ServerTime = d.ServerTimestamp - o.nodeData[i].SourceTime = d.SourceTimestamp - } - return nil -} - -func readvalues(ids []*ua.NodeID) []*ua.ReadValueID { - rvids := make([]*ua.ReadValueID, len(ids)) - for i, v := range ids { - rvids[i] = &ua.ReadValueID{NodeID: v} - } - return rvids -} - -func disconnect(o *OpcUA) error { - u, err := url.Parse(o.Endpoint) - if err != nil { - return err - } - - switch u.Scheme { - case "opc.tcp": - o.state = Disconnected - o.client.Close() - o.client = nil - return nil - default: - return fmt.Errorf("invalid controller") - } -} - // Gather defines what data the plugin will gather. func (o *OpcUA) Gather(acc telegraf.Accumulator) error { - if o.state == Disconnected { - o.state = Connecting - err := Connect(o) - if err != nil { - o.state = Disconnected - return err - } - } - - o.state = Connected - - err := o.getData() - if err != nil && o.state == Connected { - o.state = Disconnected - // Ignore returned error to not mask the original problem - //nolint:errcheck,revive - disconnect(o) + metrics, err := o.client.CurrentValues() + if err != nil { return err } - for i, n := range o.nodes { - if o.checkStatusCode(o.nodeData[i].Quality) { - fields := make(map[string]interface{}) - tags := map[string]string{ - "id": n.idStr, - } - for k, v := range n.metricTags { - tags[k] = v - } - - fields[o.nodeData[i].TagName] = o.nodeData[i].Value - fields["Quality"] = strings.TrimSpace(fmt.Sprint(o.nodeData[i].Quality)) - - switch o.Timestamp { - case "server": - acc.AddFields(n.metricName, fields, tags, o.nodeData[i].ServerTime) - case "source": - acc.AddFields(n.metricName, fields, tags, o.nodeData[i].SourceTime) - default: - acc.AddFields(n.metricName, fields, tags) - } - } + // Parse the resulting data into metrics + for _, m := range metrics { + acc.AddMetric(m) } return nil } @@ -560,17 +52,22 @@ func (o *OpcUA) Gather(acc telegraf.Accumulator) error { func init() { inputs.Add("opcua", func() telegraf.Input { return &OpcUA{ - MetricName: "opcua", - Endpoint: "opc.tcp://localhost:4840", - SecurityPolicy: "auto", - SecurityMode: "auto", - Timestamp: "gather", - RequestTimeout: config.Duration(5 * time.Second), - ConnectTimeout: config.Duration(10 * time.Second), - Certificate: "/etc/telegraf/cert.pem", - PrivateKey: "/etc/telegraf/key.pem", - AuthMethod: "Anonymous", - codes: []ua.StatusCode{ua.StatusOK}, + ReadClientConfig: ReadClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "auto", + SecurityMode: "auto", + Certificate: "/etc/telegraf/cert.pem", + PrivateKey: "/etc/telegraf/key.pem", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(5 * time.Second), + RequestTimeout: config.Duration(10 * time.Second), + }, + MetricName: "opcua", + Timestamp: input.TimestampSourceTelegraf, + }, + }, } }) } diff --git a/plugins/inputs/opcua/opcua_test.go b/plugins/inputs/opcua/opcua_test.go index 5e6180f85..16e763e9a 100644 --- a/plugins/inputs/opcua/opcua_test.go +++ b/plugins/inputs/opcua/opcua_test.go @@ -2,19 +2,19 @@ package opcua import ( "fmt" - "reflect" - "testing" - "time" - "github.com/docker/go-connections/nat" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/opcua" + "github.com/influxdata/telegraf/plugins/common/opcua/input" + "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" - - "github.com/gopcua/opcua/ua" - "github.com/influxdata/telegraf/config" - "github.com/influxdata/telegraf/testutil" + "testing" + "time" ) +const servicePort = "4840" + type OPCTags struct { Name string Namespace string @@ -23,7 +23,13 @@ type OPCTags struct { Want interface{} } -const servicePort = "4840" +func MapOPCTag(tags OPCTags) (out input.NodeSettings) { + out.FieldName = tags.Name + out.Namespace = tags.Namespace + out.IdentifierType = tags.IdentifierType + out.Identifier = tags.Identifier + return out +} func TestGetDataBadNodeContainerIntegration(t *testing.T) { if testing.Short() { @@ -47,20 +53,28 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) { {"ManufacturerName", "0", "i", "2263", "open62541"}, } - var o OpcUA - o.MetricName = "testing" - o.Endpoint = fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]) - fmt.Println(o.Endpoint) - o.AuthMethod = "Anonymous" - o.ConnectTimeout = config.Duration(10 * time.Second) - o.RequestTimeout = config.Duration(1 * time.Second) - o.SecurityPolicy = "None" - o.SecurityMode = "None" - o.codes = []ua.StatusCode{ua.StatusOK} - logger := &testutil.CaptureLogger{} - o.Log = logger + readConfig := ReadClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), + SecurityPolicy: "None", + SecurityMode: "None", + Certificate: "", + PrivateKey: "", + Username: "", + Password: "", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + } - g := GroupSettings{ + g := input.NodeGroupSettings{ MetricName: "anodic_current", TagsSlice: [][]string{ {"pot", "2002"}, @@ -70,15 +84,20 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) { for _, tags := range testopctags { g.Nodes = append(g.Nodes, MapOPCTag(tags)) } - o.Groups = append(o.Groups, g) - err = o.Init() + readConfig.Groups = append(readConfig.Groups, g) + + logger := &testutil.CaptureLogger{} + readClient, err := readConfig.CreateReadClient(logger) require.NoError(t, err) - err = Connect(&o) + err = readClient.Init() require.NoError(t, err) - require.Contains(t, logger.LastError, "E! [] status not OK for node 'ProductName'(metric name 'anodic_current', tags 'pot=2002')") + + err = readClient.Connect() + require.NoError(t, err) + require.Contains(t, logger.LastError, "E! [] status not OK for node ProductName") } -func TestClient1Integration(t *testing.T) { +func TestReadClientIntegration(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } @@ -99,79 +118,48 @@ func TestClient1Integration(t *testing.T) { {"ProductUri", "0", "i", "2262", "http://open62541.org"}, {"ManufacturerName", "0", "i", "2263", "open62541"}, {"badnode", "1", "i", "1337", nil}, - {"goodnode", "1", "s", "the.answer", "42"}, + {"goodnode", "1", "s", "the.answer", int32(42)}, + } + + readConfig := ReadClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), + SecurityPolicy: "None", + SecurityMode: "None", + Certificate: "", + PrivateKey: "", + Username: "", + Password: "", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, } - var o OpcUA - o.MetricName = "testing" - o.Endpoint = fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]) - o.AuthMethod = "Anonymous" - o.ConnectTimeout = config.Duration(10 * time.Second) - o.RequestTimeout = config.Duration(1 * time.Second) - o.SecurityPolicy = "None" - o.SecurityMode = "None" - o.codes = []ua.StatusCode{ua.StatusOK} - o.Log = testutil.Logger{} for _, tags := range testopctags { - o.RootNodes = append(o.RootNodes, MapOPCTag(tags)) - } - err = o.Init() - if err != nil { - t.Errorf("Initialize Error: %s", err) - } - err = Connect(&o) - if err != nil { - t.Fatalf("Connect Error: %s", err) + readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) } - for i, v := range o.nodeData { - if v.Value != nil { - types := reflect.TypeOf(v.Value) - value := reflect.ValueOf(v.Value) - compare := fmt.Sprintf("%v", value.Interface()) - if compare != testopctags[i].Want { - t.Errorf("Tag %s: Values %v for type %s does not match record", o.nodes[i].tag.FieldName, value.Interface(), types) - } - } else if testopctags[i].Want != nil { - t.Errorf("Tag: %s has value: %v", o.nodes[i].tag.FieldName, v.Value) - } - } + client, err := readConfig.CreateReadClient(testutil.Logger{}) + require.NoError(t, err) - // test unregistered reads workaround - o.Workarounds.UseUnregisteredReads = true + err = client.Init() + require.NoError(t, err, "Initialization") + err = client.Connect() + require.NoError(t, err, "Connect") - for i := range o.nodeData { - o.nodeData[i] = OPCData{} - } - - err = Connect(&o) - if err != nil { - t.Fatalf("Connect Error: %s", err) - } - - for i, v := range o.nodeData { - if v.Value != nil { - types := reflect.TypeOf(v.Value) - value := reflect.ValueOf(v.Value) - compare := fmt.Sprintf("%v", value.Interface()) - if compare != testopctags[i].Want { - t.Errorf("Tag %s: Values %v for type %s does not match record", o.nodes[i].tag.FieldName, value.Interface(), types) - } - } else if testopctags[i].Want != nil { - t.Errorf("Tag: %s has value: %v", o.nodes[i].tag.FieldName, v.Value) - } + for i, v := range client.LastReceivedData { + require.Equal(t, testopctags[i].Want, v.Value) } } -func MapOPCTag(tags OPCTags) (out NodeSettings) { - out.FieldName = tags.Name - out.Namespace = tags.Namespace - out.IdentifierType = tags.IdentifierType - out.Identifier = tags.Identifier - return out -} - -func TestConfig(t *testing.T) { +func TestReadClientConfig(t *testing.T) { toml := ` [[inputs.opcua]] name = "localhost" @@ -185,25 +173,49 @@ private_key = "/etc/telegraf/key.pem" auth_method = "Anonymous" username = "" password = "" -nodes = [ - {name="name", namespace="1", identifier_type="s", identifier="one", tags=[["tag0", "val0"]]}, - {name="name2", namespace="2", identifier_type="s", identifier="two", tags=[["tag0", "val0"], ["tag00", "val00"]]}, -] + +[[inputs.opcua.nodes]] + name = "name" + namespace = "1" + identifier_type = "s" + identifier="one" + tags=[["tag0", "val0"]] + +[[inputs.opcua.nodes]] + name="name2" + namespace="2" + identifier_type="s" + identifier="two" + tags=[["tag0", "val0"], ["tag00", "val00"]] + default_tags = {tag6 = "val6"} + [[inputs.opcua.group]] name = "foo" namespace = "3" identifier_type = "i" tags = [["tag1", "val1"], ["tag2", "val2"]] nodes = [{name="name3", identifier="3000", tags=[["tag3", "val3"]]}] + [[inputs.opcua.group]] name = "bar" namespace = "0" identifier_type = "i" tags = [["tag1", "val1"], ["tag2", "val2"]] -nodes = [{name="name4", identifier="4000", tags=[["tag1", "override"]]}] +[[inputs.opcua.group.nodes]] + name = "name4" + identifier = "4000" + tags=[["tag4", "val4"]] + default_tags = { tag1 = "override" } + +[[inputs.opcua.group.nodes]] + name = "name5" + identifier = "4001" [inputs.opcua.workarounds] additional_valid_status_codes = ["0xC0"] + +[inputs.opcua.request_workarounds] +use_unregistered_reads = true ` c := config.NewConfig() @@ -215,206 +227,70 @@ additional_valid_status_codes = ["0xC0"] o, ok := c.Inputs[0].Input.(*OpcUA) require.True(t, ok) - require.Len(t, o.RootNodes, 2) - require.Equal(t, o.RootNodes[0].FieldName, "name") - require.Equal(t, o.RootNodes[1].FieldName, "name2") - - require.Len(t, o.Groups, 2) - require.Equal(t, o.Groups[0].MetricName, "foo") - require.Len(t, o.Groups[0].Nodes, 1) - require.Equal(t, o.Groups[0].Nodes[0].Identifier, "3000") - - require.NoError(t, o.InitNodes()) - require.Len(t, o.nodes, 4) - require.Len(t, o.nodes[0].metricTags, 1) - require.Len(t, o.nodes[1].metricTags, 2) - require.Len(t, o.nodes[2].metricTags, 3) - require.Len(t, o.nodes[3].metricTags, 2) - - require.Len(t, o.Workarounds.AdditionalValidStatusCodes, 1) - require.Equal(t, o.Workarounds.AdditionalValidStatusCodes[0], "0xC0") -} - -func TestConfigWithMismatchedTypes(t *testing.T) { - toml := ` -[[inputs.opcua]] -name = "localhost" -endpoint = "opc.tcp://localhost:4840" -connect_timeout = "10s" -request_timeout = "5s" -security_policy = "auto" -security_mode = "auto" -certificate = "/etc/telegraf/cert.pem" -private_key = "/etc/telegraf/key.pem" -auth_method = "Anonymous" -username = "" -password = "" -nodes = [ - {name="name", namespace="1", identifier_type="s", identifier="one"}, - {name="name2", namespace="2", identifier_type="i", identifier="two"}, -] -` - - c := config.NewConfig() - err := c.LoadConfigData([]byte(toml)) + require.Equal(t, "localhost", o.ReadClientConfig.MetricName) + require.Equal(t, "opc.tcp://localhost:4840", o.ReadClientConfig.Endpoint) + require.Equal(t, config.Duration(10*time.Second), o.ReadClientConfig.ConnectTimeout) + require.Equal(t, config.Duration(5*time.Second), o.ReadClientConfig.RequestTimeout) + require.Equal(t, "auto", o.ReadClientConfig.SecurityPolicy) + require.Equal(t, "auto", o.ReadClientConfig.SecurityMode) + require.Equal(t, "/etc/telegraf/cert.pem", o.ReadClientConfig.Certificate) + require.Equal(t, "/etc/telegraf/key.pem", o.ReadClientConfig.PrivateKey) + require.Equal(t, "Anonymous", o.ReadClientConfig.AuthMethod) + require.Equal(t, "", o.ReadClientConfig.Username) + require.Equal(t, "", o.ReadClientConfig.Password) + require.Equal(t, []input.NodeSettings{ + { + FieldName: "name", + Namespace: "1", + IdentifierType: "s", + Identifier: "one", + TagsSlice: [][]string{{"tag0", "val0"}}, + }, + { + FieldName: "name2", + Namespace: "2", + IdentifierType: "s", + Identifier: "two", + TagsSlice: [][]string{{"tag0", "val0"}, {"tag00", "val00"}}, + DefaultTags: map[string]string{"tag6": "val6"}, + }, + }, o.ReadClientConfig.RootNodes) + require.Equal(t, []input.NodeGroupSettings{ + { + MetricName: "foo", + Namespace: "3", + IdentifierType: "i", + TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}}, + Nodes: []input.NodeSettings{{ + FieldName: "name3", + Identifier: "3000", + TagsSlice: [][]string{{"tag3", "val3"}}, + }}, + }, + { + MetricName: "bar", + Namespace: "0", + IdentifierType: "i", + TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}}, + Nodes: []input.NodeSettings{{ + FieldName: "name4", + Identifier: "4000", + TagsSlice: [][]string{{"tag4", "val4"}}, + DefaultTags: map[string]string{"tag1": "override"}, + }, { + FieldName: "name5", + Identifier: "4001", + }}, + }, + }, o.ReadClientConfig.Groups) + require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.ReadClientConfig.Workarounds) + require.Equal(t, ReadClientWorkarounds{UseUnregisteredReads: true}, o.ReadClientConfig.ReadClientWorkarounds) + err = o.Init() require.NoError(t, err) - - require.Len(t, c.Inputs, 1) - - o, ok := c.Inputs[0].Input.(*OpcUA) - require.True(t, ok) - - require.Len(t, o.RootNodes, 2) - require.Equal(t, o.RootNodes[0].FieldName, "name") - require.Equal(t, o.RootNodes[1].FieldName, "name2") - - require.Error(t, o.InitNodes()) -} - -func TestTagsSliceToMap(t *testing.T) { - m, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"baz", "bat"}}) - require.NoError(t, err) - require.Len(t, m, 2) - require.Equal(t, m["foo"], "bar") - require.Equal(t, m["baz"], "bat") -} - -func TestTagsSliceToMap_twoStrings(t *testing.T) { - var err error - _, err = tagsSliceToMap([][]string{{"foo", "bar", "baz"}}) - require.Error(t, err) - _, err = tagsSliceToMap([][]string{{"foo"}}) - require.Error(t, err) -} - -func TestTagsSliceToMap_dupeKey(t *testing.T) { - _, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"foo", "bat"}}) - require.Error(t, err) -} - -func TestTagsSliceToMap_empty(t *testing.T) { - _, err := tagsSliceToMap([][]string{{"foo", ""}}) - require.Equal(t, fmt.Errorf("tag 1 has empty value"), err) - _, err = tagsSliceToMap([][]string{{"", "bar"}}) - require.Equal(t, fmt.Errorf("tag 1 has empty name"), err) -} - -func TestValidateOPCTags(t *testing.T) { - tests := []struct { - name string - nodes []Node - err error - }{ - { - "same", - []Node{ - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "v1", "t2": "v2"}, - }, - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "v1", "t2": "v2"}, - }, - }, - fmt.Errorf("name 'fn' is duplicated (metric name 'mn', tags 't1=v1, t2=v2')"), - }, - { - "different metric tag names", - []Node{ - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "", "t2": ""}, - }, - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "", "t3": ""}, - }, - }, - nil, - }, - { - "different metric tag values", - []Node{ - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "foo", "t2": ""}, - }, - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "bar", "t2": ""}, - }, - }, - nil, - }, - { - "different metric names", - []Node{ - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "", "t2": ""}, - }, - { - metricName: "mn2", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "", "t2": ""}, - }, - }, - nil, - }, - { - "different field names", - []Node{ - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "", "t2": ""}, - }, - { - metricName: "mn", - tag: NodeSettings{FieldName: "fn2", IdentifierType: "s"}, - metricTags: map[string]string{"t1": "", "t2": ""}, - }, - }, - nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - o := OpcUA{ - nodes: tt.nodes, - Log: testutil.Logger{}, - } - require.Equal(t, tt.err, o.validateOPCTags()) - }) - } -} - -func TestSetupWorkarounds(t *testing.T) { - var o OpcUA - o.codes = []ua.StatusCode{ua.StatusOK} - - o.Workarounds.AdditionalValidStatusCodes = []string{"0xC0", "0x00AA0000"} - - err := o.setupWorkarounds() - require.NoError(t, err) - - require.Len(t, o.codes, 3) - require.Equal(t, o.codes[0], ua.StatusCode(0)) - require.Equal(t, o.codes[1], ua.StatusCode(192)) - require.Equal(t, o.codes[2], ua.StatusCode(11141120)) -} - -func TestCheckStatusCode(t *testing.T) { - var o OpcUA - o.codes = []ua.StatusCode{ua.StatusCode(0), ua.StatusCode(192), ua.StatusCode(11141120)} - require.Equal(t, o.checkStatusCode(ua.StatusCode(192)), true) + require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes") + require.EqualValues(t, o.client.NodeMetricMapping[0].MetricTags, map[string]string{"tag0": "val0"}) + require.EqualValues(t, o.client.NodeMetricMapping[1].MetricTags, map[string]string{"tag6": "val6"}) + require.EqualValues(t, o.client.NodeMetricMapping[2].MetricTags, map[string]string{"tag1": "val1", "tag2": "val2", "tag3": "val3"}) + require.EqualValues(t, o.client.NodeMetricMapping[3].MetricTags, map[string]string{"tag1": "override", "tag2": "val2"}) + require.EqualValues(t, o.client.NodeMetricMapping[4].MetricTags, map[string]string{"tag1": "val1", "tag2": "val2"}) } diff --git a/plugins/inputs/opcua/read_client.go b/plugins/inputs/opcua/read_client.go new file mode 100644 index 000000000..820f0c538 --- /dev/null +++ b/plugins/inputs/opcua/read_client.go @@ -0,0 +1,150 @@ +package opcua + +import ( + "context" + "fmt" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/common/opcua" + "github.com/influxdata/telegraf/plugins/common/opcua/input" + "github.com/influxdata/telegraf/selfstat" +) + +type ReadClientWorkarounds struct { + UseUnregisteredReads bool `toml:"use_unregistered_reads"` +} + +type ReadClientConfig struct { + ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"` + input.InputClientConfig +} + +// ReadClient Requests the current values from the required nodes when gather is called. +type ReadClient struct { + *input.OpcUAInputClient + + ReadSuccess selfstat.Stat + ReadError selfstat.Stat + Workarounds ReadClientWorkarounds + + // internal values + req *ua.ReadRequest +} + +func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) { + inputClient, err := rc.InputClientConfig.CreateInputClient(log) + if err != nil { + return nil, err + } + + tags := map[string]string{ + "endpoint": inputClient.Config.OpcUAClientConfig.Endpoint, + } + + return &ReadClient{ + OpcUAInputClient: inputClient, + ReadSuccess: selfstat.Register("opcua", "read_success", tags), + ReadError: selfstat.Register("opcua", "read_error", tags), + Workarounds: rc.ReadClientWorkarounds, + }, nil +} + +func (o *ReadClient) Connect() error { + err := o.OpcUAClient.Connect() + if err != nil { + return err + } + + readValueIds := make([]*ua.ReadValueID, len(o.NodeIDs)) + if o.Workarounds.UseUnregisteredReads { + for i, nid := range o.NodeIDs { + readValueIds[i] = &ua.ReadValueID{NodeID: nid} + } + } else { + regResp, err := o.Client.RegisterNodes(&ua.RegisterNodesRequest{ + NodesToRegister: o.NodeIDs, + }) + if err != nil { + return fmt.Errorf("registerNodes failed: %v", err) + } + + for i, v := range regResp.RegisteredNodeIDs { + readValueIds[i] = &ua.ReadValueID{NodeID: v} + } + } + + o.req = &ua.ReadRequest{ + MaxAge: 2000, + TimestampsToReturn: ua.TimestampsToReturnBoth, + NodesToRead: readValueIds, + } + + err = o.read() + if err != nil { + return fmt.Errorf("get Data Failed: %v", err) + } + + return nil +} + +func (o *ReadClient) ensureConnected() error { + if o.State == opcua.Disconnected { + err := o.Connect() + if err != nil { + return err + } + } + + return nil +} + +func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) { + err := o.ensureConnected() + if err != nil { + return nil, err + } + + err = o.read() + if err != nil && o.State == opcua.Connected { + // We do not return the disconnect error, as this would mask the + // original problem, but we do log it + disconnectErr := o.Disconnect(context.Background()) + if disconnectErr != nil { + o.Log.Debug("Error while disconnecting: ", disconnectErr) + } + + return nil, err + } + + metrics := make([]telegraf.Metric, 0, len(o.NodeMetricMapping)) + // Parse the resulting data into metrics + for i := range o.NodeIDs { + if !o.StatusCodeOK(o.LastReceivedData[i].Quality) { + continue + } + + metrics = append(metrics, o.MetricForNode(i)) + } + + return metrics, nil +} + +func (o *ReadClient) read() error { + resp, err := o.Client.Read(o.req) + if err != nil { + o.ReadError.Incr(1) + return fmt.Errorf("RegisterNodes Read failed: %v", err) + } + o.ReadSuccess.Incr(1) + for i, d := range resp.Results { + o.UpdateNodeValue(i, d) + } + return nil +} + +// StartStreamValues does nothing for the read client, as it has to actively fetch values. The channel is closed immediately. +func (o *ReadClient) StartStreamValues(_ context.Context) (<-chan telegraf.Metric, error) { + c := make(chan telegraf.Metric) + defer close(c) + return c, nil +} diff --git a/plugins/inputs/opcua/sample.conf b/plugins/inputs/opcua/sample.conf index 6ff01f025..3822bb66d 100644 --- a/plugins/inputs/opcua/sample.conf +++ b/plugins/inputs/opcua/sample.conf @@ -9,7 +9,7 @@ ## Maximum time allowed to establish a connect to the endpoint. # connect_timeout = "10s" # - ## Maximum time allowed for a request over the estabilished connection. + ## Maximum time allowed for a request over the established connection. # request_timeout = "5s" # ## Security policy, one of "None", "Basic128Rsa15", "Basic256", @@ -48,18 +48,38 @@ ## namespace - OPC UA namespace of the node (integer value 0 thru 3) ## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque) ## identifier - OPC UA ID (tag as shown in opcua browser) - ## tags - extra tags to be added to the output metric (optional) - ## Example: - ## {name="ProductUri", namespace="0", identifier_type="i", identifier="2262", tags=[["tag1","value1"],["tag2","value2"]]} + ## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags + ## default_tags - extra tags to be added to the output metric (optional) + ## + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="", namespace="", identifier_type="", identifier=""}, - # {name="", namespace="", identifier_type="", identifier=""}, - #] + # {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]}, + # {name="", namespace="", identifier_type="", identifier=""}, + # ] + # + ## Bracketed notation + # [[inputs.opcua.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "value1", tag2 = "value2" } + # + # [[inputs.opcua.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" # ## Node Group - ## Sets defaults for OPC UA namespace and ID type so they aren't required in - ## every node. A group can also have a metric name that overrides the main - ## plugin metric name. + ## Sets defaults so they aren't required in every node. + ## Default values can be set for: + ## * Metric name + ## * OPC UA namespace + ## * Identifier + ## * Default tags ## ## Multiple node groups are allowed #[[inputs.opcua.group]] @@ -75,16 +95,39 @@ ## namespace, this is used. # identifier_type = # + ## Default tags that are applied to every node in this group. Can be + ## overwritten in a node by setting a different value for the tag name. + ## example: default_tags = { tag1 = "value1" } + # default_tags = {} + # ## Node ID Configuration. Array of nodes with the same settings as above. + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) # nodes = [ - # {name="", namespace="", identifier_type="", identifier=""}, - # {name="", namespace="", identifier_type="", identifier=""}, + # {name="node1", namespace="", identifier_type="", identifier=""}, + # {name="node2", namespace="", identifier_type="", identifier=""}, #] + # + ## Bracketed notation + # [[inputs.opcua.group.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "override1", tag2 = "value2" } + # + # [[inputs.opcua.group.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" ## Enable workarounds required by some devices to work correctly # [inputs.opcua.workarounds] ## Set additional valid status codes, StatusOK (0x0) is always considered valid # additional_valid_status_codes = ["0xC0"] + # [inputs.opcua.request_workarounds] ## Use unregistered reads instead of registered reads # use_unregistered_reads = false diff --git a/plugins/inputs/opcua_listener/README.md b/plugins/inputs/opcua_listener/README.md new file mode 100644 index 000000000..5b275d5e1 --- /dev/null +++ b/plugins/inputs/opcua_listener/README.md @@ -0,0 +1,241 @@ +# OPC UA Client Listener Input Plugin + +The `opcua_listener` plugin subscribes to data from OPC UA Server devices. + +Telegraf minimum version: Telegraf 1.25 +Plugin minimum tested version: 1.25 + +## Configuration + +```toml @sample.conf +# Retrieve data from OPCUA devices +[[inputs.opcua_listener]] + ## Metric name + # name = "opcua_listener" + # + ## OPC UA Endpoint URL + # endpoint = "opc.tcp://localhost:4840" + # + ## Maximum time allowed to establish a connect to the endpoint. + # connect_timeout = "10s" + # + ## Maximum time allowed for a request over the established connection. + # request_timeout = "5s" + # + ## The interval at which the server should at least update its monitored items + # subscription_interval = "100ms" + # + ## Security policy, one of "None", "Basic128Rsa15", "Basic256", + ## "Basic256Sha256", or "auto" + # security_policy = "auto" + # + ## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto" + # security_mode = "auto" + # + ## Path to cert.pem. Required when security mode or policy isn't "None". + ## If cert path is not supplied, self-signed cert and key will be generated. + # certificate = "/etc/telegraf/cert.pem" + # + ## Path to private key.pem. Required when security mode or policy isn't "None". + ## If key path is not supplied, self-signed cert and key will be generated. + # private_key = "/etc/telegraf/key.pem" + # + ## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To + ## authenticate using a specific ID, select 'Certificate' or 'UserName' + # auth_method = "Anonymous" + # + ## Username. Required for auth_method = "UserName" + # username = "" + # + ## Password. Required for auth_method = "UserName" + # password = "" + # + ## Option to select the metric timestamp to use. Valid options are: + ## "gather" -- uses the time of receiving the data in telegraf + ## "server" -- uses the timestamp provided by the server + ## "source" -- uses the timestamp provided by the source + # timestamp = "gather" + # + ## Node ID configuration + ## name - field name to use in the output + ## namespace - OPC UA namespace of the node (integer value 0 thru 3) + ## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque) + ## identifier - OPC UA ID (tag as shown in opcua browser) + ## default_tags - extra tags to be added to the output metric (optional) + ## + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) + # nodes = [ + # {name="", namespace="", identifier_type="", identifier=""}, + # {name="", namespace="", identifier_type="", identifier=""}, + # ] + # + ## Bracketed notation + # [[inputs.opcua_listener.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "value1", tag2 = "value2" } + # + # [[inputs.opcua_listener.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" + # + ## Node Group + ## Sets defaults so they aren't required in every node. + ## Default values can be set for: + ## * Metric name + ## * OPC UA namespace + ## * Identifier + ## * Default tags + ## + ## Multiple node groups are allowed + #[[inputs.opcua_listener.group]] + ## Group Metric name. Overrides the top level name. If unset, the + ## top level name is used. + # name = + # + ## Group default namespace. If a node in the group doesn't set its + ## namespace, this is used. + # namespace = + # + ## Group default identifier type. If a node in the group doesn't set its + ## namespace, this is used. + # identifier_type = + # + ## Default tags that are applied to every node in this group. Can be + ## overwritten in a node by setting a different value for the tag name. + ## example: default_tags = { tag1 = "value1" } + # default_tags = {} + # + ## Node ID Configuration. Array of nodes with the same settings as above. + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) + # nodes = [ + # {name="node1", namespace="", identifier_type="", identifier=""}, + # {name="node2", namespace="", identifier_type="", identifier=""}, + #] + # + ## Bracketed notation + # [[inputs.opcua_listener.group.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "override1", tag2 = "value2" } + # + # [[inputs.opcua_listener.group.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" + + ## Enable workarounds required by some devices to work correctly + # [inputs.opcua_listener.workarounds] + ## Set additional valid status codes, StatusOK (0x0) is always considered valid + # additional_valid_status_codes = ["0xC0"] + + # [inputs.opcua_listener.request_workarounds] + ## Use unregistered reads instead of registered reads + # use_unregistered_reads = false +``` + +## Node Configuration + +An OPC UA node ID may resemble: "ns=3;s=Temperature". In this example: + +- ns=3 is indicating the `namespace` is 3 +- s=Temperature is indicting that the `identifier_type` is a string and `identifier` value is 'Temperature' +- This example temperature node has a value of 79.0 +To gather data from this node enter the following line into the 'nodes' property above: + +```text +{field_name="temp", namespace="3", identifier_type="s", identifier="Temperature"}, +``` + +This node configuration produces a metric like this: + +```text +opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000 +``` + +## Group Configuration + +Groups can set default values for the namespace, identifier type, and +tags settings. The default values apply to all the nodes in the +group. If a default is set, a node may omit the setting altogether. +This simplifies node configuration, especially when many nodes share +the same namespace or identifier type. + +The output metric will include tags set in the group and the node. If +a tag with the same name is set in both places, the tag value from the +node is used. + +This example group configuration has three groups with two nodes each: + +```toml + # Group 1 + [[inputs.opcua_listener.group]] + name = "group1_metric_name" + namespace = "3" + identifier_type = "i" + default_tags = { group1_tag = "val1" } + [[inputs.opcua.group.nodes]] + name = "name" + identifier = "1001" + default_tags = { node1_tag = "val2" } + [[inputs.opcua.group.nodes]] + name = "name" + identifier = "1002" + default_tags = {node1_tag = "val3"} + + # Group 2 + [[inputs.opcua_listener.group]] + name = "group2_metric_name" + namespace = "3" + identifier_type = "i" + default_tags = { group2_tag = "val3" } + [[inputs.opcua.group.nodes]] + name = "saw" + identifier = "1003" + default_tags = { node2_tag = "val4" } + [[inputs.opcua.group.nodes]] + name = "sin" + identifier = "1004" + + # Group 3 + [[inputs.opcua_listener.group]] + name = "group3_metric_name" + namespace = "3" + identifier_type = "i" + default_tags = { group3_tag = "val5" } + nodes = [ + {name="name", identifier="1001"}, + {name="name", identifier="1002"}, + ] +``` + +## Connection Service + +This plugin subscribes to the specified nodes to receive data from +the OPC server. The updates are received at most as fast as the +`subscription_interval`. + +## Metrics + +The metrics collected by this input plugin will depend on the +configured `nodes` and `group`. + +## Example Output + +```text +group1_metric_name,group1_tag=val1,id=ns\=3;i\=1001,node1_tag=val2 name=0,Quality="OK (0x0)" 1606893246000000000 +group1_metric_name,group1_tag=val1,id=ns\=3;i\=1002,node1_tag=val3 name=-1.389117,Quality="OK (0x0)" 1606893246000000000 +group2_metric_name,group2_tag=val3,id=ns\=3;i\=1003,node2_tag=val4 Quality="OK (0x0)",saw=-1.6 1606893246000000000 +group2_metric_name,group2_tag=val3,id=ns\=3;i\=1004 sin=1.902113,Quality="OK (0x0)" 1606893246000000000 +``` diff --git a/plugins/inputs/opcua_listener/opcua_listener.go b/plugins/inputs/opcua_listener/opcua_listener.go new file mode 100644 index 000000000..8e266c440 --- /dev/null +++ b/plugins/inputs/opcua_listener/opcua_listener.go @@ -0,0 +1,91 @@ +package opcua_listener + +import ( + "context" + _ "embed" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/opcua" + "github.com/influxdata/telegraf/plugins/common/opcua/input" + "github.com/influxdata/telegraf/plugins/inputs" + "time" +) + +type OpcUaListener struct { + SubscribeClientConfig + client *SubscribeClient + Log telegraf.Logger `toml:"-"` +} + +//go:embed sample.conf +var sampleConfig string + +func (*OpcUaListener) SampleConfig() string { + return sampleConfig +} + +func (o *OpcUaListener) Init() (err error) { + o.client, err = o.SubscribeClientConfig.CreateSubscribeClient(o.Log) + return err +} + +func (o *OpcUaListener) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (o *OpcUaListener) Start(acc telegraf.Accumulator) error { + ctx := context.Background() + ch, err := o.client.StartStreamValues(ctx) + if err != nil { + return err + } + + go func() { + for { + m, ok := <-ch + if !ok { + o.Log.Debug("Metric collection stopped due to closed channel") + return + } + acc.AddMetric(m) + } + }() + + return nil +} + +func (o *OpcUaListener) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + select { + case <-o.client.Stop(ctx): + o.Log.Infof("Unsubscribed OPC UA successfully") + case <-ctx.Done(): // Timeout context + o.Log.Warn("Timeout while stopping OPC UA subscription") + } + cancel() +} + +// Add this plugin to telegraf +func init() { + inputs.Add("opcua_listener", func() telegraf.Input { + return &OpcUaListener{ + SubscribeClientConfig: SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: "opc.tcp://localhost:4840", + SecurityPolicy: "auto", + SecurityMode: "auto", + Certificate: "/etc/telegraf/cert.pem", + PrivateKey: "/etc/telegraf/key.pem", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(5 * time.Second), + RequestTimeout: config.Duration(10 * time.Second), + }, + MetricName: "opcua", + Timestamp: input.TimestampSourceTelegraf, + }, + SubscriptionInterval: config.Duration(100 * time.Millisecond), + }, + } + }) +} diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go new file mode 100644 index 000000000..50f7bdcb3 --- /dev/null +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -0,0 +1,240 @@ +package opcua_listener + +import ( + "context" + "fmt" + "github.com/docker/go-connections/nat" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/opcua" + "github.com/influxdata/telegraf/plugins/common/opcua/input" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go/wait" + "testing" + "time" +) + +const servicePort = "4840" + +type OPCTags struct { + Name string + Namespace string + IdentifierType string + Identifier string + Want interface{} +} + +func MapOPCTag(tags OPCTags) (out input.NodeSettings) { + out.FieldName = tags.Name + out.Namespace = tags.Namespace + out.IdentifierType = tags.IdentifierType + out.Identifier = tags.Identifier + return out +} + +func TestSubscribeClientIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := testutil.Container{ + Image: "open62541/open62541", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), + } + err := container.Start() + require.NoError(t, err, "failed to start container") + defer func() { + require.NoError(t, container.Terminate(), "terminating container failed") + }() + + var testopctags = []OPCTags{ + {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, + {"ProductUri", "0", "i", "2262", "http://open62541.org"}, + {"ManufacturerName", "0", "i", "2263", "open62541"}, + {"badnode", "1", "i", "1337", nil}, + {"goodnode", "1", "s", "the.answer", int32(42)}, + } + var tagsRemaining = make([]string, 0, len(testopctags)) + for i, tag := range testopctags { + if tag.Want != nil { + tagsRemaining = append(tagsRemaining, testopctags[i].Name) + } + } + + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + for _, tags := range testopctags { + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags)) + } + o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.NoError(t, err) + + err = o.Init() + require.NoError(t, err, "Initialization") + err = o.Connect() + require.NoError(t, err, "Connect") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + res, err := o.StartStreamValues(ctx) + require.NoError(t, err) + + for { + select { + case m := <-res: + for fieldName, fieldValue := range m.Fields() { + for _, tag := range testopctags { + if fieldName != tag.Name { + continue + } + + if tag.Want == nil { + t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue) + return + } + + require.Equal(t, tag.Want, fieldValue) + + newRemaining := make([]string, 0, len(tagsRemaining)) + for _, remainingTag := range tagsRemaining { + if fieldName != remainingTag { + newRemaining = append(newRemaining, remainingTag) + break + } + } + + if len(newRemaining) <= 0 { + return + } + + tagsRemaining = newRemaining + } + } + + case <-ctx.Done(): + msg := "" + for _, tag := range tagsRemaining { + msg += tag + ", " + } + + t.Errorf("Tags %s are remaining without a received value", msg) + return + } + } +} + +func TestSubscribeClientConfig(t *testing.T) { + toml := ` +[[inputs.opcua_listener]] +name = "localhost" +endpoint = "opc.tcp://localhost:4840" +connect_timeout = "10s" +request_timeout = "5s" +subscription_interval = "200ms" +security_policy = "auto" +security_mode = "auto" +certificate = "/etc/telegraf/cert.pem" +private_key = "/etc/telegraf/key.pem" +auth_method = "Anonymous" +username = "" +password = "" +nodes = [ + {name="name", namespace="1", identifier_type="s", identifier="one"}, + {name="name2", namespace="2", identifier_type="s", identifier="two"}, +] + +[[inputs.opcua_listener.group]] +name = "foo" +namespace = "3" +identifier_type = "i" +tags = [["tag1", "val1"], ["tag2", "val2"]] +nodes = [{name="name3", identifier="3000", tags=[["tag3", "val3"]]}] + +[[inputs.opcua_listener.group]] +name = "bar" +namespace = "0" +identifier_type = "i" +tags = [["tag1", "val1"], ["tag2", "val2"]] +nodes = [{name="name4", identifier="4000", tags=[["tag1", "override"]]}] + +[inputs.opcua_listener.workarounds] +additional_valid_status_codes = ["0xC0"] +` + + c := config.NewConfig() + err := c.LoadConfigData([]byte(toml)) + require.NoError(t, err) + + require.Len(t, c.Inputs, 1) + + o, ok := c.Inputs[0].Input.(*OpcUaListener) + require.True(t, ok) + + require.Equal(t, "localhost", o.SubscribeClientConfig.MetricName) + require.Equal(t, "opc.tcp://localhost:4840", o.SubscribeClientConfig.Endpoint) + require.Equal(t, config.Duration(10*time.Second), o.SubscribeClientConfig.ConnectTimeout) + require.Equal(t, config.Duration(5*time.Second), o.SubscribeClientConfig.RequestTimeout) + require.Equal(t, config.Duration(200*time.Millisecond), o.SubscribeClientConfig.SubscriptionInterval) + require.Equal(t, "auto", o.SubscribeClientConfig.SecurityPolicy) + require.Equal(t, "auto", o.SubscribeClientConfig.SecurityMode) + require.Equal(t, "/etc/telegraf/cert.pem", o.SubscribeClientConfig.Certificate) + require.Equal(t, "/etc/telegraf/key.pem", o.SubscribeClientConfig.PrivateKey) + require.Equal(t, "Anonymous", o.SubscribeClientConfig.AuthMethod) + require.Equal(t, "", o.SubscribeClientConfig.Username) + require.Equal(t, "", o.SubscribeClientConfig.Password) + require.Equal(t, []input.NodeSettings{ + { + FieldName: "name", + Namespace: "1", + IdentifierType: "s", + Identifier: "one", + }, + { + FieldName: "name2", + Namespace: "2", + IdentifierType: "s", + Identifier: "two", + }, + }, o.SubscribeClientConfig.RootNodes) + require.Equal(t, []input.NodeGroupSettings{ + { + MetricName: "foo", + Namespace: "3", + IdentifierType: "i", + TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}}, + Nodes: []input.NodeSettings{{ + FieldName: "name3", + Identifier: "3000", + TagsSlice: [][]string{{"tag3", "val3"}}, + }}, + }, + { + MetricName: "bar", + Namespace: "0", + IdentifierType: "i", + TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}}, + Nodes: []input.NodeSettings{{ + FieldName: "name4", + Identifier: "4000", + TagsSlice: [][]string{{"tag1", "override"}}, + }}, + }, + }, o.SubscribeClientConfig.Groups) + require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds) +} diff --git a/plugins/inputs/opcua_listener/sample.conf b/plugins/inputs/opcua_listener/sample.conf new file mode 100644 index 000000000..ed8245abd --- /dev/null +++ b/plugins/inputs/opcua_listener/sample.conf @@ -0,0 +1,135 @@ +# Retrieve data from OPCUA devices +[[inputs.opcua]] + ## Metric name + # name = "opcua" + # + ## OPC UA Endpoint URL + # endpoint = "opc.tcp://localhost:4840" + # + ## Maximum time allowed to establish a connect to the endpoint. + # connect_timeout = "10s" + # + ## Maximum time allowed for a request over the established connection. + # request_timeout = "5s" + # + ## The interval at which the server should at least update its monitored items + # subscription_interval = "100ms" + # + ## Security policy, one of "None", "Basic128Rsa15", "Basic256", + ## "Basic256Sha256", or "auto" + # security_policy = "auto" + # + ## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto" + # security_mode = "auto" + # + ## Path to cert.pem. Required when security mode or policy isn't "None". + ## If cert path is not supplied, self-signed cert and key will be generated. + # certificate = "/etc/telegraf/cert.pem" + # + ## Path to private key.pem. Required when security mode or policy isn't "None". + ## If key path is not supplied, self-signed cert and key will be generated. + # private_key = "/etc/telegraf/key.pem" + # + ## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To + ## authenticate using a specific ID, select 'Certificate' or 'UserName' + # auth_method = "Anonymous" + # + ## Username. Required for auth_method = "UserName" + # username = "" + # + ## Password. Required for auth_method = "UserName" + # password = "" + # + ## Option to select the metric timestamp to use. Valid options are: + ## "gather" -- uses the time of receiving the data in telegraf + ## "server" -- uses the timestamp provided by the server + ## "source" -- uses the timestamp provided by the source + # timestamp = "gather" + # + ## Node ID configuration + ## name - field name to use in the output + ## namespace - OPC UA namespace of the node (integer value 0 thru 3) + ## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque) + ## identifier - OPC UA ID (tag as shown in opcua browser) + ## default_tags - extra tags to be added to the output metric (optional) + ## + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) + # nodes = [ + # {name="", namespace="", identifier_type="", identifier=""}, + # {name="", namespace="", identifier_type="", identifier=""}, + # ] + # + ## Bracketed notation + # [[inputs.opcua.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "value1", tag2 = "value2" } + # + # [[inputs.opcua.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" + # + ## Node Group + ## Sets defaults so they aren't required in every node. + ## Default values can be set for: + ## * Metric name + ## * OPC UA namespace + ## * Identifier + ## * Default tags + ## + ## Multiple node groups are allowed + #[[inputs.opcua.group]] + ## Group Metric name. Overrides the top level name. If unset, the + ## top level name is used. + # name = + # + ## Group default namespace. If a node in the group doesn't set its + ## namespace, this is used. + # namespace = + # + ## Group default identifier type. If a node in the group doesn't set its + ## namespace, this is used. + # identifier_type = + # + ## Default tags that are applied to every node in this group. Can be + ## overwritten in a node by setting a different value for the tag name. + ## example: default_tags = { tag1 = "value1" } + # default_tags = {} + # + ## Node ID Configuration. Array of nodes with the same settings as above. + ## Use either the inline notation or the bracketed notation, not both. + # + ## Inline notation (default_tags not supported yet) + # nodes = [ + # {name="node1", namespace="", identifier_type="", identifier=""}, + # {name="node2", namespace="", identifier_type="", identifier=""}, + #] + # + ## Bracketed notation + # [[inputs.opcua.group.nodes]] + # name = "node1" + # namespace = "" + # identifier_type = "" + # identifier = "" + # default_tags = { tag1 = "override1", tag2 = "value2" } + # + # [[inputs.opcua.group.nodes]] + # name = "node2" + # namespace = "" + # identifier_type = "" + # identifier = "" + + ## Enable workarounds required by some devices to work correctly + # [inputs.opcua.workarounds] + ## Set additional valid status codes, StatusOK (0x0) is always considered valid + # additional_valid_status_codes = ["0xC0"] + + # [inputs.opcua.request_workarounds] + ## Use unregistered reads instead of registered reads + # use_unregistered_reads = false diff --git a/plugins/inputs/opcua_listener/subscribe_client.go b/plugins/inputs/opcua_listener/subscribe_client.go new file mode 100644 index 000000000..00fd20ad9 --- /dev/null +++ b/plugins/inputs/opcua_listener/subscribe_client.go @@ -0,0 +1,153 @@ +package opcua_listener + +import ( + "context" + "fmt" + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/ua" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/opcua/input" + "reflect" + "time" +) + +type SubscribeClientConfig struct { + input.InputClientConfig + SubscriptionInterval config.Duration `toml:"subscription_interval"` +} + +type SubscribeClient struct { + *input.OpcUAInputClient + Config SubscribeClientConfig + + sub *opcua.Subscription + monitoredItemsReqs []*ua.MonitoredItemCreateRequest + dataNotifications chan *opcua.PublishNotificationData + metrics chan telegraf.Metric + + processingCtx context.Context + processingCancel context.CancelFunc +} + +func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*SubscribeClient, error) { + client, err := sc.InputClientConfig.CreateInputClient(log) + if err != nil { + return nil, err + } + subClient := &SubscribeClient{ + OpcUAInputClient: client, + Config: *sc, + monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)), + // 100 was chosen to make sure that the channels will not block when multiple changes come in at the same time. + // The channel size should be increased if reports come in on Telegraf blocking when many changes come in at + // the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval. + dataNotifications: make(chan *opcua.PublishNotificationData, 100), + metrics: make(chan telegraf.Metric, 100), + } + + log.Debugf("Creating monitored items") + for i, nodeID := range client.NodeIDs { + // The node id index (i) is used as the handle for the monitored item + req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i)) + subClient.monitoredItemsReqs[i] = req + } + + return subClient, nil +} + +func (o *SubscribeClient) Connect() error { + err := o.OpcUAClient.Connect() + if err != nil { + return err + } + + o.Log.Debugf("Creating OPC UA subscription") + o.sub, err = o.Client.Subscribe(&opcua.SubscriptionParameters{ + Interval: time.Duration(o.Config.SubscriptionInterval), + }, o.dataNotifications) + if err != nil { + o.Log.Error("Failed to create subscription") + return err + } + + o.Log.Debugf("Subscribed with subscription ID %d", o.sub.SubscriptionID) + return nil +} + +func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} { + o.Log.Debugf("Opc Subscribe Stopped") + err := o.sub.Cancel(ctx) + if err != nil { + o.Log.Warn("Cancelling OPC UA subscription failed with error ", err) + } + closing := o.OpcUAInputClient.Stop(ctx) + o.processingCancel() + return closing +} + +func (o *SubscribeClient) CurrentValues() ([]telegraf.Metric, error) { + return []telegraf.Metric{}, nil +} + +func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) { + err := o.Connect() + if err != nil { + return nil, err + } + + resp, err := o.sub.MonitorWithContext(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...) + if err != nil { + o.Log.Error("Failed to create monitored items ", err) + return nil, fmt.Errorf("failed to start monitoring items %s", err) + } + o.Log.Debug("Monitoring items") + + for _, res := range resp.Results { + if !o.StatusCodeOK(res.StatusCode) { + return nil, fmt.Errorf("creating monitored item failed with status code %d", res.StatusCode) + } + } + + o.processingCtx, o.processingCancel = context.WithCancel(context.Background()) + go o.processReceivedNotifications() + + return o.metrics, nil +} + +func (o *SubscribeClient) processReceivedNotifications() { + for { + select { + case <-o.processingCtx.Done(): + o.Log.Debug("Processing received notifications stopped") + return + + case res, ok := <-o.dataNotifications: + if !ok { + o.Log.Debugf("Data notification channel closed. Processing of received notifications stopped") + return + } + if res.Error != nil { + o.Log.Error(res.Error) + continue + } + + switch notif := res.Value.(type) { + case *ua.DataChangeNotification: + o.Log.Debugf("Received data change notification with %d items", len(notif.MonitoredItems)) + // It is assumed the notifications are ordered chronologically + for _, monitoredItemNotif := range notif.MonitoredItems { + i := int(monitoredItemNotif.ClientHandle) + oldValue := o.LastReceivedData[i].Value + o.UpdateNodeValue(i, monitoredItemNotif.Value) + o.Log.Debugf("Data change notification: node '%s' value changed from %f to %f", + o.NodeIDs[i].String(), oldValue, o.LastReceivedData[i].Value) + o.metrics <- o.MetricForNode(i) + } + + default: + o.Log.Warnf("Received notification has unexpected type %s", reflect.TypeOf(res.Value)) + } + } + } +}