feat(inputs): Add new S7comm plugin (#13731)

This commit is contained in:
Sven Rebhan 2023-08-09 21:56:47 +02:00 committed by GitHub
parent 96a9597720
commit 8b032b73ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1322 additions and 0 deletions

View File

@ -298,6 +298,7 @@ following works:
- github.com/remyoudompheng/bigfft [BSD 3-Clause "New" or "Revised" License](https://github.com/remyoudompheng/bigfft/blob/master/LICENSE)
- github.com/riemann/riemann-go-client [MIT License](https://github.com/riemann/riemann-go-client/blob/master/LICENSE)
- github.com/robbiet480/go.nut [MIT License](https://github.com/robbiet480/go.nut/blob/master/LICENSE)
- github.com/robinson/gos7 [BSD 3-Clause "New" or "Revised" License](https://github.com/robinson/gos7/blob/master/LICENSE)
- github.com/russross/blackfriday [BSD 2-Clause "Simplified" License](https://github.com/russross/blackfriday/blob/master/LICENSE.txt)
- github.com/safchain/ethtool [Apache License 2.0](https://github.com/safchain/ethtool/blob/master/LICENSE)
- github.com/samber/lo [MIT License](https://github.com/samber/lo/blob/master/LICENSE)

3
go.mod
View File

@ -156,6 +156,7 @@ require (
github.com/rabbitmq/amqp091-go v1.8.1
github.com/riemann/riemann-go-client v0.5.1-0.20211206220514-f58f10cdce16
github.com/robbiet480/go.nut v0.0.0-20220219091450-bd8f121e1fa1
github.com/robinson/gos7 v0.0.0-20230421131203-d20ac6ca08cd
github.com/safchain/ethtool v0.3.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/sensu/sensu-go/api/core/v2 v2.16.0
@ -491,3 +492,5 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/robinson/gos7 => github.com/srebhan/gos7 v0.0.0-20230807171120-77ee3120c4eb

2
go.sum
View File

@ -1419,6 +1419,8 @@ github.com/srebhan/cborquery v0.0.0-20230626165538-38be85b82316 h1:HVv8JjpX24FuI
github.com/srebhan/cborquery v0.0.0-20230626165538-38be85b82316/go.mod h1:9vX3Dhehey14KFYwWo4K/4JOJRve6jvQf6R9Y8PymLI=
github.com/srebhan/protobufquery v0.0.0-20230803132024-ae4c0d878e55 h1:ksmbrLbJAm+8yxB7fJ245usD0b1v9JHBJrWF+WqGyjs=
github.com/srebhan/protobufquery v0.0.0-20230803132024-ae4c0d878e55/go.mod h1:SIB3zq5pZq2Ff7aJtCdRpGiHc/meKyMLPEj8F5Tf1j8=
github.com/srebhan/gos7 v0.0.0-20230807171120-77ee3120c4eb h1:WUK18HBPrJVKkJp9VpvNZFE2bLmC+Mf0zMoMDcD1vFw=
github.com/srebhan/gos7 v0.0.0-20230807171120-77ee3120c4eb/go.mod h1:+96COZeFGpQFstWJ0eKQF+F6/z8gWVpBNGx/K3gD+QU=
github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.s7comm
package all
import _ "github.com/influxdata/telegraf/plugins/inputs/s7comm" // register plugin

View File

@ -0,0 +1,81 @@
# Siemens S7 Input Plugin
This plugin reads metrics from Siemens PLCs via the S7 protocol.
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and field or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins
## Configuration
```toml @sample.conf
# Plugin for retrieving data from Siemens PLCs via the S7 protocol (RFC1006)
[[inputs.s7comm]]
## Parameters to contact the PLC (mandatory)
## The server is in the <host>[:port] format where the port defaults to 102
## if not explicitly specified.
server = "127.0.0.1:102"
rack = 0
slot = 0
## Timeout for requests
# timeout = "10s"
## Log detailed connection messages for debugging
## This option only has an effect when Telegraf runs in debug mode
# debug_connection = false
## Metric definition(s)
[[inputs.s7comm.metric]]
## Name of the measurement
# name = "s7comm"
## Field definitions
## name - field name
## address - indirect address "<area>.<type><address>[.extra]"
## area - e.g. be "DB1" for data-block one
## type - supported types are (uppercase)
## X -- bit, requires the bit-number as 'extra'
## parameter
## B -- byte (8 bit)
## C -- character (8 bit)
## W -- word (16 bit)
## DW -- double word (32 bit)
## I -- integer (16 bit)
## DI -- double integer (32 bit)
## R -- IEEE 754 real floating point number (32 bit)
## DT -- date-time, always converted to unix timestamp
## with nano-second precision
## S -- string, requires the maximum length of the
## string as 'extra' parameter
## address - start address to read if not specified otherwise
## in the type field
## extra - extra parameter e.g. for the bit and string type
fields = [
{ name="rpm", address="DB1.R4" },
{ name="status_ok", address="DB1.X2.1" },
{ name="last_error", address="DB2.S1.32" },
{ name="last_error_time", address="DB2.DT2" }
]
## Tags assigned to the metric
# [inputs.s7comm.metric.tags]
# device = "compressor"
# location = "main building"
```
## Example Output
```text
s7comm,host=Hugin rpm=712i,status_ok=true,last_error="empty slot",last_error_time=1611319681000000000i 1611332164000000000
```
## Metrics
The format of metrics produced by this plugin depends on the metric
configuration(s).

View File

@ -0,0 +1,410 @@
//go:generate ../../../tools/readme_config_includer/generator
package s7comm
import (
_ "embed"
"errors"
"fmt"
"hash/maphash"
"log" //nolint:depguard // Required for tracing connection issues
"net"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/robinson/gos7"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/inputs"
)
//go:embed sample.conf
var sampleConfig string
const maxRequestsPerBatch = 20
const addressRegexp = `^(?P<area>[A-Z]+)(?P<no>[0-9]+)\.(?P<type>[A-Z]+)(?P<start>[0-9]+)(?:\.(?P<extra>.*))?$`
var (
regexAddr = regexp.MustCompile(addressRegexp)
// Area mapping taken from https://github.com/robinson/gos7/blob/master/client.go
areaMap = map[string]int{
"PE": 0x81, // process inputs
"PA": 0x82, // process outputs
"MK": 0x83, // Merkers
"DB": 0x84, // DB
"C": 0x1C, // counters
"T": 0x1D, // timers
}
// Word-length mapping taken from https://github.com/robinson/gos7/blob/master/client.go
wordLenMap = map[string]int{
"X": 0x01, // Bit
"B": 0x02, // Byte (8 bit)
"C": 0x03, // Char (8 bit)
"S": 0x03, // String (8 bit)
"W": 0x04, // Word (16 bit)
"I": 0x05, // Integer (16 bit)
"DW": 0x06, // Double Word (32 bit)
"DI": 0x07, // Double integer (32 bit)
"R": 0x08, // IEEE 754 real (32 bit)
// see https://support.industry.siemens.com/cs/document/36479/date_and_time-format-for-s7-?dti=0&lc=en-DE
"DT": 0x0F, // Date and time (7 byte)
}
)
type metricFieldDefinition struct {
Name string `toml:"name"`
Address string `toml:"address"`
}
type metricDefinition struct {
Name string `toml:"name"`
Fields []metricFieldDefinition `toml:"fields"`
Tags map[string]string `toml:"tags"`
}
type converterFunc func([]byte) interface{}
type batch struct {
items []gos7.S7DataItem
mappings []fieldMapping
}
type fieldMapping struct {
measurement string
field string
tags map[string]string
convert converterFunc
}
// S7comm represents the plugin
type S7comm struct {
Server string `toml:"server"`
Rack int `toml:"rack"`
Slot int `toml:"slot"`
Timeout config.Duration `toml:"timeout"`
DebugConnection bool `toml:"debug_connection"`
Configs []metricDefinition `toml:"metric"`
Log telegraf.Logger `toml:"-"`
handler *gos7.TCPClientHandler
client gos7.Client
batches []batch
}
// SampleConfig returns a basic configuration for the plugin
func (*S7comm) SampleConfig() string {
return sampleConfig
}
// Init checks the config settings and prepares the plugin. It's called
// once by the Telegraf agent after parsing the config settings.
func (s *S7comm) Init() error {
// Check settings
if s.Server == "" {
return errors.New("'server' has to be specified")
}
if s.Rack < 0 {
return errors.New("'rack' has to be specified")
}
if s.Slot < 0 {
return errors.New("'slot' has to be specified")
}
if len(s.Configs) == 0 {
return errors.New("no metric defined")
}
// Set default port to 102 if none is given
var nerr *net.AddrError
if _, _, err := net.SplitHostPort(s.Server); errors.As(err, &nerr) {
if !strings.Contains(nerr.Err, "missing port") {
return errors.New("invalid 'server' address")
}
s.Server += ":102"
}
// Create the requests
return s.createRequests()
}
// Start initializes the connection to the remote endpoint
func (s *S7comm) Start(_ telegraf.Accumulator) error {
// Create handler for the connection
s.handler = gos7.NewTCPClientHandler(s.Server, s.Rack, s.Slot)
s.handler.Timeout = time.Duration(s.Timeout)
if s.DebugConnection {
s.handler.Logger = log.New(os.Stderr, "D! [inputs.s7comm]", log.LstdFlags)
}
if err := s.handler.Connect(); err != nil {
return fmt.Errorf("connecting to %q failed: %w", s.Server, err)
}
s.client = gos7.NewClient(s.handler)
return nil
}
// Stop disconnects from the remote endpoint and cleans up
func (s *S7comm) Stop() {
if s.handler != nil {
s.handler.Close()
}
}
// Gather collects the data from the device
func (s *S7comm) Gather(acc telegraf.Accumulator) error {
timestamp := time.Now()
grouper := metric.NewSeriesGrouper()
for i, b := range s.batches {
// Read the batch
s.Log.Debugf("Reading batch %d...", i+1)
if err := s.client.AGReadMulti(b.items, len(b.items)); err != nil {
return fmt.Errorf("reading batch %d failed: %w", i+1, err)
}
// Dissect the received data into fields
for j, m := range b.mappings {
// Convert the data
buf := b.items[j].Data
value := m.convert(buf)
s.Log.Debugf(" got %v for field %q @ %d --> %v (%T)", buf, m.field, b.items[j].Start, value, value)
// Group the data by series
grouper.Add(m.measurement, m.tags, timestamp, m.field, value)
}
}
// Add the metrics grouped by series to the accumulator
for _, x := range grouper.Metrics() {
acc.AddMetric(x)
}
return nil
}
// Internal functions
func (s *S7comm) createRequests() error {
seed := maphash.MakeSeed()
seenFields := make(map[uint64]bool)
s.batches = make([]batch, 0)
current := batch{}
for i, cfg := range s.Configs {
// Set the defaults
if cfg.Name == "" {
cfg.Name = "s7comm"
}
// Check the metric definitions
if len(cfg.Fields) == 0 {
return fmt.Errorf("no fields defined for metric %q", cfg.Name)
}
// Create requests for all fields and add it to the current slot
for _, f := range cfg.Fields {
if f.Name == "" {
return fmt.Errorf("unnamed field in metric %q", cfg.Name)
}
item, cfunc, err := handleFieldAddress(f.Address)
if err != nil {
return fmt.Errorf("field %q of metric %q: %w", f.Name, cfg.Name, err)
}
m := fieldMapping{
measurement: cfg.Name,
field: f.Name,
tags: s.Configs[i].Tags,
convert: cfunc,
}
current.items = append(current.items, *item)
current.mappings = append(current.mappings, m)
// If the batch is full, start a new one
if len(current.items) == maxRequestsPerBatch {
s.batches = append(s.batches, current)
current = batch{}
}
// Check for duplicate field definitions
id, err := fieldID(seed, cfg, f)
if err != nil {
return fmt.Errorf("cannot determine field id for %q: %w", f.Name, err)
}
if seenFields[id] {
return fmt.Errorf("duplicate field definition field %q in metric %q", f.Name, cfg.Name)
}
seenFields[id] = true
}
// Update the configuration if changed
s.Configs[i] = cfg
}
// Add the last batch if any
if len(current.items) > 0 {
s.batches = append(s.batches, current)
}
return nil
}
func handleFieldAddress(address string) (*gos7.S7DataItem, converterFunc, error) {
// Parse the address into the different parts
if !regexAddr.MatchString(address) {
return nil, nil, fmt.Errorf("invalid address %q", address)
}
names := regexAddr.SubexpNames()[1:]
parts := regexAddr.FindStringSubmatch(address)[1:]
if len(names) != len(parts) {
return nil, nil, fmt.Errorf("names %v do not match parts %v", names, parts)
}
groups := make(map[string]string, len(names))
for i, n := range names {
groups[n] = parts[i]
}
// Check that we do have the required entries in the address
if _, found := groups["area"]; !found {
return nil, nil, errors.New("area is missing from address")
}
if _, found := groups["no"]; !found {
return nil, nil, errors.New("area index is missing from address")
}
if _, found := groups["type"]; !found {
return nil, nil, errors.New("type is missing from address")
}
if _, found := groups["start"]; !found {
return nil, nil, errors.New("start address is missing from address")
}
dtype := groups["type"]
// Lookup the item values from names and check the params
area, found := areaMap[groups["area"]]
if !found {
return nil, nil, errors.New("invalid area")
}
wordlen, found := wordLenMap[dtype]
if !found {
return nil, nil, errors.New("unknown data type")
}
areaidx, err := strconv.Atoi(groups["no"])
if err != nil {
return nil, nil, fmt.Errorf("invalid area index: %w", err)
}
start, err := strconv.Atoi(groups["start"])
if err != nil {
return nil, nil, fmt.Errorf("invalid start address: %w", err)
}
// Check the amount parameter if any
var extra int
switch dtype {
case "X", "S":
// We require an extra parameter
x := groups["extra"]
if x == "" {
return nil, nil, errors.New("extra parameter required")
}
extra, err = strconv.Atoi(x)
if err != nil {
return nil, nil, fmt.Errorf("invalid extra parameter: %w", err)
}
if extra < 1 {
return nil, nil, fmt.Errorf("invalid extra parameter %d", extra)
}
default:
if groups["extra"] != "" {
return nil, nil, errors.New("extra parameter specified but not used")
}
}
// Get the required buffer size
amount := 1
var buflen int
switch dtype {
case "X", "B", "C": // 8-bit types
buflen = 1
case "W", "I": // 16-bit types
buflen = 2
case "DW", "DI", "R": // 32-bit types
buflen = 4
case "DT": // 7-byte
buflen = 7
case "S":
amount = extra
// Extra bytes as the first byte is the max-length of the string and
// the second byte is the actual length of the string.
buflen = extra + 2
default:
return nil, nil, errors.New("invalid data type")
}
// Setup the data item
item := &gos7.S7DataItem{
Area: area,
WordLen: wordlen,
DBNumber: areaidx,
Start: start,
Amount: amount,
Data: make([]byte, buflen),
}
// Determine the type converter function
f := determineConversion(dtype, extra)
return item, f, nil
}
func fieldID(seed maphash.Seed, def metricDefinition, field metricFieldDefinition) (uint64, error) {
var mh maphash.Hash
mh.SetSeed(seed)
if _, err := mh.WriteString(def.Name); err != nil {
return 0, err
}
if err := mh.WriteByte(0); err != nil {
return 0, err
}
if _, err := mh.WriteString(field.Name); err != nil {
return 0, err
}
if err := mh.WriteByte(0); err != nil {
return 0, err
}
// Tags
for k, v := range def.Tags {
if _, err := mh.WriteString(k); err != nil {
return 0, err
}
if err := mh.WriteByte('='); err != nil {
return 0, err
}
if _, err := mh.WriteString(v); err != nil {
return 0, err
}
if err := mh.WriteByte(':'); err != nil {
return 0, err
}
}
if err := mh.WriteByte(0); err != nil {
return 0, err
}
return mh.Sum64(), nil
}
// Add this plugin to telegraf
func init() {
inputs.Add("s7comm", func() telegraf.Input {
return &S7comm{
Rack: -1,
Slot: -1,
Timeout: config.Duration(10 * time.Second),
}
})
}

View File

@ -0,0 +1,700 @@
package s7comm
import (
_ "embed"
"testing"
"github.com/influxdata/telegraf/testutil"
"github.com/robinson/gos7"
"github.com/stretchr/testify/require"
)
func TestSampleConfig(t *testing.T) {
plugin := &S7comm{}
require.NotEmpty(t, plugin.SampleConfig())
}
func TestInitFail(t *testing.T) {
tests := []struct {
name string
server string
rack int
slot int
configs []metricDefinition
expectedError string
}{
{
name: "empty settings",
rack: -1, // This is the default in `init()`
slot: -1, // This is the default in `init()`
expectedError: "'server' has to be specified",
},
{
name: "missing rack",
server: "127.0.0.1:102",
rack: -1, // This is the default in `init()`
slot: -1, // This is the default in `init()`
expectedError: "'rack' has to be specified",
},
{
name: "missing slot",
server: "127.0.0.1:102",
rack: 0,
slot: -1, // This is the default in `init()`
expectedError: "'slot' has to be specified",
},
{
name: "missing configs",
server: "127.0.0.1:102",
expectedError: "no metric defined",
},
{
name: "single empty metric",
server: "127.0.0.1:102",
configs: []metricDefinition{{}},
expectedError: "no fields defined for metric",
},
{
name: "single empty metric field",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{{}},
},
},
expectedError: "unnamed field in metric",
},
{
name: "no address",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
},
},
},
},
expectedError: "invalid address",
},
{
name: "invalid address pattern",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "FOO",
},
},
},
},
expectedError: "invalid address",
},
{
name: "invalid address area",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "FOO1.W2",
},
},
},
},
expectedError: "invalid area",
},
{
name: "invalid address area index",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB.W2",
},
},
},
},
expectedError: "invalid address",
},
{
name: "invalid address type",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.A2",
},
},
},
},
expectedError: "unknown data type",
},
{
name: "invalid address start",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.A",
},
},
},
},
expectedError: "invalid address",
},
{
name: "missing extra parameter bit",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.X1",
},
},
},
},
expectedError: "extra parameter required",
},
{
name: "missing extra parameter string",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.S1",
},
},
},
},
expectedError: "extra parameter required",
},
{
name: "invalid address extra parameter",
server: "127.0.0.1:102",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W1.23",
},
},
},
},
expectedError: "extra parameter specified but not used",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := &S7comm{
Server: tt.server,
Rack: tt.rack,
Slot: tt.slot,
Configs: tt.configs,
Log: &testutil.Logger{},
}
require.ErrorContains(t, plugin.Init(), tt.expectedError)
})
}
}
func TestInit(t *testing.T) {
plugin := &S7comm{
Server: "127.0.0.1:102",
Rack: 0,
Slot: 0,
Configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W2",
},
},
},
},
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())
}
func TestFieldMappings(t *testing.T) {
tests := []struct {
name string
configs []metricDefinition
expected []batch
}{
{
name: "single field bit",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.X3.2",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x01,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 1),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return false },
},
},
},
},
},
{
name: "single field byte",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.B3",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x02,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 1),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return byte(0) },
},
},
},
},
},
{
name: "single field char",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.C3",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x03,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 1),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return string([]byte{0}) },
},
},
},
},
},
{
name: "single field string",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.S3.10",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x03,
DBNumber: 5,
Start: 3,
Amount: 10,
Data: make([]byte, 12),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return "" },
},
},
},
},
},
{
name: "single field word",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.W3",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x04,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 2),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return uint16(0) },
},
},
},
},
},
{
name: "single field integer",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.I3",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x05,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 2),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return int16(0) },
},
},
},
},
},
{
name: "single field double word",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.DW3",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x06,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 4),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return uint32(0) },
},
},
},
},
},
{
name: "single field double integer",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.DI3",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x07,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 4),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return int32(0) },
},
},
},
},
},
{
name: "single field float",
configs: []metricDefinition{
{
Name: "test",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB5.R3",
},
},
},
},
expected: []batch{
{
items: []gos7.S7DataItem{
{
Area: 0x84,
WordLen: 0x08,
DBNumber: 5,
Start: 3,
Amount: 1,
Data: make([]byte, 4),
},
},
mappings: []fieldMapping{
{
measurement: "test",
field: "foo",
convert: func(b []byte) interface{} { return float32(0) },
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := &S7comm{
Server: "127.0.0.1:102",
Rack: 0,
Slot: 2,
Configs: tt.configs,
Log: &testutil.Logger{},
}
require.NoError(t, plugin.Init())
// Check the length
require.Len(t, plugin.batches, len(tt.expected))
// Check the actual content
for i, eb := range tt.expected {
ab := plugin.batches[i]
require.Len(t, ab.items, len(eb.items))
require.Len(t, ab.mappings, len(eb.mappings))
require.EqualValues(t, eb.items, plugin.batches[i].items, "different items")
for j, em := range eb.mappings {
am := ab.mappings[j]
require.Equal(t, em.measurement, am.measurement)
require.Equal(t, em.field, am.field)
buf := ab.items[j].Data
require.Equal(t, em.convert(buf), am.convert(buf))
}
}
})
}
}
func TestMetricCollisions(t *testing.T) {
tests := []struct {
name string
configs []metricDefinition
expectedError string
}{
{
name: "duplicate fields same config",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.W1",
},
{
Name: "foo",
Address: "DB1.B1",
},
},
},
},
expectedError: "duplicate field definition",
},
{
name: "duplicate fields different config",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.B1",
},
},
},
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.B1",
},
},
},
},
expectedError: "duplicate field definition",
},
{
name: "same fields different name",
configs: []metricDefinition{
{
Name: "foo",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.B1",
},
},
},
{
Name: "bar",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.B1",
},
},
},
},
},
{
name: "same fields different tags",
configs: []metricDefinition{
{
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.B1",
},
},
Tags: map[string]string{"device": "foo"},
},
{
Name: "bar",
Fields: []metricFieldDefinition{
{
Name: "foo",
Address: "DB1.B1",
},
},
Tags: map[string]string{"device": "bar"},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := &S7comm{
Server: "127.0.0.1:102",
Rack: 0,
Slot: 2,
Configs: tt.configs,
Log: &testutil.Logger{},
}
err := plugin.Init()
if tt.expectedError != "" {
require.ErrorContains(t, err, tt.expectedError)
} else {
require.NoError(t, err)
}
})
}
}

View File

@ -0,0 +1,53 @@
# Plugin for retrieving data from Siemens PLCs via the S7 protocol (RFC1006)
[[inputs.s7comm]]
## Parameters to contact the PLC (mandatory)
## The server is in the <host>[:port] format where the port defaults to 102
## if not explicitly specified.
server = "127.0.0.1:102"
rack = 0
slot = 0
## Timeout for requests
# timeout = "10s"
## Log detailed connection messages for debugging
## This option only has an effect when Telegraf runs in debug mode
# debug_connection = false
## Metric definition(s)
[[inputs.s7comm.metric]]
## Name of the measurement
# name = "s7comm"
## Field definitions
## name - field name
## address - indirect address "<area>.<type><address>[.extra]"
## area - e.g. be "DB1" for data-block one
## type - supported types are (uppercase)
## X -- bit, requires the bit-number as 'extra'
## parameter
## B -- byte (8 bit)
## C -- character (8 bit)
## W -- word (16 bit)
## DW -- double word (32 bit)
## I -- integer (16 bit)
## DI -- double integer (32 bit)
## R -- IEEE 754 real floating point number (32 bit)
## DT -- date-time, always converted to unix timestamp
## with nano-second precision
## S -- string, requires the maximum length of the
## string as 'extra' parameter
## address - start address to read if not specified otherwise
## in the type field
## extra - extra parameter e.g. for the bit and string type
fields = [
{ name="rpm", address="DB1.R4" },
{ name="status_ok", address="DB1.X2.1" },
{ name="last_error", address="DB2.S1.32" },
{ name="last_error_time", address="DB2.DT2" }
]
## Tags assigned to the metric
# [inputs.s7comm.metric.tags]
# device = "compressor"
# location = "main building"

View File

@ -0,0 +1,67 @@
package s7comm
import (
"encoding/binary"
"math"
"github.com/robinson/gos7"
)
var helper = &gos7.Helper{}
func determineConversion(dtype string, extra int) converterFunc {
switch dtype {
case "X":
return func(buf []byte) interface{} {
return (buf[0] & (1 << extra)) != 0
}
case "B":
return func(buf []byte) interface{} {
return buf[0]
}
case "C":
return func(buf []byte) interface{} {
return string(buf[0])
}
case "S":
return func(buf []byte) interface{} {
if len(buf) <= 2 {
return ""
}
// Get the length of the encoded string
length := int(buf[0])
// Clip the string if we do not fill the whole buffer
if length < len(buf)-2 {
return string(buf[2 : 2+length])
}
return string(buf[2:])
}
case "W":
return func(buf []byte) interface{} {
return binary.BigEndian.Uint16(buf)
}
case "I":
return func(buf []byte) interface{} {
return int16(binary.BigEndian.Uint16(buf))
}
case "DW":
return func(buf []byte) interface{} {
return binary.BigEndian.Uint32(buf)
}
case "DI":
return func(buf []byte) interface{} {
return int32(binary.BigEndian.Uint32(buf))
}
case "R":
return func(buf []byte) interface{} {
x := binary.BigEndian.Uint32(buf)
return math.Float32frombits(x)
}
case "DT":
return func(buf []byte) interface{} {
return helper.GetDateTimeAt(buf, 0).UnixNano()
}
}
panic("Unknown type! Please file an issue on https://github.com/influxdata/telegraf including your config.")
}