248 lines
5.9 KiB
Go
248 lines
5.9 KiB
Go
package bigquery
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"cloud.google.com/go/bigquery"
|
|
"golang.org/x/oauth2/google"
|
|
"google.golang.org/api/option"
|
|
|
|
"github.com/influxdata/telegraf"
|
|
"github.com/influxdata/telegraf/config"
|
|
"github.com/influxdata/telegraf/plugins/outputs"
|
|
)
|
|
|
|
const timeStampFieldName = "timestamp"
|
|
|
|
var defaultTimeout = config.Duration(5 * time.Second)
|
|
|
|
const sampleConfig = `
|
|
## Credentials File
|
|
credentials_file = "/path/to/service/account/key.json"
|
|
|
|
## Google Cloud Platform Project
|
|
project = "my-gcp-project"
|
|
|
|
## The namespace for the metric descriptor
|
|
dataset = "telegraf"
|
|
|
|
## Timeout for BigQuery operations.
|
|
# timeout = "5s"
|
|
|
|
## Character to replace hyphens on Metric name
|
|
# replace_hyphen_to = "_"
|
|
`
|
|
|
|
type BigQuery struct {
|
|
CredentialsFile string `toml:"credentials_file"`
|
|
Project string `toml:"project"`
|
|
Dataset string `toml:"dataset"`
|
|
|
|
Timeout config.Duration `toml:"timeout"`
|
|
ReplaceHyphenTo string `toml:"replace_hyphen_to"`
|
|
|
|
Log telegraf.Logger `toml:"-"`
|
|
|
|
client *bigquery.Client
|
|
|
|
warnedOnHyphens map[string]bool
|
|
}
|
|
|
|
// SampleConfig returns the formatted sample configuration for the plugin.
|
|
func (s *BigQuery) SampleConfig() string {
|
|
return sampleConfig
|
|
}
|
|
|
|
// Description returns the human-readable function definition of the plugin.
|
|
func (s *BigQuery) Description() string {
|
|
return "Configuration for Google Cloud BigQuery to send entries"
|
|
}
|
|
|
|
func (s *BigQuery) Connect() error {
|
|
if s.Project == "" {
|
|
return fmt.Errorf("Project is a required field for BigQuery output")
|
|
}
|
|
|
|
if s.Dataset == "" {
|
|
return fmt.Errorf("Dataset is a required field for BigQuery output")
|
|
}
|
|
|
|
if s.client == nil {
|
|
return s.setUpDefaultClient()
|
|
}
|
|
|
|
s.warnedOnHyphens = make(map[string]bool)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *BigQuery) setUpDefaultClient() error {
|
|
var credentialsOption option.ClientOption
|
|
|
|
ctx := context.Background()
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
|
|
defer cancel()
|
|
|
|
if s.CredentialsFile != "" {
|
|
credentialsOption = option.WithCredentialsFile(s.CredentialsFile)
|
|
} else {
|
|
creds, err := google.FindDefaultCredentials(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf(
|
|
"unable to find Google Cloud Platform Application Default Credentials: %v. "+
|
|
"Either set ADC or provide CredentialsFile config", err)
|
|
}
|
|
credentialsOption = option.WithCredentials(creds)
|
|
}
|
|
|
|
client, err := bigquery.NewClient(ctx, s.Project, credentialsOption)
|
|
s.client = client
|
|
return err
|
|
}
|
|
|
|
// Write the metrics to Google Cloud BigQuery.
|
|
func (s *BigQuery) Write(metrics []telegraf.Metric) error {
|
|
groupedMetrics := s.groupByMetricName(metrics)
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
for k, v := range groupedMetrics {
|
|
wg.Add(1)
|
|
go func(k string, v []bigquery.ValueSaver) {
|
|
defer wg.Done()
|
|
s.insertToTable(k, v)
|
|
}(k, v)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *BigQuery) groupByMetricName(metrics []telegraf.Metric) map[string][]bigquery.ValueSaver {
|
|
groupedMetrics := make(map[string][]bigquery.ValueSaver)
|
|
|
|
for _, m := range metrics {
|
|
bqm := newValuesSaver(m)
|
|
groupedMetrics[m.Name()] = append(groupedMetrics[m.Name()], bqm)
|
|
}
|
|
|
|
return groupedMetrics
|
|
}
|
|
|
|
func newValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver {
|
|
s := make(bigquery.Schema, 0)
|
|
r := make([]bigquery.Value, 0)
|
|
timeSchema := timeStampFieldSchema()
|
|
s = append(s, timeSchema)
|
|
r = append(r, m.Time())
|
|
|
|
s, r = tagsSchemaAndValues(m, s, r)
|
|
s, r = valuesSchemaAndValues(m, s, r)
|
|
|
|
return &bigquery.ValuesSaver{
|
|
Schema: s.Relax(),
|
|
Row: r,
|
|
}
|
|
}
|
|
|
|
func timeStampFieldSchema() *bigquery.FieldSchema {
|
|
return &bigquery.FieldSchema{
|
|
Name: timeStampFieldName,
|
|
Type: bigquery.TimestampFieldType,
|
|
}
|
|
}
|
|
|
|
func tagsSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
|
|
for _, t := range m.TagList() {
|
|
s = append(s, tagFieldSchema(t))
|
|
r = append(r, t.Value)
|
|
}
|
|
|
|
return s, r
|
|
}
|
|
|
|
func tagFieldSchema(t *telegraf.Tag) *bigquery.FieldSchema {
|
|
return &bigquery.FieldSchema{
|
|
Name: t.Key,
|
|
Type: bigquery.StringFieldType,
|
|
}
|
|
}
|
|
|
|
func valuesSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
|
|
for _, f := range m.FieldList() {
|
|
s = append(s, valuesSchema(f))
|
|
r = append(r, f.Value)
|
|
}
|
|
|
|
return s, r
|
|
}
|
|
|
|
func valuesSchema(f *telegraf.Field) *bigquery.FieldSchema {
|
|
return &bigquery.FieldSchema{
|
|
Name: f.Key,
|
|
Type: valueToBqType(f.Value),
|
|
}
|
|
}
|
|
|
|
func valueToBqType(v interface{}) bigquery.FieldType {
|
|
switch reflect.ValueOf(v).Kind() {
|
|
case reflect.Int, reflect.Int16, reflect.Int32, reflect.Int64:
|
|
return bigquery.IntegerFieldType
|
|
case reflect.Float32, reflect.Float64:
|
|
return bigquery.FloatFieldType
|
|
case reflect.Bool:
|
|
return bigquery.BooleanFieldType
|
|
default:
|
|
return bigquery.StringFieldType
|
|
}
|
|
}
|
|
|
|
func (s *BigQuery) insertToTable(metricName string, metrics []bigquery.ValueSaver) {
|
|
ctx := context.Background()
|
|
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
|
|
defer cancel()
|
|
|
|
tableName := s.metricToTable(metricName)
|
|
table := s.client.DatasetInProject(s.Project, s.Dataset).Table(tableName)
|
|
inserter := table.Inserter()
|
|
|
|
if err := inserter.Put(ctx, metrics); err != nil {
|
|
s.Log.Errorf("inserting metric %q failed: %v", metricName, err)
|
|
}
|
|
}
|
|
|
|
func (s *BigQuery) metricToTable(metricName string) string {
|
|
if !strings.Contains(metricName, "-") {
|
|
return metricName
|
|
}
|
|
|
|
dhm := strings.ReplaceAll(metricName, "-", s.ReplaceHyphenTo)
|
|
|
|
if warned := s.warnedOnHyphens[metricName]; !warned {
|
|
s.Log.Warnf("Metric %q contains hyphens please consider using the rename processor plugin, falling back to %q", metricName, dhm)
|
|
s.warnedOnHyphens[metricName] = true
|
|
}
|
|
|
|
return dhm
|
|
}
|
|
|
|
// Close will terminate the session to the backend, returning error if an issue arises.
|
|
func (s *BigQuery) Close() error {
|
|
return s.client.Close()
|
|
}
|
|
|
|
func init() {
|
|
outputs.Add("bigquery", func() telegraf.Output {
|
|
return &BigQuery{
|
|
Timeout: defaultTimeout,
|
|
ReplaceHyphenTo: "_",
|
|
}
|
|
})
|
|
}
|