From da5991d16c104c811184cfb7b29b10b2d08b580e Mon Sep 17 00:00:00 2001 From: David Bennett <71459415+Jagularr@users.noreply.github.com> Date: Mon, 19 Apr 2021 11:14:53 -0400 Subject: [PATCH] Add time.now() starlark processor example test. (#9133) --- plugins/processors/starlark/starlark.go | 15 ++- plugins/processors/starlark/starlark_test.go | 124 ++++++++++++------ .../starlark/testdata/compare_metrics.star | 1 + .../processors/starlark/testdata/iops.star | 3 +- .../starlark/testdata/json_nested.star | 3 +- .../starlark/testdata/multiple_metrics.star | 2 +- .../testdata/multiple_metrics_with_json.star | 7 +- .../processors/starlark/testdata/pivot.star | 4 +- .../rename_prometheus_remote_write.star | 4 +- .../starlark/testdata/schema_sizing.star | 2 +- .../starlark/testdata/time_set_timestamp.star | 15 +++ .../starlark/testdata/value_filter.star | 6 +- 12 files changed, 125 insertions(+), 61 deletions(-) create mode 100644 plugins/processors/starlark/testdata/time_set_timestamp.star diff --git a/plugins/processors/starlark/starlark.go b/plugins/processors/starlark/starlark.go index dceee7bfb..44f78fa6b 100644 --- a/plugins/processors/starlark/starlark.go +++ b/plugins/processors/starlark/starlark.go @@ -46,10 +46,11 @@ type Starlark struct { Log telegraf.Logger `toml:"-"` - thread *starlark.Thread - applyFunc *starlark.Function - args starlark.Tuple - results []telegraf.Metric + thread *starlark.Thread + applyFunc *starlark.Function + args starlark.Tuple + results []telegraf.Metric + starlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error) } func (s *Starlark) Init() error { @@ -63,7 +64,7 @@ func (s *Starlark) Init() error { s.thread = &starlark.Thread{ Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) }, Load: func(thread *starlark.Thread, module string) (starlark.StringDict, error) { - return loadFunc(module, s.Log) + return s.starlarkLoadFunc(module, s.Log) }, } @@ -240,7 +241,9 @@ func init() { func init() { processors.AddStreaming("starlark", func() telegraf.StreamingProcessor { - return &Starlark{} + return &Starlark{ + starlarkLoadFunc: loadFunc, + } }) } diff --git a/plugins/processors/starlark/starlark_test.go b/plugins/processors/starlark/starlark_test.go index f506e26ec..15152a2f3 100644 --- a/plugins/processors/starlark/starlark_test.go +++ b/plugins/processors/starlark/starlark_test.go @@ -15,6 +15,9 @@ import ( "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" + starlarktime "go.starlark.net/lib/time" + "go.starlark.net/starlark" + "go.starlark.net/starlarkstruct" ) // Tests for runtime errors in the processors Init function. @@ -26,8 +29,9 @@ func TestInitError(t *testing.T) { { name: "source must define apply", plugin: &Starlark{ - Source: "", - Log: testutil.Logger{}, + Source: "", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, }, { @@ -36,7 +40,8 @@ func TestInitError(t *testing.T) { Source: ` apply = 42 `, - Log: testutil.Logger{}, + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, }, { @@ -46,7 +51,8 @@ apply = 42 def apply(): pass `, - Log: testutil.Logger{}, + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, }, { @@ -55,13 +61,15 @@ def apply(): Source: ` for `, - Log: testutil.Logger{}, + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, }, { name: "no source no script", plugin: &Starlark{ - Log: testutil.Logger{}, + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, }, { @@ -71,15 +79,17 @@ for def apply(): pass `, - Script: "testdata/ratio.star", - Log: testutil.Logger{}, + Script: "testdata/ratio.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, }, { name: "script file not found", plugin: &Starlark{ - Script: "testdata/file_not_found.star", - Log: testutil.Logger{}, + Script: "testdata/file_not_found.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, }, } @@ -219,8 +229,9 @@ def apply(metric): for _, tt := range applyTests { t.Run(tt.name, func(t *testing.T) { plugin := &Starlark{ - Source: tt.source, - Log: testutil.Logger{}, + Source: tt.source, + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, } err := plugin.Init() require.NoError(t, err) @@ -2476,9 +2487,10 @@ def apply(metric): for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { plugin := &Starlark{ - Source: tt.source, - Log: testutil.Logger{}, - Constants: tt.constants, + Source: tt.source, + Log: testutil.Logger{}, + Constants: tt.constants, + starlarkLoadFunc: testLoadFunc, } err := plugin.Init() require.NoError(t, err) @@ -2618,8 +2630,9 @@ func TestScript(t *testing.T) { { name: "rename", plugin: &Starlark{ - Script: "testdata/rename.star", - Log: testutil.Logger{}, + Script: "testdata/rename.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("cpu", @@ -2645,8 +2658,9 @@ func TestScript(t *testing.T) { { name: "drop fields by type", plugin: &Starlark{ - Script: "testdata/drop_string_fields.star", - Log: testutil.Logger{}, + Script: "testdata/drop_string_fields.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("device", @@ -2676,8 +2690,9 @@ func TestScript(t *testing.T) { { name: "drop fields with unexpected type", plugin: &Starlark{ - Script: "testdata/drop_fields_with_unexpected_type.star", - Log: testutil.Logger{}, + Script: "testdata/drop_fields_with_unexpected_type.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("device", @@ -2710,8 +2725,9 @@ func TestScript(t *testing.T) { { name: "scale", plugin: &Starlark{ - Script: "testdata/scale.star", - Log: testutil.Logger{}, + Script: "testdata/scale.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("cpu", @@ -2731,8 +2747,9 @@ func TestScript(t *testing.T) { { name: "ratio", plugin: &Starlark{ - Script: "testdata/ratio.star", - Log: testutil.Logger{}, + Script: "testdata/ratio.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("mem", @@ -2759,8 +2776,9 @@ func TestScript(t *testing.T) { { name: "logging", plugin: &Starlark{ - Script: "testdata/logging.star", - Log: testutil.Logger{}, + Script: "testdata/logging.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("log", @@ -2784,8 +2802,9 @@ func TestScript(t *testing.T) { { name: "multiple_metrics", plugin: &Starlark{ - Script: "testdata/multiple_metrics.star", - Log: testutil.Logger{}, + Script: "testdata/multiple_metrics.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("mm", @@ -2816,8 +2835,9 @@ func TestScript(t *testing.T) { { name: "multiple_metrics_with_json", plugin: &Starlark{ - Script: "testdata/multiple_metrics_with_json.star", - Log: testutil.Logger{}, + Script: "testdata/multiple_metrics_with_json.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("json", @@ -2825,7 +2845,7 @@ func TestScript(t *testing.T) { map[string]interface{}{ "value": "[{\"label\": \"hello\"}, {\"label\": \"world\"}]", }, - time.Unix(0, 0), + time.Unix(1618488000, 999), ), }, expected: []telegraf.Metric{ @@ -2834,22 +2854,23 @@ func TestScript(t *testing.T) { map[string]interface{}{ "value": "hello", }, - time.Unix(0, 0), + time.Unix(1618488000, 999), ), testutil.MustMetric("json", map[string]string{}, map[string]interface{}{ "value": "world", }, - time.Unix(0, 0), + time.Unix(1618488000, 999), ), }, }, { name: "fail", plugin: &Starlark{ - Script: "testdata/fail.star", - Log: testutil.Logger{}, + Script: "testdata/fail.star", + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, }, input: []telegraf.Metric{ testutil.MustMetric("fail", @@ -3137,8 +3158,9 @@ def apply(metric): for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { plugin := &Starlark{ - Source: tt.source, - Log: testutil.Logger{}, + Source: tt.source, + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, } err := plugin.Init() @@ -3182,8 +3204,9 @@ func TestAllScriptTestData(t *testing.T) { outputMetrics = parseMetricsFrom(t, lines, "Example Output:") } plugin := &Starlark{ - Script: fn, - Log: testutil.Logger{}, + Script: fn, + Log: testutil.Logger{}, + starlarkLoadFunc: testLoadFunc, } require.NoError(t, plugin.Init()) @@ -3204,7 +3227,7 @@ func TestAllScriptTestData(t *testing.T) { err = plugin.Stop() require.NoError(t, err) - testutil.RequireMetricsEqual(t, outputMetrics, acc.GetTelegrafMetrics(), testutil.SortMetrics(), testutil.IgnoreTime()) + testutil.RequireMetricsEqual(t, outputMetrics, acc.GetTelegrafMetrics(), testutil.SortMetrics()) }) return nil }) @@ -3256,3 +3279,22 @@ func parseErrorMessage(t *testing.T, lines []string, header string) string { require.True(t, startIdx < len(lines), fmt.Sprintf("Expected to find the error message after %q, but found none", header)) return strings.TrimLeft(lines[startIdx], "# ") } + +func testLoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) { + result, err := loadFunc(module, logger) + if err != nil { + return nil, err + } + + if module == "time.star" { + customModule := result["time"].(*starlarkstruct.Module) + customModule.Members["now"] = starlark.NewBuiltin("now", testNow) + result["time"] = customModule + } + + return result, nil +} + +func testNow(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) { + return starlarktime.Time(time.Date(2021, 4, 15, 12, 0, 0, 999, time.UTC)), nil +} diff --git a/plugins/processors/starlark/testdata/compare_metrics.star b/plugins/processors/starlark/testdata/compare_metrics.star index 79555729d..5e855df44 100644 --- a/plugins/processors/starlark/testdata/compare_metrics.star +++ b/plugins/processors/starlark/testdata/compare_metrics.star @@ -22,4 +22,5 @@ def apply(metric): result = Metric("cpu_diff") # Set the field "value" to the difference between the value of the last metric and the current one result.fields["value"] = last.fields["value"] - metric.fields["value"] + result.time = metric.time return result diff --git a/plugins/processors/starlark/testdata/iops.star b/plugins/processors/starlark/testdata/iops.star index e92b79e0a..fad572f27 100644 --- a/plugins/processors/starlark/testdata/iops.star +++ b/plugins/processors/starlark/testdata/iops.star @@ -41,7 +41,8 @@ def apply(metric): diskiops.fields["iops"] = ( io / interval_seconds ) diskiops.tags["name"] = disk_name diskiops.tags["host"] = metric.tags["host"] - return [diskiops] + diskiops.time = metric.time + return diskiops # This could be aggregated to obtain max IOPS using: # diff --git a/plugins/processors/starlark/testdata/json_nested.star b/plugins/processors/starlark/testdata/json_nested.star index 3ffa20d0c..cc391d6a5 100644 --- a/plugins/processors/starlark/testdata/json_nested.star +++ b/plugins/processors/starlark/testdata/json_nested.star @@ -27,7 +27,7 @@ # json value="[{\"fields\": {\"LogEndOffset\": 339238, \"LogStartOffset\": 339238, \"NumLogSegments\": 1, \"Size\": 0, \"UnderReplicatedPartitions\": 0}, \"name\": \"partition\", \"tags\": {\"host\": \"CUD1-001559\", \"jolokia_agent_url\": \"http://localhost:7777/jolokia\", \"partition\": \"1\", \"topic\": \"qa-kafka-connect-logs\"}, \"timestamp\": 1591124461}]" # Example Output: -# partition,host=CUD1-001559,jolokia_agent_url=http://localhost:7777/jolokia,partition=1,topic=qa-kafka-connect-logs LogEndOffset=339238i,LogStartOffset=339238i,NumLogSegments=1i,Size=0i,UnderReplicatedPartitions=0i 1610056029037925000 +# partition,host=CUD1-001559,jolokia_agent_url=http://localhost:7777/jolokia,partition=1,topic=qa-kafka-connect-logs LogEndOffset=339238i,LogStartOffset=339238i,NumLogSegments=1i,Size=0i,UnderReplicatedPartitions=0i 1591124461000000000 load("json.star", "json") @@ -41,5 +41,6 @@ def apply(metric): new_metric.tags[str(tag[0])] = tag[1] for field in obj["fields"].items(): # 5 Fields to iterate through new_metric.fields[str(field[0])] = field[1] + new_metric.time = int(obj["timestamp"] * 1e9) metrics.append(new_metric) return metrics diff --git a/plugins/processors/starlark/testdata/multiple_metrics.star b/plugins/processors/starlark/testdata/multiple_metrics.star index 3d2e3d85f..6abf567f6 100644 --- a/plugins/processors/starlark/testdata/multiple_metrics.star +++ b/plugins/processors/starlark/testdata/multiple_metrics.star @@ -15,7 +15,7 @@ def apply(metric): # Set the field "value" to b metric2.fields["value"] = "b" # Reset the time (only needed for testing purpose) - metric2.time = 0 + metric2.time = metric.time # Add metric2 to the list of metrics metrics.append(metric2) # Rename the original metric to "mm1" diff --git a/plugins/processors/starlark/testdata/multiple_metrics_with_json.star b/plugins/processors/starlark/testdata/multiple_metrics_with_json.star index 78f318e62..fa4dfcc48 100644 --- a/plugins/processors/starlark/testdata/multiple_metrics_with_json.star +++ b/plugins/processors/starlark/testdata/multiple_metrics_with_json.star @@ -4,11 +4,12 @@ # json value="[{\"label\": \"hello\"}, {\"label\": \"world\"}]" # # Example Output: -# json value="hello" 1465839830100400201 -# json value="world" 1465839830100400201 +# json value="hello" 1618488000000000999 +# json value="world" 1618488000000000999 # loads json.encode(), json.decode(), json.indent() load("json.star", "json") +load("time.star", "time") def apply(metric): # Initialize a list of metrics @@ -20,7 +21,7 @@ def apply(metric): # Set the field "value" to the label extracted from the current json object current_metric.fields["value"] = obj["label"] # Reset the time (only needed for testing purpose) - current_metric.time = 0 + current_metric.time = time.now().unix_nano # Add metric to the list of metrics metrics.append(current_metric) return metrics diff --git a/plugins/processors/starlark/testdata/pivot.star b/plugins/processors/starlark/testdata/pivot.star index f32ebf45d..c57d13d5f 100644 --- a/plugins/processors/starlark/testdata/pivot.star +++ b/plugins/processors/starlark/testdata/pivot.star @@ -4,10 +4,10 @@ In this example it pivots the value of key `sensor` to be the key of the value in key `value` Example Input: -temperature sensor="001A0",value=111.48 +temperature sensor="001A0",value=111.48 1618488000000000999 Example Output: -temperature 001A0=111.48 +temperature 001A0=111.48 1618488000000000999 ''' def apply(metric): diff --git a/plugins/processors/starlark/testdata/rename_prometheus_remote_write.star b/plugins/processors/starlark/testdata/rename_prometheus_remote_write.star index cee49196c..87c4e764b 100644 --- a/plugins/processors/starlark/testdata/rename_prometheus_remote_write.star +++ b/plugins/processors/starlark/testdata/rename_prometheus_remote_write.star @@ -2,10 +2,10 @@ # Assumes there is only one field as is the case for prometheus remote write. # # Example Input: -# prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1614889298859000000 +# prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1618488000000000999 # # Example Output: -# go_gc_duration_seconds,instance=localhost:9090,job=prometheus,quantile=0.99 value=4.63 1614889299000000000 +# go_gc_duration_seconds,instance=localhost:9090,job=prometheus,quantile=0.99 value=4.63 1618488000000000999 def apply(metric): if metric.name == "prometheus_remote_write": diff --git a/plugins/processors/starlark/testdata/schema_sizing.star b/plugins/processors/starlark/testdata/schema_sizing.star index d382749cb..c716a153c 100644 --- a/plugins/processors/starlark/testdata/schema_sizing.star +++ b/plugins/processors/starlark/testdata/schema_sizing.star @@ -51,7 +51,7 @@ def apply(metric): produce_pairs(new_metric, str_keys, "str", key=True) produce_pairs(new_metric, str_vals, "str") - + new_metric.time = metric.time return new_metric def produce_pairs(metric, li, field_type, key=False): diff --git a/plugins/processors/starlark/testdata/time_set_timestamp.star b/plugins/processors/starlark/testdata/time_set_timestamp.star new file mode 100644 index 000000000..bc64457dc --- /dev/null +++ b/plugins/processors/starlark/testdata/time_set_timestamp.star @@ -0,0 +1,15 @@ +# Example of setting the metric timestamp to the current time. +# +# Example Input: +# time result="OK" 1515581000000000000 +# +# Example Output: +# time result="OK" 1618488000000000999 + +load('time.star', 'time') + +def apply(metric): + # You can set the timestamp by using the current time. + metric.time = time.now().unix_nano + + return metric \ No newline at end of file diff --git a/plugins/processors/starlark/testdata/value_filter.star b/plugins/processors/starlark/testdata/value_filter.star index eeb2432f6..a4ceb28a6 100644 --- a/plugins/processors/starlark/testdata/value_filter.star +++ b/plugins/processors/starlark/testdata/value_filter.star @@ -4,11 +4,11 @@ In this example we look at the `value` field of the metric. If the value is zeor, we delete all the fields, effectively dropping the metric. Example Input: -temperature sensor="001A0",value=111.48 -temperature sensor="001B0",value=0.0 +temperature sensor="001A0",value=111.48 1618488000000000999 +temperature sensor="001B0",value=0.0 1618488000000000999 Example Output: -temperature sensor="001A0",value=111.48 +temperature sensor="001A0",value=111.48 1618488000000000999 ''' def apply(metric):