diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md index 06b6e0bc6..7873a068b 100644 --- a/plugins/inputs/cloudwatch/README.md +++ b/plugins/inputs/cloudwatch/README.md @@ -100,13 +100,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Do not enable if "period" or "delay" is longer than 3 hours, as it will ## not return data more than 3 hours old. ## See https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_ListMetrics.html - #recently_active = "PT3H" + # recently_active = "PT3H" ## Configure the TTL for the internal cache of metrics. # cache_ttl = "1h" - ## Metric Statistic Namespaces (required) - namespaces = ["AWS/ELB"] + ## Metric Statistic Namespaces, wildcards are allowed + # namespaces = ["*"] ## Metric Format ## This determines the format of the produces metrics. 'sparse', the default diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go index f9caa49f5..dc3a0302b 100644 --- a/plugins/inputs/cloudwatch/cloudwatch.go +++ b/plugins/inputs/cloudwatch/cloudwatch.go @@ -8,6 +8,7 @@ import ( "net" "net/http" "regexp" + "slices" "strconv" "strings" "sync" @@ -20,7 +21,6 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" - "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal/limiter" "github.com/influxdata/telegraf/metric" common_aws "github.com/influxdata/telegraf/plugins/common/aws" @@ -31,7 +31,6 @@ import ( //go:embed sample.conf var sampleConfig string -// CloudWatch contains the configuration and cache for the cloudwatch plugin. type CloudWatch struct { StatisticExclude []string `toml:"statistic_exclude"` StatisticInclude []string `toml:"statistic_include"` @@ -51,33 +50,30 @@ type CloudWatch struct { IncludeLinkedAccounts bool `toml:"include_linked_accounts"` MetricFormat string `toml:"metric_format"` Log telegraf.Logger `toml:"-"` + common_aws.CredentialConfig client cloudwatchClient + nsFilter filter.Filter statFilter filter.Filter - metricCache *metricCache + cache *metricCache queryDimensions map[string]*map[string]string windowStart time.Time windowEnd time.Time - - common_aws.CredentialConfig } -// cloudwatchMetric defines a simplified Cloudwatch metric. type cloudwatchMetric struct { - StatisticExclude *[]string `toml:"statistic_exclude"` - StatisticInclude *[]string `toml:"statistic_include"` MetricNames []string `toml:"names"` Dimensions []*dimension `toml:"dimensions"` + StatisticExclude *[]string `toml:"statistic_exclude"` + StatisticInclude *[]string `toml:"statistic_include"` } -// dimension defines a simplified Cloudwatch dimension (provides metric filtering). type dimension struct { Name string `toml:"name"` Value string `toml:"value"` valueMatcher filter.Filter } -// metricCache caches metrics, their filters, and generated queries. type metricCache struct { ttl time.Duration built time.Time @@ -85,6 +81,12 @@ type metricCache struct { queries map[string][]types.MetricDataQuery } +type filteredMetric struct { + metrics []types.Metric + accounts []string + statFilter filter.Filter +} + type cloudwatchClient interface { ListMetrics(context.Context, *cloudwatch.ListMetricsInput, ...func(*cloudwatch.Options)) (*cloudwatch.ListMetricsOutput, error) GetMetricData(context.Context, *cloudwatch.GetMetricDataInput, ...func(*cloudwatch.Options)) (*cloudwatch.GetMetricDataOutput, error) @@ -95,10 +97,12 @@ func (*CloudWatch) SampleConfig() string { } func (c *CloudWatch) Init() error { + // For backward compatibility if len(c.Namespace) != 0 { c.Namespaces = append(c.Namespaces, c.Namespace) } + // Check user settings switch c.MetricFormat { case "": c.MetricFormat = "sparse" @@ -107,22 +111,69 @@ func (c *CloudWatch) Init() error { return fmt.Errorf("invalid metric_format: %s", c.MetricFormat) } - err := c.initializeCloudWatch() + // Setup the cloudwatch client + proxyFunc, err := c.HTTPProxy.Proxy() if err != nil { - return err + return fmt.Errorf("creating proxy failed: %w", err) } - // Set config level filter (won't change throughout life of plugin). + creds, err := c.CredentialConfig.Credentials() + if err != nil { + return fmt.Errorf("getting credentials failed: %w", err) + } + + c.client = cloudwatch.NewFromConfig(creds, func(options *cloudwatch.Options) { + if c.CredentialConfig.EndpointURL != "" && c.CredentialConfig.Region != "" { + options.BaseEndpoint = &c.CredentialConfig.EndpointURL + } + + options.ClientLogMode = 0 + options.HTTPClient = &http.Client{ + // use values from DefaultTransport + Transport: &http.Transport{ + Proxy: proxyFunc, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + Timeout: time.Duration(c.Timeout), + } + }) + + // Initialize filter for metric dimensions to include + for _, m := range c.Metrics { + for _, dimension := range m.Dimensions { + matcher, err := filter.NewIncludeExcludeFilter([]string{dimension.Value}, nil) + if err != nil { + return fmt.Errorf("creating dimension filter for dimension %q failed: %w", dimension, err) + } + dimension.valueMatcher = matcher + } + } + + // Initialize statistics-type filter c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude) if err != nil { - return err + return fmt.Errorf("creating statistics filter failed: %w", err) + } + + // Initialize namespace filter + c.nsFilter, err = filter.Compile(c.Namespaces) + if err != nil { + return fmt.Errorf("creating namespace filter failed: %w", err) } return nil } func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { - filteredMetrics, err := getFilteredMetrics(c) + filteredMetrics, err := c.getFilteredMetrics() if err != nil { return err } @@ -156,7 +207,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { <-lmtr.C go func(n string, inm []types.MetricDataQuery) { defer wg.Done() - result, err := c.gatherMetrics(c.getDataInputs(inm)) + result, err := c.gatherMetrics(inm) if err != nil { acc.AddError(err) return @@ -174,182 +225,107 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error { return nil } -func (c *CloudWatch) initializeCloudWatch() error { - proxyFunc, err := c.HTTPProxy.Proxy() - if err != nil { - return err +func (c *CloudWatch) getFilteredMetrics() ([]filteredMetric, error) { + if c.cache != nil && c.cache.metrics != nil && time.Since(c.cache.built) < c.cache.ttl { + return c.cache.metrics, nil } - awsCreds, err := c.CredentialConfig.Credentials() - if err != nil { - return err + // Get all metrics from cloudwatch for filtering + params := &cloudwatch.ListMetricsInput{ + IncludeLinkedAccounts: &c.IncludeLinkedAccounts, + } + if c.RecentlyActive == "PT3H" { + params.RecentlyActive = types.RecentlyActivePt3h } - c.client = cloudwatch.NewFromConfig(awsCreds, func(options *cloudwatch.Options) { - if c.CredentialConfig.EndpointURL != "" && c.CredentialConfig.Region != "" { - options.BaseEndpoint = &c.CredentialConfig.EndpointURL + // Return the subset of metrics matching the namespace and at one of the + // metric definitions if any + var metrics []types.Metric + var accounts []string + for { + resp, err := c.client.ListMetrics(context.Background(), params) + if err != nil { + c.Log.Errorf("failed to list metrics: %v", err) + break } - - options.ClientLogMode = 0 - options.HTTPClient = &http.Client{ - // use values from DefaultTransport - Transport: &http.Transport{ - Proxy: proxyFunc, - DialContext: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - DualStack: true, - }).DialContext, - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - }, - Timeout: time.Duration(c.Timeout), - } - }) - - // Initialize regex matchers for each dimension value. - for _, m := range c.Metrics { - for _, dimension := range m.Dimensions { - matcher, err := filter.NewIncludeExcludeFilter([]string{dimension.Value}, nil) - if err != nil { - return err - } - - dimension.valueMatcher = matcher - } - } - - return nil -} - -type filteredMetric struct { - metrics []types.Metric - accounts []string - statFilter filter.Filter -} - -// getFilteredMetrics returns metrics specified in the config file or metrics listed from Cloudwatch. -func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) { - if c.metricCache != nil && c.metricCache.isValid() { - return c.metricCache.metrics, nil - } - - fMetrics := make([]filteredMetric, 0) - - // check for provided metric filter - if c.Metrics != nil { - for _, m := range c.Metrics { - metrics := make([]types.Metric, 0) - var accounts []string - if !hasWildcard(m.Dimensions) { - dimensions := make([]types.Dimension, 0, len(m.Dimensions)) + c.Log.Tracef("got %d metrics with %d accounts", len(resp.Metrics), len(resp.OwningAccounts)) + for i, m := range resp.Metrics { + if c.Log.Level().Includes(telegraf.Trace) { + dims := make([]string, 0, len(m.Dimensions)) for _, d := range m.Dimensions { - dimensions = append(dimensions, types.Dimension{ - Name: aws.String(d.Name), - Value: aws.String(d.Value), - }) + dims = append(dims, *d.Name+"="+*d.Value) } - for _, name := range m.MetricNames { - for _, namespace := range c.Namespaces { - metrics = append(metrics, types.Metric{ - Namespace: aws.String(namespace), - MetricName: aws.String(name), - Dimensions: dimensions, - }) - } - } - if c.IncludeLinkedAccounts { - _, allAccounts := c.fetchNamespaceMetrics() - accounts = append(accounts, allAccounts...) - } - } else { - allMetrics, allAccounts := c.fetchNamespaceMetrics() - - for _, name := range m.MetricNames { - for i, singleMetric := range allMetrics { - if isSelected(name, singleMetric, m.Dimensions) { - for _, namespace := range c.Namespaces { - metrics = append(metrics, types.Metric{ - Namespace: aws.String(namespace), - MetricName: aws.String(name), - Dimensions: singleMetric.Dimensions, - }) - } - if c.IncludeLinkedAccounts { - accounts = append(accounts, allAccounts[i]) - } - } - } + a := "none" + if len(resp.OwningAccounts) > 0 { + a = resp.OwningAccounts[i] } + c.Log.Tracef(" metric %3d: %s (%s): %s [%s]\n", i, *m.MetricName, *m.Namespace, strings.Join(dims, ", "), a) } - if m.StatisticExclude == nil { - m.StatisticExclude = &c.StatisticExclude + if c.nsFilter != nil && !c.nsFilter.Match(*m.Namespace) { + c.Log.Trace(" -> rejected by namespace") + continue } - if m.StatisticInclude == nil { - m.StatisticInclude = &c.StatisticInclude + + if len(c.Metrics) > 0 && !slices.ContainsFunc(c.Metrics, func(cm *cloudwatchMetric) bool { + return metricMatch(cm, m) + }) { + c.Log.Trace(" -> rejected by metric mismatch") + continue } - statFilter, err := filter.NewIncludeExcludeFilter(*m.StatisticInclude, *m.StatisticExclude) - if err != nil { - return nil, err + c.Log.Trace(" -> keeping metric") + + metrics = append(metrics, m) + if len(resp.OwningAccounts) > 0 { + accounts = append(accounts, resp.OwningAccounts[i]) } - fMetrics = append(fMetrics, filteredMetric{ - metrics: metrics, - statFilter: statFilter, - accounts: accounts, - }) } + + if resp.NextToken == nil { + break + } + params.NextToken = resp.NextToken + } + + var filtered []filteredMetric + if len(c.Metrics) == 0 { + filtered = append(filtered, filteredMetric{ + metrics: metrics, + accounts: accounts, + statFilter: c.statFilter, + }) } else { - metrics, accounts := c.fetchNamespaceMetrics() - fMetrics = []filteredMetric{ - { - metrics: metrics, - statFilter: c.statFilter, - accounts: accounts, - }, + for idx, cm := range c.Metrics { + var entry filteredMetric + if cm.StatisticInclude == nil && cm.StatisticExclude == nil { + entry.statFilter = c.statFilter + } else { + f, err := filter.NewIncludeExcludeFilter(*cm.StatisticInclude, *cm.StatisticExclude) + if err != nil { + return nil, fmt.Errorf("creating statistics filter for metric %d failed: %w", idx+1, err) + } + entry.statFilter = f + } + + for i, m := range metrics { + if metricMatch(cm, m) { + entry.metrics = append(entry.metrics, m) + if len(accounts) > 0 { + entry.accounts = append(entry.accounts, accounts[i]) + } + } + } + filtered = append(filtered, entry) } } - c.metricCache = &metricCache{ - metrics: fMetrics, + + c.cache = &metricCache{ + metrics: filtered, built: time.Now(), ttl: time.Duration(c.CacheTTL), } - return fMetrics, nil -} -// fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace. -func (c *CloudWatch) fetchNamespaceMetrics() ([]types.Metric, []string) { - metrics := make([]types.Metric, 0) - var accounts []string - for _, namespace := range c.Namespaces { - params := &cloudwatch.ListMetricsInput{ - Dimensions: make([]types.DimensionFilter, 0), - Namespace: aws.String(namespace), - IncludeLinkedAccounts: &c.IncludeLinkedAccounts, - } - if c.RecentlyActive == "PT3H" { - params.RecentlyActive = types.RecentlyActivePt3h - } - - for { - resp, err := c.client.ListMetrics(context.Background(), params) - if err != nil { - c.Log.Errorf("failed to list metrics with namespace %s: %v", namespace, err) - // skip problem namespace on error and continue to next namespace - break - } - metrics = append(metrics, resp.Metrics...) - accounts = append(accounts, resp.OwningAccounts...) - - if resp.NextToken == nil { - break - } - params.NextToken = resp.NextToken - } - } - return metrics, accounts + return filtered, nil } func (c *CloudWatch) updateWindow(relativeTo time.Time) { @@ -368,8 +344,8 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) { // getDataQueries gets all of the possible queries so we can maximize the request payload. func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string][]types.MetricDataQuery { - if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() { - return c.metricCache.queries + if c.cache != nil && c.cache.queries != nil && c.cache.metrics != nil && time.Since(c.cache.built) < c.cache.ttl { + return c.cache.queries } c.queryDimensions = make(map[string]*map[string]string) @@ -417,25 +393,27 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string return nil } - if c.metricCache == nil { - c.metricCache = &metricCache{ + if c.cache == nil { + c.cache = &metricCache{ queries: dataQueries, built: time.Now(), ttl: time.Duration(c.CacheTTL), } } else { - c.metricCache.queries = dataQueries + c.cache.queries = dataQueries } return dataQueries } -// gatherMetrics gets metric data from Cloudwatch. -func (c *CloudWatch) gatherMetrics( - params *cloudwatch.GetMetricDataInput, -) ([]types.MetricDataResult, error) { - results := make([]types.MetricDataResult, 0) +func (c *CloudWatch) gatherMetrics(queries []types.MetricDataQuery) ([]types.MetricDataResult, error) { + params := &cloudwatch.GetMetricDataInput{ + StartTime: aws.Time(c.windowStart), + EndTime: aws.Time(c.windowEnd), + MetricDataQueries: queries, + } + results := make([]types.MetricDataResult, 0) for { resp, err := c.client.GetMetricData(context.Background(), params) if err != nil { @@ -489,84 +467,13 @@ func (c *CloudWatch) aggregateMetrics(acc telegraf.Accumulator, metricDataResult } } -func sanitizeMeasurement(namespace string) string { - namespace = strings.ReplaceAll(namespace, "/", "_") - namespace = snakeCase(namespace) - return "cloudwatch_" + namespace -} - -func snakeCase(s string) string { - s = internal.SnakeCase(s) - s = strings.ReplaceAll(s, " ", "_") - s = strings.ReplaceAll(s, "__", "_") - return s -} - -// ctod converts cloudwatch dimensions to regular dimensions. -func ctod(cDimensions []types.Dimension) *map[string]string { - dimensions := make(map[string]string, len(cDimensions)) - for i := range cDimensions { - dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value - } - return &dimensions -} - -func (c *CloudWatch) getDataInputs(dataQueries []types.MetricDataQuery) *cloudwatch.GetMetricDataInput { - return &cloudwatch.GetMetricDataInput{ - StartTime: aws.Time(c.windowStart), - EndTime: aws.Time(c.windowEnd), - MetricDataQueries: dataQueries, - } -} - -// isValid checks the validity of the metric cache. -func (f *metricCache) isValid() bool { - return f.metrics != nil && time.Since(f.built) < f.ttl -} - -func hasWildcard(dimensions []*dimension) bool { - for _, d := range dimensions { - if d.Value == "" || strings.ContainsAny(d.Value, "*?[") { - return true - } - } - return false -} - -func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*dimension) bool { - if name != *cloudwatchMetric.MetricName { - return false - } - if len(cloudwatchMetric.Dimensions) != len(dimensions) { - return false - } - for _, d := range dimensions { - selected := false - for _, d2 := range cloudwatchMetric.Dimensions { - if d.Name == *d2.Name { - if d.Value == "" || d.valueMatcher.Match(*d2.Value) { - selected = true - } - } - } - if !selected { - return false - } - } - return true -} - -func newCloudWatch() *CloudWatch { - return &CloudWatch{ - CacheTTL: config.Duration(time.Hour), - RateLimit: 25, - Timeout: config.Duration(time.Second * 5), - BatchSize: 500, - } -} - func init() { inputs.Add("cloudwatch", func() telegraf.Input { - return newCloudWatch() + return &CloudWatch{ + CacheTTL: config.Duration(time.Hour), + RateLimit: 25, + Timeout: config.Duration(time.Second * 5), + BatchSize: 500, + } }) } diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go index 602da9b46..443c2fdaf 100644 --- a/plugins/inputs/cloudwatch/cloudwatch_test.go +++ b/plugins/inputs/cloudwatch/cloudwatch_test.go @@ -10,26 +10,482 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/metric" common_aws "github.com/influxdata/telegraf/plugins/common/aws" "github.com/influxdata/telegraf/plugins/common/proxy" "github.com/influxdata/telegraf/testutil" ) -type mockGatherCloudWatchClient struct{} +func TestSnakeCase(t *testing.T) { + require.Equal(t, "cluster_name", snakeCase("Cluster Name")) + require.Equal(t, "broker_id", snakeCase("Broker ID")) +} -func (*mockGatherCloudWatchClient) ListMetrics( - _ context.Context, - params *cloudwatch.ListMetricsInput, - _ ...func(*cloudwatch.Options), -) (*cloudwatch.ListMetricsOutput, error) { - response := &cloudwatch.ListMetricsOutput{ - Metrics: []types.Metric{ +func TestGather(t *testing.T) { + plugin := &CloudWatch{ + CredentialConfig: common_aws.CredentialConfig{ + Region: "us-east-1", + }, + Namespace: "AWS/ELB", + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + RateLimit: 200, + BatchSize: 500, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.client = defaultMockClient("AWS/ELB") + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example1", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 123.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example2", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 124.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) +} + +func TestGatherDenseMetric(t *testing.T) { + plugin := &CloudWatch{ + CredentialConfig: common_aws.CredentialConfig{ + Region: "us-east-1", + }, + Namespace: "AWS/ELB", + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + RateLimit: 200, + BatchSize: 500, + MetricFormat: "dense", + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.client = defaultMockClient("AWS/ELB") + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example1", + "metric_name": "latency", + }, + map[string]interface{}{ + "minimum": 0.1, + "maximum": 0.3, + "average": 0.2, + "sum": 123.0, + "sample_count": 100.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example2", + "metric_name": "latency", + }, + map[string]interface{}{ + "minimum": 0.1, + "maximum": 0.3, + "average": 0.2, + "sum": 124.0, + "sample_count": 100.0, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) +} + +func TestMultiAccountGather(t *testing.T) { + plugin := &CloudWatch{ + CredentialConfig: common_aws.CredentialConfig{ + Region: "us-east-1", + }, + Namespace: "AWS/ELB", + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + RateLimit: 200, + BatchSize: 500, + Log: testutil.Logger{}, + IncludeLinkedAccounts: true, + } + require.NoError(t, plugin.Init()) + plugin.client = defaultMockClient("AWS/ELB") + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example1", + "account": "123456789012", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 123.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example2", + "account": "923456789017", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 124.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime()) +} + +func TestGatherMultipleNamespaces(t *testing.T) { + plugin := &CloudWatch{ + CredentialConfig: common_aws.CredentialConfig{ + Region: "us-east-1", + }, + Namespaces: []string{"AWS/ELB", "AWS/EC2"}, + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + RateLimit: 200, + BatchSize: 500, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.client = defaultMockClient("AWS/ELB", "AWS/EC2") + + var acc testutil.Accumulator + require.NoError(t, acc.GatherError(plugin.Gather)) + + expected := []telegraf.Metric{ + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example1", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 123.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cloudwatch_aws_elb", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example2", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 124.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cloudwatch_aws_ec2", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example1", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 123.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + metric.New( + "cloudwatch_aws_ec2", + map[string]string{ + "region": "us-east-1", + "load_balancer_name": "p-example2", + }, + map[string]interface{}{ + "latency_minimum": 0.1, + "latency_maximum": 0.3, + "latency_average": 0.2, + "latency_sum": 124.0, + "latency_sample_count": 100.0, + }, + time.Unix(0, 0), + ), + } + + option := []cmp.Option{ + testutil.IgnoreTime(), + testutil.SortMetrics(), + } + + testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), option...) +} + +func TestSelectMetrics(t *testing.T) { + plugin := &CloudWatch{ + CredentialConfig: common_aws.CredentialConfig{ + Region: "us-east-1", + }, + Namespace: "AWS/ELB", + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + RateLimit: 200, + BatchSize: 500, + Metrics: []*cloudwatchMetric{ { - Namespace: params.Namespace, + MetricNames: []string{"Latency", "RequestCount"}, + Dimensions: []*dimension{ + { + Name: "LoadBalancerName", + Value: "lb*", + }, + { + Name: "AvailabilityZone", + Value: "us-east*", + }, + }, + }, + }, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + plugin.client = selectedMockClient() + filtered, err := plugin.getFilteredMetrics() + // We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2 + // AZs. We should get 12 metrics. + require.Len(t, filtered[0].metrics, 12) + require.NoError(t, err) +} + +func TestGenerateStatisticsInputParams(t *testing.T) { + d := types.Dimension{ + Name: aws.String("LoadBalancerName"), + Value: aws.String("p-example"), + } + + namespace := "AWS/ELB" + m := types.Metric{ + MetricName: aws.String("Latency"), + Dimensions: []types.Dimension{d}, + Namespace: aws.String(namespace), + } + + plugin := &CloudWatch{ + Namespaces: []string{namespace}, + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + BatchSize: 500, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + now := time.Now() + + plugin.updateWindow(now) + + statFilter, err := filter.NewIncludeExcludeFilter(nil, nil) + require.NoError(t, err) + queries := plugin.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}}) + params := &cloudwatch.GetMetricDataInput{ + StartTime: aws.Time(plugin.windowStart), + EndTime: aws.Time(plugin.windowEnd), + MetricDataQueries: queries[namespace], + } + + require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(plugin.Delay))) + require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(plugin.Period)).Add(-time.Duration(plugin.Delay))) + require.Len(t, params.MetricDataQueries, 5) + require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) + require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period) +} + +func TestGenerateStatisticsInputParamsFiltered(t *testing.T) { + d := types.Dimension{ + Name: aws.String("LoadBalancerName"), + Value: aws.String("p-example"), + } + + namespace := "AWS/ELB" + m := types.Metric{ + MetricName: aws.String("Latency"), + Dimensions: []types.Dimension{d}, + Namespace: aws.String(namespace), + } + + plugin := &CloudWatch{ + Namespaces: []string{namespace}, + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + BatchSize: 500, + Log: testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + now := time.Now() + + plugin.updateWindow(now) + + statFilter, err := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil) + require.NoError(t, err) + queries := plugin.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}}) + params := &cloudwatch.GetMetricDataInput{ + StartTime: aws.Time(plugin.windowStart), + EndTime: aws.Time(plugin.windowEnd), + MetricDataQueries: queries[namespace], + } + + require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(plugin.Delay))) + require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(plugin.Period)).Add(-time.Duration(plugin.Delay))) + require.Len(t, params.MetricDataQueries, 2) + require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) + require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period) +} + +func TestMetricsCacheTimeout(t *testing.T) { + cache := &metricCache{ + metrics: make([]filteredMetric, 0), + built: time.Now(), + ttl: time.Minute, + } + + require.True(t, cache.metrics != nil && time.Since(cache.built) < cache.ttl) + cache.built = time.Now().Add(-time.Minute) + require.False(t, cache.metrics != nil && time.Since(cache.built) < cache.ttl) +} + +func TestUpdateWindow(t *testing.T) { + plugin := &CloudWatch{ + Namespace: "AWS/ELB", + Delay: config.Duration(1 * time.Minute), + Period: config.Duration(1 * time.Minute), + BatchSize: 500, + Log: testutil.Logger{}, + } + + now := time.Now() + + require.True(t, plugin.windowEnd.IsZero()) + require.True(t, plugin.windowStart.IsZero()) + + plugin.updateWindow(now) + + newStartTime := plugin.windowEnd + + // initial window just has a single period + require.EqualValues(t, plugin.windowEnd, now.Add(-time.Duration(plugin.Delay))) + require.EqualValues(t, plugin.windowStart, now.Add(-time.Duration(plugin.Delay)).Add(-time.Duration(plugin.Period))) + + now = time.Now() + plugin.updateWindow(now) + + // subsequent window uses previous end time as start time + require.EqualValues(t, plugin.windowEnd, now.Add(-time.Duration(plugin.Delay))) + require.EqualValues(t, plugin.windowStart, newStartTime) +} + +func TestProxyFunction(t *testing.T) { + proxyCfg := proxy.HTTPProxy{HTTPProxyURL: "http://www.penguins.com"} + + proxyFunction, err := proxyCfg.Proxy() + require.NoError(t, err) + + u, err := url.Parse("https://monitoring.us-west-1.amazonaws.com/") + require.NoError(t, err) + + proxyResult, err := proxyFunction(&http.Request{URL: u}) + require.NoError(t, err) + require.Equal(t, "www.penguins.com", proxyResult.Host) +} + +func TestCombineNamespaces(t *testing.T) { + plugin := &CloudWatch{ + Namespace: "AWS/ELB", + Namespaces: []string{"AWS/EC2", "AWS/Billing"}, + BatchSize: 500, + Log: testutil.Logger{}, + } + + require.NoError(t, plugin.Init()) + require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, plugin.Namespaces) +} + +// INTERNAL mock client implementation +type mockClient struct { + metrics []types.Metric +} + +func defaultMockClient(namespaces ...string) *mockClient { + c := &mockClient{ + metrics: make([]types.Metric, 0, len(namespaces)), + } + + for _, namespace := range namespaces { + c.metrics = append(c.metrics, + types.Metric{ + Namespace: aws.String(namespace), MetricName: aws.String("Latency"), Dimensions: []types.Dimension{ { @@ -38,8 +494,8 @@ func (*mockGatherCloudWatchClient) ListMetrics( }, }, }, - { - Namespace: params.Namespace, + types.Metric{ + Namespace: aws.String(namespace), MetricName: aws.String("Latency"), Dimensions: []types.Dimension{ { @@ -47,16 +503,68 @@ func (*mockGatherCloudWatchClient) ListMetrics( Value: aws.String("p-example2"), }, }, - }, - }, + }) } + return c +} + +func selectedMockClient() *mockClient { + c := &mockClient{ + metrics: make([]types.Metric, 0, 4*3*2), + } + // 4 metrics for 3 ELBs in 2 AZs + for _, m := range []string{"Latency", "RequestCount", "HealthyHostCount", "UnHealthyHostCount"} { + for _, lb := range []string{"lb-1", "lb-2", "lb-3"} { + // For each metric/ELB pair, we get an aggregate value across all AZs. + c.metrics = append(c.metrics, types.Metric{ + Namespace: aws.String("AWS/ELB"), + MetricName: aws.String(m), + Dimensions: []types.Dimension{ + { + Name: aws.String("LoadBalancerName"), + Value: aws.String(lb), + }, + }, + }) + for _, az := range []string{"us-east-1a", "us-east-1b"} { + // We get a metric for each metric/ELB/AZ triplet. + c.metrics = append(c.metrics, types.Metric{ + Namespace: aws.String("AWS/ELB"), + MetricName: aws.String(m), + Dimensions: []types.Dimension{ + { + Name: aws.String("LoadBalancerName"), + Value: aws.String(lb), + }, + { + Name: aws.String("AvailabilityZone"), + Value: aws.String(az), + }, + }, + }) + } + } + } + + return c +} + +func (c *mockClient) ListMetrics( + _ context.Context, + params *cloudwatch.ListMetricsInput, + _ ...func(*cloudwatch.Options), +) (*cloudwatch.ListMetricsOutput, error) { + response := &cloudwatch.ListMetricsOutput{ + Metrics: c.metrics, + } + if params.IncludeLinkedAccounts != nil && *params.IncludeLinkedAccounts { - (*response).OwningAccounts = []string{"123456789012", "923456789017"} + response.OwningAccounts = []string{"123456789012", "923456789017"} } return response, nil } -func (*mockGatherCloudWatchClient) GetMetricData( +func (*mockClient) GetMetricData( _ context.Context, params *cloudwatch.GetMetricDataInput, _ ...func(*cloudwatch.Options), @@ -156,415 +664,3 @@ func (*mockGatherCloudWatchClient) GetMetricData( }, }, nil } - -func TestSnakeCase(t *testing.T) { - require.Equal(t, "cluster_name", snakeCase("Cluster Name")) - require.Equal(t, "broker_id", snakeCase("Broker ID")) -} - -func TestGather(t *testing.T) { - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - c := &CloudWatch{ - CredentialConfig: common_aws.CredentialConfig{ - Region: "us-east-1", - }, - Namespace: "AWS/ELB", - Delay: internalDuration, - Period: internalDuration, - RateLimit: 200, - BatchSize: 500, - Log: testutil.Logger{}, - } - - var acc testutil.Accumulator - - require.NoError(t, c.Init()) - c.client = &mockGatherCloudWatchClient{} - require.NoError(t, acc.GatherError(c.Gather)) - - fields := map[string]interface{}{} - fields["latency_minimum"] = 0.1 - fields["latency_maximum"] = 0.3 - fields["latency_average"] = 0.2 - fields["latency_sum"] = 123.0 - fields["latency_sample_count"] = 100.0 - - tags := map[string]string{} - tags["region"] = "us-east-1" - tags["load_balancer_name"] = "p-example1" - - require.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) - acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) -} - -func TestGatherDenseMetric(t *testing.T) { - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - c := &CloudWatch{ - CredentialConfig: common_aws.CredentialConfig{ - Region: "us-east-1", - }, - Namespace: "AWS/ELB", - Delay: internalDuration, - Period: internalDuration, - RateLimit: 200, - BatchSize: 500, - MetricFormat: "dense", - Log: testutil.Logger{}, - } - - var acc testutil.Accumulator - - require.NoError(t, c.Init()) - c.client = &mockGatherCloudWatchClient{} - require.NoError(t, acc.GatherError(c.Gather)) - - fields := map[string]interface{}{} - fields["minimum"] = 0.1 - fields["maximum"] = 0.3 - fields["average"] = 0.2 - fields["sum"] = 123.0 - fields["sample_count"] = 100.0 - - tags := map[string]string{} - tags["region"] = "us-east-1" - tags["load_balancer_name"] = "p-example1" - tags["metric_name"] = "latency" - - require.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) - acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) -} - -func TestMultiAccountGather(t *testing.T) { - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - c := &CloudWatch{ - CredentialConfig: common_aws.CredentialConfig{ - Region: "us-east-1", - }, - Namespace: "AWS/ELB", - Delay: internalDuration, - Period: internalDuration, - RateLimit: 200, - BatchSize: 500, - Log: testutil.Logger{}, - IncludeLinkedAccounts: true, - } - - var acc testutil.Accumulator - - require.NoError(t, c.Init()) - c.client = &mockGatherCloudWatchClient{} - require.NoError(t, acc.GatherError(c.Gather)) - - fields := map[string]interface{}{} - fields["latency_minimum"] = 0.1 - fields["latency_maximum"] = 0.3 - fields["latency_average"] = 0.2 - fields["latency_sum"] = 123.0 - fields["latency_sample_count"] = 100.0 - - tags := map[string]string{} - tags["region"] = "us-east-1" - tags["load_balancer_name"] = "p-example1" - tags["account"] = "123456789012" - - require.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) - acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) - - tags["load_balancer_name"] = "p-example2" - tags["account"] = "923456789017" - fields["latency_sum"] = 124.0 - acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags) -} - -func TestGather_MultipleNamespaces(t *testing.T) { - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - c := &CloudWatch{ - Namespaces: []string{"AWS/ELB", "AWS/EC2"}, - Delay: internalDuration, - Period: internalDuration, - RateLimit: 200, - BatchSize: 500, - Log: testutil.Logger{}, - } - - var acc testutil.Accumulator - - require.NoError(t, c.Init()) - c.client = &mockGatherCloudWatchClient{} - require.NoError(t, acc.GatherError(c.Gather)) - - require.True(t, acc.HasMeasurement("cloudwatch_aws_elb")) - require.True(t, acc.HasMeasurement("cloudwatch_aws_ec2")) -} - -type mockSelectMetricsCloudWatchClient struct{} - -func (*mockSelectMetricsCloudWatchClient) ListMetrics( - context.Context, - *cloudwatch.ListMetricsInput, - ...func(*cloudwatch.Options), -) (*cloudwatch.ListMetricsOutput, error) { - metrics := make([]types.Metric, 0) - // 4 metrics are available - metricNames := []string{"Latency", "RequestCount", "HealthyHostCount", "UnHealthyHostCount"} - // for 3 ELBs - loadBalancers := []string{"lb-1", "lb-2", "lb-3"} - // in 2 AZs - availabilityZones := []string{"us-east-1a", "us-east-1b"} - for _, m := range metricNames { - for _, lb := range loadBalancers { - // For each metric/ELB pair, we get an aggregate value across all AZs. - metrics = append(metrics, types.Metric{ - Namespace: aws.String("AWS/ELB"), - MetricName: aws.String(m), - Dimensions: []types.Dimension{ - { - Name: aws.String("LoadBalancerName"), - Value: aws.String(lb), - }, - }, - }) - for _, az := range availabilityZones { - // We get a metric for each metric/ELB/AZ triplet. - metrics = append(metrics, types.Metric{ - Namespace: aws.String("AWS/ELB"), - MetricName: aws.String(m), - Dimensions: []types.Dimension{ - { - Name: aws.String("LoadBalancerName"), - Value: aws.String(lb), - }, - { - Name: aws.String("AvailabilityZone"), - Value: aws.String(az), - }, - }, - }) - } - } - } - - result := &cloudwatch.ListMetricsOutput{ - Metrics: metrics, - } - return result, nil -} - -func (*mockSelectMetricsCloudWatchClient) GetMetricData( - context.Context, - *cloudwatch.GetMetricDataInput, - ...func(*cloudwatch.Options), -) (*cloudwatch.GetMetricDataOutput, error) { - return nil, nil -} - -func TestSelectMetrics(t *testing.T) { - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - c := &CloudWatch{ - CredentialConfig: common_aws.CredentialConfig{ - Region: "us-east-1", - }, - Namespace: "AWS/ELB", - Delay: internalDuration, - Period: internalDuration, - RateLimit: 200, - BatchSize: 500, - Metrics: []*cloudwatchMetric{ - { - MetricNames: []string{"Latency", "RequestCount"}, - Dimensions: []*dimension{ - { - Name: "LoadBalancerName", - Value: "lb*", - }, - { - Name: "AvailabilityZone", - Value: "us-east*", - }, - }, - }, - }, - Log: testutil.Logger{}, - } - require.NoError(t, c.Init()) - c.client = &mockSelectMetricsCloudWatchClient{} - filtered, err := getFilteredMetrics(c) - // We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2 - // AZs. We should get 12 metrics. - require.Len(t, filtered[0].metrics, 12) - require.NoError(t, err) -} - -func TestGenerateStatisticsInputParams(t *testing.T) { - d := types.Dimension{ - Name: aws.String("LoadBalancerName"), - Value: aws.String("p-example"), - } - - namespace := "AWS/ELB" - m := types.Metric{ - MetricName: aws.String("Latency"), - Dimensions: []types.Dimension{d}, - Namespace: aws.String(namespace), - } - - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - - c := &CloudWatch{ - Namespaces: []string{namespace}, - Delay: internalDuration, - Period: internalDuration, - BatchSize: 500, - Log: testutil.Logger{}, - } - - require.NoError(t, c.initializeCloudWatch()) - - now := time.Now() - - c.updateWindow(now) - - statFilter, err := filter.NewIncludeExcludeFilter(nil, nil) - require.NoError(t, err) - queries := c.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}}) - params := c.getDataInputs(queries[namespace]) - - require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) - require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) - require.Len(t, params.MetricDataQueries, 5) - require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) - require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period) -} - -func TestGenerateStatisticsInputParamsFiltered(t *testing.T) { - d := types.Dimension{ - Name: aws.String("LoadBalancerName"), - Value: aws.String("p-example"), - } - - namespace := "AWS/ELB" - m := types.Metric{ - MetricName: aws.String("Latency"), - Dimensions: []types.Dimension{d}, - Namespace: aws.String(namespace), - } - - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - - c := &CloudWatch{ - Namespaces: []string{namespace}, - Delay: internalDuration, - Period: internalDuration, - BatchSize: 500, - Log: testutil.Logger{}, - } - - require.NoError(t, c.initializeCloudWatch()) - - now := time.Now() - - c.updateWindow(now) - - statFilter, err := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil) - require.NoError(t, err) - queries := c.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}}) - params := c.getDataInputs(queries[namespace]) - - require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay))) - require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay))) - require.Len(t, params.MetricDataQueries, 2) - require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1) - require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period) -} - -func TestMetricsCacheTimeout(t *testing.T) { - cache := &metricCache{ - metrics: make([]filteredMetric, 0), - built: time.Now(), - ttl: time.Minute, - } - - require.True(t, cache.isValid()) - cache.built = time.Now().Add(-time.Minute) - require.False(t, cache.isValid()) -} - -func TestUpdateWindow(t *testing.T) { - duration, err := time.ParseDuration("1m") - require.NoError(t, err) - internalDuration := config.Duration(duration) - - c := &CloudWatch{ - Namespace: "AWS/ELB", - Delay: internalDuration, - Period: internalDuration, - BatchSize: 500, - Log: testutil.Logger{}, - } - - now := time.Now() - - require.True(t, c.windowEnd.IsZero()) - require.True(t, c.windowStart.IsZero()) - - c.updateWindow(now) - - newStartTime := c.windowEnd - - // initial window just has a single period - require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) - require.EqualValues(t, c.windowStart, now.Add(-time.Duration(c.Delay)).Add(-time.Duration(c.Period))) - - now = time.Now() - c.updateWindow(now) - - // subsequent window uses previous end time as start time - require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay))) - require.EqualValues(t, c.windowStart, newStartTime) -} - -func TestProxyFunction(t *testing.T) { - c := &CloudWatch{ - HTTPProxy: proxy.HTTPProxy{ - HTTPProxyURL: "http://www.penguins.com", - }, - BatchSize: 500, - Log: testutil.Logger{}, - } - - proxyFunction, err := c.HTTPProxy.Proxy() - require.NoError(t, err) - - u, err := url.Parse("https://monitoring.us-west-1.amazonaws.com/") - require.NoError(t, err) - - proxyResult, err := proxyFunction(&http.Request{URL: u}) - require.NoError(t, err) - require.Equal(t, "www.penguins.com", proxyResult.Host) -} - -func TestCombineNamespaces(t *testing.T) { - c := &CloudWatch{ - Namespace: "AWS/ELB", - Namespaces: []string{"AWS/EC2", "AWS/Billing"}, - BatchSize: 500, - Log: testutil.Logger{}, - } - - require.NoError(t, c.Init()) - require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, c.Namespaces) -} diff --git a/plugins/inputs/cloudwatch/sample.conf b/plugins/inputs/cloudwatch/sample.conf index f0544a59d..242b956ff 100644 --- a/plugins/inputs/cloudwatch/sample.conf +++ b/plugins/inputs/cloudwatch/sample.conf @@ -65,13 +65,13 @@ ## Do not enable if "period" or "delay" is longer than 3 hours, as it will ## not return data more than 3 hours old. ## See https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_ListMetrics.html - #recently_active = "PT3H" + # recently_active = "PT3H" ## Configure the TTL for the internal cache of metrics. # cache_ttl = "1h" - ## Metric Statistic Namespaces (required) - namespaces = ["AWS/ELB"] + ## Metric Statistic Namespaces, wildcards are allowed + # namespaces = ["*"] ## Metric Format ## This determines the format of the produces metrics. 'sparse', the default diff --git a/plugins/inputs/cloudwatch/utils.go b/plugins/inputs/cloudwatch/utils.go new file mode 100644 index 000000000..1cef0dd97 --- /dev/null +++ b/plugins/inputs/cloudwatch/utils.go @@ -0,0 +1,53 @@ +package cloudwatch + +import ( + "slices" + "strings" + + "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + "github.com/influxdata/telegraf/internal" +) + +func dimensionsMatch(ref []*dimension, values []types.Dimension) bool { + for _, rd := range ref { + var found bool + for _, vd := range values { + if rd.Name == *vd.Name && (rd.valueMatcher == nil || rd.valueMatcher.Match(*vd.Value)) { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func metricMatch(cm *cloudwatchMetric, m types.Metric) bool { + if !slices.Contains(cm.MetricNames, *m.MetricName) { + return false + } + return dimensionsMatch(cm.Dimensions, m.Dimensions) +} + +func sanitizeMeasurement(namespace string) string { + namespace = strings.ReplaceAll(namespace, "/", "_") + namespace = snakeCase(namespace) + return "cloudwatch_" + namespace +} + +func snakeCase(s string) string { + s = internal.SnakeCase(s) + s = strings.ReplaceAll(s, " ", "_") + s = strings.ReplaceAll(s, "__", "_") + return s +} + +func ctod(cDimensions []types.Dimension) *map[string]string { + dimensions := make(map[string]string, len(cDimensions)) + for i := range cDimensions { + dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value + } + return &dimensions +}