feat(inputs.modbus): add workaround to enforce reads from zero for coil registers (#12408)

This commit is contained in:
Sven Rebhan 2023-01-09 20:35:46 +01:00 committed by GitHub
parent 11228ee8ad
commit 150f0cd3a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 338 additions and 125 deletions

View File

@ -256,6 +256,11 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## This might be necessary for devices not conforming to the spec,
## see https://github.com/influxdata/telegraf/issues/12071.
# one_request_per_field = false
## Enforce the starting address to be zero for the first request on
## coil registers. This is necessary for some devices see
## https://github.com/influxdata/telegraf/issues/8905
# read_coils_starting_at_zero = false
```
## Notes

View File

@ -95,7 +95,13 @@ func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQua
if err != nil {
return nil, err
}
return groupFieldsToRequests(fields, nil, maxQuantity, "none", 0), nil
params := groupingParams{
MaxBatchSize: maxQuantity,
Optimization: "none",
EnforceFromZero: c.workarounds.ReadCoilsStartingAtZero,
}
return groupFieldsToRequests(fields, params), nil
}
func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition) ([]field, error) {

View File

@ -178,39 +178,47 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {
}
}
params := groupingParams{
MaxExtraRegisters: def.MaxExtraRegisters,
Optimization: def.Optimization,
Tags: def.Tags,
}
switch def.RegisterType {
case "coil":
maxQuantity := maxQuantityCoils
params.MaxBatchSize = maxQuantityCoils
if c.workarounds.OnRequestPerField {
maxQuantity = 1
params.MaxBatchSize = 1
}
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
params.EnforceFromZero = c.workarounds.ReadCoilsStartingAtZero
requests := groupFieldsToRequests(fields, params)
set.coil = append(set.coil, requests...)
case "discrete":
maxQuantity := maxQuantityDiscreteInput
params.MaxBatchSize = maxQuantityDiscreteInput
if c.workarounds.OnRequestPerField {
maxQuantity = 1
params.MaxBatchSize = 1
}
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
requests := groupFieldsToRequests(fields, params)
set.discrete = append(set.discrete, requests...)
case "holding":
maxQuantity := maxQuantityHoldingRegisters
params.MaxBatchSize = maxQuantityHoldingRegisters
if c.workarounds.OnRequestPerField {
maxQuantity = 1
params.MaxBatchSize = 1
}
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
requests := groupFieldsToRequests(fields, params)
set.holding = append(set.holding, requests...)
case "input":
maxQuantity := maxQuantityInputRegisters
params.MaxBatchSize = maxQuantityInputRegisters
if c.workarounds.OnRequestPerField {
maxQuantity = 1
params.MaxBatchSize = 1
}
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
requests := groupFieldsToRequests(fields, params)
set.input = append(set.input, requests...)
default:
return nil, fmt.Errorf("unknown register type %q", def.RegisterType)
}
result[def.SlaveID] = set
if !set.Empty() {
result[def.SlaveID] = set
}
}
return result, nil

View File

@ -25,10 +25,11 @@ var sampleConfigStart string
var sampleConfigEnd string
type ModbusWorkarounds struct {
AfterConnectPause config.Duration `toml:"pause_after_connect"`
PollPause config.Duration `toml:"pause_between_requests"`
CloseAfterGather bool `toml:"close_connection_after_gather"`
OnRequestPerField bool `toml:"one_request_per_field"`
AfterConnectPause config.Duration `toml:"pause_after_connect"`
PollPause config.Duration `toml:"pause_between_requests"`
CloseAfterGather bool `toml:"close_connection_after_gather"`
OnRequestPerField bool `toml:"one_request_per_field"`
ReadCoilsStartingAtZero bool `toml:"read_coils_starting_at_zero"`
}
// Modbus holds all data relevant to the plugin
@ -68,6 +69,14 @@ type requestSet struct {
input []request
}
func (r requestSet) Empty() bool {
l := len(r.coil)
l += len(r.discrete)
l += len(r.holding)
l += len(r.input)
return l == 0
}
type field struct {
measurement string
name string

View File

@ -3468,6 +3468,91 @@ func TestRequestsStartingWithOmits(t *testing.T) {
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
func TestRequestsWithOmittedFieldsOnly(t *testing.T) {
modbus := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
}
modbus.Requests = []requestDefinition{
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-0",
Address: uint16(0),
InputType: "INT16",
Omit: true,
},
{
Name: "holding-1",
Address: uint16(1),
InputType: "UINT16",
Omit: true,
},
{
Name: "holding-2",
Address: uint16(2),
InputType: "INT16",
Omit: true,
},
},
},
}
require.NoError(t, modbus.Init())
require.Empty(t, modbus.requests)
}
func TestRequestsGroupWithOmittedFieldsOnly(t *testing.T) {
modbus := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
}
modbus.Requests = []requestDefinition{
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-0",
Address: uint16(0),
InputType: "INT16",
Omit: true,
},
{
Name: "holding-1",
Address: uint16(1),
InputType: "UINT16",
Omit: true,
},
{
Name: "holding-2",
Address: uint16(2),
InputType: "INT16",
Omit: true,
},
{
Name: "holding-8",
Address: uint16(8),
InputType: "INT16",
},
},
},
}
require.NoError(t, modbus.Init())
require.Len(t, modbus.requests, 1)
require.NotNil(t, modbus.requests[1])
require.Len(t, modbus.requests[1].holding, 1)
require.Equal(t, uint16(8), modbus.requests[1].holding[0].address)
require.Equal(t, uint16(1), modbus.requests[1].holding[0].length)
}
func TestRequestsEmptyFields(t *testing.T) {
modbus := Modbus{
Name: "Test",
@ -4399,102 +4484,6 @@ func TestRequestOptimizationAggressive(t *testing.T) {
}
}
func TestRequestsWorkaroundsOneRequestPerField(t *testing.T) {
plugin := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{OnRequestPerField: true},
}
plugin.Requests = []requestDefinition{
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-1",
Address: uint16(1),
InputType: "INT16",
},
{
Name: "holding-2",
Address: uint16(2),
InputType: "INT16",
},
{
Name: "holding-3",
Address: uint16(3),
InputType: "INT16",
},
{
Name: "holding-4",
Address: uint16(4),
InputType: "INT16",
},
{
Name: "holding-5",
Address: uint16(5),
InputType: "INT16",
},
},
},
}
require.NoError(t, plugin.Init())
require.Len(t, plugin.requests[1].holding, len(plugin.Requests[0].Fields))
}
func TestRegisterWorkaroundsOneRequestPerField(t *testing.T) {
plugin := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "register",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{OnRequestPerField: true},
}
plugin.SlaveID = 1
plugin.HoldingRegisters = []fieldDefinition{
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-1",
Address: []uint16{1},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-2",
Address: []uint16{2},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-3",
Address: []uint16{3},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-4",
Address: []uint16{4},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-5",
Address: []uint16{5},
Scale: 1.0,
},
}
require.NoError(t, plugin.Init())
require.Len(t, plugin.requests[1].holding, len(plugin.HoldingRegisters))
}
func TestRequestOptimizationMaxInsertSmall(t *testing.T) {
maxsize := maxQuantityHoldingRegisters
maxExtraRegisters := uint16(5)
@ -4627,3 +4616,168 @@ func TestRequestOptimizationMaxInsertSmall(t *testing.T) {
})
}
}
func TestRequestsWorkaroundsOneRequestPerField(t *testing.T) {
plugin := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{OnRequestPerField: true},
}
plugin.Requests = []requestDefinition{
{
SlaveID: 1,
ByteOrder: "ABCD",
RegisterType: "holding",
Fields: []requestFieldDefinition{
{
Name: "holding-1",
Address: uint16(1),
InputType: "INT16",
},
{
Name: "holding-2",
Address: uint16(2),
InputType: "INT16",
},
{
Name: "holding-3",
Address: uint16(3),
InputType: "INT16",
},
{
Name: "holding-4",
Address: uint16(4),
InputType: "INT16",
},
{
Name: "holding-5",
Address: uint16(5),
InputType: "INT16",
},
},
},
}
require.NoError(t, plugin.Init())
require.Len(t, plugin.requests[1].holding, len(plugin.Requests[0].Fields))
}
func TestRegisterWorkaroundsOneRequestPerField(t *testing.T) {
plugin := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "register",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{OnRequestPerField: true},
}
plugin.SlaveID = 1
plugin.HoldingRegisters = []fieldDefinition{
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-1",
Address: []uint16{1},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-2",
Address: []uint16{2},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-3",
Address: []uint16{3},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-4",
Address: []uint16{4},
Scale: 1.0,
},
{
ByteOrder: "AB",
DataType: "INT16",
Name: "holding-5",
Address: []uint16{5},
Scale: 1.0,
},
}
require.NoError(t, plugin.Init())
require.Len(t, plugin.requests[1].holding, len(plugin.HoldingRegisters))
}
func TestRequestsWorkaroundsReadCoilsStartingAtZeroRequest(t *testing.T) {
plugin := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "request",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
}
plugin.SlaveID = 1
plugin.Requests = []requestDefinition{
{
SlaveID: 1,
RegisterType: "coil",
Fields: []requestFieldDefinition{
{
Name: "coil-8",
Address: uint16(8),
},
{
Name: "coil-new-group",
Address: maxQuantityCoils,
},
},
},
}
require.NoError(t, plugin.Init())
require.Len(t, plugin.requests[1].coil, 2)
// First group should now start at zero and have the cumulated length
require.Equal(t, uint16(0), plugin.requests[1].coil[0].address)
require.Equal(t, uint16(9), plugin.requests[1].coil[0].length)
// The second field should form a new group as the previous request
// is now too large (beyond max-coils-per-read) after zero enforcement.
require.Equal(t, maxQuantityCoils, plugin.requests[1].coil[1].address)
require.Equal(t, uint16(1), plugin.requests[1].coil[1].length)
}
func TestRequestsWorkaroundsReadCoilsStartingAtZeroRegister(t *testing.T) {
plugin := Modbus{
Name: "Test",
Controller: "tcp://localhost:1502",
ConfigurationType: "register",
Log: testutil.Logger{},
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
}
plugin.SlaveID = 1
plugin.Coils = []fieldDefinition{
{
Name: "coil-8",
Address: []uint16{8},
},
{
Name: "coil-new-group",
Address: []uint16{maxQuantityCoils},
},
}
require.NoError(t, plugin.Init())
require.Len(t, plugin.requests[1].coil, 2)
// First group should now start at zero and have the cumulated length
require.Equal(t, uint16(0), plugin.requests[1].coil[0].address)
require.Equal(t, uint16(9), plugin.requests[1].coil[0].length)
// The second field should form a new group as the previous request
// is now too large (beyond max-coils-per-read) after zero enforcement.
require.Equal(t, maxQuantityCoils, plugin.requests[1].coil[1].address)
require.Equal(t, uint16(1), plugin.requests[1].coil[1].length)
}

View File

@ -1,6 +1,8 @@
package modbus
import "sort"
import (
"sort"
)
type request struct {
address uint16
@ -157,7 +159,21 @@ func optimitzeGroupWithinLimits(g request, maxBatchSize uint16, maxExtraRegister
return requests
}
func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize uint16, optimization string, maxExtraRegisters uint16) []request {
type groupingParams struct {
// Maximum size of a request in registers
MaxBatchSize uint16
// Optimization to use for grouping register groups to requests.
// Also put potential optimization parameters here
Optimization string
MaxExtraRegisters uint16
// Will force reads to start at zero (if possible) while respecting
// the max-batch size.
EnforceFromZero bool
// Tags to add for the requests
Tags map[string]string
}
func groupFieldsToRequests(fields []field, params groupingParams) []request {
if len(fields) == 0 {
return nil
}
@ -187,7 +203,7 @@ func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize
}
// Finish the current request, add it to the list and construct a new one
if current.length > 0 {
if current.length > 0 && len(fields) > 0 {
groups = append(groups, current)
}
current = request{
@ -199,17 +215,27 @@ func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize
current.fields = append(current.fields, f)
}
}
if current.length > 0 {
if current.length > 0 && len(fields) > 0 {
groups = append(groups, current)
}
if len(groups) == 0 {
return nil
}
// Enforce the first read to start at zero if the option is set
if params.EnforceFromZero {
groups[0].length += groups[0].address
groups[0].address = 0
}
var requests []request
switch optimization {
switch params.Optimization {
case "shrink":
// Shrink request by striping leading and trailing fields with an omit flag set
for _, g := range groups {
if len(g.fields) > 0 {
requests = append(requests, shrinkGroup(g, maxBatchSize)...)
requests = append(requests, shrinkGroup(g, params.MaxBatchSize)...)
}
}
case "rearrange":
@ -217,7 +243,7 @@ func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize
// registers while keeping the number of requests
for _, g := range groups {
if len(g.fields) > 0 {
requests = append(requests, optimizeGroup(g, maxBatchSize)...)
requests = append(requests, optimizeGroup(g, params.MaxBatchSize)...)
}
}
case "aggressive":
@ -229,7 +255,7 @@ func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize
total.fields = append(total.fields, g.fields...)
}
}
requests = optimizeGroup(total, maxBatchSize)
requests = optimizeGroup(total, params.MaxBatchSize)
case "max_insert":
// Similar to aggressive but keeps the number of touched registers bellow a threshold
var total request
@ -238,12 +264,12 @@ func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize
total.fields = append(total.fields, g.fields...)
}
}
requests = optimitzeGroupWithinLimits(total, maxBatchSize, maxExtraRegisters)
requests = optimitzeGroupWithinLimits(total, params.MaxBatchSize, params.MaxExtraRegisters)
default:
// no optimization
for _, g := range groups {
if len(g.fields) > 0 {
requests = append(requests, splitMaxBatchSize(g, maxBatchSize)...)
requests = append(requests, splitMaxBatchSize(g, params.MaxBatchSize)...)
}
}
}
@ -251,7 +277,7 @@ func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize
// Copy the tags
for i := range requests {
requests[i].tags = make(map[string]string)
for k, v := range tags {
for k, v := range params.Tags {
requests[i].tags[k] = v
}
}

View File

@ -19,3 +19,8 @@
## This might be necessary for devices not conforming to the spec,
## see https://github.com/influxdata/telegraf/issues/12071.
# one_request_per_field = false
## Enforce the starting address to be zero for the first request on
## coil registers. This is necessary for some devices see
## https://github.com/influxdata/telegraf/issues/8905
# read_coils_starting_at_zero = false