test: migrate elasticsearch to testcontainers (#11207)

This commit is contained in:
Joshua Powers 2022-06-02 08:20:06 -06:00 committed by GitHub
parent 8106b281af
commit 1c61aa7fb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 137 additions and 33 deletions

View File

@ -3,25 +3,27 @@ package elasticsearch_query
import ( import (
"bufio" "bufio"
"context" "context"
"fmt"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
"github.com/docker/go-connections/nat"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
elastic5 "gopkg.in/olivere/elastic.v5" elastic5 "gopkg.in/olivere/elastic.v5"
) )
var ( const (
testindex = "test-elasticsearch_query-" + strconv.Itoa(int(time.Now().Unix())) servicePort = "9200"
setupOnce sync.Once testindex = "test-elasticsearch"
) )
type esAggregationQueryTest struct { type esAggregationQueryTest struct {
@ -503,7 +505,7 @@ var testEsAggregationData = []esAggregationQueryTest{
}, },
} }
func setupIntegrationTest() error { func setupIntegrationTest(t *testing.T) (testutil.Container, error) {
type nginxlog struct { type nginxlog struct {
IPaddress string `json:"IP"` IPaddress string `json:"IP"`
Timestamp time.Time `json:"@timestamp"` Timestamp time.Time `json:"@timestamp"`
@ -515,15 +517,32 @@ func setupIntegrationTest() error {
ResponseTime float64 `json:"response_time"` ResponseTime float64 `json:"response_time"`
} }
container := testutil.Container{
Image: "elasticsearch:6.8.23",
ExposedPorts: []string{servicePort},
Env: map[string]string{
"discovery.type": "single-node",
},
WaitingFor: wait.ForAll(
wait.ForLog("] mode [basic] - valid"),
wait.ForListeningPort(nat.Port(servicePort)),
),
}
err := container.Start()
require.NoError(t, err, "failed to start container")
url := fmt.Sprintf(
"http://%s:%s", container.Address, container.Ports[servicePort],
)
e := &ElasticsearchQuery{ e := &ElasticsearchQuery{
URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"}, URLs: []string{url},
Timeout: config.Duration(time.Second * 30), Timeout: config.Duration(time.Second * 30),
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
err := e.connectToES() err = e.connectToES()
if err != nil { if err != nil {
return err return container, err
} }
bulkRequest := e.esClient.Bulk() bulkRequest := e.esClient.Bulk()
@ -531,7 +550,7 @@ func setupIntegrationTest() error {
// populate elasticsearch with nginx_logs test data file // populate elasticsearch with nginx_logs test data file
file, err := os.Open("testdata/nginx_logs") file, err := os.Open("testdata/nginx_logs")
if err != nil { if err != nil {
return err return container, err
} }
defer file.Close() defer file.Close()
@ -560,22 +579,22 @@ func setupIntegrationTest() error {
Doc(logline)) Doc(logline))
} }
if scanner.Err() != nil { if scanner.Err() != nil {
return err return container, err
} }
_, err = bulkRequest.Do(context.Background()) _, err = bulkRequest.Do(context.Background())
if err != nil { if err != nil {
return err return container, err
} }
// force elastic to refresh indexes to get new batch data // force elastic to refresh indexes to get new batch data
ctx := context.Background() ctx := context.Background()
_, err = e.esClient.Refresh().Do(ctx) _, err = e.esClient.Refresh().Do(ctx)
if err != nil { if err != nil {
return err return container, err
} }
return nil return container, nil
} }
func TestElasticsearchQuery(t *testing.T) { func TestElasticsearchQuery(t *testing.T) {
@ -583,19 +602,22 @@ func TestElasticsearchQuery(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
setupOnce.Do(func() { container, err := setupIntegrationTest(t)
err := setupIntegrationTest() require.NoError(t, err)
require.NoError(t, err) defer func() {
}) require.NoError(t, container.Terminate(), "terminating container failed")
}()
var acc testutil.Accumulator var acc testutil.Accumulator
e := &ElasticsearchQuery{ e := &ElasticsearchQuery{
URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"}, URLs: []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
},
Timeout: config.Duration(time.Second * 30), Timeout: config.Duration(time.Second * 30),
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
err := e.connectToES() err = e.connectToES()
require.NoError(t, err) require.NoError(t, err)
var aggs []esAggregation var aggs []esAggregation
@ -641,10 +663,11 @@ func TestElasticsearchQuery_getMetricFields(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
setupOnce.Do(func() { container, err := setupIntegrationTest(t)
err := setupIntegrationTest() require.NoError(t, err)
require.NoError(t, err) defer func() {
}) require.NoError(t, container.Terminate(), "terminating container failed")
}()
type args struct { type args struct {
ctx context.Context ctx context.Context
@ -652,12 +675,14 @@ func TestElasticsearchQuery_getMetricFields(t *testing.T) {
} }
e := &ElasticsearchQuery{ e := &ElasticsearchQuery{
URLs: []string{"http://" + testutil.GetLocalHost() + ":9200"}, URLs: []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
},
Timeout: config.Duration(time.Second * 30), Timeout: config.Duration(time.Second * 30),
Log: testutil.Logger{}, Log: testutil.Logger{},
} }
err := e.connectToES() err = e.connectToES()
require.NoError(t, err) require.NoError(t, err)
type test struct { type test struct {

View File

@ -2,6 +2,7 @@ package elasticsearch
import ( import (
"context" "context"
"fmt"
"math" "math"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -9,18 +10,47 @@ import (
"testing" "testing"
"time" "time"
"github.com/docker/go-connections/nat"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go/wait"
) )
const servicePort = "9200"
func launchTestContainer(t *testing.T) *testutil.Container {
container := testutil.Container{
Image: "elasticsearch:6.8.23",
ExposedPorts: []string{servicePort},
Env: map[string]string{
"discovery.type": "single-node",
},
WaitingFor: wait.ForAll(
wait.ForLog("] mode [basic] - valid"),
wait.ForListeningPort(nat.Port(servicePort)),
),
}
err := container.Start()
require.NoError(t, err, "failed to start container")
return &container
}
func TestConnectAndWriteIntegration(t *testing.T) { func TestConnectAndWriteIntegration(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Elasticsearch{ e := &Elasticsearch{
URLs: urls, URLs: urls,
@ -49,7 +79,14 @@ func TestConnectAndWriteMetricWithNaNValueEmpty(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Elasticsearch{ e := &Elasticsearch{
URLs: urls, URLs: urls,
@ -85,7 +122,14 @@ func TestConnectAndWriteMetricWithNaNValueNone(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Elasticsearch{ e := &Elasticsearch{
URLs: urls, URLs: urls,
@ -122,7 +166,14 @@ func TestConnectAndWriteMetricWithNaNValueDrop(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Elasticsearch{ e := &Elasticsearch{
URLs: urls, URLs: urls,
@ -181,7 +232,14 @@ func TestConnectAndWriteMetricWithNaNValueReplacement(t *testing.T) {
}, },
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
for _, test := range tests { for _, test := range tests {
e := &Elasticsearch{ e := &Elasticsearch{
@ -224,7 +282,14 @@ func TestTemplateManagementEmptyTemplateIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
ctx := context.Background() ctx := context.Background()
@ -248,7 +313,14 @@ func TestTemplateManagementIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Elasticsearch{ e := &Elasticsearch{
URLs: urls, URLs: urls,
@ -276,7 +348,14 @@ func TestTemplateInvalidIndexPatternIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode") t.Skip("Skipping integration test in short mode")
} }
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"} container := launchTestContainer(t)
defer func() {
require.NoError(t, container.Terminate(), "terminating container failed")
}()
urls := []string{
fmt.Sprintf("http://%s:%s", container.Address, container.Ports[servicePort]),
}
e := &Elasticsearch{ e := &Elasticsearch{
URLs: urls, URLs: urls,