test(outputs.influxdb): Add benchmarks (#16694)

This commit is contained in:
Sven Rebhan 2025-04-11 20:19:19 +02:00 committed by GitHub
parent 7dd1886248
commit 58b696bc59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 319 additions and 0 deletions

View File

@ -3,6 +3,7 @@ package influxdb_test
import ( import (
"context" "context"
"net/http" "net/http"
"net/http/httptest"
"testing" "testing"
"time" "time"
@ -234,3 +235,321 @@ func TestInfluxDBLocalAddress(t *testing.T) {
require.NoError(t, output.Connect()) require.NoError(t, output.Connect())
} }
func BenchmarkWrite1k(b *testing.B) {
batchsize := 1000
// Setup a test server
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}),
)
defer ts.Close()
// Setup plugin and connect
plugin := &influxdb.InfluxDB{
URLs: []string{"http://" + ts.Listener.Addr().String()},
Username: config.NewSecret([]byte("user")),
Password: config.NewSecret([]byte("secret")),
Database: "my_database",
Timeout: config.Duration(time.Second * 5),
Log: &testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
return influxdb.NewUDPClient(*config)
},
ContentEncoding: "gzip",
SkipDatabaseCreation: true,
}
require.NoError(b, plugin.Connect())
defer plugin.Close()
metrics := make([]telegraf.Metric, 0, batchsize)
for i := range batchsize {
metrics = append(metrics, metric.New(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": float64(i),
},
time.Unix(0, 0),
))
}
// Benchmark the writing
b.ResetTimer()
for b.Loop() {
require.NoError(b, plugin.Write(metrics))
}
b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s")
}
func BenchmarkWrite5k(b *testing.B) {
batchsize := 5000
// Setup a test server
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}),
)
defer ts.Close()
// Setup plugin and connect
plugin := &influxdb.InfluxDB{
URLs: []string{"http://" + ts.Listener.Addr().String()},
Username: config.NewSecret([]byte("user")),
Password: config.NewSecret([]byte("secret")),
Database: "my_database",
Timeout: config.Duration(time.Second * 5),
Log: &testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
return influxdb.NewUDPClient(*config)
},
ContentEncoding: "gzip",
SkipDatabaseCreation: true,
}
require.NoError(b, plugin.Connect())
defer plugin.Close()
metrics := make([]telegraf.Metric, 0, batchsize)
for i := range batchsize {
metrics = append(metrics, metric.New(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": float64(i),
},
time.Unix(0, 0),
))
}
// Benchmark the writing
b.ResetTimer()
for b.Loop() {
require.NoError(b, plugin.Write(metrics))
}
b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s")
}
func BenchmarkWrite10k(b *testing.B) {
batchsize := 10000
// Setup a test server
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}),
)
defer ts.Close()
// Setup plugin and connect
plugin := &influxdb.InfluxDB{
URLs: []string{"http://" + ts.Listener.Addr().String()},
Username: config.NewSecret([]byte("user")),
Password: config.NewSecret([]byte("secret")),
Database: "my_database",
Timeout: config.Duration(time.Second * 5),
Log: &testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
return influxdb.NewUDPClient(*config)
},
ContentEncoding: "gzip",
SkipDatabaseCreation: true,
}
require.NoError(b, plugin.Connect())
defer plugin.Close()
metrics := make([]telegraf.Metric, 0, batchsize)
for i := range batchsize {
metrics = append(metrics, metric.New(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": float64(i),
},
time.Unix(0, 0),
))
}
// Benchmark the writing
b.ResetTimer()
for b.Loop() {
require.NoError(b, plugin.Write(metrics))
}
b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s")
}
func BenchmarkWrite25k(b *testing.B) {
batchsize := 25000
// Setup a test server
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}),
)
defer ts.Close()
// Setup plugin and connect
plugin := &influxdb.InfluxDB{
URLs: []string{"http://" + ts.Listener.Addr().String()},
Username: config.NewSecret([]byte("user")),
Password: config.NewSecret([]byte("secret")),
Database: "my_database",
Timeout: config.Duration(time.Second * 5),
Log: &testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
return influxdb.NewUDPClient(*config)
},
ContentEncoding: "gzip",
SkipDatabaseCreation: true,
}
require.NoError(b, plugin.Connect())
defer plugin.Close()
metrics := make([]telegraf.Metric, 0, batchsize)
for i := range batchsize {
metrics = append(metrics, metric.New(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": float64(i),
},
time.Unix(0, 0),
))
}
// Benchmark the writing
b.ResetTimer()
for b.Loop() {
require.NoError(b, plugin.Write(metrics))
}
b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s")
}
func BenchmarkWrite50k(b *testing.B) {
batchsize := 50000
// Setup a test server
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}),
)
defer ts.Close()
// Setup plugin and connect
plugin := &influxdb.InfluxDB{
URLs: []string{"http://" + ts.Listener.Addr().String()},
Username: config.NewSecret([]byte("user")),
Password: config.NewSecret([]byte("secret")),
Database: "my_database",
Timeout: config.Duration(time.Second * 5),
Log: &testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
return influxdb.NewUDPClient(*config)
},
ContentEncoding: "gzip",
SkipDatabaseCreation: true,
}
require.NoError(b, plugin.Connect())
defer plugin.Close()
metrics := make([]telegraf.Metric, 0, batchsize)
for i := range batchsize {
metrics = append(metrics, metric.New(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": float64(i),
},
time.Unix(0, 0),
))
}
// Benchmark the writing
b.ResetTimer()
for b.Loop() {
require.NoError(b, plugin.Write(metrics))
}
b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s")
}
func BenchmarkWrite100k(b *testing.B) {
batchsize := 100000
// Setup a test server
ts := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}),
)
defer ts.Close()
// Setup plugin and connect
plugin := &influxdb.InfluxDB{
URLs: []string{"http://" + ts.Listener.Addr().String()},
Username: config.NewSecret([]byte("user")),
Password: config.NewSecret([]byte("secret")),
Database: "my_database",
Timeout: config.Duration(time.Second * 5),
Log: &testutil.Logger{},
CreateHTTPClientF: func(config *influxdb.HTTPConfig) (influxdb.Client, error) {
return influxdb.NewHTTPClient(*config)
},
CreateUDPClientF: func(config *influxdb.UDPConfig) (influxdb.Client, error) {
return influxdb.NewUDPClient(*config)
},
ContentEncoding: "gzip",
SkipDatabaseCreation: true,
}
require.NoError(b, plugin.Connect())
defer plugin.Close()
metrics := make([]telegraf.Metric, 0, batchsize)
for i := range batchsize {
metrics = append(metrics, metric.New(
"cpu",
map[string]string{
"database": "foo",
},
map[string]interface{}{
"value": float64(i),
},
time.Unix(0, 0),
))
}
// Benchmark the writing
b.ResetTimer()
for b.Loop() {
require.NoError(b, plugin.Write(metrics))
}
b.ReportMetric(float64(batchsize*b.N)/b.Elapsed().Seconds(), "metrics/s")
}