From 8b032b73eec60a54196d08a2ed74f6d3dc62e450 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Wed, 9 Aug 2023 21:56:47 +0200 Subject: [PATCH] feat(inputs): Add new S7comm plugin (#13731) --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 3 + go.sum | 2 + plugins/inputs/all/s7comm.go | 5 + plugins/inputs/s7comm/README.md | 81 +++ plugins/inputs/s7comm/s7comm.go | 410 +++++++++++++ plugins/inputs/s7comm/s7comm_test.go | 700 ++++++++++++++++++++++ plugins/inputs/s7comm/sample.conf | 53 ++ plugins/inputs/s7comm/type_conversions.go | 67 +++ 9 files changed, 1322 insertions(+) create mode 100644 plugins/inputs/all/s7comm.go create mode 100644 plugins/inputs/s7comm/README.md create mode 100644 plugins/inputs/s7comm/s7comm.go create mode 100644 plugins/inputs/s7comm/s7comm_test.go create mode 100644 plugins/inputs/s7comm/sample.conf create mode 100644 plugins/inputs/s7comm/type_conversions.go diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index fdec7e5cf..6c7017533 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -298,6 +298,7 @@ following works: - github.com/remyoudompheng/bigfft [BSD 3-Clause "New" or "Revised" License](https://github.com/remyoudompheng/bigfft/blob/master/LICENSE) - github.com/riemann/riemann-go-client [MIT License](https://github.com/riemann/riemann-go-client/blob/master/LICENSE) - github.com/robbiet480/go.nut [MIT License](https://github.com/robbiet480/go.nut/blob/master/LICENSE) +- github.com/robinson/gos7 [BSD 3-Clause "New" or "Revised" License](https://github.com/robinson/gos7/blob/master/LICENSE) - github.com/russross/blackfriday [BSD 2-Clause "Simplified" License](https://github.com/russross/blackfriday/blob/master/LICENSE.txt) - github.com/safchain/ethtool [Apache License 2.0](https://github.com/safchain/ethtool/blob/master/LICENSE) - github.com/samber/lo [MIT License](https://github.com/samber/lo/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 26c306dea..ad69f259b 100644 --- a/go.mod +++ b/go.mod @@ -156,6 +156,7 @@ require ( github.com/rabbitmq/amqp091-go v1.8.1 github.com/riemann/riemann-go-client v0.5.1-0.20211206220514-f58f10cdce16 github.com/robbiet480/go.nut v0.0.0-20220219091450-bd8f121e1fa1 + github.com/robinson/gos7 v0.0.0-20230421131203-d20ac6ca08cd github.com/safchain/ethtool v0.3.0 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/sensu/sensu-go/api/core/v2 v2.16.0 @@ -491,3 +492,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) + +replace github.com/robinson/gos7 => github.com/srebhan/gos7 v0.0.0-20230807171120-77ee3120c4eb diff --git a/go.sum b/go.sum index 34f294576..b9711de14 100644 --- a/go.sum +++ b/go.sum @@ -1419,6 +1419,8 @@ github.com/srebhan/cborquery v0.0.0-20230626165538-38be85b82316 h1:HVv8JjpX24FuI github.com/srebhan/cborquery v0.0.0-20230626165538-38be85b82316/go.mod h1:9vX3Dhehey14KFYwWo4K/4JOJRve6jvQf6R9Y8PymLI= github.com/srebhan/protobufquery v0.0.0-20230803132024-ae4c0d878e55 h1:ksmbrLbJAm+8yxB7fJ245usD0b1v9JHBJrWF+WqGyjs= github.com/srebhan/protobufquery v0.0.0-20230803132024-ae4c0d878e55/go.mod h1:SIB3zq5pZq2Ff7aJtCdRpGiHc/meKyMLPEj8F5Tf1j8= +github.com/srebhan/gos7 v0.0.0-20230807171120-77ee3120c4eb h1:WUK18HBPrJVKkJp9VpvNZFE2bLmC+Mf0zMoMDcD1vFw= +github.com/srebhan/gos7 v0.0.0-20230807171120-77ee3120c4eb/go.mod h1:+96COZeFGpQFstWJ0eKQF+F6/z8gWVpBNGx/K3gD+QU= github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/plugins/inputs/all/s7comm.go b/plugins/inputs/all/s7comm.go new file mode 100644 index 000000000..ca5030b9a --- /dev/null +++ b/plugins/inputs/all/s7comm.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.s7comm + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/s7comm" // register plugin diff --git a/plugins/inputs/s7comm/README.md b/plugins/inputs/s7comm/README.md new file mode 100644 index 000000000..2f6f1893a --- /dev/null +++ b/plugins/inputs/s7comm/README.md @@ -0,0 +1,81 @@ +# Siemens S7 Input Plugin + +This plugin reads metrics from Siemens PLCs via the S7 protocol. + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# Plugin for retrieving data from Siemens PLCs via the S7 protocol (RFC1006) +[[inputs.s7comm]] + ## Parameters to contact the PLC (mandatory) + ## The server is in the [:port] format where the port defaults to 102 + ## if not explicitly specified. + server = "127.0.0.1:102" + rack = 0 + slot = 0 + + ## Timeout for requests + # timeout = "10s" + + ## Log detailed connection messages for debugging + ## This option only has an effect when Telegraf runs in debug mode + # debug_connection = false + + ## Metric definition(s) + [[inputs.s7comm.metric]] + ## Name of the measurement + # name = "s7comm" + + ## Field definitions + ## name - field name + ## address - indirect address ".
[.extra]" + ## area - e.g. be "DB1" for data-block one + ## type - supported types are (uppercase) + ## X -- bit, requires the bit-number as 'extra' + ## parameter + ## B -- byte (8 bit) + ## C -- character (8 bit) + ## W -- word (16 bit) + ## DW -- double word (32 bit) + ## I -- integer (16 bit) + ## DI -- double integer (32 bit) + ## R -- IEEE 754 real floating point number (32 bit) + ## DT -- date-time, always converted to unix timestamp + ## with nano-second precision + ## S -- string, requires the maximum length of the + ## string as 'extra' parameter + ## address - start address to read if not specified otherwise + ## in the type field + ## extra - extra parameter e.g. for the bit and string type + fields = [ + { name="rpm", address="DB1.R4" }, + { name="status_ok", address="DB1.X2.1" }, + { name="last_error", address="DB2.S1.32" }, + { name="last_error_time", address="DB2.DT2" } + ] + + ## Tags assigned to the metric + # [inputs.s7comm.metric.tags] + # device = "compressor" + # location = "main building" +``` + +## Example Output + +```text +s7comm,host=Hugin rpm=712i,status_ok=true,last_error="empty slot",last_error_time=1611319681000000000i 1611332164000000000 +``` + +## Metrics + +The format of metrics produced by this plugin depends on the metric +configuration(s). diff --git a/plugins/inputs/s7comm/s7comm.go b/plugins/inputs/s7comm/s7comm.go new file mode 100644 index 000000000..93a601a8b --- /dev/null +++ b/plugins/inputs/s7comm/s7comm.go @@ -0,0 +1,410 @@ +//go:generate ../../../tools/readme_config_includer/generator +package s7comm + +import ( + _ "embed" + "errors" + "fmt" + "hash/maphash" + "log" //nolint:depguard // Required for tracing connection issues + "net" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/robinson/gos7" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/inputs" +) + +//go:embed sample.conf +var sampleConfig string + +const maxRequestsPerBatch = 20 +const addressRegexp = `^(?P[A-Z]+)(?P[0-9]+)\.(?P[A-Z]+)(?P[0-9]+)(?:\.(?P.*))?$` + +var ( + regexAddr = regexp.MustCompile(addressRegexp) + // Area mapping taken from https://github.com/robinson/gos7/blob/master/client.go + areaMap = map[string]int{ + "PE": 0x81, // process inputs + "PA": 0x82, // process outputs + "MK": 0x83, // Merkers + "DB": 0x84, // DB + "C": 0x1C, // counters + "T": 0x1D, // timers + } + // Word-length mapping taken from https://github.com/robinson/gos7/blob/master/client.go + wordLenMap = map[string]int{ + "X": 0x01, // Bit + "B": 0x02, // Byte (8 bit) + "C": 0x03, // Char (8 bit) + "S": 0x03, // String (8 bit) + "W": 0x04, // Word (16 bit) + "I": 0x05, // Integer (16 bit) + "DW": 0x06, // Double Word (32 bit) + "DI": 0x07, // Double integer (32 bit) + "R": 0x08, // IEEE 754 real (32 bit) + // see https://support.industry.siemens.com/cs/document/36479/date_and_time-format-for-s7-?dti=0&lc=en-DE + "DT": 0x0F, // Date and time (7 byte) + } +) + +type metricFieldDefinition struct { + Name string `toml:"name"` + Address string `toml:"address"` +} + +type metricDefinition struct { + Name string `toml:"name"` + Fields []metricFieldDefinition `toml:"fields"` + Tags map[string]string `toml:"tags"` +} + +type converterFunc func([]byte) interface{} + +type batch struct { + items []gos7.S7DataItem + mappings []fieldMapping +} + +type fieldMapping struct { + measurement string + field string + tags map[string]string + convert converterFunc +} + +// S7comm represents the plugin +type S7comm struct { + Server string `toml:"server"` + Rack int `toml:"rack"` + Slot int `toml:"slot"` + Timeout config.Duration `toml:"timeout"` + DebugConnection bool `toml:"debug_connection"` + Configs []metricDefinition `toml:"metric"` + Log telegraf.Logger `toml:"-"` + + handler *gos7.TCPClientHandler + client gos7.Client + batches []batch +} + +// SampleConfig returns a basic configuration for the plugin +func (*S7comm) SampleConfig() string { + return sampleConfig +} + +// Init checks the config settings and prepares the plugin. It's called +// once by the Telegraf agent after parsing the config settings. +func (s *S7comm) Init() error { + // Check settings + if s.Server == "" { + return errors.New("'server' has to be specified") + } + if s.Rack < 0 { + return errors.New("'rack' has to be specified") + } + if s.Slot < 0 { + return errors.New("'slot' has to be specified") + } + if len(s.Configs) == 0 { + return errors.New("no metric defined") + } + + // Set default port to 102 if none is given + var nerr *net.AddrError + if _, _, err := net.SplitHostPort(s.Server); errors.As(err, &nerr) { + if !strings.Contains(nerr.Err, "missing port") { + return errors.New("invalid 'server' address") + } + s.Server += ":102" + } + + // Create the requests + return s.createRequests() +} + +// Start initializes the connection to the remote endpoint +func (s *S7comm) Start(_ telegraf.Accumulator) error { + // Create handler for the connection + s.handler = gos7.NewTCPClientHandler(s.Server, s.Rack, s.Slot) + s.handler.Timeout = time.Duration(s.Timeout) + if s.DebugConnection { + s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm]", log.LstdFlags) + } + if err := s.handler.Connect(); err != nil { + return fmt.Errorf("connecting to %q failed: %w", s.Server, err) + } + s.client = gos7.NewClient(s.handler) + + return nil +} + +// Stop disconnects from the remote endpoint and cleans up +func (s *S7comm) Stop() { + if s.handler != nil { + s.handler.Close() + } +} + +// Gather collects the data from the device +func (s *S7comm) Gather(acc telegraf.Accumulator) error { + timestamp := time.Now() + grouper := metric.NewSeriesGrouper() + + for i, b := range s.batches { + // Read the batch + s.Log.Debugf("Reading batch %d...", i+1) + if err := s.client.AGReadMulti(b.items, len(b.items)); err != nil { + return fmt.Errorf("reading batch %d failed: %w", i+1, err) + } + + // Dissect the received data into fields + for j, m := range b.mappings { + // Convert the data + buf := b.items[j].Data + value := m.convert(buf) + s.Log.Debugf(" got %v for field %q @ %d --> %v (%T)", buf, m.field, b.items[j].Start, value, value) + + // Group the data by series + grouper.Add(m.measurement, m.tags, timestamp, m.field, value) + } + } + + // Add the metrics grouped by series to the accumulator + for _, x := range grouper.Metrics() { + acc.AddMetric(x) + } + + return nil +} + +// Internal functions +func (s *S7comm) createRequests() error { + seed := maphash.MakeSeed() + seenFields := make(map[uint64]bool) + s.batches = make([]batch, 0) + + current := batch{} + for i, cfg := range s.Configs { + // Set the defaults + if cfg.Name == "" { + cfg.Name = "s7comm" + } + + // Check the metric definitions + if len(cfg.Fields) == 0 { + return fmt.Errorf("no fields defined for metric %q", cfg.Name) + } + + // Create requests for all fields and add it to the current slot + for _, f := range cfg.Fields { + if f.Name == "" { + return fmt.Errorf("unnamed field in metric %q", cfg.Name) + } + + item, cfunc, err := handleFieldAddress(f.Address) + if err != nil { + return fmt.Errorf("field %q of metric %q: %w", f.Name, cfg.Name, err) + } + m := fieldMapping{ + measurement: cfg.Name, + field: f.Name, + tags: s.Configs[i].Tags, + convert: cfunc, + } + current.items = append(current.items, *item) + current.mappings = append(current.mappings, m) + + // If the batch is full, start a new one + if len(current.items) == maxRequestsPerBatch { + s.batches = append(s.batches, current) + current = batch{} + } + + // Check for duplicate field definitions + id, err := fieldID(seed, cfg, f) + if err != nil { + return fmt.Errorf("cannot determine field id for %q: %w", f.Name, err) + } + if seenFields[id] { + return fmt.Errorf("duplicate field definition field %q in metric %q", f.Name, cfg.Name) + } + seenFields[id] = true + } + + // Update the configuration if changed + s.Configs[i] = cfg + } + + // Add the last batch if any + if len(current.items) > 0 { + s.batches = append(s.batches, current) + } + + return nil +} + +func handleFieldAddress(address string) (*gos7.S7DataItem, converterFunc, error) { + // Parse the address into the different parts + if !regexAddr.MatchString(address) { + return nil, nil, fmt.Errorf("invalid address %q", address) + } + names := regexAddr.SubexpNames()[1:] + parts := regexAddr.FindStringSubmatch(address)[1:] + if len(names) != len(parts) { + return nil, nil, fmt.Errorf("names %v do not match parts %v", names, parts) + } + groups := make(map[string]string, len(names)) + for i, n := range names { + groups[n] = parts[i] + } + + // Check that we do have the required entries in the address + if _, found := groups["area"]; !found { + return nil, nil, errors.New("area is missing from address") + } + + if _, found := groups["no"]; !found { + return nil, nil, errors.New("area index is missing from address") + } + if _, found := groups["type"]; !found { + return nil, nil, errors.New("type is missing from address") + } + if _, found := groups["start"]; !found { + return nil, nil, errors.New("start address is missing from address") + } + dtype := groups["type"] + + // Lookup the item values from names and check the params + area, found := areaMap[groups["area"]] + if !found { + return nil, nil, errors.New("invalid area") + } + wordlen, found := wordLenMap[dtype] + if !found { + return nil, nil, errors.New("unknown data type") + } + areaidx, err := strconv.Atoi(groups["no"]) + if err != nil { + return nil, nil, fmt.Errorf("invalid area index: %w", err) + } + start, err := strconv.Atoi(groups["start"]) + if err != nil { + return nil, nil, fmt.Errorf("invalid start address: %w", err) + } + + // Check the amount parameter if any + var extra int + switch dtype { + case "X", "S": + // We require an extra parameter + x := groups["extra"] + if x == "" { + return nil, nil, errors.New("extra parameter required") + } + + extra, err = strconv.Atoi(x) + if err != nil { + return nil, nil, fmt.Errorf("invalid extra parameter: %w", err) + } + if extra < 1 { + return nil, nil, fmt.Errorf("invalid extra parameter %d", extra) + } + default: + if groups["extra"] != "" { + return nil, nil, errors.New("extra parameter specified but not used") + } + } + + // Get the required buffer size + amount := 1 + var buflen int + switch dtype { + case "X", "B", "C": // 8-bit types + buflen = 1 + case "W", "I": // 16-bit types + buflen = 2 + case "DW", "DI", "R": // 32-bit types + buflen = 4 + case "DT": // 7-byte + buflen = 7 + case "S": + amount = extra + // Extra bytes as the first byte is the max-length of the string and + // the second byte is the actual length of the string. + buflen = extra + 2 + default: + return nil, nil, errors.New("invalid data type") + } + + // Setup the data item + item := &gos7.S7DataItem{ + Area: area, + WordLen: wordlen, + DBNumber: areaidx, + Start: start, + Amount: amount, + Data: make([]byte, buflen), + } + + // Determine the type converter function + f := determineConversion(dtype, extra) + return item, f, nil +} + +func fieldID(seed maphash.Seed, def metricDefinition, field metricFieldDefinition) (uint64, error) { + var mh maphash.Hash + mh.SetSeed(seed) + + if _, err := mh.WriteString(def.Name); err != nil { + return 0, err + } + if err := mh.WriteByte(0); err != nil { + return 0, err + } + if _, err := mh.WriteString(field.Name); err != nil { + return 0, err + } + if err := mh.WriteByte(0); err != nil { + return 0, err + } + + // Tags + for k, v := range def.Tags { + if _, err := mh.WriteString(k); err != nil { + return 0, err + } + if err := mh.WriteByte('='); err != nil { + return 0, err + } + if _, err := mh.WriteString(v); err != nil { + return 0, err + } + if err := mh.WriteByte(':'); err != nil { + return 0, err + } + } + if err := mh.WriteByte(0); err != nil { + return 0, err + } + + return mh.Sum64(), nil +} + +// Add this plugin to telegraf +func init() { + inputs.Add("s7comm", func() telegraf.Input { + return &S7comm{ + Rack: -1, + Slot: -1, + Timeout: config.Duration(10 * time.Second), + } + }) +} diff --git a/plugins/inputs/s7comm/s7comm_test.go b/plugins/inputs/s7comm/s7comm_test.go new file mode 100644 index 000000000..e390c849b --- /dev/null +++ b/plugins/inputs/s7comm/s7comm_test.go @@ -0,0 +1,700 @@ +package s7comm + +import ( + _ "embed" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/robinson/gos7" + "github.com/stretchr/testify/require" +) + +func TestSampleConfig(t *testing.T) { + plugin := &S7comm{} + require.NotEmpty(t, plugin.SampleConfig()) +} + +func TestInitFail(t *testing.T) { + tests := []struct { + name string + server string + rack int + slot int + configs []metricDefinition + expectedError string + }{ + { + name: "empty settings", + rack: -1, // This is the default in `init()` + slot: -1, // This is the default in `init()` + expectedError: "'server' has to be specified", + }, + { + name: "missing rack", + server: "127.0.0.1:102", + rack: -1, // This is the default in `init()` + slot: -1, // This is the default in `init()` + expectedError: "'rack' has to be specified", + }, + { + name: "missing slot", + server: "127.0.0.1:102", + rack: 0, + slot: -1, // This is the default in `init()` + expectedError: "'slot' has to be specified", + }, + { + name: "missing configs", + server: "127.0.0.1:102", + expectedError: "no metric defined", + }, + { + name: "single empty metric", + server: "127.0.0.1:102", + configs: []metricDefinition{{}}, + expectedError: "no fields defined for metric", + }, + { + name: "single empty metric field", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{{}}, + }, + }, + expectedError: "unnamed field in metric", + }, + { + name: "no address", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + }, + }, + }, + }, + expectedError: "invalid address", + }, + { + name: "invalid address pattern", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "FOO", + }, + }, + }, + }, + expectedError: "invalid address", + }, + { + name: "invalid address area", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "FOO1.W2", + }, + }, + }, + }, + expectedError: "invalid area", + }, + { + name: "invalid address area index", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB.W2", + }, + }, + }, + }, + expectedError: "invalid address", + }, + { + name: "invalid address type", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.A2", + }, + }, + }, + }, + expectedError: "unknown data type", + }, + { + name: "invalid address start", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.A", + }, + }, + }, + }, + expectedError: "invalid address", + }, + { + name: "missing extra parameter bit", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.X1", + }, + }, + }, + }, + expectedError: "extra parameter required", + }, + { + name: "missing extra parameter string", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.S1", + }, + }, + }, + }, + expectedError: "extra parameter required", + }, + { + name: "invalid address extra parameter", + server: "127.0.0.1:102", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W1.23", + }, + }, + }, + }, + expectedError: "extra parameter specified but not used", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &S7comm{ + Server: tt.server, + Rack: tt.rack, + Slot: tt.slot, + Configs: tt.configs, + Log: &testutil.Logger{}, + } + require.ErrorContains(t, plugin.Init(), tt.expectedError) + }) + } +} + +func TestInit(t *testing.T) { + plugin := &S7comm{ + Server: "127.0.0.1:102", + Rack: 0, + Slot: 0, + Configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W2", + }, + }, + }, + }, + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) +} + +func TestFieldMappings(t *testing.T) { + tests := []struct { + name string + configs []metricDefinition + expected []batch + }{ + { + name: "single field bit", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.X3.2", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x01, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 1), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return false }, + }, + }, + }, + }, + }, + { + name: "single field byte", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.B3", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x02, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 1), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return byte(0) }, + }, + }, + }, + }, + }, + { + name: "single field char", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.C3", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x03, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 1), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return string([]byte{0}) }, + }, + }, + }, + }, + }, + { + name: "single field string", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.S3.10", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x03, + DBNumber: 5, + Start: 3, + Amount: 10, + Data: make([]byte, 12), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return "" }, + }, + }, + }, + }, + }, + { + name: "single field word", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.W3", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x04, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 2), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return uint16(0) }, + }, + }, + }, + }, + }, + { + name: "single field integer", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.I3", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x05, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 2), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return int16(0) }, + }, + }, + }, + }, + }, + { + name: "single field double word", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.DW3", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x06, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 4), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return uint32(0) }, + }, + }, + }, + }, + }, + { + name: "single field double integer", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.DI3", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x07, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 4), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return int32(0) }, + }, + }, + }, + }, + }, + { + name: "single field float", + configs: []metricDefinition{ + { + Name: "test", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB5.R3", + }, + }, + }, + }, + expected: []batch{ + { + items: []gos7.S7DataItem{ + { + Area: 0x84, + WordLen: 0x08, + DBNumber: 5, + Start: 3, + Amount: 1, + Data: make([]byte, 4), + }, + }, + mappings: []fieldMapping{ + { + measurement: "test", + field: "foo", + convert: func(b []byte) interface{} { return float32(0) }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &S7comm{ + Server: "127.0.0.1:102", + Rack: 0, + Slot: 2, + Configs: tt.configs, + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + // Check the length + require.Len(t, plugin.batches, len(tt.expected)) + // Check the actual content + for i, eb := range tt.expected { + ab := plugin.batches[i] + require.Len(t, ab.items, len(eb.items)) + require.Len(t, ab.mappings, len(eb.mappings)) + require.EqualValues(t, eb.items, plugin.batches[i].items, "different items") + for j, em := range eb.mappings { + am := ab.mappings[j] + require.Equal(t, em.measurement, am.measurement) + require.Equal(t, em.field, am.field) + buf := ab.items[j].Data + require.Equal(t, em.convert(buf), am.convert(buf)) + } + } + }) + } +} + +func TestMetricCollisions(t *testing.T) { + tests := []struct { + name string + configs []metricDefinition + expectedError string + }{ + { + name: "duplicate fields same config", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.W1", + }, + { + Name: "foo", + Address: "DB1.B1", + }, + }, + }, + }, + expectedError: "duplicate field definition", + }, + { + name: "duplicate fields different config", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.B1", + }, + }, + }, + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.B1", + }, + }, + }, + }, + expectedError: "duplicate field definition", + }, + { + name: "same fields different name", + configs: []metricDefinition{ + { + Name: "foo", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.B1", + }, + }, + }, + { + Name: "bar", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.B1", + }, + }, + }, + }, + }, + { + name: "same fields different tags", + configs: []metricDefinition{ + { + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.B1", + }, + }, + Tags: map[string]string{"device": "foo"}, + }, + { + Name: "bar", + Fields: []metricFieldDefinition{ + { + Name: "foo", + Address: "DB1.B1", + }, + }, + Tags: map[string]string{"device": "bar"}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plugin := &S7comm{ + Server: "127.0.0.1:102", + Rack: 0, + Slot: 2, + Configs: tt.configs, + Log: &testutil.Logger{}, + } + err := plugin.Init() + if tt.expectedError != "" { + require.ErrorContains(t, err, tt.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/plugins/inputs/s7comm/sample.conf b/plugins/inputs/s7comm/sample.conf new file mode 100644 index 000000000..62f66788c --- /dev/null +++ b/plugins/inputs/s7comm/sample.conf @@ -0,0 +1,53 @@ +# Plugin for retrieving data from Siemens PLCs via the S7 protocol (RFC1006) +[[inputs.s7comm]] + ## Parameters to contact the PLC (mandatory) + ## The server is in the [:port] format where the port defaults to 102 + ## if not explicitly specified. + server = "127.0.0.1:102" + rack = 0 + slot = 0 + + ## Timeout for requests + # timeout = "10s" + + ## Log detailed connection messages for debugging + ## This option only has an effect when Telegraf runs in debug mode + # debug_connection = false + + ## Metric definition(s) + [[inputs.s7comm.metric]] + ## Name of the measurement + # name = "s7comm" + + ## Field definitions + ## name - field name + ## address - indirect address ".
[.extra]" + ## area - e.g. be "DB1" for data-block one + ## type - supported types are (uppercase) + ## X -- bit, requires the bit-number as 'extra' + ## parameter + ## B -- byte (8 bit) + ## C -- character (8 bit) + ## W -- word (16 bit) + ## DW -- double word (32 bit) + ## I -- integer (16 bit) + ## DI -- double integer (32 bit) + ## R -- IEEE 754 real floating point number (32 bit) + ## DT -- date-time, always converted to unix timestamp + ## with nano-second precision + ## S -- string, requires the maximum length of the + ## string as 'extra' parameter + ## address - start address to read if not specified otherwise + ## in the type field + ## extra - extra parameter e.g. for the bit and string type + fields = [ + { name="rpm", address="DB1.R4" }, + { name="status_ok", address="DB1.X2.1" }, + { name="last_error", address="DB2.S1.32" }, + { name="last_error_time", address="DB2.DT2" } + ] + + ## Tags assigned to the metric + # [inputs.s7comm.metric.tags] + # device = "compressor" + # location = "main building" diff --git a/plugins/inputs/s7comm/type_conversions.go b/plugins/inputs/s7comm/type_conversions.go new file mode 100644 index 000000000..404b43a0a --- /dev/null +++ b/plugins/inputs/s7comm/type_conversions.go @@ -0,0 +1,67 @@ +package s7comm + +import ( + "encoding/binary" + "math" + + "github.com/robinson/gos7" +) + +var helper = &gos7.Helper{} + +func determineConversion(dtype string, extra int) converterFunc { + switch dtype { + case "X": + return func(buf []byte) interface{} { + return (buf[0] & (1 << extra)) != 0 + } + case "B": + return func(buf []byte) interface{} { + return buf[0] + } + case "C": + return func(buf []byte) interface{} { + return string(buf[0]) + } + case "S": + return func(buf []byte) interface{} { + if len(buf) <= 2 { + return "" + } + // Get the length of the encoded string + length := int(buf[0]) + // Clip the string if we do not fill the whole buffer + if length < len(buf)-2 { + return string(buf[2 : 2+length]) + } + return string(buf[2:]) + } + case "W": + return func(buf []byte) interface{} { + return binary.BigEndian.Uint16(buf) + } + case "I": + return func(buf []byte) interface{} { + return int16(binary.BigEndian.Uint16(buf)) + } + case "DW": + return func(buf []byte) interface{} { + return binary.BigEndian.Uint32(buf) + } + case "DI": + return func(buf []byte) interface{} { + return int32(binary.BigEndian.Uint32(buf)) + } + case "R": + return func(buf []byte) interface{} { + x := binary.BigEndian.Uint32(buf) + return math.Float32frombits(x) + } + case "DT": + return func(buf []byte) interface{} { + return helper.GetDateTimeAt(buf, 0).UnixNano() + } + } + + panic("Unknown type! Please file an issue on https://github.com/influxdata/telegraf including your config.") +}