feat(processors.scale): Add scaling by factor and offset (#13227)

This commit is contained in:
Sven Rebhan 2023-05-08 17:27:58 +02:00 committed by GitHub
parent 5cb928cc5d
commit afab345573
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 394 additions and 120 deletions

View File

@ -10,11 +10,18 @@ the given output range according to this formula:
\text{output\_minimum}
```
Input fields are converted to floating point values.
If the conversion fails, those fields are ignored.
Alternatively, you can apply a factor and offset to the input according to
this formula
**Please note:** Neither the input nor the output values
are clipped to their respective ranges!
```math
\text{result}=\text{factor} \cdot \text{value} + \text{offset}
```
Input fields are converted to floating point values if possible. Otherwise,
fields that cannot be converted are ignored and keep their original value.
**Please note:** Neither the input nor the output values are clipped to their
respective ranges!
## Global configuration options <!-- @/docs/includes/plugin_config.md -->
@ -37,24 +44,24 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## - input_maximum: Maximum expected input value
## - output_minimum: Minimum desired output value
## - output_maximum: Maximum desired output value
## alternatively you can specify a scaling with factor and offset
## - factor: factor to scale the input value with
## - offset: additive offset for value after scaling
## - fields: a list of field names (or filters) to apply this scaling to
## Example: Define a scaling
## Example: Scaling with minimum and maximum values
# [processors.scale.scaling]
# input_minimum = 0
# input_maximum = 1
# output_minimum = 0
# output_maximum = 100
# fields = ["temperature1", "temperature2"]
## Multiple scalings can be defined simultaneously
## Example: A second scaling.
## Example: Scaling with factor and offset
# [processors.scale.scaling]
# input_minimum = 0
# input_maximum = 50
# output_minimum = 50
# output_maximum = 100
# fields = ["humidity*"]
# factor = 10.0
# offset = -5.0
# fields = ["voltage*"]
```
## Example

View File

@ -7,21 +7,21 @@
## - input_maximum: Maximum expected input value
## - output_minimum: Minimum desired output value
## - output_maximum: Maximum desired output value
## alternatively you can specify a scaling with factor and offset
## - factor: factor to scale the input value with
## - offset: additive offset for value after scaling
## - fields: a list of field names (or filters) to apply this scaling to
## Example: Define a scaling
## Example: Scaling with minimum and maximum values
# [processors.scale.scaling]
# input_minimum = 0
# input_maximum = 1
# output_minimum = 0
# output_maximum = 100
# fields = ["temperature1", "temperature2"]
## Multiple scalings can be defined simultaneously
## Example: A second scaling.
## Example: Scaling with factor and offset
# [processors.scale.scaling]
# input_minimum = 0
# input_maximum = 50
# output_minimum = 50
# output_maximum = 100
# fields = ["humidity*"]
# factor = 10.0
# offset = -5.0
# fields = ["voltage*"]

View File

@ -21,14 +21,18 @@ func (*Scale) SampleConfig() string {
}
type Scaling struct {
InMin float64 `toml:"input_minimum"`
InMax float64 `toml:"input_maximum"`
OutMin float64 `toml:"output_minimum"`
OutMax float64 `toml:"output_maximum"`
InMin *float64 `toml:"input_minimum"`
InMax *float64 `toml:"input_maximum"`
OutMin *float64 `toml:"output_minimum"`
OutMax *float64 `toml:"output_maximum"`
Factor *float64 `toml:"factor"`
Offset *float64 `toml:"offset"`
Fields []string `toml:"fields"`
factor float64
fieldFilter filter.Filter
scale float64
shiftIn float64
shiftOut float64
}
type Scale struct {
@ -36,13 +40,36 @@ type Scale struct {
Log telegraf.Logger `toml:"-"`
}
func (s *Scaling) init() error {
if s.InMax == s.InMin {
return fmt.Errorf("input minimum and maximum are equal for fields %s", strings.Join(s.Fields, ","))
}
func (s *Scaling) Init() error {
s.scale, s.shiftOut, s.shiftIn = float64(1.0), float64(0.0), float64(0.0)
allMinMaxSet := s.OutMax != nil && s.OutMin != nil && s.InMax != nil && s.InMin != nil
anyMinMaxSet := s.OutMax != nil || s.OutMin != nil || s.InMax != nil || s.InMin != nil
factorSet := s.Factor != nil || s.Offset != nil
if anyMinMaxSet && factorSet {
return fmt.Errorf("cannot use factor/offset and minimum/maximum at the same time for fields %s", strings.Join(s.Fields, ","))
} else if anyMinMaxSet && !allMinMaxSet {
return fmt.Errorf("all minimum and maximum values need to be set for fields %s", strings.Join(s.Fields, ","))
} else if !anyMinMaxSet && !factorSet {
return fmt.Errorf("no scaling defined for fields %s", strings.Join(s.Fields, ","))
} else if allMinMaxSet {
if *s.InMax == *s.InMin {
return fmt.Errorf("input minimum and maximum are equal for fields %s", strings.Join(s.Fields, ","))
}
if s.OutMax == s.OutMin {
return fmt.Errorf("output minimum and maximum are equal for fields %s", strings.Join(s.Fields, ","))
if *s.OutMax == *s.OutMin {
return fmt.Errorf("output minimum and maximum are equal for fields %s", strings.Join(s.Fields, ","))
}
s.scale = (*s.OutMax - *s.OutMin) / (*s.InMax - *s.InMin)
s.shiftOut = *s.OutMin
s.shiftIn = *s.InMin
} else {
if s.Factor != nil {
s.scale = *s.Factor
}
if s.Offset != nil {
s.shiftOut = *s.Offset
}
}
scalingFilter, err := filter.Compile(s.Fields)
@ -51,18 +78,17 @@ func (s *Scaling) init() error {
}
s.fieldFilter = scalingFilter
s.factor = (s.OutMax - s.OutMin) / (s.InMax - s.InMin)
return nil
}
// scale a float according to the input and output range
func (s *Scaling) process(value float64) float64 {
return (value-s.InMin)*s.factor + s.OutMin
return s.scale*(value-s.shiftIn) + s.shiftOut
}
func (s *Scale) Init() error {
if s.Scalings == nil {
return errors.New("no valid scalings defined")
return errors.New("no valid scaling defined")
}
allFields := make(map[string]bool)
@ -77,7 +103,7 @@ func (s *Scale) Init() error {
}
}
if err := s.Scalings[i].init(); err != nil {
if err := s.Scalings[i].Init(); err != nil {
return fmt.Errorf("scaling %d: %w", i+1, err)
}
}

View File

@ -4,36 +4,49 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestScaler(t *testing.T) {
type scalingValuesMinMax struct {
InMin float64
InMax float64
OutMin float64
OutMax float64
Fields []string
}
type scalingValuesFactor struct {
Factor float64
Offset float64
Fields []string
}
func TestMinMax(t *testing.T) {
tests := []struct {
name string
scale *Scale
scale []scalingValuesMinMax
inputs []telegraf.Metric
expected []telegraf.Metric
}{
{
name: "Field Scaling",
scale: &Scale{
Scalings: []Scaling{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
{
InMin: -5,
InMax: 0,
OutMin: 1,
OutMax: 9,
Fields: []string{"test3", "test4"},
},
scale: []scalingValuesMinMax{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
{
InMin: -5,
InMax: 0,
OutMin: 1,
OutMax: 9,
Fields: []string{"test3", "test4"},
},
},
inputs: []telegraf.Metric{
@ -82,16 +95,14 @@ func TestScaler(t *testing.T) {
},
},
{
name: "Ignored Fileds",
scale: &Scale{
Scalings: []Scaling{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
name: "Ignored Fields",
scale: []scalingValuesMinMax{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
},
inputs: []telegraf.Metric{
@ -113,15 +124,13 @@ func TestScaler(t *testing.T) {
},
{
name: "Out of range tests",
scale: &Scale{
Scalings: []Scaling{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
scale: []scalingValuesMinMax{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
},
inputs: []telegraf.Metric{
@ -141,15 +150,13 @@ func TestScaler(t *testing.T) {
},
{
name: "Missing field Fileds",
scale: &Scale{
Scalings: []Scaling{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
scale: []scalingValuesMinMax{
{
InMin: -1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test1", "test2"},
},
},
inputs: []telegraf.Metric{
@ -169,63 +176,297 @@ func TestScaler(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.scale.Log = testutil.Logger{}
require.NoError(t, tt.scale.Init())
actual := tt.scale.Apply(tt.inputs...)
plugin := &Scale{
Scalings: make([]Scaling, 0, len(tt.scale)),
Log: testutil.Logger{},
}
for i := range tt.scale {
plugin.Scalings = append(plugin.Scalings, Scaling{
InMin: &tt.scale[i].InMin,
InMax: &tt.scale[i].InMax,
OutMin: &tt.scale[i].OutMin,
OutMax: &tt.scale[i].OutMax,
Fields: tt.scale[i].Fields,
})
}
require.NoError(t, plugin.Init())
actual := plugin.Apply(tt.inputs...)
testutil.RequireMetricsEqual(t, tt.expected, actual)
})
}
}
func TestErrorCases(t *testing.T) {
func TestFactor(t *testing.T) {
tests := []struct {
name string
scale *Scale
expectedErrorMsg string
name string
scale []scalingValuesFactor
inputs []telegraf.Metric
expected []telegraf.Metric
}{
{
name: "Same input range values",
scale: &Scale{
Scalings: []Scaling{
{
InMin: 1,
InMax: 1,
OutMin: 0,
OutMax: 100,
Fields: []string{"test"},
},
name: "Field Scaling",
scale: []scalingValuesFactor{
{
Factor: 50.0,
Offset: 50.0,
Fields: []string{"test1", "test2"},
},
{
Factor: 1.6,
Offset: 9.0,
Fields: []string{"test3", "test4"},
},
},
expectedErrorMsg: "input minimum and maximum are equal for fields test",
inputs: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": int64(0),
"test2": uint64(1),
}, time.Unix(0, 0)),
testutil.MustMetric("Name2", map[string]string{},
map[string]interface{}{
"test1": "0.5",
"test2": float32(-0.5),
}, time.Unix(0, 0)),
testutil.MustMetric("Name3", map[string]string{},
map[string]interface{}{
"test3": int64(-3),
"test4": uint64(0),
}, time.Unix(0, 0)),
testutil.MustMetric("Name4", map[string]string{},
map[string]interface{}{
"test3": int64(-5),
"test4": float32(-0.5),
}, time.Unix(0, 0)),
},
expected: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": float64(50),
"test2": float64(100),
}, time.Unix(0, 0)),
testutil.MustMetric("Name2", map[string]string{},
map[string]interface{}{
"test1": float64(75),
"test2": float32(25),
}, time.Unix(0, 0)),
testutil.MustMetric("Name3", map[string]string{},
map[string]interface{}{
"test3": float64(4.2),
"test4": float64(9),
}, time.Unix(0, 0)),
testutil.MustMetric("Name4", map[string]string{},
map[string]interface{}{
"test3": float64(1),
"test4": float64(8.2),
}, time.Unix(0, 0)),
},
},
{
name: "Same input range values",
scale: &Scale{
Scalings: []Scaling{
{
InMin: 0,
InMax: 1,
OutMin: 100,
OutMax: 100,
Fields: []string{"test"},
},
name: "Ignored Fields",
scale: []scalingValuesFactor{
{
Factor: 50.0,
Offset: 50.0,
Fields: []string{"test1", "test2"},
},
},
expectedErrorMsg: "output minimum and maximum are equal for fields test",
inputs: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": int64(0),
"test2": uint64(1),
"test3": int64(1),
}, time.Unix(0, 0)),
},
expected: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": float64(50),
"test2": float64(100),
"test3": int64(1),
}, time.Unix(0, 0)),
},
},
{
name: "No scalings",
scale: &Scale{Log: testutil.Logger{}},
expectedErrorMsg: "no valid scalings defined",
name: "Missing field Fields",
scale: []scalingValuesFactor{
{
Factor: 50.0,
Offset: 50.0,
Fields: []string{"test1", "test2"},
},
},
inputs: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": int64(0),
}, time.Unix(0, 0)),
},
expected: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": float64(50),
}, time.Unix(0, 0)),
},
},
{
name: "No Offset",
scale: []scalingValuesFactor{
{
Factor: 50.0,
Fields: []string{"test1"},
},
},
inputs: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": int64(1),
}, time.Unix(0, 0)),
},
expected: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": float64(50),
}, time.Unix(0, 0)),
},
},
{
name: "No Factor",
scale: []scalingValuesFactor{
{
Offset: 50.0,
Fields: []string{"test1"},
},
},
inputs: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": int64(1),
}, time.Unix(0, 0)),
},
expected: []telegraf.Metric{
testutil.MustMetric("Name1", map[string]string{},
map[string]interface{}{
"test1": float64(51),
}, time.Unix(0, 0)),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.scale.Log = testutil.Logger{}
require.ErrorContains(t, tt.scale.Init(), tt.expectedErrorMsg)
plugin := &Scale{
Scalings: make([]Scaling, 0, len(tt.scale)),
Log: testutil.Logger{},
}
for i := range tt.scale {
s := Scaling{
Fields: tt.scale[i].Fields,
}
if tt.scale[i].Factor != 0.0 {
s.Factor = &tt.scale[i].Factor
}
if tt.scale[i].Offset != 0.0 {
s.Offset = &tt.scale[i].Offset
}
plugin.Scalings = append(plugin.Scalings, s)
}
require.NoError(t, plugin.Init())
actual := plugin.Apply(tt.inputs...)
testutil.RequireMetricsEqual(t, tt.expected, actual, cmpopts.EquateApprox(0, 1e-6))
})
}
}
func TestErrorCasesMinMax(t *testing.T) {
a0, a1, a100 := float64(0.0), float64(1.0), float64(100.0)
tests := []struct {
name string
scaling []Scaling
fields []string
expectedErrorMsg string
}{
{
name: "Same input range values",
scaling: []Scaling{
{
InMin: &a1,
InMax: &a1,
OutMin: &a0,
OutMax: &a100,
Fields: []string{"test"},
},
},
fields: []string{"test"},
expectedErrorMsg: "input minimum and maximum are equal for fields test",
},
{
name: "Same input range values",
scaling: []Scaling{
{
InMin: &a0,
InMax: &a1,
OutMin: &a100,
OutMax: &a100,
Fields: []string{"test"},
},
},
fields: []string{"test"},
expectedErrorMsg: "output minimum and maximum are equal",
},
{
name: "Nothing set",
scaling: []Scaling{
{
Fields: []string{"test"},
},
},
fields: []string{"test"},
expectedErrorMsg: "no scaling defined",
},
{
name: "Partial minimum and maximum",
scaling: []Scaling{
{
InMin: &a0,
Fields: []string{"test"},
},
},
fields: []string{"test"},
expectedErrorMsg: "all minimum and maximum values need to be set",
},
{
name: "Mixed minimum, maximum and factor",
scaling: []Scaling{
{
InMin: &a0,
InMax: &a1,
OutMin: &a100,
OutMax: &a100,
Factor: &a1,
Fields: []string{"test"},
},
},
fields: []string{"test"},
expectedErrorMsg: "cannot use factor/offset and minimum/maximum at the same time",
},
{
name: "No scaling",
expectedErrorMsg: "no valid scaling defined",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
plugin := &Scale{
Scalings: tt.scaling,
Log: testutil.Logger{},
}
err := plugin.Init()
require.ErrorContains(t, err, tt.expectedErrorMsg)
})
}
}