From fbd2dcc9d9081e08bf07f52b95d5b4d5e0a03a14 Mon Sep 17 00:00:00 2001 From: Christian Allinson Date: Thu, 7 Dec 2023 15:02:25 -0500 Subject: [PATCH] feat(common.opcua): Add option to include OPC-UA DataType as a Field (#14345) --- plugins/common/opcua/client.go | 13 +- plugins/common/opcua/input/input_client.go | 3 + plugins/inputs/opcua/README.md | 14 +- plugins/inputs/opcua/opcua_test.go | 95 +++++++++++ plugins/inputs/opcua/sample.conf | 5 + plugins/inputs/opcua_listener/README.md | 15 +- .../opcua_listener/opcua_listener_test.go | 147 ++++++++++++++++++ plugins/inputs/opcua_listener/sample.conf | 6 + 8 files changed, 295 insertions(+), 3 deletions(-) diff --git a/plugins/common/opcua/client.go b/plugins/common/opcua/client.go index 03229c303..de8c265a2 100644 --- a/plugins/common/opcua/client.go +++ b/plugins/common/opcua/client.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/choice" ) type OpcUAWorkarounds struct { @@ -44,13 +45,23 @@ type OpcUAClientConfig struct { ConnectTimeout config.Duration `toml:"connect_timeout"` RequestTimeout config.Duration `toml:"request_timeout"` - Workarounds OpcUAWorkarounds `toml:"workarounds"` + OptionalFields []string `toml:"optional_fields"` + Workarounds OpcUAWorkarounds `toml:"workarounds"` } func (o *OpcUAClientConfig) Validate() error { + if err := o.validateOptionalFields(); err != nil { + return fmt.Errorf("invalid 'optional_fields': %w", err) + } + return o.validateEndpoint() } +func (o *OpcUAClientConfig) validateOptionalFields() error { + validFields := []string{"DataType"} + return choice.CheckSlice(o.OptionalFields, validFields) +} + func (o *OpcUAClientConfig) validateEndpoint() error { if o.Endpoint == "" { return fmt.Errorf("endpoint url is empty") diff --git a/plugins/common/opcua/input/input_client.go b/plugins/common/opcua/input/input_client.go index 5ee49d7bd..f14771081 100644 --- a/plugins/common/opcua/input/input_client.go +++ b/plugins/common/opcua/input/input_client.go @@ -421,6 +421,9 @@ func (o *OpcUAInputClient) MetricForNode(nodeIdx int) telegraf.Metric { fields[nmm.Tag.FieldName] = o.LastReceivedData[nodeIdx].Value fields["Quality"] = strings.TrimSpace(o.LastReceivedData[nodeIdx].Quality.Error()) + if choice.Contains("DataType", o.Config.OptionalFields) { + fields["DataType"] = strings.Replace(o.LastReceivedData[nodeIdx].DataType.String(), "TypeID", "", 1) + } if !o.StatusCodeOK(o.LastReceivedData[nodeIdx].Quality) { mp := newMP(nmm) o.Log.Debugf("status not OK for node %q(metric name %q, tags %q)", diff --git a/plugins/inputs/opcua/README.md b/plugins/inputs/opcua/README.md index 137cdebce..099e6ea5c 100644 --- a/plugins/inputs/opcua/README.md +++ b/plugins/inputs/opcua/README.md @@ -71,6 +71,11 @@ to use them. ## "source" -- uses the timestamp provided by the source # timestamp = "gather" # + ## Include additional Fields in each metric + ## Available options are: + ## DataType -- OPC-UA Data Type (string) + # optional_fields = [] + # ## Node ID configuration ## name - field name to use in the output ## namespace - OPC UA namespace of the node (integer value 0 thru 3) @@ -177,7 +182,14 @@ To gather data from this node enter the following line into the 'nodes' property This node configuration produces a metric like this: ```text -opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000 +opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)" 1597820490000000000 +``` + +With 'DataType' entered in Additional Metrics, this node configuration +produces a metric like this: + +```text +opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)",DataType="Float" 1597820490000000000 ``` ## Group Configuration diff --git a/plugins/inputs/opcua/opcua_test.go b/plugins/inputs/opcua/opcua_test.go index 631e760ca..afa5d5ec4 100644 --- a/plugins/inputs/opcua/opcua_test.go +++ b/plugins/inputs/opcua/opcua_test.go @@ -9,7 +9,9 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/common/opcua" "github.com/influxdata/telegraf/plugins/common/opcua/input" "github.com/influxdata/telegraf/testutil" @@ -150,6 +152,95 @@ func TestReadClientIntegration(t *testing.T) { } } +func TestReadClientIntegrationAdditionalFields(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := testutil.Container{ + Image: "open62541/open62541", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("TCP network layer listening on opc.tcp://"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + testopctags := []OPCTags{ + {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, + {"ProductUri", "0", "i", "2262", "http://open62541.org"}, + {"ManufacturerName", "0", "i", "2263", "open62541"}, + {"badnode", "1", "i", "1337", nil}, + {"goodnode", "1", "s", "the.answer", int32(42)}, + {"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"}, + } + testopctypes := []string{ + "String", + "String", + "String", + "Null", + "Int32", + "DateTime", + } + testopcquality := []string{ + "OK (0x0)", + "OK (0x0)", + "OK (0x0)", + "User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)", + "OK (0x0)", + "OK (0x0)", + } + expectedopcmetrics := []telegraf.Metric{} + for i, x := range testopctags { + now := time.Now() + tags := map[string]string{ + "id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier), + } + fields := map[string]interface{}{ + x.Name: x.Want, + "Quality": testopcquality[i], + "DataType": testopctypes[i], + } + expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now)) + } + + readConfig := ReadClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + OptionalFields: []string{"DataType"}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + } + + for _, tags := range testopctags { + readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags)) + } + + client, err := readConfig.CreateReadClient(testutil.Logger{}) + require.NoError(t, err) + + require.NoError(t, client.Connect()) + + actualopcmetrics := []telegraf.Metric{} + + for i := range client.LastReceivedData { + actualopcmetrics = append(actualopcmetrics, client.MetricForNode(i)) + } + testutil.RequireMetricsEqual(t, expectedopcmetrics, actualopcmetrics, testutil.IgnoreTime()) +} + func TestReadClientIntegrationWithPasswordAuth(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") @@ -223,6 +314,8 @@ auth_method = "Anonymous" username = "" password = "" +optional_fields = ["DataType"] + [[inputs.opcua.nodes]] name = "name" namespace = "1" @@ -265,6 +358,7 @@ additional_valid_status_codes = ["0xC0"] [inputs.opcua.request_workarounds] use_unregistered_reads = true + ` c := config.NewConfig() @@ -334,6 +428,7 @@ use_unregistered_reads = true }, o.ReadClientConfig.Groups) require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.ReadClientConfig.Workarounds) require.Equal(t, ReadClientWorkarounds{UseUnregisteredReads: true}, o.ReadClientConfig.ReadClientWorkarounds) + require.Equal(t, []string{"DataType"}, o.ReadClientConfig.OptionalFields) err = o.Init() require.NoError(t, err) require.Len(t, o.client.NodeMetricMapping, 5, "incorrect number of nodes") diff --git a/plugins/inputs/opcua/sample.conf b/plugins/inputs/opcua/sample.conf index 3822bb66d..a2b975acd 100644 --- a/plugins/inputs/opcua/sample.conf +++ b/plugins/inputs/opcua/sample.conf @@ -43,6 +43,11 @@ ## "source" -- uses the timestamp provided by the source # timestamp = "gather" # + ## Include additional Fields in each metric + ## Available options are: + ## DataType -- OPC-UA Data Type (string) + # optional_fields = [] + # ## Node ID configuration ## name - field name to use in the output ## namespace - OPC UA namespace of the node (integer value 0 thru 3) diff --git a/plugins/inputs/opcua_listener/README.md b/plugins/inputs/opcua_listener/README.md index 54be90daf..c8c266bf8 100644 --- a/plugins/inputs/opcua_listener/README.md +++ b/plugins/inputs/opcua_listener/README.md @@ -91,6 +91,11 @@ to use them. # e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00" #timestamp_format = "" # + ## Include additional Fields in each metric + ## Available options are: + ## DataType -- OPC-UA Data Type (string) + # optional_fields = [] + # ## Node ID configuration ## name - field name to use in the output ## namespace - OPC UA namespace of the node (integer value 0 thru 3) @@ -226,6 +231,7 @@ to use them. # deadband_type = "Absolute" # deadband_value = 0.0 # + ## Enable workarounds required by some devices to work correctly # [inputs.opcua_listener.workarounds] ## Set additional valid status codes, StatusOK (0x0) is always considered valid @@ -252,7 +258,14 @@ To gather data from this node enter the following line into the 'nodes' property This node configuration produces a metric like this: ```text -opcua,id=ns\=3;s\=Temperature temp=79.0,quality="OK (0x0)" 1597820490000000000 +opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)" 1597820490000000000 +``` + +With 'DataType' entered in Additional Metrics, this node configuration +produces a metric like this: + +```text +opcua,id=ns\=3;s\=Temperature temp=79.0,Quality="OK (0x0)",DataType="Float" 1597820490000000000 ``` ## Group Configuration diff --git a/plugins/inputs/opcua_listener/opcua_listener_test.go b/plugins/inputs/opcua_listener/opcua_listener_test.go index 06807f5e8..8846653f7 100644 --- a/plugins/inputs/opcua_listener/opcua_listener_test.go +++ b/plugins/inputs/opcua_listener/opcua_listener_test.go @@ -11,7 +11,9 @@ import ( "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/plugins/common/opcua" "github.com/influxdata/telegraf/plugins/common/opcua/input" "github.com/influxdata/telegraf/testutil" @@ -148,6 +150,147 @@ func TestSubscribeClientIntegration(t *testing.T) { } } +func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + container := testutil.Container{ + Image: "open62541/open62541", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("TCP network layer listening on opc.tcp://"), + ), + } + require.NoError(t, container.Start(), "failed to start container") + defer container.Terminate() + + testopctags := []OPCTags{ + {"ProductName", "0", "i", "2261", "open62541 OPC UA Server"}, + {"ProductUri", "0", "i", "2262", "http://open62541.org"}, + {"ManufacturerName", "0", "i", "2263", "open62541"}, + {"badnode", "1", "i", "1337", nil}, + {"goodnode", "1", "s", "the.answer", int32(42)}, + {"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"}, + } + testopctypes := []string{ + "String", + "String", + "String", + "Null", + "Int32", + "DateTime", + } + testopcquality := []string{ + "OK (0x0)", + "OK (0x0)", + "OK (0x0)", + "User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)", + "OK (0x0)", + "OK (0x0)", + } + expectedopcmetrics := []telegraf.Metric{} + for i, x := range testopctags { + now := time.Now() + tags := map[string]string{ + "id": fmt.Sprintf("ns=%s;%s=%s", x.Namespace, x.IdentifierType, x.Identifier), + } + fields := map[string]interface{}{ + x.Name: x.Want, + "Quality": testopcquality[i], + "DataType": testopctypes[i], + } + expectedopcmetrics = append(expectedopcmetrics, metric.New("testing", tags, fields, now)) + } + + tagsRemaining := make([]string, 0, len(testopctags)) + for i, tag := range testopctags { + if tag.Want != nil { + tagsRemaining = append(tagsRemaining, testopctags[i].Name) + } + } + + subscribeConfig := SubscribeClientConfig{ + InputClientConfig: input.InputClientConfig{ + OpcUAClientConfig: opcua.OpcUAClientConfig{ + Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]), + SecurityPolicy: "None", + SecurityMode: "None", + AuthMethod: "Anonymous", + ConnectTimeout: config.Duration(10 * time.Second), + RequestTimeout: config.Duration(1 * time.Second), + Workarounds: opcua.OpcUAWorkarounds{}, + OptionalFields: []string{"DataType"}, + }, + MetricName: "testing", + RootNodes: make([]input.NodeSettings, 0), + Groups: make([]input.NodeGroupSettings, 0), + }, + SubscriptionInterval: 0, + } + for _, tags := range testopctags { + subscribeConfig.RootNodes = append(subscribeConfig.RootNodes, MapOPCTag(tags)) + } + o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{}) + require.NoError(t, err) + + // give initial setup a couple extra attempts, as on CircleCI this can be + // attempted to soon + require.Eventually(t, func() bool { + return o.SetupOptions() == nil + }, 5*time.Second, 10*time.Millisecond) + + require.NoError(t, o.Connect(), "Connection failed") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + res, err := o.StartStreamValues(ctx) + require.NoError(t, err) + + for { + select { + case m := <-res: + for fieldName, fieldValue := range m.Fields() { + for _, tag := range testopctags { + if fieldName != tag.Name { + continue + } + // nil-value tags should not be sent from server, error if one does + if tag.Want == nil { + t.Errorf("Tag: %s has value: %v", tag.Name, fieldValue) + return + } + + newRemaining := make([]string, 0, len(tagsRemaining)) + for _, remainingTag := range tagsRemaining { + if fieldName != remainingTag { + newRemaining = append(newRemaining, remainingTag) + break + } + } + + if len(newRemaining) <= 0 { + return + } + // Test if the received metric matches one of the expected + testutil.RequireMetricsSubset(t, []telegraf.Metric{m}, expectedopcmetrics, testutil.IgnoreTime()) + tagsRemaining = newRemaining + } + } + + case <-ctx.Done(): + msg := "" + for _, tag := range tagsRemaining { + msg += tag + ", " + } + + t.Errorf("Tags %s are remaining without a received value", msg) + return + } + } +} + func TestSubscribeClientConfig(t *testing.T) { toml := ` [[inputs.opcua_listener]] @@ -164,6 +307,9 @@ auth_method = "Anonymous" timestamp_format = "2006-01-02T15:04:05Z07:00" username = "" password = "" + +optional_fields = ["DataType"] + nodes = [ {name="name", namespace="1", identifier_type="s", identifier="one"}, {name="name2", namespace="2", identifier_type="s", identifier="two"}, @@ -247,6 +393,7 @@ additional_valid_status_codes = ["0xC0"] }, }, o.SubscribeClientConfig.Groups) require.Equal(t, opcua.OpcUAWorkarounds{AdditionalValidStatusCodes: []string{"0xC0"}}, o.SubscribeClientConfig.Workarounds) + require.Equal(t, []string{"DataType"}, o.SubscribeClientConfig.OptionalFields) } func TestSubscribeClientConfigWithMonitoringParams(t *testing.T) { diff --git a/plugins/inputs/opcua_listener/sample.conf b/plugins/inputs/opcua_listener/sample.conf index 788502937..3e333020c 100644 --- a/plugins/inputs/opcua_listener/sample.conf +++ b/plugins/inputs/opcua_listener/sample.conf @@ -52,6 +52,11 @@ # e.g.: json_timestamp_format = "2006-01-02T15:04:05Z07:00" #timestamp_format = "" # + ## Include additional Fields in each metric + ## Available options are: + ## DataType -- OPC-UA Data Type (string) + # optional_fields = [] + # ## Node ID configuration ## name - field name to use in the output ## namespace - OPC UA namespace of the node (integer value 0 thru 3) @@ -187,6 +192,7 @@ # deadband_type = "Absolute" # deadband_value = 0.0 # + ## Enable workarounds required by some devices to work correctly # [inputs.opcua_listener.workarounds] ## Set additional valid status codes, StatusOK (0x0) is always considered valid