telegraf/plugins/outputs/bigquery/bigquery_test.go

187 lines
4.1 KiB
Go
Raw Normal View History

2021-03-13 01:16:45 +08:00
package bigquery
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"cloud.google.com/go/bigquery"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/api/option"
)
const (
	// successfulResponse is the JSON body the local test server sends back
	// for an insertAll request it has accepted.
	successfulResponse = `{"kind": "bigquery#tableDataInsertAllResponse"}`
)
// receivedBody is the JSON request body decoded by the insertAll handler in
// localBigQueryServer. TestWrite reads its "rows" entry to inspect what the
// plugin sent.
var receivedBody map[string]json.RawMessage
// Row is the shape TestWrite decodes the "json" member of an inserted row
// into, so that individual columns can be compared with the source metric.
type Row struct {
	// Tag1 is decoded from the "tag1" key.
	Tag1 string `json:"tag1"`
	// Timestamp is decoded from the "timestamp" key; TestWrite parses it as RFC 3339.
	Timestamp string `json:"timestamp"`
	// Value is decoded from the "value" key.
	Value float64 `json:"value"`
}
// TestInit checks the result of BigQuery.Init for a configuration without a
// dataset (expected to fail with a fixed message) and for one with a dataset
// (expected to succeed).
func TestInit(t *testing.T) {
	cases := []struct {
		name        string
		errorString string // expected error text; empty means Init must succeed
		plugin      *BigQuery
	}{
		{
			name:        "dataset is not set",
			errorString: `"dataset" is required`,
			plugin:      &BigQuery{},
		},
		{
			name: "valid config",
			plugin: &BigQuery{
				Dataset: "test-dataset",
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.plugin.Init()

			if tc.errorString == "" {
				require.NoError(t, err)
				return
			}
			require.EqualError(t, err, tc.errorString)
		})
	}
}
// TestMetricToTable verifies the table name metricToTable derives from a
// metric name for several ReplaceHyphenTo settings. It also checks that
// warnedOnHyphens holds a true entry for the metric name exactly when the
// returned table name differs from it.
func TestMetricToTable(t *testing.T) {
	cases := []struct {
		name            string
		replaceHyphenTo string
		metricName      string
		expectedTable   string
	}{
		{
			name:            "no rename",
			replaceHyphenTo: "_",
			metricName:      "test",
			expectedTable:   "test",
		},
		{
			name:            "default config",
			replaceHyphenTo: "_",
			metricName:      "table-with-hyphens",
			expectedTable:   "table_with_hyphens",
		},
		{
			name:            "custom hypens",
			replaceHyphenTo: "*",
			metricName:      "table-with-hyphens",
			expectedTable:   "table*with*hyphens",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			plugin := &BigQuery{
				Dataset:         "test-dataset",
				ReplaceHyphenTo: tc.replaceHyphenTo,
				Log:             testutil.Logger{},
			}
			require.NoError(t, plugin.Init())

			table := plugin.metricToTable(tc.metricName)
			require.Equal(t, tc.expectedTable, table)

			// An unchanged name must not be recorded in warnedOnHyphens.
			if tc.metricName == tc.expectedTable {
				require.NotContains(t, plugin.warnedOnHyphens, tc.metricName)
				return
			}

			// A rewritten name must be recorded under the original metric name.
			require.Contains(t, plugin.warnedOnHyphens, tc.metricName)
			require.True(t, plugin.warnedOnHyphens[tc.metricName])
		})
	}
}
2021-03-13 01:16:45 +08:00
func TestConnect(t *testing.T) {
srv := localBigQueryServer(t)
defer srv.Close()
b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
2021-03-13 01:16:45 +08:00
}
require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))
require.NoError(t, b.Connect())
2021-03-13 01:16:45 +08:00
}
func TestWrite(t *testing.T) {
srv := localBigQueryServer(t)
defer srv.Close()
b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
2021-03-13 01:16:45 +08:00
}
mockMetrics := testutil.MockMetrics()
require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))
require.NoError(t, b.Connect())
2021-03-13 01:16:45 +08:00
require.NoError(t, b.Write(mockMetrics))
2021-03-13 01:16:45 +08:00
var rows []map[string]json.RawMessage
require.NoError(t, json.Unmarshal(receivedBody["rows"], &rows))
2021-03-13 01:16:45 +08:00
var row Row
require.NoError(t, json.Unmarshal(rows[0]["json"], &row))
2021-03-13 01:16:45 +08:00
pt, _ := time.Parse(time.RFC3339, row.Timestamp)
require.Equal(t, mockMetrics[0].Tags()["tag1"], row.Tag1)
require.Equal(t, mockMetrics[0].Time(), pt)
require.Equal(t, mockMetrics[0].Fields()["value"], row.Value)
}
func (b *BigQuery) setUpTestClient(endpointURL string) error {
2021-03-13 01:16:45 +08:00
noAuth := option.WithoutAuthentication()
endpoint := option.WithEndpoint(endpointURL)
2021-03-13 01:16:45 +08:00
ctx := context.Background()
c, err := bigquery.NewClient(ctx, b.Project, noAuth, endpoint)
2021-03-13 01:16:45 +08:00
if err != nil {
return err
}
b.client = c
return nil
}
func localBigQueryServer(t *testing.T) *httptest.Server {
srv := httptest.NewServer(http.NotFoundHandler())
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll":
decoder := json.NewDecoder(r.Body)
require.NoError(t, decoder.Decode(&receivedBody))
2021-03-13 01:16:45 +08:00
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(successfulResponse))
require.NoError(t, err)
2021-03-13 01:16:45 +08:00
default:
w.WriteHeader(http.StatusNotFound)
}
})
return srv
}