chore(parsers): Add benchmarks for avro, collectd, prometheus remote write (#14303)
This commit is contained in:
parent
5505a2150a
commit
ba251d8849
|
|
@ -80,3 +80,40 @@ func TestCases(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const benchmarkSchema = `
|
||||||
|
{
|
||||||
|
"namespace": "com.benchmark",
|
||||||
|
"name": "benchmark",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "value", "type": "float", "doc": ""},
|
||||||
|
{"name": "timestamp", "type": "long", "doc": ""},
|
||||||
|
{"name": "tags_platform", "type": "string", "doc": ""},
|
||||||
|
{"name": "tags_sdkver", "type": "string", "default": "", "doc": ""},
|
||||||
|
{"name": "source", "type": "string", "default": "", "doc": ""}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func BenchmarkParsing(b *testing.B) {
|
||||||
|
plugin := &Parser{
|
||||||
|
Format: "json",
|
||||||
|
Measurement: "benchmark",
|
||||||
|
Tags: []string{"tags_platform", "tags_sdkver", "source"},
|
||||||
|
Fields: []string{"value"},
|
||||||
|
Timestamp: "timestamp",
|
||||||
|
TimestampFormat: "unix",
|
||||||
|
Schema: benchmarkSchema,
|
||||||
|
}
|
||||||
|
require.NoError(b, plugin.Init())
|
||||||
|
|
||||||
|
benchmarkData, err := os.ReadFile(filepath.Join("testdata", "benchmark", "message.json"))
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
_, _ = plugin.Parse(benchmarkData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
benchmark,source=myhost,tags_platform=python,tags_sdkver=3.11.5 value=5.0 1653643421
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
{
|
||||||
|
"timestamp": 1653643421,
|
||||||
|
"value": 5,
|
||||||
|
"source": "myhost",
|
||||||
|
"tags_platform": "python",
|
||||||
|
"tags_sdkver": "3.11.5"
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
[[ inputs.file ]]
|
||||||
|
files = ["./testdata/benchmark/message.json"]
|
||||||
|
data_format = "avro"
|
||||||
|
|
||||||
|
avro_format = "json"
|
||||||
|
avro_measurement = "benchmark"
|
||||||
|
avro_tags = ["tags_platform", "tags_sdkver", "source"]
|
||||||
|
avro_fields = ["value"]
|
||||||
|
avro_timestamp = "timestamp"
|
||||||
|
avro_timestamp_format = "unix"
|
||||||
|
avro_schema = '''
|
||||||
|
{
|
||||||
|
"namespace": "com.benchmark",
|
||||||
|
"name": "benchmark",
|
||||||
|
"type": "record",
|
||||||
|
"version": "1",
|
||||||
|
"fields": [
|
||||||
|
{"name": "value", "type": "float", "doc": ""},
|
||||||
|
{"name": "timestamp", "type": "long", "doc": ""},
|
||||||
|
{"name": "tags_platform", "type": "string", "doc": ""},
|
||||||
|
{"name": "tags_sdkver", "type": "string", "default": "", "doc": ""},
|
||||||
|
{"name": "source", "type": "string", "default": "", "doc": ""}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
'''
|
||||||
|
|
@ -3,12 +3,15 @@ package collectd
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"collectd.org/api"
|
"collectd.org/api"
|
||||||
"collectd.org/network"
|
"collectd.org/network"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AuthMap struct {
|
type AuthMap struct {
|
||||||
|
|
@ -290,10 +293,10 @@ func TestParseLine(t *testing.T) {
|
||||||
ParseMultiValue: "split",
|
ParseMultiValue: "split",
|
||||||
}
|
}
|
||||||
require.NoError(t, parser.Init())
|
require.NoError(t, parser.Init())
|
||||||
metric, err := parser.ParseLine(string(bytes))
|
m, err := parser.ParseLine(string(bytes))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assertEqualMetrics(t, singleMetric.expected, []telegraf.Metric{metric})
|
assertEqualMetrics(t, singleMetric.expected, []telegraf.Metric{m})
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeValueList(valueLists []api.ValueList) (*network.Buffer, error) {
|
func writeValueList(valueLists []api.ValueList) (*network.Buffer, error) {
|
||||||
|
|
@ -318,3 +321,90 @@ func assertEqualMetrics(t *testing.T, expected []metricData, received []telegraf
|
||||||
require.Equal(t, expected[i].fields, m.Fields())
|
require.Equal(t, expected[i].fields, m.Fields())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var benchmarkData = []api.ValueList{
|
||||||
|
{
|
||||||
|
Identifier: api.Identifier{
|
||||||
|
Host: "xyzzy",
|
||||||
|
Plugin: "cpu",
|
||||||
|
PluginInstance: "1",
|
||||||
|
Type: "cpu",
|
||||||
|
TypeInstance: "user",
|
||||||
|
},
|
||||||
|
Values: []api.Value{
|
||||||
|
api.Counter(4),
|
||||||
|
},
|
||||||
|
DSNames: []string(nil),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Identifier: api.Identifier{
|
||||||
|
Host: "xyzzy",
|
||||||
|
Plugin: "cpu",
|
||||||
|
PluginInstance: "2",
|
||||||
|
Type: "cpu",
|
||||||
|
TypeInstance: "user",
|
||||||
|
},
|
||||||
|
Values: []api.Value{
|
||||||
|
api.Counter(5),
|
||||||
|
},
|
||||||
|
DSNames: []string(nil),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBenchmarkData(t *testing.T) {
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"cpu_value",
|
||||||
|
map[string]string{
|
||||||
|
"host": "xyzzy",
|
||||||
|
"instance": "1",
|
||||||
|
"type": "cpu",
|
||||||
|
"type_instance": "user",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 4.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cpu_value",
|
||||||
|
map[string]string{
|
||||||
|
"host": "xyzzy",
|
||||||
|
"instance": "2",
|
||||||
|
"type": "cpu",
|
||||||
|
"type_instance": "user",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"value": 5.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
buf, err := writeValueList(benchmarkData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
bytes, err := buf.Bytes()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
parser := &Parser{}
|
||||||
|
require.NoError(t, parser.Init())
|
||||||
|
actual, err := parser.Parse(bytes)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, expected, actual, testutil.IgnoreTime(), testutil.SortMetrics())
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkParsing(b *testing.B) {
|
||||||
|
buf, err := writeValueList(benchmarkData)
|
||||||
|
require.NoError(b, err)
|
||||||
|
bytes, err := buf.Bytes()
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
parser := &Parser{}
|
||||||
|
require.NoError(b, parser.Init())
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
_, _ = parser.Parse(bytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -156,3 +157,79 @@ func TestMetricsWithTimestamp(t *testing.T) {
|
||||||
require.Len(t, metrics, 1)
|
require.Len(t, metrics, 1)
|
||||||
testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics())
|
testutil.RequireMetricsEqual(t, expected, metrics, testutil.SortMetrics())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var benchmarkData = prompb.WriteRequest{
|
||||||
|
Timeseries: []prompb.TimeSeries{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "benchmark_a"},
|
||||||
|
{Name: "source", Value: "myhost"},
|
||||||
|
{Name: "tags_platform", Value: "python"},
|
||||||
|
{Name: "tags_sdkver", Value: "3.11.5"},
|
||||||
|
},
|
||||||
|
Samples: []prompb.Sample{
|
||||||
|
{Value: 5.0, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixMilli()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "__name__", Value: "benchmark_b"},
|
||||||
|
{Name: "source", Value: "myhost"},
|
||||||
|
{Name: "tags_platform", Value: "python"},
|
||||||
|
{Name: "tags_sdkver", Value: "3.11.4"},
|
||||||
|
},
|
||||||
|
Samples: []prompb.Sample{
|
||||||
|
{Value: 4.0, Timestamp: time.Date(2020, 4, 1, 0, 0, 0, 0, time.UTC).UnixMilli()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBenchmarkData(t *testing.T) {
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"prometheus_remote_write",
|
||||||
|
map[string]string{
|
||||||
|
"source": "myhost",
|
||||||
|
"tags_platform": "python",
|
||||||
|
"tags_sdkver": "3.11.5",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"benchmark_a": 5.0,
|
||||||
|
},
|
||||||
|
time.Unix(1585699200, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"prometheus_remote_write",
|
||||||
|
map[string]string{
|
||||||
|
"source": "myhost",
|
||||||
|
"tags_platform": "python",
|
||||||
|
"tags_sdkver": "3.11.4",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"benchmark_b": 4.0,
|
||||||
|
},
|
||||||
|
time.Unix(1585699200, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
benchmarkData, err := benchmarkData.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
plugin := &Parser{}
|
||||||
|
actual, err := plugin.Parse(benchmarkData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkParsing(b *testing.B) {
|
||||||
|
benchmarkData, err := benchmarkData.Marshal()
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
plugin := &Parser{}
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
for n := 0; n < b.N; n++ {
|
||||||
|
_, _ = plugin.Parse(benchmarkData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue