feat(inputs.cloudwatch): Add support for cross-account observability (#12448)

Muhammad Ahsan Ali 2023-05-24 09:42:30 +02:00 committed by GitHub
parent 7ced2606b2
commit 2010926e25
4 changed files with 170 additions and 87 deletions

View File

@@ -50,7 +50,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# role_session_name = ""
# profile = ""
# shared_credential_file = ""
## If you are using CloudWatch cross-account observability, you can set
## include_linked_accounts to true in a monitoring account to collect
## metrics from the linked source accounts
# include_linked_accounts = false
## Endpoint to make requests against; the correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.
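For orientation, a minimal monitoring-account configuration that enables the option above might look like the sketch below; the region, namespace, and timing values are illustrative placeholders, not recommendations:

```toml
[[inputs.cloudwatch]]
  ## Region of the monitoring account (placeholder value).
  region = "us-east-1"

  ## Namespaces and collection timing (placeholder values).
  namespaces = ["AWS/ELB"]
  period = "5m"
  delay = "5m"

  ## Collect metrics from the linked source accounts as well. This requires
  ## CloudWatch cross-account observability to already be set up in AWS.
  include_linked_accounts = true
```

With this option enabled, the plugin asks ListMetrics to include linked accounts and pairs each returned metric with its owning account ID.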
@@ -226,6 +231,8 @@ case](https://en.wikipedia.org/wiki/Snake_case)
- All measurements have the following tags:
- region (CloudWatch Region)
- {dimension-name} (CloudWatch dimension value - one per metric dimension)
- If `include_linked_accounts` is set to true, the following tag is also provided (see the example output below):
- account (The ID of the account where the metrics are located.)
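As an illustration, using the mocked values from this change's tests, a metric gathered from a linked source account would be emitted roughly as follows (the timestamp is arbitrary):

```text
cloudwatch_aws_elb,region=us-east-1,load_balancer_name=p-example2,account=923456789017 latency_minimum=0.1,latency_maximum=0.3,latency_average=0.2,latency_sum=124,latency_sample_count=100 1684912950000000000
```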
## Troubleshooting

View File

@@ -30,14 +30,6 @@ import (
//go:embed sample.conf
var sampleConfig string
const (
StatisticAverage = "Average"
StatisticMaximum = "Maximum"
StatisticMinimum = "Minimum"
StatisticSum = "Sum"
StatisticSampleCount = "SampleCount"
)
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
type CloudWatch struct {
StatisticExclude []string `toml:"statistic_exclude"`
@@ -46,15 +38,16 @@ type CloudWatch struct {
internalProxy.HTTPProxy
Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"`
Namespace string `toml:"namespace" deprecated:"1.25.0;use 'namespaces' instead"`
Namespaces []string `toml:"namespaces"`
Metrics []*Metric `toml:"metrics"`
CacheTTL config.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"`
RecentlyActive string `toml:"recently_active"`
BatchSize int `toml:"batch_size"`
Period config.Duration `toml:"period"`
Delay config.Duration `toml:"delay"`
Namespace string `toml:"namespace" deprecated:"1.25.0;use 'namespaces' instead"`
Namespaces []string `toml:"namespaces"`
Metrics []*Metric `toml:"metrics"`
CacheTTL config.Duration `toml:"cache_ttl"`
RateLimit int `toml:"ratelimit"`
RecentlyActive string `toml:"recently_active"`
BatchSize int `toml:"batch_size"`
IncludeLinkedAccounts bool `toml:"include_linked_accounts"`
Log telegraf.Logger `toml:"-"`
@@ -126,7 +119,6 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
if err != nil {
return err
}
c.updateWindow(time.Now())
// Get all of the possible queries so we can send groups of 100.
@@ -172,7 +164,6 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
}
wg.Wait()
return c.aggregateMetrics(acc, results)
}
@@ -233,6 +224,7 @@ func (c *CloudWatch) initializeCloudWatch() error {
type filteredMetric struct {
metrics []types.Metric
accounts []string
statFilter filter.Filter
}
@@ -248,6 +240,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
if c.Metrics != nil {
for _, m := range c.Metrics {
metrics := []types.Metric{}
var accounts []string
if !hasWildcard(m.Dimensions) {
dimensions := make([]types.Dimension, 0, len(m.Dimensions))
for _, d := range m.Dimensions {
@@ -266,9 +259,10 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
}
}
} else {
allMetrics := c.fetchNamespaceMetrics()
allMetrics, allAccounts := c.fetchNamespaceMetrics()
for _, name := range m.MetricNames {
for _, metric := range allMetrics {
for i, metric := range allMetrics {
if isSelected(name, metric, m.Dimensions) {
for _, namespace := range c.Namespaces {
metrics = append(metrics, types.Metric{
@@ -277,6 +271,9 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
Dimensions: metric.Dimensions,
})
}
if c.IncludeLinkedAccounts {
accounts = append(accounts, allAccounts[i])
}
}
}
}
@@ -292,39 +289,39 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
if err != nil {
return nil, err
}
fMetrics = append(fMetrics, filteredMetric{
metrics: metrics,
statFilter: statFilter,
accounts: accounts,
})
}
} else {
metrics := c.fetchNamespaceMetrics()
metrics, accounts := c.fetchNamespaceMetrics()
fMetrics = []filteredMetric{
{
metrics: metrics,
statFilter: c.statFilter,
accounts: accounts,
},
}
}
c.metricCache = &metricCache{
metrics: fMetrics,
built: time.Now(),
ttl: time.Duration(c.CacheTTL),
}
return fMetrics, nil
}
// fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace.
func (c *CloudWatch) fetchNamespaceMetrics() []types.Metric {
func (c *CloudWatch) fetchNamespaceMetrics() ([]types.Metric, []string) {
metrics := []types.Metric{}
var accounts []string
for _, namespace := range c.Namespaces {
params := &cwClient.ListMetricsInput{
Dimensions: []types.DimensionFilter{},
Namespace: aws.String(namespace),
Dimensions: []types.DimensionFilter{},
Namespace: aws.String(namespace),
IncludeLinkedAccounts: c.IncludeLinkedAccounts,
}
if c.RecentlyActive == "PT3H" {
params.RecentlyActive = types.RecentlyActivePt3h
@@ -338,6 +335,7 @@ func (c *CloudWatch) fetchNamespaceMetrics() []types.Metric {
break
}
metrics = append(metrics, resp.Metrics...)
accounts = append(accounts, resp.OwningAccounts...)
if resp.NextToken == nil {
break
@@ -345,7 +343,7 @@ func (c *CloudWatch) fetchNamespaceMetrics() []types.Metric {
params.NextToken = resp.NextToken
}
}
return metrics
return metrics, accounts
}
func (c *CloudWatch) updateWindow(relativeTo time.Time) {
@@ -375,63 +373,34 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string
for j, metric := range filtered.metrics {
id := strconv.Itoa(j) + "_" + strconv.Itoa(i)
dimension := ctod(metric.Dimensions)
if filtered.statFilter.Match("average") {
c.queryDimensions["average_"+id] = dimension
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
Id: aws.String("average_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_average")),
MetricStat: &types.MetricStat{
Metric: &filtered.metrics[j],
Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
Stat: aws.String(StatisticAverage),
},
})
var accountID *string
if c.IncludeLinkedAccounts {
accountID = aws.String(filtered.accounts[j])
(*dimension)["account"] = filtered.accounts[j]
}
if filtered.statFilter.Match("maximum") {
c.queryDimensions["maximum_"+id] = dimension
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
Id: aws.String("maximum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
MetricStat: &types.MetricStat{
Metric: &filtered.metrics[j],
Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
Stat: aws.String(StatisticMaximum),
},
})
statisticTypes := map[string]string{
"average": "Average",
"maximum": "Maximum",
"minimum": "Minimum",
"sum": "Sum",
"sample_count": "SampleCount",
}
if filtered.statFilter.Match("minimum") {
c.queryDimensions["minimum_"+id] = dimension
for statisticType, statistic := range statisticTypes {
if !filtered.statFilter.Match(statisticType) {
continue
}
queryID := statisticType + "_" + id
c.queryDimensions[queryID] = dimension
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
Id: aws.String("minimum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
Id: aws.String(queryID),
AccountId: accountID,
Label: aws.String(snakeCase(*metric.MetricName + "_" + statisticType)),
MetricStat: &types.MetricStat{
Metric: &filtered.metrics[j],
Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
Stat: aws.String(StatisticMinimum),
},
})
}
if filtered.statFilter.Match("sum") {
c.queryDimensions["sum_"+id] = dimension
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
Id: aws.String("sum_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
MetricStat: &types.MetricStat{
Metric: &filtered.metrics[j],
Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
Stat: aws.String(StatisticSum),
},
})
}
if filtered.statFilter.Match("sample_count") {
c.queryDimensions["sample_count_"+id] = dimension
dataQueries[*metric.Namespace] = append(dataQueries[*metric.Namespace], types.MetricDataQuery{
Id: aws.String("sample_count_" + id),
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
MetricStat: &types.MetricStat{
Metric: &filtered.metrics[j],
Period: aws.Int32(int32(time.Duration(c.Period).Seconds())),
Stat: aws.String(StatisticSampleCount),
Stat: aws.String(statistic),
},
})
}

View File

@@ -26,7 +26,7 @@ func (m *mockGatherCloudWatchClient) ListMetrics(
params *cwClient.ListMetricsInput,
_ ...func(*cwClient.Options),
) (*cwClient.ListMetricsOutput, error) {
return &cwClient.ListMetricsOutput{
response := &cwClient.ListMetricsOutput{
Metrics: []types.Metric{
{
Namespace: params.Namespace,
@@ -34,12 +34,26 @@ func (m *mockGatherCloudWatchClient) ListMetrics(
Dimensions: []types.Dimension{
{
Name: aws.String("LoadBalancerName"),
Value: aws.String("p-example"),
Value: aws.String("p-example1"),
},
},
},
{
Namespace: params.Namespace,
MetricName: aws.String("Latency"),
Dimensions: []types.Dimension{
{
Name: aws.String("LoadBalancerName"),
Value: aws.String("p-example2"),
},
},
},
},
}, nil
}
if params.IncludeLinkedAccounts {
(*response).OwningAccounts = []string{"123456789012", "923456789017"}
}
return response, nil
}
func (m *mockGatherCloudWatchClient) GetMetricData(
@@ -94,6 +108,51 @@ func (m *mockGatherCloudWatchClient) GetMetricData(
},
Values: []float64{100},
},
{
Id: aws.String("minimum_1_0"),
Label: aws.String("latency_minimum"),
StatusCode: types.StatusCodeComplete,
Timestamps: []time.Time{
*params.EndTime,
},
Values: []float64{0.1},
},
{
Id: aws.String("maximum_1_0"),
Label: aws.String("latency_maximum"),
StatusCode: types.StatusCodeComplete,
Timestamps: []time.Time{
*params.EndTime,
},
Values: []float64{0.3},
},
{
Id: aws.String("average_1_0"),
Label: aws.String("latency_average"),
StatusCode: types.StatusCodeComplete,
Timestamps: []time.Time{
*params.EndTime,
},
Values: []float64{0.2},
},
{
Id: aws.String("sum_1_0"),
Label: aws.String("latency_sum"),
StatusCode: types.StatusCodeComplete,
Timestamps: []time.Time{
*params.EndTime,
},
Values: []float64{124},
},
{
Id: aws.String("sample_count_1_0"),
Label: aws.String("latency_sample_count"),
StatusCode: types.StatusCodeComplete,
Timestamps: []time.Time{
*params.EndTime,
},
Values: []float64{100},
},
},
}, nil
}
@@ -133,12 +192,55 @@ func TestGather(t *testing.T) {
tags := map[string]string{}
tags["region"] = "us-east-1"
tags["load_balancer_name"] = "p-example"
tags["load_balancer_name"] = "p-example1"
require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
}
func TestMultiAccountGather(t *testing.T) {
duration, _ := time.ParseDuration("1m")
internalDuration := config.Duration(duration)
c := &CloudWatch{
CredentialConfig: internalaws.CredentialConfig{
Region: "us-east-1",
},
Namespace: "AWS/ELB",
Delay: internalDuration,
Period: internalDuration,
RateLimit: 200,
BatchSize: 500,
Log: testutil.Logger{},
IncludeLinkedAccounts: true,
}
var acc testutil.Accumulator
require.NoError(t, c.Init())
c.client = &mockGatherCloudWatchClient{}
require.NoError(t, acc.GatherError(c.Gather))
fields := map[string]interface{}{}
fields["latency_minimum"] = 0.1
fields["latency_maximum"] = 0.3
fields["latency_average"] = 0.2
fields["latency_sum"] = 123.0
fields["latency_sample_count"] = 100.0
tags := map[string]string{}
tags["region"] = "us-east-1"
tags["load_balancer_name"] = "p-example1"
tags["account"] = "123456789012"
require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
tags["load_balancer_name"] = "p-example2"
tags["account"] = "923456789017"
fields["latency_sum"] = 124.0
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
}
func TestGather_MultipleNamespaces(t *testing.T) {
duration, _ := time.ParseDuration("1m")
internalDuration := config.Duration(duration)

View File

@@ -21,7 +21,12 @@
# role_session_name = ""
# profile = ""
# shared_credential_file = ""
## If you are using CloudWatch cross-account observability, you can set
## include_linked_accounts to true in a monitoring account to collect
## metrics from the linked source accounts
# include_linked_accounts = false
## Endpoint to make requests against; the correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.