Add time.now() starlark processor example test. (#9133)

David Bennett 2021-04-19 11:14:53 -04:00 committed by GitHub
parent 9d163f6a83
commit da5991d16c
12 changed files with 125 additions and 61 deletions

View File

@@ -46,10 +46,11 @@ type Starlark struct {
 	Log telegraf.Logger `toml:"-"`

 	thread           *starlark.Thread
 	applyFunc        *starlark.Function
 	args             starlark.Tuple
 	results          []telegraf.Metric
+	starlarkLoadFunc func(module string, logger telegraf.Logger) (starlark.StringDict, error)
 }

 func (s *Starlark) Init() error {
@@ -63,7 +64,7 @@ func (s *Starlark) Init() error {
 	s.thread = &starlark.Thread{
 		Print: func(_ *starlark.Thread, msg string) { s.Log.Debug(msg) },
 		Load: func(thread *starlark.Thread, module string) (starlark.StringDict, error) {
-			return loadFunc(module, s.Log)
+			return s.starlarkLoadFunc(module, s.Log)
 		},
 	}
@@ -240,7 +241,9 @@ func init() {
 func init() {
 	processors.AddStreaming("starlark", func() telegraf.StreamingProcessor {
-		return &Starlark{}
+		return &Starlark{
+			starlarkLoadFunc: loadFunc,
+		}
 	})
 }
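The change above is plain dependency injection: the thread's Load hook no longer calls the package-level loadFunc directly but goes through the new starlarkLoadFunc field, and init() wires loadFunc in as the production default, which is what lets the tests below swap in a deterministic loader. A minimal, self-contained sketch of the same pattern follows; the Processor type, defaultLoad, and the one-line script are hypothetical illustrations, not Telegraf code.

package main

import (
	"fmt"

	"go.starlark.net/starlark"
)

// Processor is a hypothetical stand-in for the plugin struct: the load hook is
// a struct field, so a test can replace it with a stub implementation.
type Processor struct {
	Source   string
	loadFunc func(module string) (starlark.StringDict, error)
}

// defaultLoad plays the role of the production loader (like loadFunc in the diff).
func defaultLoad(module string) (starlark.StringDict, error) {
	return nil, fmt.Errorf("module %q is not available", module)
}

// Run executes the script; every load("...") statement is routed through p.loadFunc.
func (p *Processor) Run() error {
	thread := &starlark.Thread{
		Name: "processor",
		Load: func(_ *starlark.Thread, module string) (starlark.StringDict, error) {
			return p.loadFunc(module)
		},
	}
	_, err := starlark.ExecFile(thread, "source.star", p.Source, nil)
	return err
}

func main() {
	// Production wiring uses the default loader; a test would set loadFunc to a stub.
	p := &Processor{Source: "x = 1 + 1", loadFunc: defaultLoad}
	if err := p.Run(); err != nil {
		fmt.Println("error:", err)
	}
}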

View File

@@ -15,6 +15,9 @@ import (
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
 	"github.com/stretchr/testify/require"
+	starlarktime "go.starlark.net/lib/time"
+	"go.starlark.net/starlark"
+	"go.starlark.net/starlarkstruct"
 )

 // Tests for runtime errors in the processors Init function.
@@ -26,8 +29,9 @@ func TestInitError(t *testing.T) {
 		{
 			name: "source must define apply",
 			plugin: &Starlark{
 				Source: "",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 		},
 		{
@@ -36,7 +40,8 @@ func TestInitError(t *testing.T) {
 			Source: `
 apply = 42
 `,
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 		},
 		{
@@ -46,7 +51,8 @@ apply = 42
 def apply():
 	pass
 `,
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 		},
 		{
@@ -55,13 +61,15 @@ def apply():
 			Source: `
 for
 `,
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 		},
 		{
 			name: "no source no script",
 			plugin: &Starlark{
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 		},
 		{
@@ -71,15 +79,17 @@ for
 def apply():
 	pass
 `,
 				Script: "testdata/ratio.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 		},
 		{
 			name: "script file not found",
 			plugin: &Starlark{
 				Script: "testdata/file_not_found.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 		},
 	}
@@ -219,8 +229,9 @@ def apply(metric):
 	for _, tt := range applyTests {
 		t.Run(tt.name, func(t *testing.T) {
 			plugin := &Starlark{
 				Source: tt.source,
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			}
 			err := plugin.Init()
 			require.NoError(t, err)
@@ -2476,9 +2487,10 @@ def apply(metric):
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			plugin := &Starlark{
 				Source: tt.source,
 				Log: testutil.Logger{},
 				Constants: tt.constants,
+				starlarkLoadFunc: testLoadFunc,
 			}
 			err := plugin.Init()
 			require.NoError(t, err)
@@ -2618,8 +2630,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "rename",
 			plugin: &Starlark{
 				Script: "testdata/rename.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("cpu",
@@ -2645,8 +2658,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "drop fields by type",
 			plugin: &Starlark{
 				Script: "testdata/drop_string_fields.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("device",
@@ -2676,8 +2690,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "drop fields with unexpected type",
 			plugin: &Starlark{
 				Script: "testdata/drop_fields_with_unexpected_type.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("device",
@@ -2710,8 +2725,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "scale",
 			plugin: &Starlark{
 				Script: "testdata/scale.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("cpu",
@@ -2731,8 +2747,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "ratio",
 			plugin: &Starlark{
 				Script: "testdata/ratio.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("mem",
@@ -2759,8 +2776,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "logging",
 			plugin: &Starlark{
 				Script: "testdata/logging.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("log",
@@ -2784,8 +2802,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "multiple_metrics",
 			plugin: &Starlark{
 				Script: "testdata/multiple_metrics.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("mm",
@@ -2816,8 +2835,9 @@ func TestScript(t *testing.T) {
 		{
 			name: "multiple_metrics_with_json",
 			plugin: &Starlark{
 				Script: "testdata/multiple_metrics_with_json.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("json",
@@ -2825,7 +2845,7 @@ func TestScript(t *testing.T) {
 					map[string]interface{}{
 						"value": "[{\"label\": \"hello\"}, {\"label\": \"world\"}]",
 					},
-					time.Unix(0, 0),
+					time.Unix(1618488000, 999),
 				),
 			},
 			expected: []telegraf.Metric{
@@ -2834,22 +2854,23 @@ func TestScript(t *testing.T) {
 					map[string]interface{}{
 						"value": "hello",
 					},
-					time.Unix(0, 0),
+					time.Unix(1618488000, 999),
 				),
 				testutil.MustMetric("json",
 					map[string]string{},
 					map[string]interface{}{
 						"value": "world",
 					},
-					time.Unix(0, 0),
+					time.Unix(1618488000, 999),
 				),
 			},
 		},
 		{
 			name: "fail",
 			plugin: &Starlark{
 				Script: "testdata/fail.star",
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			},
 			input: []telegraf.Metric{
 				testutil.MustMetric("fail",
@@ -3137,8 +3158,9 @@ def apply(metric):
 	for _, tt := range tests {
 		b.Run(tt.name, func(b *testing.B) {
 			plugin := &Starlark{
 				Source: tt.source,
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			}

 			err := plugin.Init()
@@ -3182,8 +3204,9 @@ func TestAllScriptTestData(t *testing.T) {
 				outputMetrics = parseMetricsFrom(t, lines, "Example Output:")
 			}

 			plugin := &Starlark{
 				Script: fn,
 				Log: testutil.Logger{},
+				starlarkLoadFunc: testLoadFunc,
 			}
 			require.NoError(t, plugin.Init())
@@ -3204,7 +3227,7 @@ func TestAllScriptTestData(t *testing.T) {
 			err = plugin.Stop()
 			require.NoError(t, err)

-			testutil.RequireMetricsEqual(t, outputMetrics, acc.GetTelegrafMetrics(), testutil.SortMetrics(), testutil.IgnoreTime())
+			testutil.RequireMetricsEqual(t, outputMetrics, acc.GetTelegrafMetrics(), testutil.SortMetrics())
 		})
 		return nil
 	})
@@ -3256,3 +3279,22 @@ func parseErrorMessage(t *testing.T, lines []string, header string) string {
 	require.True(t, startIdx < len(lines), fmt.Sprintf("Expected to find the error message after %q, but found none", header))
 	return strings.TrimLeft(lines[startIdx], "# ")
 }
+
+func testLoadFunc(module string, logger telegraf.Logger) (starlark.StringDict, error) {
+	result, err := loadFunc(module, logger)
+	if err != nil {
+		return nil, err
+	}
+
+	if module == "time.star" {
+		customModule := result["time"].(*starlarkstruct.Module)
+		customModule.Members["now"] = starlark.NewBuiltin("now", testNow)
+		result["time"] = customModule
+	}
+
+	return result, nil
+}
+
+func testNow(thread *starlark.Thread, _ *starlark.Builtin, args starlark.Tuple, kwargs []starlark.Tuple) (starlark.Value, error) {
+	return starlarktime.Time(time.Date(2021, 4, 15, 12, 0, 0, 999, time.UTC)), nil
+}
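As a quick cross-check (not part of the commit), the fixed instant returned by testNow is exactly the nanosecond timestamp the updated testdata files below expect:

package main

import (
	"fmt"
	"time"
)

func main() {
	// The same instant testNow returns above.
	stub := time.Date(2021, 4, 15, 12, 0, 0, 999, time.UTC)
	fmt.Println(stub.UnixNano()) // prints 1618488000000000999
}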

View File

@@ -22,4 +22,5 @@ def apply(metric):
         result = Metric("cpu_diff")
         # Set the field "value" to the difference between the value of the last metric and the current one
         result.fields["value"] = last.fields["value"] - metric.fields["value"]
+        result.time = metric.time
         return result

View File

@@ -41,7 +41,8 @@ def apply(metric):
         diskiops.fields["iops"] = ( io / interval_seconds )
         diskiops.tags["name"] = disk_name
         diskiops.tags["host"] = metric.tags["host"]
-        return [diskiops]
+        diskiops.time = metric.time
+        return diskiops

 # This could be aggregated to obtain max IOPS using:
 #

View File

@@ -27,7 +27,7 @@
 # json value="[{\"fields\": {\"LogEndOffset\": 339238, \"LogStartOffset\": 339238, \"NumLogSegments\": 1, \"Size\": 0, \"UnderReplicatedPartitions\": 0}, \"name\": \"partition\", \"tags\": {\"host\": \"CUD1-001559\", \"jolokia_agent_url\": \"http://localhost:7777/jolokia\", \"partition\": \"1\", \"topic\": \"qa-kafka-connect-logs\"}, \"timestamp\": 1591124461}]"
 # Example Output:
-# partition,host=CUD1-001559,jolokia_agent_url=http://localhost:7777/jolokia,partition=1,topic=qa-kafka-connect-logs LogEndOffset=339238i,LogStartOffset=339238i,NumLogSegments=1i,Size=0i,UnderReplicatedPartitions=0i 1610056029037925000
+# partition,host=CUD1-001559,jolokia_agent_url=http://localhost:7777/jolokia,partition=1,topic=qa-kafka-connect-logs LogEndOffset=339238i,LogStartOffset=339238i,NumLogSegments=1i,Size=0i,UnderReplicatedPartitions=0i 1591124461000000000
 load("json.star", "json")
@@ -41,5 +41,6 @@ def apply(metric):
             new_metric.tags[str(tag[0])] = tag[1]
         for field in obj["fields"].items():  # 5 Fields to iterate through
             new_metric.fields[str(field[0])] = field[1]
+        new_metric.time = int(obj["timestamp"] * 1e9)
         metrics.append(new_metric)
     return metrics
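The corrected Example Output timestamp follows directly from the added line: the sample payload carries "timestamp": 1591124461 (seconds), and 1591124461 × 1e9 = 1591124461000000000 nanoseconds, which replaces the stale 1610056029037925000 value in the comment.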

View File

@@ -15,7 +15,7 @@ def apply(metric):
     # Set the field "value" to b
     metric2.fields["value"] = "b"
     # Reset the time (only needed for testing purpose)
-    metric2.time = 0
+    metric2.time = metric.time
     # Add metric2 to the list of metrics
     metrics.append(metric2)
     # Rename the original metric to "mm1"

View File

@@ -4,11 +4,12 @@
 # json value="[{\"label\": \"hello\"}, {\"label\": \"world\"}]"
 #
 # Example Output:
-# json value="hello" 1465839830100400201
-# json value="world" 1465839830100400201
+# json value="hello" 1618488000000000999
+# json value="world" 1618488000000000999

 # loads json.encode(), json.decode(), json.indent()
 load("json.star", "json")
+load("time.star", "time")

 def apply(metric):
     # Initialize a list of metrics
@@ -20,7 +21,7 @@ def apply(metric):
         # Set the field "value" to the label extracted from the current json object
         current_metric.fields["value"] = obj["label"]
         # Reset the time (only needed for testing purpose)
-        current_metric.time = 0
+        current_metric.time = time.now().unix_nano
         # Add metric to the list of metrics
         metrics.append(current_metric)
     return metrics

View File

@ -4,10 +4,10 @@ In this example it pivots the value of key `sensor`
to be the key of the value in key `value` to be the key of the value in key `value`
Example Input: Example Input:
temperature sensor="001A0",value=111.48 temperature sensor="001A0",value=111.48 1618488000000000999
Example Output: Example Output:
temperature 001A0=111.48 temperature 001A0=111.48 1618488000000000999
''' '''
def apply(metric): def apply(metric):

View File

@@ -2,10 +2,10 @@
 # Assumes there is only one field as is the case for prometheus remote write.
 #
 # Example Input:
-# prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1614889298859000000
+# prometheus_remote_write,instance=localhost:9090,job=prometheus,quantile=0.99 go_gc_duration_seconds=4.63 1618488000000000999
 #
 # Example Output:
-# go_gc_duration_seconds,instance=localhost:9090,job=prometheus,quantile=0.99 value=4.63 1614889299000000000
+# go_gc_duration_seconds,instance=localhost:9090,job=prometheus,quantile=0.99 value=4.63 1618488000000000999

 def apply(metric):
     if metric.name == "prometheus_remote_write":

View File

@@ -51,7 +51,7 @@ def apply(metric):
     produce_pairs(new_metric, str_keys, "str", key=True)
     produce_pairs(new_metric, str_vals, "str")

+    new_metric.time = metric.time
     return new_metric

 def produce_pairs(metric, li, field_type, key=False):

View File

@@ -0,0 +1,15 @@
+# Example of setting the metric timestamp to the current time.
+#
+# Example Input:
+# time result="OK" 1515581000000000000
+#
+# Example Output:
+# time result="OK" 1618488000000000999
+
+load('time.star', 'time')
+
+def apply(metric):
+    # You can set the timestamp by using the current time.
+    metric.time = time.now().unix_nano
+    return metric
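Note that 1618488000000000999 appears in the Example Output only because the test suite replaces time.now() with the fixed 2021-04-15T12:00:00.000000999Z clock shown earlier; in a real pipeline the script stamps each metric with the current wall-clock time.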

View File

@@ -4,11 +4,11 @@ In this example we look at the `value` field of the metric.
 If the value is zero, we delete all the fields, effectively dropping the metric.

 Example Input:
-temperature sensor="001A0",value=111.48
-temperature sensor="001B0",value=0.0
+temperature sensor="001A0",value=111.48 1618488000000000999
+temperature sensor="001B0",value=0.0 1618488000000000999

 Example Output:
-temperature sensor="001A0",value=111.48
+temperature sensor="001A0",value=111.48 1618488000000000999
 '''

 def apply(metric):