feat(inputs.modbus): optimize grouped requests (#11106)
This commit is contained in:
parent
337e4e34bc
commit
edb2358764
|
|
@ -147,11 +147,21 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
|
|||
## | to reduce the number of requested registers by keeping
|
||||
## | the number of requests.
|
||||
## |---aggressive -- Rearrange request boundaries similar to "rearrange" but
|
||||
## allow to request registers not specified by the user to
|
||||
## fill gaps. This usually reduces the number of requests at the
|
||||
## cost of more requested registers.
|
||||
## | allow to request registers not specified by the user to
|
||||
## | fill gaps. This usually reduces the number of requests at the
|
||||
## | cost of more requested registers.
|
||||
## |---max_insert -- Rearrange request keeping the number of extra fields below the value
|
||||
## provided in "optimization_max_register_fill". It is not necessary to define 'omitted'
|
||||
## fields as the optimisation will add such field only where needed.
|
||||
# optimization = "none"
|
||||
|
||||
## Maximum number register the optimizer is allowed to insert between two fields to
|
||||
## save requests.
|
||||
## This option is only used for the 'max_insert' optimization strategy.
|
||||
## NOTE: All omitted fields are ignored, so this option denotes the effective hole
|
||||
## size to fill.
|
||||
# optimization_max_register_fill = 50
|
||||
|
||||
## Field definitions
|
||||
## Analog Variables, Input Registers and Holding Registers
|
||||
## address - address of the register to query. For coil and discrete inputs this is the bit address.
|
||||
|
|
@ -397,6 +407,23 @@ interested in but want to minimize the number of requests sent to the device.
|
|||
__Please note:__ This optimization might take long in case of many
|
||||
non-consecutive, non-omitted fields!
|
||||
|
||||
##### `max_insert`
|
||||
|
||||
Fields are assigned to the same request as long as the hole between the fields
|
||||
do not exceed the maximum fill size given in `optimization_max_register_fill`.
|
||||
User-defined omitted fields are ignored and interpreted as holes, so the best
|
||||
practice is to not manually insert omitted fields for this optimizer. This
|
||||
allows to specify only actually used fields and let the optimizer figure out
|
||||
the request organization which can dramatically improve query time. The
|
||||
trade-off here is between the cost of reading additional registers trashed
|
||||
later and the cost of many requests.
|
||||
|
||||
__Please note:__ The optimal value for `optimization_max_register_fill` depends
|
||||
on the network and the queried device. It is hence recommended to test several
|
||||
values and assess performance in order to find the best value. Use the
|
||||
`--test --debug` flags to monitor how may requests are sent and the number of
|
||||
touched registers.
|
||||
|
||||
#### Field definitions
|
||||
|
||||
Each `request` can contain a list of fields to collect from the modbus device.
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQua
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return groupFieldsToRequests(fields, nil, maxQuantity, "none"), nil
|
||||
return groupFieldsToRequests(fields, nil, maxQuantity, "none", 0), nil
|
||||
}
|
||||
|
||||
func (c *ConfigurationOriginal) initFields(fieldDefs []fieldDefinition) ([]field, error) {
|
||||
|
|
|
|||
|
|
@ -5,8 +5,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"hash/maphash"
|
||||
|
||||
"github.com/influxdata/telegraf/internal/choice"
|
||||
)
|
||||
|
||||
//go:embed sample_request.conf
|
||||
|
|
@ -23,13 +21,14 @@ type requestFieldDefinition struct {
|
|||
}
|
||||
|
||||
type requestDefinition struct {
|
||||
SlaveID byte `toml:"slave_id"`
|
||||
ByteOrder string `toml:"byte_order"`
|
||||
RegisterType string `toml:"register"`
|
||||
Measurement string `toml:"measurement"`
|
||||
Optimization string `toml:"optimization"`
|
||||
Fields []requestFieldDefinition `toml:"fields"`
|
||||
Tags map[string]string `toml:"tags"`
|
||||
SlaveID byte `toml:"slave_id"`
|
||||
ByteOrder string `toml:"byte_order"`
|
||||
RegisterType string `toml:"register"`
|
||||
Measurement string `toml:"measurement"`
|
||||
Optimization string `toml:"optimization"`
|
||||
MaxExtraRegisters uint16 `toml:"optimization_max_register_fill"`
|
||||
Fields []requestFieldDefinition `toml:"fields"`
|
||||
Tags map[string]string `toml:"tags"`
|
||||
}
|
||||
|
||||
type ConfigurationPerRequest struct {
|
||||
|
|
@ -46,12 +45,6 @@ func (c *ConfigurationPerRequest) Check() error {
|
|||
seenFields := make(map[uint64]bool)
|
||||
|
||||
for _, def := range c.Requests {
|
||||
// Check for valid optimization
|
||||
validOptimizations := []string{"", "none", "shrink", "rearrange", "aggressive"}
|
||||
if !choice.Contains(def.Optimization, validOptimizations) {
|
||||
return fmt.Errorf("unknown optimization %q", def.Optimization)
|
||||
}
|
||||
|
||||
// Check byte order of the data
|
||||
switch def.ByteOrder {
|
||||
case "":
|
||||
|
|
@ -69,7 +62,31 @@ func (c *ConfigurationPerRequest) Check() error {
|
|||
default:
|
||||
return fmt.Errorf("unknown register-type %q", def.RegisterType)
|
||||
}
|
||||
|
||||
// Check for valid optimization
|
||||
switch def.Optimization {
|
||||
case "", "none", "shrink", "rearrange", "aggressive":
|
||||
case "max_insert":
|
||||
switch def.RegisterType {
|
||||
case "coil":
|
||||
if def.MaxExtraRegisters <= 0 || def.MaxExtraRegisters > maxQuantityCoils {
|
||||
return fmt.Errorf("optimization_max_register_fill has to be between 1 and %d", maxQuantityCoils)
|
||||
}
|
||||
case "discrete":
|
||||
if def.MaxExtraRegisters <= 0 || def.MaxExtraRegisters > maxQuantityDiscreteInput {
|
||||
return fmt.Errorf("optimization_max_register_fill has to be between 1 and %d", maxQuantityDiscreteInput)
|
||||
}
|
||||
case "holding":
|
||||
if def.MaxExtraRegisters <= 0 || def.MaxExtraRegisters > maxQuantityHoldingRegisters {
|
||||
return fmt.Errorf("optimization_max_register_fill has to be between 1 and %d", maxQuantityHoldingRegisters)
|
||||
}
|
||||
case "input":
|
||||
if def.MaxExtraRegisters <= 0 || def.MaxExtraRegisters > maxQuantityInputRegisters {
|
||||
return fmt.Errorf("optimization_max_register_fill has to be between 1 and %d", maxQuantityInputRegisters)
|
||||
}
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown optimization %q", def.Optimization)
|
||||
}
|
||||
// Set the default for measurement if required
|
||||
if def.Measurement == "" {
|
||||
def.Measurement = "modbus"
|
||||
|
|
@ -167,28 +184,28 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {
|
|||
if c.workarounds.OnRequestPerField {
|
||||
maxQuantity = 1
|
||||
}
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization)
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
|
||||
set.coil = append(set.coil, requests...)
|
||||
case "discrete":
|
||||
maxQuantity := maxQuantityDiscreteInput
|
||||
if c.workarounds.OnRequestPerField {
|
||||
maxQuantity = 1
|
||||
}
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization)
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
|
||||
set.discrete = append(set.discrete, requests...)
|
||||
case "holding":
|
||||
maxQuantity := maxQuantityHoldingRegisters
|
||||
if c.workarounds.OnRequestPerField {
|
||||
maxQuantity = 1
|
||||
}
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization)
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
|
||||
set.holding = append(set.holding, requests...)
|
||||
case "input":
|
||||
maxQuantity := maxQuantityInputRegisters
|
||||
if c.workarounds.OnRequestPerField {
|
||||
maxQuantity = 1
|
||||
}
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization)
|
||||
requests := groupFieldsToRequests(fields, def.Tags, maxQuantity, def.Optimization, def.MaxExtraRegisters)
|
||||
set.input = append(set.input, requests...)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown register type %q", def.RegisterType)
|
||||
|
|
|
|||
|
|
@ -138,7 +138,35 @@ func (m *Modbus) Init() error {
|
|||
if err := m.initClient(); err != nil {
|
||||
return fmt.Errorf("initializing client failed: %v", err)
|
||||
}
|
||||
for slaveID, rqs := range m.requests {
|
||||
var nHoldingRegs, nInputsRegs, nDiscreteRegs, nCoilRegs uint16
|
||||
var nHoldingFields, nInputsFields, nDiscreteFields, nCoilFields int
|
||||
|
||||
for _, r := range rqs.holding {
|
||||
nHoldingRegs += r.length
|
||||
nHoldingFields += len(r.fields)
|
||||
}
|
||||
for _, r := range rqs.input {
|
||||
nInputsRegs += r.length
|
||||
nInputsFields += len(r.fields)
|
||||
}
|
||||
for _, r := range rqs.discrete {
|
||||
nDiscreteRegs += r.length
|
||||
nDiscreteFields += len(r.fields)
|
||||
}
|
||||
for _, r := range rqs.coil {
|
||||
nCoilRegs += r.length
|
||||
nCoilFields += len(r.fields)
|
||||
}
|
||||
m.Log.Infof("Got %d request(s) touching %d holding registers for %d fields (slave %d)",
|
||||
len(rqs.holding), nHoldingRegs, nHoldingFields, slaveID)
|
||||
m.Log.Infof("Got %d request(s) touching %d inputs registers for %d fields (slave %d)",
|
||||
len(rqs.input), nInputsRegs, nInputsFields, slaveID)
|
||||
m.Log.Infof("Got %d request(s) touching %d discrete registers for %d fields (slave %d)",
|
||||
len(rqs.discrete), nDiscreteRegs, nDiscreteFields, slaveID)
|
||||
m.Log.Infof("Got %d request(s) touching %d coil registers for %d fields (slave %d)",
|
||||
len(rqs.coil), nCoilRegs, nCoilFields, slaveID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3130,6 +3130,70 @@ func TestConfigurationPerRequestFail(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestConfigurationMaxExtraRegisterFail(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
requests []requestDefinition
|
||||
errormsg string
|
||||
}{{
|
||||
name: "MaxExtraRegister too large",
|
||||
requests: []requestDefinition{
|
||||
{
|
||||
SlaveID: 1,
|
||||
ByteOrder: "ABCD",
|
||||
RegisterType: "input",
|
||||
Optimization: "max_insert",
|
||||
MaxExtraRegisters: 5000,
|
||||
Fields: []requestFieldDefinition{
|
||||
{
|
||||
Name: "input-0",
|
||||
Address: uint16(0),
|
||||
Measurement: "foo",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
errormsg: "configuration invalid: optimization_max_register_fill has to be between 1 and 125",
|
||||
},
|
||||
{
|
||||
name: "MaxExtraRegister too small",
|
||||
requests: []requestDefinition{
|
||||
{
|
||||
SlaveID: 1,
|
||||
ByteOrder: "ABCD",
|
||||
RegisterType: "input",
|
||||
Optimization: "max_insert",
|
||||
MaxExtraRegisters: 0,
|
||||
Fields: []requestFieldDefinition{
|
||||
{
|
||||
Name: "input-0",
|
||||
Address: uint16(0),
|
||||
Measurement: "foo",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
errormsg: "configuration invalid: optimization_max_register_fill has to be between 1 and 125",
|
||||
}}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
plugin := Modbus{
|
||||
Name: "Test",
|
||||
Controller: "tcp://localhost:1502",
|
||||
ConfigurationType: "request",
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
plugin.Requests = tt.requests
|
||||
|
||||
err := plugin.Init()
|
||||
require.Error(t, err)
|
||||
require.Equal(t, tt.errormsg, err.Error())
|
||||
require.Empty(t, plugin.requests)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestsStartingWithOmits(t *testing.T) {
|
||||
modbus := Modbus{
|
||||
Name: "Test",
|
||||
|
|
@ -4224,3 +4288,136 @@ func TestRegisterWorkaroundsOneRequestPerField(t *testing.T) {
|
|||
require.NoError(t, plugin.Init())
|
||||
require.Len(t, plugin.requests[1].holding, len(plugin.HoldingRegisters))
|
||||
}
|
||||
|
||||
func TestRequestOptimizationMaxInsertSmall(t *testing.T) {
|
||||
maxsize := maxQuantityHoldingRegisters
|
||||
maxExtraRegisters := uint16(5)
|
||||
tests := []struct {
|
||||
name string
|
||||
inputs []rangeDefinition
|
||||
expected []requestExpectation
|
||||
}{
|
||||
{
|
||||
name: "large gaps",
|
||||
inputs: []rangeDefinition{
|
||||
{18, 3, 1, 1, "INT16", false},
|
||||
{maxsize - 2, 5, 1, 1, "INT16", false},
|
||||
{maxsize + 42, 2, 1, 1, "INT16", false},
|
||||
},
|
||||
expected: []requestExpectation{
|
||||
{
|
||||
fields: []rangeDefinition{{start: 18, count: 3, length: 1}},
|
||||
req: request{address: 18, length: 3},
|
||||
},
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: maxsize - 2, count: 5, length: 1},
|
||||
},
|
||||
req: request{address: maxsize - 2, length: 5},
|
||||
},
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: maxsize + 42, count: 2, length: 1},
|
||||
},
|
||||
req: request{address: maxsize + 42, length: 2},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "large gaps filled",
|
||||
inputs: []rangeDefinition{
|
||||
{0, 1, 1, 1, "INT16", false},
|
||||
{1, 17, 1, 1, "INT16", true},
|
||||
{18, 3, 1, 1, "INT16", false},
|
||||
{21, maxsize - 23, 1, 1, "INT16", true},
|
||||
{maxsize - 2, 5, 1, 1, "INT16", false},
|
||||
{maxsize + 3, 39, 1, 1, "INT16", true},
|
||||
{maxsize + 42, 2, 1, 1, "INT16", false},
|
||||
},
|
||||
expected: []requestExpectation{
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: 0, count: 1, length: 1},
|
||||
},
|
||||
req: request{address: 0, length: 1},
|
||||
},
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: 18, count: 3, length: 1},
|
||||
},
|
||||
req: request{address: 18, length: 3},
|
||||
},
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: maxsize - 2, count: 5, length: 1},
|
||||
},
|
||||
req: request{address: maxsize - 2, length: 5},
|
||||
},
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: maxsize + 42, count: 2, length: 1},
|
||||
},
|
||||
req: request{address: maxsize + 42, length: 2},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "large gaps filled with offset",
|
||||
inputs: []rangeDefinition{
|
||||
{18, 3, 1, 1, "INT16", false},
|
||||
{21, maxsize - 23, 1, 1, "INT16", true},
|
||||
{maxsize - 2, 5, 1, 1, "INT16", false},
|
||||
{maxsize + 3, 39, 1, 1, "INT16", true},
|
||||
{maxsize + 42, 2, 1, 1, "INT16", false},
|
||||
},
|
||||
expected: []requestExpectation{
|
||||
{
|
||||
fields: []rangeDefinition{{start: 18, count: 3, length: 1}},
|
||||
req: request{address: 18, length: 3},
|
||||
},
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: maxsize - 2, count: 5, length: 1},
|
||||
},
|
||||
req: request{address: maxsize - 2, length: 5},
|
||||
},
|
||||
{
|
||||
fields: []rangeDefinition{
|
||||
{start: maxsize + 42, count: 2, length: 1},
|
||||
},
|
||||
req: request{address: maxsize + 42, length: 2},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Generate the input structure and the expectation
|
||||
requestFields := generateRequestDefinitions(tt.inputs)
|
||||
expected := generateExpectation(tt.expected)
|
||||
|
||||
// Setup the plugin
|
||||
slaveID := byte(1)
|
||||
plugin := Modbus{
|
||||
Name: "Test",
|
||||
Controller: "tcp://localhost:1502",
|
||||
ConfigurationType: "request",
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
plugin.Requests = []requestDefinition{
|
||||
{
|
||||
SlaveID: slaveID,
|
||||
ByteOrder: "ABCD",
|
||||
RegisterType: "holding",
|
||||
Optimization: "max_insert",
|
||||
MaxExtraRegisters: maxExtraRegisters,
|
||||
Fields: requestFields,
|
||||
},
|
||||
}
|
||||
require.NoError(t, plugin.Init())
|
||||
require.NotEmpty(t, plugin.requests)
|
||||
require.Contains(t, plugin.requests, slaveID)
|
||||
requireEqualRequests(t, expected, plugin.requests[slaveID].holding)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -123,7 +123,41 @@ func optimizeGroup(g request, maxBatchSize uint16) []request {
|
|||
return requests
|
||||
}
|
||||
|
||||
func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize uint16, optimization string) []request {
|
||||
func optimitzeGroupWithinLimits(g request, maxBatchSize uint16, maxExtraRegisters uint16) []request {
|
||||
if len(g.fields) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var requests []request
|
||||
currentRequest := request{
|
||||
fields: []field{g.fields[0]},
|
||||
address: g.fields[0].address,
|
||||
length: g.fields[0].length,
|
||||
}
|
||||
for i := 1; i <= len(g.fields)-1; i++ {
|
||||
// Check if we need to interrupt the current chunk and require a new one
|
||||
holeSize := g.fields[i].address - (g.fields[i-1].address + g.fields[i-1].length)
|
||||
needInterrupt := holeSize > maxExtraRegisters // too far apart
|
||||
needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > maxBatchSize // too large
|
||||
if !needInterrupt {
|
||||
// Still safe to add the field to the current request
|
||||
currentRequest.length = g.fields[i].address + g.fields[i].length - currentRequest.address
|
||||
currentRequest.fields = append(currentRequest.fields, g.fields[i])
|
||||
continue
|
||||
}
|
||||
// Finish the current request, add it to the list and construct a new one
|
||||
requests = append(requests, currentRequest)
|
||||
currentRequest = request{
|
||||
fields: []field{g.fields[i]},
|
||||
address: g.fields[i].address,
|
||||
length: g.fields[i].length,
|
||||
}
|
||||
}
|
||||
requests = append(requests, currentRequest)
|
||||
return requests
|
||||
}
|
||||
|
||||
func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize uint16, optimization string, maxExtraRegisters uint16) []request {
|
||||
if len(fields) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -196,6 +230,15 @@ func groupFieldsToRequests(fields []field, tags map[string]string, maxBatchSize
|
|||
}
|
||||
}
|
||||
requests = optimizeGroup(total, maxBatchSize)
|
||||
case "max_insert":
|
||||
// Similar to aggressive but keeps the number of touched registers bellow a threshold
|
||||
var total request
|
||||
for _, g := range groups {
|
||||
if len(g.fields) > 0 {
|
||||
total.fields = append(total.fields, g.fields...)
|
||||
}
|
||||
}
|
||||
requests = optimitzeGroupWithinLimits(total, maxBatchSize, maxExtraRegisters)
|
||||
default:
|
||||
// no optimization
|
||||
for _, g := range groups {
|
||||
|
|
|
|||
|
|
@ -33,11 +33,21 @@
|
|||
## | to reduce the number of requested registers by keeping
|
||||
## | the number of requests.
|
||||
## |---aggressive -- Rearrange request boundaries similar to "rearrange" but
|
||||
## allow to request registers not specified by the user to
|
||||
## fill gaps. This usually reduces the number of requests at the
|
||||
## cost of more requested registers.
|
||||
## | allow to request registers not specified by the user to
|
||||
## | fill gaps. This usually reduces the number of requests at the
|
||||
## | cost of more requested registers.
|
||||
## |---max_insert -- Rearrange request keeping the number of extra fields below the value
|
||||
## provided in "optimization_max_register_fill". It is not necessary to define 'omitted'
|
||||
## fields as the optimisation will add such field only where needed.
|
||||
# optimization = "none"
|
||||
|
||||
## Maximum number register the optimizer is allowed to insert between two fields to
|
||||
## save requests.
|
||||
## This option is only used for the 'max_insert' optimization strategy.
|
||||
## NOTE: All omitted fields are ignored, so this option denotes the effective hole
|
||||
## size to fill.
|
||||
# optimization_max_register_fill = 50
|
||||
|
||||
## Field definitions
|
||||
## Analog Variables, Input Registers and Holding Registers
|
||||
## address - address of the register to query. For coil and discrete inputs this is the bit address.
|
||||
|
|
|
|||
Loading…
Reference in New Issue