From 41b7a3d4670ee73b8981b7d15c98a2a7334d4642 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 7 Dec 2023 12:28:24 +0100 Subject: [PATCH] fix(inputs.s7comm): Reconnect if query fails (#14394) --- plugins/inputs/s7comm/s7comm.go | 21 ++++--- plugins/inputs/s7comm/s7comm_test.go | 85 ++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 7 deletions(-) diff --git a/plugins/inputs/s7comm/s7comm.go b/plugins/inputs/s7comm/s7comm.go index d6795792d..6070aabf5 100644 --- a/plugins/inputs/s7comm/s7comm.go +++ b/plugins/inputs/s7comm/s7comm.go @@ -126,18 +126,20 @@ func (s *S7comm) Init() error { s.Server += ":102" } - // Create the requests - return s.createRequests() -} - -// Start initializes the connection to the remote endpoint -func (s *S7comm) Start(_ telegraf.Accumulator) error { // Create handler for the connection s.handler = gos7.NewTCPClientHandler(s.Server, s.Rack, s.Slot) s.handler.Timeout = time.Duration(s.Timeout) if s.DebugConnection { s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm]", log.LstdFlags) } + + // Create the requests + return s.createRequests() +} + +// Start initializes the connection to the remote endpoint +func (s *S7comm) Start(_ telegraf.Accumulator) error { + s.Log.Debugf("Connecting to %q...", s.Server) if err := s.handler.Connect(); err != nil { return fmt.Errorf("connecting to %q failed: %w", s.Server, err) } @@ -149,6 +151,7 @@ func (s *S7comm) Start(_ telegraf.Accumulator) error { // Stop disconnects from the remote endpoint and cleans up func (s *S7comm) Stop() { if s.handler != nil { + s.Log.Debugf("Disconnecting from %q...", s.handler.Address) s.handler.Close() } } @@ -162,7 +165,11 @@ func (s *S7comm) Gather(acc telegraf.Accumulator) error { // Read the batch s.Log.Debugf("Reading batch %d...", i+1) if err := s.client.AGReadMulti(b.items, len(b.items)); err != nil { - return fmt.Errorf("reading batch %d failed: %w", i+1, err) + // Try to reconnect and skip this gather cycle to avoid hammering + // the network if the server is down or under load. + s.Log.Errorf("reading batch %d failed: %v; reconnecting...", i+1, err) + s.Stop() + return s.Start(acc) } // Dissect the received data into fields diff --git a/plugins/inputs/s7comm/s7comm_test.go b/plugins/inputs/s7comm/s7comm_test.go index f368f1fe4..3e6f12082 100644 --- a/plugins/inputs/s7comm/s7comm_test.go +++ b/plugins/inputs/s7comm/s7comm_test.go @@ -2,6 +2,10 @@ package s7comm import ( _ "embed" + "encoding/binary" + "io" + "net" + "sync/atomic" "testing" "github.com/influxdata/telegraf/testutil" @@ -699,3 +703,84 @@ func TestMetricCollisions(t *testing.T) { }) } } + +func TestConnectionLoss(t *testing.T) { + // Create fake S7 comm server that can accept connects + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listener.Close() + + var connectionAttempts uint32 + go func() { + for { + conn, err := listener.Accept() + if err != nil { + return + } + + // Count the number of connection attempts + atomic.AddUint32(&connectionAttempts, 1) + + buf := make([]byte, 4096) + + // Wait for ISO connection telegram + if _, err := io.ReadAtLeast(conn, buf, 22); err != nil { + conn.Close() + return + } + + // Send fake response + response := make([]byte, 22) + response[5] = 0xD0 + binary.BigEndian.PutUint16(response[2:4], uint16(len(response))) + if _, err := conn.Write(response); err != nil { + conn.Close() + return + } + + // Wait for PDU negotiation telegram + if _, err := io.ReadAtLeast(conn, buf, 25); err != nil { + conn.Close() + return + } + + // Send fake response + response = make([]byte, 27) + binary.BigEndian.PutUint16(response[2:4], uint16(len(response))) + binary.BigEndian.PutUint16(response[25:27], uint16(480)) + if _, err := conn.Write(response); err != nil { + return + } + + // Always close after connection is established + conn.Close() + } + }() + plugin := &S7comm{ + Server: listener.Addr().String(), + Rack: 0, + Slot: 2, + DebugConnection: true, + Configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W2", + }, + }, + }, + }, + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + require.NoError(t, plugin.Gather(&acc)) + require.NoError(t, plugin.Gather(&acc)) + plugin.Stop() + listener.Close() + + require.Equal(t, 3, int(atomic.LoadUint32(&connectionAttempts))) +}