fix(inputs.modbus): Fix optimization of overlapping requests and add warning (#13486)
This commit is contained in:
parent
c459d7b8ff
commit
56aac4f0e1
|
|
@ -3,6 +3,8 @@ package modbus
|
||||||
import (
|
import (
|
||||||
_ "embed"
|
_ "embed"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed sample_register.conf
|
//go:embed sample_register.conf
|
||||||
|
|
@ -24,6 +26,7 @@ type ConfigurationOriginal struct {
|
||||||
HoldingRegisters []fieldDefinition `toml:"holding_registers"`
|
HoldingRegisters []fieldDefinition `toml:"holding_registers"`
|
||||||
InputRegisters []fieldDefinition `toml:"input_registers"`
|
InputRegisters []fieldDefinition `toml:"input_registers"`
|
||||||
workarounds ModbusWorkarounds
|
workarounds ModbusWorkarounds
|
||||||
|
logger telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConfigurationOriginal) SampleConfigPart() string {
|
func (c *ConfigurationOriginal) SampleConfigPart() string {
|
||||||
|
|
@ -99,6 +102,7 @@ func (c *ConfigurationOriginal) initRequests(fieldDefs []fieldDefinition, maxQua
|
||||||
MaxBatchSize: maxQuantity,
|
MaxBatchSize: maxQuantity,
|
||||||
Optimization: "none",
|
Optimization: "none",
|
||||||
EnforceFromZero: c.workarounds.ReadCoilsStartingAtZero,
|
EnforceFromZero: c.workarounds.ReadCoilsStartingAtZero,
|
||||||
|
Log: c.logger,
|
||||||
}
|
}
|
||||||
|
|
||||||
return groupFieldsToRequests(fields, params), nil
|
return groupFieldsToRequests(fields, params), nil
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/maphash"
|
"hash/maphash"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
)
|
)
|
||||||
|
|
||||||
//go:embed sample_request.conf
|
//go:embed sample_request.conf
|
||||||
|
|
@ -34,6 +36,7 @@ type requestDefinition struct {
|
||||||
type ConfigurationPerRequest struct {
|
type ConfigurationPerRequest struct {
|
||||||
Requests []requestDefinition `toml:"request"`
|
Requests []requestDefinition `toml:"request"`
|
||||||
workarounds ModbusWorkarounds
|
workarounds ModbusWorkarounds
|
||||||
|
logger telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConfigurationPerRequest) SampleConfigPart() string {
|
func (c *ConfigurationPerRequest) SampleConfigPart() string {
|
||||||
|
|
@ -191,6 +194,7 @@ func (c *ConfigurationPerRequest) Process() (map[byte]requestSet, error) {
|
||||||
MaxExtraRegisters: def.MaxExtraRegisters,
|
MaxExtraRegisters: def.MaxExtraRegisters,
|
||||||
Optimization: def.Optimization,
|
Optimization: def.Optimization,
|
||||||
Tags: def.Tags,
|
Tags: def.Tags,
|
||||||
|
Log: c.logger,
|
||||||
}
|
}
|
||||||
switch def.RegisterType {
|
switch def.RegisterType {
|
||||||
case "coil":
|
case "coil":
|
||||||
|
|
|
||||||
|
|
@ -137,9 +137,11 @@ func (m *Modbus) Init() error {
|
||||||
switch m.ConfigurationType {
|
switch m.ConfigurationType {
|
||||||
case "", "register":
|
case "", "register":
|
||||||
m.ConfigurationOriginal.workarounds = m.Workarounds
|
m.ConfigurationOriginal.workarounds = m.Workarounds
|
||||||
|
m.ConfigurationOriginal.logger = m.Log
|
||||||
cfg = &m.ConfigurationOriginal
|
cfg = &m.ConfigurationOriginal
|
||||||
case "request":
|
case "request":
|
||||||
m.ConfigurationPerRequest.workarounds = m.Workarounds
|
m.ConfigurationPerRequest.workarounds = m.Workarounds
|
||||||
|
m.ConfigurationPerRequest.logger = m.Log
|
||||||
cfg = &m.ConfigurationPerRequest
|
cfg = &m.ConfigurationPerRequest
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unknown configuration type %q", m.ConfigurationType)
|
return fmt.Errorf("unknown configuration type %q", m.ConfigurationType)
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -4937,3 +4938,61 @@ func TestRequestsWorkaroundsReadCoilsStartingAtZeroRegister(t *testing.T) {
|
||||||
require.Equal(t, maxQuantityCoils, plugin.requests[1].coil[1].address)
|
require.Equal(t, maxQuantityCoils, plugin.requests[1].coil[1].address)
|
||||||
require.Equal(t, uint16(1), plugin.requests[1].coil[1].length)
|
require.Equal(t, uint16(1), plugin.requests[1].coil[1].length)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRequestsOverlap(t *testing.T) {
|
||||||
|
logger := &testutil.CaptureLogger{}
|
||||||
|
plugin := Modbus{
|
||||||
|
Name: "Test",
|
||||||
|
Controller: "tcp://localhost:1502",
|
||||||
|
ConfigurationType: "request",
|
||||||
|
Log: logger,
|
||||||
|
Workarounds: ModbusWorkarounds{ReadCoilsStartingAtZero: true},
|
||||||
|
}
|
||||||
|
plugin.Requests = []requestDefinition{
|
||||||
|
{
|
||||||
|
SlaveID: 1,
|
||||||
|
RegisterType: "holding",
|
||||||
|
Optimization: "max_insert",
|
||||||
|
MaxExtraRegisters: 16,
|
||||||
|
Fields: []requestFieldDefinition{
|
||||||
|
{
|
||||||
|
Name: "field-1",
|
||||||
|
InputType: "UINT32",
|
||||||
|
Address: uint16(1),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "field-2",
|
||||||
|
InputType: "UINT64",
|
||||||
|
Address: uint16(3),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "field-3",
|
||||||
|
InputType: "UINT32",
|
||||||
|
Address: uint16(5),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "field-4",
|
||||||
|
InputType: "UINT32",
|
||||||
|
Address: uint16(7),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
return len(logger.Warnings()) > 0
|
||||||
|
}, 3*time.Second, 100*time.Millisecond)
|
||||||
|
|
||||||
|
var found bool
|
||||||
|
for _, w := range logger.Warnings() {
|
||||||
|
if strings.Contains(w, "Request at 3 with length 4 overlaps with next request at 5") {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
require.True(t, found, "Overlap warning not found!")
|
||||||
|
|
||||||
|
require.Len(t, plugin.requests, 1)
|
||||||
|
require.Len(t, plugin.requests[1].holding, 1)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,8 @@ package modbus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
)
|
)
|
||||||
|
|
||||||
type request struct {
|
type request struct {
|
||||||
|
|
@ -125,7 +127,7 @@ func optimizeGroup(g request, maxBatchSize uint16) []request {
|
||||||
return requests
|
return requests
|
||||||
}
|
}
|
||||||
|
|
||||||
func optimitzeGroupWithinLimits(g request, maxBatchSize uint16, maxExtraRegisters uint16) []request {
|
func optimitzeGroupWithinLimits(g request, params groupingParams) []request {
|
||||||
if len(g.fields) == 0 {
|
if len(g.fields) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -139,8 +141,15 @@ func optimitzeGroupWithinLimits(g request, maxBatchSize uint16, maxExtraRegister
|
||||||
for i := 1; i <= len(g.fields)-1; i++ {
|
for i := 1; i <= len(g.fields)-1; i++ {
|
||||||
// Check if we need to interrupt the current chunk and require a new one
|
// Check if we need to interrupt the current chunk and require a new one
|
||||||
holeSize := g.fields[i].address - (g.fields[i-1].address + g.fields[i-1].length)
|
holeSize := g.fields[i].address - (g.fields[i-1].address + g.fields[i-1].length)
|
||||||
needInterrupt := holeSize > maxExtraRegisters // too far apart
|
if g.fields[i].address < g.fields[i-1].address+g.fields[i-1].length {
|
||||||
needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > maxBatchSize // too large
|
params.Log.Warnf(
|
||||||
|
"Request at %d with length %d overlaps with next request at %d",
|
||||||
|
g.fields[i-1].address, g.fields[i-1].length, g.fields[i].address,
|
||||||
|
)
|
||||||
|
holeSize = 0
|
||||||
|
}
|
||||||
|
needInterrupt := holeSize > params.MaxExtraRegisters // too far apart
|
||||||
|
needInterrupt = needInterrupt || currentRequest.length+holeSize+g.fields[i].length > params.MaxBatchSize // too large
|
||||||
if !needInterrupt {
|
if !needInterrupt {
|
||||||
// Still safe to add the field to the current request
|
// Still safe to add the field to the current request
|
||||||
currentRequest.length = g.fields[i].address + g.fields[i].length - currentRequest.address
|
currentRequest.length = g.fields[i].address + g.fields[i].length - currentRequest.address
|
||||||
|
|
@ -171,6 +180,8 @@ type groupingParams struct {
|
||||||
EnforceFromZero bool
|
EnforceFromZero bool
|
||||||
// Tags to add for the requests
|
// Tags to add for the requests
|
||||||
Tags map[string]string
|
Tags map[string]string
|
||||||
|
// Log facility to inform the user
|
||||||
|
Log telegraf.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func groupFieldsToRequests(fields []field, params groupingParams) []request {
|
func groupFieldsToRequests(fields []field, params groupingParams) []request {
|
||||||
|
|
@ -264,7 +275,7 @@ func groupFieldsToRequests(fields []field, params groupingParams) []request {
|
||||||
total.fields = append(total.fields, g.fields...)
|
total.fields = append(total.fields, g.fields...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
requests = optimitzeGroupWithinLimits(total, params.MaxBatchSize, params.MaxExtraRegisters)
|
requests = optimitzeGroupWithinLimits(total, params)
|
||||||
default:
|
default:
|
||||||
// no optimization
|
// no optimization
|
||||||
for _, g := range groups {
|
for _, g := range groups {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue