fix(inputs.modbus): Do not fail if a single slave reports errors (#11785)
This commit is contained in:
parent
97b4db8438
commit
d637a665e8
|
|
@ -146,26 +146,24 @@ func (m *Modbus) Gather(acc telegraf.Accumulator) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
timestamp := time.Now()
|
|
||||||
for retry := 0; retry <= m.Retries; retry++ {
|
|
||||||
timestamp = time.Now()
|
|
||||||
if err := m.gatherFields(); err != nil {
|
|
||||||
if mberr, ok := err.(*mb.Error); ok && mberr.ExceptionCode == mb.ExceptionCodeServerDeviceBusy && retry < m.Retries {
|
|
||||||
m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry)
|
|
||||||
time.Sleep(time.Duration(m.RetriesWaitTime))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Show the disconnect error this way to not shadow the initial error
|
|
||||||
if discerr := m.disconnect(); discerr != nil {
|
|
||||||
m.Log.Errorf("Disconnecting failed: %v", discerr)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Reading was successful, leave the retry loop
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
for slaveID, requests := range m.requests {
|
for slaveID, requests := range m.requests {
|
||||||
|
m.Log.Debugf("Reading slave %d for %s...", slaveID, m.Controller)
|
||||||
|
if err := m.readSlaveData(slaveID, requests); err != nil {
|
||||||
|
acc.AddError(fmt.Errorf("slave %d: %w", slaveID, err))
|
||||||
|
mberr, ok := err.(*mb.Error)
|
||||||
|
if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy {
|
||||||
|
m.Log.Debugf("Reconnecting to %s...", m.Controller)
|
||||||
|
if err := m.disconnect(); err != nil {
|
||||||
|
return fmt.Errorf("disconnecting failed: %w", err)
|
||||||
|
}
|
||||||
|
if err := m.connect(); err != nil {
|
||||||
|
return fmt.Errorf("connecting failed: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
timestamp := time.Now()
|
||||||
|
|
||||||
tags := map[string]string{
|
tags := map[string]string{
|
||||||
"name": m.Name,
|
"name": m.Name,
|
||||||
"type": cCoils,
|
"type": cCoils,
|
||||||
|
|
@ -276,24 +274,40 @@ func (m *Modbus) disconnect() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Modbus) gatherFields() error {
|
func (m *Modbus) readSlaveData(slaveID byte, requests requestSet) error {
|
||||||
for slaveID, requests := range m.requests {
|
m.handler.SetSlave(slaveID)
|
||||||
m.handler.SetSlave(slaveID)
|
|
||||||
if err := m.gatherRequestsCoil(requests.coil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := m.gatherRequestsDiscrete(requests.discrete); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := m.gatherRequestsHolding(requests.holding); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := m.gatherRequestsInput(requests.input); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
for retry := 0; retry < m.Retries; retry++ {
|
||||||
|
err := m.gatherFields(requests)
|
||||||
|
if err == nil {
|
||||||
|
// Reading was successful
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exit in case a non-recoverable error occurred
|
||||||
|
mberr, ok := err.(*mb.Error)
|
||||||
|
if !ok || mberr.ExceptionCode != mb.ExceptionCodeServerDeviceBusy {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait some time and try again reading the slave.
|
||||||
|
m.Log.Infof("Device busy! Retrying %d more time(s)...", m.Retries-retry)
|
||||||
|
time.Sleep(time.Duration(m.RetriesWaitTime))
|
||||||
|
}
|
||||||
|
return m.gatherFields(requests)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Modbus) gatherFields(requests requestSet) error {
|
||||||
|
if err := m.gatherRequestsCoil(requests.coil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := m.gatherRequestsDiscrete(requests.discrete); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := m.gatherRequestsHolding(requests.holding); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return m.gatherRequestsInput(requests.input)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Modbus) gatherRequestsCoil(requests []request) error {
|
func (m *Modbus) gatherRequestsCoil(requests []request) error {
|
||||||
|
|
|
||||||
|
|
@ -1050,9 +1050,9 @@ func TestRetryFailExhausted(t *testing.T) {
|
||||||
require.NoError(t, modbus.Init())
|
require.NoError(t, modbus.Init())
|
||||||
require.NotEmpty(t, modbus.requests)
|
require.NotEmpty(t, modbus.requests)
|
||||||
|
|
||||||
err := modbus.Gather(&acc)
|
require.NoError(t, modbus.Gather(&acc))
|
||||||
require.Error(t, err)
|
require.Len(t, acc.Errors, 1)
|
||||||
require.Equal(t, "modbus: exception '6' (server device busy), function '129'", err.Error())
|
require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '6' (server device busy), function '129'")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRetryFailIllegal(t *testing.T) {
|
func TestRetryFailIllegal(t *testing.T) {
|
||||||
|
|
@ -1072,7 +1072,8 @@ func TestRetryFailIllegal(t *testing.T) {
|
||||||
data[1] = byte(0)
|
data[1] = byte(0)
|
||||||
|
|
||||||
return data, &mbserver.IllegalFunction
|
return data, &mbserver.IllegalFunction
|
||||||
})
|
},
|
||||||
|
)
|
||||||
|
|
||||||
modbus := Modbus{
|
modbus := Modbus{
|
||||||
Name: "TestRetryFailExhausted",
|
Name: "TestRetryFailExhausted",
|
||||||
|
|
@ -1092,9 +1093,9 @@ func TestRetryFailIllegal(t *testing.T) {
|
||||||
require.NoError(t, modbus.Init())
|
require.NoError(t, modbus.Init())
|
||||||
require.NotEmpty(t, modbus.requests)
|
require.NotEmpty(t, modbus.requests)
|
||||||
|
|
||||||
err := modbus.Gather(&acc)
|
require.NoError(t, modbus.Gather(&acc))
|
||||||
require.Error(t, err)
|
require.Len(t, acc.Errors, 1)
|
||||||
require.Equal(t, "modbus: exception '1' (illegal function), function '129'", err.Error())
|
require.EqualError(t, acc.FirstError(), "slave 1: modbus: exception '1' (illegal function), function '129'")
|
||||||
require.Equal(t, counter, 1)
|
require.Equal(t, counter, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1866,7 +1867,8 @@ func TestRequestsStartingWithOmits(t *testing.T) {
|
||||||
Log: testutil.Logger{},
|
Log: testutil.Logger{},
|
||||||
}
|
}
|
||||||
modbus.Requests = []requestDefinition{
|
modbus.Requests = []requestDefinition{
|
||||||
{SlaveID: 1,
|
{
|
||||||
|
SlaveID: 1,
|
||||||
ByteOrder: "ABCD",
|
ByteOrder: "ABCD",
|
||||||
RegisterType: "holding",
|
RegisterType: "holding",
|
||||||
Fields: []requestFieldDefinition{
|
Fields: []requestFieldDefinition{
|
||||||
|
|
@ -1942,3 +1944,103 @@ func TestRequestsEmptyFields(t *testing.T) {
|
||||||
err := modbus.Init()
|
err := modbus.Init()
|
||||||
require.EqualError(t, err, `configuraton invalid: found request section without fields`)
|
require.EqualError(t, err, `configuraton invalid: found request section without fields`)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultipleSlavesOneFail(t *testing.T) {
|
||||||
|
telegraf.Debug = true
|
||||||
|
modbus := Modbus{
|
||||||
|
Name: "Test",
|
||||||
|
Controller: "tcp://localhost:1502",
|
||||||
|
Retries: 1,
|
||||||
|
ConfigurationType: "request",
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
modbus.Requests = []requestDefinition{
|
||||||
|
{
|
||||||
|
SlaveID: 1,
|
||||||
|
ByteOrder: "ABCD",
|
||||||
|
RegisterType: "holding",
|
||||||
|
Fields: []requestFieldDefinition{
|
||||||
|
{
|
||||||
|
Name: "holding-0",
|
||||||
|
Address: uint16(0),
|
||||||
|
InputType: "INT16",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
SlaveID: 2,
|
||||||
|
ByteOrder: "ABCD",
|
||||||
|
RegisterType: "holding",
|
||||||
|
Fields: []requestFieldDefinition{
|
||||||
|
{
|
||||||
|
Name: "holding-0",
|
||||||
|
Address: uint16(0),
|
||||||
|
InputType: "INT16",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
SlaveID: 3,
|
||||||
|
ByteOrder: "ABCD",
|
||||||
|
RegisterType: "holding",
|
||||||
|
Fields: []requestFieldDefinition{
|
||||||
|
{
|
||||||
|
Name: "holding-0",
|
||||||
|
Address: uint16(0),
|
||||||
|
InputType: "INT16",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
require.NoError(t, modbus.Init())
|
||||||
|
|
||||||
|
serv := mbserver.NewServer()
|
||||||
|
require.NoError(t, serv.ListenTCP("localhost:1502"))
|
||||||
|
defer serv.Close()
|
||||||
|
|
||||||
|
serv.RegisterFunctionHandler(3,
|
||||||
|
func(s *mbserver.Server, frame mbserver.Framer) ([]byte, *mbserver.Exception) {
|
||||||
|
tcpframe, ok := frame.(*mbserver.TCPFrame)
|
||||||
|
if !ok {
|
||||||
|
return nil, &mbserver.IllegalFunction
|
||||||
|
}
|
||||||
|
|
||||||
|
if tcpframe.Device == 2 {
|
||||||
|
// Simulate device 2 being unavailable
|
||||||
|
return []byte{}, &mbserver.GatewayTargetDeviceFailedtoRespond
|
||||||
|
}
|
||||||
|
return []byte{0x02, 0x00, 0x42}, &mbserver.Success
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
testutil.MustMetric(
|
||||||
|
"modbus",
|
||||||
|
map[string]string{
|
||||||
|
"type": cHoldingRegisters,
|
||||||
|
"slave_id": "1",
|
||||||
|
"name": modbus.Name,
|
||||||
|
},
|
||||||
|
map[string]interface{}{"holding-0": int16(0x42)},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
testutil.MustMetric(
|
||||||
|
"modbus",
|
||||||
|
map[string]string{
|
||||||
|
"type": cHoldingRegisters,
|
||||||
|
"slave_id": "3",
|
||||||
|
"name": modbus.Name,
|
||||||
|
},
|
||||||
|
map[string]interface{}{"holding-0": int16(0x42)},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, modbus.Gather(&acc))
|
||||||
|
acc.Wait(len(expected))
|
||||||
|
actual := acc.GetTelegrafMetrics()
|
||||||
|
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime(), testutil.SortMetrics())
|
||||||
|
require.Len(t, acc.Errors, 1)
|
||||||
|
require.EqualError(t, acc.FirstError(), "slave 2: modbus: exception '11' (gateway target device failed to respond), function '131'")
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue