feat(inputs.gcs): Google Cloud Storage Input Plugin (#8413)

gkatzioura 2022-09-19 21:01:04 +01:00 committed by GitHub
parent da5e1cd6ee
commit e5ee9e198b
8 changed files with 811 additions and 0 deletions


@@ -139,6 +139,7 @@ following works:
- github.com/google/uuid [BSD 3-Clause "New" or "Revised" License](https://github.com/google/uuid/blob/master/LICENSE)
- github.com/googleapis/enterprise-certificate-proxy [Apache License 2.0](https://github.com/googleapis/enterprise-certificate-proxy/blob/main/LICENSE)
- github.com/googleapis/gax-go [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/gax-go/blob/master/LICENSE)
- github.com/googleapis/go-type-adapters [Apache License 2.0](https://github.com/googleapis/go-type-adapters/blob/main/LICENSE)
- github.com/gopcua/opcua [MIT License](https://github.com/gopcua/opcua/blob/master/LICENSE)
- github.com/gophercloud/gophercloud [Apache License 2.0](https://github.com/gophercloud/gophercloud/blob/master/LICENSE)
- github.com/gorilla/mux [BSD 3-Clause "New" or "Revised" License](https://github.com/gorilla/mux/blob/master/LICENSE)

go.mod

@@ -9,6 +9,7 @@ require (
cloud.google.com/go/bigquery v1.40.0
cloud.google.com/go/monitoring v1.5.0
cloud.google.com/go/pubsub v1.25.1
cloud.google.com/go/storage v1.23.0
collectd.org v0.5.0
github.com/Azure/azure-event-hubs-go/v3 v3.3.18
github.com/Azure/azure-kusto-go v0.8.0
@@ -279,6 +280,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.5.1 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
github.com/grid-x/serial v0.0.0-20211107191517-583c7356b3aa // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
@@ -410,6 +412,7 @@ require (
gopkg.in/sourcemap.v1 v1.0.5 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools v2.2.0+incompatible
honnef.co/go/tools v0.2.2 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/kube-openapi v0.0.0-20220803164354-a70c9af30aea // indirect

go.sum

@@ -32,6 +32,7 @@ cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Ud
cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA=
cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A=
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc=
cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU=
cloud.google.com/go v0.104.0 h1:gSmWO7DY1vOm0MVU6DNXM11BWHHsTUmsC5cv1fuW5X8=
cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRYtA=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
@@ -72,6 +73,7 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y=
cloud.google.com/go/storage v1.23.0 h1:wWRIaDURQA8xxHguFCshYepGlrWIrbBnAmc7wfg07qY=
cloud.google.com/go/storage v1.23.0/go.mod h1:vOEEDNFnciUMhBeT6hsJIn3ieU5cFRmzeLgDvXzfIXc=
code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o=
code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
@@ -2676,6 +2678,7 @@ golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su
golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
@@ -2882,6 +2885,7 @@ golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -3110,6 +3114,7 @@ google.golang.org/api v0.75.0/go.mod h1:pU9QmyHLnzlpar1Mjt4IbapUCy8J+6HD6GeELN69
google.golang.org/api v0.78.0/go.mod h1:1Sg78yoMLOhlQTeF+ARBoytAcH1NNyyl390YMy6rKmw=
google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3Hyg=
google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o=
google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g=
google.golang.org/api v0.94.0 h1:KtKM9ru3nzQioV1HLlUf1cR7vMYJIpgls5VhAYQXIwA=
google.golang.org/api v0.94.0/go.mod h1:eADj+UBuxkh5zlrSntJghuNeg8HwQ1w5lTKkuqaETEI=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -3215,6 +3220,7 @@ google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP
google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220902135211-223410557253 h1:vXJMM8Shg7TGaYxZsQ++A/FOSlbDmDtWhS/o+3w/hj4=
google.golang.org/genproto v0.0.0-20220902135211-223410557253/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=


@@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.google_cloud_storage

package all

import _ "github.com/influxdata/telegraf/plugins/inputs/google_cloud_storage" // register plugin


@@ -0,0 +1,73 @@
# Google Cloud Storage Input Plugin
The Google Cloud Storage input plugin collects metrics from objects stored in
the configured Google Cloud Storage bucket.

## Configuration
```toml @sample.conf
# Gather metrics by iterating the files located on a Cloud Storage Bucket.
[[inputs.google_cloud_storage]]
## Required. Name of Cloud Storage bucket to ingest metrics from.
bucket = "my-bucket"
## Optional. Prefix of Cloud Storage bucket keys to list metrics from.
# key_prefix = "my-prefix"
## Optional. Key (object name) under which the ingestion offset is stored so
## that collection resumes where it left off.
offset_key = "offset_key"
## Optional. Maximum number of objects to process per collection interval (0 = no limit).
objects_per_iteration = 10
## Required. Data format to consume.
## Each data format has its own unique set of configuration options.
## Read more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## Google Cloud Storage APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json"
```

## Metrics

Measurements are read from objects in the bucket using the configured
`data_format`. For example, an object might contain the following when
`data_format = "json"`:

```json
{
"metrics": [
{
"fields": {
"cosine": 10,
"sine": -1.0975806427415925e-12
},
"name": "cpu",
"tags": {
"datacenter": "us-east-1",
"host": "localhost"
},
"timestamp": 1604148850990
}
]
}
```
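
Any process that can write objects to the bucket can act as a producer for this
plugin. The following is an illustrative sketch only, not part of the plugin
itself; the bucket name `my-bucket`, the object key `prefix/1604148850990`, and
the use of Application Default Credentials are assumptions:

```go
package main

import (
    "context"
    "log"

    "cloud.google.com/go/storage"
)

func main() {
    ctx := context.Background()

    // Uses Application Default Credentials; the bucket name is an assumption.
    client, err := storage.NewClient(ctx)
    if err != nil {
        log.Fatalf("could not create storage client: %v", err)
    }
    defer client.Close()

    // Object name and payload are illustrative; the plugin parses the payload
    // with whatever data_format is configured (here: json).
    payload := `{"metrics":[{"fields":{"cosine":10,"sine":-1.0975806427415925e-12},"name":"cpu","tags":{"datacenter":"us-east-1","host":"localhost"},"timestamp":1604148850990}]}`
    w := client.Bucket("my-bucket").Object("prefix/1604148850990").NewWriter(ctx)
    w.ContentType = "application/json"
    if _, err := w.Write([]byte(payload)); err != nil {
        log.Fatalf("could not write object: %v", err)
    }
    if err := w.Close(); err != nil {
        log.Fatalf("could not finalize upload: %v", err)
    }
}
```

On the next collection interval the plugin lists the bucket, parses the object
with the configured `data_format`, and advances its stored offset past the
processed object name.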

## Example Output

Example log output from Telegraf starting up with this plugin loaded:

```shell
2022-09-17T17:52:19Z I! Starting Telegraf 1.25.0-a93ec9a0
2022-09-17T17:52:19Z I! Available plugins: 209 inputs, 9 aggregators, 26 processors, 20 parsers, 57 outputs
2022-09-17T17:52:19Z I! Loaded inputs: google_cloud_storage
2022-09-17T17:52:19Z I! Loaded aggregators:
2022-09-17T17:52:19Z I! Loaded processors:
2022-09-17T17:52:19Z I! Loaded outputs: influxdb
2022-09-17T17:52:19Z I! Tags enabled: host=user-N9RXNWKWY3
2022-09-17T17:52:19Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"user-N9RXNWKWY3", Flush Interval:10s
```


@@ -0,0 +1,284 @@
//go:generate ../../../tools/readme_config_includer/generator
package gcs
import (
"bytes"
"context"
_ "embed"
"encoding/json"
"fmt"
"io"
"os"
"cloud.google.com/go/storage"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
const (
emulatorHostEnv = "STORAGE_EMULATOR_HOST"
defaultOffSetKey = "offset-key.json"
)
//go:embed sample.conf
var sampleConfig string
type GCS struct {
CredentialsFile string `toml:"credentials_file"`
Bucket string `toml:"bucket"`
Prefix string `toml:"key_prefix"`
OffsetKey string `toml:"offset_key"`
ObjectsPerIteration int `toml:"objects_per_iteration"`
Log telegraf.Logger
offSet OffSet
parser parsers.Parser
client *storage.Client
ctx context.Context
}
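// OffSet is the JSON document persisted under OffsetKey; it stores the name of
// the last processed object so collection can resume across restarts.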
type OffSet struct {
OffSet string `json:"offSet"`
}
func NewEmptyOffset() *OffSet {
return &OffSet{OffSet: ""}
}
func NewOffset(offset string) *OffSet {
return &OffSet{OffSet: offset}
}
func (offSet *OffSet) isPresent() bool {
return offSet.OffSet != ""
}
func (gcs *GCS) SampleConfig() string {
return sampleConfig
}
func (gcs *GCS) SetParser(parser parsers.Parser) {
gcs.parser = parser
}
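// Gather lists the objects of the configured bucket starting from the stored
// offset, parses every non-offset object with the configured data format, and
// records the name of the last processed object as the new offset.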
func (gcs *GCS) Gather(acc telegraf.Accumulator) error {
query := gcs.createQuery()
bucketName := gcs.Bucket
bucket := gcs.client.Bucket(bucketName)
it := bucket.Objects(gcs.ctx, &query)
processed := 0
var name string
for {
attrs, err := it.Next()
if err == iterator.Done {
gcs.Log.Infof("Iterated all the keys")
break
}
if err != nil {
gcs.Log.Errorf("Error during iteration of keys: %v", err)
return err
}
name = attrs.Name
if !gcs.shouldIgnore(name) {
if err := gcs.processMeasurementsInObject(name, bucket, acc); err != nil {
gcs.Log.Errorf("Could not process object %q in bucket %q: %v", name, bucketName, err)
acc.AddError(fmt.Errorf("could not process object %q in bucket %q: %w", name, bucketName, err))
}
}
processed++
if gcs.reachedThreshold(processed) {
return gcs.updateOffset(bucket, name)
}
}
return gcs.updateOffset(bucket, name)
}
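// createQuery builds the object listing query, restricting it to the configured
// prefix and resuming from the stored offset when one is present.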
func (gcs *GCS) createQuery() storage.Query {
if gcs.offSet.isPresent() {
return storage.Query{Prefix: gcs.Prefix, StartOffset: gcs.offSet.OffSet}
}
return storage.Query{Prefix: gcs.Prefix}
}
func (gcs *GCS) shouldIgnore(name string) bool {
return gcs.offSet.OffSet == name || gcs.OffsetKey == name
}
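// processMeasurementsInObject downloads a single object, parses its contents
// with the configured parser, and adds the resulting metrics to the accumulator.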
func (gcs *GCS) processMeasurementsInObject(name string, bucket *storage.BucketHandle, acc telegraf.Accumulator) error {
gcs.Log.Debugf("Fetching key: %s", name)
r, err := bucket.Object(name).NewReader(gcs.ctx)
if err != nil {
return err
}
defer gcs.closeReader(r)
metrics, err := gcs.fetchedMetrics(r)
if err != nil {
return err
}
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}
func (gcs *GCS) fetchedMetrics(r *storage.Reader) ([]telegraf.Metric, error) {
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(r); err != nil {
return nil, err
}
return gcs.parser.Parse(buf.Bytes())
}
func (gcs *GCS) reachedThreshold(processed int) bool {
return gcs.ObjectsPerIteration != 0 && processed >= gcs.ObjectsPerIteration
}
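// updateOffset writes the name of the last processed object to the offset key
// in the bucket and caches it in memory for the next iteration; offset-key
// objects themselves are never recorded.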
func (gcs *GCS) updateOffset(bucket *storage.BucketHandle, name string) error {
if gcs.shouldIgnore(name) {
return nil
}
offsetModel := NewOffset(name)
marshalled, err := json.Marshal(offsetModel)
if err != nil {
return err
}
offsetKey := bucket.Object(gcs.OffsetKey)
writer := offsetKey.NewWriter(gcs.ctx)
writer.ContentType = "application/json"
if _, err := writer.Write(marshalled); err != nil {
return err
}
if err := writer.Close(); err != nil {
return err
}
gcs.offSet = *offsetModel
return nil
}
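// Init creates the storage client and loads the stored offset before the first Gather.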
func (gcs *GCS) Init() error {
gcs.ctx = context.Background()
err := gcs.setUpClient()
if err != nil {
gcs.Log.Error("Could not create client", err)
return err
}
return gcs.setOffset()
}
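// setUpClient connects without authentication to the endpoint named in
// STORAGE_EMULATOR_HOST when that variable is set (used by the tests);
// otherwise it builds a regular client from credentials_file or
// Application Default Credentials.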
func (gcs *GCS) setUpClient() error {
if endpoint, present := os.LookupEnv(emulatorHostEnv); present {
return gcs.setUpLocalClient(endpoint)
}
return gcs.setUpDefaultClient()
}
func (gcs *GCS) setUpLocalClient(endpoint string) error {
noAuth := option.WithoutAuthentication()
endpoints := option.WithEndpoint("http://" + endpoint)
client, err := storage.NewClient(gcs.ctx, noAuth, endpoints)
if err != nil {
return err
}
gcs.client = client
return nil
}
func (gcs *GCS) setUpDefaultClient() error {
var credentialsOption option.ClientOption
if gcs.CredentialsFile != "" {
credentialsOption = option.WithCredentialsFile(gcs.CredentialsFile)
} else {
creds, err := google.FindDefaultCredentials(gcs.ctx, storage.ScopeReadOnly)
if err != nil {
return fmt.Errorf(
"unable to find GCP Application Default Credentials: %v."+
"Either set ADC or provide CredentialsFile config", err)
}
credentialsOption = option.WithCredentials(creds)
}
client, err := storage.NewClient(gcs.ctx, credentialsOption)
gcs.client = client
return err
}
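// setOffset resolves the offset key (prefixing it, or falling back to
// offset-key.json) and loads any previously stored offset from the bucket;
// a missing offset object yields an empty offset.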
func (gcs *GCS) setOffset() error {
if gcs.client == nil {
return fmt.Errorf("CANNOT SET OFFSET IF CLIENT IS NOT SET")
}
if gcs.OffsetKey != "" {
gcs.OffsetKey = gcs.Prefix + gcs.OffsetKey
} else {
gcs.OffsetKey = gcs.Prefix + defaultOffSetKey
}
btk := gcs.client.Bucket(gcs.Bucket)
obj := btk.Object(gcs.OffsetKey)
var offSet OffSet
if r, err := obj.NewReader(gcs.ctx); err == nil {
defer gcs.closeReader(r)
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, r); err == nil {
if marshalError := json.Unmarshal(buf.Bytes(), &offSet); marshalError != nil {
return marshalError
}
}
} else {
offSet = *NewEmptyOffset()
}
gcs.offSet = offSet
return nil
}
func init() {
inputs.Add("google_cloud_storage", func() telegraf.Input {
gcs := &GCS{}
return gcs
})
}
func (gcs *GCS) closeReader(r *storage.Reader) {
if err := r.Close(); err != nil {
gcs.Log.Errorf("Could not close reader", err)
}
}


@@ -0,0 +1,415 @@
package gcs
import (
"encoding/json"
"fmt"
"io"
"mime"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"github.com/influxdata/telegraf/plugins/parsers"
_ "github.com/influxdata/telegraf/plugins/parsers/all"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"gotest.tools/assert"
)
const (
singleObjectNotFound = "{\"error\":{\"code\":404,\"message\":\"No such object: test-iteration-bucket/prefix/offset-key.json\",\"errors\":[{\"message\":\"No such object: test-iteration-bucket/prefix/offset-key.json\",\"domain\":\"global\",\"reason\":\"notFound\"}]}}"
singleFileList = "{\"kind\":\"storage#objects\",\"items\":[{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/1604148850990/1604148851295698\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/1604148850990/o/1604148850990\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850990?generation=1604148851295698&alt=media\",\"name\":\"1604148850990\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851295698\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CNKLy5Pw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.295Z\",\"updated\":\"2020-10-31T12:54:11.295Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.295Z\"}]}"
firstFile = "{\"metrics\":[{\"fields\":{\"cosine\":10,\"sine\":-1.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850991}]}"
secondFile = "{\"metrics\":[{\"fields\":{\"cosine\":11,\"sine\":-2.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850992}]}"
thirdFile = "{\"metrics\":[{\"fields\":{\"cosine\":12,\"sine\":-3.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850993}]}"
fourthFile = "{\"metrics\":[{\"fields\":{\"cosine\":13,\"sine\":-4.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850994}]}"
firstFileListing = "{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850991/1604148851353983\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850991\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850991?generation=1604148851353983&alt=media\",\"name\":\"prefix/1604148850991\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851353983\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CP/SzpPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.353Z\",\"updated\":\"2020-10-31T12:54:11.353Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.353Z\"}"
secondFileListing = "{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850992/1604148851414237\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850992\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850992?generation=1604148851414237&alt=media\",\"name\":\"prefix/1604148850992\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851414237\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CN2p0pPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.414Z\",\"updated\":\"2020-10-31T12:54:11.414Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.414Z\"}"
thirdFileListing = "{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850993/1604148851467554\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850993\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850993?generation=1604148851467554&alt=media\",\"name\":\"prefix/1604148850993\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851467554\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CKLK1ZPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.467Z\",\"updated\":\"2020-10-31T12:54:11.467Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.467Z\"}"
fourthFileListing = "{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850994/1604148851467554\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850994\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850994?generation=1604148851467554&alt=media\",\"name\":\"prefix/1604148850994\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851467554\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CKLK1ZPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.467Z\",\"updated\":\"2020-10-31T12:54:11.467Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.467Z\"}"
fileListing = "{\"kind\":\"storage#objects\"}"
offSetTemplate = "{\"offSet\":\"%s\"}"
)
var objListing = parseJSONFromText(fileListing)
var firstElement = parseJSONFromText(firstFileListing)
var secondElement = parseJSONFromText(secondFileListing)
var thirdElement = parseJSONFromText(thirdFileListing)
var fourthElement = parseJSONFromText(fourthFileListing)
func TestRunSetUpClient(t *testing.T) {
gcs := &GCS{
Bucket: "test-bucket",
Prefix: "prefix",
OffsetKey: "1230405",
Log: testutil.Logger{},
}
if err := gcs.setUpClient(); err != nil {
t.Log(err)
}
}
func TestRunInit(t *testing.T) {
srv := startGCSServer(t)
defer srv.Close()
emulatorSetEnv(t, srv)
gcs := &GCS{
Bucket: "test-bucket",
Prefix: "prefix/",
OffsetKey: "offset.json",
Log: testutil.Logger{},
}
require.NoError(t, gcs.Init())
assert.Equal(t, "offsetfile", gcs.offSet.OffSet)
}
func TestRunInitNoOffsetKey(t *testing.T) {
srv := startGCSServer(t)
defer srv.Close()
emulatorSetEnv(t, srv)
gcs := &GCS{
Bucket: "test-bucket",
Prefix: "prefix/",
Log: testutil.Logger{},
}
require.NoError(t, gcs.Init())
assert.Equal(t, "offsetfile", gcs.offSet.OffSet)
assert.Equal(t, "prefix/offset-key.json", gcs.OffsetKey)
}
func TestRunGatherOneItem(t *testing.T) {
srv := startOneItemGCSServer(t)
defer srv.Close()
emulatorSetEnv(t, srv)
acc := &testutil.Accumulator{}
gcs := &GCS{
Bucket: "test-iteration-bucket",
Prefix: "prefix/",
Log: testutil.Logger{},
parser: createParser(),
}
require.NoError(t, gcs.Init())
require.NoError(t, gcs.Gather(acc))
metric := acc.Metrics[0]
assert.Equal(t, "cpu", metric.Measurement)
assert.Equal(t, "us-east-1", metric.Tags["tags_datacenter"])
assert.Equal(t, "localhost", metric.Tags["tags_host"])
assert.Equal(t, 10.0, metric.Fields["fields_cosine"])
assert.Equal(t, -1.0975806427415925e-12, metric.Fields["fields_sine"])
}
func TestRunGatherOneIteration(t *testing.T) {
srv := startMultipleItemGCSServer(t)
defer srv.Close()
emulatorSetEnv(t, srv)
gcs := &GCS{
Bucket: "test-iteration-bucket",
Prefix: "prefix/",
OffsetKey: "custom-offset-key.json",
Log: testutil.Logger{},
parser: createParser(),
}
acc := &testutil.Accumulator{}
require.NoError(t, gcs.Init())
require.NoError(t, gcs.Gather(acc))
assert.Equal(t, 3, len(acc.Metrics))
}
func TestRunGatherIterationsWithLimit(t *testing.T) {
srv := startMultipleItemGCSServer(t)
defer srv.Close()
emulatorSetEnv(t, srv)
gcs := &GCS{
Bucket: "test-iteration-bucket",
Prefix: "prefix/",
ObjectsPerIteration: 1,
OffsetKey: "custom-offset-key.json",
Log: testutil.Logger{},
parser: createParser(),
}
acc := &testutil.Accumulator{}
require.NoError(t, gcs.Init())
require.NoError(t, gcs.Gather(acc))
assert.Equal(t, 1, len(acc.Metrics))
require.NoError(t, gcs.Gather(acc))
assert.Equal(t, 2, len(acc.Metrics))
require.NoError(t, gcs.Gather(acc))
assert.Equal(t, 3, len(acc.Metrics))
}
func TestRunGatherIterationWithPages(t *testing.T) {
srv := stateFulGCSServer(t)
defer srv.Close()
emulatorSetEnv(t, srv)
gcs := &GCS{
Bucket: "test-iteration-bucket",
Prefix: "prefix/",
OffsetKey: "custom-offset-key.json",
Log: testutil.Logger{},
parser: createParser(),
}
acc := &testutil.Accumulator{}
require.NoError(t, gcs.Init())
require.NoError(t, gcs.Gather(acc))
assert.Equal(t, 4, len(acc.Metrics))
assert.Equal(t, true, gcs.offSet.isPresent())
assert.Equal(t, "prefix/1604148850994", gcs.offSet.OffSet)
emptyAcc := &testutil.Accumulator{}
require.NoError(t, gcs.Gather(emptyAcc))
assert.Equal(t, 0, len(emptyAcc.Metrics))
}
func createParser() parsers.Parser {
testParser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "cpu",
JSONQuery: "metrics",
TagKeys: []string{"tags_datacenter", "tags_host"},
JSONTimeKey: "timestamp",
JSONTimeFormat: "unix_ms",
})
return testParser
}
func startGCSServer(t *testing.T) *httptest.Server {
srv := httptest.NewServer(http.NotFoundHandler())
currentOffSetKey := fmt.Sprintf(offSetTemplate, "offsetfile")
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/test-bucket/prefix/offset.json":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(currentOffSetKey))
require.NoError(t, err)
case "/test-bucket/prefix/offset-key.json":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"offSet\":\"offsetfile\"}"))
require.NoError(t, err)
default:
failPath(r.URL.Path, t, w)
}
})
return srv
}
func startOneItemGCSServer(t *testing.T) *httptest.Server {
srv := httptest.NewServer(http.NotFoundHandler())
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/b/test-iteration-bucket/o":
serveJSONText(w, singleFileList)
default:
serveBlobs(r.URL.Path, "", t, w)
}
})
return srv
}
func startMultipleItemGCSServer(t *testing.T) *httptest.Server {
srv := httptest.NewServer(http.NotFoundHandler())
currentOffSetKey := fmt.Sprintf(offSetTemplate, "prefix/1604148850991")
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/b/test-iteration-bucket/o":
offset := r.URL.Query().Get("startOffset")
if offset == "prefix/1604148850990" {
objListing["items"] = []interface{}{firstElement, secondElement, thirdElement, fourthElement}
} else if offset == "prefix/1604148850991" {
objListing["items"] = []interface{}{secondElement, thirdElement, fourthElement}
} else if offset == "prefix/16041488509912" {
objListing["items"] = []interface{}{thirdElement, fourthElement}
} else if offset == "prefix/16041488509913" {
objListing["items"] = []interface{}{thirdElement, fourthElement}
} else {
objListing["items"] = []interface{}{firstElement, secondElement, thirdElement, fourthElement}
}
if data, err := json.Marshal(objListing); err == nil {
w.WriteHeader(http.StatusOK)
_, err := w.Write(data)
require.NoError(t, err)
} else {
w.WriteHeader(http.StatusNotFound)
t.Fatalf("unexpected path: " + r.URL.Path)
}
default:
serveBlobs(r.URL.Path, currentOffSetKey, t, w)
}
})
return srv
}
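// stateFulGCSServer emulates a bucket whose listing is spread over several
// pages via nextPageToken and records offset uploads so that a subsequent
// Gather resumes after the last processed object.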
func stateFulGCSServer(t *testing.T) *httptest.Server {
srv := httptest.NewServer(http.NotFoundHandler())
currentOffSetKey := fmt.Sprintf(offSetTemplate, "prefix/1604148850990")
srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/b/test-iteration-bucket/o":
offset := r.URL.Query().Get("startOffset")
objListing := parseJSONFromText(fileListing)
pageToken := r.URL.Query().Get("pageToken")
if pageToken == "page2" {
objListing["items"] = []interface{}{secondElement}
objListing["nextPageToken"] = "page3"
} else if pageToken == "page3" {
objListing["items"] = []interface{}{thirdElement}
objListing["nextPageToken"] = "page4"
} else if pageToken == "page4" {
objListing["items"] = []interface{}{fourthElement}
} else if offset == "prefix/1604148850994" {
objListing["items"] = []interface{}{}
} else {
objListing["items"] = []interface{}{firstElement}
objListing["nextPageToken"] = "page2"
}
if data, err := json.Marshal(objListing); err == nil {
w.WriteHeader(http.StatusOK)
_, err := w.Write(data)
require.NoError(t, err)
} else {
failPath(r.URL.Path, t, w)
}
case "/upload/storage/v1/b/test-iteration-bucket/o":
_, params, _ := mime.ParseMediaType(r.Header["Content-Type"][0])
boundary := params["boundary"]
currentOffSetKey, _ = fetchJSON(t, boundary, r.Body)
default:
serveBlobs(r.URL.Path, currentOffSetKey, t, w)
}
})
return srv
}
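// serveBlobs answers object download requests from the emulated bucket with
// canned payloads and offset documents; unknown paths fail the test.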
func serveBlobs(urlPath string, offsetKey string, t *testing.T, w http.ResponseWriter) {
switch urlPath {
case "/test-iteration-bucket/prefix/offset-key.json":
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte(singleObjectNotFound))
require.NoError(t, err)
case "/test-bucket/prefix/offset.json":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(offsetKey))
require.NoError(t, err)
case "/test-bucket/prefix/offset-key.json":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{\"offSet\":\"offsetfile\"}"))
require.NoError(t, err)
case "/test-iteration-bucket/prefix/custom-offset-key.json":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(offsetKey))
require.NoError(t, err)
case "/test-iteration-bucket/1604148850990":
serveJSONText(w, firstFile)
case "/test-iteration-bucket/prefix/1604148850991":
serveJSONText(w, firstFile)
case "/test-iteration-bucket/prefix/1604148850992":
serveJSONText(w, secondFile)
case "/test-iteration-bucket/prefix/1604148850993":
serveJSONText(w, thirdFile)
case "/test-iteration-bucket/prefix/1604148850994":
serveJSONText(w, fourthFile)
case "/upload/storage/v1/b/test-iteration-bucket/o":
w.WriteHeader(http.StatusOK)
default:
failPath(urlPath, t, w)
}
}
func fetchJSON(t *testing.T, boundary string, rc io.ReadCloser) (string, error) {
defer rc.Close()
bodyBytes, err := io.ReadAll(rc)
if err != nil {
t.Fatalf("Could not read bytes from offset action")
return "", err
}
splits := strings.Split(string(bodyBytes), boundary)
offsetPart := splits[2]
offsets := strings.Split(offsetPart, "\n")
fmt.Printf("%s", offsets[3])
return offsets[3], nil
}
func serveJSONText(w http.ResponseWriter, jsonText string) {
w.WriteHeader(http.StatusOK)
if _, err := w.Write([]byte(jsonText)); err != nil {
fmt.Println(err)
}
}
func failPath(path string, t *testing.T, w http.ResponseWriter) {
w.WriteHeader(http.StatusNotFound)
t.Fatalf("unexpected path: " + path)
}
func parseJSONFromText(jsonText string) map[string]interface{} {
var element map[string]interface{}
if err := json.Unmarshal([]byte(jsonText), &element); err != nil {
fmt.Println(err)
}
return element
}
func emulatorSetEnv(t *testing.T, srv *httptest.Server) {
if err := os.Setenv("STORAGE_EMULATOR_HOST", strings.ReplaceAll(srv.URL, "http://", "")); err != nil {
t.Error(err)
}
}


@@ -0,0 +1,24 @@
# Gather metrics by iterating the files located on a Cloud Storage Bucket.
[[inputs.google_cloud_storage]]
## Required. Name of Cloud Storage bucket to ingest metrics from.
bucket = "my-bucket"
## Optional. Prefix of Cloud Storage bucket keys to list metrics from.
# key_prefix = "my-prefix"
## Optional. Key (object name) under which the ingestion offset is stored so
## that collection resumes where it left off.
offset_key = "offset_key"
## Optional. Maximum number of objects to process per collection interval (0 = no limit).
objects_per_iteration = 10
## Required. Data format to consume.
## Each data format has its own unique set of configuration options.
## Read more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
## Optional. Filepath for GCP credentials JSON file to authorize calls to
## Google Cloud Storage APIs. If not set explicitly, Telegraf will attempt to use
## Application Default Credentials, which is preferred.
# credentials_file = "path/to/my/creds.json"