fix(inputs.knx_listener): Reconnect after connection loss (#14959)

This commit is contained in:
Sven Rebhan 2024-03-18 15:57:33 +01:00 committed by GitHub
parent 0f91ca6f67
commit 6afa18fd52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 95 additions and 17 deletions

View File

@ -3,9 +3,11 @@ package knx_listener
import (
_ "embed"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"
"github.com/vapourismo/knx-go/knx"
"github.com/vapourismo/knx-go/knx/dpt"
@ -43,22 +45,27 @@ type KNXListener struct {
gaTargetMap map[string]addressTarget
gaLogbook map[string]bool
acc telegraf.Accumulator
wg sync.WaitGroup
wg sync.WaitGroup
connected atomic.Bool
}
func (*KNXListener) SampleConfig() string {
return sampleConfig
}
func (kl *KNXListener) Gather(_ telegraf.Accumulator) error {
func (kl *KNXListener) Gather(acc telegraf.Accumulator) error {
if !kl.connected.Load() {
// We got disconnected for some reason, so try to reconnect in every
// gather cycle until we are reconnected
if err := kl.Start(acc); err != nil {
return fmt.Errorf("reconnecting to bus failed: %w", err)
}
}
return nil
}
func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
// Store the accumulator for later use
kl.acc = acc
func (kl *KNXListener) Init() error {
// Setup a logbook to track unknown GAs to avoid log-spamming
kl.gaLogbook = make(map[string]bool)
@ -80,6 +87,10 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
}
}
return nil
}
func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
// Connect to the KNX-IP interface
kl.Log.Infof("Trying to connect to %q at %q", kl.ServiceType, kl.ServiceAddress)
switch kl.ServiceType {
@ -112,12 +123,15 @@ func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress)
}
kl.Log.Infof("Connected!")
kl.connected.Store(true)
// Listen to the KNX bus
kl.wg.Add(1)
go func() {
kl.wg.Done()
kl.listen()
defer kl.wg.Done()
kl.listen(acc)
kl.connected.Store(false)
acc.AddError(errors.New("disconnected from bus"))
}()
return nil
@ -130,7 +144,7 @@ func (kl *KNXListener) Stop() {
}
}
func (kl *KNXListener) listen() {
func (kl *KNXListener) listen(acc telegraf.Accumulator) {
for msg := range kl.client.Inbound() {
// Match GA to DataPointType and measurement name
ga := msg.Destination.String()
@ -177,7 +191,7 @@ func (kl *KNXListener) listen() {
"unit": target.datapoint.(dpt.DatapointMeta).Unit(),
"source": msg.Source.String(),
}
kl.acc.AddFields(target.measurement, fields, tags)
acc.AddFields(target.measurement, fields, tags)
}
}

View File

@ -36,13 +36,13 @@ func setValue(data dpt.DatapointValue, value interface{}) error {
return nil
}
type TestMessage struct {
type message struct {
address string
dpt string
value interface{}
}
func ProduceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent {
func produceKnxEvent(t *testing.T, address string, datapoint string, value interface{}) *knx.GroupEvent {
addr, err := cemi.NewGroupAddrString(address)
require.NoError(t, err)
@ -60,7 +60,7 @@ func ProduceKnxEvent(t *testing.T, address string, datapoint string, value inter
func TestRegularReceives_DPT(t *testing.T) {
// Define the test-cases
var testcases = []TestMessage{
var testcases = []message{
{"1/0/1", "1.001", true},
{"1/0/2", "1.002", false},
{"1/0/3", "1.003", true},
@ -101,6 +101,7 @@ func TestRegularReceives_DPT(t *testing.T) {
Measurements: measurements,
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())
// Setup the listener to test
err := listener.Start(acc)
@ -111,7 +112,7 @@ func TestRegularReceives_DPT(t *testing.T) {
// Send the defined test data
for _, testcase := range testcases {
event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}
@ -147,6 +148,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
},
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())
acc := &testutil.Accumulator{}
@ -155,7 +157,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
require.NoError(t, err)
client := listener.client.(*KNXDummyInterface)
testMessages := []TestMessage{
testMessages := []message{
{"1/1/1", "1.001", true},
{"1/1/1", "1.001", false},
{"1/1/2", "1.001", false},
@ -163,7 +165,7 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
}
for _, testcase := range testMessages {
event := ProduceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}
@ -190,3 +192,65 @@ func TestRegularReceives_MultipleMessages(t *testing.T) {
require.Truef(t, ok, "bool type expected, got '%T' with '%v' value instead", acc.Metrics[1].Fields["value"], acc.Metrics[1].Fields["value"])
require.False(t, v)
}
func TestReconnect(t *testing.T) {
listener := KNXListener{
ServiceType: "dummy",
Measurements: []Measurement{
{"temperature", "1.001", []string{"1/1/1"}},
},
Log: testutil.Logger{Name: "knx_listener"},
}
require.NoError(t, listener.Init())
var acc testutil.Accumulator
// Setup the listener to test
require.NoError(t, listener.Start(&acc))
defer listener.Stop()
client := listener.client.(*KNXDummyInterface)
testMessages := []message{
{"1/1/1", "1.001", true},
{"1/1/1", "1.001", false},
{"1/1/2", "1.001", false},
{"1/1/2", "1.001", true},
}
for _, testcase := range testMessages {
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}
// Give the accumulator some time to collect the data
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= 2
}, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics())
require.True(t, listener.connected.Load())
client.Close()
require.Eventually(t, func() bool {
return !listener.connected.Load()
}, 3*time.Second, 100*time.Millisecond, "no disconnect")
acc.Lock()
err := acc.FirstError()
acc.Unlock()
require.ErrorContains(t, err, "disconnected from bus")
require.NoError(t, listener.Gather(&acc))
require.Eventually(t, func() bool {
return listener.connected.Load()
}, 3*time.Second, 100*time.Millisecond, "no reconnect")
client = listener.client.(*KNXDummyInterface)
for _, testcase := range testMessages {
event := produceKnxEvent(t, testcase.address, testcase.dpt, testcase.value)
client.Send(*event)
}
// Give the accumulator some time to collect the data
require.Eventuallyf(t, func() bool {
return acc.NMetrics() >= 2
}, 3*time.Second, 100*time.Millisecond, "expected 2 metric but got %d", acc.NMetrics())
}