feat(common.opcua): Add option to include OPC-UA DataType as a Field (#14345)

This commit is contained in:
Christian Allinson 2023-12-07 15:02:25 -05:00 committed by GitHub
parent 2a81343ad3
commit fbd2dcc9d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 295 additions and 3 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal/choice"
)
type OpcUAWorkarounds struct {
@ -44,13 +45,23 @@ type OpcUAClientConfig struct {
ConnectTimeout config.Duration `toml:"connect_timeout"`
RequestTimeout config.Duration `toml:"request_timeout"`
Workarounds OpcUAWorkarounds `toml:"workarounds"`
OptionalFields []string `toml:"optional_fields"`
Workarounds OpcUAWorkarounds `toml:"workarounds"`
}
func (o *OpcUAClientConfig) Validate() error {
if err := o.validateOptionalFields(); err != nil {
return fmt.Errorf("invalid 'optional_fields': %w", err)
}
return o.validateEndpoint()
}
func (o *OpcUAClientConfig) validateOptionalFields() error {
validFields := []string{"DataType"}
return choice.CheckSlice(o.OptionalFields, validFields)
}
func (o *OpcUAClientConfig) validateEndpoint() error {
if o.Endpoint == "" {
return fmt.Errorf("endpoint url is empty")

View File

@ -421,6 +421,9 @@ func (o *OpcUAInputClient) MetricForNode(nodeIdx int) telegraf.Metric {
fields[nmm.Tag.FieldName] = o.LastReceivedData[nodeIdx].Value
fields["Quality"] = strings.TrimSpace(o.LastReceivedData[nodeIdx].Quality.Error())
if choice.Contains("DataType", o.Config.OptionalFields) {
fields["DataType"] = strings.Replace(o.LastReceivedData[nodeIdx].DataType.String(), "TypeID", "", 1)
}
if !o.StatusCodeOK(o.LastReceivedData[nodeIdx].Quality) {
mp := newMP(nmm)
o.Log.Debugf("status not OK for node %q(metric name %q, tags %q)",

View File

@ -71,6 +71,11 @@ to use them.
## "source" -- uses the timestamp provided by the source
# timestamp = "gather"
#
## Include additional Fields in each metric
## Available options are:
## DataType -- OPC-UA Data Type (string)
# optional_fields = []
#
## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
@ -177,7 +182,14 @@ To gather data from this node enter the following line into the 'nodes' property
This node configuration produces a metric like this:
```text
opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000
opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)" 1597820490000000000
```
With 'DataType' entered in Additional Metrics, this node configuration
produces a metric like this:
```text
opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)",DataType="Float" 1597820490000000000
```
## Group Configuration

View File

@ -9,7 +9,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/testutil"
@ -150,6 +152,95 @@ func TestReadClientIntegration(t *testing.T) {
}
}
func TestReadClientIntegrationAdditionalFields(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "open62541/open62541",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("TCP network layer listening on opc.tcp://"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
testopctags := []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
{"badnode", "1", "i", "1337", nil},
{"goodnode", "1", "s", "the.answer", int32(42)},
{"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"},
}
testopctypes := []string{
"String",
"String",
"String",
"Null",
"Int32",
"DateTime",
}
testopcquality := []string{
"OK (0x0)",
"OK (0x0)",
"OK (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)",
"OK (0x0)",
}
expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags {
now := time.Now()
tags := map[string]string{
"id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier),
}
fields := map[string]interface{}{
x.Name: x.Want,
"Quality": testopcquality[i],
"DataType": testopctypes[i],
}
expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now))
}
readConfig := ReadClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
OptionalFields: []string{"DataType"},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
}
for _, tags := range testopctags {
readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags))
}
client, err := readConfig.CreateReadClient(testutil.Logger{})
require.NoError(t, err)
require.NoError(t, client.Connect())
actualopcmetrics := []telegraf.Metric{}
for i := range client.LastReceivedData {
actualopcmetrics = append(actualopcmetrics, client.MetricForNode(i))
}
testutil.RequireMetricsEqual(t, expectedopcmetrics, actualopcmetrics, testutil.IgnoreTime())
}
func TestReadClientIntegrationWithPasswordAuth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@ -223,6 +314,8 @@ auth_method = "Anonymous"
username = ""
password = ""
optional_fields = ["DataType"]
[[inputs.opcua.nodes]]
name = "name"
namespace = "1"
@ -265,6 +358,7 @@ additional_valid_status_codes = ["0xC0"]
[inputs.opcua.request_workarounds]
use_unregistered_reads = true
`
c := config.NewConfig()
@ -334,6 +428,7 @@ use_unregistered_reads = true
}, o.ReadClientConfig.Groups)
require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.ReadClientConfig.Workarounds)
require.Equal(t, ReadClientWorkarounds{UseUnregisteredReads: true}, o.ReadClientConfig.ReadClientWorkarounds)
require.Equal(t, []string{"DataType"}, o.ReadClientConfig.OptionalFields)
err = o.Init()
require.NoError(t, err)
require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes")

View File

@ -43,6 +43,11 @@
## "source" -- uses the timestamp provided by the source
# timestamp = "gather"
#
## Include additional Fields in each metric
## Available options are:
## DataType -- OPC-UA Data Type (string)
# optional_fields = []
#
## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)

View File

@ -91,6 +91,11 @@ to use them.
# e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00"
#timestamp_format = ""
#
## Include additional Fields in each metric
## Available options are:
## DataType -- OPC-UA Data Type (string)
# optional_fields = []
#
## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
@ -226,6 +231,7 @@ to use them.
# deadband_type = "Absolute"
# deadband_value = 0.0
#
## Enable workarounds required by some devices to work correctly
# [inputs.opcua_listener.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid
@ -252,7 +258,14 @@ To gather data from this node enter the following line into the 'nodes' property
This node configuration produces a metric like this:
```text
opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000
opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)" 1597820490000000000
```
With 'DataType' entered in Additional Metrics, this node configuration
produces a metric like this:
```text
opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)",DataType="Float" 1597820490000000000
```
## Group Configuration

View File

@ -11,7 +11,9 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/testutil"
@ -148,6 +150,147 @@ func TestSubscribeClientIntegration(t *testing.T) {
}
}
func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "open62541/open62541",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("TCP network layer listening on opc.tcp://"),
),
}
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()
testopctags := []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
{"badnode", "1", "i", "1337", nil},
{"goodnode", "1", "s", "the.answer", int32(42)},
{"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"},
}
testopctypes := []string{
"String",
"String",
"String",
"Null",
"Int32",
"DateTime",
}
testopcquality := []string{
"OK (0x0)",
"OK (0x0)",
"OK (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)",
"OK (0x0)",
}
expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags {
now := time.Now()
tags := map[string]string{
"id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier),
}
fields := map[string]interface{}{
x.Name: x.Want,
"Quality": testopcquality[i],
"DataType": testopctypes[i],
}
expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now))
}
tagsRemaining := make([]string, 0, len(testopctags))
for i, tag := range testopctags {
if tag.Want != nil {
tagsRemaining = append(tagsRemaining, testopctags[i].Name)
}
}
subscribeConfig := SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
SecurityPolicy: "None",
SecurityMode: "None",
AuthMethod: "Anonymous",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
OptionalFields: []string{"DataType"},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
SubscriptionInterval: 0,
}
for _, tags := range testopctags {
subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags))
}
o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.NoError(t, err)
// give initial setup a couple extra attempts, as on CircleCI this can be
// attempted to soon
require.Eventually(t, func() bool {
return o.SetupOptions() == nil
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, o.Connect(), "Connection failed")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
res, err := o.StartStreamValues(ctx)
require.NoError(t, err)
for {
select {
case m := <-res:
for fieldName, fieldValue := range m.Fields() {
for _, tag := range testopctags {
if fieldName != tag.Name {
continue
}
// nil-value tags should not be sent from server, error if one does
if tag.Want == nil {
t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue)
return
}
newRemaining := make([]string, 0, len(tagsRemaining))
for _, remainingTag := range tagsRemaining {
if fieldName != remainingTag {
newRemaining = append(newRemaining, remainingTag)
break
}
}
if len(newRemaining) <= 0 {
return
}
// Test if the received metric matches one of the expected
testutil.RequireMetricsSubset(t, []telegraf.Metric{m}, expectedopcmetrics, testutil.IgnoreTime())
tagsRemaining = newRemaining
}
}
case <-ctx.Done():
msg := ""
for _, tag := range tagsRemaining {
msg += tag + ", "
}
t.Errorf("Tags %s are remaining without a received value", msg)
return
}
}
}
func TestSubscribeClientConfig(t *testing.T) {
toml := `
[[inputs.opcua_listener]]
@ -164,6 +307,9 @@ auth_method = "Anonymous"
timestamp_format = "2006-01-02T15:04:05Z07:00"
username = ""
password = ""
optional_fields = ["DataType"]
nodes = [
{name="name", namespace="1", identifier_type="s", identifier="one"},
{name="name2", namespace="2", identifier_type="s", identifier="two"},
@ -247,6 +393,7 @@ additional_valid_status_codes = ["0xC0"]
},
}, o.SubscribeClientConfig.Groups)
require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds)
require.Equal(t, []string{"DataType"}, o.SubscribeClientConfig.OptionalFields)
}
func TestSubscribeClientConfigWithMonitoringParams(t *testing.T) {

View File

@ -52,6 +52,11 @@
# e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00"
#timestamp_format = ""
#
## Include additional Fields in each metric
## Available options are:
## DataType -- OPC-UA Data Type (string)
# optional_fields = []
#
## Node ID configuration
## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3)
@ -187,6 +192,7 @@
# deadband_type = "Absolute"
# deadband_value = 0.0
#
## Enable workarounds required by some devices to work correctly
# [inputs.opcua_listener.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid