diff --git a/plugins/outputs/influxdb_v2/influxdb_v2_test.go b/plugins/outputs/influxdb_v2/influxdb_v2_test.go index 5a9434a1d..e5ae708f7 100644 --- a/plugins/outputs/influxdb_v2/influxdb_v2_test.go +++ b/plugins/outputs/influxdb_v2/influxdb_v2_test.go @@ -906,3 +906,267 @@ func TestUseDynamicSecret(t *testing.T) { require.NoError(t, secretToken.Set([]byte(token))) require.NoError(t, plugin.Write(metrics)) } + +func BenchmarkWrite1k(b *testing.B) { + batchsize := 1000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + } + require.NoError(b, plugin.Init()) + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite5k(b *testing.B) { + batchsize := 5000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + } + require.NoError(b, plugin.Init()) + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite10k(b *testing.B) { + batchsize := 10000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + } + require.NoError(b, plugin.Init()) + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite25k(b *testing.B) { + batchsize := 25000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + } + require.NoError(b, plugin.Init()) + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite50k(b *testing.B) { + batchsize := 50000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + } + require.NoError(b, plugin.Init()) + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +} + +func BenchmarkWrite100k(b *testing.B) { + batchsize := 100000 + + // Setup a test server + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNoContent) + }), + ) + defer ts.Close() + + // Setup plugin and connect + plugin := &influxdb.InfluxDB{ + URLs: []string{"http://" + ts.Listener.Addr().String()}, + Token: config.NewSecret([]byte("sometoken")), + Bucket: "my_bucket", + Log: &testutil.Logger{}, + } + require.NoError(b, plugin.Init()) + require.NoError(b, plugin.Connect()) + defer plugin.Close() + + metrics := make([]telegraf.Metric, 0, batchsize) + for i := range batchsize { + metrics = append(metrics, metric.New( + "cpu", + map[string]string{ + "bucket": "foo", + }, + map[string]interface{}{ + "value": float64(i), + }, + time.Unix(0, 0), + )) + } + + // Benchmark the writing + b.ResetTimer() + for b.Loop() { + require.NoError(b, plugin.Write(metrics)) + } + b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s") +}