feat(outputs.bigquery): Allow to add metrics in one compact table (#14342)

This commit is contained in:
Thomas Casteleyn 2023-12-04 15:40:06 +01:00 committed by GitHub
parent 78e41f6649
commit 6ff28c7593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 197 additions and 18 deletions

View File

@ -36,6 +36,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Character to replace hyphens on Metric name
# replace_hyphen_to = "_"
## Write all metrics in a single compact table
# compact_table = ""
```
Leaving `project` empty indicates the plugin will try to retrieve the project
@ -54,6 +57,36 @@ table on BigQuery:
* Should contain the metric's fields with the same name and the column type
should match the field type.
## Compact table
When enabling the compact table, all metrics are inserted to the given table
with the following schema:
```json
[
{
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP"
},
{
"mode": "REQUIRED",
"name": "name",
"type": "STRING"
},
{
"mode": "REQUIRED",
"name": "tags",
"type": "JSON"
},
{
"mode": "REQUIRED",
"name": "fields",
"type": "JSON"
}
]
```
## Restrictions
Avoid hyphens on BigQuery tables, underlying SDK cannot handle streaming inserts

View File

@ -4,6 +4,7 @@ package bigquery
import (
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"reflect"
@ -34,6 +35,7 @@ type BigQuery struct {
Timeout config.Duration `toml:"timeout"`
ReplaceHyphenTo string `toml:"replace_hyphen_to"`
CompactTable string `toml:"compact_table"`
Log telegraf.Logger `toml:"-"`
@ -62,9 +64,22 @@ func (s *BigQuery) Init() error {
func (s *BigQuery) Connect() error {
if s.client == nil {
return s.setUpDefaultClient()
if err := s.setUpDefaultClient(); err != nil {
return err
}
}
if s.CompactTable != "" {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
defer cancel()
// Check if the compact table exists
_, err := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Metadata(ctx)
if err != nil {
return fmt.Errorf("compact table: %w", err)
}
}
return nil
}
@ -81,7 +96,7 @@ func (s *BigQuery) setUpDefaultClient() error {
creds, err := google.FindDefaultCredentials(ctx)
if err != nil {
return fmt.Errorf(
"unable to find Google Cloud Platform Application Default Credentials: %v. "+
"unable to find Google Cloud Platform Application Default Credentials: %w. "+
"Either set ADC or provide CredentialsFile config", err)
}
credentialsOption = option.WithCredentials(creds)
@ -94,6 +109,10 @@ func (s *BigQuery) setUpDefaultClient() error {
// Write the metrics to Google Cloud BigQuery.
func (s *BigQuery) Write(metrics []telegraf.Metric) error {
if s.CompactTable != "" {
return s.writeCompact(metrics)
}
groupedMetrics := s.groupByMetricName(metrics)
var wg sync.WaitGroup
@ -111,6 +130,26 @@ func (s *BigQuery) Write(metrics []telegraf.Metric) error {
return nil
}
func (s *BigQuery) writeCompact(metrics []telegraf.Metric) error {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
defer cancel()
// Always returns an instance, even if table doesn't exist (anymore).
inserter := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Inserter()
var compactValues []*bigquery.ValuesSaver
for _, m := range metrics {
valueSaver, err := s.newCompactValuesSaver(m)
if err != nil {
s.Log.Warnf("could not prepare metric as compact value: %v", err)
} else {
compactValues = append(compactValues, valueSaver)
}
}
return inserter.Put(ctx, compactValues)
}
func (s *BigQuery) groupByMetricName(metrics []telegraf.Metric) map[string][]bigquery.ValueSaver {
groupedMetrics := make(map[string][]bigquery.ValueSaver)
@ -138,6 +177,33 @@ func newValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver {
}
}
func (s *BigQuery) newCompactValuesSaver(m telegraf.Metric) (*bigquery.ValuesSaver, error) {
tags, err := json.Marshal(m.Tags())
if err != nil {
return nil, fmt.Errorf("serializing tags: %w", err)
}
fields, err := json.Marshal(m.Fields())
if err != nil {
return nil, fmt.Errorf("serializing fields: %w", err)
}
return &bigquery.ValuesSaver{
Schema: bigquery.Schema{
timeStampFieldSchema(),
newStringFieldSchema("name"),
newJSONFieldSchema("tags"),
newJSONFieldSchema("fields"),
},
Row: []bigquery.Value{
m.Time(),
m.Name(),
string(tags),
string(fields),
},
}, nil
}
func timeStampFieldSchema() *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: timeStampFieldName,
@ -145,22 +211,29 @@ func timeStampFieldSchema() *bigquery.FieldSchema {
}
}
func newStringFieldSchema(name string) *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: name,
Type: bigquery.StringFieldType,
}
}
func newJSONFieldSchema(name string) *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: name,
Type: bigquery.JSONFieldType,
}
}
func tagsSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
for _, t := range m.TagList() {
s = append(s, tagFieldSchema(t))
s = append(s, newStringFieldSchema(t.Key))
r = append(r, t.Value)
}
return s, r
}
func tagFieldSchema(t *telegraf.Tag) *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: t.Key,
Type: bigquery.StringFieldType,
}
}
func valuesSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
for _, f := range m.FieldList() {
s = append(s, valuesSchema(f))

View File

@ -107,15 +107,42 @@ func TestConnect(t *testing.T) {
srv := localBigQueryServer(t)
defer srv.Close()
b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
tests := []struct {
name string
compactTable string
errorString string
}{
{name: "normal"},
{
name: "compact table existing",
compactTable: "test-metrics",
},
{
name: "compact table not existing",
compactTable: "foobar",
errorString: "compact table: googleapi: got HTTP response code 404",
},
}
require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))
require.NoError(t, b.Connect())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
CompactTable: tt.compactTable,
}
require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))
if tt.errorString != "" {
require.ErrorContains(t, b.Connect(), tt.errorString)
} else {
require.NoError(t, b.Connect())
}
})
}
}
func TestWrite(t *testing.T) {
@ -148,6 +175,42 @@ func TestWrite(t *testing.T) {
require.Equal(t, mockMetrics[0].Fields()["value"], row.Value)
}
func TestWriteCompact(t *testing.T) {
srv := localBigQueryServer(t)
defer srv.Close()
b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
CompactTable: "test-metrics",
}
mockMetrics := testutil.MockMetrics()
require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))
require.NoError(t, b.Connect())
require.NoError(t, b.Write(mockMetrics))
var rows []map[string]json.RawMessage
require.NoError(t, json.Unmarshal(receivedBody["rows"], &rows))
require.Len(t, rows, 1)
require.Contains(t, rows[0], "json")
var row interface{}
require.NoError(t, json.Unmarshal(rows[0]["json"], &row))
require.Equal(t, map[string]interface{}{
"timestamp": "2009-11-10T23:00:00Z",
"name": "test1",
"tags": `{"tag1":"value1"}`,
"fields": `{"value":1}`,
}, row)
require.NoError(t, b.Close())
}
func (b *BigQuery) setUpTestClient(endpointURL string) error {
noAuth := option.WithoutAuthentication()
endpoint := option.WithEndpoint(endpointURL)
@ -170,15 +233,22 @@ func localBigQueryServer(t *testing.T) *httptest.Server {
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll":
case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll",
"/projects/test-project/datasets/test-dataset/tables/test-metrics/insertAll":
decoder := json.NewDecoder(r.Body)
require.NoError(t, decoder.Decode(&receivedBody))
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(successfulResponse))
require.NoError(t, err)
case "/projects/test-project/datasets/test-dataset/tables/test-metrics":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
default:
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte(r.URL.String()))
require.NoError(t, err)
}
})

View File

@ -14,3 +14,6 @@
## Character to replace hyphens on Metric name
# replace_hyphen_to = "_"
## Write all metrics in a single compact table
# compact_table = ""