fix(inputs.opcua): fix opcua and opcua-listener for servers using password-based auth (#12529)

This commit is contained in:
Viraj Sinha 2023-01-30 07:14:58 -08:00 committed by GitHub
parent dc9eb02301
commit e96a49ea83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 17 deletions

View File

@ -101,12 +101,8 @@ type OpcUAClient struct {
codes []ua.StatusCode
}
func (o *OpcUAClient) Init() error {
return o.setupOptions()
}
// / setupOptions read the endpoints from the specified server and setup all authentication
func (o *OpcUAClient) setupOptions() error {
func (o *OpcUAClient) SetupOptions() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(o.Config.ConnectTimeout))
defer cancel()
// Get a list of the endpoints for our target server
@ -169,6 +165,11 @@ func (o *OpcUAClient) Connect() error {
case "opc.tcp":
o.State = Connecting
err = o.SetupOptions()
if err != nil {
return err
}
if o.Client != nil {
o.Log.Warnf("Closing connection due to Connect called while already instantiated", u)
if err := o.Client.Close(); err != nil {

View File

@ -34,6 +34,7 @@ func (o *OpcUA) Init() (err error) {
// Gather defines what data the plugin will gather.
func (o *OpcUA) Gather(acc telegraf.Accumulator) error {
// Will (re)connect if the client is disconnected
metrics, err := o.client.CurrentValues()
if err != nil {
return err

View File

@ -50,7 +50,7 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) {
require.NoError(t, err, "failed to start container")
defer container.Terminate()
var testopctags = []OPCTags{
testopctags := []OPCTags{
{"ProductName", "1", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
@ -92,9 +92,6 @@ func TestGetDataBadNodeContainerIntegration(t *testing.T) {
logger := &testutil.CaptureLogger{}
readClient, err := readConfig.CreateReadClient(logger)
require.NoError(t, err)
err = readClient.Init()
require.NoError(t, err)
err = readClient.Connect()
require.NoError(t, err)
}
@ -116,7 +113,7 @@ func TestReadClientIntegration(t *testing.T) {
require.NoError(t, err, "failed to start container")
defer container.Terminate()
var testopctags = []OPCTags{
testopctags := []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
@ -153,8 +150,64 @@ func TestReadClientIntegration(t *testing.T) {
client, err := readConfig.CreateReadClient(testutil.Logger{})
require.NoError(t, err)
err = client.Init()
require.NoError(t, err, "Initialization")
err = client.Connect()
require.NoError(t, err, "Connect")
for i, v := range client.LastReceivedData {
require.Equal(t, testopctags[i].Want, v.Value)
}
}
func TestReadClientIntegrationWithPasswordAuth(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
container := testutil.Container{
Image: "open62541/open62541",
Entrypoint: []string{"/opt/open62541/build/bin/examples/access_control_server"},
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port(servicePort)),
wait.ForLog("TCP network layer listening on opc.tcp://"),
),
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer container.Terminate()
testopctags := []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
}
readConfig := ReadClientConfig{
InputClientConfig: input.InputClientConfig{
OpcUAClientConfig: opcua.OpcUAClientConfig{
Endpoint: fmt.Sprintf("opc.tcp://%s:%s", container.Address, container.Ports[servicePort]),
SecurityPolicy: "None",
SecurityMode: "None",
Username: "peter",
Password: "peter123",
AuthMethod: "UserName",
ConnectTimeout: config.Duration(10 * time.Second),
RequestTimeout: config.Duration(1 * time.Second),
Workarounds: opcua.OpcUAWorkarounds{},
},
MetricName: "testing",
RootNodes: make([]input.NodeSettings, 0),
Groups: make([]input.NodeGroupSettings, 0),
},
}
for _, tags := range testopctags {
readConfig.RootNodes = append(readConfig.RootNodes, MapOPCTag(tags))
}
client, err := readConfig.CreateReadClient(testutil.Logger{})
require.NoError(t, err)
err = client.Connect()
require.NoError(t, err, "Connect")

View File

@ -51,7 +51,7 @@ func TestSubscribeClientIntegration(t *testing.T) {
require.NoError(t, err, "failed to start container")
defer container.Terminate()
var testopctags = []OPCTags{
testopctags := []OPCTags{
{"ProductName", "0", "i", "2261", "open62541 OPC UA Server"},
{"ProductUri", "0", "i", "2262", "http://open62541.org"},
{"ManufacturerName", "0", "i", "2263", "open62541"},
@ -59,7 +59,7 @@ func TestSubscribeClientIntegration(t *testing.T) {
{"goodnode", "1", "s", "the.answer", int32(42)},
{"DateTime", "1", "i", "51037", "0001-01-01T00:00:00Z"},
}
var tagsRemaining = make([]string, 0, len(testopctags))
tagsRemaining := make([]string, 0, len(testopctags))
for i, tag := range testopctags {
if tag.Want != nil {
tagsRemaining = append(tagsRemaining, testopctags[i].Name)
@ -89,10 +89,10 @@ func TestSubscribeClientIntegration(t *testing.T) {
o, err := subscribeConfig.CreateSubscribeClient(testutil.Logger{})
require.NoError(t, err)
// give init a couple extra attempts, seconds as on CircleCI this can
// be attempted to soon
// give initial setup a couple extra attempts, as on CircleCI this can be
// attempted to soon
require.Eventually(t, func() bool {
return o.Init() == nil
return o.SetupOptions() == nil
}, 5*time.Second, 10*time.Millisecond)
err = o.Connect()