2021-03-13 01:16:45 +08:00
|
|
|
package bigquery
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"net/http"
|
|
|
|
|
"net/http/httptest"
|
|
|
|
|
"strings"
|
|
|
|
|
"testing"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"cloud.google.com/go/bigquery"
|
2021-04-10 01:15:04 +08:00
|
|
|
"github.com/influxdata/telegraf/config"
|
2021-03-13 01:16:45 +08:00
|
|
|
"github.com/influxdata/telegraf/testutil"
|
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
"google.golang.org/api/option"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
successfulResponse = "{\"kind\": \"bigquery#tableDataInsertAllResponse\"}"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var testingHost string
|
2021-04-10 01:15:04 +08:00
|
|
|
var testDuration = config.Duration(5 * time.Second)
|
2021-03-13 01:16:45 +08:00
|
|
|
var receivedBody map[string]json.RawMessage
|
|
|
|
|
|
|
|
|
|
type Row struct {
|
|
|
|
|
Tag1 string `json:"tag1"`
|
|
|
|
|
Timestamp string `json:"timestamp"`
|
|
|
|
|
Value float64 `json:"value"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestConnect(t *testing.T) {
|
|
|
|
|
srv := localBigQueryServer(t)
|
|
|
|
|
testingHost = strings.ReplaceAll(srv.URL, "http://", "")
|
|
|
|
|
defer srv.Close()
|
|
|
|
|
|
|
|
|
|
b := &BigQuery{
|
|
|
|
|
Project: "test-project",
|
|
|
|
|
Dataset: "test-dataset",
|
|
|
|
|
Timeout: testDuration,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cerr := b.setUpTestClient()
|
|
|
|
|
require.NoError(t, cerr)
|
|
|
|
|
berr := b.Connect()
|
|
|
|
|
require.NoError(t, berr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestWrite(t *testing.T) {
|
|
|
|
|
srv := localBigQueryServer(t)
|
|
|
|
|
testingHost = strings.ReplaceAll(srv.URL, "http://", "")
|
|
|
|
|
defer srv.Close()
|
|
|
|
|
|
|
|
|
|
b := &BigQuery{
|
|
|
|
|
Project: "test-project",
|
|
|
|
|
Dataset: "test-dataset",
|
|
|
|
|
Timeout: testDuration,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mockMetrics := testutil.MockMetrics()
|
|
|
|
|
|
|
|
|
|
if err := b.setUpTestClient(); err != nil {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
|
|
|
|
if err := b.Connect(); err != nil {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := b.Write(mockMetrics); err != nil {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var rows []map[string]json.RawMessage
|
|
|
|
|
if err := json.Unmarshal(receivedBody["rows"], &rows); err != nil {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var row Row
|
|
|
|
|
if err := json.Unmarshal(rows[0]["json"], &row); err != nil {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pt, _ := time.Parse(time.RFC3339, row.Timestamp)
|
|
|
|
|
require.Equal(t, mockMetrics[0].Tags()["tag1"], row.Tag1)
|
|
|
|
|
require.Equal(t, mockMetrics[0].Time(), pt)
|
|
|
|
|
require.Equal(t, mockMetrics[0].Fields()["value"], row.Value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestMetricToTableDefault(t *testing.T) {
|
|
|
|
|
b := &BigQuery{
|
|
|
|
|
Project: "test-project",
|
|
|
|
|
Dataset: "test-dataset",
|
|
|
|
|
Timeout: testDuration,
|
|
|
|
|
warnedOnHyphens: make(map[string]bool),
|
|
|
|
|
ReplaceHyphenTo: "_",
|
|
|
|
|
Log: testutil.Logger{},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
otn := "table-with-hyphens"
|
|
|
|
|
ntn := b.metricToTable(otn)
|
|
|
|
|
|
|
|
|
|
require.Equal(t, "table_with_hyphens", ntn)
|
|
|
|
|
require.True(t, b.warnedOnHyphens[otn])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestMetricToTableCustom(t *testing.T) {
|
|
|
|
|
log := testutil.Logger{}
|
|
|
|
|
|
|
|
|
|
b := &BigQuery{
|
|
|
|
|
Project: "test-project",
|
|
|
|
|
Dataset: "test-dataset",
|
|
|
|
|
Timeout: testDuration,
|
|
|
|
|
warnedOnHyphens: make(map[string]bool),
|
|
|
|
|
ReplaceHyphenTo: "*",
|
|
|
|
|
Log: log,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
otn := "table-with-hyphens"
|
|
|
|
|
ntn := b.metricToTable(otn)
|
|
|
|
|
|
|
|
|
|
require.Equal(t, "table*with*hyphens", ntn)
|
|
|
|
|
require.True(t, b.warnedOnHyphens[otn])
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (b *BigQuery) setUpTestClient() error {
|
|
|
|
|
noAuth := option.WithoutAuthentication()
|
|
|
|
|
endpoints := option.WithEndpoint("http://" + testingHost)
|
|
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
|
|
|
|
|
|
c, err := bigquery.NewClient(ctx, b.Project, noAuth, endpoints)
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b.client = c
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func localBigQueryServer(t *testing.T) *httptest.Server {
|
|
|
|
|
srv := httptest.NewServer(http.NotFoundHandler())
|
|
|
|
|
|
|
|
|
|
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
switch r.URL.Path {
|
|
|
|
|
case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll":
|
|
|
|
|
decoder := json.NewDecoder(r.Body)
|
|
|
|
|
|
|
|
|
|
if err := decoder.Decode(&receivedBody); err != nil {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
|
if _, err := w.Write([]byte(successfulResponse)); err != nil {
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
w.WriteHeader(http.StatusNotFound)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return srv
|
|
|
|
|
}
|