diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md
index e09acc518..97592f519 100644
--- a/plugins/inputs/cloudwatch/README.md
+++ b/plugins/inputs/cloudwatch/README.md
@@ -75,8 +75,10 @@ API endpoint. In the following order the plugin will attempt to authenticate.
   ## Configure the TTL for the internal cache of metrics.
   # cache_ttl = "1h"
 
-  ## Metric Statistic Namespace (required)
-  namespace = "AWS/ELB"
+  ## Metric Statistic Namespaces (required)
+  namespaces = ["AWS/ELB"]
+  # A single metric statistic namespace that will be appended to namespaces on startup
+  # namespace = "AWS/ELB"
 
   ## Maximum requests per second. Note that the global default AWS rate limit is
   ## 50 reqs/sec, so if you define multiple namespaces, these should add up to a
diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go
index fff2da1d3..1cd795830 100644
--- a/plugins/inputs/cloudwatch/cloudwatch.go
+++ b/plugins/inputs/cloudwatch/cloudwatch.go
@@ -34,6 +34,7 @@ type CloudWatch struct {
 	Period    config.Duration `toml:"period"`
 	Delay     config.Duration `toml:"delay"`
 	Namespace string          `toml:"namespace"`
+	Namespaces []string       `toml:"namespaces"`
 	Metrics   []*Metric       `toml:"metrics"`
 	CacheTTL  config.Duration `toml:"cache_ttl"`
 	RateLimit int             `toml:"ratelimit"`
@@ -71,7 +72,7 @@ type metricCache struct {
 	ttl     time.Duration
 	built   time.Time
 	metrics []filteredMetric
-	queries []*cwClient.MetricDataQuery
+	queries map[string][]*cwClient.MetricDataQuery
 }
 
 type cloudwatchClient interface {
@@ -139,8 +140,10 @@ func (c *CloudWatch) SampleConfig() string {
   ## Configure the TTL for the internal cache of metrics.
   # cache_ttl = "1h"
 
-  ## Metric Statistic Namespace (required)
-  namespace = "AWS/ELB"
+  ## Metric Statistic Namespaces (required)
+  namespaces = ["AWS/ELB"]
+  # A single metric statistic namespace that will be appended to namespaces on startup
+  # namespace = "AWS/ELB"
 
   ## Maximum requests per second. Note that the global default AWS rate limit is
   ## 50 reqs/sec, so if you define multiple namespaces, these should add up to a
@@ -181,25 +184,28 @@ func (c *CloudWatch) Description() string {
 	return "Pull Metric Statistics from Amazon CloudWatch"
 }
 
+func (c *CloudWatch) Init() error {
+	if len(c.Namespace) != 0 {
+		c.Namespaces = append(c.Namespaces, c.Namespace)
+	}
+
+	err := c.initializeCloudWatch()
+	if err != nil {
+		return err
+	}
+
+	// Set config level filter (won't change throughout life of plugin).
+	c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // Gather takes in an accumulator and adds the metrics that the Input
 // gathers. This is called every "interval".
 func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
-	if c.statFilter == nil {
-		var err error
-		// Set config level filter (won't change throughout life of plugin).
-		c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
-		if err != nil {
-			return err
-		}
-	}
-
-	if c.client == nil {
-		err := c.initializeCloudWatch()
-		if err != nil {
-			return err
-		}
-	}
-
 	filteredMetrics, err := getFilteredMetrics(c)
 	if err != nil {
 		return err
@@ -221,32 +227,34 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
 	wg := sync.WaitGroup{}
 	rLock := sync.Mutex{}
 
-	results := []*cwClient.MetricDataResult{}
+	results := map[string][]*cwClient.MetricDataResult{}
 
-	// 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
-	batchSize := 500
-	var batches [][]*cwClient.MetricDataQuery
+	for namespace, namespacedQueries := range queries {
+		// 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
+		batchSize := 500
+		var batches [][]*cwClient.MetricDataQuery
 
-	for batchSize < len(queries) {
-		queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize])
-	}
-	batches = append(batches, queries)
+		for batchSize < len(namespacedQueries) {
+			namespacedQueries, batches = namespacedQueries[batchSize:], append(batches, namespacedQueries[0:batchSize:batchSize])
+		}
+		batches = append(batches, namespacedQueries)
 
-	for i := range batches {
-		wg.Add(1)
-		<-lmtr.C
-		go func(inm []*cwClient.MetricDataQuery) {
-			defer wg.Done()
-			result, err := c.gatherMetrics(c.getDataInputs(inm))
-			if err != nil {
-				acc.AddError(err)
-				return
-			}
+		for i := range batches {
+			wg.Add(1)
+			<-lmtr.C
+			go func(n string, inm []*cwClient.MetricDataQuery) {
+				defer wg.Done()
+				result, err := c.gatherMetrics(c.getDataInputs(inm))
+				if err != nil {
+					acc.AddError(err)
+					return
+				}
 
-			rLock.Lock()
-			results = append(results, result...)
-			rLock.Unlock()
-		}(batches[i])
+				rLock.Lock()
+				results[n] = append(results[n], result...)
+				rLock.Unlock()
+			}(namespace, batches[i])
+		}
+	}
 	}
 
 	wg.Wait()
@@ -323,11 +331,13 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
 					}
 				}
 				for _, name := range m.MetricNames {
-					metrics = append(metrics, &cwClient.Metric{
-						Namespace:  aws.String(c.Namespace),
-						MetricName: aws.String(name),
-						Dimensions: dimensions,
-					})
+					for _, namespace := range c.Namespaces {
+						metrics = append(metrics, &cwClient.Metric{
+							Namespace:  aws.String(namespace),
+							MetricName: aws.String(name),
+							Dimensions: dimensions,
+						})
+					}
 				}
 			} else {
 				allMetrics, err := c.fetchNamespaceMetrics()
@@ -337,11 +347,13 @@
 				for _, name := range m.MetricNames {
 					for _, metric := range allMetrics {
 						if isSelected(name, metric, m.Dimensions) {
-							metrics = append(metrics, &cwClient.Metric{
-								Namespace:  aws.String(c.Namespace),
-								MetricName: aws.String(name),
-								Dimensions: metric.Dimensions,
-							})
+							for _, namespace := range c.Namespaces {
+								metrics = append(metrics, &cwClient.Metric{
+									Namespace:  aws.String(namespace),
+									MetricName: aws.String(name),
+									Dimensions: metric.Dimensions,
+								})
+							}
 						}
 					}
 				}
@@ -399,24 +411,26 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cwClient.Metric, error) {
 		recentlyActive = nil
 	}
 	params = &cwClient.ListMetricsInput{
-		Namespace:      aws.String(c.Namespace),
 		Dimensions:     []*cwClient.DimensionFilter{},
 		NextToken:      token,
 		MetricName:     nil,
 		RecentlyActive: recentlyActive,
 	}
-	for {
-		resp, err := c.client.ListMetrics(params)
-		if err != nil {
-			return nil, err
-		}
+	for _, namespace := range c.Namespaces {
+		params.Namespace = aws.String(namespace)
+		for {
+			resp, err := c.client.ListMetrics(params)
+			if err != nil {
+				return nil, err
+			}
 
-		metrics = append(metrics, resp.Metrics...)
-		if resp.NextToken == nil {
-			break
-		}
+			metrics = append(metrics, resp.Metrics...)
+			if resp.NextToken == nil {
+				break
+			}
 
-		params.NextToken = resp.NextToken
+			params.NextToken = resp.NextToken
+		}
 	}
 
 	return metrics, nil
@@ -437,21 +451,21 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) {
 }
 
 // getDataQueries gets all of the possible queries so we can maximize the request payload.
-func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClient.MetricDataQuery {
+func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string][]*cwClient.MetricDataQuery {
 	if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() {
 		return c.metricCache.queries
 	}
 
 	c.queryDimensions = map[string]*map[string]string{}
 
-	dataQueries := []*cwClient.MetricDataQuery{}
+	dataQueries := map[string][]*cwClient.MetricDataQuery{}
 	for i, filtered := range filteredMetrics {
 		for j, metric := range filtered.metrics {
 			id := strconv.Itoa(j) + "_" + strconv.Itoa(i)
 			dimension := ctod(metric.Dimensions)
 			if filtered.statFilter.Match("average") {
 				c.queryDimensions["average_"+id] = dimension
-				dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
+				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
 					Id:    aws.String("average_" + id),
 					Label: aws.String(snakeCase(*metric.MetricName + "_average")),
 					MetricStat: &cwClient.MetricStat{
@@ -463,7 +477,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
 			}
 			if filtered.statFilter.Match("maximum") {
 				c.queryDimensions["maximum_"+id] = dimension
-				dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
+				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
 					Id:    aws.String("maximum_" + id),
 					Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
 					MetricStat: &cwClient.MetricStat{
@@ -475,7 +489,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
 			}
 			if filtered.statFilter.Match("minimum") {
 				c.queryDimensions["minimum_"+id] = dimension
-				dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
+				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
 					Id:    aws.String("minimum_" + id),
 					Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
 					MetricStat: &cwClient.MetricStat{
@@ -487,7 +501,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
 			}
 			if filtered.statFilter.Match("sum") {
 				c.queryDimensions["sum_"+id] = dimension
-				dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
+				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
 					Id:    aws.String("sum_" + id),
 					Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
 					MetricStat: &cwClient.MetricStat{
@@ -499,7 +513,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
 			}
 			if filtered.statFilter.Match("sample_count") {
 				c.queryDimensions["sample_count_"+id] = dimension
-				dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
+				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
 					Id:    aws.String("sample_count_" + id),
 					Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
 					MetricStat: &cwClient.MetricStat{
@@ -554,24 +568,27 @@ func (c *CloudWatch) gatherMetrics(
 
 func (c *CloudWatch) aggregateMetrics(
 	acc telegraf.Accumulator,
-	metricDataResults []*cwClient.MetricDataResult,
+	metricDataResults map[string][]*cwClient.MetricDataResult,
 ) error {
 	var (
-		grouper   = internalMetric.NewSeriesGrouper()
-		namespace = sanitizeMeasurement(c.Namespace)
+		grouper = internalMetric.NewSeriesGrouper()
 	)
 
-	for _, result := range metricDataResults {
-		tags := map[string]string{}
+	for namespace, results := range metricDataResults {
+		namespace = sanitizeMeasurement(namespace)
 
-		if dimensions, ok := c.queryDimensions[*result.Id]; ok {
-			tags = *dimensions
-		}
-		tags["region"] = c.Region
+		for _, result := range results {
+			tags := map[string]string{}
 
-		for i := range result.Values {
-			if err := grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]); err != nil {
-				acc.AddError(err)
+			if dimensions, ok := c.queryDimensions[*result.Id]; ok {
+				tags = *dimensions
+			}
+			tags["region"] = c.Region
+
+			for i := range result.Values {
+				if err := grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]); err != nil {
+					acc.AddError(err)
+				}
 			}
 		}
 	}
diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go
index 860bf41d9..3114240ec 100644
--- a/plugins/inputs/cloudwatch/cloudwatch_test.go
+++ b/plugins/inputs/cloudwatch/cloudwatch_test.go
@@ -116,8 +116,9 @@ func TestGather(t *testing.T) {
 	}
 
 	var acc testutil.Accumulator
-	c.client = &mockGatherCloudWatchClient{}
 
+	require.NoError(t, c.Init())
+	c.client = &mockGatherCloudWatchClient{}
 	require.NoError(t, acc.GatherError(c.Gather))
 
 	fields := map[string]interface{}{}
@@ -135,6 +136,26 @@ func TestGather(t *testing.T) {
 	acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
 }
 
+func TestGather_MultipleNamespaces(t *testing.T) {
+	duration, _ := time.ParseDuration("1m")
+	internalDuration := config.Duration(duration)
+	c := &CloudWatch{
+		Namespaces: []string{"AWS/ELB", "AWS/EC2"},
+		Delay:      internalDuration,
+		Period:     internalDuration,
+		RateLimit:  200,
+	}
+
+	var acc testutil.Accumulator
+
+	require.NoError(t, c.Init())
+	c.client = &mockGatherCloudWatchClient{}
+	require.NoError(t, acc.GatherError(c.Gather))
+
+	require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
+	require.True(t, acc.HasMeasurement("cloudwatch_aws_ec2"))
+}
+
 type mockSelectMetricsCloudWatchClient struct{}
 
 func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error) {
@@ -215,8 +236,7 @@ func TestSelectMetrics(t *testing.T) {
 			},
 		},
 	}
-	err := c.initializeCloudWatch()
-	require.NoError(t, err)
+	require.NoError(t, c.Init())
 	c.client = &mockSelectMetricsCloudWatchClient{}
 	filtered, err := getFilteredMetrics(c)
 	// We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2
@@ -231,18 +251,20 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
 		Value: aws.String("p-example"),
 	}
 
+	namespace := "AWS/ELB"
 	m := &cwClient.Metric{
 		MetricName: aws.String("Latency"),
 		Dimensions: []*cwClient.Dimension{d},
+		Namespace:  &namespace,
 	}
 
 	duration, _ := time.ParseDuration("1m")
 	internalDuration := config.Duration(duration)
 
 	c := &CloudWatch{
-		Namespace: "AWS/ELB",
-		Delay:     internalDuration,
-		Period:    internalDuration,
+		Namespaces: []string{namespace},
+		Delay:      internalDuration,
+		Period:     internalDuration,
 	}
 
 	require.NoError(t, c.initializeCloudWatch())
@@ -253,7 +275,7 @@
 
 	statFilter, _ := filter.NewIncludeExcludeFilter(nil, nil)
 	queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}})
-	params := c.getDataInputs(queries)
+	params := c.getDataInputs(queries[namespace])
 
 	require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
 	require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
@@ -268,18 +290,20 @@ func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
 		Value: aws.String("p-example"),
 	}
 
+	namespace := "AWS/ELB"
 	m := &cwClient.Metric{
 		MetricName: aws.String("Latency"),
 		Dimensions: []*cwClient.Dimension{d},
+		Namespace:  &namespace,
 	}
 
 	duration, _ := time.ParseDuration("1m")
 	internalDuration := config.Duration(duration)
 
 	c := &CloudWatch{
-		Namespace: "AWS/ELB",
-		Delay:     internalDuration,
-		Period:    internalDuration,
+		Namespaces: []string{namespace},
+		Delay:      internalDuration,
+		Period:     internalDuration,
 	}
 
 	require.NoError(t, c.initializeCloudWatch())
@@ -290,7 +314,7 @@
 
 	statFilter, _ := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil)
 	queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}})
-	params := c.getDataInputs(queries)
+	params := c.getDataInputs(queries[namespace])
 
 	require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
 	require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
@@ -354,3 +378,10 @@ func TestProxyFunction(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, "www.penguins.com", proxyResult.Host)
 }
+
+func TestCombineNamespaces(t *testing.T) {
+	c := &CloudWatch{Namespace: "AWS/ELB", Namespaces: []string{"AWS/EC2", "AWS/Billing"}}
+
+	require.NoError(t, c.Init())
+	require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, c.Namespaces)
+}