From 1eb47e245c0c22270aaf9a42938f5f5f6697a959 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Thu, 18 Mar 2021 15:39:44 +0100 Subject: [PATCH] Add input plugin for KNX home automation bus (#7048) --- README.md | 1 + docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 1 + go.sum | 2 + plugins/inputs/all/all.go | 1 + plugins/inputs/knx_listener/README.md | 66 ++++++ .../knx_listener/knx_dummy_interface.go | 28 +++ plugins/inputs/knx_listener/knx_listener.go | 197 ++++++++++++++++++ .../inputs/knx_listener/knx_listener_test.go | 135 ++++++++++++ 9 files changed, 432 insertions(+) create mode 100644 plugins/inputs/knx_listener/README.md create mode 100644 plugins/inputs/knx_listener/knx_dummy_interface.go create mode 100644 plugins/inputs/knx_listener/knx_listener.go create mode 100644 plugins/inputs/knx_listener/knx_listener_test.go diff --git a/README.md b/README.md index 5535b9527..45bdc43ba 100644 --- a/README.md +++ b/README.md @@ -232,6 +232,7 @@ For documentation on the latest development code see the [documentation index][d * [kernel](./plugins/inputs/kernel) * [kernel_vmstat](./plugins/inputs/kernel_vmstat) * [kibana](./plugins/inputs/kibana) +* [knx_listener](./plugins/inputs/knx_listener) * [kubernetes](./plugins/inputs/kubernetes) * [kube_inventory](./plugins/inputs/kube_inventory) * [lanz](./plugins/inputs/lanz) diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 0fafa339f..4ca2e10c5 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -169,6 +169,7 @@ following works: - github.com/tidwall/match [MIT License](https://github.com/tidwall/match/blob/master/LICENSE) - github.com/tidwall/pretty [MIT License](https://github.com/tidwall/pretty/blob/master/LICENSE) - github.com/tinylib/msgp [MIT License](https://github.com/tinylib/msgp/blob/master/LICENSE) +- github.com/vapourismo/knx-go [MIT License](https://github.com/vapourismo/knx-go/blob/master/LICENSE) - github.com/vishvananda/netlink [Apache License 2.0](https://github.com/vishvananda/netlink/blob/master/LICENSE) - github.com/vishvananda/netns [Apache License 2.0](https://github.com/vishvananda/netns/blob/master/LICENSE) - github.com/vjeantet/grok [Apache License 2.0](https://github.com/vjeantet/grok/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 1754c0fe7..1dfe1d9a2 100644 --- a/go.mod +++ b/go.mod @@ -131,6 +131,7 @@ require ( github.com/tbrandon/mbserver v0.0.0-20170611213546-993e1772cc62 github.com/tidwall/gjson v1.6.0 github.com/tinylib/msgp v1.1.5 + github.com/vapourismo/knx-go v0.0.0-20201122213738-75fe09ace330 github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e // indirect github.com/vishvananda/netns v0.0.0-20180720170159-13995c7128cc // indirect github.com/vjeantet/grok v1.0.1 diff --git a/go.sum b/go.sum index 93c77eda1..26aea881c 100644 --- a/go.sum +++ b/go.sum @@ -711,6 +711,8 @@ github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/vapourismo/knx-go v0.0.0-20201122213738-75fe09ace330 h1:iBlTJosRsR70amr0zsmSPvaKNH8K/p3YlX/5SdPmSl8= +github.com/vapourismo/knx-go v0.0.0-20201122213738-75fe09ace330/go.mod h1:7+aWBsUJCo9OQRCgTypRmIQW9KKKcPMjtrdnYIBsS70= github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec/go.mod h1:owBmyHYMLkxyrugmfwE/DLJyW8Ro9mkphwuVErQ0iUw= github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e h1:f1yevOHP+Suqk0rVc13fIkzcLULJbyQcXDba2klljD0= github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 9b22cd442..5f7e81648 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -87,6 +87,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat" _ "github.com/influxdata/telegraf/plugins/inputs/kibana" _ "github.com/influxdata/telegraf/plugins/inputs/kinesis_consumer" + _ "github.com/influxdata/telegraf/plugins/inputs/knx_listener" _ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory" _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/lanz" diff --git a/plugins/inputs/knx_listener/README.md b/plugins/inputs/knx_listener/README.md new file mode 100644 index 000000000..de015ddc2 --- /dev/null +++ b/plugins/inputs/knx_listener/README.md @@ -0,0 +1,66 @@ +# KNX input plugin + +The KNX input plugin that listens for messages on the KNX home-automation bus. +This plugin connects to the KNX bus via a KNX-IP interface. +Information about supported KNX message datapoint types can be found at the +underlying "knx-go" project site (https://github.com/vapourismo/knx-go). + +### Configuration + +This is a sample config for the plugin. + +```toml +# Listener capable of handling KNX bus messages provided through a KNX-IP Interface. +[[inputs.KNXListener]] + ## Type of KNX-IP interface. + ## Can be either "tunnel" or "router". + # service_type = "tunnel" + + ## Address of the KNX-IP interface. + service_address = "localhost:3671" + + ## Measurement definition(s) + # [[inputs.KNXListener.measurement]] + # ## Name of the measurement + # name = "temperature" + # ## Datapoint-Type (DPT) of the KNX messages + # dpt = "9.001" + # ## List of Group-Addresses (GAs) assigned to the measurement + # addresses = ["5/5/1"] + + # [[inputs.KNXListener.measurement]] + # name = "illumination" + # dpt = "9.004" + # addresses = ["5/5/3"] +``` + +#### Measurement configurations + +Each measurement contains only one datapoint-type (DPT) and assigns a list of +addresses to this measurement. You can, for example group all temperature sensor +messages within a "temperature" measurement. However, you are free to split +messages of one datapoint-type to multiple measurements. + +**NOTE: You should not assign a group-address (GA) to multiple measurements!** + +### Metrics + +Received KNX data is stored in the named measurement as configured above using +the "value" field. Additional to the value, there are the following tags added +to the datapoint: + - "groupaddress": KNX group-address corresponding to the value + - "unit": unit of the value + - "source": KNX physical address sending the value + +To find out about the datatype of the datapoint please check your KNX project, +the KNX-specification or the "knx-go" project for the corresponding DPT. + +### Example Output + +This section shows example output in Line Protocol format. + +``` +illumination,groupaddress=5/5/4,host=Hugin,source=1.1.12,unit=lux value=17.889999389648438 1582132674999013274 +temperature,groupaddress=5/5/1,host=Hugin,source=1.1.8,unit=°C value=17.799999237060547 1582132663427587361 +windowopen,groupaddress=1/0/1,host=Hugin,source=1.1.3 value=true 1582132630425581320 +``` diff --git a/plugins/inputs/knx_listener/knx_dummy_interface.go b/plugins/inputs/knx_listener/knx_dummy_interface.go new file mode 100644 index 000000000..1f897c4d9 --- /dev/null +++ b/plugins/inputs/knx_listener/knx_dummy_interface.go @@ -0,0 +1,28 @@ +package knx_listener + +import ( + "github.com/vapourismo/knx-go/knx" +) + +type KNXDummyInterface struct { + inbound chan knx.GroupEvent +} + +func NewDummyInterface() (di KNXDummyInterface, err error) { + di, err = KNXDummyInterface{}, nil + di.inbound = make(chan knx.GroupEvent) + + return di, err +} + +func (di *KNXDummyInterface) Send(event knx.GroupEvent) { + di.inbound <- event +} + +func (di *KNXDummyInterface) Inbound() <-chan knx.GroupEvent { + return di.inbound +} + +func (di *KNXDummyInterface) Close() { + close(di.inbound) +} diff --git a/plugins/inputs/knx_listener/knx_listener.go b/plugins/inputs/knx_listener/knx_listener.go new file mode 100644 index 000000000..3bb93fbb2 --- /dev/null +++ b/plugins/inputs/knx_listener/knx_listener.go @@ -0,0 +1,197 @@ +package knx_listener + +import ( + "fmt" + "reflect" + "sync" + + "github.com/vapourismo/knx-go/knx" + "github.com/vapourismo/knx-go/knx/dpt" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type KNXInterface interface { + Inbound() <-chan knx.GroupEvent + Close() +} + +type addressTarget struct { + measurement string + datapoint dpt.DatapointValue +} + +type Measurement struct { + Name string + Dpt string + Addresses []string +} + +type KNXListener struct { + ServiceType string `toml:"service_type"` + ServiceAddress string `toml:"service_address"` + Measurements []Measurement `toml:"measurement"` + Log telegraf.Logger `toml:"-"` + + client KNXInterface + gaTargetMap map[string]addressTarget + gaLogbook map[string]bool + + acc telegraf.Accumulator + wg sync.WaitGroup +} + +func (kl *KNXListener) Description() string { + return "Listener capable of handling KNX bus messages provided through a KNX-IP Interface." +} + +func (kl *KNXListener) SampleConfig() string { + return ` + ## Type of KNX-IP interface. + ## Can be either "tunnel" or "router". + # service_type = "tunnel" + + ## Address of the KNX-IP interface. + service_address = "localhost:3671" + + ## Measurement definition(s) + # [[inputs.KNXListener.measurement]] + # ## Name of the measurement + # name = "temperature" + # ## Datapoint-Type (DPT) of the KNX messages + # dpt = "9.001" + # ## List of Group-Addresses (GAs) assigned to the measurement + # addresses = ["5/5/1"] + + # [[inputs.KNXListener.measurement]] + # name = "illumination" + # dpt = "9.004" + # addresses = ["5/5/3"] +` +} + +func (kl *KNXListener) Gather(_ telegraf.Accumulator) error { + return nil +} + +func (kl *KNXListener) Start(acc telegraf.Accumulator) error { + // Store the accumulator for later use + kl.acc = acc + + // Setup a logbook to track unknown GAs to avoid log-spamming + kl.gaLogbook = make(map[string]bool) + + // Construct the mapping of Group-addresses (GAs) to DPTs and the name + // of the measurement + kl.gaTargetMap = make(map[string]addressTarget) + for _, m := range kl.Measurements { + kl.Log.Debugf("Group-address mapping for measurement %q:", m.Name) + for _, ga := range m.Addresses { + kl.Log.Debugf(" %s --> %s", ga, m.Dpt) + if _, ok := kl.gaTargetMap[ga]; ok { + return fmt.Errorf("duplicate specification of address %q", ga) + } + d, ok := dpt.Produce(m.Dpt) + if !ok { + return fmt.Errorf("cannot create datapoint-type %q for address %q", m.Dpt, ga) + } + kl.gaTargetMap[ga] = addressTarget{m.Name, d} + } + } + + // Connect to the KNX-IP interface + kl.Log.Infof("Trying to connect to %q at %q", kl.ServiceType, kl.ServiceAddress) + switch kl.ServiceType { + case "tunnel": + c, err := knx.NewGroupTunnel(kl.ServiceAddress, knx.DefaultTunnelConfig) + if err != nil { + return err + } + kl.client = &c + case "router": + c, err := knx.NewGroupRouter(kl.ServiceAddress, knx.DefaultRouterConfig) + if err != nil { + return err + } + kl.client = &c + case "dummy": + c, err := NewDummyInterface() + if err != nil { + return err + } + kl.client = &c + default: + return fmt.Errorf("invalid interface type: %s", kl.ServiceAddress) + } + kl.Log.Infof("Connected!") + + // Listen to the KNX bus + kl.wg.Add(1) + go func() { + kl.wg.Done() + kl.listen() + }() + + return nil +} + +func (kl *KNXListener) Stop() { + if kl.client != nil { + kl.client.Close() + kl.wg.Wait() + } +} + +func (kl *KNXListener) listen() { + for msg := range kl.client.Inbound() { + // Match GA to DataPointType and measurement name + ga := msg.Destination.String() + target, ok := kl.gaTargetMap[ga] + if !ok && !kl.gaLogbook[ga] { + kl.Log.Infof("Ignoring message %+v for unknown GA %q", msg, ga) + kl.gaLogbook[ga] = true + continue + } + + // Extract the value from the data-frame + err := target.datapoint.Unpack(msg.Data) + if err != nil { + kl.Log.Errorf("Unpacking data failed: %v", err) + continue + } + kl.Log.Debugf("Matched GA %q to measurement %q with value %v", ga, target.measurement, target.datapoint) + + // Convert the DatapointValue interface back to its basic type again + // as otherwise telegraf will not push out the metrics and eat it + // silently. + var value interface{} + vi := reflect.Indirect(reflect.ValueOf(target.datapoint)) + switch vi.Kind() { + case reflect.Bool: + value = vi.Bool() + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + value = vi.Int() + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + value = vi.Uint() + case reflect.Float32, reflect.Float64: + value = vi.Float() + default: + kl.Log.Errorf("Type conversion %v failed for address %q", vi.Kind(), ga) + continue + } + + // Compose the actual data to be pushed out + fields := map[string]interface{}{"value": value} + tags := map[string]string{ + "groupaddress": ga, + "unit": target.datapoint.(dpt.DatapointMeta).Unit(), + "source": msg.Source.String(), + } + kl.acc.AddFields(target.measurement, fields, tags) + } +} + +func init() { + inputs.Add("KNXListener", func() telegraf.Input { return &KNXListener{ServiceType: "tunnel"} }) +} diff --git a/plugins/inputs/knx_listener/knx_listener_test.go b/plugins/inputs/knx_listener/knx_listener_test.go new file mode 100644 index 000000000..973605886 --- /dev/null +++ b/plugins/inputs/knx_listener/knx_listener_test.go @@ -0,0 +1,135 @@ +package knx_listener + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/vapourismo/knx-go/knx" + "github.com/vapourismo/knx-go/knx/cemi" + "github.com/vapourismo/knx-go/knx/dpt" +) + +const epsilon = 1e-3 + +func setValue(data dpt.DatapointValue, value interface{}) error { + d := reflect.Indirect(reflect.ValueOf(data)) + if !d.CanSet() { + return fmt.Errorf("cannot set datapoint %v", data) + } + switch v := value.(type) { + case bool: + d.SetBool(v) + case float64: + d.SetFloat(v) + case int64: + d.SetInt(v) + case uint64: + d.SetUint(v) + default: + return fmt.Errorf("unknown type '%T' when setting value for DPT", value) + } + return nil +} + +func TestRegularReceives_DPT(t *testing.T) { + // Define the test-cases + var testcases = []struct { + address string + dpt string + value interface{} + }{ + {"1/0/1", "1.001", true}, + {"1/0/2", "1.002", false}, + {"1/0/3", "1.003", true}, + {"1/0/9", "1.009", false}, + {"1/1/0", "1.010", true}, + {"5/0/1", "5.001", 12.157}, + {"5/0/3", "5.003", 121.412}, + {"5/0/4", "5.004", uint64(25)}, + {"9/0/1", "9.001", 18.56}, + {"9/0/4", "9.004", 243.84}, + {"9/0/5", "9.005", 12.01}, + {"9/0/7", "9.007", 59.32}, + {"13/0/1", "13.001", int64(-15)}, + {"13/0/2", "13.002", int64(183)}, + {"13/1/0", "13.010", int64(-141)}, + {"13/1/1", "13.011", int64(277)}, + {"13/1/2", "13.012", int64(-4096)}, + {"13/1/3", "13.013", int64(8192)}, + {"13/1/4", "13.014", int64(-65536)}, + {"13/1/5", "13.015", int64(2147483647)}, + {"14/0/0", "14.000", -1.31}, + {"14/0/1", "14.001", 0.44}, + {"14/0/2", "14.002", 32.08}, + // {"14/0/3", "14.003", 92.69}, + // {"14/0/4", "14.004", 1.00794}, + {"14/1/0", "14.010", 5963.78}, + {"14/1/1", "14.011", 150.95}, + } + acc := &testutil.Accumulator{} + + // Setup the unit-under-test + measurements := make([]Measurement, 0, len(testcases)) + for _, testcase := range testcases { + measurements = append(measurements, Measurement{"test", testcase.dpt, []string{testcase.address}}) + } + listener := KNXListener{ + ServiceType: "dummy", + Measurements: measurements, + Log: testutil.Logger{Name: "knx_listener"}, + } + + // Setup the listener to test + err := listener.Start(acc) + require.NoError(t, err) + client := listener.client.(*KNXDummyInterface) + + tstart := time.Now() + + // Send the defined test data + for _, testcase := range testcases { + addr, err := cemi.NewGroupAddrString(testcase.address) + require.NoError(t, err) + + data, ok := dpt.Produce(testcase.dpt) + require.True(t, ok) + err = setValue(data, testcase.value) + require.NoError(t, err) + + client.Send(knx.GroupEvent{ + Command: knx.GroupWrite, + Destination: addr, + Data: data.Pack(), + }) + } + + // Give the accumulator some time to collect the data + acc.Wait(len(testcases)) + + // Stop the listener + listener.Stop() + tstop := time.Now() + + // Check if we got what we expected + require.Len(t, acc.Metrics, len(testcases)) + for i, m := range acc.Metrics { + assert.Equal(t, "test", m.Measurement) + assert.Equal(t, testcases[i].address, m.Tags["groupaddress"]) + assert.Len(t, m.Fields, 1) + switch v := testcases[i].value.(type) { + case bool, int64, uint64: + assert.Equal(t, v, m.Fields["value"]) + case float64: + assert.InDelta(t, v, m.Fields["value"], epsilon) + } + assert.True(t, !tstop.Before(m.Time)) + assert.True(t, !tstart.After(m.Time)) + } +}