From 58b696bc59d4e5176bd42724d34e6a0cf6c8b9c1 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Fri, 11 Apr 2025 20:19:19 +0200 Subject: [PATCH] test(outputs.influxdb): Add benchmarks (#16694) --- plugins/outputs/influxdb/influxdb_test.go | 319 ++++++++++++++++++++++ 1 file changed, 319 insertions(+) diff --git a/plugins/outputs/influxdb/influxdb_test.go b/plugins/outputs/influxdb/influxdb_test.go index 53e08c3b5..8921984c0 100644 --- a/plugins/outputs/influxdb/influxdb_test.go +++ b/plugins/outputs/influxdb/influxdb_test.go @@ -3,6 +3,7 @@ package influxdb_test import ( "context" "net/http" + "net/http/httptest" "testing" "time" @@ -234,3 +235,321 @@ func TestInfluxDBLocalAddress(t *testing.T) { require.NoError(t, output.Connect()) } + +func BenchmarkWrite1k(b *testing.B) { + batchsize := 1000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("secret")), + Database: "my_database", + Timeout: config.Duration(time.Second * 5), + Log: &testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) { + return influxdb.NewUDPClient(*config) + }, + ContentEncoding: "gzip", + SkipDatabaseCreation: true, + } + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite5k(b *testing.B) { + batchsize := 5000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("secret")), + Database: "my_database", + Timeout: config.Duration(time.Second * 5), + Log: &testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) { + return influxdb.NewUDPClient(*config) + }, + ContentEncoding: "gzip", + SkipDatabaseCreation: true, + } + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite10k(b *testing.B) { + batchsize := 10000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("secret")), + Database: "my_database", + Timeout: config.Duration(time.Second * 5), + Log: &testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) { + return influxdb.NewUDPClient(*config) + }, + ContentEncoding: "gzip", + SkipDatabaseCreation: true, + } + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite25k(b *testing.B) { + batchsize := 25000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("secret")), + Database: "my_database", + Timeout: config.Duration(time.Second * 5), + Log: &testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) { + return influxdb.NewUDPClient(*config) + }, + ContentEncoding: "gzip", + SkipDatabaseCreation: true, + } + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite50k(b *testing.B) { + batchsize := 50000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("secret")), + Database: "my_database", + Timeout: config.Duration(time.Second * 5), + Log: &testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) { + return influxdb.NewUDPClient(*config) + }, + ContentEncoding: "gzip", + SkipDatabaseCreation: true, + } + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite100k(b *testing.B) { + batchsize := 100000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Username: config.NewSecret([]byte("user")), + Password: config.NewSecret([]byte("secret")), + Database: "my_database", + Timeout: config.Duration(time.Second * 5), + Log: &testutil.Logger{}, + CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) { + return influxdb.NewHTTPClient(*config) + }, + CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) { + return influxdb.NewUDPClient(*config) + }, + ContentEncoding: "gzip", + SkipDatabaseCreation: true, + } + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "database": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +}