fix(inputs.opcua): Allow to retry reads on invalid sessions (#16026)

This commit is contained in:
Sven Rebhan 2024-10-17 10:03:57 +02:00 committed by GitHub
parent c0bea1beb8
commit 0abd184087
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 113 additions and 73 deletions

View File

@ -30,61 +30,64 @@ to use them.
[[inputs.opcua]] [[inputs.opcua]]
## Metric name ## Metric name
# name = "opcua" # name = "opcua"
#
## OPC UA Endpoint URL ## OPC UA Endpoint URL
# endpoint = "opc.tcp://localhost:4840" # endpoint = "opc.tcp://localhost:4840"
#
## Maximum time allowed to establish a connection to the endpoint. ## Maximum time allowed to establish a connection to the endpoint.
# connect_timeout = "10s" # connect_timeout = "10s"
#
## Maximum time allowed for a request over the established connection. ## Maximum time allowed for a request over the established connection.
# request_timeout = "5s" # request_timeout = "5s"
# Maximum time that a session shall remain open without activity. ## Maximum time that a session shall remain open without activity.
# session_timeout = "20m" # session_timeout = "20m"
#
## Retry options for failing reads e.g. due to invalid sessions
## If the retry count is zero, the read will fail after the initial attempt.
# read_retry_timeout = "100ms"
# read_retry_count = 0
## Security policy, one of "None", "Basic128Rsa15", "Basic256", ## Security policy, one of "None", "Basic128Rsa15", "Basic256",
## "Basic256Sha256", or "auto" ## "Basic256Sha256", or "auto"
# security_policy = "auto" # security_policy = "auto"
#
## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto" ## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto"
# security_mode = "auto" # security_mode = "auto"
#
## Path to cert.pem. Required when security mode or policy isn't "None". ## Path to cert.pem. Required when security mode or policy isn't "None".
## If cert path is not supplied, self-signed cert and key will be generated. ## If cert path is not supplied, self-signed cert and key will be generated.
# certificate = "/etc/telegraf/cert.pem" # certificate = "/etc/telegraf/cert.pem"
#
## Path to private key.pem. Required when security mode or policy isn't "None". ## Path to private key.pem. Required when security mode or policy isn't "None".
## If key path is not supplied, self-signed cert and key will be generated. ## If key path is not supplied, self-signed cert and key will be generated.
# private_key = "/etc/telegraf/key.pem" # private_key = "/etc/telegraf/key.pem"
#
## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To ## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To
## authenticate using a specific ID, select 'Certificate' or 'UserName' ## authenticate using a specific ID, select 'Certificate' or 'UserName'
# auth_method = "Anonymous" # auth_method = "Anonymous"
#
## Username. Required for auth_method = "UserName" ## Username and password required for auth_method = "UserName"
# username = "" # username = ""
#
## Password. Required for auth_method = "UserName"
# password = "" # password = ""
#
## Option to select the metric timestamp to use. Valid options are: ## Option to select the metric timestamp to use. Valid options are:
## "gather" -- uses the time of receiving the data in telegraf ## "gather" -- uses the time of receiving the data in telegraf
## "server" -- uses the timestamp provided by the server ## "server" -- uses the timestamp provided by the server
## "source" -- uses the timestamp provided by the source ## "source" -- uses the timestamp provided by the source
# timestamp = "gather" # timestamp = "gather"
#
## Client trace messages ## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the OPCUA ## When set to true, and debug mode enabled in the agent settings, the OPCUA
## client's messages are included in telegraf logs. These messages are very ## client's messages are included in telegraf logs. These messages are very
## noisy, but essential for debugging issues. ## noisy, but essential for debugging issues.
# client_trace = false # client_trace = false
#
## Include additional Fields in each metric ## Include additional Fields in each metric
## Available options are: ## Available options are:
## DataType -- OPC-UA Data Type (string) ## DataType -- OPC-UA Data Type (string)
# optional_fields = [] # optional_fields = []
#
## Node ID configuration ## Node ID configuration
## name - field name to use in the output ## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3) ## namespace - OPC UA namespace of the node (integer value 0 thru 3)
@ -93,12 +96,12 @@ to use them.
## default_tags - extra tags to be added to the output metric (optional) ## default_tags - extra tags to be added to the output metric (optional)
## ##
## Use either the inline notation or the bracketed notation, not both. ## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet) ## Inline notation (default_tags not supported yet)
# nodes = [ # nodes = [
# {name="", namespace="", identifier_type="", identifier=""}, # {name="", namespace="", identifier_type="", identifier=""},
# ] # ]
#
## Bracketed notation ## Bracketed notation
# [[inputs.opcua.nodes]] # [[inputs.opcua.nodes]]
# name = "node1" # name = "node1"
@ -112,7 +115,7 @@ to use them.
# namespace = "" # namespace = ""
# identifier_type = "" # identifier_type = ""
# identifier = "" # identifier = ""
#
## Node Group ## Node Group
## Sets defaults so they aren't required in every node. ## Sets defaults so they aren't required in every node.
## Default values can be set for: ## Default values can be set for:
@ -126,29 +129,29 @@ to use them.
## Group Metric name. Overrides the top level name. If unset, the ## Group Metric name. Overrides the top level name. If unset, the
## top level name is used. ## top level name is used.
# name = # name =
#
## Group default namespace. If a node in the group doesn't set its ## Group default namespace. If a node in the group doesn't set its
## namespace, this is used. ## namespace, this is used.
# namespace = # namespace =
#
## Group default identifier type. If a node in the group doesn't set its ## Group default identifier type. If a node in the group doesn't set its
## identifier type, this is used. ## identifier type, this is used.
# identifier_type = # identifier_type =
#
## Default tags that are applied to every node in this group. Can be ## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name. ## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" } ## example: default_tags = { tag1 = "value1" }
# default_tags = {} # default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above. ## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both. ## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet) ## Inline notation (default_tags not supported yet)
# nodes = [ # nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""}, # {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""}, # {name="node2", namespace="", identifier_type="", identifier=""},
#] #]
#
## Bracketed notation ## Bracketed notation
# [[inputs.opcua.group.nodes]] # [[inputs.opcua.group.nodes]]
# name = "node1" # name = "node1"
@ -165,12 +168,12 @@ to use them.
## Enable workarounds required by some devices to work correctly ## Enable workarounds required by some devices to work correctly
# [inputs.opcua.workarounds] # [inputs.opcua.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid # ## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"] # # additional_valid_status_codes = ["0xC0"]
# [inputs.opcua.request_workarounds] # [inputs.opcua.request_workarounds]
## Use unregistered reads instead of registered reads # ## Use unregistered reads instead of registered reads
# use_unregistered_reads = false # # use_unregistered_reads = false
``` ```
## Node Configuration ## Node Configuration

View File

@ -2,11 +2,14 @@ package opcua
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time"
"github.com/gopcua/opcua/ua" "github.com/gopcua/opcua/ua"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/opcua" "github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input" "github.com/influxdata/telegraf/plugins/common/opcua/input"
"github.com/influxdata/telegraf/selfstat" "github.com/influxdata/telegraf/selfstat"
@ -17,6 +20,8 @@ type ReadClientWorkarounds struct {
} }
type ReadClientConfig struct { type ReadClientConfig struct {
ReadRetryTimeout config.Duration `toml:"read_retry_timeout"`
ReadRetries uint64 `toml:"read_retry_count"`
ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"` ReadClientWorkarounds ReadClientWorkarounds `toml:"request_workarounds"`
input.InputClientConfig input.InputClientConfig
} }
@ -25,9 +30,11 @@ type ReadClientConfig struct {
type ReadClient struct { type ReadClient struct {
*input.OpcUAInputClient *input.OpcUAInputClient
ReadSuccess selfstat.Stat ReadRetryTimeout time.Duration
ReadError selfstat.Stat ReadRetries uint64
Workarounds ReadClientWorkarounds ReadSuccess selfstat.Stat
ReadError selfstat.Stat
Workarounds ReadClientWorkarounds
// internal values // internal values
reqIDs []*ua.ReadValueID reqIDs []*ua.ReadValueID
@ -44,8 +51,14 @@ func (rc *ReadClientConfig) CreateReadClient(log telegraf.Logger) (*ReadClient,
"endpoint": inputClient.Config.OpcUAClientConfig.Endpoint, "endpoint": inputClient.Config.OpcUAClientConfig.Endpoint,
} }
if rc.ReadRetryTimeout == 0 {
rc.ReadRetryTimeout = config.Duration(100 * time.Millisecond)
}
return &ReadClient{ return &ReadClient{
OpcUAInputClient: inputClient, OpcUAInputClient: inputClient,
ReadRetryTimeout: time.Duration(rc.ReadRetryTimeout),
ReadRetries: rc.ReadRetries,
ReadSuccess: selfstat.Register("opcua", "read_success", tags), ReadSuccess: selfstat.Register("opcua", "read_success", tags),
ReadError: selfstat.Register("opcua", "read_error", tags), ReadError: selfstat.Register("opcua", "read_error", tags),
Workarounds: rc.ReadClientWorkarounds, Workarounds: rc.ReadClientWorkarounds,
@ -136,14 +149,35 @@ func (o *ReadClient) read() error {
NodesToRead: o.reqIDs, NodesToRead: o.reqIDs,
} }
resp, err := o.Client.Read(o.ctx, req) var count uint64
if err != nil { for {
count++
// Try to update the values for all registered nodes
resp, err := o.Client.Read(o.ctx, req)
if err == nil {
// Success, update the node values and exit
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.UpdateNodeValue(i, d)
}
return nil
}
o.ReadError.Incr(1) o.ReadError.Incr(1)
return fmt.Errorf("reading registered nodes failed: %w", err)
switch {
case count > o.ReadRetries:
// We exceeded the number of retries and should exit
return fmt.Errorf("reading registered nodes failed after %d attempts: %w", count, err)
case errors.Is(err, ua.StatusBadSessionIDInvalid),
errors.Is(err, ua.StatusBadSessionNotActivated),
errors.Is(err, ua.StatusBadSecureChannelIDInvalid):
// Retry after the defined period as session and channels should be refreshed
o.Log.Debugf("reading failed with %v, retry %d / %d...", err, count, o.ReadRetries)
time.Sleep(o.ReadRetryTimeout)
default:
// Non-retryable error, there is nothing we can do
return fmt.Errorf("reading registered nodes failed: %w", err)
}
} }
o.ReadSuccess.Incr(1)
for i, d := range resp.Results {
o.UpdateNodeValue(i, d)
}
return nil
} }

View File

@ -2,61 +2,64 @@
[[inputs.opcua]] [[inputs.opcua]]
## Metric name ## Metric name
# name = "opcua" # name = "opcua"
#
## OPC UA Endpoint URL ## OPC UA Endpoint URL
# endpoint = "opc.tcp://localhost:4840" # endpoint = "opc.tcp://localhost:4840"
#
## Maximum time allowed to establish a connection to the endpoint. ## Maximum time allowed to establish a connection to the endpoint.
# connect_timeout = "10s" # connect_timeout = "10s"
#
## Maximum time allowed for a request over the established connection. ## Maximum time allowed for a request over the established connection.
# request_timeout = "5s" # request_timeout = "5s"
# Maximum time that a session shall remain open without activity. ## Maximum time that a session shall remain open without activity.
# session_timeout = "20m" # session_timeout = "20m"
#
## Retry options for failing reads e.g. due to invalid sessions
## If the retry count is zero, the read will fail after the initial attempt.
# read_retry_timeout = "100ms"
# read_retry_count = 0
## Security policy, one of "None", "Basic128Rsa15", "Basic256", ## Security policy, one of "None", "Basic128Rsa15", "Basic256",
## "Basic256Sha256", or "auto" ## "Basic256Sha256", or "auto"
# security_policy = "auto" # security_policy = "auto"
#
## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto" ## Security mode, one of "None", "Sign", "SignAndEncrypt", or "auto"
# security_mode = "auto" # security_mode = "auto"
#
## Path to cert.pem. Required when security mode or policy isn't "None". ## Path to cert.pem. Required when security mode or policy isn't "None".
## If cert path is not supplied, self-signed cert and key will be generated. ## If cert path is not supplied, self-signed cert and key will be generated.
# certificate = "/etc/telegraf/cert.pem" # certificate = "/etc/telegraf/cert.pem"
#
## Path to private key.pem. Required when security mode or policy isn't "None". ## Path to private key.pem. Required when security mode or policy isn't "None".
## If key path is not supplied, self-signed cert and key will be generated. ## If key path is not supplied, self-signed cert and key will be generated.
# private_key = "/etc/telegraf/key.pem" # private_key = "/etc/telegraf/key.pem"
#
## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To ## Authentication Method, one of "Certificate", "UserName", or "Anonymous". To
## authenticate using a specific ID, select 'Certificate' or 'UserName' ## authenticate using a specific ID, select 'Certificate' or 'UserName'
# auth_method = "Anonymous" # auth_method = "Anonymous"
#
## Username. Required for auth_method = "UserName" ## Username and password required for auth_method = "UserName"
# username = "" # username = ""
#
## Password. Required for auth_method = "UserName"
# password = "" # password = ""
#
## Option to select the metric timestamp to use. Valid options are: ## Option to select the metric timestamp to use. Valid options are:
## "gather" -- uses the time of receiving the data in telegraf ## "gather" -- uses the time of receiving the data in telegraf
## "server" -- uses the timestamp provided by the server ## "server" -- uses the timestamp provided by the server
## "source" -- uses the timestamp provided by the source ## "source" -- uses the timestamp provided by the source
# timestamp = "gather" # timestamp = "gather"
#
## Client trace messages ## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the OPCUA ## When set to true, and debug mode enabled in the agent settings, the OPCUA
## client's messages are included in telegraf logs. These messages are very ## client's messages are included in telegraf logs. These messages are very
## noisy, but essential for debugging issues. ## noisy, but essential for debugging issues.
# client_trace = false # client_trace = false
#
## Include additional Fields in each metric ## Include additional Fields in each metric
## Available options are: ## Available options are:
## DataType -- OPC-UA Data Type (string) ## DataType -- OPC-UA Data Type (string)
# optional_fields = [] # optional_fields = []
#
## Node ID configuration ## Node ID configuration
## name - field name to use in the output ## name - field name to use in the output
## namespace - OPC UA namespace of the node (integer value 0 thru 3) ## namespace - OPC UA namespace of the node (integer value 0 thru 3)
@ -65,12 +68,12 @@
## default_tags - extra tags to be added to the output metric (optional) ## default_tags - extra tags to be added to the output metric (optional)
## ##
## Use either the inline notation or the bracketed notation, not both. ## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet) ## Inline notation (default_tags not supported yet)
# nodes = [ # nodes = [
# {name="", namespace="", identifier_type="", identifier=""}, # {name="", namespace="", identifier_type="", identifier=""},
# ] # ]
#
## Bracketed notation ## Bracketed notation
# [[inputs.opcua.nodes]] # [[inputs.opcua.nodes]]
# name = "node1" # name = "node1"
@ -84,7 +87,7 @@
# namespace = "" # namespace = ""
# identifier_type = "" # identifier_type = ""
# identifier = "" # identifier = ""
#
## Node Group ## Node Group
## Sets defaults so they aren't required in every node. ## Sets defaults so they aren't required in every node.
## Default values can be set for: ## Default values can be set for:
@ -98,29 +101,29 @@
## Group Metric name. Overrides the top level name. If unset, the ## Group Metric name. Overrides the top level name. If unset, the
## top level name is used. ## top level name is used.
# name = # name =
#
## Group default namespace. If a node in the group doesn't set its ## Group default namespace. If a node in the group doesn't set its
## namespace, this is used. ## namespace, this is used.
# namespace = # namespace =
#
## Group default identifier type. If a node in the group doesn't set its ## Group default identifier type. If a node in the group doesn't set its
## identifier type, this is used. ## identifier type, this is used.
# identifier_type = # identifier_type =
#
## Default tags that are applied to every node in this group. Can be ## Default tags that are applied to every node in this group. Can be
## overwritten in a node by setting a different value for the tag name. ## overwritten in a node by setting a different value for the tag name.
## example: default_tags = { tag1 = "value1" } ## example: default_tags = { tag1 = "value1" }
# default_tags = {} # default_tags = {}
#
## Node ID Configuration. Array of nodes with the same settings as above. ## Node ID Configuration. Array of nodes with the same settings as above.
## Use either the inline notation or the bracketed notation, not both. ## Use either the inline notation or the bracketed notation, not both.
#
## Inline notation (default_tags not supported yet) ## Inline notation (default_tags not supported yet)
# nodes = [ # nodes = [
# {name="node1", namespace="", identifier_type="", identifier=""}, # {name="node1", namespace="", identifier_type="", identifier=""},
# {name="node2", namespace="", identifier_type="", identifier=""}, # {name="node2", namespace="", identifier_type="", identifier=""},
#] #]
#
## Bracketed notation ## Bracketed notation
# [[inputs.opcua.group.nodes]] # [[inputs.opcua.group.nodes]]
# name = "node1" # name = "node1"
@ -137,9 +140,9 @@
## Enable workarounds required by some devices to work correctly ## Enable workarounds required by some devices to work correctly
# [inputs.opcua.workarounds] # [inputs.opcua.workarounds]
## Set additional valid status codes, StatusOK (0x0) is always considered valid # ## Set additional valid status codes, StatusOK (0x0) is always considered valid
# additional_valid_status_codes = ["0xC0"] # # additional_valid_status_codes = ["0xC0"]
# [inputs.opcua.request_workarounds] # [inputs.opcua.request_workarounds]
## Use unregistered reads instead of registered reads # ## Use unregistered reads instead of registered reads
# use_unregistered_reads = false # # use_unregistered_reads = false