Bigquery output Plugin (#8634)
This commit is contained in:
parent
c905116ade
commit
5085f595ac
|
|
@ -420,6 +420,7 @@ For documentation on the latest development code see the [documentation index][d
|
|||
* [aws kinesis](./plugins/outputs/kinesis)
|
||||
* [aws cloudwatch](./plugins/outputs/cloudwatch)
|
||||
* [azure_monitor](./plugins/outputs/azure_monitor)
|
||||
* [bigquery](./plugins/outputs/bigquery)
|
||||
* [cloud_pubsub](./plugins/outputs/cloud_pubsub) Google Cloud Pub/Sub
|
||||
* [cratedb](./plugins/outputs/cratedb)
|
||||
* [datadog](./plugins/outputs/datadog)
|
||||
|
|
|
|||
4
go.mod
4
go.mod
|
|
@ -4,8 +4,8 @@ go 1.16
|
|||
|
||||
require (
|
||||
cloud.google.com/go v0.53.0
|
||||
cloud.google.com/go/datastore v1.1.0 // indirect
|
||||
cloud.google.com/go/pubsub v1.2.0
|
||||
cloud.google.com/go/bigquery v1.3.0
|
||||
cloud.google.com/go/pubsub v1.1.0
|
||||
code.cloudfoundry.org/clock v1.0.0 // indirect
|
||||
collectd.org v0.3.0
|
||||
github.com/Azure/azure-event-hubs-go/v3 v3.2.0
|
||||
|
|
|
|||
26
go.sum
26
go.sum
|
|
@ -7,21 +7,18 @@ cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxK
|
|||
cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc=
|
||||
cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
|
||||
cloud.google.com/go v0.50.0/go.mod h1:r9sluTvynVuxRIOHXQEHMFffphuXHOMZMycpNR5e6To=
|
||||
cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4=
|
||||
cloud.google.com/go v0.53.0 h1:MZQCQQaRwOrAcuKjiHWHrgKykt4fZyuwF2dtiG3fGW8=
|
||||
cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M=
|
||||
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
|
||||
cloud.google.com/go/bigquery v1.3.0 h1:sAbMqjY1PEQKZBWfbu6Y6bsupJ9c4QdHnzg/VvYTLcE=
|
||||
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
|
||||
cloud.google.com/go/bigquery v1.4.0 h1:xE3CPsOgttP4ACBePh79zTKALtXwn/Edhcr16R5hMWU=
|
||||
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
|
||||
cloud.google.com/go/datastore v1.0.0 h1:Kt+gOPPp2LEPWp8CSfxhsM8ik9CcyE/gYu+0r+RnZvM=
|
||||
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
|
||||
cloud.google.com/go/datastore v1.1.0 h1:/May9ojXjRkPBNVrq+oWLqmWCkr4OU5uRY29bu0mRyQ=
|
||||
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
|
||||
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
|
||||
cloud.google.com/go/pubsub v1.1.0 h1:9/vpR43S4aJaROxqQHQ3nH9lfyKKV0dC3vOmnw8ebQQ=
|
||||
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
|
||||
cloud.google.com/go/pubsub v1.2.0 h1:Lpy6hKgdcl7a3WGSfJIFmxmcdjSpP6OmBEfcOv1Y680=
|
||||
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
|
||||
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
|
||||
cloud.google.com/go/storage v1.5.0 h1:RPUcBvDeYgQFMfQu1eBMq6piD1SXmLH+vK3qjewZPus=
|
||||
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
|
||||
code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o=
|
||||
code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
|
||||
|
|
@ -337,10 +334,10 @@ github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO
|
|||
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
|
||||
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
|
||||
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
|
||||
github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
|
||||
github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
|
|
@ -734,7 +731,6 @@ golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm0
|
|||
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
|
||||
golang.org/x/exp v0.0.0-20191129062945-2f5052295587/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
||||
golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
||||
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
||||
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
|
||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
||||
|
|
@ -789,7 +785,6 @@ golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLL
|
|||
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
|
||||
|
|
@ -834,7 +829,6 @@ golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
|
@ -842,7 +836,6 @@ golang.org/x/sys v0.0.0-20191003212358-c178f38b412c/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
|
@ -866,9 +859,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
|||
golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc=
|
||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
@ -900,10 +892,7 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn
|
|||
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200122220014-bf1340f18c4a/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200204074204-1cc6d1ef6c74/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9 h1:sEvmEcJVKBNUvgCUClbUQeHOAa9U0I2Ce1BooMvVCY4=
|
||||
|
|
@ -955,9 +944,6 @@ google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvx
|
|||
google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
|
||||
google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
|
||||
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
|
||||
google.golang.org/genproto v0.0.0-20200115191322-ca5a22157cba/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
|
||||
google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
|
||||
google.golang.org/genproto v0.0.0-20200204135345-fa8e72b47b90/go.mod h1:GmwEX6Z4W5gMy59cAlVYjN9JhxgbQH6Gn+gFDQe2lzA=
|
||||
google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884 h1:fiNLklpBwWK1mth30Hlwk+fcdBmIALlgF5iy77O37Ig=
|
||||
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
|
|
|
|||
|
|
@ -0,0 +1,56 @@
|
|||
# BigQuery Google Cloud Output Plugin
|
||||
|
||||
This plugin writes to the [Google Cloud BigQuery][bigquery] and requires [authentication][]
|
||||
with Google Cloud using either a service account or user credentials
|
||||
|
||||
This plugin accesses APIs which are [chargeable][pricing]; you might incur
|
||||
costs.
|
||||
|
||||
Requires `project` to specify where BigQuery entries will be persisted.
|
||||
|
||||
Requires `dataset` to specify under which BigQuery dataset the corresponding metrics tables reside.
|
||||
|
||||
Each metric should have a corresponding table to BigQuery.
|
||||
The schema of the table on BigQuery:
|
||||
* Should contain the field `timestamp` which is the timestamp of a telegraph metrics
|
||||
* Should contain the metric's tags with the same name and the column type should be set to string.
|
||||
* Should contain the metric's fields with the same name and the column type should match the field type.
|
||||
|
||||
### Configuration
|
||||
|
||||
```toml
|
||||
[[outputs.bigquery]]
|
||||
## GCP Project
|
||||
project = "erudite-bloom-151019"
|
||||
|
||||
## The BigQuery dataset
|
||||
dataset = "telegraf"
|
||||
|
||||
## Timeout for BigQuery operations.
|
||||
# timeout = "5s"
|
||||
|
||||
## Character to replace hyphens on Metric name
|
||||
# replace_hyphen_to = "_"
|
||||
```
|
||||
|
||||
### Restrictions
|
||||
|
||||
Avoid hyphens on BigQuery tables, underlying SDK cannot handle streaming inserts to Table with hyphens.
|
||||
|
||||
In cases of metrics with hyphens please use the [Rename Processor Plugin](https://github.com/influxdata/telegraf/tree/master/plugins/processors/rename).
|
||||
|
||||
In case of a metric with hyphen by default hyphens shall be replaced with underscores (_).
|
||||
This can be altered using the `replace_hyphen_to` configuration property.
|
||||
|
||||
Available data type options are:
|
||||
* integer
|
||||
* float or long
|
||||
* string
|
||||
* boolean
|
||||
|
||||
All field naming restrictions that apply to BigQuery should apply to the measurements to be imported.
|
||||
|
||||
Tables on BigQuery should be created beforehand and they are not created during persistence
|
||||
|
||||
Pay attention to the column `timestamp` since it is reserved upfront and cannot change.
|
||||
If partitioning is required make sure it is applied beforehand.
|
||||
|
|
@ -0,0 +1,247 @@
|
|||
package bigquery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/bigquery"
|
||||
"golang.org/x/oauth2/google"
|
||||
"google.golang.org/api/option"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
)
|
||||
|
||||
const timeStampFieldName = "timestamp"
|
||||
|
||||
var defaultTimeout = internal.Duration{Duration: 5 * time.Second}
|
||||
|
||||
const sampleConfig = `
|
||||
## Credentials File
|
||||
credentials_file = "/path/to/service/account/key.json"
|
||||
|
||||
## Google Cloud Platform Project
|
||||
project = "my-gcp-project"
|
||||
|
||||
## The namespace for the metric descriptor
|
||||
dataset = "telegraf"
|
||||
|
||||
## Timeout for BigQuery operations.
|
||||
# timeout = "5s"
|
||||
|
||||
## Character to replace hyphens on Metric name
|
||||
# replace_hyphen_to = "_"
|
||||
`
|
||||
|
||||
type BigQuery struct {
|
||||
CredentialsFile string `toml:"credentials_file"`
|
||||
Project string `toml:"project"`
|
||||
Dataset string `toml:"dataset"`
|
||||
|
||||
Timeout internal.Duration `toml:"timeout"`
|
||||
ReplaceHyphenTo string `toml:"replace_hyphen_to"`
|
||||
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
client *bigquery.Client
|
||||
|
||||
warnedOnHyphens map[string]bool
|
||||
}
|
||||
|
||||
// SampleConfig returns the formatted sample configuration for the plugin.
|
||||
func (s *BigQuery) SampleConfig() string {
|
||||
return sampleConfig
|
||||
}
|
||||
|
||||
// Description returns the human-readable function definition of the plugin.
|
||||
func (s *BigQuery) Description() string {
|
||||
return "Configuration for Google Cloud BigQuery to send entries"
|
||||
}
|
||||
|
||||
func (s *BigQuery) Connect() error {
|
||||
if s.Project == "" {
|
||||
return fmt.Errorf("Project is a required field for BigQuery output")
|
||||
}
|
||||
|
||||
if s.Dataset == "" {
|
||||
return fmt.Errorf("Dataset is a required field for BigQuery output")
|
||||
}
|
||||
|
||||
if s.client == nil {
|
||||
return s.setUpDefaultClient()
|
||||
}
|
||||
|
||||
s.warnedOnHyphens = make(map[string]bool)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BigQuery) setUpDefaultClient() error {
|
||||
var credentialsOption option.ClientOption
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithTimeout(ctx, s.Timeout.Duration)
|
||||
defer cancel()
|
||||
|
||||
if s.CredentialsFile != "" {
|
||||
credentialsOption = option.WithCredentialsFile(s.CredentialsFile)
|
||||
} else {
|
||||
creds, err := google.FindDefaultCredentials(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf(
|
||||
"unable to find Google Cloud Platform Application Default Credentials: %v. "+
|
||||
"Either set ADC or provide CredentialsFile config", err)
|
||||
}
|
||||
credentialsOption = option.WithCredentials(creds)
|
||||
}
|
||||
|
||||
client, err := bigquery.NewClient(ctx, s.Project, credentialsOption)
|
||||
s.client = client
|
||||
return err
|
||||
}
|
||||
|
||||
// Write the metrics to Google Cloud BigQuery.
|
||||
func (s *BigQuery) Write(metrics []telegraf.Metric) error {
|
||||
groupedMetrics := s.groupByMetricName(metrics)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for k, v := range groupedMetrics {
|
||||
wg.Add(1)
|
||||
go func(k string, v []bigquery.ValueSaver) {
|
||||
defer wg.Done()
|
||||
s.insertToTable(k, v)
|
||||
}(k, v)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *BigQuery) groupByMetricName(metrics []telegraf.Metric) map[string][]bigquery.ValueSaver {
|
||||
groupedMetrics := make(map[string][]bigquery.ValueSaver)
|
||||
|
||||
for _, m := range metrics {
|
||||
bqm := newValuesSaver(m)
|
||||
groupedMetrics[m.Name()] = append(groupedMetrics[m.Name()], bqm)
|
||||
}
|
||||
|
||||
return groupedMetrics
|
||||
}
|
||||
|
||||
func newValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver {
|
||||
s := make(bigquery.Schema, 0)
|
||||
r := make([]bigquery.Value, 0)
|
||||
timeSchema := timeStampFieldSchema()
|
||||
s = append(s, timeSchema)
|
||||
r = append(r, m.Time())
|
||||
|
||||
s, r = tagsSchemaAndValues(m, s, r)
|
||||
s, r = valuesSchemaAndValues(m, s, r)
|
||||
|
||||
return &bigquery.ValuesSaver{
|
||||
Schema: s.Relax(),
|
||||
Row: r,
|
||||
}
|
||||
}
|
||||
|
||||
func timeStampFieldSchema() *bigquery.FieldSchema {
|
||||
return &bigquery.FieldSchema{
|
||||
Name: timeStampFieldName,
|
||||
Type: bigquery.TimestampFieldType,
|
||||
}
|
||||
}
|
||||
|
||||
func tagsSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
|
||||
for _, t := range m.TagList() {
|
||||
s = append(s, tagFieldSchema(t))
|
||||
r = append(r, t.Value)
|
||||
}
|
||||
|
||||
return s, r
|
||||
}
|
||||
|
||||
func tagFieldSchema(t *telegraf.Tag) *bigquery.FieldSchema {
|
||||
return &bigquery.FieldSchema{
|
||||
Name: t.Key,
|
||||
Type: bigquery.StringFieldType,
|
||||
}
|
||||
}
|
||||
|
||||
func valuesSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
|
||||
for _, f := range m.FieldList() {
|
||||
s = append(s, valuesSchema(f))
|
||||
r = append(r, f.Value)
|
||||
}
|
||||
|
||||
return s, r
|
||||
}
|
||||
|
||||
func valuesSchema(f *telegraf.Field) *bigquery.FieldSchema {
|
||||
return &bigquery.FieldSchema{
|
||||
Name: f.Key,
|
||||
Type: valueToBqType(f.Value),
|
||||
}
|
||||
}
|
||||
|
||||
func valueToBqType(v interface{}) bigquery.FieldType {
|
||||
switch reflect.ValueOf(v).Kind() {
|
||||
case reflect.Int, reflect.Int16, reflect.Int32, reflect.Int64:
|
||||
return bigquery.IntegerFieldType
|
||||
case reflect.Float32, reflect.Float64:
|
||||
return bigquery.FloatFieldType
|
||||
case reflect.Bool:
|
||||
return bigquery.BooleanFieldType
|
||||
default:
|
||||
return bigquery.StringFieldType
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BigQuery) insertToTable(metricName string, metrics []bigquery.ValueSaver) {
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithTimeout(ctx, s.Timeout.Duration)
|
||||
defer cancel()
|
||||
|
||||
tableName := s.metricToTable(metricName)
|
||||
table := s.client.DatasetInProject(s.Project, s.Dataset).Table(tableName)
|
||||
inserter := table.Inserter()
|
||||
|
||||
if err := inserter.Put(ctx, metrics); err != nil {
|
||||
s.Log.Errorf("inserting metric %q failed: %v", metricName, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BigQuery) metricToTable(metricName string) string {
|
||||
if !strings.Contains(metricName, "-") {
|
||||
return metricName
|
||||
}
|
||||
|
||||
dhm := strings.ReplaceAll(metricName, "-", s.ReplaceHyphenTo)
|
||||
|
||||
if warned := s.warnedOnHyphens[metricName]; !warned {
|
||||
s.Log.Warnf("Metric %q contains hyphens please consider using the rename processor plugin, falling back to %q", metricName, dhm)
|
||||
s.warnedOnHyphens[metricName] = true
|
||||
}
|
||||
|
||||
return dhm
|
||||
}
|
||||
|
||||
// Close will terminate the session to the backend, returning error if an issue arises.
|
||||
func (s *BigQuery) Close() error {
|
||||
return s.client.Close()
|
||||
}
|
||||
|
||||
func init() {
|
||||
outputs.Add("bigquery", func() telegraf.Output {
|
||||
return &BigQuery{
|
||||
Timeout: defaultTimeout,
|
||||
ReplaceHyphenTo: "_",
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -0,0 +1,165 @@
|
|||
package bigquery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"cloud.google.com/go/bigquery"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/api/option"
|
||||
)
|
||||
|
||||
const (
|
||||
successfulResponse = "{\"kind\": \"bigquery#tableDataInsertAllResponse\"}"
|
||||
)
|
||||
|
||||
var testingHost string
|
||||
var testDuration = internal.Duration{Duration: 5 * time.Second}
|
||||
var receivedBody map[string]json.RawMessage
|
||||
|
||||
type Row struct {
|
||||
Tag1 string `json:"tag1"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Value float64 `json:"value"`
|
||||
}
|
||||
|
||||
func TestConnect(t *testing.T) {
|
||||
srv := localBigQueryServer(t)
|
||||
testingHost = strings.ReplaceAll(srv.URL, "http://", "")
|
||||
defer srv.Close()
|
||||
|
||||
b := &BigQuery{
|
||||
Project: "test-project",
|
||||
Dataset: "test-dataset",
|
||||
Timeout: testDuration,
|
||||
}
|
||||
|
||||
cerr := b.setUpTestClient()
|
||||
require.NoError(t, cerr)
|
||||
berr := b.Connect()
|
||||
require.NoError(t, berr)
|
||||
}
|
||||
|
||||
func TestWrite(t *testing.T) {
|
||||
srv := localBigQueryServer(t)
|
||||
testingHost = strings.ReplaceAll(srv.URL, "http://", "")
|
||||
defer srv.Close()
|
||||
|
||||
b := &BigQuery{
|
||||
Project: "test-project",
|
||||
Dataset: "test-dataset",
|
||||
Timeout: testDuration,
|
||||
}
|
||||
|
||||
mockMetrics := testutil.MockMetrics()
|
||||
|
||||
if err := b.setUpTestClient(); err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
if err := b.Connect(); err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if err := b.Write(mockMetrics); err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var rows []map[string]json.RawMessage
|
||||
if err := json.Unmarshal(receivedBody["rows"], &rows); err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
var row Row
|
||||
if err := json.Unmarshal(rows[0]["json"], &row); err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
pt, _ := time.Parse(time.RFC3339, row.Timestamp)
|
||||
require.Equal(t, mockMetrics[0].Tags()["tag1"], row.Tag1)
|
||||
require.Equal(t, mockMetrics[0].Time(), pt)
|
||||
require.Equal(t, mockMetrics[0].Fields()["value"], row.Value)
|
||||
}
|
||||
|
||||
func TestMetricToTableDefault(t *testing.T) {
|
||||
b := &BigQuery{
|
||||
Project: "test-project",
|
||||
Dataset: "test-dataset",
|
||||
Timeout: testDuration,
|
||||
warnedOnHyphens: make(map[string]bool),
|
||||
ReplaceHyphenTo: "_",
|
||||
Log: testutil.Logger{},
|
||||
}
|
||||
|
||||
otn := "table-with-hyphens"
|
||||
ntn := b.metricToTable(otn)
|
||||
|
||||
require.Equal(t, "table_with_hyphens", ntn)
|
||||
require.True(t, b.warnedOnHyphens[otn])
|
||||
}
|
||||
|
||||
func TestMetricToTableCustom(t *testing.T) {
|
||||
log := testutil.Logger{}
|
||||
|
||||
b := &BigQuery{
|
||||
Project: "test-project",
|
||||
Dataset: "test-dataset",
|
||||
Timeout: testDuration,
|
||||
warnedOnHyphens: make(map[string]bool),
|
||||
ReplaceHyphenTo: "*",
|
||||
Log: log,
|
||||
}
|
||||
|
||||
otn := "table-with-hyphens"
|
||||
ntn := b.metricToTable(otn)
|
||||
|
||||
require.Equal(t, "table*with*hyphens", ntn)
|
||||
require.True(t, b.warnedOnHyphens[otn])
|
||||
}
|
||||
|
||||
func (b *BigQuery) setUpTestClient() error {
|
||||
noAuth := option.WithoutAuthentication()
|
||||
endpoints := option.WithEndpoint("http://" + testingHost)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
c, err := bigquery.NewClient(ctx, b.Project, noAuth, endpoints)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.client = c
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func localBigQueryServer(t *testing.T) *httptest.Server {
|
||||
srv := httptest.NewServer(http.NotFoundHandler())
|
||||
|
||||
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll":
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
|
||||
if err := decoder.Decode(&receivedBody); err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if _, err := w.Write([]byte(successfulResponse)); err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
})
|
||||
|
||||
return srv
|
||||
}
|
||||
Loading…
Reference in New Issue