diff --git a/plugins/processors/all/scale.go b/plugins/processors/all/scale.go new file mode 100644 index 000000000..1fd0fc270 --- /dev/null +++ b/plugins/processors/all/scale.go @@ -0,0 +1,5 @@ +//go:build !custom || processors || processors.scale + +package all + +import _ "github.com/influxdata/telegraf/plugins/processors/scale" // register plugin diff --git a/plugins/processors/scale/README.md b/plugins/processors/scale/README.md new file mode 100644 index 000000000..17e458c16 --- /dev/null +++ b/plugins/processors/scale/README.md @@ -0,0 +1,76 @@ +# Scale Processor Plugin + +The scale processor filters for a set of fields, +and scales the respective values from an input range into +the given output range according to this formula: + +```math +\text{result}=(\text{value}-\text{input\_minimum})\cdot\frac{(\text{output\_maximum}-\text{output\_minimum})} +{(\text{input\_maximum}-\text{input\_minimum})} + +\text{output\_minimum} +``` + +Input fields are converted to floating point values. +If the conversion fails, those fields are ignored. + +**Please note:** Neither the input nor the output values +are clipped to their respective ranges! + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and field or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# Scale values with a predefined range to a different output range. +[[processors.scale]] + ## It is possible to define multiple different scaling that can be applied + ## do different sets of fields. Each scaling expects the following + ## arguments: + ## - input_minimum: Minimum expected input value + ## - input_maximum: Maximum expected input value + ## - output_minimum: Minimum desired output value + ## - output_maximum: Maximum desired output value + ## - fields: a list of field names (or filters) to apply this scaling to + + ## Example: Define a scaling + # [processors.scale.scaling] + # input_minimum = 0 + # input_maximum = 1 + # output_minimum = 0 + # output_maximum = 100 + # fields = ["temperature1", "temperature2"] + + ## Multiple scalings can be defined simultaneously + ## Example: A second scaling. + # [processors.scale.scaling] + # input_minimum = 0 + # input_maximum = 50 + # output_minimum = 50 + # output_maximum = 100 + # fields = ["humidity*"] +``` + +## Example + +The example below uses these scaling values: + +```toml +[processors.scale.scaling] + input_minimum = 0 + input_maximum = 50 + output_minimum = 50 + output_maximum = 100 + fields = ["cpu"] +``` + +```diff +- temperature, cpu=25 ++ temperature, cpu=75.0 +``` diff --git a/plugins/processors/scale/sample.conf b/plugins/processors/scale/sample.conf new file mode 100644 index 000000000..d1cee7040 --- /dev/null +++ b/plugins/processors/scale/sample.conf @@ -0,0 +1,27 @@ +# Scale values with a predefined range to a different output range. +[[processors.scale]] + ## It is possible to define multiple different scaling that can be applied + ## do different sets of fields. Each scaling expects the following + ## arguments: + ## - input_minimum: Minimum expected input value + ## - input_maximum: Maximum expected input value + ## - output_minimum: Minimum desired output value + ## - output_maximum: Maximum desired output value + ## - fields: a list of field names (or filters) to apply this scaling to + + ## Example: Define a scaling + # [processors.scale.scaling] + # input_minimum = 0 + # input_maximum = 1 + # output_minimum = 0 + # output_maximum = 100 + # fields = ["temperature1", "temperature2"] + + ## Multiple scalings can be defined simultaneously + ## Example: A second scaling. + # [processors.scale.scaling] + # input_minimum = 0 + # input_maximum = 50 + # output_minimum = 50 + # output_maximum = 100 + # fields = ["humidity*"] diff --git a/plugins/processors/scale/scale.go b/plugins/processors/scale/scale.go new file mode 100644 index 000000000..45b6b399e --- /dev/null +++ b/plugins/processors/scale/scale.go @@ -0,0 +1,120 @@ +//go:generate ../../../tools/readme_config_includer/generator +package scale + +import ( + _ "embed" + "errors" + "fmt" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/processors" +) + +//go:embed sample.conf +var sampleConfig string + +func (*Scale) SampleConfig() string { + return sampleConfig +} + +type Scaling struct { + InMin float64 `toml:"input_minimum"` + InMax float64 `toml:"input_maximum"` + OutMin float64 `toml:"output_minimum"` + OutMax float64 `toml:"output_maximum"` + Fields []string `toml:"fields"` + + factor float64 + fieldFilter filter.Filter +} + +type Scale struct { + Scalings []Scaling `toml:"scaling"` + Log telegraf.Logger `toml:"-"` +} + +func (s *Scaling) init() error { + if s.InMax == s.InMin { + return fmt.Errorf("input minimum and maximum are equal for fields %s", strings.Join(s.Fields, ",")) + } + + if s.OutMax == s.OutMin { + return fmt.Errorf("output minimum and maximum are equal for fields %s", strings.Join(s.Fields, ",")) + } + + scalingFilter, err := filter.Compile(s.Fields) + if err != nil { + return fmt.Errorf("could not compile fields filter: %w", err) + } + s.fieldFilter = scalingFilter + + s.factor = (s.OutMax - s.OutMin) / (s.InMax - s.InMin) + return nil +} + +// scale a float according to the input and output range +func (s *Scaling) process(value float64) float64 { + return (value-s.InMin)*s.factor + s.OutMin +} + +func (s *Scale) Init() error { + if s.Scalings == nil { + return errors.New("no valid scalings defined") + } + + allFields := make(map[string]bool) + for i := range s.Scalings { + for _, field := range s.Scalings[i].Fields { + // only generate a warning for the first duplicate field filter + if warn, ok := allFields[field]; ok && warn { + s.Log.Warnf("filter field %q used twice in scalings", field) + allFields[field] = false + } else { + allFields[field] = true + } + } + + if err := s.Scalings[i].init(); err != nil { + return fmt.Errorf("scaling %d: %w", i+1, err) + } + } + return nil +} + +// handle the scaling process +func (s *Scale) scaleValues(metric telegraf.Metric) { + fields := metric.FieldList() + + for _, scaling := range s.Scalings { + for _, field := range fields { + if !scaling.fieldFilter.Match(field.Key) { + continue + } + + v, err := internal.ToFloat64(field.Value) + if err != nil { + s.Log.Errorf("error converting %q to float: %w\n", field.Key, err) + continue + } + + // scale the field values using the defined scaler + field.Value = scaling.process(v) + } + } +} + +func (s *Scale) Apply(in ...telegraf.Metric) []telegraf.Metric { + for _, metric := range in { + s.scaleValues(metric) + } + return in +} + +func init() { + processors.Add("scale", func() telegraf.Processor { + return &Scale{} + }) +} diff --git a/plugins/processors/scale/scale_test.go b/plugins/processors/scale/scale_test.go new file mode 100644 index 000000000..6fb8dc13d --- /dev/null +++ b/plugins/processors/scale/scale_test.go @@ -0,0 +1,231 @@ +package scale + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func TestScaler(t *testing.T) { + tests := []struct { + name string + scale *Scale + inputs []telegraf.Metric + expected []telegraf.Metric + }{ + { + name: "Field Scaling", + scale: &Scale{ + Scalings: []Scaling{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, + }, + { + InMin: -5, + InMax: 0, + OutMin: 1, + OutMax: 9, + Fields: []string{"test3", "test4"}, + }, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(0), + "test2": uint64(1), + }, time.Unix(0, 0)), + testutil.MustMetric("Name2", map[string]string{}, + map[string]interface{}{ + "test1": "0.5", + "test2": float32(-0.5), + }, time.Unix(0, 0)), + testutil.MustMetric("Name3", map[string]string{}, + map[string]interface{}{ + "test3": int64(-3), + "test4": uint64(0), + }, time.Unix(0, 0)), + testutil.MustMetric("Name4", map[string]string{}, + map[string]interface{}{ + "test3": int64(-5), + "test4": float32(-0.5), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(50), + "test2": float64(100), + }, time.Unix(0, 0)), + testutil.MustMetric("Name2", map[string]string{}, + map[string]interface{}{ + "test1": float64(75), + "test2": float32(25), + }, time.Unix(0, 0)), + testutil.MustMetric("Name3", map[string]string{}, + map[string]interface{}{ + "test3": float64(4.2), + "test4": float64(9), + }, time.Unix(0, 0)), + testutil.MustMetric("Name4", map[string]string{}, + map[string]interface{}{ + "test3": float64(1), + "test4": float64(8.2), + }, time.Unix(0, 0)), + }, + }, + { + name: "Ignored Fileds", + scale: &Scale{ + Scalings: []Scaling{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, + }, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(0), + "test2": uint64(1), + "test3": int64(1), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(50), + "test2": float64(100), + "test3": int64(1), + }, time.Unix(0, 0)), + }, + }, + { + name: "Out of range tests", + scale: &Scale{ + Scalings: []Scaling{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, + }, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(-2), + "test2": uint64(2), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(-50), + "test2": float64(150), + }, time.Unix(0, 0)), + }, + }, + { + name: "Missing field Fileds", + scale: &Scale{ + Scalings: []Scaling{ + { + InMin: -1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test1", "test2"}, + }, + }, + }, + inputs: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": int64(0), + }, time.Unix(0, 0)), + }, + expected: []telegraf.Metric{ + testutil.MustMetric("Name1", map[string]string{}, + map[string]interface{}{ + "test1": float64(50), + }, time.Unix(0, 0)), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.scale.Log = testutil.Logger{} + + require.NoError(t, tt.scale.Init()) + actual := tt.scale.Apply(tt.inputs...) + + testutil.RequireMetricsEqual(t, tt.expected, actual) + }) + } +} + +func TestErrorCases(t *testing.T) { + tests := []struct { + name string + scale *Scale + expectedErrorMsg string + }{ + { + name: "Same input range values", + scale: &Scale{ + Scalings: []Scaling{ + { + InMin: 1, + InMax: 1, + OutMin: 0, + OutMax: 100, + Fields: []string{"test"}, + }, + }, + }, + expectedErrorMsg: "input minimum and maximum are equal for fields test", + }, + { + name: "Same input range values", + scale: &Scale{ + Scalings: []Scaling{ + { + InMin: 0, + InMax: 1, + OutMin: 100, + OutMax: 100, + Fields: []string{"test"}, + }, + }, + }, + expectedErrorMsg: "output minimum and maximum are equal for fields test", + }, + { + name: "No scalings", + scale: &Scale{Log: testutil.Logger{}}, + expectedErrorMsg: "no valid scalings defined", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.scale.Log = testutil.Logger{} + require.ErrorContains(t, tt.scale.Init(), tt.expectedErrorMsg) + }) + } +}