feat: Pull metrics from multiple AWS CloudWatch namespaces (#9386)

Alexander Krantz 2021-08-10 14:47:23 -07:00 committed by GitHub
parent b1930526f8
commit 83bd10b4db
3 changed files with 144 additions and 94 deletions


@@ -75,8 +75,10 @@ API endpoint. In the following order the plugin will attempt to authenticate.
## Configure the TTL for the internal cache of metrics.
# cache_ttl = "1h"
## Metric Statistic Namespace (required)
namespace = "AWS/ELB"
## Metric Statistic Namespaces (required)
namespaces = ["AWS/ELB"]
# A single metric statistic namespace that will be appended to namespaces on startup
# namespace = "AWS/ELB"
## Maximum requests per second. Note that the global default AWS rate limit is
## 50 reqs/sec, so if you define multiple namespaces, these should add up to a


@@ -34,6 +34,7 @@ type CloudWatch struct {
Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"`
Namespace string `toml:"namespace"`
Namespaces []string `toml:"namespaces"`
Metrics []*Metric `toml:"metrics"`
CacheTTL config.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"`
@@ -71,7 +72,7 @@ type metricCache struct {
ttl time.Duration
built time.Time
metrics []filteredMetric
queries []*cwClient.MetricDataQuery
queries map[string][]*cwClient.MetricDataQuery
}
type cloudwatchClient interface {
@@ -139,8 +140,10 @@ func (c *CloudWatch) SampleConfig() string {
## Configure the TTL for the internal cache of metrics.
# cache_ttl = "1h"
## Metric Statistic Namespace (required)
namespace = "AWS/ELB"
## Metric Statistic Namespaces (required)
namespaces = ["AWS/ELB"]
# A single metric statistic namespace that will be appended to namespaces on startup
# namespace = "AWS/ELB"
## Maximum requests per second. Note that the global default AWS rate limit is
## 50 reqs/sec, so if you define multiple namespaces, these should add up to a
@@ -181,25 +184,28 @@ func (c *CloudWatch) Description() string {
return "Pull Metric Statistics from Amazon CloudWatch"
}
func (c *CloudWatch) Init() error {
if len(c.Namespace) != 0 {
c.Namespaces = append(c.Namespaces, c.Namespace)
}
err := c.initializeCloudWatch()
if err != nil {
return err
}
// Set config level filter (won't change throughout life of plugin).
c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
if err != nil {
return err
}
return nil
}
// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval".
func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
if c.statFilter == nil {
var err error
// Set config level filter (won't change throughout life of plugin).
c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
if err != nil {
return err
}
}
if c.client == nil {
err := c.initializeCloudWatch()
if err != nil {
return err
}
}
filteredMetrics, err := getFilteredMetrics(c)
if err != nil {
return err
@@ -221,32 +227,34 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
wg := sync.WaitGroup{}
rLock := sync.Mutex{}
results := []*cwClient.MetricDataResult{}
results := map[string][]*cwClient.MetricDataResult{}
// 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
batchSize := 500
var batches [][]*cwClient.MetricDataQuery
for namespace, namespacedQueries := range queries {
// 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
batchSize := 500
var batches [][]*cwClient.MetricDataQuery
for batchSize < len(queries) {
queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize])
}
batches = append(batches, queries)
for batchSize < len(namespacedQueries) {
namespacedQueries, batches = namespacedQueries[batchSize:], append(batches, namespacedQueries[0:batchSize:batchSize])
}
batches = append(batches, namespacedQueries)
for i := range batches {
wg.Add(1)
<-lmtr.C
go func(inm []*cwClient.MetricDataQuery) {
defer wg.Done()
result, err := c.gatherMetrics(c.getDataInputs(inm))
if err != nil {
acc.AddError(err)
return
}
for i := range batches {
wg.Add(1)
<-lmtr.C
go func(n string, inm []*cwClient.MetricDataQuery) {
defer wg.Done()
result, err := c.gatherMetrics(c.getDataInputs(inm))
if err != nil {
acc.AddError(err)
return
}
rLock.Lock()
results = append(results, result...)
rLock.Unlock()
}(batches[i])
rLock.Lock()
results[n] = append(results[n], result...)
rLock.Unlock()
}(namespace, batches[i])
}
}
wg.Wait()
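For reference, a self-contained sketch of the batch-splitting idiom used in the Gather hunk above; 500 is the GetMetricData per-request limit, and the function and variable names here are illustrative rather than taken from the plugin:

```go
package main

import "fmt"

// splitIntoBatches mirrors the three-index slice idiom from Gather: each
// namespace's queries are split into chunks of at most batchSize.
func splitIntoBatches(queries []string, batchSize int) [][]string {
	var batches [][]string
	for batchSize < len(queries) {
		// queries[0:batchSize:batchSize] caps the chunk's capacity so a later
		// append to a chunk cannot clobber the remaining queries in the
		// shared backing array.
		queries, batches = queries[batchSize:], append(batches, queries[0:batchSize:batchSize])
	}
	return append(batches, queries)
}

func main() {
	queries := make([]string, 1200)
	for i := range queries {
		queries[i] = fmt.Sprintf("q%d", i)
	}
	for i, batch := range splitIntoBatches(queries, 500) {
		fmt.Printf("batch %d has %d queries\n", i, len(batch)) // 500, 500, 200
	}
}
```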
@@ -323,11 +331,13 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
}
}
for _, name := range m.MetricNames {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(c.Namespace),
MetricName: aws.String(name),
Dimensions: dimensions,
})
for _, namespace := range c.Namespaces {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(namespace),
MetricName: aws.String(name),
Dimensions: dimensions,
})
}
}
} else {
allMetrics, err := c.fetchNamespaceMetrics()
@@ -337,11 +347,13 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
for _, name := range m.MetricNames {
for _, metric := range allMetrics {
if isSelected(name, metric, m.Dimensions) {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(c.Namespace),
MetricName: aws.String(name),
Dimensions: metric.Dimensions,
})
for _, namespace := range c.Namespaces {
metrics = append(metrics, &cwClient.Metric{
Namespace: aws.String(namespace),
MetricName: aws.String(name),
Dimensions: metric.Dimensions,
})
}
}
}
}
@@ -399,24 +411,26 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cwClient.Metric, error) {
recentlyActive = nil
}
params = &cwClient.ListMetricsInput{
Namespace: aws.String(c.Namespace),
Dimensions: []*cwClient.DimensionFilter{},
NextToken: token,
MetricName: nil,
RecentlyActive: recentlyActive,
}
for {
resp, err := c.client.ListMetrics(params)
if err != nil {
return nil, err
}
for _, namespace := range c.Namespaces {
params.Namespace = aws.String(namespace)
for {
resp, err := c.client.ListMetrics(params)
if err != nil {
return nil, err
}
metrics = append(metrics, resp.Metrics...)
if resp.NextToken == nil {
break
}
metrics = append(metrics, resp.Metrics...)
if resp.NextToken == nil {
break
}
params.NextToken = resp.NextToken
params.NextToken = resp.NextToken
}
}
return metrics, nil
@@ -437,21 +451,21 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) {
}
// getDataQueries gets all of the possible queries so we can maximize the request payload.
func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClient.MetricDataQuery {
func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string][]*cwClient.MetricDataQuery {
if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() {
return c.metricCache.queries
}
c.queryDimensions = map[string]*map[string]string{}
dataQueries := []*cwClient.MetricDataQuery{}
dataQueries := map[string][]*cwClient.MetricDataQuery{}
for i, filtered := range filteredMetrics {
for j, metric := range filtered.metrics {
id := strconv.Itoa(j) + "_" + strconv.Itoa(i)
dimension := ctod(metric.Dimensions)
if filtered.statFilter.Match("average") {
c.queryDimensions["average_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("average_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_average")),
MetricStat: &cwClient.MetricStat{
@@ -463,7 +477,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("maximum") {
c.queryDimensions["maximum_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("maximum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
MetricStat: &cwClient.MetricStat{
@@ -475,7 +489,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("minimum") {
c.queryDimensions["minimum_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("minimum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
MetricStat: &cwClient.MetricStat{
@@ -487,7 +501,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("sum") {
c.queryDimensions["sum_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("sum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
MetricStat: &cwClient.MetricStat{
@@ -499,7 +513,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) []*cwClien
}
if filtered.statFilter.Match("sample_count") {
c.queryDimensions["sample_count_"+id] = dimension
dataQueries = append(dataQueries, &cwClient.MetricDataQuery{
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], &cwClient.MetricDataQuery{
Id: aws.String("sample_count_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
MetricStat: &cwClient.MetricStat{
@@ -554,24 +568,27 @@ func (c *CloudWatch) gatherMetrics(
func (c *CloudWatch) aggregateMetrics(
acc telegraf.Accumulator,
metricDataResults []*cwClient.MetricDataResult,
metricDataResults map[string][]*cwClient.MetricDataResult,
) error {
var (
grouper = internalMetric.NewSeriesGrouper()
namespace = sanitizeMeasurement(c.Namespace)
grouper = internalMetric.NewSeriesGrouper()
)
for _, result := range metricDataResults {
tags := map[string]string{}
for namespace, results := range metricDataResults {
namespace = sanitizeMeasurement(namespace)
if dimensions, ok := c.queryDimensions[*result.Id]; ok {
tags = *dimensions
}
tags["region"] = c.Region
for _, result := range results {
tags := map[string]string{}
for i := range result.Values {
if err := grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]); err != nil {
acc.AddError(err)
if dimensions, ok := c.queryDimensions[*result.Id]; ok {
tags = *dimensions
}
tags["region"] = c.Region
for i := range result.Values {
if err := grouper.Add(namespace, tags, *result.Timestamps[i], *result.Label, *result.Values[i]); err != nil {
acc.AddError(err)
}
}
}
}
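The per-namespace measurement names asserted in the tests below (cloudwatch_aws_elb, cloudwatch_aws_ec2) come from sanitizing each namespace in aggregateMetrics. A rough approximation of that mapping, not the plugin's actual sanitizeMeasurement/snakeCase implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// measurementFor approximates how a CloudWatch namespace becomes a Telegraf
// measurement name; the real helpers in the plugin cover more edge cases.
func measurementFor(namespace string) string {
	return "cloudwatch_" + strings.ToLower(strings.ReplaceAll(namespace, "/", "_"))
}

func main() {
	fmt.Println(measurementFor("AWS/ELB")) // cloudwatch_aws_elb
	fmt.Println(measurementFor("AWS/EC2")) // cloudwatch_aws_ec2
}
```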


@@ -116,8 +116,9 @@ func TestGather(t *testing.T) {
}
var acc testutil.Accumulator
c.client = &mockGatherCloudWatchClient{}
require.NoError(t, c.Init())
c.client = &mockGatherCloudWatchClient{}
require.NoError(t, acc.GatherError(c.Gather))
fields := map[string]interface{}{}
@@ -135,6 +136,26 @@ func TestGather(t *testing.T) {
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
}
func TestGather_MultipleNamespaces(t *testing.T) {
duration, _ := time.ParseDuration("1m")
internalDuration := config.Duration(duration)
c := &CloudWatch{
Namespaces: []string{"AWS/ELB", "AWS/EC2"},
Delay: internalDuration,
Period: internalDuration,
RateLimit: 200,
}
var acc testutil.Accumulator
require.NoError(t, c.Init())
c.client = &mockGatherCloudWatchClient{}
require.NoError(t, acc.GatherError(c.Gather))
require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
require.True(t, acc.HasMeasurement("cloudwatch_aws_ec2"))
}
type mockSelectMetricsCloudWatchClient struct{}
func (m *mockSelectMetricsCloudWatchClient) ListMetrics(_ *cwClient.ListMetricsInput) (*cwClient.ListMetricsOutput, error) {
@@ -215,8 +236,7 @@ func TestSelectMetrics(t *testing.T) {
},
},
}
err := c.initializeCloudWatch()
require.NoError(t, err)
require.NoError(t, c.Init())
c.client = &mockSelectMetricsCloudWatchClient{}
filtered, err := getFilteredMetrics(c)
// We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2
@@ -231,18 +251,20 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
Value: aws.String("p-example"),
}
namespace := "AWS/ELB"
m := &cwClient.Metric{
MetricName: aws.String("Latency"),
Dimensions: []*cwClient.Dimension{d},
Namespace: &namespace,
}
duration, _ := time.ParseDuration("1m")
internalDuration := config.Duration(duration)
c := &CloudWatch{
Namespace: "AWS/ELB",
Delay: internalDuration,
Period: internalDuration,
Namespaces: []string{namespace},
Delay: internalDuration,
Period: internalDuration,
}
require.NoError(t, c.initializeCloudWatch())
@@ -253,7 +275,7 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
statFilter, _ := filter.NewIncludeExcludeFilter(nil, nil)
queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}})
params := c.getDataInputs(queries)
params := c.getDataInputs(queries[namespace])
require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
@@ -268,18 +290,20 @@ func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
Value: aws.String("p-example"),
}
namespace := "AWS/ELB"
m := &cwClient.Metric{
MetricName: aws.String("Latency"),
Dimensions: []*cwClient.Dimension{d},
Namespace: &namespace,
}
duration, _ := time.ParseDuration("1m")
internalDuration := config.Duration(duration)
c := &CloudWatch{
Namespace: "AWS/ELB",
Delay: internalDuration,
Period: internalDuration,
Namespaces: []string{namespace},
Delay: internalDuration,
Period: internalDuration,
}
require.NoError(t, c.initializeCloudWatch())
@@ -290,7 +314,7 @@ func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
statFilter, _ := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil)
queries := c.getDataQueries([]filteredMetric{{metrics: []*cwClient.Metric{m}, statFilter: statFilter}})
params := c.getDataInputs(queries)
params := c.getDataInputs(queries[namespace])
require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
@@ -354,3 +378,10 @@ func TestProxyFunction(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "www.penguins.com", proxyResult.Host)
}
func TestCombineNamespaces(t *testing.T) {
c := &CloudWatch{Namespace: "AWS/ELB", Namespaces: []string{"AWS/EC2", "AWS/Billing"}}
require.NoError(t, c.Init())
require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, c.Namespaces)
}
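A minimal standalone sketch of the merge behavior this test exercises, assuming only what the Init hunk above shows; the type and method names here are illustrative:

```go
package main

import "fmt"

// cloudWatchConfig stands in for the plugin struct: the legacy single
// `namespace` option is appended to `namespaces` at init time, so existing
// configs keep working alongside the new list option.
type cloudWatchConfig struct {
	Namespace  string
	Namespaces []string
}

func (c *cloudWatchConfig) combineNamespaces() {
	if len(c.Namespace) != 0 {
		c.Namespaces = append(c.Namespaces, c.Namespace)
	}
}

func main() {
	c := &cloudWatchConfig{Namespace: "AWS/ELB", Namespaces: []string{"AWS/EC2", "AWS/Billing"}}
	c.combineNamespaces()
	fmt.Println(c.Namespaces) // [AWS/EC2 AWS/Billing AWS/ELB]
}
```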