chore(deps): Bump github.com/gopcua/opcua from 0.4.0 to 0.5.3 (#14686)

Co-authored-by: Josh Powers <powersj@fastmail.com>
This commit is contained in:
dependabot[bot] 2024-02-09 14:12:03 -05:00 committed by GitHub
parent f5cf3ff4b5
commit cb81959e69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 37 additions and 29 deletions

2
go.mod
View File

@ -95,7 +95,7 @@ require (
github.com/google/gopacket v1.1.19 github.com/google/gopacket v1.1.19
github.com/google/licensecheck v0.3.1 github.com/google/licensecheck v0.3.1
github.com/google/uuid v1.5.0 github.com/google/uuid v1.5.0
github.com/gopcua/opcua v0.4.0 github.com/gopcua/opcua v0.5.3
github.com/gophercloud/gophercloud v1.7.0 github.com/gophercloud/gophercloud v1.7.0
github.com/gorilla/mux v1.8.1 github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1 github.com/gorilla/websocket v1.5.1

4
go.sum
View File

@ -1386,8 +1386,8 @@ github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU= github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4= github.com/googleapis/go-type-adapters v1.0.0/go.mod h1:zHW75FOG2aur7gAO2B+MLby+cLsWGBF62rFAi7WjWO4=
github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g=
github.com/gopcua/opcua v0.4.0 h1:Pr0PMFViNOzvkcvmzP3yTKqtLFVL1OUgav3tDj+hpqQ= github.com/gopcua/opcua v0.5.3 h1:K5QQhjK9KQxQW8doHL/Cd8oljUeXWnJJsNgP7mOGIhw=
github.com/gopcua/opcua v0.4.0/go.mod h1:6BsaYGu33RhVRxnK+EqHWwSG+hYCSAMjyIjx3RGV1PQ= github.com/gopcua/opcua v0.5.3/go.mod h1:nrVl4/Rs3SDQRhNQ50EbAiI5JSpDrTG6Frx3s4HLnw4=
github.com/gophercloud/gophercloud v1.7.0 h1:fyJGKh0LBvIZKLvBWvQdIgkaV5yTM3Jh9EYUh+UNCAs= github.com/gophercloud/gophercloud v1.7.0 h1:fyJGKh0LBvIZKLvBWvQdIgkaV5yTM3Jh9EYUh+UNCAs=
github.com/gophercloud/gophercloud v1.7.0/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM= github.com/gophercloud/gophercloud v1.7.0/go.mod h1:aAVqcocTSXh2vYFZ1JTvx4EQmfgzxRcNupUfxZbBNDM=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=

View File

@ -175,7 +175,7 @@ func (o *OpcUAClient) StatusCodeOK(code ua.StatusCode) bool {
} }
// Connect to an OPC UA device // Connect to an OPC UA device
func (o *OpcUAClient) Connect() error { func (o *OpcUAClient) Connect(ctx context.Context) error {
o.Log.Debug("Connecting OPC UA Client to server") o.Log.Debug("Connecting OPC UA Client to server")
u, err := url.Parse(o.Config.Endpoint) u, err := url.Parse(o.Config.Endpoint)
if err != nil { if err != nil {
@ -190,14 +190,17 @@ func (o *OpcUAClient) Connect() error {
if o.Client != nil { if o.Client != nil {
o.Log.Warnf("Closing connection to %q as already connected", u) o.Log.Warnf("Closing connection to %q as already connected", u)
if err := o.Client.Close(); err != nil { if err := o.Client.Close(ctx); err != nil {
// Only log the error but to not bail-out here as this prevents // Only log the error but to not bail-out here as this prevents
// reconnections for multiple parties (see e.g. #9523). // reconnections for multiple parties (see e.g. #9523).
o.Log.Errorf("Closing connection failed: %v", err) o.Log.Errorf("Closing connection failed: %v", err)
} }
} }
o.Client = opcua.NewClient(o.Config.Endpoint, o.opts...) o.Client, err = opcua.NewClient(o.Config.Endpoint, o.opts...)
if err != nil {
return fmt.Errorf("error in new client: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel() defer cancel()
if err := o.Client.Connect(ctx); err != nil { if err := o.Client.Connect(ctx); err != nil {
@ -221,7 +224,7 @@ func (o *OpcUAClient) Disconnect(ctx context.Context) error {
switch u.Scheme { switch u.Scheme {
case "opc.tcp": case "opc.tcp":
// We can't do anything about failing to close a connection // We can't do anything about failing to close a connection
err := o.Client.CloseWithContext(ctx) err := o.Client.Close(ctx)
o.Client = nil o.Client = nil
return err return err
default: default:

View File

@ -832,7 +832,7 @@ func TestMetricForNode(t *testing.T) {
status: ua.StatusOK, status: ua.StatusOK,
expected: metric.New("testingmetric", expected: metric.New("testingmetric",
map[string]string{"t1": "v1", "id": "ns=3;s=hi"}, map[string]string{"t1": "v1", "id": "ns=3;s=hi"},
map[string]interface{}{"Quality": "OK (0x0)", "fn": 16}, map[string]interface{}{"Quality": "The operation succeeded. StatusGood (0x0)", "fn": 16},
time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{})), time.Date(2022, 03, 17, 8, 55, 00, 00, &time.Location{})),
}, },
} }

View File

@ -185,12 +185,12 @@ func TestReadClientIntegrationAdditionalFields(t *testing.T) {
"DateTime", "DateTime",
} }
testopcquality := []string{ testopcquality := []string{
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)", "User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
} }
expectedopcmetrics := []telegraf.Metric{} expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags { for i, x := range testopctags {

View File

@ -31,6 +31,7 @@ type ReadClient struct {
// internal values // internal values
req *ua.ReadRequest req *ua.ReadRequest
ctx context.Context
} }
func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) { func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient, error) {
@ -52,7 +53,9 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
} }
func (o *ReadClient) Connect() error { func (o *ReadClient) Connect() error {
if err := o.OpcUAClient.Connect(); err != nil { o.ctx = context.Background()
if err := o.OpcUAClient.Connect(o.ctx); err != nil {
return fmt.Errorf("connect failed: %w", err) return fmt.Errorf("connect failed: %w", err)
} }
@ -68,7 +71,7 @@ func (o *ReadClient) Connect() error {
readValueIds = append(readValueIds, &ua.ReadValueID{NodeID: nid}) readValueIds = append(readValueIds, &ua.ReadValueID{NodeID: nid})
} }
} else { } else {
regResp, err := o.Client.RegisterNodes(&ua.RegisterNodesRequest{ regResp, err := o.Client.RegisterNodes(o.ctx, &ua.RegisterNodesRequest{
NodesToRegister: o.NodeIDs, NodesToRegister: o.NodeIDs,
}) })
if err != nil { if err != nil {
@ -133,7 +136,7 @@ func (o *ReadClient) CurrentValues() ([]telegraf.Metric, error) {
} }
func (o *ReadClient) read() error { func (o *ReadClient) read() error {
resp, err := o.Client.Read(o.req) resp, err := o.Client.Read(o.ctx, o.req)
if err != nil { if err != nil {
o.ReadError.Incr(1) o.ReadError.Incr(1)
return fmt.Errorf("RegisterNodes Read failed: %w", err) return fmt.Errorf("RegisterNodes Read failed: %w", err)

View File

@ -274,12 +274,12 @@ func TestSubscribeClientIntegrationAdditionalFields(t *testing.T) {
"DateTime", "DateTime",
} }
testopcquality := []string{ testopcquality := []string{
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)", "User does not have permission to perform the requested operation. StatusBadUserAccessDenied (0x801F0000)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
"OK (0x0)", "The operation succeeded. StatusGood (0x0)",
} }
expectedopcmetrics := []telegraf.Metric{} expectedopcmetrics := []telegraf.Metric{}
for i, x := range testopctags { for i, x := range testopctags {

View File

@ -31,8 +31,8 @@ type SubscribeClient struct {
dataNotifications chan *opcua.PublishNotificationData dataNotifications chan *opcua.PublishNotificationData
metrics chan telegraf.Metric metrics chan telegraf.Metric
processingCtx context.Context ctx context.Context
processingCancel context.CancelFunc cancel context.CancelFunc
} }
func checkDataChangeFilterParameters(params *input.DataChangeFilter) error { func checkDataChangeFilterParameters(params *input.DataChangeFilter) error {
@ -91,6 +91,7 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
return nil, err return nil, err
} }
processingCtx, processingCancel := context.WithCancel(context.Background())
subClient := &SubscribeClient{ subClient := &SubscribeClient{
OpcUAInputClient: client, OpcUAInputClient: client,
Config: *sc, Config: *sc,
@ -100,6 +101,8 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
// the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval. // the same time. It could be made dependent on the number of nodes subscribed to and the subscription interval.
dataNotifications: make(chan *opcua.PublishNotificationData, 100), dataNotifications: make(chan *opcua.PublishNotificationData, 100),
metrics: make(chan telegraf.Metric, 100), metrics: make(chan telegraf.Metric, 100),
ctx: processingCtx,
cancel: processingCancel,
} }
log.Debugf("Creating monitored items") log.Debugf("Creating monitored items")
@ -116,13 +119,13 @@ func (sc *SubscribeClientConfig) CreateSubscribeClient(log telegraf.Logger) (*Su
} }
func (o *SubscribeClient) Connect() error { func (o *SubscribeClient) Connect() error {
err := o.OpcUAClient.Connect() err := o.OpcUAClient.Connect(o.ctx)
if err != nil { if err != nil {
return err return err
} }
o.Log.Debugf("Creating OPC UA subscription") o.Log.Debugf("Creating OPC UA subscription")
o.sub, err = o.Client.Subscribe(&opcua.SubscriptionParameters{ o.sub, err = o.Client.Subscribe(o.ctx, &opcua.SubscriptionParameters{
Interval: time.Duration(o.Config.SubscriptionInterval), Interval: time.Duration(o.Config.SubscriptionInterval),
}, o.dataNotifications) }, o.dataNotifications)
if err != nil { if err != nil {
@ -145,7 +148,7 @@ func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} {
} }
} }
closing := o.OpcUAInputClient.Stop(ctx) closing := o.OpcUAInputClient.Stop(ctx)
o.processingCancel() o.cancel()
return closing return closing
} }
@ -167,7 +170,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
return nil, err return nil, err
} }
resp, err := o.sub.MonitorWithContext(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...) resp, err := o.sub.Monitor(ctx, ua.TimestampsToReturnBoth, o.monitoredItemsReqs...)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to start monitoring items: %w", err) return nil, fmt.Errorf("failed to start monitoring items: %w", err)
} }
@ -187,7 +190,6 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
} }
} }
o.processingCtx, o.processingCancel = context.WithCancel(context.Background())
go o.processReceivedNotifications() go o.processReceivedNotifications()
return o.metrics, nil return o.metrics, nil
@ -196,7 +198,7 @@ func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegra
func (o *SubscribeClient) processReceivedNotifications() { func (o *SubscribeClient) processReceivedNotifications() {
for { for {
select { select {
case <-o.processingCtx.Done(): case <-o.ctx.Done():
o.Log.Debug("Processing received notifications stopped") o.Log.Debug("Processing received notifications stopped")
return return