This commit is contained in:
i-prudnikov 2021-04-29 19:06:36 +03:00 committed by GitHub
parent c0d5af1602
commit e3ae7caaf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 310 additions and 262 deletions

View File

@ -37,6 +37,25 @@ In the following order the plugin will attempt to authenticate.
# public_key_id = "" # public_key_id = ""
# role_name = "" # role_name = ""
## Specify the ali cloud region list to be queried for metrics and objects discovery
## If not set, all supported regions (see below) would be covered, it can provide a significant load on API, so the recommendation here
## is to limit the list as much as possible. Allowed values: https://www.alibabacloud.com/help/zh/doc-detail/40654.htm
## Default supported regions are:
## 21 items: cn-qingdao,cn-beijing,cn-zhangjiakou,cn-huhehaote,cn-hangzhou,cn-shanghai,cn-shenzhen,
## cn-heyuan,cn-chengdu,cn-hongkong,ap-southeast-1,ap-southeast-2,ap-southeast-3,ap-southeast-5,
## ap-south-1,ap-northeast-1,us-west-1,us-east-1,eu-central-1,eu-west-1,me-east-1
##
## From discovery perspective it set the scope for object discovery, the discovered info can be used to enrich
## the metrics with objects attributes/tags. Discovery is supported not for all projects (if not supported, then
## it will be reported on the start - for example for 'acs_cdn' project:
## 'E! [inputs.aliyuncms] Discovery tool is not activated: no discovery support for project "acs_cdn"' )
## Currently, discovery supported for the following projects:
## - acs_ecs_dashboard
## - acs_rds_dashboard
## - acs_slb_dashboard
## - acs_vpc_eip
regions = ["cn-hongkong"]
# The minimum period for AliyunCMS metrics is 1 minute (60s). However not all # The minimum period for AliyunCMS metrics is 1 minute (60s). However not all
# metrics are made available to the 1 minute period. Some are collected at # metrics are made available to the 1 minute period. Some are collected at
# 3 minute, 5 minute, or larger intervals. # 3 minute, 5 minute, or larger intervals.
@ -61,21 +80,7 @@ In the following order the plugin will attempt to authenticate.
## Maximum requests per second, default value is 200 ## Maximum requests per second, default value is 200
ratelimit = 200 ratelimit = 200
## Discovery regions set the scope for object discovery, the discovered info can be used to enrich ## How often the discovery API call executed (default 1m)
## the metrics with objects attributes/tags. Discovery is supported not for all projects (if not supported, then
## it will be reported on the start - foo example for 'acs_cdn' project:
## 'E! [inputs.aliyuncms] Discovery tool is not activated: no discovery support for project "acs_cdn"' )
## Currently, discovery supported for the following projects:
## - acs_ecs_dashboard
## - acs_rds_dashboard
## - acs_slb_dashboard
## - acs_vpc_eip
##
## If not set, all regions would be covered, it can provide a significant load on API, so the recommendation here
## is to limit the list as much as possible. Allowed values: https://www.alibabacloud.com/help/zh/doc-detail/40654.htm
discovery_regions = ["cn-hongkong"]
## how often the discovery API call executed (default 1m)
#discovery_interval = "1m" #discovery_interval = "1m"
## Metrics to Pull (Required) ## Metrics to Pull (Required)

View File

@ -11,112 +11,116 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/sdk" "github.com/aliyun/alibaba-cloud-sdk-go/sdk"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers" "github.com/aliyun/alibaba-cloud-sdk-go/sdk/auth/credentials/providers"
"github.com/aliyun/alibaba-cloud-sdk-go/services/cms" "github.com/aliyun/alibaba-cloud-sdk-go/services/cms"
"github.com/jmespath/go-jmespath"
"github.com/pkg/errors"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
"github.com/jmespath/go-jmespath"
"github.com/pkg/errors"
) )
const ( const (
description = "Pull Metric Statistics from Aliyun CMS" description = "Pull Metric Statistics from Aliyun CMS"
sampleConfig = ` sampleConfig = `
## Aliyun Credentials ## Aliyun Credentials
## Credentials are loaded in the following order ## Credentials are loaded in the following order
## 1) Ram RoleArn credential ## 1) Ram RoleArn credential
## 2) AccessKey STS token credential ## 2) AccessKey STS token credential
## 3) AccessKey credential ## 3) AccessKey credential
## 4) Ecs Ram Role credential ## 4) Ecs Ram Role credential
## 5) RSA keypair credential ## 5) RSA keypair credential
## 6) Environment variables credential ## 6) Environment variables credential
## 7) Instance metadata credential ## 7) Instance metadata credential
# access_key_id = "" # access_key_id = ""
# access_key_secret = "" # access_key_secret = ""
# access_key_sts_token = "" # access_key_sts_token = ""
# role_arn = "" # role_arn = ""
# role_session_name = "" # role_session_name = ""
# private_key = "" # private_key = ""
# public_key_id = "" # public_key_id = ""
# role_name = "" # role_name = ""
# The minimum period for AliyunCMS metrics is 1 minute (60s). However not all ## Specify the ali cloud region list to be queried for metrics and objects discovery
# metrics are made available to the 1 minute period. Some are collected at ## If not set, all supported regions (see below) would be covered, it can provide a significant load on API, so the recommendation here
# 3 minute, 5 minute, or larger intervals. ## is to limit the list as much as possible. Allowed values: https://www.alibabacloud.com/help/zh/doc-detail/40654.htm
# See: https://help.aliyun.com/document_detail/51936.html?spm=a2c4g.11186623.2.18.2bc1750eeOw1Pv ## Default supported regions are:
# Note that if a period is configured that is smaller than the minimum for a ## 21 items: cn-qingdao,cn-beijing,cn-zhangjiakou,cn-huhehaote,cn-hangzhou,cn-shanghai,cn-shenzhen,
# particular metric, that metric will not be returned by the Aliyun OpenAPI ## cn-heyuan,cn-chengdu,cn-hongkong,ap-southeast-1,ap-southeast-2,ap-southeast-3,ap-southeast-5,
# and will not be collected by Telegraf. ## ap-south-1,ap-northeast-1,us-west-1,us-east-1,eu-central-1,eu-west-1,me-east-1
# ##
## Requested AliyunCMS aggregation Period (required - must be a multiple of 60s) ## From discovery perspective it set the scope for object discovery, the discovered info can be used to enrich
period = "5m" ## the metrics with objects attributes/tags. Discovery is supported not for all projects (if not supported, then
## it will be reported on the start - for example for 'acs_cdn' project:
## 'E! [inputs.aliyuncms] Discovery tool is not activated: no discovery support for project "acs_cdn"' )
## Currently, discovery supported for the following projects:
## - acs_ecs_dashboard
## - acs_rds_dashboard
## - acs_slb_dashboard
## - acs_vpc_eip
regions = ["cn-hongkong"]
## Collection Delay (required - must account for metrics availability via AliyunCMS API) # The minimum period for AliyunCMS metrics is 1 minute (60s). However not all
delay = "1m" # metrics are made available to the 1 minute period. Some are collected at
# 3 minute, 5 minute, or larger intervals.
# See: https://help.aliyun.com/document_detail/51936.html?spm=a2c4g.11186623.2.18.2bc1750eeOw1Pv
# Note that if a period is configured that is smaller than the minimum for a
# particular metric, that metric will not be returned by the Aliyun OpenAPI
# and will not be collected by Telegraf.
#
## Requested AliyunCMS aggregation Period (required - must be a multiple of 60s)
period = "5m"
## Recommended: use metric 'interval' that is a multiple of 'period' to avoid ## Collection Delay (required - must account for metrics availability via AliyunCMS API)
## gaps or overlap in pulled data delay = "1m"
interval = "5m"
## Metric Statistic Project (required) ## Recommended: use metric 'interval' that is a multiple of 'period' to avoid
project = "acs_slb_dashboard" ## gaps or overlap in pulled data
interval = "5m"
## Maximum requests per second, default value is 200 ## Metric Statistic Project (required)
ratelimit = 200 project = "acs_slb_dashboard"
## Discovery regions set the scope for object discovery, the discovered info can be used to enrich ## Maximum requests per second, default value is 200
## the metrics with objects attributes/tags. Discovery is supported not for all projects (if not supported, then ratelimit = 200
## it will be reported on the start - foo example for 'acs_cdn' project:
## 'E! [inputs.aliyuncms] Discovery tool is not activated: no discovery support for project "acs_cdn"' )
## Currently, discovery supported for the following projects:
## - acs_ecs_dashboard
## - acs_rds_dashboard
## - acs_slb_dashboard
## - acs_vpc_eip
##
## If not set, all regions would be covered, it can provide a significant load on API, so the recommendation here
## is to limit the list as much as possible. Allowed values: https://www.alibabacloud.com/help/zh/doc-detail/40654.htm
discovery_regions = ["cn-hongkong"]
## how often the discovery API call executed (default 1m) ## How often the discovery API call executed (default 1m)
#discovery_interval = "1m" #discovery_interval = "1m"
## Metrics to Pull (Required) ## Metrics to Pull (Required)
[[inputs.aliyuncms.metrics]] [[inputs.aliyuncms.metrics]]
## Metrics names to be requested, ## Metrics names to be requested,
## described here (per project): https://help.aliyun.com/document_detail/28619.html?spm=a2c4g.11186623.6.690.1938ad41wg8QSq ## described here (per project): https://help.aliyun.com/document_detail/28619.html?spm=a2c4g.11186623.6.690.1938ad41wg8QSq
names = ["InstanceActiveConnection", "InstanceNewConnection"] names = ["InstanceActiveConnection", "InstanceNewConnection"]
## Dimension filters for Metric (these are optional). ## Dimension filters for Metric (these are optional).
## This allows to get additional metric dimension. If dimension is not specified it can be returned or ## This allows to get additional metric dimension. If dimension is not specified it can be returned or
## the data can be aggregated - it depends on particular metric, you can find details here: https://help.aliyun.com/document_detail/28619.html?spm=a2c4g.11186623.6.690.1938ad41wg8QSq ## the data can be aggregated - it depends on particular metric, you can find details here: https://help.aliyun.com/document_detail/28619.html?spm=a2c4g.11186623.6.690.1938ad41wg8QSq
## ##
## Note, that by default dimension filter includes the list of discovered objects in scope (if discovery is enabled) ## Note, that by default dimension filter includes the list of discovered objects in scope (if discovery is enabled)
## Values specified here would be added into the list of discovered objects. ## Values specified here would be added into the list of discovered objects.
## You can specify either single dimension: ## You can specify either single dimension:
#dimensions = '{"instanceId": "p-example"}' #dimensions = '{"instanceId": "p-example"}'
## Or you can specify several dimensions at once: ## Or you can specify several dimensions at once:
#dimensions = '[{"instanceId": "p-example"},{"instanceId": "q-example"}]' #dimensions = '[{"instanceId": "p-example"},{"instanceId": "q-example"}]'
## Enrichment tags, can be added from discovery (if supported) ## Enrichment tags, can be added from discovery (if supported)
## Notation is <measurement_tag_name>:<JMES query path (https://jmespath.org/tutorial.html)> ## Notation is <measurement_tag_name>:<JMES query path (https://jmespath.org/tutorial.html)>
## To figure out which fields are available, consult the Describe<ObjectType> API per project. ## To figure out which fields are available, consult the Describe<ObjectType> API per project.
## For example, for SLB: https://api.aliyun.com/#/?product=Slb&version=2014-05-15&api=DescribeLoadBalancers&params={}&tab=MOCK&lang=GO ## For example, for SLB: https://api.aliyun.com/#/?product=Slb&version=2014-05-15&api=DescribeLoadBalancers&params={}&tab=MOCK&lang=GO
#tag_query_path = [ #tag_query_path = [
# "address:Address", # "address:Address",
# "name:LoadBalancerName", # "name:LoadBalancerName",
# "cluster_owner:Tags.Tag[?TagKey=='cs.cluster.name'].TagValue | [0]" # "cluster_owner:Tags.Tag[?TagKey=='cs.cluster.name'].TagValue | [0]"
# ] # ]
## The following tags added by default: regionId (if discovery enabled), userId, instanceId. ## The following tags added by default: regionId (if discovery enabled), userId, instanceId.
## Allow metrics without discovery data, if discovery is enabled. If set to true, then metric without discovery ## Allow metrics without discovery data, if discovery is enabled. If set to true, then metric without discovery
## data would be emitted, otherwise dropped. This cane be of help, in case debugging dimension filters, or partial coverage ## data would be emitted, otherwise dropped. This cane be of help, in case debugging dimension filters, or partial coverage
## of discovery scope vs monitoring scope ## of discovery scope vs monitoring scope
#allow_dps_without_discovery = false #allow_dps_without_discovery = false
` `
) )
@ -132,7 +136,7 @@ type (
PublicKeyID string `toml:"public_key_id"` PublicKeyID string `toml:"public_key_id"`
RoleName string `toml:"role_name"` RoleName string `toml:"role_name"`
DiscoveryRegions []string `toml:"discovery_regions"` Regions []string `toml:"regions"`
DiscoveryInterval config.Duration `toml:"discovery_interval"` DiscoveryInterval config.Duration `toml:"discovery_interval"`
Period config.Duration `toml:"period"` Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"` Delay config.Duration `toml:"delay"`
@ -162,7 +166,7 @@ type (
dtLock sync.Mutex //Guard for discoveryTags & dimensions dtLock sync.Mutex //Guard for discoveryTags & dimensions
discoveryTags map[string]map[string]string //Internal data structure that can enrich metrics with tags discoveryTags map[string]map[string]string //Internal data structure that can enrich metrics with tags
dimensionsUdObj map[string]string dimensionsUdObj map[string]string
dimensionsUdArr []map[string]string //Parsed Dimensions JSON string (unmarshalled) dimensionsUdArr []map[string]string //Parsed Dimesnsions JSON string (unmarshalled)
requestDimensions []map[string]string //this is the actual dimensions list that would be used in API request requestDimensions []map[string]string //this is the actual dimensions list that would be used in API request
requestDimensionsStr string //String representation of the above requestDimensionsStr string //String representation of the above
@ -178,6 +182,31 @@ type (
} }
) )
// https://www.alibabacloud.com/help/doc-detail/40654.htm?gclid=Cj0KCQjw4dr0BRCxARIsAKUNjWTAMfyVUn_Y3OevFBV3CMaazrhq0URHsgE7c0m0SeMQRKlhlsJGgIEaAviyEALw_wcB
var aliyunRegionList = []string{
"cn-qingdao",
"cn-beijing",
"cn-zhangjiakou",
"cn-huhehaote",
"cn-hangzhou",
"cn-shanghai",
"cn-shenzhen",
"cn-heyuan",
"cn-chengdu",
"cn-hongkong",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-south-1",
"ap-northeast-1",
"us-west-1",
"us-east-1",
"eu-central-1",
"eu-west-1",
"me-east-1",
}
// SampleConfig implements telegraf.Inputs interface // SampleConfig implements telegraf.Inputs interface
func (s *AliyunCMS) SampleConfig() string { func (s *AliyunCMS) SampleConfig() string {
return sampleConfig return sampleConfig
@ -188,6 +217,7 @@ func (s *AliyunCMS) Description() string {
return description return description
} }
// Init perform checks of plugin inputs and initialize internals
func (s *AliyunCMS) Init() error { func (s *AliyunCMS) Init() error {
if s.Project == "" { if s.Project == "" {
return errors.New("project is not set") return errors.New("project is not set")
@ -238,9 +268,16 @@ func (s *AliyunCMS) Init() error {
s.measurement = formatMeasurement(s.Project) s.measurement = formatMeasurement(s.Project)
//Check regions
if len(s.Regions) == 0 {
s.Regions = aliyunRegionList
s.Log.Infof("'regions' is not set. Metrics will be queried across %d regions:\n%s",
len(s.Regions), strings.Join(s.Regions, ","))
}
//Init discovery... //Init discovery...
if s.dt == nil { //Support for tests if s.dt == nil { //Support for tests
s.dt, err = newDiscoveryTool(s.DiscoveryRegions, s.Project, s.Log, credential, int(float32(s.RateLimit)*0.2), time.Duration(s.DiscoveryInterval)) s.dt, err = newDiscoveryTool(s.Regions, s.Project, s.Log, credential, int(float32(s.RateLimit)*0.2), time.Duration(s.DiscoveryInterval))
if err != nil { if err != nil {
s.Log.Errorf("Discovery tool is not activated: %v", err) s.Log.Errorf("Discovery tool is not activated: %v", err)
s.dt = nil s.dt = nil
@ -248,7 +285,7 @@ func (s *AliyunCMS) Init() error {
} }
} }
s.discoveryData, err = s.dt.getDiscoveryDataAllRegions(nil) s.discoveryData, err = s.dt.getDiscoveryDataAcrossRegions(nil)
if err != nil { if err != nil {
s.Log.Errorf("Discovery tool is not activated: %v", err) s.Log.Errorf("Discovery tool is not activated: %v", err)
s.dt = nil s.dt = nil
@ -265,10 +302,11 @@ func (s *AliyunCMS) Init() error {
return nil return nil
} }
// Start plugin discovery loop, metrics are gathered through Gather
func (s *AliyunCMS) Start(telegraf.Accumulator) error { func (s *AliyunCMS) Start(telegraf.Accumulator) error {
//Start periodic discovery process //Start periodic discovery process
if s.dt != nil { if s.dt != nil {
s.dt.Start() s.dt.start()
} }
return nil return nil
@ -300,9 +338,10 @@ func (s *AliyunCMS) Gather(acc telegraf.Accumulator) error {
return nil return nil
} }
// Stop - stops the plugin discovery loop
func (s *AliyunCMS) Stop() { func (s *AliyunCMS) Stop() {
if s.dt != nil { if s.dt != nil {
s.dt.Stop() s.dt.stop()
} }
} }
@ -327,78 +366,85 @@ func (s *AliyunCMS) updateWindow(relativeTo time.Time) {
// Gather given metric and emit error // Gather given metric and emit error
func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *Metric) error { func (s *AliyunCMS) gatherMetric(acc telegraf.Accumulator, metricName string, metric *Metric) error {
req := cms.CreateDescribeMetricListRequest() for _, region := range s.Regions {
req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10) req := cms.CreateDescribeMetricListRequest()
req.MetricName = metricName req.Period = strconv.FormatInt(int64(time.Duration(s.Period).Seconds()), 10)
req.Length = "10000" req.MetricName = metricName
req.Namespace = s.Project req.Length = "10000"
req.EndTime = strconv.FormatInt(s.windowEnd.Unix()*1000, 10) req.Namespace = s.Project
req.StartTime = strconv.FormatInt(s.windowStart.Unix()*1000, 10) req.EndTime = strconv.FormatInt(s.windowEnd.Unix()*1000, 10)
req.Dimensions = metric.requestDimensionsStr req.StartTime = strconv.FormatInt(s.windowStart.Unix()*1000, 10)
req.Dimensions = metric.requestDimensionsStr
req.RegionId = region
for more := true; more; { for more := true; more; {
resp, err := s.client.DescribeMetricList(req) resp, err := s.client.DescribeMetricList(req)
if err != nil { if err != nil {
return errors.Errorf("failed to query metricName list: %v", err) return errors.Errorf("failed to query metricName list: %v", err)
} else if resp.Code != "200" {
s.Log.Errorf("failed to query metricName list: %v", resp.Message)
break
}
var datapoints []map[string]interface{}
if err = json.Unmarshal([]byte(resp.Datapoints), &datapoints); err != nil {
return errors.Errorf("failed to decode response datapoints: %v", err)
}
if len(datapoints) == 0 {
s.Log.Debugf("No metrics returned from CMS, response msg: %s", resp.Message)
break
}
NextDataPoint:
for _, datapoint := range datapoints {
fields := map[string]interface{}{}
datapointTime := int64(0)
tags := map[string]string{}
for key, value := range datapoint {
switch key {
case "instanceId", "BucketName":
tags[key] = value.(string)
if metric.discoveryTags != nil { //discovery can be not activated
//Skipping data point if discovery data not exist
if _, ok := metric.discoveryTags[value.(string)]; !ok &&
!metric.AllowDataPointWODiscoveryData {
s.Log.Warnf("Instance %q is not found in discovery, skipping monitoring datapoint...", value.(string))
continue NextDataPoint
}
for k, v := range metric.discoveryTags[value.(string)] {
tags[k] = v
}
}
case "userId":
tags[key] = value.(string)
case "timestamp":
datapointTime = int64(value.(float64)) / 1000
default:
fields[formatField(metricName, key)] = value
}
} }
//Log.logW("Datapoint time: %s, now: %s", time.Unix(datapointTime, 0).Format(time.RFC3339), time.Now().Format(time.RFC3339)) if resp.Code != "200" {
acc.AddFields(s.measurement, fields, tags, time.Unix(datapointTime, 0)) s.Log.Errorf("failed to query metricName list: %v", resp.Message)
break
}
var datapoints []map[string]interface{}
if err := json.Unmarshal([]byte(resp.Datapoints), &datapoints); err != nil {
return errors.Errorf("failed to decode response datapoints: %v", err)
}
if len(datapoints) == 0 {
s.Log.Debugf("No metrics returned from CMS, response msg: %s", resp.Message)
break
}
NextDataPoint:
for _, datapoint := range datapoints {
fields := map[string]interface{}{}
datapointTime := int64(0)
tags := map[string]string{}
for key, value := range datapoint {
switch key {
case "instanceId", "BucketName":
tags[key] = value.(string)
if metric.discoveryTags != nil { //discovery can be not activated
//Skipping data point if discovery data not exist
_, ok := metric.discoveryTags[value.(string)]
if !ok &&
!metric.AllowDataPointWODiscoveryData {
s.Log.Warnf("Instance %q is not found in discovery, skipping monitoring datapoint...", value.(string))
continue NextDataPoint
}
for k, v := range metric.discoveryTags[value.(string)] {
tags[k] = v
}
}
case "userId":
tags[key] = value.(string)
case "timestamp":
datapointTime = int64(value.(float64)) / 1000
default:
fields[formatField(metricName, key)] = value
}
}
//Log.logW("Datapoint time: %s, now: %s", time.Unix(datapointTime, 0).Format(time.RFC3339), time.Now().Format(time.RFC3339))
acc.AddFields(s.measurement, fields, tags, time.Unix(datapointTime, 0))
}
req.NextToken = resp.NextToken
more = req.NextToken != ""
} }
req.NextToken = resp.NextToken
more = req.NextToken != ""
} }
return nil return nil
} }
//Tag helper //tag helper
func parseTag(tagSpec string, data interface{}) (tagKey string, tagValue string, err error) { func parseTag(tagSpec string, data interface{}) (tagKey string, tagValue string, err error) {
var (
ok bool
queryPath = tagSpec
)
tagKey = tagSpec tagKey = tagSpec
queryPath := tagSpec
//Split query path to tagKey and query path //Split query path to tagKey and query path
if splitted := strings.Split(tagSpec, ":"); len(splitted) == 2 { if splitted := strings.Split(tagSpec, ":"); len(splitted) == 2 {
@ -416,7 +462,7 @@ func parseTag(tagSpec string, data interface{}) (tagKey string, tagValue string,
return "", "", nil return "", "", nil
} }
tagValue, ok := tagRawValue.(string) tagValue, ok = tagRawValue.(string)
if !ok { if !ok {
return "", "", errors.Errorf("Tag value %v parsed by query %q is not a string value", return "", "", errors.Errorf("Tag value %v parsed by query %q is not a string value",
tagRawValue, queryPath) tagRawValue, queryPath)

View File

@ -123,14 +123,13 @@ func TestPluginInitialize(t *testing.T) {
var err error var err error
plugin := new(AliyunCMS) plugin := new(AliyunCMS)
plugin.DiscoveryRegions = []string{"cn-shanghai"} plugin.Log = testutil.Logger{Name: inputTitle}
plugin.dt, err = getDiscoveryTool("acs_slb_dashboard", plugin.DiscoveryRegions) plugin.Regions = []string{"cn-shanghai"}
plugin.dt, err = getDiscoveryTool("acs_slb_dashboard", plugin.Regions)
if err != nil { if err != nil {
t.Fatalf("Can't create discovery tool object: %v", err) t.Fatalf("Can't create discovery tool object: %v", err)
} }
plugin.Log = testutil.Logger{Name: inputTitle}
httpResp := &http.Response{ httpResp := &http.Response{
StatusCode: 200, StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewBufferString( Body: ioutil.NopCloser(bytes.NewBufferString(
@ -150,7 +149,7 @@ func TestPluginInitialize(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Can't create mock sdk cli: %v", err) t.Fatalf("Can't create mock sdk cli: %v", err)
} }
plugin.dt.cli = map[string]aliyunSdkClient{plugin.DiscoveryRegions[0]: &mockCli} plugin.dt.cli = map[string]aliyunSdkClient{plugin.Regions[0]: &mockCli}
tests := []struct { tests := []struct {
name string name string
@ -158,14 +157,24 @@ func TestPluginInitialize(t *testing.T) {
accessKeyID string accessKeyID string
accessKeySecret string accessKeySecret string
expectedErrorString string expectedErrorString string
regions []string
discoveryRegions []string
}{ }{
{ {
name: "Empty project", name: "Empty project",
expectedErrorString: "project is not set", expectedErrorString: "project is not set",
regions: []string{"cn-shanghai"},
}, },
{ {
name: "Valid project", name: "Valid project",
project: "acs_slb_dashboard", project: "acs_slb_dashboard",
regions: []string{"cn-shanghai"},
accessKeyID: "dummy",
accessKeySecret: "dummy",
},
{
name: "'regions' is not set",
project: "acs_slb_dashboard",
accessKeyID: "dummy", accessKeyID: "dummy",
accessKeySecret: "dummy", accessKeySecret: "dummy",
}, },
@ -176,12 +185,16 @@ func TestPluginInitialize(t *testing.T) {
plugin.Project = tt.project plugin.Project = tt.project
plugin.AccessKeyID = tt.accessKeyID plugin.AccessKeyID = tt.accessKeyID
plugin.AccessKeySecret = tt.accessKeySecret plugin.AccessKeySecret = tt.accessKeySecret
plugin.Regions = tt.regions
if tt.expectedErrorString != "" { if tt.expectedErrorString != "" {
require.EqualError(t, plugin.Init(), tt.expectedErrorString) require.EqualError(t, plugin.Init(), tt.expectedErrorString)
} else { } else {
require.Equal(t, nil, plugin.Init()) require.Equal(t, nil, plugin.Init())
} }
if len(tt.regions) == 0 { //Check if set to default
require.Equal(t, plugin.Regions, aliyunRegionList)
}
}) })
} }
} }
@ -224,6 +237,7 @@ func TestGatherMetric(t *testing.T) {
client: new(mockGatherAliyunCMSClient), client: new(mockGatherAliyunCMSClient),
measurement: formatMeasurement("acs_slb_dashboard"), measurement: formatMeasurement("acs_slb_dashboard"),
Log: testutil.Logger{Name: inputTitle}, Log: testutil.Logger{Name: inputTitle},
Regions: []string{"cn-shanghai"},
} }
metric := &Metric{ metric := &Metric{
@ -262,15 +276,15 @@ func TestGather(t *testing.T) {
Dimensions: `{"instanceId": "i-abcdefgh123456"}`, Dimensions: `{"instanceId": "i-abcdefgh123456"}`,
} }
plugin := &AliyunCMS{ plugin := &AliyunCMS{
AccessKeyID: "my_access_key_id", AccessKeyID: "my_access_key_id",
AccessKeySecret: "my_access_key_secret", AccessKeySecret: "my_access_key_secret",
Project: "acs_slb_dashboard", Project: "acs_slb_dashboard",
Metrics: []*Metric{metric}, Metrics: []*Metric{metric},
RateLimit: 200, RateLimit: 200,
measurement: formatMeasurement("acs_slb_dashboard"), measurement: formatMeasurement("acs_slb_dashboard"),
DiscoveryRegions: []string{"cn-shanghai"}, Regions: []string{"cn-shanghai"},
client: new(mockGatherAliyunCMSClient), client: new(mockGatherAliyunCMSClient),
Log: testutil.Logger{Name: inputTitle}, Log: testutil.Logger{Name: inputTitle},
} }
//test table: //test table:
@ -326,7 +340,7 @@ func TestGather(t *testing.T) {
} }
} }
func TestGetDiscoveryDataAllRegions(t *testing.T) { func TestGetDiscoveryDataAcrossRegions(t *testing.T) {
//test table: //test table:
tests := []struct { tests := []struct {
name string name string
@ -391,7 +405,7 @@ func TestGetDiscoveryDataAllRegions(t *testing.T) {
t.Fatalf("Can't create mock sdk cli: %v", err) t.Fatalf("Can't create mock sdk cli: %v", err)
} }
dt.cli = map[string]aliyunSdkClient{tt.region: &mockCli} dt.cli = map[string]aliyunSdkClient{tt.region: &mockCli}
data, err := dt.getDiscoveryDataAllRegions(nil) data, err := dt.getDiscoveryDataAcrossRegions(nil)
require.Equal(t, tt.discData, data) require.Equal(t, tt.discData, data)
if err != nil { if err != nil {

View File

@ -5,6 +5,7 @@ import (
"reflect" "reflect"
"regexp" "regexp"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -16,37 +17,11 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/services/rds" "github.com/aliyun/alibaba-cloud-sdk-go/services/rds"
"github.com/aliyun/alibaba-cloud-sdk-go/services/slb" "github.com/aliyun/alibaba-cloud-sdk-go/services/slb"
"github.com/aliyun/alibaba-cloud-sdk-go/services/vpc" "github.com/aliyun/alibaba-cloud-sdk-go/services/vpc"
"github.com/pkg/errors"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/internal/limiter"
"github.com/pkg/errors"
) )
// https://www.alibabacloud.com/help/doc-detail/40654.htm?gclid=Cj0KCQjw4dr0BRCxARIsAKUNjWTAMfyVUn_Y3OevFBV3CMaazrhq0URHsgE7c0m0SeMQRKlhlsJGgIEaAviyEALw_wcB
var aliyunRegionList = []string{
"cn-qingdao",
"cn-beijing",
"cn-zhangjiakou",
"cn-huhehaote",
"cn-hangzhou",
"cn-shanghai",
"cn-shenzhen",
"cn-heyuan",
"cn-chengdu",
"cn-hongkong",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-south-1",
"ap-northeast-1",
"us-west-1",
"us-east-1",
"eu-central-1",
"eu-west-1",
"me-east-1",
}
type discoveryRequest interface { type discoveryRequest interface {
} }
@ -54,6 +29,7 @@ type aliyunSdkClient interface {
ProcessCommonRequest(req *requests.CommonRequest) (response *responses.CommonResponse, err error) ProcessCommonRequest(req *requests.CommonRequest) (response *responses.CommonResponse, err error)
} }
// discoveryTool is a object that provides discovery feature
type discoveryTool struct { type discoveryTool struct {
req map[string]discoveryRequest //Discovery request (specific per object type) req map[string]discoveryRequest //Discovery request (specific per object type)
rateLimit int //Rate limit for API query, as it is limited by API backend rateLimit int //Rate limit for API query, as it is limited by API backend
@ -70,8 +46,8 @@ type discoveryTool struct {
lg telegraf.Logger //Telegraf logger (should be provided) lg telegraf.Logger //Telegraf logger (should be provided)
} }
type response struct { type parsedDResp struct {
discData []interface{} data []interface{}
totalCount int totalCount int
pageSize int pageSize int
pageNumber int pageNumber int
@ -124,7 +100,8 @@ func newDiscoveryTool(regions []string, project string, lg telegraf.Logger, cred
if len(regions) == 0 { if len(regions) == 0 {
regions = aliyunRegionList regions = aliyunRegionList
lg.Warnf("Discovery regions are not provided! Data will be queried across %d regions!", len(aliyunRegionList)) lg.Infof("'regions' is not provided! Discovery data will be queried across %d regions:\n%s",
len(aliyunRegionList), strings.Join(aliyunRegionList, ","))
} }
if rateLimit == 0 { //Can be a rounding case if rateLimit == 0 { //Can be a rounding case
@ -300,21 +277,21 @@ func newDiscoveryTool(regions []string, project string, lg telegraf.Logger, cred
}, nil }, nil
} }
func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (discoveryResponse *response, err error) { func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse) (*parsedDResp, error) {
var ( var (
fullOutput = map[string]interface{}{} fullOutput = map[string]interface{}{}
foundDataItem, foundRootKey bool data []byte
discData []interface{} foundDataItem bool
totalCount, pageSize, pageNumber int foundRootKey bool
pdResp = &parsedDResp{}
) )
data := resp.GetHttpContentBytes() data = resp.GetHttpContentBytes()
if data == nil { //No data if data == nil { //No data
return nil, errors.Errorf("No data in response to be parsed") return nil, errors.Errorf("No data in response to be parsed")
} }
err = json.Unmarshal(data, &fullOutput) if err := json.Unmarshal(data, &fullOutput); err != nil {
if err != nil {
return nil, errors.Errorf("Can't parse JSON from discovery response: %v", err) return nil, errors.Errorf("Can't parse JSON from discovery response: %v", err)
} }
@ -329,7 +306,7 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse)
//It should contain the array with discovered data //It should contain the array with discovered data
for _, item := range rootKeyVal { for _, item := range rootKeyVal {
if discData, foundDataItem = item.([]interface{}); foundDataItem { if pdResp.data, foundDataItem = item.([]interface{}); foundDataItem {
break break
} }
} }
@ -337,70 +314,72 @@ func (dt *discoveryTool) parseDiscoveryResponse(resp *responses.CommonResponse)
return nil, errors.Errorf("Didn't find array item in root key %q", key) return nil, errors.Errorf("Didn't find array item in root key %q", key)
} }
case "TotalCount": case "TotalCount":
totalCount = int(val.(float64)) pdResp.totalCount = int(val.(float64))
case "PageSize": case "PageSize":
pageSize = int(val.(float64)) pdResp.pageSize = int(val.(float64))
case "PageNumber": case "PageNumber":
pageNumber = int(val.(float64)) pdResp.pageNumber = int(val.(float64))
} }
} }
if !foundRootKey { if !foundRootKey {
return nil, errors.Errorf("Didn't find root key %q in discovery response", dt.respRootKey) return nil, errors.Errorf("Didn't find root key %q in discovery response", dt.respRootKey)
} }
return &response{ return pdResp, nil
discData: discData,
totalCount: totalCount,
pageSize: pageSize,
pageNumber: pageNumber,
}, nil
} }
func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, limiterChan chan bool) (map[string]interface{}, error) { func (dt *discoveryTool) getDiscoveryData(cli aliyunSdkClient, req *requests.CommonRequest, lmtr chan bool) (map[string]interface{}, error) {
var discoveryData []interface{} var (
err error
resp *responses.CommonResponse
pDResp *parsedDResp
discoveryData []interface{}
totalCount int
pageNumber int
)
defer delete(req.QueryParams, "PageNumber") defer delete(req.QueryParams, "PageNumber")
for { for {
if limiterChan != nil { if lmtr != nil {
<-limiterChan //Rate limiting <-lmtr //Rate limiting
} }
resp, err := cli.ProcessCommonRequest(req) resp, err = cli.ProcessCommonRequest(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
discoveryResponse, err := dt.parseDiscoveryResponse(resp) pDResp, err = dt.parseDiscoveryResponse(resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
discoveryData = append(discoveryData, discoveryResponse.discData...) discoveryData = append(discoveryData, pDResp.data...)
pageNumber = pDResp.pageNumber
totalCount = pDResp.totalCount
//Pagination //Pagination
discoveryResponse.pageNumber++ pageNumber++
req.QueryParams["PageNumber"] = strconv.Itoa(discoveryResponse.pageNumber) req.QueryParams["PageNumber"] = strconv.Itoa(pageNumber)
if len(discoveryData) == discoveryResponse.totalCount { //All data received if len(discoveryData) == totalCount { //All data received
//Map data to appropriate shape before return //Map data to appropriate shape before return
preparedData := map[string]interface{}{} preparedData := map[string]interface{}{}
for _, raw := range discoveryData { for _, raw := range discoveryData {
elem, ok := raw.(map[string]interface{}) elem, ok := raw.(map[string]interface{})
if !ok { if !ok {
return nil, errors.Errorf("Can't parse input data element, not a map[string]interface{} type") return nil, errors.Errorf("can't parse input data element, not a map[string]interface{} type")
} }
if objectID, ok := elem[dt.respObjectIDKey].(string); ok { if objectID, ok := elem[dt.respObjectIDKey].(string); ok {
preparedData[objectID] = elem preparedData[objectID] = elem
} }
} }
return preparedData, nil return preparedData, nil
} }
} }
} }
func (dt *discoveryTool) getDiscoveryDataAllRegions(limiterChan chan bool) (map[string]interface{}, error) { func (dt *discoveryTool) getDiscoveryDataAcrossRegions(lmtr chan bool) (map[string]interface{}, error) {
var ( var (
data map[string]interface{} data map[string]interface{}
resultData = map[string]interface{}{} resultData = map[string]interface{}{}
@ -431,7 +410,7 @@ func (dt *discoveryTool) getDiscoveryDataAllRegions(limiterChan chan bool) (map[
commonRequest.TransToAcsRequest() commonRequest.TransToAcsRequest()
//Get discovery data using common request //Get discovery data using common request
data, err = dt.getDiscoveryData(cli, commonRequest, limiterChan) data, err = dt.getDiscoveryData(cli, commonRequest, lmtr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -443,7 +422,9 @@ func (dt *discoveryTool) getDiscoveryDataAllRegions(limiterChan chan bool) (map[
return resultData, nil return resultData, nil
} }
func (dt *discoveryTool) Start() { // start the discovery pooling
// In case smth. new found it will be reported back through `DataChan`
func (dt *discoveryTool) start() {
var ( var (
err error err error
data map[string]interface{} data map[string]interface{}
@ -468,7 +449,7 @@ func (dt *discoveryTool) Start() {
case <-dt.done: case <-dt.done:
return return
case <-ticker.C: case <-ticker.C:
data, err = dt.getDiscoveryDataAllRegions(lmtr.C) data, err = dt.getDiscoveryDataAcrossRegions(lmtr.C)
if err != nil { if err != nil {
dt.lg.Errorf("Can't get discovery data: %v", err) dt.lg.Errorf("Can't get discovery data: %v", err)
continue continue
@ -489,7 +470,9 @@ func (dt *discoveryTool) Start() {
}() }()
} }
func (dt *discoveryTool) Stop() { // stop the discovery loop, making sure
// all data is read from 'dataChan'
func (dt *discoveryTool) stop() {
close(dt.done) close(dt.done)
//Shutdown timer //Shutdown timer