feat(inputs.opcua_listener): Add retry options for connection failures

Co-authored-by: Anna Carrigan <acarrigan@terabase.energy>
This commit is contained in:
anna 2024-01-16 02:30:52 -08:00 committed by GitHub
parent 2a152df7fa
commit 0c4992042c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 138 additions and 4 deletions

View File

@ -28,7 +28,7 @@ Documentation is double commented, full sentences, and ends with a period.
# exchange_type = "topic"
```
Try to give every parameter a default value whenever possible. If an
Try to give every parameter a default value whenever possible. If a
parameter does not have a default or must frequently be changed then have it
uncommented.

View File

@ -48,6 +48,12 @@ to use them.
## Maximum time allowed to establish a connection to the endpoint.
# connect_timeout = "10s"
#
## Behavior when we fail to connect to the endpoint on initialization. Valid options are:
## "error": throw an error and exit Telegraf
## "ignore": ignore this plugin if errors are encountered
## "retry": retry connecting at each interval
# connect_fail_behavior = "error"
#
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#

View File

@ -4,6 +4,7 @@ package opcua_listener
import (
"context"
_ "embed"
"fmt"
"time"
"github.com/influxdata/telegraf"
@ -27,15 +28,26 @@ func (*OpcUaListener) SampleConfig() string {
}
// Init validates the 'connect_fail_behavior' setting and creates the
// subscribe client. An unset value is replaced by "error"; any value other
// than "error", "ignore" or "retry" is rejected before the client is created.
func (o *OpcUaListener) Init() (err error) {
	// Apply the default first so only the validation remains below.
	if o.ConnectFailBehavior == "" {
		o.ConnectFailBehavior = "error"
	}

	switch o.ConnectFailBehavior {
	case "error", "ignore", "retry":
		// Supported values, nothing to adjust.
	default:
		return fmt.Errorf("unknown setting %q for 'connect_fail_behavior'", o.ConnectFailBehavior)
	}

	o.client, err = o.SubscribeClientConfig.CreateSubscribeClient(o.Log)
	return err
}
func (o *OpcUaListener) Gather(_ telegraf.Accumulator) error {
return nil
// Gather triggers a new connection attempt through connect when the client
// is not connected. It returns nil without doing anything when the client is
// already connected or when 'connect_fail_behavior' is "ignore".
func (o *OpcUaListener) Gather(acc telegraf.Accumulator) error {
	switch {
	case o.client.State() == opcua.Connected:
		// Already connected, nothing to re-establish.
		return nil
	case o.SubscribeClientConfig.ConnectFailBehavior == "ignore":
		// The plugin is configured to never retry.
		return nil
	}

	return o.connect(acc)
}
func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
func (o *OpcUaListener) connect(acc telegraf.Accumulator) error {
ctx := context.Background()
ch, err := o.client.StartStreamValues(ctx)
if err != nil {
@ -56,6 +68,10 @@ func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
return nil
}
// Start delegates to connect, passing along the accumulator, and returns
// whatever error connect reports.
func (o *OpcUaListener) Start(acc telegraf.Accumulator) error {
return o.connect(acc)
}
func (o *OpcUaListener) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
select {

View File

@ -37,6 +37,96 @@ func MapOPCTag(tags OPCTags) (out input.NodeSettings) {
return out
}
// TestInitPluginWithBadConnectFailBehaviorValue verifies that Init rejects a
// 'connect_fail_behavior' value that is not one of the supported options.
func TestInitPluginWithBadConnectFailBehaviorValue(t *testing.T) {
	var listener OpcUaListener
	listener.Log = testutil.Logger{}

	// Client connection settings.
	cfg := &listener.SubscribeClientConfig
	cfg.Endpoint = "opc.tcp://notarealserver:4840"
	cfg.SecurityPolicy = "None"
	cfg.SecurityMode = "None"
	cfg.ConnectTimeout = config.Duration(5 * time.Second)
	cfg.RequestTimeout = config.Duration(10 * time.Second)

	// Input settings.
	cfg.MetricName = "opcua"
	cfg.Timestamp = input.TimestampSourceTelegraf
	cfg.RootNodes = make([]input.NodeSettings, 0)

	// Listener-specific settings, including the invalid value under test.
	cfg.ConnectFailBehavior = "notanoption"
	cfg.SubscriptionInterval = config.Duration(100 * time.Millisecond)

	require.ErrorContains(t, listener.Init(), "unknown setting \"notanoption\" for 'connect_fail_behavior'")
}
// TestStartPlugin exercises Start and Gather under each 'connect_fail_behavior'
// setting against an endpoint that cannot be resolved, then (for "retry")
// against a real server started in a container. The phases reuse the same
// plugin value, so their order matters.
func TestStartPlugin(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
acc := &testutil.Accumulator{}
// ConnectFailBehavior is left unset here; Init replaces it with "error".
plugin := OpcUaListener{
SubscribeClientConfig: SubscribeClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: "opc.tcp://notarealserver:4840",
SecurityPolicy: "None",
SecurityMode: "None",
ConnectTimeout: config.Duration(5 * time.Second),
RequestTimeout: config.Duration(10 * time.Second),
},
MetricName: "opcua",
Timestamp: input.TimestampSourceTelegraf,
RootNodes: make([]input.NodeSettings, 0),
},
SubscriptionInterval: config.Duration(100 * time.Millisecond),
},
Log: testutil.Logger{},
}
testopctags := []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
}
for _, tags := range testopctags {
plugin.SubscribeClientConfig.RootNodes = append(plugin.SubscribeClientConfig.RootNodes, MapOPCTag(tags))
}
// Phase 1, default ("error"): Start must surface the connection failure.
require.NoError(t, plugin.Init())
err := plugin.Start(acc)
require.ErrorContains(t, err, "could not resolve address")
// Phase 2, "ignore": Start succeeds even though the client stays disconnected.
plugin.SubscribeClientConfig.ConnectFailBehavior = "ignore"
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(acc))
require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State())
plugin.Stop()
// The container is only defined here; it is started later, after the first
// "retry" attempt has failed against the unresolvable endpoint.
container := testutil.Container{
Image: "open62541/open62541",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("TCP network layer listening on opc.tcp://"),
),
}
// Phase 3, "retry": Start succeeds while the client is still disconnected.
plugin.SubscribeClientConfig.ConnectFailBehavior = "retry"
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(acc))
require.Equal(t, opcua.Disconnected, plugin.client.OpcUAClient.State())
err = container.Start()
require.NoError(t, err, "failed to start container")
defer container.Terminate()
// Point both the subscribe client config and the underlying OPC UA client
// config at the running container before the next Gather.
newEndpoint := fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort])
plugin.client.Config.Endpoint = newEndpoint
plugin.client.OpcUAClient.Config.Endpoint = newEndpoint
// Gather is expected to perform the retry and leave the client connected.
err = plugin.Gather(acc)
require.NoError(t, err)
require.Equal(t, opcua.Connected, plugin.client.OpcUAClient.State())
}
func TestSubscribeClientIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
@ -104,6 +194,7 @@ func TestSubscribeClientIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
res, err := o.StartStreamValues(ctx)
require.Equal(t, opcua.Connected, o.State())
require.NoError(t, err)
for {
@ -299,6 +390,7 @@ endpoint = "opc.tcp://localhost:4840"
connect_timeout = "10s"
request_timeout = "5s"
subscription_interval = "200ms"
connect_fail_behavior = "error"
security_policy = "auto"
security_mode = "auto"
certificate = "/etc/telegraf/cert.pem"
@ -347,6 +439,7 @@ additional_valid_status_codes = ["0xC0"]
require.Equal(t, config.Duration(10*time.Second), o.SubscribeClientConfig.ConnectTimeout)
require.Equal(t, config.Duration(5*time.Second), o.SubscribeClientConfig.RequestTimeout)
require.Equal(t, config.Duration(200*time.Millisecond), o.SubscribeClientConfig.SubscriptionInterval)
require.Equal(t, "error", o.SubscribeClientConfig.ConnectFailBehavior)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityPolicy)
require.Equal(t, "auto", o.SubscribeClientConfig.SecurityMode)
require.Equal(t, "/etc/telegraf/cert.pem", o.SubscribeClientConfig.Certificate)

View File

@ -9,6 +9,12 @@
## Maximum time allowed to establish a connection to the endpoint.
# connect_timeout = "10s"
#
## Behavior when we fail to connect to the endpoint on initialization. Valid options are:
## "error": throw an error and exit Telegraf
## "ignore": ignore this plugin if errors are encountered
## "retry": retry connecting at each interval
# connect_fail_behavior = "error"
#
## Maximum time allowed for a request over the established connection.
# request_timeout = "5s"
#

View File

@ -11,12 +11,14 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
opcuaclient "github.com/influxdata/telegraf/plugins/common/opcua"
"github.com/influxdata/telegraf/plugins/common/opcua/input"
)
// SubscribeClientConfig holds the settings for the subscribing OPC UA client.
// It embeds input.InputClientConfig and adds the listener-specific options.
type SubscribeClientConfig struct {
input.InputClientConfig
// SubscriptionInterval is read from the 'subscription_interval' TOML key.
// NOTE(review): presumably the interval requested for the OPC UA
// subscription; confirm against where it is used.
SubscriptionInterval config.Duration `toml:"subscription_interval"`
// ConnectFailBehavior is read from the 'connect_fail_behavior' TOML key.
// StartStreamValues checks it when Connect fails: "retry" and "ignore" log
// the failure and return no error, any other value returns the error.
ConnectFailBehavior string `toml:"connect_fail_behavior"`
}
type SubscribeClient struct {
@ -133,6 +135,9 @@ func (o *SubscribeClient) Connect() error {
func (o *SubscribeClient) Stop(ctx context.Context) <-chan struct{} {
o.Log.Debugf("Stopping OPC subscription...")
if o.State() != opcuaclient.Connected {
return nil
}
if o.sub != nil {
if err := o.sub.Cancel(ctx); err != nil {
o.Log.Warn("Cancelling OPC UA subscription failed with error ", err)
@ -150,6 +155,14 @@ func (o *SubscribeClient) CurrentValues() ([]telegraf.Metric, error) {
func (o *SubscribeClient) StartStreamValues(ctx context.Context) (<-chan telegraf.Metric, error) {
err := o.Connect()
if err != nil {
switch o.Config.ConnectFailBehavior {
case "retry":
o.Log.Warnf("Failed to connect to OPC UA server %s. Will attempt to connect again at the next interval: %s", o.Config.Endpoint, err)
return nil, nil
case "ignore":
o.Log.Errorf("Failed to connect to OPC UA server %s. Will not retry: %s", o.Config.Endpoint, err)
return nil, nil
}
return nil, err
}