From e5ee9e198bc267cf9d1b79b91e42695343dbcc0e Mon Sep 17 00:00:00 2001 From: gkatzioura Date: Mon, 19 Sep 2022 21:01:04 +0100 Subject: [PATCH] feat(inputs.gcs): Google Cloud Storage Input Plugin (#8413) --- docs/LICENSE_OF_DEPENDENCIES.md | 1 + go.mod | 3 + go.sum | 6 + plugins/inputs/all/google_cloud_storage.go | 5 + plugins/inputs/google_cloud_storage/README.md | 73 +++ .../google_cloud_storage.go | 284 ++++++++++++ .../google_cloud_storage_test.go | 415 ++++++++++++++++++ .../inputs/google_cloud_storage/sample.conf | 24 + 8 files changed, 811 insertions(+) create mode 100644 plugins/inputs/all/google_cloud_storage.go create mode 100644 plugins/inputs/google_cloud_storage/README.md create mode 100644 plugins/inputs/google_cloud_storage/google_cloud_storage.go create mode 100644 plugins/inputs/google_cloud_storage/google_cloud_storage_test.go create mode 100644 plugins/inputs/google_cloud_storage/sample.conf diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 54d8ee7a3..62caa9ae1 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -139,6 +139,7 @@ following works: - github.com/google/uuid [BSD 3-Clause "New" or "Revised" License](https://github.com/google/uuid/blob/master/LICENSE) - github.com/googleapis/enterprise-certificate-proxy [Apache License 2.0](https://github.com/googleapis/enterprise-certificate-proxy/blob/main/LICENSE) - github.com/googleapis/gax-go [BSD 3-Clause "New" or "Revised" License](https://github.com/googleapis/gax-go/blob/master/LICENSE) +- github.com/googleapis/go-type-adapters [Apache License 2.0](https://github.com/googleapis/go-type-adapters/blob/main/LICENSE) - github.com/gopcua/opcua [MIT License](https://github.com/gopcua/opcua/blob/master/LICENSE) - github.com/gophercloud/gophercloud [Apache License 2.0](https://github.com/gophercloud/gophercloud/blob/master/LICENSE) - github.com/gorilla/mux [BSD 3-Clause "New" or "Revised" License](https://github.com/gorilla/mux/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 07f01470d..605f8f0b4 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( cloud.google.com/go/bigquery v1.40.0 cloud.google.com/go/monitoring v1.5.0 cloud.google.com/go/pubsub v1.25.1 + cloud.google.com/go/storage v1.23.0 collectd.org v0.5.0 github.com/Azure/azure-event-hubs-go/v3 v3.3.18 github.com/Azure/azure-kusto-go v0.8.0 @@ -279,6 +280,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect github.com/googleapis/gax-go/v2 v2.5.1 // indirect + github.com/googleapis/go-type-adapters v1.0.0 // indirect github.com/grid-x/serial v0.0.0-20211107191517-583c7356b3aa // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect @@ -410,6 +412,7 @@ require ( gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gotest.tools v2.2.0+incompatible honnef.co/go/tools v0.2.2 // indirect k8s.io/klog/v2 v2.70.1 // indirect k8s.io/kube-openapi v0.0.0-20220803164354-a70c9af30aea // indirect diff --git a/go.sum b/go.sum index 582a57252..72353eb70 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,7 @@ cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Ud cloud.google.com/go v0.99.0/go.mod h1:w0Xx2nLzqWJPuozYQX+hFfCSI8WioryfRDzkoI/Y2ZA= cloud.google.com/go v0.100.2/go.mod h1:4Xra9TjzAeYHrl5+oeLlzbM2k3mjVhZh4UqTZ//w99A= 
cloud.google.com/go v0.102.0/go.mod h1:oWcCzKlqJ5zgHQt9YsaeTY9KzIvjyy0ArmiBUgpQ+nc= +cloud.google.com/go v0.102.1/go.mod h1:XZ77E9qnTEnrgEOvr4xzfdX5TRo7fB4T2F4O6+34hIU= cloud.google.com/go v0.104.0 h1:gSmWO7DY1vOm0MVU6DNXM11BWHHsTUmsC5cv1fuW5X8= cloud.google.com/go v0.104.0/go.mod h1:OO6xxXdJyvuJPcEPBLN9BJPD+jep5G1+2U5B5gkRYtA= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= @@ -72,6 +73,7 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= cloud.google.com/go/storage v1.23.0 h1:wWRIaDURQA8xxHguFCshYepGlrWIrbBnAmc7wfg07qY= +cloud.google.com/go/storage v1.23.0/go.mod h1:vOEEDNFnciUMhBeT6hsJIn3ieU5cFRmzeLgDvXzfIXc= code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= code.cloudfoundry.org/clock v1.0.0 h1:kFXWQM4bxYvdBw2X8BbBeXwQNgfoWv1vqAk2ZZyBN2o= code.cloudfoundry.org/clock v1.0.0/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= @@ -2676,6 +2678,7 @@ golang.org/x/net v0.0.0-20220325170049-de3da57026de/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220412020605-290c469a71a5/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220617184016-355a448f1bc9/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.0.0-20220728211354-c7608f3a8462/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= @@ -2882,6 +2885,7 @@ golang.org/x/sys v0.0.0-20220502124256-b6088ccd6cba/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -3110,6 +3114,7 @@ google.golang.org/api v0.75.0/go.mod h1:pU9QmyHLnzlpar1Mjt4IbapUCy8J+6HD6GeELN69 google.golang.org/api v0.78.0/go.mod h1:1Sg78yoMLOhlQTeF+ARBoytAcH1NNyyl390YMy6rKmw= google.golang.org/api v0.80.0/go.mod h1:xY3nI94gbvBrE0J6NHXhxOmW97HG7Khjkku6AFB3Hyg= google.golang.org/api v0.84.0/go.mod h1:NTsGnUFJMYROtiquksZHBWtHfeMC7iYthki7Eq3pa8o= +google.golang.org/api v0.85.0/go.mod h1:AqZf8Ep9uZ2pyTvgL+x0D3Zt0eoT9b5E8fmzfu6FO2g= google.golang.org/api v0.94.0 h1:KtKM9ru3nzQioV1HLlUf1cR7vMYJIpgls5VhAYQXIwA= google.golang.org/api v0.94.0/go.mod h1:eADj+UBuxkh5zlrSntJghuNeg8HwQ1w5lTKkuqaETEI= google.golang.org/appengine v1.1.0/go.mod 
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -3215,6 +3220,7 @@ google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335/go.mod h1:RAyBrSAP
 google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
 google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
 google.golang.org/genproto v0.0.0-20220616135557-88e70c0c3a90/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
+google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
 google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
 google.golang.org/genproto v0.0.0-20220902135211-223410557253 h1:vXJMM8Shg7TGaYxZsQ++A/FOSlbDmDtWhS/o+3w/hj4=
 google.golang.org/genproto v0.0.0-20220902135211-223410557253/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk=
diff --git a/plugins/inputs/all/google_cloud_storage.go b/plugins/inputs/all/google_cloud_storage.go
new file mode 100644
index 000000000..3456f28dc
--- /dev/null
+++ b/plugins/inputs/all/google_cloud_storage.go
@@ -0,0 +1,5 @@
+//go:build !custom || inputs || inputs.google_cloud_storage
+
+package all
+
+import _ "github.com/influxdata/telegraf/plugins/inputs/google_cloud_storage" // register plugin
diff --git a/plugins/inputs/google_cloud_storage/README.md b/plugins/inputs/google_cloud_storage/README.md
new file mode 100644
index 000000000..bbbd413af
--- /dev/null
+++ b/plugins/inputs/google_cloud_storage/README.md
@@ -0,0 +1,73 @@
+# Google Cloud Storage Input Plugin
+
+The Google Cloud Storage plugin collects metrics from objects stored in the
+configured Google Cloud Storage bucket.
+
+## Configuration
+
+```toml @sample.conf
+# Gather metrics by iterating the files located on a Cloud Storage Bucket.
+[[inputs.google_cloud_storage]]
+  ## Required. Name of Cloud Storage bucket to ingest metrics from.
+  bucket = "my-bucket"
+
+  ## Optional. Prefix of Cloud Storage bucket keys to list metrics from.
+  # key_prefix = "my-prefix/"
+
+  ## Optional. Key of the object that stores the offset so ingestion can resume where it left off.
+  offset_key = "offset_key"
+
+  ## Optional. Maximum number of objects to process per gather cycle.
+  objects_per_iteration = 10
+
+  ## Required. Data format to consume.
+  ## Each data format has its own unique set of configuration options.
+  ## Read more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+
+  ## Optional. Filepath for GCP credentials JSON file to authorize calls to
+  ## Google Cloud Storage APIs. If not set explicitly, Telegraf will attempt to use
+  ## Application Default Credentials, which is preferred.
+  # credentials_file = "path/to/my/creds.json"
+```
+
+## Metrics
+
+Measurements are read from the objects in the bucket and parsed with the
+configured `data_format`. For example, an object ingested with
+`data_format = "json"` may contain:
+
+```json
+{
+  "metrics": [
+    {
+      "fields": {
+        "cosine": 10,
+        "sine": -1.0975806427415925e-12
+      },
+      "name": "cpu",
+      "tags": {
+        "datacenter": "us-east-1",
+        "host": "localhost"
+      },
+      "timestamp": 1604148850990
+    }
+  ]
+}
+```
+
+## Example Output
+
+Telegraf log output when the plugin is loaded:
+
+```shell
+2022-09-17T17:52:19Z I! Starting Telegraf 1.25.0-a93ec9a0
+2022-09-17T17:52:19Z I! Available plugins: 209 inputs, 9 aggregators, 26 processors, 20 parsers, 57 outputs
+2022-09-17T17:52:19Z I! Loaded inputs: google_cloud_storage
+2022-09-17T17:52:19Z I! Loaded aggregators:
+2022-09-17T17:52:19Z I! Loaded processors:
+2022-09-17T17:52:19Z I! Loaded outputs: influxdb
+2022-09-17T17:52:19Z I! Tags enabled: host=user-N9RXNWKWY3
+2022-09-17T17:52:19Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"user-N9RXNWKWY3", Flush Interval:10s
+```
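To make the ingestion model above concrete, here is a minimal, hypothetical producer sketch (not part of this patch) that uploads a single object in the JSON shape shown in the README. The bucket name `my-bucket`, the `prefix/` key layout and the payload values are placeholder assumptions; on its next gather the plugin would list the new key, parse it with the configured `data_format`, and advance its offset past it.

```go
// Hypothetical producer: writes one metrics object that the plugin could ingest.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"cloud.google.com/go/storage"
)

func main() {
	ctx := context.Background()

	// Uses Application Default Credentials, mirroring the plugin's default client.
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("creating storage client: %v", err)
	}
	defer client.Close()

	// Payload in the same shape as the README's JSON example.
	payload := []byte(`{"metrics":[{"fields":{"cosine":10,"sine":-1.0975806427415925e-12},` +
		`"name":"cpu","tags":{"datacenter":"us-east-1","host":"localhost"},"timestamp":1604148850990}]}`)

	// Object key under the prefix the plugin lists; bucket and prefix are placeholders.
	key := fmt.Sprintf("prefix/%d", time.Now().UnixMilli())
	w := client.Bucket("my-bucket").Object(key).NewWriter(ctx)
	w.ContentType = "application/json"
	if _, err := w.Write(payload); err != nil {
		log.Fatalf("writing object: %v", err)
	}
	if err := w.Close(); err != nil {
		log.Fatalf("closing writer: %v", err)
	}
}
```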
diff --git a/plugins/inputs/google_cloud_storage/google_cloud_storage.go b/plugins/inputs/google_cloud_storage/google_cloud_storage.go
new file mode 100644
index 000000000..477da8dda
--- /dev/null
+++ b/plugins/inputs/google_cloud_storage/google_cloud_storage.go
@@ -0,0 +1,284 @@
+//go:generate ../../../tools/readme_config_includer/generator
+package gcs
+
+import (
+    "bytes"
+    "context"
+    _ "embed"
+    "encoding/json"
+    "fmt"
+    "io"
+    "os"
+
+    "cloud.google.com/go/storage"
+    "golang.org/x/oauth2/google"
+    "google.golang.org/api/iterator"
+    "google.golang.org/api/option"
+
+    "github.com/influxdata/telegraf"
+    "github.com/influxdata/telegraf/plugins/inputs"
+    "github.com/influxdata/telegraf/plugins/parsers"
+)
+
+const (
+    emulatorHostEnv  = "STORAGE_EMULATOR_HOST"
+    defaultOffSetKey = "offset-key.json"
+)
+
+//go:embed sample.conf
+var sampleConfig string
+
+type GCS struct {
+    CredentialsFile string `toml:"credentials_file"`
+    Bucket          string `toml:"bucket"`
+
+    Prefix              string `toml:"key_prefix"`
+    OffsetKey           string `toml:"offset_key"`
+    ObjectsPerIteration int    `toml:"objects_per_iteration"`
+
+    Log    telegraf.Logger
+    offSet OffSet
+
+    parser parsers.Parser
+    client *storage.Client
+
+    ctx context.Context
+}
+
+// OffSet is persisted as JSON in the bucket and records the last object key
+// that was ingested, so the next gather cycle can resume after it.
+type OffSet struct {
+    OffSet string `json:"offSet"`
+}
+
+func NewEmptyOffset() *OffSet {
+    return &OffSet{OffSet: ""}
+}
+
+func NewOffset(offset string) *OffSet {
+    return &OffSet{OffSet: offset}
+}
+
+func (offSet *OffSet) isPresent() bool {
+    return offSet.OffSet != ""
+}
+
+func (gcs *GCS) SampleConfig() string {
+    return sampleConfig
+}
+
+func (gcs *GCS) SetParser(parser parsers.Parser) {
+    gcs.parser = parser
+}
+
+// Gather lists the bucket starting at the stored offset, parses every listed
+// object with the configured parser and advances the offset afterwards.
+func (gcs *GCS) Gather(acc telegraf.Accumulator) error {
+    query := gcs.createQuery()
+
+    bucketName := gcs.Bucket
+    bucket := gcs.client.Bucket(bucketName)
+    it := bucket.Objects(gcs.ctx, &query)
+
+    processed := 0
+
+    var name string
+    for {
+        attrs, err := it.Next()
+
+        if err == iterator.Done {
+            gcs.Log.Infof("Iterated all the keys")
+            break
+        }
+
+        if err != nil {
+            gcs.Log.Errorf("Error during iteration of keys: %v", err)
+            return err
+        }
+
+        name = attrs.Name
+
+        if !gcs.shouldIgnore(name) {
+            if err := gcs.processMeasurementsInObject(name, bucket, acc); err != nil {
+                gcs.Log.Errorf("Could not process object %q in bucket %q: %v", name, bucketName, err)
+                acc.AddError(fmt.Errorf("could not process object %q in bucket %q: %w", name, bucketName, err))
+            }
+        }
+
+        processed++
+
+        if gcs.reachedThreshold(processed) {
+            return gcs.updateOffset(bucket, name)
+        }
+    }
+
+    return gcs.updateOffset(bucket, name)
+}
+
+func (gcs *GCS) createQuery() storage.Query {
+    if gcs.offSet.isPresent() {
+        return storage.Query{Prefix: gcs.Prefix, StartOffset: gcs.offSet.OffSet}
+    }
+
+    return storage.Query{Prefix: gcs.Prefix}
+}
+
+// shouldIgnore skips the offset object itself and the key the offset points at.
+func (gcs *GCS) shouldIgnore(name string) bool {
+    return gcs.offSet.OffSet == name || gcs.OffsetKey == name
+}
+
+func (gcs *GCS) processMeasurementsInObject(name string, bucket *storage.BucketHandle, acc telegraf.Accumulator) error {
+    gcs.Log.Debugf("Fetching key: %s", name)
+    r, err := bucket.Object(name).NewReader(gcs.ctx)
+    if err != nil {
+        return err
+    }
+    defer gcs.closeReader(r)
+
+    metrics, err := gcs.fetchedMetrics(r)
+
+    if err != nil {
+        return err
+    }
+
+    for _, metric := range metrics {
+        acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+    }
+
+    return nil
+}
+
+func (gcs *GCS) fetchedMetrics(r *storage.Reader) ([]telegraf.Metric, error) {
+    buf := new(bytes.Buffer)
+    if _, err := buf.ReadFrom(r); err != nil {
+        return nil, err
+    }
+
+    return gcs.parser.Parse(buf.Bytes())
+}
+
+func (gcs *GCS) reachedThreshold(processed int) bool {
+    return gcs.ObjectsPerIteration != 0 && processed >= gcs.ObjectsPerIteration
+}
+
+// updateOffset persists the given object key to the offset object in the
+// bucket and caches it in memory for the next gather cycle.
+func (gcs *GCS) updateOffset(bucket *storage.BucketHandle, name string) error {
+    if gcs.shouldIgnore(name) {
+        return nil
+    }
+
+    offsetModel := NewOffset(name)
+    marshalled, err := json.Marshal(offsetModel)
+
+    if err != nil {
+        return err
+    }
+
+    offsetKey := bucket.Object(gcs.OffsetKey)
+    writer := offsetKey.NewWriter(gcs.ctx)
+    writer.ContentType = "application/json"
+    defer writer.Close()
+
+    if _, err := writer.Write(marshalled); err != nil {
+        return err
+    }
+
+    gcs.offSet = *offsetModel
+
+    return nil
+}
+
+func (gcs *GCS) Init() error {
+    gcs.ctx = context.Background()
+    err := gcs.setUpClient()
+    if err != nil {
+        gcs.Log.Errorf("Could not create client: %v", err)
+        return err
+    }
+
+    return gcs.setOffset()
+}
+
+func (gcs *GCS) setUpClient() error {
+    if endpoint, present := os.LookupEnv(emulatorHostEnv); present {
+        return gcs.setUpLocalClient(endpoint)
+    }
+
+    return gcs.setUpDefaultClient()
+}
+
+func (gcs *GCS) setUpLocalClient(endpoint string) error {
+    noAuth := option.WithoutAuthentication()
+    endpoints := option.WithEndpoint("http://" + endpoint)
+    client, err := storage.NewClient(gcs.ctx, noAuth, endpoints)
+
+    if err != nil {
+        return err
+    }
+
+    gcs.client = client
+    return nil
+}
+
+func (gcs *GCS) setUpDefaultClient() error {
+    var credentialsOption option.ClientOption
+
+    if gcs.CredentialsFile != "" {
+        credentialsOption = option.WithCredentialsFile(gcs.CredentialsFile)
+    } else {
+        creds, err := google.FindDefaultCredentials(gcs.ctx, storage.ScopeReadOnly)
+        if err != nil {
+            return fmt.Errorf(
+                "unable to find GCP Application Default Credentials: %v. "+
+                    "Either set ADC or provide the credentials_file option", err)
+        }
+        credentialsOption = option.WithCredentials(creds)
+    }
+
+    client, err := storage.NewClient(gcs.ctx, credentialsOption)
+    gcs.client = client
+    return err
+}
+
+// setOffset resolves the full offset key (prefix + key) and loads a previously
+// stored offset from the bucket, if one exists.
+func (gcs *GCS) setOffset() error {
+    if gcs.client == nil {
+        return fmt.Errorf("cannot set offset if client is not set")
+    }
+
+    if gcs.OffsetKey != "" {
+        gcs.OffsetKey = gcs.Prefix + gcs.OffsetKey
+    } else {
+        gcs.OffsetKey = gcs.Prefix + defaultOffSetKey
+    }
+
+    btk := gcs.client.Bucket(gcs.Bucket)
+    obj := btk.Object(gcs.OffsetKey)
+
+    var offSet OffSet
+
+    if r, err := obj.NewReader(gcs.ctx); err == nil {
+        defer gcs.closeReader(r)
+        buf := new(bytes.Buffer)
+
+        if _, err := io.Copy(buf, r); err == nil {
+            if marshalError := json.Unmarshal(buf.Bytes(), &offSet); marshalError != nil {
+                return marshalError
+            }
+        }
+    } else {
+        offSet = *NewEmptyOffset()
+    }
+
+    gcs.offSet = offSet
+
+    return nil
+}
+
+func init() {
+    inputs.Add("google_cloud_storage", func() telegraf.Input {
+        gcs := &GCS{}
+        return gcs
+    })
+}
+
+func (gcs *GCS) closeReader(r *storage.Reader) {
+    if err := r.Close(); err != nil {
+        gcs.Log.Errorf("Could not close reader: %v", err)
+    }
+}
diff --git a/plugins/inputs/google_cloud_storage/google_cloud_storage_test.go b/plugins/inputs/google_cloud_storage/google_cloud_storage_test.go
new file mode 100644
index 000000000..75536fc2a
--- /dev/null
+++ 
b/plugins/inputs/google_cloud_storage/google_cloud_storage_test.go @@ -0,0 +1,415 @@ +package gcs + +import ( + "encoding/json" + "fmt" + "io" + "mime" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + + "github.com/influxdata/telegraf/plugins/parsers" + _ "github.com/influxdata/telegraf/plugins/parsers/all" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" + "gotest.tools/assert" +) + +const ( + singleObjectNotFound = "{\"error\":{\"code\":404,\"message\":\"No such object: test-iteration-bucket/prefix/offset-key.json\",\"errors\":[{\"message\":\"No such object: test-iteration-bucket/prefix/offset-key.json\",\"domain\":\"global\",\"reason\":\"notFound\"}]}}" + singleFileList = "{\"kind\":\"storage#objects\",\"items\":[{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/1604148850990/1604148851295698\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/1604148850990/o/1604148850990\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850990?generation=1604148851295698&alt=media\",\"name\":\"1604148850990\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851295698\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CNKLy5Pw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.295Z\",\"updated\":\"2020-10-31T12:54:11.295Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.295Z\"}]}" + firstFile = "{\"metrics\":[{\"fields\":{\"cosine\":10,\"sine\":-1.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850991}]}" + secondFile = "{\"metrics\":[{\"fields\":{\"cosine\":11,\"sine\":-2.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850992}]}" + thirdFile = "{\"metrics\":[{\"fields\":{\"cosine\":12,\"sine\":-3.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850993}]}" + fourthFile = "{\"metrics\":[{\"fields\":{\"cosine\":13,\"sine\":-4.0975806427415925e-12},\"name\":\"cpu\",\"tags\":{\"datacenter\":\"us-east-1\",\"host\":\"localhost\"},\"timestamp\":1604148850994}]}" + firstFileListing = "{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850991/1604148851353983\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850991\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850991?generation=1604148851353983&alt=media\",\"name\":\"prefix/1604148850991\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851353983\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CP/SzpPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.353Z\",\"updated\":\"2020-10-31T12:54:11.353Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.353Z\"}" + secondFileListing = 
"{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850992/1604148851414237\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850992\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850992?generation=1604148851414237&alt=media\",\"name\":\"prefix/1604148850992\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851414237\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CN2p0pPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.414Z\",\"updated\":\"2020-10-31T12:54:11.414Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.414Z\"}" + thirdFileListing = "{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850993/1604148851467554\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850993\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850993?generation=1604148851467554&alt=media\",\"name\":\"prefix/1604148850993\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851467554\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CKLK1ZPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.467Z\",\"updated\":\"2020-10-31T12:54:11.467Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.467Z\"}" + fourthFileListing = "{\"kind\":\"storage#object\",\"id\":\"test-iteration-bucket/prefix/1604148850994/1604148851467554\",\"selfLink\":\"https://www.googleapis.com/storage/v1/b/test-iteration-bucket/o/1604148850994\",\"mediaLink\":\"https://content-storage.googleapis.com/download/storage/v1/b/test-iteration-bucket/o/1604148850994?generation=1604148851467554&alt=media\",\"name\":\"prefix/1604148850994\",\"bucket\":\"test-iteration-bucket\",\"generation\":\"1604148851467554\",\"metageneration\":\"1\",\"contentType\":\"text/plain; charset=utf-8\",\"storageClass\":\"STANDARD\",\"size\":\"161\",\"md5Hash\":\"y59iuRCTpkm7wpvU5YHUYw==\",\"crc32c\":\"y57reA==\",\"etag\":\"CKLK1ZPw3uwCEAE=\",\"timeCreated\":\"2020-10-31T12:54:11.467Z\",\"updated\":\"2020-10-31T12:54:11.467Z\",\"timeStorageClassUpdated\":\"2020-10-31T12:54:11.467Z\"}" + fileListing = "{\"kind\":\"storage#objects\"}" + offSetTemplate = "{\"offSet\":\"%s\"}" +) + +var objListing = parseJSONFromText(fileListing) +var firstElement = parseJSONFromText(firstFileListing) +var secondElement = parseJSONFromText(secondFileListing) +var thirdElement = parseJSONFromText(thirdFileListing) +var fourthElement = parseJSONFromText(fourthFileListing) + +func TestRunSetUpClient(t *testing.T) { + gcs := &GCS{ + Bucket: "test-bucket", + Prefix: "prefix", + OffsetKey: "1230405", + Log: testutil.Logger{}, + } + + if err := gcs.setUpClient(); err != nil { + t.Log(err) + } +} + +func TestRunInit(t *testing.T) { + srv := startGCSServer(t) + defer srv.Close() + + emulatorSetEnv(t, srv) + + gcs := &GCS{ + Bucket: "test-bucket", + Prefix: "prefix/", + OffsetKey: "offset.json", + Log: testutil.Logger{}, + } + + require.NoError(t, gcs.Init()) + + assert.Equal(t, "offsetfile", gcs.offSet.OffSet) +} + +func TestRunInitNoOffsetKey(t *testing.T) { + srv := startGCSServer(t) + defer srv.Close() + + emulatorSetEnv(t, srv) + + gcs := &GCS{ + 
Bucket: "test-bucket", + Prefix: "prefix/", + Log: testutil.Logger{}, + } + + require.NoError(t, gcs.Init()) + + assert.Equal(t, "offsetfile", gcs.offSet.OffSet) + assert.Equal(t, "prefix/offset-key.json", gcs.OffsetKey) +} + +func TestRunGatherOneItem(t *testing.T) { + srv := startOneItemGCSServer(t) + defer srv.Close() + + emulatorSetEnv(t, srv) + + acc := &testutil.Accumulator{} + + gcs := &GCS{ + Bucket: "test-iteration-bucket", + Prefix: "prefix/", + Log: testutil.Logger{}, + parser: createParser(), + } + + require.NoError(t, gcs.Init()) + + require.NoError(t, gcs.Gather(acc)) + + metric := acc.Metrics[0] + assert.Equal(t, "cpu", metric.Measurement) + assert.Equal(t, "us-east-1", metric.Tags["tags_datacenter"]) + assert.Equal(t, "localhost", metric.Tags["tags_host"]) + assert.Equal(t, 10.0, metric.Fields["fields_cosine"]) + assert.Equal(t, -1.0975806427415925e-12, metric.Fields["fields_sine"]) +} + +func TestRunGatherOneIteration(t *testing.T) { + srv := startMultipleItemGCSServer(t) + defer srv.Close() + + emulatorSetEnv(t, srv) + + gcs := &GCS{ + Bucket: "test-iteration-bucket", + Prefix: "prefix/", + OffsetKey: "custom-offset-key.json", + Log: testutil.Logger{}, + parser: createParser(), + } + + acc := &testutil.Accumulator{} + + require.NoError(t, gcs.Init()) + + require.NoError(t, gcs.Gather(acc)) + + assert.Equal(t, 3, len(acc.Metrics)) +} + +func TestRunGatherIteratiosnWithLimit(t *testing.T) { + srv := startMultipleItemGCSServer(t) + defer srv.Close() + + emulatorSetEnv(t, srv) + + gcs := &GCS{ + Bucket: "test-iteration-bucket", + Prefix: "prefix/", + ObjectsPerIteration: 1, + OffsetKey: "custom-offset-key.json", + Log: testutil.Logger{}, + parser: createParser(), + } + + acc := &testutil.Accumulator{} + + require.NoError(t, gcs.Init()) + + require.NoError(t, gcs.Gather(acc)) + + assert.Equal(t, 1, len(acc.Metrics)) + require.NoError(t, gcs.Gather(acc)) + + assert.Equal(t, 2, len(acc.Metrics)) + require.NoError(t, gcs.Gather(acc)) + + assert.Equal(t, 3, len(acc.Metrics)) +} + +func TestRunGatherIterationWithPages(t *testing.T) { + srv := stateFulGCSServer(t) + defer srv.Close() + + emulatorSetEnv(t, srv) + + gcs := &GCS{ + Bucket: "test-iteration-bucket", + Prefix: "prefix/", + OffsetKey: "custom-offset-key.json", + Log: testutil.Logger{}, + parser: createParser(), + } + + acc := &testutil.Accumulator{} + + require.NoError(t, gcs.Init()) + + require.NoError(t, gcs.Gather(acc)) + + assert.Equal(t, 4, len(acc.Metrics)) + assert.Equal(t, true, gcs.offSet.isPresent()) + assert.Equal(t, "prefix/1604148850994", gcs.offSet.OffSet) + + emptyAcc := &testutil.Accumulator{} + require.NoError(t, gcs.Gather(emptyAcc)) + + assert.Equal(t, 0, len(emptyAcc.Metrics)) +} + +func createParser() parsers.Parser { + testParser, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "cpu", + JSONQuery: "metrics", + TagKeys: []string{"tags_datacenter", "tags_host"}, + JSONTimeKey: "timestamp", + JSONTimeFormat: "unix_ms", + }) + + return testParser +} + +func startGCSServer(t *testing.T) *httptest.Server { + srv := httptest.NewServer(http.NotFoundHandler()) + + currentOffSetKey := fmt.Sprintf(offSetTemplate, "offsetfile") + + srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/test-bucket/prefix/offset.json": + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(currentOffSetKey)) + require.NoError(t, err) + case "/test-bucket/prefix/offset-key.json": + w.WriteHeader(http.StatusOK) + _, err := 
w.Write([]byte("{\"offSet\":\"offsetfile\"}")) + require.NoError(t, err) + default: + failPath(r.URL.Path, t, w) + } + }) + + return srv +} + +func startOneItemGCSServer(t *testing.T) *httptest.Server { + srv := httptest.NewServer(http.NotFoundHandler()) + + srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/b/test-iteration-bucket/o": + serveJSONText(w, singleFileList) + default: + serveBlobs(r.URL.Path, "", t, w) + } + }) + + return srv +} + +func startMultipleItemGCSServer(t *testing.T) *httptest.Server { + srv := httptest.NewServer(http.NotFoundHandler()) + + currentOffSetKey := fmt.Sprintf(offSetTemplate, "prefix/1604148850991") + + srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/b/test-iteration-bucket/o": + + offset := r.URL.Query().Get("startOffset") + + if offset == "prefix/1604148850990" { + objListing["items"] = []interface{}{firstElement, secondElement, thirdElement, fourthElement} + } else if offset == "prefix/1604148850991" { + objListing["items"] = []interface{}{secondElement, thirdElement, fourthElement} + } else if offset == "prefix/16041488509912" { + objListing["items"] = []interface{}{thirdElement, fourthElement} + } else if offset == "prefix/16041488509913" { + objListing["items"] = []interface{}{thirdElement, fourthElement} + } else { + objListing["items"] = []interface{}{firstElement, secondElement, thirdElement, fourthElement} + } + + if data, err := json.Marshal(objListing); err == nil { + w.WriteHeader(http.StatusOK) + _, err := w.Write(data) + require.NoError(t, err) + } else { + w.WriteHeader(http.StatusNotFound) + t.Fatalf("unexpected path: " + r.URL.Path) + } + + default: + serveBlobs(r.URL.Path, currentOffSetKey, t, w) + } + }) + + return srv +} + +func stateFulGCSServer(t *testing.T) *httptest.Server { + srv := httptest.NewServer(http.NotFoundHandler()) + + currentOffSetKey := fmt.Sprintf(offSetTemplate, "prefix/1604148850990") + + srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/b/test-iteration-bucket/o": + offset := r.URL.Query().Get("startOffset") + objListing := parseJSONFromText(fileListing) + + pageToken := r.URL.Query().Get("pageToken") + + if pageToken == "page2" { + objListing["items"] = []interface{}{secondElement} + objListing["nextPageToken"] = "page3" + } else if pageToken == "page3" { + objListing["items"] = []interface{}{thirdElement} + objListing["nextPageToken"] = "page4" + } else if pageToken == "page4" { + objListing["items"] = []interface{}{fourthElement} + } else if offset == "prefix/1604148850994" { + objListing["items"] = []interface{}{} + } else { + objListing["items"] = []interface{}{firstElement} + objListing["nextPageToken"] = "page2" + } + + if data, err := json.Marshal(objListing); err == nil { + w.WriteHeader(http.StatusOK) + _, err := w.Write(data) + require.NoError(t, err) + } else { + failPath(r.URL.Path, t, w) + } + case "/upload/storage/v1/b/test-iteration-bucket/o": + _, params, _ := mime.ParseMediaType(r.Header["Content-Type"][0]) + boundary := params["boundary"] + currentOffSetKey, _ = fetchJSON(t, boundary, r.Body) + default: + serveBlobs(r.URL.Path, currentOffSetKey, t, w) + } + }) + + return srv +} + +func serveBlobs(urlPath string, offsetKey string, t *testing.T, w http.ResponseWriter) { + switch urlPath { + case "/test-iteration-bucket/prefix/offset-key.json": + w.WriteHeader(http.StatusNotFound) + _, err := 
w.Write([]byte(singleObjectNotFound))
+        require.NoError(t, err)
+    case "/test-bucket/prefix/offset.json":
+        w.WriteHeader(http.StatusOK)
+        _, err := w.Write([]byte(offsetKey))
+        require.NoError(t, err)
+    case "/test-bucket/prefix/offset-key.json":
+        w.WriteHeader(http.StatusOK)
+        _, err := w.Write([]byte("{\"offSet\":\"offsetfile\"}"))
+        require.NoError(t, err)
+    case "/test-iteration-bucket/prefix/custom-offset-key.json":
+        w.WriteHeader(http.StatusOK)
+        _, err := w.Write([]byte(offsetKey))
+        require.NoError(t, err)
+    case "/test-iteration-bucket/1604148850990":
+        serveJSONText(w, firstFile)
+    case "/test-iteration-bucket/prefix/1604148850991":
+        serveJSONText(w, firstFile)
+    case "/test-iteration-bucket/prefix/1604148850992":
+        serveJSONText(w, secondFile)
+    case "/test-iteration-bucket/prefix/1604148850993":
+        serveJSONText(w, thirdFile)
+    case "/test-iteration-bucket/prefix/1604148850994":
+        serveJSONText(w, fourthFile)
+    case "/upload/storage/v1/b/test-iteration-bucket/o":
+        w.WriteHeader(http.StatusOK)
+    default:
+        failPath(urlPath, t, w)
+    }
+}
+
+// fetchJSON extracts the offset JSON document from the multipart upload body.
+func fetchJSON(t *testing.T, boundary string, rc io.ReadCloser) (string, error) {
+    defer rc.Close()
+    bodyBytes, err := io.ReadAll(rc)
+
+    if err != nil {
+        t.Fatalf("Could not read bytes from offset action")
+        return "", err
+    }
+
+    splits := strings.Split(string(bodyBytes), boundary)
+    offsetPart := splits[2]
+    offsets := strings.Split(offsetPart, "\n")
+    t.Logf("%s", offsets[3])
+    return offsets[3], nil
+}
+
+func serveJSONText(w http.ResponseWriter, jsonText string) {
+    w.WriteHeader(http.StatusOK)
+    if _, err := w.Write([]byte(jsonText)); err != nil {
+        fmt.Println(err)
+    }
+}
+
+func failPath(path string, t *testing.T, w http.ResponseWriter) {
+    w.WriteHeader(http.StatusNotFound)
+    t.Fatalf("unexpected path: %s", path)
+}
+
+func parseJSONFromText(jsonText string) map[string]interface{} {
+    var element map[string]interface{}
+    if err := json.Unmarshal([]byte(jsonText), &element); err != nil {
+        fmt.Println(err)
+    }
+
+    return element
+}
+
+func emulatorSetEnv(t *testing.T, srv *httptest.Server) {
+    if err := os.Setenv("STORAGE_EMULATOR_HOST", strings.ReplaceAll(srv.URL, "http://", "")); err != nil {
+        t.Error(err)
+    }
+}
diff --git a/plugins/inputs/google_cloud_storage/sample.conf b/plugins/inputs/google_cloud_storage/sample.conf
new file mode 100644
index 000000000..8e6f7c705
--- /dev/null
+++ b/plugins/inputs/google_cloud_storage/sample.conf
@@ -0,0 +1,24 @@
+# Gather metrics by iterating the files located on a Cloud Storage Bucket.
+[[inputs.google_cloud_storage]]
+  ## Required. Name of Cloud Storage bucket to ingest metrics from.
+  bucket = "my-bucket"
+
+  ## Optional. Prefix of Cloud Storage bucket keys to list metrics from.
+  # key_prefix = "my-prefix/"
+
+  ## Optional. Key of the object that stores the offset so ingestion can resume where it left off.
+  offset_key = "offset_key"
+
+  ## Optional. Maximum number of objects to process per gather cycle.
+  objects_per_iteration = 10
+
+  ## Required. Data format to consume.
+  ## Each data format has its own unique set of configuration options.
+  ## Read more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+
+  ## Optional. Filepath for GCP credentials JSON file to authorize calls to
+  ## Google Cloud Storage APIs. If not set explicitly, Telegraf will attempt to use
+  ## Application Default Credentials, which is preferred.
+  # credentials_file = "path/to/my/creds.json"
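The offset handling in `setOffset`/`updateOffset` stores a small JSON document of the form `{"offSet":"<last key>"}` under `key_prefix + offset_key` (or `key_prefix + "offset-key.json"` when `offset_key` is unset). As an operational aside, here is a hedged sketch of a standalone helper for inspecting or resetting that offset object; the bucket name and key are placeholder assumptions and the tool itself is not part of this patch.

```go
// Hypothetical operator helper: inspect and optionally reset the plugin's offset object.
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"

	"cloud.google.com/go/storage"
)

func main() {
	reset := flag.Bool("reset", false, "delete the offset object so ingestion restarts at the beginning of the prefix")
	flag.Parse()

	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatalf("creating storage client: %v", err)
	}
	defer client.Close()

	// "my-bucket" and "prefix/offset-key.json" are placeholders; the real key is
	// key_prefix + offset_key (or key_prefix + "offset-key.json" when offset_key is unset).
	obj := client.Bucket("my-bucket").Object("prefix/offset-key.json")

	r, err := obj.NewReader(ctx)
	if err != nil {
		log.Fatalf("no offset object found: %v", err)
	}
	body, err := io.ReadAll(r)
	_ = r.Close()
	if err != nil {
		log.Fatalf("reading offset object: %v", err)
	}
	fmt.Printf("current offset document: %s\n", body) // e.g. {"offSet":"prefix/1604148850994"}

	if *reset {
		if err := obj.Delete(ctx); err != nil {
			log.Fatalf("deleting offset object: %v", err)
		}
		fmt.Println("offset object deleted; the plugin will re-ingest from the start of the prefix")
	}
}
```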