Add input plugin for KNX home automation bus (#7048)

This commit is contained in:
Sven Rebhan 2021-03-18 15:39:44 +01:00 committed by GitHub
parent 1746f96f15
commit 1eb47e245c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 432 additions and 0 deletions

View File

@ -232,6 +232,7 @@ For documentation on the latest development code see the [documentation index][d
* [kernel](./plugins/inputs/kernel)
* [kernel_vmstat](./plugins/inputs/kernel_vmstat)
* [kibana](./plugins/inputs/kibana)
* [knx_listener](./plugins/inputs/knx_listener)
* [kubernetes](./plugins/inputs/kubernetes)
* [kube_inventory](./plugins/inputs/kube_inventory)
* [lanz](./plugins/inputs/lanz)

View File

@ -169,6 +169,7 @@ following works:
- github.com/tidwall/match [MIT License](https://github.com/tidwall/match/blob/master/LICENSE)
- github.com/tidwall/pretty [MIT License](https://github.com/tidwall/pretty/blob/master/LICENSE)
- github.com/tinylib/msgp [MIT License](https://github.com/tinylib/msgp/blob/master/LICENSE)
- github.com/vapourismo/knx-go [MIT License](https://github.com/vapourismo/knx-go/blob/master/LICENSE)
- github.com/vishvananda/netlink [Apache License 2.0](https://github.com/vishvananda/netlink/blob/master/LICENSE)
- github.com/vishvananda/netns [Apache License 2.0](https://github.com/vishvananda/netns/blob/master/LICENSE)
- github.com/vjeantet/grok [Apache License 2.0](https://github.com/vjeantet/grok/blob/master/LICENSE)

1
go.mod
View File

@ -131,6 +131,7 @@ require (
github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62
github.com/tidwall/gjson v1.6.0
github.com/tinylib/msgp v1.1.5
github.com/vapourismo/knx-go v0.0.0-20201122213738-75fe09ace330
github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e // indirect
github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect
github.com/vjeantet/grok v1.0.1

2
go.sum
View File

@ -711,6 +711,8 @@ github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/vapourismo/knx-go v0.0.0-20201122213738-75fe09ace330 h1:iBlTJosRsR70amr0zsmSPvaKNH8K/p3YlX/5SdPmSl8=
github.com/vapourismo/knx-go v0.0.0-20201122213738-75fe09ace330/go.mod h1:7+aWBsUJCo9OQRCgTypRmIQW9KKKcPMjtrdnYIBsS70=
github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec/go.mod h1:owBmyHYMLkxyrugmfwE/DLJyW8Ro9mkphwuVErQ0iUw=
github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e h1:f1yevOHP+Suqk0rVc13fIkzcLULJbyQcXDba2klljD0=
github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=

View File

@ -87,6 +87,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat"
_ "github.com/influxdata/telegraf/plugins/inputs/kibana"
_ "github.com/influxdata/telegraf/plugins/inputs/kinesis_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/knx_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory"
_ "github.com/influxdata/telegraf/plugins/inputs/kubernetes"
_ "github.com/influxdata/telegraf/plugins/inputs/lanz"

View File

@ -0,0 +1,66 @@
# KNX input plugin
The KNX input plugin that listens for messages on the KNX home-automation bus.
This plugin connects to the KNX bus via a KNX-IP interface.
Information about supported KNX message datapoint types can be found at the
underlying "knx-go" project site (https://github.com/vapourismo/knx-go).
### Configuration
This is a sample config for the plugin.
```toml
# Listener capable of handling KNX bus messages provided through a KNX-IP Interface.
[[inputs.KNXListener]]
## Type of KNX-IP interface.
## Can be either "tunnel" or "router".
# service_type = "tunnel"
## Address of the KNX-IP interface.
service_address = "localhost:3671"
## Measurement definition(s)
# [[inputs.KNXListener.measurement]]
# ## Name of the measurement
# name = "temperature"
# ## Datapoint-Type (DPT) of the KNX messages
# dpt = "9.001"
# ## List of Group-Addresses (GAs) assigned to the measurement
# addresses = ["5/5/1"]
# [[inputs.KNXListener.measurement]]
# name = "illumination"
# dpt = "9.004"
# addresses = ["5/5/3"]
```
#### Measurement configurations
Each measurement contains only one datapoint-type (DPT) and assigns a list of
addresses to this measurement. You can, for example group all temperature sensor
messages within a "temperature" measurement. However, you are free to split
messages of one datapoint-type to multiple measurements.
**NOTE: You should not assign a group-address (GA) to multiple measurements!**
### Metrics
Received KNX data is stored in the named measurement as configured above using
the "value" field. Additional to the value, there are the following tags added
to the datapoint:
- "groupaddress": KNX group-address corresponding to the value
- "unit": unit of the value
- "source": KNX physical address sending the value
To find out about the datatype of the datapoint please check your KNX project,
the KNX-specification or the "knx-go" project for the corresponding DPT.
### Example Output
This section shows example output in Line Protocol format.
```
illumination,groupaddress=5/5/4,host=Hugin,source=1.1.12,unit=lux value=17.889999389648438 1582132674999013274
temperature,groupaddress=5/5/1,host=Hugin,source=1.1.8,unit=°C value=17.799999237060547 1582132663427587361
windowopen,groupaddress=1/0/1,host=Hugin,source=1.1.3 value=true 1582132630425581320
```

View File

@ -0,0 +1,28 @@
package knx_listener
import (
"github.com/vapourismo/knx-go/knx"
)
type KNXDummyInterface struct {
inbound chan knx.GroupEvent
}
func NewDummyInterface() (di KNXDummyInterface, err error) {
di, err = KNXDummyInterface{}, nil
di.inbound = make(chan knx.GroupEvent)
return di, err
}
func (di *KNXDummyInterface) Send(event knx.GroupEvent) {
di.inbound <- event
}
func (di *KNXDummyInterface) Inbound() <-chan knx.GroupEvent {
return di.inbound
}
func (di *KNXDummyInterface) Close() {
close(di.inbound)
}

View File

@ -0,0 +1,197 @@
package knx_listener
import (
"fmt"
"reflect"
"sync"
"github.com/vapourismo/knx-go/knx"
"github.com/vapourismo/knx-go/knx/dpt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
type KNXInterface interface {
Inbound() <-chan knx.GroupEvent
Close()
}
type addressTarget struct {
measurement string
datapoint dpt.DatapointValue
}
type Measurement struct {
Name string
Dpt string
Addresses []string
}
type KNXListener struct {
ServiceType string `toml:"service_type"`
ServiceAddress string `toml:"service_address"`
Measurements []Measurement `toml:"measurement"`
Log telegraf.Logger `toml:"-"`
client KNXInterface
gaTargetMap map[string]addressTarget
gaLogbook map[string]bool
acc telegraf.Accumulator
wg sync.WaitGroup
}
func (kl *KNXListener) Description() string {
return "Listener capable of handling KNX bus messages provided through a KNX-IP Interface."
}
func (kl *KNXListener) SampleConfig() string {
return `
## Type of KNX-IP interface.
## Can be either "tunnel" or "router".
# service_type = "tunnel"
## Address of the KNX-IP interface.
service_address = "localhost:3671"
## Measurement definition(s)
# [[inputs.KNXListener.measurement]]
# ## Name of the measurement
# name = "temperature"
# ## Datapoint-Type (DPT) of the KNX messages
# dpt = "9.001"
# ## List of Group-Addresses (GAs) assigned to the measurement
# addresses = ["5/5/1"]
# [[inputs.KNXListener.measurement]]
# name = "illumination"
# dpt = "9.004"
# addresses = ["5/5/3"]
`
}
func (kl *KNXListener) Gather(_ telegraf.Accumulator) error {
return nil
}
func (kl *KNXListener) Start(acc telegraf.Accumulator) error {
// Store the accumulator for later use
kl.acc = acc
// Setup a logbook to track unknown GAs to avoid log-spamming
kl.gaLogbook = make(map[string]bool)
// Construct the mapping of Group-addresses (GAs) to DPTs and the name
// of the measurement
kl.gaTargetMap = make(map[string]addressTarget)
for _, m := range kl.Measurements {
kl.Log.Debugf("Group-address mapping for measurement %q:", m.Name)
for _, ga := range m.Addresses {
kl.Log.Debugf(" %s --> %s", ga, m.Dpt)
if _, ok := kl.gaTargetMap[ga]; ok {
return fmt.Errorf("duplicate specification of address %q", ga)
}
d, ok := dpt.Produce(m.Dpt)
if !ok {
return fmt.Errorf("cannot create datapoint-type %q for address %q", m.Dpt, ga)
}
kl.gaTargetMap[ga] = addressTarget{m.Name, d}
}
}
// Connect to the KNX-IP interface
kl.Log.Infof("Trying to connect to %q at %q", kl.ServiceType, kl.ServiceAddress)
switch kl.ServiceType {
case "tunnel":
c, err := knx.NewGroupTunnel(kl.ServiceAddress, knx.DefaultTunnelConfig)
if err != nil {
return err
}
kl.client = &c
case "router":
c, err := knx.NewGroupRouter(kl.ServiceAddress, knx.DefaultRouterConfig)
if err != nil {
return err
}
kl.client = &c
case "dummy":
c, err := NewDummyInterface()
if err != nil {
return err
}
kl.client = &c
default:
return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress)
}
kl.Log.Infof("Connected!")
// Listen to the KNX bus
kl.wg.Add(1)
go func() {
kl.wg.Done()
kl.listen()
}()
return nil
}
func (kl *KNXListener) Stop() {
if kl.client != nil {
kl.client.Close()
kl.wg.Wait()
}
}
func (kl *KNXListener) listen() {
for msg := range kl.client.Inbound() {
// Match GA to DataPointType and measurement name
ga := msg.Destination.String()
target, ok := kl.gaTargetMap[ga]
if !ok && !kl.gaLogbook[ga] {
kl.Log.Infof("Ignoring message %+v for unknown GA %q", msg, ga)
kl.gaLogbook[ga] = true
continue
}
// Extract the value from the data-frame
err := target.datapoint.Unpack(msg.Data)
if err != nil {
kl.Log.Errorf("Unpacking data failed: %v", err)
continue
}
kl.Log.Debugf("Matched GA %q to measurement %q with value %v", ga, target.measurement, target.datapoint)
// Convert the DatapointValue interface back to its basic type again
// as otherwise telegraf will not push out the metrics and eat it
// silently.
var value interface{}
vi := reflect.Indirect(reflect.ValueOf(target.datapoint))
switch vi.Kind() {
case reflect.Bool:
value = vi.Bool()
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
value = vi.Int()
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
value = vi.Uint()
case reflect.Float32, reflect.Float64:
value = vi.Float()
default:
kl.Log.Errorf("Type conversion %v failed for address %q", vi.Kind(), ga)
continue
}
// Compose the actual data to be pushed out
fields := map[string]interface{}{"value": value}
tags := map[string]string{
"groupaddress": ga,
"unit": target.datapoint.(dpt.DatapointMeta).Unit(),
"source": msg.Source.String(),
}
kl.acc.AddFields(target.measurement, fields, tags)
}
}
func init() {
inputs.Add("KNXListener", func() telegraf.Input { return &KNXListener{ServiceType: "tunnel"} })
}

View File

@ -0,0 +1,135 @@
package knx_listener
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vapourismo/knx-go/knx"
"github.com/vapourismo/knx-go/knx/cemi"
"github.com/vapourismo/knx-go/knx/dpt"
)
const epsilon = 1e-3
func setValue(data dpt.DatapointValue, value interface{}) error {
d := reflect.Indirect(reflect.ValueOf(data))
if !d.CanSet() {
return fmt.Errorf("cannot set datapoint %v", data)
}
switch v := value.(type) {
case bool:
d.SetBool(v)
case float64:
d.SetFloat(v)
case int64:
d.SetInt(v)
case uint64:
d.SetUint(v)
default:
return fmt.Errorf("unknown type '%T' when setting value for DPT", value)
}
return nil
}
func TestRegularReceives_DPT(t *testing.T) {
// Define the test-cases
var testcases = []struct {
address string
dpt string
value interface{}
}{
{"1/0/1", "1.001", true},
{"1/0/2", "1.002", false},
{"1/0/3", "1.003", true},
{"1/0/9", "1.009", false},
{"1/1/0", "1.010", true},
{"5/0/1", "5.001", 12.157},
{"5/0/3", "5.003", 121.412},
{"5/0/4", "5.004", uint64(25)},
{"9/0/1", "9.001", 18.56},
{"9/0/4", "9.004", 243.84},
{"9/0/5", "9.005", 12.01},
{"9/0/7", "9.007", 59.32},
{"13/0/1", "13.001", int64(-15)},
{"13/0/2", "13.002", int64(183)},
{"13/1/0", "13.010", int64(-141)},
{"13/1/1", "13.011", int64(277)},
{"13/1/2", "13.012", int64(-4096)},
{"13/1/3", "13.013", int64(8192)},
{"13/1/4", "13.014", int64(-65536)},
{"13/1/5", "13.015", int64(2147483647)},
{"14/0/0", "14.000", -1.31},
{"14/0/1", "14.001", 0.44},
{"14/0/2", "14.002", 32.08},
// {"14/0/3", "14.003", 92.69},
// {"14/0/4", "14.004", 1.00794},
{"14/1/0", "14.010", 5963.78},
{"14/1/1", "14.011", 150.95},
}
acc := &testutil.Accumulator{}
// Setup the unit-under-test
measurements := make([]Measurement, 0, len(testcases))
for _, testcase := range testcases {
measurements = append(measurements, Measurement{"test", testcase.dpt, []string{testcase.address}})
}
listener := KNXListener{
ServiceType: "dummy",
Measurements: measurements,
Log: testutil.Logger{Name: "knx_listener"},
}
// Setup the listener to test
err := listener.Start(acc)
require.NoError(t, err)
client := listener.client.(*KNXDummyInterface)
tstart := time.Now()
// Send the defined test data
for _, testcase := range testcases {
addr, err := cemi.NewGroupAddrString(testcase.address)
require.NoError(t, err)
data, ok := dpt.Produce(testcase.dpt)
require.True(t, ok)
err = setValue(data, testcase.value)
require.NoError(t, err)
client.Send(knx.GroupEvent{
Command: knx.GroupWrite,
Destination: addr,
Data: data.Pack(),
})
}
// Give the accumulator some time to collect the data
acc.Wait(len(testcases))
// Stop the listener
listener.Stop()
tstop := time.Now()
// Check if we got what we expected
require.Len(t, acc.Metrics, len(testcases))
for i, m := range acc.Metrics {
assert.Equal(t, "test", m.Measurement)
assert.Equal(t, testcases[i].address, m.Tags["groupaddress"])
assert.Len(t, m.Fields, 1)
switch v := testcases[i].value.(type) {
case bool, int64, uint64:
assert.Equal(t, v, m.Fields["value"])
case float64:
assert.InDelta(t, v, m.Fields["value"], epsilon)
}
assert.True(t, !tstop.Before(m.Time))
assert.True(t, !tstart.After(m.Time))
}
}