fix(outputs.bigquery): Correct use of auto-detected project ID (#14416)
This commit is contained in:
parent
9cf8afc123
commit
fe6e5d86fc
|
|
@ -18,6 +18,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
|
"github.com/influxdata/telegraf/internal"
|
||||||
"github.com/influxdata/telegraf/plugins/outputs"
|
"github.com/influxdata/telegraf/plugins/outputs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -75,7 +76,7 @@ func (s *BigQuery) Connect() error {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Check if the compact table exists
|
// Check if the compact table exists
|
||||||
_, err := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Metadata(ctx)
|
_, err := s.client.Dataset(s.Dataset).Table(s.CompactTable).Metadata(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("compact table: %w", err)
|
return fmt.Errorf("compact table: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -102,7 +103,10 @@ func (s *BigQuery) setUpDefaultClient() error {
|
||||||
credentialsOption = option.WithCredentials(creds)
|
credentialsOption = option.WithCredentials(creds)
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := bigquery.NewClient(ctx, s.Project, credentialsOption)
|
client, err := bigquery.NewClient(ctx, s.Project,
|
||||||
|
credentialsOption,
|
||||||
|
option.WithUserAgent(internal.ProductToken()),
|
||||||
|
)
|
||||||
s.client = client
|
s.client = client
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -136,7 +140,7 @@ func (s *BigQuery) writeCompact(metrics []telegraf.Metric) error {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Always returns an instance, even if table doesn't exist (anymore).
|
// Always returns an instance, even if table doesn't exist (anymore).
|
||||||
inserter := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Inserter()
|
inserter := s.client.Dataset(s.Dataset).Table(s.CompactTable).Inserter()
|
||||||
|
|
||||||
var compactValues []*bigquery.ValuesSaver
|
var compactValues []*bigquery.ValuesSaver
|
||||||
for _, m := range metrics {
|
for _, m := range metrics {
|
||||||
|
|
@ -269,7 +273,7 @@ func (s *BigQuery) insertToTable(metricName string, metrics []bigquery.ValueSave
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
tableName := s.metricToTable(metricName)
|
tableName := s.metricToTable(metricName)
|
||||||
table := s.client.DatasetInProject(s.Project, s.Dataset).Table(tableName)
|
table := s.client.Dataset(s.Dataset).Table(tableName)
|
||||||
inserter := table.Inserter()
|
inserter := table.Inserter()
|
||||||
|
|
||||||
if err := inserter.Put(ctx, metrics); err != nil {
|
if err := inserter.Put(ctx, metrics); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -211,6 +211,24 @@ func TestWriteCompact(t *testing.T) {
|
||||||
require.NoError(t, b.Close())
|
require.NoError(t, b.Close())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAutoDetect(t *testing.T) {
|
||||||
|
srv := localBigQueryServer(t)
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
b := &BigQuery{
|
||||||
|
Dataset: "test-dataset",
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
CompactTable: "test-metrics",
|
||||||
|
}
|
||||||
|
|
||||||
|
credentialsJSON := []byte(`{"type": "service_account", "project_id": "test-project"}`)
|
||||||
|
|
||||||
|
require.NoError(t, b.Init())
|
||||||
|
require.NoError(t, b.setUpTestClientWithJSON(srv.URL, credentialsJSON))
|
||||||
|
require.NoError(t, b.Connect())
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
}
|
||||||
|
|
||||||
func (b *BigQuery) setUpTestClient(endpointURL string) error {
|
func (b *BigQuery) setUpTestClient(endpointURL string) error {
|
||||||
noAuth := option.WithoutAuthentication()
|
noAuth := option.WithoutAuthentication()
|
||||||
endpoint := option.WithEndpoint(endpointURL)
|
endpoint := option.WithEndpoint(endpointURL)
|
||||||
|
|
@ -228,6 +246,19 @@ func (b *BigQuery) setUpTestClient(endpointURL string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *BigQuery) setUpTestClientWithJSON(endpointURL string, credentialsJSON []byte) error {
|
||||||
|
noAuth := option.WithoutAuthentication()
|
||||||
|
endpoint := option.WithEndpoint(endpointURL)
|
||||||
|
credentials := option.WithCredentialsJSON(credentialsJSON)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
c, err := bigquery.NewClient(ctx, b.Project, credentials, noAuth, endpoint)
|
||||||
|
|
||||||
|
b.client = c
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func localBigQueryServer(t *testing.T) *httptest.Server {
|
func localBigQueryServer(t *testing.T) *httptest.Server {
|
||||||
srv := httptest.NewServer(http.NotFoundHandler())
|
srv := httptest.NewServer(http.NotFoundHandler())
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue