diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md
index 3dd70d916..1fcd5e098 100644
--- a/plugins/inputs/cloudwatch/README.md
+++ b/plugins/inputs/cloudwatch/README.md
@@ -50,7 +50,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
   # role_session_name = ""
   # profile = ""
   # shared_credential_file = ""
-
+
+  ## If you are using CloudWatch cross-account observability, you can
+  ## set IncludeLinkedAccounts to true in a monitoring account
+  ## and collect metrics from the linked source accounts
+  # include_linked_accounts = false
+
   ## Endpoint to make request against, the correct endpoint is automatically
   ## determined and this option should only be set if you wish to override the
   ## default.
@@ -226,6 +231,8 @@ case](https://en.wikipedia.org/wiki/Snake_case)
 
 - All measurements have the following tags:
   - region (CloudWatch Region)
   - {dimension-name} (Cloudwatch Dimension value - one per metric dimension)
+- If `include_linked_accounts` is set to true then the following tag is also provided:
+  - account (The ID of the account where the metrics are located)
 
 ## Troubleshooting
diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go
index 1e26b9d2b..6f3795929 100644
--- a/plugins/inputs/cloudwatch/cloudwatch.go
+++ b/plugins/inputs/cloudwatch/cloudwatch.go
@@ -30,14 +30,6 @@ import (
 //go:embed sample.conf
 var sampleConfig string
 
-const (
-	StatisticAverage     = "Average"
-	StatisticMaximum     = "Maximum"
-	StatisticMinimum     = "Minimum"
-	StatisticSum         = "Sum"
-	StatisticSampleCount = "SampleCount"
-)
-
 // CloudWatch contains the configuration and cache for the cloudwatch plugin.
 type CloudWatch struct {
 	StatisticExclude []string `toml:"statistic_exclude"`
@@ -46,15 +38,16 @@ type CloudWatch struct {
 
 	internalProxy.HTTPProxy
 
-	Period         config.Duration `toml:"period"`
-	Delay          config.Duration `toml:"delay"`
-	Namespace      string          `toml:"namespace" deprecated:"1.25.0;use 'namespaces' instead"`
-	Namespaces     []string        `toml:"namespaces"`
-	Metrics        []*Metric       `toml:"metrics"`
-	CacheTTL       config.Duration `toml:"cache_ttl"`
-	RateLimit      int             `toml:"ratelimit"`
-	RecentlyActive string          `toml:"recently_active"`
-	BatchSize      int             `toml:"batch_size"`
+	Period                config.Duration `toml:"period"`
+	Delay                 config.Duration `toml:"delay"`
+	Namespace             string          `toml:"namespace" deprecated:"1.25.0;use 'namespaces' instead"`
+	Namespaces            []string        `toml:"namespaces"`
+	Metrics               []*Metric       `toml:"metrics"`
+	CacheTTL              config.Duration `toml:"cache_ttl"`
+	RateLimit             int             `toml:"ratelimit"`
+	RecentlyActive        string          `toml:"recently_active"`
+	BatchSize             int             `toml:"batch_size"`
+	IncludeLinkedAccounts bool            `toml:"include_linked_accounts"`
 
 	Log telegraf.Logger `toml:"-"`
 
@@ -126,7 +119,6 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
 	if err != nil {
 		return err
 	}
-
 	c.updateWindow(time.Now())
 
 	// Get all of the possible queries so we can send groups of 100.
@@ -172,7 +164,6 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
 	}
 	wg.Wait()
-
 	return c.aggregateMetrics(acc, results)
 }
 
@@ -233,6 +224,7 @@ func (c *CloudWatch) initializeCloudWatch() error {
 
 type filteredMetric struct {
 	metrics    []types.Metric
+	accounts   []string
 	statFilter filter.Filter
 }
 
@@ -248,6 +240,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
 	if c.Metrics != nil {
 		for _, m := range c.Metrics {
 			metrics := []types.Metric{}
+			var accounts []string
 			if !hasWildcard(m.Dimensions) {
 				dimensions := make([]types.Dimension, 0, len(m.Dimensions))
 				for _, d := range m.Dimensions {
@@ -266,9 +259,10 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
 					}
 				}
 			} else {
-				allMetrics := c.fetchNamespaceMetrics()
+				allMetrics, allAccounts := c.fetchNamespaceMetrics()
+
 				for _, name := range m.MetricNames {
-					for _, metric := range allMetrics {
+					for i, metric := range allMetrics {
 						if isSelected(name, metric, m.Dimensions) {
 							for _, namespace := range c.Namespaces {
 								metrics = append(metrics, types.Metric{
@@ -277,6 +271,9 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
 									Dimensions: metric.Dimensions,
 								})
 							}
+							if c.IncludeLinkedAccounts {
+								accounts = append(accounts, allAccounts[i])
+							}
 						}
 					}
 				}
@@ -292,39 +289,39 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
 			if err != nil {
 				return nil, err
 			}
-
 			fMetrics = append(fMetrics, filteredMetric{
 				metrics:    metrics,
 				statFilter: statFilter,
+				accounts:   accounts,
 			})
 		}
 	} else {
-		metrics := c.fetchNamespaceMetrics()
+		metrics, accounts := c.fetchNamespaceMetrics()
 		fMetrics = []filteredMetric{
 			{
 				metrics:    metrics,
 				statFilter: c.statFilter,
+				accounts:   accounts,
 			},
 		}
 	}
-
 	c.metricCache = &metricCache{
 		metrics: fMetrics,
 		built:   time.Now(),
 		ttl:     time.Duration(c.CacheTTL),
 	}
-
 	return fMetrics, nil
 }
 
 // fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace.
-func (c *CloudWatch) fetchNamespaceMetrics() []types.Metric {
+func (c *CloudWatch) fetchNamespaceMetrics() ([]types.Metric, []string) {
 	metrics := []types.Metric{}
-
+	var accounts []string
 	for _, namespace := range c.Namespaces {
 		params := &cwClient.ListMetricsInput{
-			Dimensions: []types.DimensionFilter{},
-			Namespace:  aws.String(namespace),
+			Dimensions:            []types.DimensionFilter{},
+			Namespace:             aws.String(namespace),
+			IncludeLinkedAccounts: c.IncludeLinkedAccounts,
 		}
 		if c.RecentlyActive == "PT3H" {
 			params.RecentlyActive = types.RecentlyActivePt3h
@@ -338,6 +335,7 @@ func (c *CloudWatch) fetchNamespaceMetrics() []types.Metric {
 				break
 			}
 			metrics = append(metrics, resp.Metrics...)
+			accounts = append(accounts, resp.OwningAccounts...)
 
 			if resp.NextToken == nil {
 				break
@@ -345,7 +343,7 @@ func (c *CloudWatch) fetchNamespaceMetrics() []types.Metric {
 			params.NextToken = resp.NextToken
 		}
 	}
-	return metrics
+	return metrics, accounts
 }
 
 func (c *CloudWatch) updateWindow(relativeTo time.Time) {
@@ -375,63 +373,34 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string
 		for j, metric := range filtered.metrics {
 			id := strconv.Itoa(j) + "_" + strconv.Itoa(i)
 			dimension := ctod(metric.Dimensions)
-			if filtered.statFilter.Match("average") {
-				c.queryDimensions["average_"+id] = dimension
-				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
-					Id:    aws.String("average_" + id),
-					Label: aws.String(snakeCase(*metric.MetricName + "_average")),
-					MetricStat: &types.MetricStat{
-						Metric: &filtered.metrics[j],
-						Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
-						Stat:   aws.String(StatisticAverage),
-					},
-				})
+			var accountID *string
+			if c.IncludeLinkedAccounts {
+				accountID = aws.String(filtered.accounts[j])
+				(*dimension)["account"] = filtered.accounts[j]
 			}
-			if filtered.statFilter.Match("maximum") {
-				c.queryDimensions["maximum_"+id] = dimension
-				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
-					Id:    aws.String("maximum_" + id),
-					Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
-					MetricStat: &types.MetricStat{
-						Metric: &filtered.metrics[j],
-						Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
-						Stat:   aws.String(StatisticMaximum),
-					},
-				})
+
+			statisticTypes := map[string]string{
+				"average":      "Average",
+				"maximum":      "Maximum",
+				"minimum":      "Minimum",
+				"sum":          "Sum",
+				"sample_count": "SampleCount",
 			}
-			if filtered.statFilter.Match("minimum") {
-				c.queryDimensions["minimum_"+id] = dimension
+
+			for statisticType, statistic := range statisticTypes {
+				if !filtered.statFilter.Match(statisticType) {
+					continue
+				}
+				queryID := statisticType + "_" + id
+				c.queryDimensions[queryID] = dimension
 				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
-					Id:    aws.String("minimum_" + id),
-					Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
+					Id:        aws.String(queryID),
+					AccountId: accountID,
+					Label:     aws.String(snakeCase(*metric.MetricName + "_" + statisticType)),
 					MetricStat: &types.MetricStat{
 						Metric: &filtered.metrics[j],
 						Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
-						Stat:   aws.String(StatisticMinimum),
-					},
-				})
-			}
-			if filtered.statFilter.Match("sum") {
-				c.queryDimensions["sum_"+id] = dimension
-				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
-					Id:    aws.String("sum_" + id),
-					Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
-					MetricStat: &types.MetricStat{
-						Metric: &filtered.metrics[j],
-						Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
-						Stat:   aws.String(StatisticSum),
-					},
-				})
-			}
-			if filtered.statFilter.Match("sample_count") {
-				c.queryDimensions["sample_count_"+id] = dimension
-				dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
-					Id:    aws.String("sample_count_" + id),
-					Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
-					MetricStat: &types.MetricStat{
-						Metric: &filtered.metrics[j],
-						Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
-						Stat:   aws.String(StatisticSampleCount),
+						Stat:   aws.String(statistic),
 					},
 				})
 			}
diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go
index 77b353547..9a5e87fe5 100644
--- a/plugins/inputs/cloudwatch/cloudwatch_test.go
+++ b/plugins/inputs/cloudwatch/cloudwatch_test.go
@@ -26,7 +26,7 @@ func (m *mockGatherCloudWatchClient) ListMetrics(
 	params *cwClient.ListMetricsInput,
 	_ ...func(*cwClient.Options),
 ) (*cwClient.ListMetricsOutput, error) {
-	return &cwClient.ListMetricsOutput{
+	response := &cwClient.ListMetricsOutput{
 		Metrics: []types.Metric{
 			{
 				Namespace: params.Namespace,
@@ -34,12 +34,26 @@ func (m *mockGatherCloudWatchClient) ListMetrics(
 				Dimensions: []types.Dimension{
 					{
 						Name:  aws.String("LoadBalancerName"),
-						Value: aws.String("p-example"),
+						Value: aws.String("p-example1"),
+					},
+				},
+			},
+			{
+				Namespace:  params.Namespace,
+				MetricName: aws.String("Latency"),
+				Dimensions: []types.Dimension{
+					{
+						Name:  aws.String("LoadBalancerName"),
+						Value: aws.String("p-example2"),
 					},
 				},
 			},
 		},
-	}, nil
+	}
+	if params.IncludeLinkedAccounts {
+		(*response).OwningAccounts = []string{"123456789012", "923456789017"}
+	}
+	return response, nil
 }
 
 func (m *mockGatherCloudWatchClient) GetMetricData(
@@ -94,6 +108,51 @@ func (m *mockGatherCloudWatchClient) GetMetricData(
 				},
 				Values: []float64{100},
 			},
+			{
+				Id:         aws.String("minimum_1_0"),
+				Label:      aws.String("latency_minimum"),
+				StatusCode: types.StatusCodeComplete,
+				Timestamps: []time.Time{
+					*params.EndTime,
+				},
+				Values: []float64{0.1},
+			},
+			{
+				Id:         aws.String("maximum_1_0"),
+				Label:      aws.String("latency_maximum"),
+				StatusCode: types.StatusCodeComplete,
+				Timestamps: []time.Time{
+					*params.EndTime,
+				},
+				Values: []float64{0.3},
+			},
+			{
+				Id:         aws.String("average_1_0"),
+				Label:      aws.String("latency_average"),
+				StatusCode: types.StatusCodeComplete,
+				Timestamps: []time.Time{
+					*params.EndTime,
+				},
+				Values: []float64{0.2},
+			},
+			{
+				Id:         aws.String("sum_1_0"),
+				Label:      aws.String("latency_sum"),
+				StatusCode: types.StatusCodeComplete,
+				Timestamps: []time.Time{
+					*params.EndTime,
+				},
+				Values: []float64{124},
+			},
+			{
+				Id:         aws.String("sample_count_1_0"),
+				Label:      aws.String("latency_sample_count"),
+				StatusCode: types.StatusCodeComplete,
+				Timestamps: []time.Time{
+					*params.EndTime,
+				},
+				Values: []float64{100},
+			},
 		},
 	}, nil
 }
@@ -133,12 +192,55 @@ func TestGather(t *testing.T) {
 	tags := map[string]string{}
 	tags["region"] = "us-east-1"
-	tags["load_balancer_name"] = "p-example"
+	tags["load_balancer_name"] = "p-example1"
 
 	require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
 	acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
 }
 
+func TestMultiAccountGather(t *testing.T) {
+	duration, _ := time.ParseDuration("1m")
+	internalDuration := config.Duration(duration)
+	c := &CloudWatch{
+		CredentialConfig: internalaws.CredentialConfig{
+			Region: "us-east-1",
+		},
+		Namespace:             "AWS/ELB",
+		Delay:                 internalDuration,
+		Period:                internalDuration,
+		RateLimit:             200,
+		BatchSize:             500,
+		Log:                   testutil.Logger{},
+		IncludeLinkedAccounts: true,
+	}
+
+	var acc testutil.Accumulator
+
+	require.NoError(t, c.Init())
+	c.client = &mockGatherCloudWatchClient{}
+	require.NoError(t, acc.GatherError(c.Gather))
+
+	fields := map[string]interface{}{}
+	fields["latency_minimum"] = 0.1
+	fields["latency_maximum"] = 0.3
+	fields["latency_average"] = 0.2
+	fields["latency_sum"] = 123.0
+	fields["latency_sample_count"] = 100.0
+
+	tags := map[string]string{}
+	tags["region"] = "us-east-1"
+	tags["load_balancer_name"] = "p-example1"
+	tags["account"] = "123456789012"
+
+	require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
+	acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
+
+	tags["load_balancer_name"] = "p-example2"
+	tags["account"] = "923456789017"
+	fields["latency_sum"] = 124.0
+	acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
+}
+
 func TestGather_MultipleNamespaces(t *testing.T) {
 	duration, _ := time.ParseDuration("1m")
 	internalDuration := config.Duration(duration)
diff --git a/plugins/inputs/cloudwatch/sample.conf b/plugins/inputs/cloudwatch/sample.conf
index 26d311003..fd9f48ecc 100644
--- a/plugins/inputs/cloudwatch/sample.conf
+++ b/plugins/inputs/cloudwatch/sample.conf
@@ -21,7 +21,12 @@
   # role_session_name = ""
   # profile = ""
   # shared_credential_file = ""
-
+
+  ## If you are using CloudWatch cross-account observability, you can
+  ## set IncludeLinkedAccounts to true in a monitoring account
+  ## and collect metrics from the linked source accounts
+  # include_linked_accounts = false
+
   ## Endpoint to make request against, the correct endpoint is automatically
   ## determined and this option should only be set if you wish to override the
   ## default.
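With the change above in place, a monitoring account can opt into collecting metrics owned by its linked source accounts, and matching measurements then carry an `account` tag with the owning account ID alongside the usual `region` and dimension tags. The configuration below is a sketch only: the region, namespace, period, and dimension filter are illustrative values taken from the plugin's documented options, and only `include_linked_accounts` comes from this change.

```toml
[[inputs.cloudwatch]]
  ## Region of the monitoring account (illustrative value).
  region = "us-east-1"

  ## Namespaces to query and the aggregation period for each statistic.
  namespaces = ["AWS/ELB"]
  period = "5m"
  interval = "5m"

  ## Also list and query metrics owned by linked source accounts;
  ## each matched metric gains an `account` tag with the owning account ID.
  include_linked_accounts = true

  [[inputs.cloudwatch.metrics]]
    names = ["Latency"]

    [[inputs.cloudwatch.metrics.dimensions]]
      name = "LoadBalancerName"
      value = "*"
```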