feat(inputs.opcua_listener): OPC UA Event subscriptions (#11786)

This commit is contained in:
Lars Stegman 2022-10-25 16:06:08 +02:00 committed by GitHub
parent 7d9f09ddc8
commit 739f800b53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2883 additions and 888 deletions

View File

@ -0,0 +1,214 @@
package opcua
import (
"context"
"fmt"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"net/url"
"strconv"
"time"
)
type OpcUAWorkarounds struct {
AdditionalValidStatusCodes []string `toml:"additional_valid_status_codes"`
}
type OpcUAClientConfig struct {
Endpoint string `toml:"endpoint"`
SecurityPolicy string `toml:"security_policy"`
SecurityMode string `toml:"security_mode"`
Certificate string `toml:"certificate"`
PrivateKey string `toml:"private_key"`
Username string `toml:"username"`
Password string `toml:"password"`
AuthMethod string `toml:"auth_method"`
ConnectTimeout config.Duration `toml:"connect_timeout"`
RequestTimeout config.Duration `toml:"request_timeout"`
Workarounds OpcUAWorkarounds `toml:"workarounds"`
}
func (o *OpcUAClientConfig) Validate() error {
return o.validateEndpoint()
}
func (o *OpcUAClientConfig) validateEndpoint() error {
if o.Endpoint == "" {
return fmt.Errorf("endpoint url is empty")
}
_, err := url.Parse(o.Endpoint)
if err != nil {
return fmt.Errorf("endpoint url is invalid")
}
switch o.SecurityPolicy {
case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto":
default:
return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityPolicy, o.Endpoint)
}
switch o.SecurityMode {
case "None", "Sign", "SignAndEncrypt", "auto":
default:
return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityMode, o.Endpoint)
}
return nil
}
func (o *OpcUAClientConfig) CreateClient(log telegraf.Logger) (*OpcUAClient, error) {
err := o.Validate()
if err != nil {
return nil, err
}
c := &OpcUAClient{
Config: o,
Log: log,
}
c.Log.Debug("Initialising OpcUAClient")
c.State = Disconnected
err = c.setupWorkarounds()
return c, err
}
// ConnectionState used for constants
type ConnectionState int
const (
// Disconnected constant State 0
Disconnected ConnectionState = iota
// Connecting constant State 1
Connecting
// Connected constant State 2
Connected
)
type OpcUAClient struct {
Config *OpcUAClientConfig
Log telegraf.Logger
State ConnectionState
Client *opcua.Client
opts []opcua.Option
codes []ua.StatusCode
}
func (o *OpcUAClient) Init() error {
return o.setupOptions()
}
// / setupOptions read the endpoints from the specified server and setup all authentication
func (o *OpcUAClient) setupOptions() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel()
// Get a list of the endpoints for our target server
endpoints, err := opcua.GetEndpoints(ctx, o.Config.Endpoint)
if err != nil {
return err
}
if o.Config.Certificate == "" && o.Config.PrivateKey == "" {
if o.Config.SecurityPolicy != "None" || o.Config.SecurityMode != "None" {
o.Log.Debug("Generating self-signed certificate")
cert, privateKey, err := generateCert("urn:telegraf:gopcua:client", 2048,
o.Config.Certificate, o.Config.PrivateKey, 365*24*time.Hour)
if err != nil {
return err
}
o.Config.Certificate = cert
o.Config.PrivateKey = privateKey
}
}
o.Log.Debug("Configuring OPC UA connection options")
o.opts, err = o.generateClientOpts(endpoints)
return err
}
func (o *OpcUAClient) setupWorkarounds() error {
o.codes = []ua.StatusCode{ua.StatusOK}
for _, c := range o.Config.Workarounds.AdditionalValidStatusCodes {
val, err := strconv.ParseInt(c, 0, 32) // setting 32 bits to allow for safe conversion
if err != nil {
return err
}
o.codes = append(o.codes, ua.StatusCode(uint32(val)))
}
return nil
}
func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool {
for _, val := range o.codes {
if val == code {
return true
}
}
return false
}
// Connect to an OPC UA device
func (o *OpcUAClient) Connect() error {
o.Log.Debug("Connecting OPC UA Client to server")
u, err := url.Parse(o.Config.Endpoint)
if err != nil {
return err
}
switch u.Scheme {
case "opc.tcp":
o.State = Connecting
if o.Client != nil {
o.Log.Warnf("Closing connection due to Connect called while already instantiated", u)
if err := o.Client.Close(); err != nil {
// Only log the error but to not bail-out here as this prevents
// reconnections for multiple parties (see e.g. #9523).
o.Log.Errorf("Closing connection failed: %v", err)
}
}
o.Client = opcua.NewClient(o.Config.Endpoint, o.opts...)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel()
if err := o.Client.Connect(ctx); err != nil {
o.State = Disconnected
return fmt.Errorf("error in Client Connection: %s", err)
}
o.State = Connected
o.Log.Debug("Connected to OPC UA Server")
default:
return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme)
}
return nil
}
func (o *OpcUAClient) Disconnect(ctx context.Context) error {
o.Log.Debug("Disconnecting from OPC UA Server")
u, err := url.Parse(o.Config.Endpoint)
if err != nil {
return err
}
switch u.Scheme {
case "opc.tcp":
o.State = Disconnected
// We can't do anything about failing to close a connection
//nolint:errcheck,revive
err := o.Client.CloseWithContext(ctx)
o.Client = nil
return err
default:
return fmt.Errorf("invalid controller")
}
}

View File

@ -0,0 +1,31 @@
package opcua
import (
"github.com/gopcua/opcua/ua"
"github.com/stretchr/testify/require"
"testing"
)
func TestSetupWorkarounds(t *testing.T) {
o := OpcUAClient{
Config: &OpcUAClientConfig{
Workarounds: OpcUAWorkarounds{
AdditionalValidStatusCodes: []string{"0xC0", "0x00AA0000"},
},
},
}
err := o.setupWorkarounds()
require.NoError(t, err)
require.Len(t, o.codes, 3)
require.Equal(t, o.codes[0], ua.StatusCode(0))
require.Equal(t, o.codes[1], ua.StatusCode(192))
require.Equal(t, o.codes[2], ua.StatusCode(11141120))
}
func TestCheckStatusCode(t *testing.T) {
var o OpcUAClient
o.codes = []ua.StatusCode{ua.StatusCode(0), ua.StatusCode(192), ua.StatusCode(11141120)}
require.Equal(t, o.StatusCodeOK(ua.StatusCode(192)), true)
}

View File

@ -0,0 +1,395 @@
package input
import (
"context"
"fmt"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/opcua"
"sort"
"strconv"
"strings"
"time"
)
// NodeSettings describes how to map from a OPC UA node to a Metric
type NodeSettings struct {
FieldName string `toml:"name"`
Namespace string `toml:"namespace"`
IdentifierType string `toml:"identifier_type"`
Identifier string `toml:"identifier"`
DataType string `toml:"data_type" deprecated:"1.17.0;option is ignored"`
Description string `toml:"description" deprecated:"1.17.0;option is ignored"`
TagsSlice [][]string `toml:"tags" deprecated:"1.25.0;use 'default_tags' instead"`
DefaultTags map[string]string `toml:"default_tags"`
}
// NodeID returns the OPC UA node id
func (tag *NodeSettings) NodeID() string {
return "ns=" + tag.Namespace + ";" + tag.IdentifierType + "=" + tag.Identifier
}
// NodeGroupSettings describes a mapping of group of nodes to Metrics
type NodeGroupSettings struct {
MetricName string `toml:"name"` // Overrides plugin's setting
Namespace string `toml:"namespace"` // Can be overridden by node setting
IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting
Nodes []NodeSettings `toml:"nodes"`
TagsSlice [][]string `toml:"tags" deprecated:"1.26.0;use default_tags"`
DefaultTags map[string]string `toml:"default_tags"`
}
type TimestampSource string
const (
TimestampSourceServer TimestampSource = "server"
TimestampSourceSource TimestampSource = "source"
TimestampSourceTelegraf TimestampSource = "gather"
)
// InputClientConfig a configuration for the input client
type InputClientConfig struct {
opcua.OpcUAClientConfig
MetricName string `toml:"name"`
Timestamp TimestampSource `toml:"timestamp"`
RootNodes []NodeSettings `toml:"nodes"`
Groups []NodeGroupSettings `toml:"group"`
}
func (o *InputClientConfig) Validate() error {
if o.MetricName == "" {
return fmt.Errorf("metric name is empty")
}
err := choice.Check(string(o.Timestamp), []string{"", "gather", "server", "source"})
if err != nil {
return err
}
return nil
}
func (o *InputClientConfig) CreateInputClient(log telegraf.Logger) (*OpcUAInputClient, error) {
err := o.Validate()
if err != nil {
return nil, err
}
log.Debug("Initialising OpcUAInputClient")
opcClient, err := o.OpcUAClientConfig.CreateClient(log)
if err != nil {
return nil, err
}
c := &OpcUAInputClient{
OpcUAClient: opcClient,
Log: log,
Config: *o,
}
log.Debug("Initialising node to metric mapping")
err = c.InitNodeMetricMapping()
if err != nil {
return nil, err
}
c.initLastReceivedValues()
err = c.initNodeIDs()
return c, err
}
// NodeMetricMapping mapping from a single node to a metric
type NodeMetricMapping struct {
Tag NodeSettings
idStr string
metricName string
MetricTags map[string]string
}
// NewNodeMetricMapping builds a new NodeMetricMapping from the given argument
func NewNodeMetricMapping(metricName string, node NodeSettings, groupTags map[string]string) (*NodeMetricMapping, error) {
mergedTags := make(map[string]string)
for n, t := range groupTags {
mergedTags[n] = t
}
nodeTags := make(map[string]string)
if len(node.DefaultTags) > 0 {
nodeTags = node.DefaultTags
} else if len(node.TagsSlice) > 0 {
// fixme: once the TagsSlice has been removed (after deprecation), remove this if else logic
var err error
nodeTags, err = tagsSliceToMap(node.TagsSlice)
if err != nil {
return nil, err
}
}
for n, t := range nodeTags {
mergedTags[n] = t
}
return &NodeMetricMapping{
Tag: node,
idStr: node.NodeID(),
metricName: metricName,
MetricTags: mergedTags,
}, nil
}
// NodeValue The received value for a node
type NodeValue struct {
TagName string
Value interface{}
Quality ua.StatusCode
ServerTime time.Time
SourceTime time.Time
DataType ua.TypeID
}
// OpcUAInputClient can receive data from an OPC UA server and map it to Metrics. This type does not contain
// logic for actually retrieving data from the server, but is used by other types like ReadClient and
// OpcUAInputSubscribeClient to store data needed to convert node ids to the corresponding metrics.
type OpcUAInputClient struct {
*opcua.OpcUAClient
Config InputClientConfig
Log telegraf.Logger
NodeMetricMapping []NodeMetricMapping
NodeIDs []*ua.NodeID
LastReceivedData []NodeValue
}
// Stop the connection to the client
func (o *OpcUAInputClient) Stop(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
defer close(ch)
err := o.Disconnect(ctx)
if err != nil {
o.Log.Warn("Disconnecting from server failed with error ", err)
}
return ch
}
// metricParts is only used to ensure no duplicate metrics are created
type metricParts struct {
metricName string
fieldName string
tags string // sorted by tag name and in format tag1=value1, tag2=value2
}
func newMP(n *NodeMetricMapping) metricParts {
var keys []string
for key := range n.MetricTags {
keys = append(keys, key)
}
sort.Strings(keys)
var sb strings.Builder
for i, key := range keys {
if i != 0 {
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString(", ")
}
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString(key)
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString("=")
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString(n.MetricTags[key])
}
x := metricParts{
metricName: n.metricName,
fieldName: n.Tag.FieldName,
tags: sb.String(),
}
return x
}
// fixme: once the TagsSlice has been removed (after deprecation), remove this
// tagsSliceToMap takes an array of pairs of strings and creates a map from it
func tagsSliceToMap(tags [][]string) (map[string]string, error) {
m := make(map[string]string)
for i, tag := range tags {
if len(tag) != 2 {
return nil, fmt.Errorf("tag %d needs 2 values, has %d: %v", i+1, len(tag), tag)
}
if tag[0] == "" {
return nil, fmt.Errorf("tag %d has empty name", i+1)
}
if tag[1] == "" {
return nil, fmt.Errorf("tag %d has empty value", i+1)
}
if _, ok := m[tag[0]]; ok {
return nil, fmt.Errorf("tag %d has duplicate key: %v", i+1, tag[0])
}
m[tag[0]] = tag[1]
}
return m, nil
}
func validateNodeToAdd(existing map[metricParts]struct{}, nmm *NodeMetricMapping) error {
if nmm.Tag.FieldName == "" {
return fmt.Errorf("empty name in '%s'", nmm.Tag.FieldName)
}
if len(nmm.Tag.Namespace) == 0 {
return fmt.Errorf("empty node namespace not allowed")
}
if len(nmm.Tag.Identifier) == 0 {
return fmt.Errorf("empty node identifier not allowed")
}
mp := newMP(nmm)
if _, exists := existing[mp]; exists {
return fmt.Errorf("name '%s' is duplicated (metric name '%s', tags '%s')",
mp.fieldName, mp.metricName, mp.tags)
}
switch nmm.Tag.IdentifierType {
case "i":
if _, err := strconv.Atoi(nmm.Tag.Identifier); err != nil {
return fmt.Errorf("identifier type '%s' does not match the type of identifier '%s'", nmm.Tag.IdentifierType, nmm.Tag.Identifier)
}
case "s", "g", "b":
// Valid identifier type - do nothing.
default:
return fmt.Errorf("invalid identifier type '%s' in '%s'", nmm.Tag.IdentifierType, nmm.Tag.FieldName)
}
existing[mp] = struct{}{}
return nil
}
// InitNodeMetricMapping builds nodes from the configuration
func (o *OpcUAInputClient) InitNodeMetricMapping() error {
existing := map[metricParts]struct{}{}
for _, node := range o.Config.RootNodes {
nmm, err := NewNodeMetricMapping(o.Config.MetricName, node, make(map[string]string))
if err != nil {
return err
}
if err := validateNodeToAdd(existing, nmm); err != nil {
return err
}
o.NodeMetricMapping = append(o.NodeMetricMapping, *nmm)
}
for _, group := range o.Config.Groups {
if group.MetricName == "" {
group.MetricName = o.Config.MetricName
}
if len(group.DefaultTags) > 0 && len(group.TagsSlice) > 0 {
o.Log.Warn("Tags found in both `tags` and `default_tags`, only using tags defined in `default_tags`")
}
groupTags := make(map[string]string)
if len(group.DefaultTags) > 0 {
groupTags = group.DefaultTags
} else if len(group.TagsSlice) > 0 {
// fixme: once the TagsSlice has been removed (after deprecation), remove this if else logic
var err error
groupTags, err = tagsSliceToMap(group.TagsSlice)
if err != nil {
return err
}
}
for _, node := range group.Nodes {
if node.Namespace == "" {
node.Namespace = group.Namespace
}
if node.IdentifierType == "" {
node.IdentifierType = group.IdentifierType
}
nmm, err := NewNodeMetricMapping(group.MetricName, node, groupTags)
if err != nil {
return err
}
if err := validateNodeToAdd(existing, nmm); err != nil {
return err
}
o.NodeMetricMapping = append(o.NodeMetricMapping, *nmm)
}
}
return nil
}
func (o *OpcUAInputClient) initNodeIDs() error {
o.NodeIDs = make([]*ua.NodeID, len(o.NodeMetricMapping))
for i, node := range o.NodeMetricMapping {
nid, err := ua.ParseNodeID(node.Tag.NodeID())
if err != nil {
return err
}
o.NodeIDs[i] = nid
}
return nil
}
func (o *OpcUAInputClient) initLastReceivedValues() {
o.LastReceivedData = make([]NodeValue, len(o.NodeMetricMapping))
for nodeIdx, nmm := range o.NodeMetricMapping {
o.LastReceivedData[nodeIdx].TagName = nmm.Tag.FieldName
}
}
func (o *OpcUAInputClient) UpdateNodeValue(nodeIdx int, d *ua.DataValue) {
o.LastReceivedData[nodeIdx].Quality = d.Status
if !o.StatusCodeOK(d.Status) {
o.Log.Errorf("status not OK for node %v: %v", o.NodeMetricMapping[nodeIdx].Tag.FieldName, d.Status)
return
}
if d.Value != nil {
o.LastReceivedData[nodeIdx].Value = d.Value.Value()
o.LastReceivedData[nodeIdx].DataType = d.Value.Type()
}
o.LastReceivedData[nodeIdx].ServerTime = d.ServerTimestamp
o.LastReceivedData[nodeIdx].SourceTime = d.SourceTimestamp
}
func (o *OpcUAInputClient) MetricForNode(nodeIdx int) telegraf.Metric {
nmm := &o.NodeMetricMapping[nodeIdx]
fields := make(map[string]interface{})
tags := map[string]string{
"id": nmm.idStr,
}
for k, v := range nmm.MetricTags {
tags[k] = v
}
fields[nmm.Tag.FieldName] = o.LastReceivedData[nodeIdx].Value
fields["Quality"] = strings.TrimSpace(fmt.Sprint(o.LastReceivedData[nodeIdx].Quality))
if !o.StatusCodeOK(o.LastReceivedData[nodeIdx].Quality) {
mp := newMP(nmm)
o.Log.Debugf("status not OK for node '%s'(metric name '%s', tags '%s')",
mp.fieldName, mp.metricName, mp.tags)
}
var t time.Time
switch o.Config.Timestamp {
case TimestampSourceServer:
t = o.LastReceivedData[nodeIdx].ServerTime
case TimestampSourceSource:
t = o.LastReceivedData[nodeIdx].SourceTime
default:
t = time.Now()
}
return metric.New(nmm.metricName, tags, fields, t)
}

View File

@ -0,0 +1,850 @@
package input
import (
"fmt"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestTagsSliceToMap(t *testing.T) {
m, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"baz", "bat"}})
require.NoError(t, err)
require.Len(t, m, 2)
require.Equal(t, m["foo"], "bar")
require.Equal(t, m["baz"], "bat")
}
func TestTagsSliceToMap_twoStrings(t *testing.T) {
var err error
_, err = tagsSliceToMap([][]string{{"foo", "bar", "baz"}})
require.Error(t, err)
_, err = tagsSliceToMap([][]string{{"foo"}})
require.Error(t, err)
}
func TestTagsSliceToMap_dupeKey(t *testing.T) {
_, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"foo", "bat"}})
require.Error(t, err)
}
func TestTagsSliceToMap_empty(t *testing.T) {
_, err := tagsSliceToMap([][]string{{"foo", ""}})
require.Equal(t, fmt.Errorf("tag 1 has empty value"), err)
_, err = tagsSliceToMap([][]string{{"", "bar"}})
require.Equal(t, fmt.Errorf("tag 1 has empty name"), err)
}
func TestValidateOPCTags(t *testing.T) {
tests := []struct {
name string
config InputClientConfig
err error
}{
{
"duplicates",
InputClientConfig{
MetricName: "mn",
RootNodes: []NodeSettings{
{
FieldName: "fn",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
},
},
Groups: []NodeGroupSettings{
{
Nodes: []NodeSettings{
{
FieldName: "fn",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
},
},
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
},
},
},
fmt.Errorf("name 'fn' is duplicated (metric name 'mn', tags 't1=v1, t2=v2')"),
},
{
"empty tag value not allowed",
InputClientConfig{
MetricName: "mn",
RootNodes: []NodeSettings{
{
FieldName: "fn",
IdentifierType: "s",
TagsSlice: [][]string{{"t1", ""}},
},
},
},
fmt.Errorf("tag 1 has empty value"),
},
{
"empty tag name not allowed",
InputClientConfig{
MetricName: "mn",
RootNodes: []NodeSettings{
{
FieldName: "fn",
IdentifierType: "s",
TagsSlice: [][]string{{"", "1"}},
},
},
},
fmt.Errorf("tag 1 has empty name"),
},
{
"different metric tag names",
InputClientConfig{
MetricName: "mn",
RootNodes: []NodeSettings{
{
FieldName: "fn",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
},
{
FieldName: "fn",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "v1"}, {"t3", "v2"}},
},
},
Groups: []NodeGroupSettings{},
},
nil,
},
{
"different metric tag values",
InputClientConfig{
MetricName: "mn",
RootNodes: []NodeSettings{
{
FieldName: "fn",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "foo"}, {"t2", "v2"}},
},
{
FieldName: "fn",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "bar"}, {"t2", "v2"}},
},
},
Groups: []NodeGroupSettings{},
},
nil,
},
{
"different metric names",
InputClientConfig{
MetricName: "mn",
RootNodes: []NodeSettings{},
Groups: []NodeGroupSettings{
{
MetricName: "mn",
Namespace: "2",
Nodes: []NodeSettings{
{
FieldName: "fn",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
},
},
},
{
MetricName: "mn2",
Namespace: "2",
Nodes: []NodeSettings{
{
FieldName: "fn",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
},
},
},
},
},
nil,
},
{
"different field names",
InputClientConfig{
MetricName: "mn",
RootNodes: []NodeSettings{
{
FieldName: "fn",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
},
{
FieldName: "fn2",
Namespace: "2",
IdentifierType: "s",
Identifier: "i1",
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
},
},
Groups: []NodeGroupSettings{},
},
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
o := OpcUAInputClient{
Config: tt.config,
Log: testutil.Logger{},
}
require.Equal(t, tt.err, o.InitNodeMetricMapping())
})
}
}
func TestNewNodeMetricMappingTags(t *testing.T) {
tests := []struct {
name string
settings NodeSettings
groupTags map[string]string
expectedTags map[string]string
err error
}{
{
name: "empty tags",
settings: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "h",
TagsSlice: [][]string{},
},
groupTags: map[string]string{},
expectedTags: map[string]string{},
err: nil,
},
{
name: "node tags only",
settings: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "h",
TagsSlice: [][]string{{"t1", "v1"}},
},
groupTags: map[string]string{},
expectedTags: map[string]string{"t1": "v1"},
err: nil,
},
{
name: "group tags only",
settings: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "h",
TagsSlice: [][]string{},
},
groupTags: map[string]string{"t1": "v1"},
expectedTags: map[string]string{"t1": "v1"},
err: nil,
},
{
name: "node tag overrides group tags",
settings: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "h",
TagsSlice: [][]string{{"t1", "v2"}},
},
groupTags: map[string]string{"t1": "v1"},
expectedTags: map[string]string{"t1": "v2"},
err: nil,
},
{
name: "node tag merged with group tags",
settings: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "h",
TagsSlice: [][]string{{"t2", "v2"}},
},
groupTags: map[string]string{"t1": "v1"},
expectedTags: map[string]string{"t1": "v1", "t2": "v2"},
err: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nmm, err := NewNodeMetricMapping("testmetric", tt.settings, tt.groupTags)
require.Equal(t, tt.err, err)
require.Equal(t, tt.expectedTags, nmm.MetricTags)
})
}
}
func TestNewNodeMetricMappingIdStrInstantiated(t *testing.T) {
nmm, err := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "h",
TagsSlice: [][]string{},
}, map[string]string{})
require.NoError(t, err)
require.Equal(t, nmm.idStr, "ns=2;s=h")
}
func TestValidateNodeToAdd(t *testing.T) {
tests := []struct {
name string
existing map[metricParts]struct{}
nmm *NodeMetricMapping
err error
}{
{
name: "valid",
existing: map[metricParts]struct{}{},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "hf",
TagsSlice: [][]string{},
}, map[string]string{})
return nmm
}(),
err: nil,
},
{
name: "empty field name not allowed",
existing: map[metricParts]struct{}{},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "",
Namespace: "2",
IdentifierType: "s",
Identifier: "hf",
TagsSlice: [][]string{},
}, map[string]string{})
return nmm
}(),
err: fmt.Errorf("empty name in ''"),
},
{
name: "empty namespace not allowed",
existing: map[metricParts]struct{}{},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "",
IdentifierType: "s",
Identifier: "hf",
TagsSlice: [][]string{},
}, map[string]string{})
return nmm
}(),
err: fmt.Errorf("empty node namespace not allowed"),
},
{
name: "empty identifier type not allowed",
existing: map[metricParts]struct{}{},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "",
Identifier: "hf",
TagsSlice: [][]string{},
}, map[string]string{})
return nmm
}(),
err: fmt.Errorf("invalid identifier type '' in 'f'"),
},
{
name: "invalid identifier type not allowed",
existing: map[metricParts]struct{}{},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "j",
Identifier: "hf",
TagsSlice: [][]string{},
}, map[string]string{})
return nmm
}(),
err: fmt.Errorf("invalid identifier type 'j' in 'f'"),
},
{
name: "duplicate metric not allowed",
existing: map[metricParts]struct{}{
{metricName: "testmetric", fieldName: "f", tags: "t1=v1, t2=v2"}: {},
},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "hf",
TagsSlice: [][]string{{"t1", "v1"}, {"t2", "v2"}},
}, map[string]string{})
return nmm
}(),
err: fmt.Errorf("name 'f' is duplicated (metric name 'testmetric', tags 't1=v1, t2=v2')"),
},
{
name: "identifier type mismatch",
existing: map[metricParts]struct{}{},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "i",
Identifier: "hf",
TagsSlice: [][]string{},
}, map[string]string{})
return nmm
}(),
err: fmt.Errorf("identifier type 'i' does not match the type of identifier 'hf'"),
},
}
for idT, idV := range map[string]string{
"s": "hf",
"i": "1",
"g": "849683f0-ce92-4fa2-836f-a02cde61d75d",
"b": "aGVsbG8gSSBhbSBhIHRlc3QgaWRlbnRpZmllcg=="} {
tests = append(tests, struct {
name string
existing map[metricParts]struct{}
nmm *NodeMetricMapping
err error
}{
name: "identifier type " + idT + " allowed",
existing: map[metricParts]struct{}{},
nmm: func() *NodeMetricMapping {
nmm, _ := NewNodeMetricMapping("testmetric", NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: idT,
Identifier: idV,
TagsSlice: [][]string{},
}, map[string]string{})
return nmm
}(),
err: nil,
})
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateNodeToAdd(tt.existing, tt.nmm)
require.Equal(t, tt.err, err)
})
}
}
func TestInitNodeMetricMapping(t *testing.T) {
tests := []struct {
testname string
config InputClientConfig
expected []NodeMetricMapping
err error
}{
{
testname: "only root node",
config: InputClientConfig{
MetricName: "testmetric",
Timestamp: TimestampSourceTelegraf,
RootNodes: []NodeSettings{
{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "id1",
TagsSlice: [][]string{{"t1", "v1"}},
},
},
Groups: []NodeGroupSettings{},
},
expected: []NodeMetricMapping{
{
Tag: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "id1",
TagsSlice: [][]string{{"t1", "v1"}},
},
idStr: "ns=2;s=id1",
metricName: "testmetric",
MetricTags: map[string]string{"t1": "v1"},
},
},
err: nil,
},
{
testname: "root node and group node",
config: InputClientConfig{
MetricName: "testmetric",
Timestamp: TimestampSourceTelegraf,
RootNodes: []NodeSettings{
{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "id1",
TagsSlice: [][]string{{"t1", "v1"}},
},
},
Groups: []NodeGroupSettings{
{
MetricName: "groupmetric",
Namespace: "3",
IdentifierType: "s",
Nodes: []NodeSettings{
{
FieldName: "f",
Identifier: "id2",
TagsSlice: [][]string{{"t2", "v2"}},
},
},
TagsSlice: [][]string{},
},
},
},
expected: []NodeMetricMapping{
{
Tag: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "id1",
TagsSlice: [][]string{{"t1", "v1"}},
},
idStr: "ns=2;s=id1",
metricName: "testmetric",
MetricTags: map[string]string{"t1": "v1"},
},
{
Tag: NodeSettings{
FieldName: "f",
Namespace: "3",
IdentifierType: "s",
Identifier: "id2",
TagsSlice: [][]string{{"t2", "v2"}},
},
idStr: "ns=3;s=id2",
metricName: "groupmetric",
MetricTags: map[string]string{"t2": "v2"},
},
},
err: nil,
},
{
testname: "only group node",
config: InputClientConfig{
MetricName: "testmetric",
Timestamp: TimestampSourceTelegraf,
RootNodes: []NodeSettings{},
Groups: []NodeGroupSettings{
{
MetricName: "groupmetric",
Namespace: "3",
IdentifierType: "s",
Nodes: []NodeSettings{
{
FieldName: "f",
Identifier: "id2",
TagsSlice: [][]string{{"t2", "v2"}},
},
},
TagsSlice: [][]string{},
},
},
},
expected: []NodeMetricMapping{
{
Tag: NodeSettings{
FieldName: "f",
Namespace: "3",
IdentifierType: "s",
Identifier: "id2",
TagsSlice: [][]string{{"t2", "v2"}},
},
idStr: "ns=3;s=id2",
metricName: "groupmetric",
MetricTags: map[string]string{"t2": "v2"},
},
},
err: nil,
},
{
testname: "tags and default only default tags used",
config: InputClientConfig{
MetricName: "testmetric",
Timestamp: TimestampSourceTelegraf,
RootNodes: []NodeSettings{},
Groups: []NodeGroupSettings{
{
MetricName: "groupmetric",
Namespace: "3",
IdentifierType: "s",
Nodes: []NodeSettings{
{
FieldName: "f",
Identifier: "id2",
TagsSlice: [][]string{{"t2", "v2"}},
DefaultTags: map[string]string{"t3": "v3"},
},
},
TagsSlice: [][]string{},
},
},
},
expected: []NodeMetricMapping{
{
Tag: NodeSettings{
FieldName: "f",
Namespace: "3",
IdentifierType: "s",
Identifier: "id2",
TagsSlice: [][]string{{"t2", "v2"}},
DefaultTags: map[string]string{"t3": "v3"},
},
idStr: "ns=3;s=id2",
metricName: "groupmetric",
MetricTags: map[string]string{"t3": "v3"},
},
},
err: nil,
},
{
testname: "only root node default overrides slice",
config: InputClientConfig{
MetricName: "testmetric",
Timestamp: TimestampSourceTelegraf,
RootNodes: []NodeSettings{
{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "id1",
TagsSlice: [][]string{{"t1", "v1"}},
DefaultTags: map[string]string{"t3": "v3"},
},
},
Groups: []NodeGroupSettings{},
},
expected: []NodeMetricMapping{
{
Tag: NodeSettings{
FieldName: "f",
Namespace: "2",
IdentifierType: "s",
Identifier: "id1",
TagsSlice: [][]string{{"t1", "v1"}},
DefaultTags: map[string]string{"t3": "v3"},
},
idStr: "ns=2;s=id1",
metricName: "testmetric",
MetricTags: map[string]string{"t3": "v3"},
},
},
err: nil,
},
}
for _, tt := range tests {
t.Run(tt.testname, func(t *testing.T) {
o := OpcUAInputClient{Config: tt.config}
err := o.InitNodeMetricMapping()
require.NoError(t, err)
require.Equal(t, tt.expected, o.NodeMetricMapping)
})
}
}
func TestUpdateNodeValue(t *testing.T) {
type testStep struct {
nodeIdx int
value interface{}
status ua.StatusCode
expected interface{}
}
tests := []struct {
testname string
steps []testStep
}{
{
"value should update when code ok",
[]testStep{
{
0,
"Harmony",
ua.StatusOK,
"Harmony",
},
},
},
{
"value should not update when code bad",
[]testStep{
{
0,
"Harmony",
ua.StatusOK,
"Harmony",
},
{
0,
"Odium",
ua.StatusBad,
"Harmony",
},
{
0,
"Ati",
ua.StatusOK,
"Ati",
},
},
},
}
conf := &opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4930",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "",
ConnectTimeout: config.Duration(2 * time.Second),
RequestTimeout: config.Duration(2 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
}
c, err := conf.CreateClient(testutil.Logger{})
require.NoError(t, err)
o := OpcUAInputClient{
OpcUAClient: c,
Log: testutil.Logger{},
NodeMetricMapping: []NodeMetricMapping{
{
Tag: NodeSettings{
FieldName: "f",
},
},
{
Tag: NodeSettings{
FieldName: "f2",
},
},
},
LastReceivedData: make([]NodeValue, 2),
}
for _, tt := range tests {
t.Run(tt.testname, func(t *testing.T) {
o.LastReceivedData = make([]NodeValue, 2)
for i, step := range tt.steps {
v, _ := ua.NewVariant(step.value)
o.UpdateNodeValue(0, &ua.DataValue{
Value: v,
Status: step.status,
SourceTimestamp: time.Date(2022, 03, 17, 8, 33, 00, 00, &time.Location{}).Add(time.Duration(i) * time.Second),
SourcePicoseconds: 0,
ServerTimestamp: time.Date(2022, 03, 17, 8, 33, 00, 500, &time.Location{}).Add(time.Duration(i) * time.Second),
ServerPicoseconds: 0,
})
require.Equal(t, step.expected, o.LastReceivedData[0].Value)
}
})
}
}
func TestMetricForNode(t *testing.T) {
conf := &opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4930",
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "",
ConnectTimeout: config.Duration(2 * time.Second),
RequestTimeout: config.Duration(2 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
}
c, err := conf.CreateClient(testutil.Logger{})
require.NoError(t, err)
o := OpcUAInputClient{
Config: InputClientConfig{
Timestamp: TimestampSourceSource,
},
OpcUAClient: c,
Log: testutil.Logger{},
LastReceivedData: make([]NodeValue, 2),
}
tests := []struct {
testname string
nmm []NodeMetricMapping
v interface{}
time time.Time
status ua.StatusCode
expected telegraf.Metric
}{
{
testname: "metric build correctly",
nmm: []NodeMetricMapping{
{
Tag: NodeSettings{
FieldName: "fn",
},
idStr: "ns=3;s=hi",
metricName: "testingmetric",
MetricTags: map[string]string{"t1": "v1"},
},
},
v: 16,
time: time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{}),
status: ua.StatusOK,
expected: metric.New("testingmetric",
map[string]string{"t1": "v1", "id": "ns=3;s=hi"},
map[string]interface{}{"Quality": "OK (0x0)", "fn": 16},
time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{})),
},
}
for _, tt := range tests {
t.Run(tt.testname, func(t *testing.T) {
o.NodeMetricMapping = tt.nmm
o.LastReceivedData[0].SourceTime = tt.time
o.LastReceivedData[0].Quality = tt.status
o.LastReceivedData[0].Value = tt.v
actual := o.MetricForNode(0)
require.Equal(t, tt.expected.Tags(), actual.Tags())
require.Equal(t, tt.expected.Fields(), actual.Fields())
require.Equal(t, tt.expected.Time(), actual.Time())
})
}
}

View File

@ -62,7 +62,7 @@ func generateCert(host string, rsaBits int, certFile, keyFile string, dur time.D
template := x509.Certificate{
SerialNumber: serialNumber,
Subject: pkix.Name{
Organization: []string{"Telegraf OPC UA client"},
Organization: []string{"Telegraf OPC UA Client"},
},
NotBefore: notBefore,
NotAfter: notAfter,
@ -144,8 +144,7 @@ func pemBlockForKey(priv interface{}) (*pem.Block, error) {
}
}
//revive:disable-next-line
func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua.Option, error) {
func (o *OpcUAClient) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua.Option, error) {
opts := []opcua.Option{}
appuri := "urn:telegraf:gopcua:client"
appname := "Telegraf"
@ -153,12 +152,12 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua
// ApplicationURI is automatically read from the cert so is not required if a cert if provided
opts = append(opts, opcua.ApplicationURI(appuri))
opts = append(opts, opcua.ApplicationName(appname))
opts = append(opts, opcua.RequestTimeout(time.Duration(o.RequestTimeout)))
opts = append(opts, opcua.RequestTimeout(time.Duration(o.Config.RequestTimeout)))
certFile := o.Certificate
keyFile := o.PrivateKey
policy := o.SecurityPolicy
mode := o.SecurityMode
certFile := o.Config.Certificate
keyFile := o.Config.PrivateKey
policy := o.Config.SecurityPolicy
mode := o.Config.SecurityMode
var err error
if certFile == "" && keyFile == "" {
if policy != "None" || mode != "None" {
@ -199,8 +198,10 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua
return nil, fmt.Errorf("invalid security policy: %s", policy)
}
o.Log.Debugf("security policy from configuration %s", secPolicy)
// Select the most appropriate authentication mode from server capabilities and user input
authMode, authOption, err := o.generateAuth(o.AuthMethod, cert, o.Username, o.Password)
authMode, authOption, err := o.generateAuth(o.Config.AuthMethod, cert, o.Config.Username, o.Config.Password)
if err != nil {
return nil, err
}
@ -254,9 +255,13 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua
}
default: // User cares about both
o.Log.Debugf("User cares about both the policy (%s) and security mode (%s)", secPolicy, secMode)
o.Log.Debugf("Server has %d endpoints", len(endpoints))
for _, e := range endpoints {
o.Log.Debugf("Evaluating endpoint %s, policy %s, mode %s, level %d", e.EndpointURL, e.SecurityPolicyURI, e.SecurityMode, e.SecurityLevel)
if e.SecurityPolicyURI == secPolicy && e.SecurityMode == secMode && (serverEndpoint == nil || e.SecurityLevel >= serverEndpoint.SecurityLevel) {
serverEndpoint = e
o.Log.Debugf("Security policy and mode found. Using server endpoint %s for security. Policy %s", serverEndpoint.EndpointURL, serverEndpoint.SecurityPolicyURI)
}
}
}
@ -278,7 +283,7 @@ func (o *OpcUA) generateClientOpts(endpoints []*ua.EndpointDescription) ([]opcua
return opts, nil
}
func (o *OpcUA) generateAuth(a string, cert []byte, un, pw string) (ua.UserTokenType, opcua.Option, error) {
func (o *OpcUAClient) generateAuth(a string, cert []byte, un, pw string) (ua.UserTokenType, opcua.Option, error) {
var err error
var authMode ua.UserTokenType

View File

@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.opcua_listener
package all
import _ "github.com/influxdata/telegraf/plugins/inputs/opcua_listener" // register plugin

View File

@ -1,6 +1,6 @@
# OPC UA Client Input Plugin
# OPC UA Client Reader Input Plugin
The `opcua` plugin retrieves data from OPC UA client devices.
The `opcua` plugin retrieves data from OPC UA Server devices.
Telegraf minimum version: Telegraf 1.16
Plugin minimum tested version: 1.16
@ -19,7 +19,7 @@ Plugin minimum tested version: 1.16
## Maximum time allowed to establish a connect to the endpoint.
# connect_timeout = "10s"
#
## Maximum time allowed for a request over the estabilished connection.
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#
## Security policy, one of "None", "Basic128Rsa15", "Basic256",
@ -58,18 +58,38 @@ Plugin minimum tested version: 1.16
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque)
## identifier - OPC UA ID (tag as shown in opcua browser)
## tags - extra tags to be added to the output metric (optional)
## Example:
## {name="ProductUri", namespace="0", identifier_type="i", identifier="2262", tags=[["tag1","value1"],["tag2","value2"]]}
## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags
## default_tags - extra tags to be added to the output metric (optional)
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
#]
# {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]},
# {name="", namespace="", identifier_type="", identifier=""},
# ]
#
## Bracketed notation
# [[inputs.opcua.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "value1", tag2 = "value2" }
#
# [[inputs.opcua.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
#
## Node Group
## Sets defaults for OPC UA namespace and ID type so they aren't required in
## every node. A group can also have a metric name that overrides the main
## plugin metric name.
## Sets defaults so they aren't required in every node.
## Default values can be set for:
## * Metric name
## * OPC UA namespace
## * Identifier
## * Default tags
##
## Multiple node groups are allowed
#[[inputs.opcua.group]]
@ -85,39 +105,61 @@ Plugin minimum tested version: 1.16
## namespace, this is used.
# identifier_type =
#
## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
#]
#
## Bracketed notation
# [[inputs.opcua.group.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "override1", tag2 = "value2" }
#
# [[inputs.opcua.group.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
## Enable workarounds required by some devices to work correctly
# [inputs.opcua.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# [inputs.opcua.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false
```
## Node Configuration
An OPC UA node ID may resemble: "n=3;s=Temperature". In this example:
An OPC UA node ID may resemble: "ns=3;s=Temperature". In this example:
- n=3 is indicating the `namespace` is 3
- ns=3 is indicating the `namespace` is 3
- s=Temperature is indicting that the `identifier_type` is a string and `identifier` value is 'Temperature'
- This example temperature node has a value of 79.0
To gather data from this node enter the following line into the 'nodes' property above:
```shell
```text
{field_name="temp", namespace="3", identifier_type="s", identifier="Temperature"},
```
This node configuration produces a metric like this:
```text
opcua,id=n\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000
opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000
```
## Group Configuration
@ -132,32 +174,59 @@ The output metric will include tags set in the group and the node. If
a tag with the same name is set in both places, the tag value from the
node is used.
This example group configuration has two groups with two nodes each:
This example group configuration has three groups with two nodes each:
```toml
# Group 1
[[inputs.opcua.group]]
name="group1_metric_name"
namespace="3"
identifier_type="i"
tags=[["group1_tag", "val1"]]
nodes = [
{name="name", identifier="1001", tags=[["node1_tag", "val2"]]},
{name="name", identifier="1002", tags=[["node1_tag", "val3"]]},
]
name = "group1_metric_name"
namespace = "3"
identifier_type = "i"
default_tags = { group1_tag = "val1" }
[[inputs.opcua.group.nodes]]
name = "name"
identifier = "1001"
default_tags = { node1_tag = "val2" }
[[inputs.opcua.group.nodes]]
name = "name"
identifier = "1002"
default_tags = {node1_tag = "val3"}
# Group 2
[[inputs.opcua.group]]
name="group2_metric_name"
namespace="3"
identifier_type="i"
tags=[["group2_tag", "val3"]]
nodes = [
{name="saw", identifier="1003", tags=[["node2_tag", "val4"]]},
{name="sin", identifier="1004"},
]
name = "group2_metric_name"
namespace = "3"
identifier_type = "i"
default_tags = { group2_tag = "val3" }
[[inputs.opcua.group.nodes]]
name = "saw"
identifier = "1003"
default_tags = { node2_tag = "val4" }
[[inputs.opcua.group.nodes]]
name = "sin"
identifier = "1004"
# Group 3
[[inputs.opcua.group]]
name = "group3_metric_name"
namespace = "3"
identifier_type = "i"
default_tags = { group3_tag = "val5" }
nodes = [
{name="name", identifier="1001"},
{name="name", identifier="1002"},
]
```
## Connection Service
This plugin actively reads to retrieve data from the OPC server.
This is done every `interval`.
## Metrics
Metrics are produced according to the defined node ID and group configuration.
The metrics collected by this input plugin will depend on the
configured `nodes` and `group`.
## Example Output

View File

@ -2,556 +2,48 @@
package opcua
import (
"context"
_ "embed"
"fmt"
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf/plugins/common/opcua"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
)
//go:embed sample.conf
var sampleConfig string
type OpcuaWorkarounds struct {
AdditionalValidStatusCodes []string `toml:"additional_valid_status_codes"`
UseUnregisteredReads bool `toml:"use_unregistered_reads"`
}
// OpcUA type
type OpcUA struct {
MetricName string `toml:"name"`
Endpoint string `toml:"endpoint"`
SecurityPolicy string `toml:"security_policy"`
SecurityMode string `toml:"security_mode"`
Certificate string `toml:"certificate"`
PrivateKey string `toml:"private_key"`
Username string `toml:"username"`
Password string `toml:"password"`
Timestamp string `toml:"timestamp"`
AuthMethod string `toml:"auth_method"`
ConnectTimeout config.Duration `toml:"connect_timeout"`
RequestTimeout config.Duration `toml:"request_timeout"`
RootNodes []NodeSettings `toml:"nodes"`
Groups []GroupSettings `toml:"group"`
Workarounds OpcuaWorkarounds `toml:"workarounds"`
Log telegraf.Logger `toml:"-"`
ReadClientConfig
Log telegraf.Logger `toml:"-"`
nodes []Node
nodeData []OPCData
nodeIDs []*ua.NodeID
nodeIDerror []error
state ConnectionState
// status
ReadSuccess selfstat.Stat `toml:"-"`
ReadError selfstat.Stat `toml:"-"`
// internal values
client *opcua.Client
req *ua.ReadRequest
opts []opcua.Option
client *ReadClient
codes []ua.StatusCode
}
type NodeSettings struct {
FieldName string `toml:"name"`
Namespace string `toml:"namespace"`
IdentifierType string `toml:"identifier_type"`
Identifier string `toml:"identifier"`
DataType string `toml:"data_type"` // Kept for backward compatibility but was never used.
Description string `toml:"description"` // Kept for backward compatibility but was never used.
TagsSlice [][]string `toml:"tags"`
}
type Node struct {
tag NodeSettings
idStr string
metricName string
metricTags map[string]string
}
type GroupSettings struct {
MetricName string `toml:"name"` // Overrides plugin's setting
Namespace string `toml:"namespace"` // Can be overridden by node setting
IdentifierType string `toml:"identifier_type"` // Can be overridden by node setting
Nodes []NodeSettings `toml:"nodes"`
TagsSlice [][]string `toml:"tags"`
}
// OPCData type
type OPCData struct {
TagName string
Value interface{}
Quality ua.StatusCode
ServerTime time.Time
SourceTime time.Time
DataType ua.TypeID
}
// ConnectionState used for constants
type ConnectionState int
const (
//Disconnected constant state 0
Disconnected ConnectionState = iota
//Connecting constant state 1
Connecting
//Connected constant state 2
Connected
)
func (*OpcUA) SampleConfig() string {
return sampleConfig
}
// Init will initialize all tags
func (o *OpcUA) Init() error {
o.state = Disconnected
err := choice.Check(o.Timestamp, []string{"", "gather", "server", "source"})
if err != nil {
return err
}
err = o.validateEndpoint()
if err != nil {
return err
}
err = o.InitNodes()
if err != nil {
return err
}
err = o.setupOptions()
if err != nil {
return err
}
err = o.setupWorkarounds()
if err != nil {
return err
}
tags := map[string]string{
"endpoint": o.Endpoint,
}
o.ReadError = selfstat.Register("opcua", "read_error", tags)
o.ReadSuccess = selfstat.Register("opcua", "read_success", tags)
return nil
}
func (o *OpcUA) validateEndpoint() error {
if o.MetricName == "" {
return fmt.Errorf("device name is empty")
}
if o.Endpoint == "" {
return fmt.Errorf("endpoint url is empty")
}
_, err := url.Parse(o.Endpoint)
if err != nil {
return fmt.Errorf("endpoint url is invalid")
}
//search security policy type
switch o.SecurityPolicy {
case "None", "Basic128Rsa15", "Basic256", "Basic256Sha256", "auto":
// Valid security policy type - do nothing.
default:
return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityPolicy, o.MetricName)
}
//search security mode type
switch o.SecurityMode {
case "None", "Sign", "SignAndEncrypt", "auto":
// Valid security mode type - do nothing.
default:
return fmt.Errorf("invalid security type '%s' in '%s'", o.SecurityMode, o.MetricName)
}
return nil
}
func tagsSliceToMap(tags [][]string) (map[string]string, error) {
m := make(map[string]string)
for i, tag := range tags {
if len(tag) != 2 {
return nil, fmt.Errorf("tag %d needs 2 values, has %d: %v", i+1, len(tag), tag)
}
if tag[0] == "" {
return nil, fmt.Errorf("tag %d has empty name", i+1)
}
if tag[1] == "" {
return nil, fmt.Errorf("tag %d has empty value", i+1)
}
if _, ok := m[tag[0]]; ok {
return nil, fmt.Errorf("tag %d has duplicate key: %v", i+1, tag[0])
}
m[tag[0]] = tag[1]
}
return m, nil
}
// InitNodes Method on OpcUA
func (o *OpcUA) InitNodes() error {
for _, node := range o.RootNodes {
nodeTags, err := tagsSliceToMap(node.TagsSlice)
if err != nil {
return err
}
o.nodes = append(o.nodes, Node{
metricName: o.MetricName,
tag: node,
metricTags: nodeTags,
})
}
for _, group := range o.Groups {
if group.MetricName == "" {
group.MetricName = o.MetricName
}
groupTags, err := tagsSliceToMap(group.TagsSlice)
if err != nil {
return err
}
for _, node := range group.Nodes {
if node.Namespace == "" {
node.Namespace = group.Namespace
}
if node.IdentifierType == "" {
node.IdentifierType = group.IdentifierType
}
nodeTags, err := tagsSliceToMap(node.TagsSlice)
if err != nil {
return err
}
mergedTags := make(map[string]string)
for k, v := range groupTags {
mergedTags[k] = v
}
for k, v := range nodeTags {
mergedTags[k] = v
}
o.nodes = append(o.nodes, Node{
metricName: group.MetricName,
tag: node,
metricTags: mergedTags,
})
}
}
err := o.validateOPCTags()
if err != nil {
return err
}
return nil
}
type metricParts struct {
metricName string
fieldName string
tags string // sorted by tag name and in format tag1=value1, tag2=value2
}
func newMP(n *Node) metricParts {
var keys []string
for key := range n.metricTags {
keys = append(keys, key)
}
sort.Strings(keys)
var sb strings.Builder
for i, key := range keys {
if i != 0 {
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString(", ")
}
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString(key)
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString("=")
// Writes to a string-builder will always succeed
//nolint:errcheck,revive
sb.WriteString(n.metricTags[key])
}
x := metricParts{
metricName: n.metricName,
fieldName: n.tag.FieldName,
tags: sb.String(),
}
return x
}
func (o *OpcUA) validateOPCTags() error {
nameEncountered := map[metricParts]struct{}{}
for i, node := range o.nodes {
mp := newMP(&node)
//check empty name
if node.tag.FieldName == "" {
return fmt.Errorf("empty name in '%s'", node.tag.FieldName)
}
//search name duplicate
if _, ok := nameEncountered[mp]; ok {
return fmt.Errorf("name '%s' is duplicated (metric name '%s', tags '%s')",
mp.fieldName, mp.metricName, mp.tags)
}
//add it to the set
nameEncountered[mp] = struct{}{}
//search identifier type
switch node.tag.IdentifierType {
case "i":
if _, err := strconv.Atoi(node.tag.Identifier); err != nil {
return fmt.Errorf("identifier type '%s' does not match the type of identifier '%s'", node.tag.IdentifierType, node.tag.Identifier)
}
case "s", "g", "b":
// Valid identifier type - do nothing.
default:
return fmt.Errorf("invalid identifier type '%s' in '%s'", node.tag.IdentifierType, node.tag.FieldName)
}
o.nodes[i].idStr = BuildNodeID(node.tag)
//parse NodeIds and NodeIds errors
nid, niderr := ua.ParseNodeID(o.nodes[i].idStr)
// build NodeIds and Errors
o.nodeIDs = append(o.nodeIDs, nid)
o.nodeIDerror = append(o.nodeIDerror, niderr)
// Grow NodeData for later input
o.nodeData = append(o.nodeData, OPCData{})
}
return nil
}
// BuildNodeID build node ID from OPC tag
func BuildNodeID(tag NodeSettings) string {
return "ns=" + tag.Namespace + ";" + tag.IdentifierType + "=" + tag.Identifier
}
// Connect to a OPCUA device
func Connect(o *OpcUA) error {
u, err := url.Parse(o.Endpoint)
if err != nil {
return err
}
switch u.Scheme {
case "opc.tcp":
o.state = Connecting
if o.client != nil {
if err := o.client.Close(); err != nil {
// Only log the error but to not bail-out here as this prevents
// reconnections for multiple parties (see e.g. #9523).
o.Log.Errorf("Closing connection failed: %v", err)
}
}
o.client = opcua.NewClient(o.Endpoint, o.opts...)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.ConnectTimeout))
defer cancel()
if err := o.client.Connect(ctx); err != nil {
return fmt.Errorf("error in Client Connection: %s", err)
}
if !o.Workarounds.UseUnregisteredReads {
regResp, err := o.client.RegisterNodes(&ua.RegisterNodesRequest{
NodesToRegister: o.nodeIDs,
})
if err != nil {
return fmt.Errorf("registerNodes failed: %v", err)
}
o.req = &ua.ReadRequest{
MaxAge: 2000,
TimestampsToReturn: ua.TimestampsToReturnBoth,
NodesToRead: readvalues(regResp.RegisteredNodeIDs),
}
} else {
var nodesToRead []*ua.ReadValueID
for _, nid := range o.nodeIDs {
nodesToRead = append(nodesToRead, &ua.ReadValueID{NodeID: nid})
}
o.req = &ua.ReadRequest{
MaxAge: 2000,
TimestampsToReturn: ua.TimestampsToReturnBoth,
NodesToRead: nodesToRead,
}
}
err = o.getData()
if err != nil {
return fmt.Errorf("get Data Failed: %v", err)
}
default:
return fmt.Errorf("unsupported scheme %q in endpoint. Expected opc.tcp", u.Scheme)
}
return nil
}
func (o *OpcUA) setupOptions() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.ConnectTimeout))
defer cancel()
// Get a list of the endpoints for our target server
endpoints, err := opcua.GetEndpoints(ctx, o.Endpoint)
if err != nil {
return err
}
if o.Certificate == "" && o.PrivateKey == "" {
if o.SecurityPolicy != "None" || o.SecurityMode != "None" {
o.Certificate, o.PrivateKey, err = generateCert("urn:telegraf:gopcua:client", 2048, o.Certificate, o.PrivateKey, 365*24*time.Hour)
if err != nil {
return err
}
}
}
o.opts, err = o.generateClientOpts(endpoints)
// Init Initialise all required objects
func (o *OpcUA) Init() (err error) {
o.client, err = o.ReadClientConfig.CreateReadClient(o.Log)
return err
}
func (o *OpcUA) setupWorkarounds() error {
if len(o.Workarounds.AdditionalValidStatusCodes) != 0 {
for _, c := range o.Workarounds.AdditionalValidStatusCodes {
val, err := strconv.ParseInt(c, 0, 32) // setting 32 bits to allow for safe conversion
if err != nil {
return err
}
o.codes = append(o.codes, ua.StatusCode(uint32(val)))
}
}
return nil
}
func (o *OpcUA) checkStatusCode(code ua.StatusCode) bool {
for _, val := range o.codes {
if val == code {
return true
}
}
return false
}
func (o *OpcUA) getData() error {
resp, err := o.client.Read(o.req)
if err != nil {
o.ReadError.Incr(1)
return fmt.Errorf("Read failed: %w", err)
}
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.nodeData[i].Quality = d.Status
if !o.checkStatusCode(d.Status) {
mp := newMP(&o.nodes[i])
o.Log.Errorf("status not OK for node '%s'(metric name '%s', tags '%s')",
mp.fieldName, mp.metricName, mp.tags)
continue
}
o.nodeData[i].TagName = o.nodes[i].tag.FieldName
if d.Value != nil {
o.nodeData[i].Value = d.Value.Value()
o.nodeData[i].DataType = d.Value.Type()
}
o.nodeData[i].Quality = d.Status
o.nodeData[i].ServerTime = d.ServerTimestamp
o.nodeData[i].SourceTime = d.SourceTimestamp
}
return nil
}
func readvalues(ids []*ua.NodeID) []*ua.ReadValueID {
rvids := make([]*ua.ReadValueID, len(ids))
for i, v := range ids {
rvids[i] = &ua.ReadValueID{NodeID: v}
}
return rvids
}
func disconnect(o *OpcUA) error {
u, err := url.Parse(o.Endpoint)
if err != nil {
return err
}
switch u.Scheme {
case "opc.tcp":
o.state = Disconnected
o.client.Close()
o.client = nil
return nil
default:
return fmt.Errorf("invalid controller")
}
}
// Gather defines what data the plugin will gather.
func (o *OpcUA) Gather(acc telegraf.Accumulator) error {
if o.state == Disconnected {
o.state = Connecting
err := Connect(o)
if err != nil {
o.state = Disconnected
return err
}
}
o.state = Connected
err := o.getData()
if err != nil && o.state == Connected {
o.state = Disconnected
// Ignore returned error to not mask the original problem
//nolint:errcheck,revive
disconnect(o)
metrics, err := o.client.CurrentValues()
if err != nil {
return err
}
for i, n := range o.nodes {
if o.checkStatusCode(o.nodeData[i].Quality) {
fields := make(map[string]interface{})
tags := map[string]string{
"id": n.idStr,
}
for k, v := range n.metricTags {
tags[k] = v
}
fields[o.nodeData[i].TagName] = o.nodeData[i].Value
fields["Quality"] = strings.TrimSpace(fmt.Sprint(o.nodeData[i].Quality))
switch o.Timestamp {
case "server":
acc.AddFields(n.metricName, fields, tags, o.nodeData[i].ServerTime)
case "source":
acc.AddFields(n.metricName, fields, tags, o.nodeData[i].SourceTime)
default:
acc.AddFields(n.metricName, fields, tags)
}
}
// Parse the resulting data into metrics
for _, m := range metrics {
acc.AddMetric(m)
}
return nil
}
@ -560,17 +52,22 @@ func (o *OpcUA) Gather(acc telegraf.Accumulator) error {
func init() {
inputs.Add("opcua", func() telegraf.Input {
return &OpcUA{
MetricName: "opcua",
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "auto",
SecurityMode: "auto",
Timestamp: "gather",
RequestTimeout: config.Duration(5 * time.Second),
ConnectTimeout: config.Duration(10 * time.Second),
Certificate: "/etc/telegraf/cert.pem",
PrivateKey: "/etc/telegraf/key.pem",
AuthMethod: "Anonymous",
codes: []ua.StatusCode{ua.StatusOK},
ReadClientConfig: ReadClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "auto",
SecurityMode: "auto",
Certificate: "/etc/telegraf/cert.pem",
PrivateKey: "/etc/telegraf/key.pem",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(5 * time.Second),
RequestTimeout: config.Duration(10 * time.Second),
},
MetricName: "opcua",
Timestamp: input.TimestampSourceTelegraf,
},
},
}
})
}

View File

@ -2,19 +2,19 @@ package opcua
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"testing"
"time"
)
const servicePort = "4840"
type OPCTags struct {
Name string
Namespace string
@ -23,7 +23,13 @@ type OPCTags struct {
Want interface{}
}
const servicePort = "4840"
func MapOPCTag(tags OPCTags) (out input.NodeSettings) {
out.FieldName = tags.Name
out.Namespace = tags.Namespace
out.IdentifierType = tags.IdentifierType
out.Identifier = tags.Identifier
return out
}
func TestGetDataBadNodeContainerIntegration(t *testing.T) {
if testing.Short() {
@ -47,20 +53,28 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) {
{"ManufacturerName", "0", "i", "2263", "open62541"},
}
var o OpcUA
o.MetricName = "testing"
o.Endpoint = fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort])
fmt.Println(o.Endpoint)
o.AuthMethod = "Anonymous"
o.ConnectTimeout = config.Duration(10 * time.Second)
o.RequestTimeout = config.Duration(1 * time.Second)
o.SecurityPolicy = "None"
o.SecurityMode = "None"
o.codes = []ua.StatusCode{ua.StatusOK}
logger := &testutil.CaptureLogger{}
o.Log = logger
readConfig := ReadClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
SecurityPolicy: "None",
SecurityMode: "None",
Certificate: "",
PrivateKey: "",
Username: "",
Password: "",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
}
g := GroupSettings{
g := input.NodeGroupSettings{
MetricName: "anodic_current",
TagsSlice: [][]string{
{"pot", "2002"},
@ -70,15 +84,20 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) {
for _, tags := range testopctags {
g.Nodes = append(g.Nodes, MapOPCTag(tags))
}
o.Groups = append(o.Groups, g)
err = o.Init()
readConfig.Groups = append(readConfig.Groups, g)
logger := &testutil.CaptureLogger{}
readClient, err := readConfig.CreateReadClient(logger)
require.NoError(t, err)
err = Connect(&o)
err = readClient.Init()
require.NoError(t, err)
require.Contains(t, logger.LastError, "E! [] status not OK for node 'ProductName'(metric name 'anodic_current', tags 'pot=2002')")
err = readClient.Connect()
require.NoError(t, err)
require.Contains(t, logger.LastError, "E! [] status not OK for node ProductName")
}
func TestClient1Integration(t *testing.T) {
func TestReadClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
@ -99,79 +118,48 @@ func TestClient1Integration(t *testing.T) {
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
{"badnode", "1", "i", "1337", nil},
{"goodnode", "1", "s", "the.answer", "42"},
{"goodnode", "1", "s", "the.answer", int32(42)},
}
readConfig := ReadClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
SecurityPolicy: "None",
SecurityMode: "None",
Certificate: "",
PrivateKey: "",
Username: "",
Password: "",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
}
var o OpcUA
o.MetricName = "testing"
o.Endpoint = fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort])
o.AuthMethod = "Anonymous"
o.ConnectTimeout = config.Duration(10 * time.Second)
o.RequestTimeout = config.Duration(1 * time.Second)
o.SecurityPolicy = "None"
o.SecurityMode = "None"
o.codes = []ua.StatusCode{ua.StatusOK}
o.Log = testutil.Logger{}
for _, tags := range testopctags {
o.RootNodes = append(o.RootNodes, MapOPCTag(tags))
}
err = o.Init()
if err != nil {
t.Errorf("Initialize Error: %s", err)
}
err = Connect(&o)
if err != nil {
t.Fatalf("Connect Error: %s", err)
readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags))
}
for i, v := range o.nodeData {
if v.Value != nil {
types := reflect.TypeOf(v.Value)
value := reflect.ValueOf(v.Value)
compare := fmt.Sprintf("%v", value.Interface())
if compare != testopctags[i].Want {
t.Errorf("Tag %s: Values %v for type %s does not match record", o.nodes[i].tag.FieldName, value.Interface(), types)
}
} else if testopctags[i].Want != nil {
t.Errorf("Tag: %s has value: %v", o.nodes[i].tag.FieldName, v.Value)
}
}
client, err := readConfig.CreateReadClient(testutil.Logger{})
require.NoError(t, err)
// test unregistered reads workaround
o.Workarounds.UseUnregisteredReads = true
err = client.Init()
require.NoError(t, err, "Initialization")
err = client.Connect()
require.NoError(t, err, "Connect")
for i := range o.nodeData {
o.nodeData[i] = OPCData{}
}
err = Connect(&o)
if err != nil {
t.Fatalf("Connect Error: %s", err)
}
for i, v := range o.nodeData {
if v.Value != nil {
types := reflect.TypeOf(v.Value)
value := reflect.ValueOf(v.Value)
compare := fmt.Sprintf("%v", value.Interface())
if compare != testopctags[i].Want {
t.Errorf("Tag %s: Values %v for type %s does not match record", o.nodes[i].tag.FieldName, value.Interface(), types)
}
} else if testopctags[i].Want != nil {
t.Errorf("Tag: %s has value: %v", o.nodes[i].tag.FieldName, v.Value)
}
for i, v := range client.LastReceivedData {
require.Equal(t, testopctags[i].Want, v.Value)
}
}
func MapOPCTag(tags OPCTags) (out NodeSettings) {
out.FieldName = tags.Name
out.Namespace = tags.Namespace
out.IdentifierType = tags.IdentifierType
out.Identifier = tags.Identifier
return out
}
func TestConfig(t *testing.T) {
func TestReadClientConfig(t *testing.T) {
toml := `
[[inputs.opcua]]
name = "localhost"
@ -185,25 +173,49 @@ private_key = "/etc/telegraf/key.pem"
auth_method = "Anonymous"
username = ""
password = ""
nodes = [
{name="name", namespace="1", identifier_type="s", identifier="one", tags=[["tag0", "val0"]]},
{name="name2", namespace="2", identifier_type="s", identifier="two", tags=[["tag0", "val0"], ["tag00", "val00"]]},
]
[[inputs.opcua.nodes]]
name = "name"
namespace = "1"
identifier_type = "s"
identifier="one"
tags=[["tag0", "val0"]]
[[inputs.opcua.nodes]]
name="name2"
namespace="2"
identifier_type="s"
identifier="two"
tags=[["tag0", "val0"], ["tag00", "val00"]]
default_tags = {tag6 = "val6"}
[[inputs.opcua.group]]
name = "foo"
namespace = "3"
identifier_type = "i"
tags = [["tag1", "val1"], ["tag2", "val2"]]
nodes = [{name="name3", identifier="3000", tags=[["tag3", "val3"]]}]
[[inputs.opcua.group]]
name = "bar"
namespace = "0"
identifier_type = "i"
tags = [["tag1", "val1"], ["tag2", "val2"]]
nodes = [{name="name4", identifier="4000", tags=[["tag1", "override"]]}]
[[inputs.opcua.group.nodes]]
name = "name4"
identifier = "4000"
tags=[["tag4", "val4"]]
default_tags = { tag1 = "override" }
[[inputs.opcua.group.nodes]]
name = "name5"
identifier = "4001"
[inputs.opcua.workarounds]
additional_valid_status_codes = ["0xC0"]
[inputs.opcua.request_workarounds]
use_unregistered_reads = true
`
c := config.NewConfig()
@ -215,206 +227,70 @@ additional_valid_status_codes = ["0xC0"]
o, ok := c.Inputs[0].Input.(*OpcUA)
require.True(t, ok)
require.Len(t, o.RootNodes, 2)
require.Equal(t, o.RootNodes[0].FieldName, "name")
require.Equal(t, o.RootNodes[1].FieldName, "name2")
require.Len(t, o.Groups, 2)
require.Equal(t, o.Groups[0].MetricName, "foo")
require.Len(t, o.Groups[0].Nodes, 1)
require.Equal(t, o.Groups[0].Nodes[0].Identifier, "3000")
require.NoError(t, o.InitNodes())
require.Len(t, o.nodes, 4)
require.Len(t, o.nodes[0].metricTags, 1)
require.Len(t, o.nodes[1].metricTags, 2)
require.Len(t, o.nodes[2].metricTags, 3)
require.Len(t, o.nodes[3].metricTags, 2)
require.Len(t, o.Workarounds.AdditionalValidStatusCodes, 1)
require.Equal(t, o.Workarounds.AdditionalValidStatusCodes[0], "0xC0")
}
func TestConfigWithMismatchedTypes(t *testing.T) {
toml := `
[[inputs.opcua]]
name = "localhost"
endpoint = "opc.tcp://localhost:4840"
connect_timeout = "10s"
request_timeout = "5s"
security_policy = "auto"
security_mode = "auto"
certificate = "/etc/telegraf/cert.pem"
private_key = "/etc/telegraf/key.pem"
auth_method = "Anonymous"
username = ""
password = ""
nodes = [
{name="name", namespace="1", identifier_type="s", identifier="one"},
{name="name2", namespace="2", identifier_type="i", identifier="two"},
]
`
c := config.NewConfig()
err := c.LoadConfigData([]byte(toml))
require.Equal(t, "localhost", o.ReadClientConfig.MetricName)
require.Equal(t, "opc.tcp://localhost:4840", o.ReadClientConfig.Endpoint)
require.Equal(t, config.Duration(10*time.Second), o.ReadClientConfig.ConnectTimeout)
require.Equal(t, config.Duration(5*time.Second), o.ReadClientConfig.RequestTimeout)
require.Equal(t, "auto", o.ReadClientConfig.SecurityPolicy)
require.Equal(t, "auto", o.ReadClientConfig.SecurityMode)
require.Equal(t, "/etc/telegraf/cert.pem", o.ReadClientConfig.Certificate)
require.Equal(t, "/etc/telegraf/key.pem", o.ReadClientConfig.PrivateKey)
require.Equal(t, "Anonymous", o.ReadClientConfig.AuthMethod)
require.Equal(t, "", o.ReadClientConfig.Username)
require.Equal(t, "", o.ReadClientConfig.Password)
require.Equal(t, []input.NodeSettings{
{
FieldName: "name",
Namespace: "1",
IdentifierType: "s",
Identifier: "one",
TagsSlice: [][]string{{"tag0", "val0"}},
},
{
FieldName: "name2",
Namespace: "2",
IdentifierType: "s",
Identifier: "two",
TagsSlice: [][]string{{"tag0", "val0"}, {"tag00", "val00"}},
DefaultTags: map[string]string{"tag6": "val6"},
},
}, o.ReadClientConfig.RootNodes)
require.Equal(t, []input.NodeGroupSettings{
{
MetricName: "foo",
Namespace: "3",
IdentifierType: "i",
TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}},
Nodes: []input.NodeSettings{{
FieldName: "name3",
Identifier: "3000",
TagsSlice: [][]string{{"tag3", "val3"}},
}},
},
{
MetricName: "bar",
Namespace: "0",
IdentifierType: "i",
TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}},
Nodes: []input.NodeSettings{{
FieldName: "name4",
Identifier: "4000",
TagsSlice: [][]string{{"tag4", "val4"}},
DefaultTags: map[string]string{"tag1": "override"},
}, {
FieldName: "name5",
Identifier: "4001",
}},
},
}, o.ReadClientConfig.Groups)
require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.ReadClientConfig.Workarounds)
require.Equal(t, ReadClientWorkarounds{UseUnregisteredReads: true}, o.ReadClientConfig.ReadClientWorkarounds)
err = o.Init()
require.NoError(t, err)
require.Len(t, c.Inputs, 1)
o, ok := c.Inputs[0].Input.(*OpcUA)
require.True(t, ok)
require.Len(t, o.RootNodes, 2)
require.Equal(t, o.RootNodes[0].FieldName, "name")
require.Equal(t, o.RootNodes[1].FieldName, "name2")
require.Error(t, o.InitNodes())
}
func TestTagsSliceToMap(t *testing.T) {
m, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"baz", "bat"}})
require.NoError(t, err)
require.Len(t, m, 2)
require.Equal(t, m["foo"], "bar")
require.Equal(t, m["baz"], "bat")
}
func TestTagsSliceToMap_twoStrings(t *testing.T) {
var err error
_, err = tagsSliceToMap([][]string{{"foo", "bar", "baz"}})
require.Error(t, err)
_, err = tagsSliceToMap([][]string{{"foo"}})
require.Error(t, err)
}
func TestTagsSliceToMap_dupeKey(t *testing.T) {
_, err := tagsSliceToMap([][]string{{"foo", "bar"}, {"foo", "bat"}})
require.Error(t, err)
}
func TestTagsSliceToMap_empty(t *testing.T) {
_, err := tagsSliceToMap([][]string{{"foo", ""}})
require.Equal(t, fmt.Errorf("tag 1 has empty value"), err)
_, err = tagsSliceToMap([][]string{{"", "bar"}})
require.Equal(t, fmt.Errorf("tag 1 has empty name"), err)
}
func TestValidateOPCTags(t *testing.T) {
tests := []struct {
name string
nodes []Node
err error
}{
{
"same",
[]Node{
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "v1", "t2": "v2"},
},
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "v1", "t2": "v2"},
},
},
fmt.Errorf("name 'fn' is duplicated (metric name 'mn', tags 't1=v1, t2=v2')"),
},
{
"different metric tag names",
[]Node{
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "", "t2": ""},
},
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "", "t3": ""},
},
},
nil,
},
{
"different metric tag values",
[]Node{
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "foo", "t2": ""},
},
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "bar", "t2": ""},
},
},
nil,
},
{
"different metric names",
[]Node{
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "", "t2": ""},
},
{
metricName: "mn2",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "", "t2": ""},
},
},
nil,
},
{
"different field names",
[]Node{
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn", IdentifierType: "s"},
metricTags: map[string]string{"t1": "", "t2": ""},
},
{
metricName: "mn",
tag: NodeSettings{FieldName: "fn2", IdentifierType: "s"},
metricTags: map[string]string{"t1": "", "t2": ""},
},
},
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
o := OpcUA{
nodes: tt.nodes,
Log: testutil.Logger{},
}
require.Equal(t, tt.err, o.validateOPCTags())
})
}
}
func TestSetupWorkarounds(t *testing.T) {
var o OpcUA
o.codes = []ua.StatusCode{ua.StatusOK}
o.Workarounds.AdditionalValidStatusCodes = []string{"0xC0", "0x00AA0000"}
err := o.setupWorkarounds()
require.NoError(t, err)
require.Len(t, o.codes, 3)
require.Equal(t, o.codes[0], ua.StatusCode(0))
require.Equal(t, o.codes[1], ua.StatusCode(192))
require.Equal(t, o.codes[2], ua.StatusCode(11141120))
}
func TestCheckStatusCode(t *testing.T) {
var o OpcUA
o.codes = []ua.StatusCode{ua.StatusCode(0), ua.StatusCode(192), ua.StatusCode(11141120)}
require.Equal(t, o.checkStatusCode(ua.StatusCode(192)), true)
require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes")
require.EqualValues(t, o.client.NodeMetricMapping[0].MetricTags, map[string]string{"tag0": "val0"})
require.EqualValues(t, o.client.NodeMetricMapping[1].MetricTags, map[string]string{"tag6": "val6"})
require.EqualValues(t, o.client.NodeMetricMapping[2].MetricTags, map[string]string{"tag1": "val1", "tag2": "val2", "tag3": "val3"})
require.EqualValues(t, o.client.NodeMetricMapping[3].MetricTags, map[string]string{"tag1": "override", "tag2": "val2"})
require.EqualValues(t, o.client.NodeMetricMapping[4].MetricTags, map[string]string{"tag1": "val1", "tag2": "val2"})
}

View File

@ -0,0 +1,150 @@
package opcua
import (
"context"
"fmt"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/selfstat"
)
type ReadClientWorkarounds struct {
UseUnregisteredReads bool `toml:"use_unregistered_reads"`
}
type ReadClientConfig struct {
ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"`
input.InputClientConfig
}
// ReadClient Requests the current values from the required nodes when gather is called.
type ReadClient struct {
*input.OpcUAInputClient
ReadSuccess selfstat.Stat
ReadError selfstat.Stat
Workarounds ReadClientWorkarounds
// internal values
req *ua.ReadRequest
}
func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) {
inputClient, err := rc.InputClientConfig.CreateInputClient(log)
if err != nil {
return nil, err
}
tags := map[string]string{
"endpoint": inputClient.Config.OpcUAClientConfig.Endpoint,
}
return &ReadClient{
OpcUAInputClient: inputClient,
ReadSuccess: selfstat.Register("opcua", "read_success", tags),
ReadError: selfstat.Register("opcua", "read_error", tags),
Workarounds: rc.ReadClientWorkarounds,
}, nil
}
func (o *ReadClient) Connect() error {
err := o.OpcUAClient.Connect()
if err != nil {
return err
}
readValueIds := make([]*ua.ReadValueID, len(o.NodeIDs))
if o.Workarounds.UseUnregisteredReads {
for i, nid := range o.NodeIDs {
readValueIds[i] = &ua.ReadValueID{NodeID: nid}
}
} else {
regResp, err := o.Client.RegisterNodes(&ua.RegisterNodesRequest{
NodesToRegister: o.NodeIDs,
})
if err != nil {
return fmt.Errorf("registerNodes failed: %v", err)
}
for i, v := range regResp.RegisteredNodeIDs {
readValueIds[i] = &ua.ReadValueID{NodeID: v}
}
}
o.req = &ua.ReadRequest{
MaxAge: 2000,
TimestampsToReturn: ua.TimestampsToReturnBoth,
NodesToRead: readValueIds,
}
err = o.read()
if err != nil {
return fmt.Errorf("get Data Failed: %v", err)
}
return nil
}
func (o *ReadClient) ensureConnected() error {
if o.State == opcua.Disconnected {
err := o.Connect()
if err != nil {
return err
}
}
return nil
}
func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) {
err := o.ensureConnected()
if err != nil {
return nil, err
}
err = o.read()
if err != nil && o.State == opcua.Connected {
// We do not return the disconnect error, as this would mask the
// original problem, but we do log it
disconnectErr := o.Disconnect(context.Background())
if disconnectErr != nil {
o.Log.Debug("Error while disconnecting: ", disconnectErr)
}
return nil, err
}
metrics := make([]telegraf.Metric, 0, len(o.NodeMetricMapping))
// Parse the resulting data into metrics
for i := range o.NodeIDs {
if !o.StatusCodeOK(o.LastReceivedData[i].Quality) {
continue
}
metrics = append(metrics, o.MetricForNode(i))
}
return metrics, nil
}
func (o *ReadClient) read() error {
resp, err := o.Client.Read(o.req)
if err != nil {
o.ReadError.Incr(1)
return fmt.Errorf("RegisterNodes Read failed: %v", err)
}
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.UpdateNodeValue(i, d)
}
return nil
}
// StartStreamValues does nothing for the read client, as it has to actively fetch values. The channel is closed immediately.
func (o *ReadClient) StartStreamValues(_ context.Context) (<-chan telegraf.Metric, error) {
c := make(chan telegraf.Metric)
defer close(c)
return c, nil
}

View File

@ -9,7 +9,7 @@
## Maximum time allowed to establish a connect to the endpoint.
# connect_timeout = "10s"
#
## Maximum time allowed for a request over the estabilished connection.
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#
## Security policy, one of "None", "Basic128Rsa15", "Basic256",
@ -48,18 +48,38 @@
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque)
## identifier - OPC UA ID (tag as shown in opcua browser)
## tags - extra tags to be added to the output metric (optional)
## Example:
## {name="ProductUri", namespace="0", identifier_type="i", identifier="2262", tags=[["tag1","value1"],["tag2","value2"]]}
## tags - extra tags to be added to the output metric (optional); deprecated in 1.25.0; use default_tags
## default_tags - extra tags to be added to the output metric (optional)
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
#]
# {name="", namespace="", identifier_type="", identifier="", tags=[["tag1", "value1"], ["tag2", "value2"]},
# {name="", namespace="", identifier_type="", identifier=""},
# ]
#
## Bracketed notation
# [[inputs.opcua.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "value1", tag2 = "value2" }
#
# [[inputs.opcua.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
#
## Node Group
## Sets defaults for OPC UA namespace and ID type so they aren't required in
## every node. A group can also have a metric name that overrides the main
## plugin metric name.
## Sets defaults so they aren't required in every node.
## Default values can be set for:
## * Metric name
## * OPC UA namespace
## * Identifier
## * Default tags
##
## Multiple node groups are allowed
#[[inputs.opcua.group]]
@ -75,16 +95,39 @@
## namespace, this is used.
# identifier_type =
#
## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
#]
#
## Bracketed notation
# [[inputs.opcua.group.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "override1", tag2 = "value2" }
#
# [[inputs.opcua.group.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
## Enable workarounds required by some devices to work correctly
# [inputs.opcua.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# [inputs.opcua.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false

View File

@ -0,0 +1,241 @@
# OPC UA Client Listener Input Plugin
The `opcua_listener` plugin subscribes to data from OPC UA Server devices.
Telegraf minimum version: Telegraf 1.25
Plugin minimum tested version: 1.25
## Configuration
```toml @sample.conf
# Retrieve data from OPCUA devices
[[inputs.opcua_listener]]
## Metric name
# name = "opcua_listener"
#
## OPC UA Endpoint URL
# endpoint = "opc.tcp://localhost:4840"
#
## Maximum time allowed to establish a connect to the endpoint.
# connect_timeout = "10s"
#
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#
## The interval at which the server should at least update its monitored items
# subscription_interval = "100ms"
#
## Security policy, one of "None", "Basic128Rsa15", "Basic256",
## "Basic256Sha256", or "auto"
# security_policy = "auto"
#
## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto"
# security_mode = "auto"
#
## Path to cert.pem. Required when security mode or policy isn't "None".
## If cert path is not supplied, self-signed cert and key will be generated.
# certificate = "/etc/telegraf/cert.pem"
#
## Path to private key.pem. Required when security mode or policy isn't "None".
## If key path is not supplied, self-signed cert and key will be generated.
# private_key = "/etc/telegraf/key.pem"
#
## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To
## authenticate using a specific ID, select 'Certificate' or 'UserName'
# auth_method = "Anonymous"
#
## Username. Required for auth_method = "UserName"
# username = ""
#
## Password. Required for auth_method = "UserName"
# password = ""
#
## Option to select the metric timestamp to use. Valid options are:
## "gather" -- uses the time of receiving the data in telegraf
## "server" -- uses the timestamp provided by the server
## "source" -- uses the timestamp provided by the source
# timestamp = "gather"
#
## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque)
## identifier - OPC UA ID (tag as shown in opcua browser)
## default_tags - extra tags to be added to the output metric (optional)
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
# ]
#
## Bracketed notation
# [[inputs.opcua_listener.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "value1", tag2 = "value2" }
#
# [[inputs.opcua_listener.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
#
## Node Group
## Sets defaults so they aren't required in every node.
## Default values can be set for:
## * Metric name
## * OPC UA namespace
## * Identifier
## * Default tags
##
## Multiple node groups are allowed
#[[inputs.opcua_listener.group]]
## Group Metric name. Overrides the top level name. If unset, the
## top level name is used.
# name =
#
## Group default namespace. If a node in the group doesn't set its
## namespace, this is used.
# namespace =
#
## Group default identifier type. If a node in the group doesn't set its
## namespace, this is used.
# identifier_type =
#
## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
#]
#
## Bracketed notation
# [[inputs.opcua_listener.group.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "override1", tag2 = "value2" }
#
# [[inputs.opcua_listener.group.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
## Enable workarounds required by some devices to work correctly
# [inputs.opcua_listener.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# [inputs.opcua_listener.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false
```
## Node Configuration
An OPC UA node ID may resemble: "ns=3;s=Temperature". In this example:
- ns=3 is indicating the `namespace` is 3
- s=Temperature is indicting that the `identifier_type` is a string and `identifier` value is 'Temperature'
- This example temperature node has a value of 79.0
To gather data from this node enter the following line into the 'nodes' property above:
```text
{field_name="temp", namespace="3", identifier_type="s", identifier="Temperature"},
```
This node configuration produces a metric like this:
```text
opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000
```
## Group Configuration
Groups can set default values for the namespace, identifier type, and
tags settings. The default values apply to all the nodes in the
group. If a default is set, a node may omit the setting altogether.
This simplifies node configuration, especially when many nodes share
the same namespace or identifier type.
The output metric will include tags set in the group and the node. If
a tag with the same name is set in both places, the tag value from the
node is used.
This example group configuration has three groups with two nodes each:
```toml
# Group 1
[[inputs.opcua_listener.group]]
name = "group1_metric_name"
namespace = "3"
identifier_type = "i"
default_tags = { group1_tag = "val1" }
[[inputs.opcua.group.nodes]]
name = "name"
identifier = "1001"
default_tags = { node1_tag = "val2" }
[[inputs.opcua.group.nodes]]
name = "name"
identifier = "1002"
default_tags = {node1_tag = "val3"}
# Group 2
[[inputs.opcua_listener.group]]
name = "group2_metric_name"
namespace = "3"
identifier_type = "i"
default_tags = { group2_tag = "val3" }
[[inputs.opcua.group.nodes]]
name = "saw"
identifier = "1003"
default_tags = { node2_tag = "val4" }
[[inputs.opcua.group.nodes]]
name = "sin"
identifier = "1004"
# Group 3
[[inputs.opcua_listener.group]]
name = "group3_metric_name"
namespace = "3"
identifier_type = "i"
default_tags = { group3_tag = "val5" }
nodes = [
{name="name", identifier="1001"},
{name="name", identifier="1002"},
]
```
## Connection Service
This plugin subscribes to the specified nodes to receive data from
the OPC server. The updates are received at most as fast as the
`subscription_interval`.
## Metrics
The metrics collected by this input plugin will depend on the
configured `nodes` and `group`.
## Example Output
```text
group1_metric_name,group1_tag=val1,id=ns\=3;i\=1001,node1_tag=val2 name=0,Quality="OK (0x0)" 1606893246000000000
group1_metric_name,group1_tag=val1,id=ns\=3;i\=1002,node1_tag=val3 name=-1.389117,Quality="OK (0x0)" 1606893246000000000
group2_metric_name,group2_tag=val3,id=ns\=3;i\=1003,node2_tag=val4 Quality="OK (0x0)",saw=-1.6 1606893246000000000
group2_metric_name,group2_tag=val3,id=ns\=3;i\=1004 sin=1.902113,Quality="OK (0x0)" 1606893246000000000
```

View File

@ -0,0 +1,91 @@
package opcua_listener
import (
"context"
_ "embed"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/plugins/inputs"
"time"
)
type OpcUaListener struct {
SubscribeClientConfig
client *SubscribeClient
Log telegraf.Logger `toml:"-"`
}
//go:embed sample.conf
var sampleConfig string
func (*OpcUaListener) SampleConfig() string {
return sampleConfig
}
func (o *OpcUaListener) Init() (err error) {
o.client, err = o.SubscribeClientConfig.CreateSubscribeClient(o.Log)
return err
}
func (o *OpcUaListener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
ctx := context.Background()
ch, err := o.client.StartStreamValues(ctx)
if err != nil {
return err
}
go func() {
for {
m, ok := <-ch
if !ok {
o.Log.Debug("Metric collection stopped due to closed channel")
return
}
acc.AddMetric(m)
}
}()
return nil
}
func (o *OpcUaListener) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
select {
case <-o.client.Stop(ctx):
o.Log.Infof("Unsubscribed OPC UA successfully")
case <-ctx.Done(): // Timeout context
o.Log.Warn("Timeout while stopping OPC UA subscription")
}
cancel()
}
// Add this plugin to telegraf
func init() {
inputs.Add("opcua_listener", func() telegraf.Input {
return &OpcUaListener{
SubscribeClientConfig: SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://localhost:4840",
SecurityPolicy: "auto",
SecurityMode: "auto",
Certificate: "/etc/telegraf/cert.pem",
PrivateKey: "/etc/telegraf/key.pem",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(5 * time.Second),
RequestTimeout: config.Duration(10 * time.Second),
},
MetricName: "opcua",
Timestamp: input.TimestampSourceTelegraf,
},
SubscriptionInterval: config.Duration(100 * time.Millisecond),
},
}
})
}

View File

@ -0,0 +1,240 @@
package opcua_listener
import (
"context"
"fmt"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"testing"
"time"
)
const servicePort = "4840"
type OPCTags struct {
Name string
Namespace string
IdentifierType string
Identifier string
Want interface{}
}
func MapOPCTag(tags OPCTags) (out input.NodeSettings) {
out.FieldName = tags.Name
out.Namespace = tags.Namespace
out.IdentifierType = tags.IdentifierType
out.Identifier = tags.Identifier
return out
}
func TestSubscribeClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "open62541/open62541",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
var testopctags = []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
{"badnode", "1", "i", "1337", nil},
{"goodnode", "1", "s", "the.answer", int32(42)},
}
var tagsRemaining = make([]string, 0, len(testopctags))
for i, tag := range testopctags {
if tag.Want != nil {
tagsRemaining = append(tagsRemaining, testopctags[i].Name)
}
}
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
for _, tags := range testopctags {
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags))
}
o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.NoError(t, err)
err = o.Init()
require.NoError(t, err, "Initialization")
err = o.Connect()
require.NoError(t, err, "Connect")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
res, err := o.StartStreamValues(ctx)
require.NoError(t, err)
for {
select {
case m := <-res:
for fieldName, fieldValue := range m.Fields() {
for _, tag := range testopctags {
if fieldName != tag.Name {
continue
}
if tag.Want == nil {
t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue)
return
}
require.Equal(t, tag.Want, fieldValue)
newRemaining := make([]string, 0, len(tagsRemaining))
for _, remainingTag := range tagsRemaining {
if fieldName != remainingTag {
newRemaining = append(newRemaining, remainingTag)
break
}
}
if len(newRemaining) <= 0 {
return
}
tagsRemaining = newRemaining
}
}
case <-ctx.Done():
msg := ""
for _, tag := range tagsRemaining {
msg += tag + ", "
}
t.Errorf("Tags %s are remaining without a received value", msg)
return
}
}
}
func TestSubscribeClientConfig(t *testing.T) {
toml := `
[[inputs.opcua_listener]]
name = "localhost"
endpoint = "opc.tcp://localhost:4840"
connect_timeout = "10s"
request_timeout = "5s"
subscription_interval = "200ms"
security_policy = "auto"
security_mode = "auto"
certificate = "/etc/telegraf/cert.pem"
private_key = "/etc/telegraf/key.pem"
auth_method = "Anonymous"
username = ""
password = ""
nodes = [
{name="name", namespace="1", identifier_type="s", identifier="one"},
{name="name2", namespace="2", identifier_type="s", identifier="two"},
]
[[inputs.opcua_listener.group]]
name = "foo"
namespace = "3"
identifier_type = "i"
tags = [["tag1", "val1"], ["tag2", "val2"]]
nodes = [{name="name3", identifier="3000", tags=[["tag3", "val3"]]}]
[[inputs.opcua_listener.group]]
name = "bar"
namespace = "0"
identifier_type = "i"
tags = [["tag1", "val1"], ["tag2", "val2"]]
nodes = [{name="name4", identifier="4000", tags=[["tag1", "override"]]}]
[inputs.opcua_listener.workarounds]
additional_valid_status_codes = ["0xC0"]
`
c := config.NewConfig()
err := c.LoadConfigData([]byte(toml))
require.NoError(t, err)
require.Len(t, c.Inputs, 1)
o, ok := c.Inputs[0].Input.(*OpcUaListener)
require.True(t, ok)
require.Equal(t, "localhost", o.SubscribeClientConfig.MetricName)
require.Equal(t, "opc.tcp://localhost:4840", o.SubscribeClientConfig.Endpoint)
require.Equal(t, config.Duration(10*time.Second), o.SubscribeClientConfig.ConnectTimeout)
require.Equal(t, config.Duration(5*time.Second), o.SubscribeClientConfig.RequestTimeout)
require.Equal(t, config.Duration(200*time.Millisecond), o.SubscribeClientConfig.SubscriptionInterval)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityPolicy)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityMode)
require.Equal(t, "/etc/telegraf/cert.pem", o.SubscribeClientConfig.Certificate)
require.Equal(t, "/etc/telegraf/key.pem", o.SubscribeClientConfig.PrivateKey)
require.Equal(t, "Anonymous", o.SubscribeClientConfig.AuthMethod)
require.Equal(t, "", o.SubscribeClientConfig.Username)
require.Equal(t, "", o.SubscribeClientConfig.Password)
require.Equal(t, []input.NodeSettings{
{
FieldName: "name",
Namespace: "1",
IdentifierType: "s",
Identifier: "one",
},
{
FieldName: "name2",
Namespace: "2",
IdentifierType: "s",
Identifier: "two",
},
}, o.SubscribeClientConfig.RootNodes)
require.Equal(t, []input.NodeGroupSettings{
{
MetricName: "foo",
Namespace: "3",
IdentifierType: "i",
TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}},
Nodes: []input.NodeSettings{{
FieldName: "name3",
Identifier: "3000",
TagsSlice: [][]string{{"tag3", "val3"}},
}},
},
{
MetricName: "bar",
Namespace: "0",
IdentifierType: "i",
TagsSlice: [][]string{{"tag1", "val1"}, {"tag2", "val2"}},
Nodes: []input.NodeSettings{{
FieldName: "name4",
Identifier: "4000",
TagsSlice: [][]string{{"tag1", "override"}},
}},
},
}, o.SubscribeClientConfig.Groups)
require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds)
}

View File

@ -0,0 +1,135 @@
# Retrieve data from OPCUA devices
[[inputs.opcua]]
## Metric name
# name = "opcua"
#
## OPC UA Endpoint URL
# endpoint = "opc.tcp://localhost:4840"
#
## Maximum time allowed to establish a connect to the endpoint.
# connect_timeout = "10s"
#
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#
## The interval at which the server should at least update its monitored items
# subscription_interval = "100ms"
#
## Security policy, one of "None", "Basic128Rsa15", "Basic256",
## "Basic256Sha256", or "auto"
# security_policy = "auto"
#
## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto"
# security_mode = "auto"
#
## Path to cert.pem. Required when security mode or policy isn't "None".
## If cert path is not supplied, self-signed cert and key will be generated.
# certificate = "/etc/telegraf/cert.pem"
#
## Path to private key.pem. Required when security mode or policy isn't "None".
## If key path is not supplied, self-signed cert and key will be generated.
# private_key = "/etc/telegraf/key.pem"
#
## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To
## authenticate using a specific ID, select 'Certificate' or 'UserName'
# auth_method = "Anonymous"
#
## Username. Required for auth_method = "UserName"
# username = ""
#
## Password. Required for auth_method = "UserName"
# password = ""
#
## Option to select the metric timestamp to use. Valid options are:
## "gather" -- uses the time of receiving the data in telegraf
## "server" -- uses the timestamp provided by the server
## "source" -- uses the timestamp provided by the source
# timestamp = "gather"
#
## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
## identifier_type - OPC UA ID type (s=string, i=numeric, g=guid, b=opaque)
## identifier - OPC UA ID (tag as shown in opcua browser)
## default_tags - extra tags to be added to the output metric (optional)
##
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="", namespace="", identifier_type="", identifier=""},
# {name="", namespace="", identifier_type="", identifier=""},
# ]
#
## Bracketed notation
# [[inputs.opcua.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "value1", tag2 = "value2" }
#
# [[inputs.opcua.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
#
## Node Group
## Sets defaults so they aren't required in every node.
## Default values can be set for:
## * Metric name
## * OPC UA namespace
## * Identifier
## * Default tags
##
## Multiple node groups are allowed
#[[inputs.opcua.group]]
## Group Metric name. Overrides the top level name. If unset, the
## top level name is used.
# name =
#
## Group default namespace. If a node in the group doesn't set its
## namespace, this is used.
# namespace =
#
## Group default identifier type. If a node in the group doesn't set its
## namespace, this is used.
# identifier_type =
#
## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" }
# default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet)
# nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""},
#]
#
## Bracketed notation
# [[inputs.opcua.group.nodes]]
# name = "node1"
# namespace = ""
# identifier_type = ""
# identifier = ""
# default_tags = { tag1 = "override1", tag2 = "value2" }
#
# [[inputs.opcua.group.nodes]]
# name = "node2"
# namespace = ""
# identifier_type = ""
# identifier = ""
## Enable workarounds required by some devices to work correctly
# [inputs.opcua.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"]
# [inputs.opcua.request_workarounds]
## Use unregistered reads instead of registered reads
# use_unregistered_reads = false

View File

@ -0,0 +1,153 @@
package opcua_listener
import (
"context"
"fmt"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"reflect"
"time"
)
type SubscribeClientConfig struct {
input.InputClientConfig
SubscriptionInterval config.Duration `toml:"subscription_interval"`
}
type SubscribeClient struct {
*input.OpcUAInputClient
Config SubscribeClientConfig
sub *opcua.Subscription
monitoredItemsReqs []*ua.MonitoredItemCreateRequest
dataNotifications chan *opcua.PublishNotificationData
metrics chan telegraf.Metric
processingCtx context.Context
processingCancel context.CancelFunc
}
func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*SubscribeClient, error) {
client, err := sc.InputClientConfig.CreateInputClient(log)
if err != nil {
return nil, err
}
subClient := &SubscribeClient{
OpcUAInputClient: client,
Config: *sc,
monitoredItemsReqs: make([]*ua.MonitoredItemCreateRequest, len(client.NodeIDs)),
// 100 was chosen to make sure that the channels will not block when multiple changes come in at the same time.
// The channel size should be increased if reports come in on Telegraf blocking when many changes come in at
// the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval.
dataNotifications: make(chan *opcua.PublishNotificationData, 100),
metrics: make(chan telegraf.Metric, 100),
}
log.Debugf("Creating monitored items")
for i, nodeID := range client.NodeIDs {
// The node id index (i) is used as the handle for the monitored item
req := opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, uint32(i))
subClient.monitoredItemsReqs[i] = req
}
return subClient, nil
}
func (o *SubscribeClient) Connect() error {
err := o.OpcUAClient.Connect()
if err != nil {
return err
}
o.Log.Debugf("Creating OPC UA subscription")
o.sub, err = o.Client.Subscribe(&opcua.SubscriptionParameters{
Interval: time.Duration(o.Config.SubscriptionInterval),
}, o.dataNotifications)
if err != nil {
o.Log.Error("Failed to create subscription")
return err
}
o.Log.Debugf("Subscribed with subscription ID %d", o.sub.SubscriptionID)
return nil
}
func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} {
o.Log.Debugf("Opc Subscribe Stopped")
err := o.sub.Cancel(ctx)
if err != nil {
o.Log.Warn("Cancelling OPC UA subscription failed with error ", err)
}
closing := o.OpcUAInputClient.Stop(ctx)
o.processingCancel()
return closing
}
func (o *SubscribeClient) CurrentValues() ([]telegraf.Metric, error) {
return []telegraf.Metric{}, nil
}
func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) {
err := o.Connect()
if err != nil {
return nil, err
}
resp, err := o.sub.MonitorWithContext(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...)
if err != nil {
o.Log.Error("Failed to create monitored items ", err)
return nil, fmt.Errorf("failed to start monitoring items %s", err)
}
o.Log.Debug("Monitoring items")
for _, res := range resp.Results {
if !o.StatusCodeOK(res.StatusCode) {
return nil, fmt.Errorf("creating monitored item failed with status code %d", res.StatusCode)
}
}
o.processingCtx, o.processingCancel = context.WithCancel(context.Background())
go o.processReceivedNotifications()
return o.metrics, nil
}
func (o *SubscribeClient) processReceivedNotifications() {
for {
select {
case <-o.processingCtx.Done():
o.Log.Debug("Processing received notifications stopped")
return
case res, ok := <-o.dataNotifications:
if !ok {
o.Log.Debugf("Data notification channel closed. Processing of received notifications stopped")
return
}
if res.Error != nil {
o.Log.Error(res.Error)
continue
}
switch notif := res.Value.(type) {
case *ua.DataChangeNotification:
o.Log.Debugf("Received data change notification with %d items", len(notif.MonitoredItems))
// It is assumed the notifications are ordered chronologically
for _, monitoredItemNotif := range notif.MonitoredItems {
i := int(monitoredItemNotif.ClientHandle)
oldValue := o.LastReceivedData[i].Value
o.UpdateNodeValue(i, monitoredItemNotif.Value)
o.Log.Debugf("Data change notification: node '%s' value changed from %f to %f",
o.NodeIDs[i].String(), oldValue, o.LastReceivedData[i].Value)
o.metrics <- o.MetricForNode(i)
}
default:
o.Log.Warnf("Received notification has unexpected type %s", reflect.TypeOf(res.Value))
}
}
}
}