diff --git a/README.md b/README.md index 909e7dfec..5535b9527 100644 --- a/README.md +++ b/README.md @@ -420,6 +420,7 @@ For documentation on the latest development code see the [documentation index][d * [aws kinesis](./plugins/outputs/kinesis) * [aws cloudwatch](./plugins/outputs/cloudwatch) * [azure_monitor](./plugins/outputs/azure_monitor) +* [bigquery](./plugins/outputs/bigquery) * [cloud_pubsub](./plugins/outputs/cloud_pubsub) Google Cloud Pub/Sub * [cratedb](./plugins/outputs/cratedb) * [datadog](./plugins/outputs/datadog) diff --git a/go.mod b/go.mod index e0737bb89..edb407d8e 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.16 require ( cloud.google.com/go v0.53.0 - cloud.google.com/go/datastore v1.1.0 // indirect - cloud.google.com/go/pubsub v1.2.0 + cloud.google.com/go/bigquery v1.3.0 + cloud.google.com/go/pubsub v1.1.0 code.cloudfoundry.org/clock v1.0.0 // indirect collectd.org v0.3.0 github.com/Azure/azure-event-hubs-go/v3 v3.2.0 diff --git a/go.sum b/go.sum index 44312d661..369efcde6 100644 --- a/go.sum +++ b/go.sum @@ -7,21 +7,18 @@ cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxK cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc= cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0= cloud.google.com/go v0.50.0/go.mod h1:r9sluTvynVuxRIOHXQEHMFffphuXHOMZMycpNR5e6To= -cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4= cloud.google.com/go v0.53.0 h1:MZQCQQaRwOrAcuKjiHWHrgKykt4fZyuwF2dtiG3fGW8= cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= +cloud.google.com/go/bigquery v1.3.0 h1:sAbMqjY1PEQKZBWfbu6Y6bsupJ9c4QdHnzg/VvYTLcE= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= -cloud.google.com/go/bigquery v1.4.0 h1:xE3CPsOgttP4ACBePh79zTKALtXwn/Edhcr16R5hMWU= -cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= +cloud.google.com/go/datastore v1.0.0 h1:Kt+gOPPp2LEPWp8CSfxhsM8ik9CcyE/gYu+0r+RnZvM= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= -cloud.google.com/go/datastore v1.1.0 h1:/May9ojXjRkPBNVrq+oWLqmWCkr4OU5uRY29bu0mRyQ= -cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= +cloud.google.com/go/pubsub v1.1.0 h1:9/vpR43S4aJaROxqQHQ3nH9lfyKKV0dC3vOmnw8ebQQ= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= -cloud.google.com/go/pubsub v1.2.0 h1:Lpy6hKgdcl7a3WGSfJIFmxmcdjSpP6OmBEfcOv1Y680= -cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= +cloud.google.com/go/storage v1.5.0 h1:RPUcBvDeYgQFMfQu1eBMq6piD1SXmLH+vK3qjewZPus= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o= code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= @@ -337,10 +334,10 @@ github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= -github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= @@ -734,7 +731,6 @@ golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm0 golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= -golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= @@ -789,7 +785,6 @@ golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -834,7 +829,6 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -842,7 +836,6 @@ golang.org/x/sys v0.0.0-20191003212358-c178f38b412c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -866,9 +859,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -900,10 +892,7 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200122220014-bf1340f18c4a/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/tools v0.0.0-20200204074204-1cc6d1ef6c74/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9 h1:sEvmEcJVKBNUvgCUClbUQeHOAa9U0I2Ce1BooMvVCY4= @@ -955,9 +944,6 @@ google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= -google.golang.org/genproto v0.0.0-20200115191322-ca5a22157cba/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= -google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= -google.golang.org/genproto v0.0.0-20200204135345-fa8e72b47b90/go.mod h1:GmwEX6Z4W5gMy59cAlVYjN9JhxgbQH6Gn+gFDQe2lzA= google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884 h1:fiNLklpBwWK1mth30Hlwk+fcdBmIALlgF5iy77O37Ig= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= diff --git a/plugins/outputs/bigquery/README.md b/plugins/outputs/bigquery/README.md new file mode 100644 index 000000000..96e659956 --- /dev/null +++ b/plugins/outputs/bigquery/README.md @@ -0,0 +1,56 @@ +# BigQuery Google Cloud Output Plugin + +This plugin writes to the [Google Cloud BigQuery][bigquery] and requires [authentication][] +with Google Cloud using either a service account or user credentials + +This plugin accesses APIs which are [chargeable][pricing]; you might incur +costs. + +Requires `project` to specify where BigQuery entries will be persisted. + +Requires `dataset` to specify under which BigQuery dataset the corresponding metrics tables reside. + +Each metric should have a corresponding table to BigQuery. +The schema of the table on BigQuery: +* Should contain the field `timestamp` which is the timestamp of a telegraph metrics +* Should contain the metric's tags with the same name and the column type should be set to string. +* Should contain the metric's fields with the same name and the column type should match the field type. + +### Configuration + +```toml +[[outputs.bigquery]] + ## GCP Project + project = "erudite-bloom-151019" + + ## The BigQuery dataset + dataset = "telegraf" + + ## Timeout for BigQuery operations. + # timeout = "5s" + + ## Character to replace hyphens on Metric name + # replace_hyphen_to = "_" +``` + +### Restrictions + +Avoid hyphens on BigQuery tables, underlying SDK cannot handle streaming inserts to Table with hyphens. + +In cases of metrics with hyphens please use the [Rename Processor Plugin](https://github.com/influxdata/telegraf/tree/master/plugins/processors/rename). + +In case of a metric with hyphen by default hyphens shall be replaced with underscores (_). +This can be altered using the `replace_hyphen_to` configuration property. + +Available data type options are: +* integer +* float or long +* string +* boolean + +All field naming restrictions that apply to BigQuery should apply to the measurements to be imported. + +Tables on BigQuery should be created beforehand and they are not created during persistence + +Pay attention to the column `timestamp` since it is reserved upfront and cannot change. +If partitioning is required make sure it is applied beforehand. diff --git a/plugins/outputs/bigquery/bigquery.go b/plugins/outputs/bigquery/bigquery.go new file mode 100644 index 000000000..fd1f3c7bc --- /dev/null +++ b/plugins/outputs/bigquery/bigquery.go @@ -0,0 +1,247 @@ +package bigquery + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "cloud.google.com/go/bigquery" + "golang.org/x/oauth2/google" + "google.golang.org/api/option" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/outputs" +) + +const timeStampFieldName = "timestamp" + +var defaultTimeout = internal.Duration{Duration: 5 * time.Second} + +const sampleConfig = ` + ## Credentials File + credentials_file = "/path/to/service/account/key.json" + + ## Google Cloud Platform Project + project = "my-gcp-project" + + ## The namespace for the metric descriptor + dataset = "telegraf" + + ## Timeout for BigQuery operations. + # timeout = "5s" + + ## Character to replace hyphens on Metric name + # replace_hyphen_to = "_" +` + +type BigQuery struct { + CredentialsFile string `toml:"credentials_file"` + Project string `toml:"project"` + Dataset string `toml:"dataset"` + + Timeout internal.Duration `toml:"timeout"` + ReplaceHyphenTo string `toml:"replace_hyphen_to"` + + Log telegraf.Logger `toml:"-"` + + client *bigquery.Client + + warnedOnHyphens map[string]bool +} + +// SampleConfig returns the formatted sample configuration for the plugin. +func (s *BigQuery) SampleConfig() string { + return sampleConfig +} + +// Description returns the human-readable function definition of the plugin. +func (s *BigQuery) Description() string { + return "Configuration for Google Cloud BigQuery to send entries" +} + +func (s *BigQuery) Connect() error { + if s.Project == "" { + return fmt.Errorf("Project is a required field for BigQuery output") + } + + if s.Dataset == "" { + return fmt.Errorf("Dataset is a required field for BigQuery output") + } + + if s.client == nil { + return s.setUpDefaultClient() + } + + s.warnedOnHyphens = make(map[string]bool) + + return nil +} + +func (s *BigQuery) setUpDefaultClient() error { + var credentialsOption option.ClientOption + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, s.Timeout.Duration) + defer cancel() + + if s.CredentialsFile != "" { + credentialsOption = option.WithCredentialsFile(s.CredentialsFile) + } else { + creds, err := google.FindDefaultCredentials(ctx) + if err != nil { + return fmt.Errorf( + "unable to find Google Cloud Platform Application Default Credentials: %v. "+ + "Either set ADC or provide CredentialsFile config", err) + } + credentialsOption = option.WithCredentials(creds) + } + + client, err := bigquery.NewClient(ctx, s.Project, credentialsOption) + s.client = client + return err +} + +// Write the metrics to Google Cloud BigQuery. +func (s *BigQuery) Write(metrics []telegraf.Metric) error { + groupedMetrics := s.groupByMetricName(metrics) + + var wg sync.WaitGroup + + for k, v := range groupedMetrics { + wg.Add(1) + go func(k string, v []bigquery.ValueSaver) { + defer wg.Done() + s.insertToTable(k, v) + }(k, v) + } + + wg.Wait() + + return nil +} + +func (s *BigQuery) groupByMetricName(metrics []telegraf.Metric) map[string][]bigquery.ValueSaver { + groupedMetrics := make(map[string][]bigquery.ValueSaver) + + for _, m := range metrics { + bqm := newValuesSaver(m) + groupedMetrics[m.Name()] = append(groupedMetrics[m.Name()], bqm) + } + + return groupedMetrics +} + +func newValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver { + s := make(bigquery.Schema, 0) + r := make([]bigquery.Value, 0) + timeSchema := timeStampFieldSchema() + s = append(s, timeSchema) + r = append(r, m.Time()) + + s, r = tagsSchemaAndValues(m, s, r) + s, r = valuesSchemaAndValues(m, s, r) + + return &bigquery.ValuesSaver{ + Schema: s.Relax(), + Row: r, + } +} + +func timeStampFieldSchema() *bigquery.FieldSchema { + return &bigquery.FieldSchema{ + Name: timeStampFieldName, + Type: bigquery.TimestampFieldType, + } +} + +func tagsSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) { + for _, t := range m.TagList() { + s = append(s, tagFieldSchema(t)) + r = append(r, t.Value) + } + + return s, r +} + +func tagFieldSchema(t *telegraf.Tag) *bigquery.FieldSchema { + return &bigquery.FieldSchema{ + Name: t.Key, + Type: bigquery.StringFieldType, + } +} + +func valuesSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) { + for _, f := range m.FieldList() { + s = append(s, valuesSchema(f)) + r = append(r, f.Value) + } + + return s, r +} + +func valuesSchema(f *telegraf.Field) *bigquery.FieldSchema { + return &bigquery.FieldSchema{ + Name: f.Key, + Type: valueToBqType(f.Value), + } +} + +func valueToBqType(v interface{}) bigquery.FieldType { + switch reflect.ValueOf(v).Kind() { + case reflect.Int, reflect.Int16, reflect.Int32, reflect.Int64: + return bigquery.IntegerFieldType + case reflect.Float32, reflect.Float64: + return bigquery.FloatFieldType + case reflect.Bool: + return bigquery.BooleanFieldType + default: + return bigquery.StringFieldType + } +} + +func (s *BigQuery) insertToTable(metricName string, metrics []bigquery.ValueSaver) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, s.Timeout.Duration) + defer cancel() + + tableName := s.metricToTable(metricName) + table := s.client.DatasetInProject(s.Project, s.Dataset).Table(tableName) + inserter := table.Inserter() + + if err := inserter.Put(ctx, metrics); err != nil { + s.Log.Errorf("inserting metric %q failed: %v", metricName, err) + } +} + +func (s *BigQuery) metricToTable(metricName string) string { + if !strings.Contains(metricName, "-") { + return metricName + } + + dhm := strings.ReplaceAll(metricName, "-", s.ReplaceHyphenTo) + + if warned := s.warnedOnHyphens[metricName]; !warned { + s.Log.Warnf("Metric %q contains hyphens please consider using the rename processor plugin, falling back to %q", metricName, dhm) + s.warnedOnHyphens[metricName] = true + } + + return dhm +} + +// Close will terminate the session to the backend, returning error if an issue arises. +func (s *BigQuery) Close() error { + return s.client.Close() +} + +func init() { + outputs.Add("bigquery", func() telegraf.Output { + return &BigQuery{ + Timeout: defaultTimeout, + ReplaceHyphenTo: "_", + } + }) +} diff --git a/plugins/outputs/bigquery/bigquery_test.go b/plugins/outputs/bigquery/bigquery_test.go new file mode 100644 index 000000000..34d889fcb --- /dev/null +++ b/plugins/outputs/bigquery/bigquery_test.go @@ -0,0 +1,165 @@ +package bigquery + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "cloud.google.com/go/bigquery" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "google.golang.org/api/option" +) + +const ( + successfulResponse = "{\"kind\": \"bigquery#tableDataInsertAllResponse\"}" +) + +var testingHost string +var testDuration = internal.Duration{Duration: 5 * time.Second} +var receivedBody map[string]json.RawMessage + +type Row struct { + Tag1 string `json:"tag1"` + Timestamp string `json:"timestamp"` + Value float64 `json:"value"` +} + +func TestConnect(t *testing.T) { + srv := localBigQueryServer(t) + testingHost = strings.ReplaceAll(srv.URL, "http://", "") + defer srv.Close() + + b := &BigQuery{ + Project: "test-project", + Dataset: "test-dataset", + Timeout: testDuration, + } + + cerr := b.setUpTestClient() + require.NoError(t, cerr) + berr := b.Connect() + require.NoError(t, berr) +} + +func TestWrite(t *testing.T) { + srv := localBigQueryServer(t) + testingHost = strings.ReplaceAll(srv.URL, "http://", "") + defer srv.Close() + + b := &BigQuery{ + Project: "test-project", + Dataset: "test-dataset", + Timeout: testDuration, + } + + mockMetrics := testutil.MockMetrics() + + if err := b.setUpTestClient(); err != nil { + require.NoError(t, err) + } + if err := b.Connect(); err != nil { + require.NoError(t, err) + } + + if err := b.Write(mockMetrics); err != nil { + require.NoError(t, err) + } + + var rows []map[string]json.RawMessage + if err := json.Unmarshal(receivedBody["rows"], &rows); err != nil { + require.NoError(t, err) + } + + var row Row + if err := json.Unmarshal(rows[0]["json"], &row); err != nil { + require.NoError(t, err) + } + + pt, _ := time.Parse(time.RFC3339, row.Timestamp) + require.Equal(t, mockMetrics[0].Tags()["tag1"], row.Tag1) + require.Equal(t, mockMetrics[0].Time(), pt) + require.Equal(t, mockMetrics[0].Fields()["value"], row.Value) +} + +func TestMetricToTableDefault(t *testing.T) { + b := &BigQuery{ + Project: "test-project", + Dataset: "test-dataset", + Timeout: testDuration, + warnedOnHyphens: make(map[string]bool), + ReplaceHyphenTo: "_", + Log: testutil.Logger{}, + } + + otn := "table-with-hyphens" + ntn := b.metricToTable(otn) + + require.Equal(t, "table_with_hyphens", ntn) + require.True(t, b.warnedOnHyphens[otn]) +} + +func TestMetricToTableCustom(t *testing.T) { + log := testutil.Logger{} + + b := &BigQuery{ + Project: "test-project", + Dataset: "test-dataset", + Timeout: testDuration, + warnedOnHyphens: make(map[string]bool), + ReplaceHyphenTo: "*", + Log: log, + } + + otn := "table-with-hyphens" + ntn := b.metricToTable(otn) + + require.Equal(t, "table*with*hyphens", ntn) + require.True(t, b.warnedOnHyphens[otn]) +} + +func (b *BigQuery) setUpTestClient() error { + noAuth := option.WithoutAuthentication() + endpoints := option.WithEndpoint("http://" + testingHost) + + ctx := context.Background() + + c, err := bigquery.NewClient(ctx, b.Project, noAuth, endpoints) + + if err != nil { + return err + } + + b.client = c + + return nil +} + +func localBigQueryServer(t *testing.T) *httptest.Server { + srv := httptest.NewServer(http.NotFoundHandler()) + + srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll": + decoder := json.NewDecoder(r.Body) + + if err := decoder.Decode(&receivedBody); err != nil { + require.NoError(t, err) + } + + w.WriteHeader(http.StatusOK) + if _, err := w.Write([]byte(successfulResponse)); err != nil { + require.NoError(t, err) + } + default: + w.WriteHeader(http.StatusNotFound) + } + }) + + return srv +}