feat(inputs.cloudwatch): Allow wildcards for namespaces (#16337)

parent fc6bf15944
commit 88f24052de
@@ -100,13 +100,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
   ## Do not enable if "period" or "delay" is longer than 3 hours, as it will
   ## not return data more than 3 hours old.
   ## See https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_ListMetrics.html
-  #recently_active = "PT3H"
+  # recently_active = "PT3H"
 
   ## Configure the TTL for the internal cache of metrics.
   # cache_ttl = "1h"
 
-  ## Metric Statistic Namespaces (required)
-  namespaces = ["AWS/ELB"]
+  ## Metric Statistic Namespaces, wildcards are allowed
+  # namespaces = ["*"]
 
   ## Metric Format
   ## This determines the format of the produces metrics. 'sparse', the default
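The user-facing effect of the change is visible in the sample configuration above: `namespaces` now accepts glob patterns instead of only literal namespace names. A minimal, illustrative plugin section might look like the following (the region and timing values are placeholders, not part of this commit):

    [[inputs.cloudwatch]]
      region = "us-east-1"
      period = "5m"
      delay  = "5m"

      ## Glob patterns and literal names can be mixed; "*" alone matches every namespace.
      namespaces = ["AWS/*", "MyCompany/Custom"]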
@@ -8,6 +8,7 @@ import (
     "net"
     "net/http"
     "regexp"
+    "slices"
     "strconv"
     "strings"
     "sync"
@@ -20,7 +21,6 @@ import (
     "github.com/influxdata/telegraf"
     "github.com/influxdata/telegraf/config"
     "github.com/influxdata/telegraf/filter"
-    "github.com/influxdata/telegraf/internal"
     "github.com/influxdata/telegraf/internal/limiter"
     "github.com/influxdata/telegraf/metric"
     common_aws "github.com/influxdata/telegraf/plugins/common/aws"
@@ -31,7 +31,6 @@ import (
 //go:embed sample.conf
 var sampleConfig string
 
-// CloudWatch contains the configuration and cache for the cloudwatch plugin.
 type CloudWatch struct {
     StatisticExclude []string `toml:"statistic_exclude"`
     StatisticInclude []string `toml:"statistic_include"`
@@ -51,33 +50,30 @@ type CloudWatch struct {
     IncludeLinkedAccounts bool            `toml:"include_linked_accounts"`
     MetricFormat          string          `toml:"metric_format"`
     Log                   telegraf.Logger `toml:"-"`
+    common_aws.CredentialConfig
 
     client          cloudwatchClient
+    nsFilter        filter.Filter
     statFilter      filter.Filter
-    metricCache     *metricCache
+    cache           *metricCache
     queryDimensions map[string]*map[string]string
     windowStart     time.Time
     windowEnd       time.Time
-
-    common_aws.CredentialConfig
 }
 
-// cloudwatchMetric defines a simplified Cloudwatch metric.
 type cloudwatchMetric struct {
-    StatisticExclude *[]string    `toml:"statistic_exclude"`
-    StatisticInclude *[]string    `toml:"statistic_include"`
     MetricNames      []string     `toml:"names"`
     Dimensions       []*dimension `toml:"dimensions"`
+    StatisticExclude *[]string    `toml:"statistic_exclude"`
+    StatisticInclude *[]string    `toml:"statistic_include"`
 }
 
-// dimension defines a simplified Cloudwatch dimension (provides metric filtering).
 type dimension struct {
     Name  string `toml:"name"`
     Value string `toml:"value"`
     valueMatcher filter.Filter
 }
 
-// metricCache caches metrics, their filters, and generated queries.
 type metricCache struct {
     ttl   time.Duration
     built time.Time
@@ -85,6 +81,12 @@ type metricCache struct {
     queries map[string][]types.MetricDataQuery
 }
 
+type filteredMetric struct {
+    metrics    []types.Metric
+    accounts   []string
+    statFilter filter.Filter
+}
+
 type cloudwatchClient interface {
     ListMetrics(context.Context, *cloudwatch.ListMetricsInput, ...func(*cloudwatch.Options)) (*cloudwatch.ListMetricsOutput, error)
     GetMetricData(context.Context, *cloudwatch.GetMetricDataInput, ...func(*cloudwatch.Options)) (*cloudwatch.GetMetricDataOutput, error)
@@ -95,10 +97,12 @@ func (*CloudWatch) SampleConfig() string {
 }
 
 func (c *CloudWatch) Init() error {
+    // For backward compatibility
     if len(c.Namespace) != 0 {
         c.Namespaces = append(c.Namespaces, c.Namespace)
     }
 
+    // Check user settings
     switch c.MetricFormat {
     case "":
         c.MetricFormat = "sparse"
@@ -107,22 +111,69 @@ func (c *CloudWatch) Init() error {
         return fmt.Errorf("invalid metric_format: %s", c.MetricFormat)
     }
 
-    err := c.initializeCloudWatch()
-    if err != nil {
-        return err
-    }
-
-    // Set config level filter (won't change throughout life of plugin).
-    c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
-    if err != nil {
-        return err
+    // Setup the cloudwatch client
+    proxyFunc, err := c.HTTPProxy.Proxy()
+    if err != nil {
+        return fmt.Errorf("creating proxy failed: %w", err)
+    }
+
+    creds, err := c.CredentialConfig.Credentials()
+    if err != nil {
+        return fmt.Errorf("getting credentials failed: %w", err)
+    }
+
+    c.client = cloudwatch.NewFromConfig(creds, func(options *cloudwatch.Options) {
+        if c.CredentialConfig.EndpointURL != "" && c.CredentialConfig.Region != "" {
+            options.BaseEndpoint = &c.CredentialConfig.EndpointURL
+        }
+
+        options.ClientLogMode = 0
+        options.HTTPClient = &http.Client{
+            // use values from DefaultTransport
+            Transport: &http.Transport{
+                Proxy: proxyFunc,
+                DialContext: (&net.Dialer{
+                    Timeout:   30 * time.Second,
+                    KeepAlive: 30 * time.Second,
+                    DualStack: true,
+                }).DialContext,
+                MaxIdleConns:          100,
+                IdleConnTimeout:       90 * time.Second,
+                TLSHandshakeTimeout:   10 * time.Second,
+                ExpectContinueTimeout: 1 * time.Second,
+            },
+            Timeout: time.Duration(c.Timeout),
+        }
+    })
+
+    // Initialize filter for metric dimensions to include
+    for _, m := range c.Metrics {
+        for _, dimension := range m.Dimensions {
+            matcher, err := filter.NewIncludeExcludeFilter([]string{dimension.Value}, nil)
+            if err != nil {
+                return fmt.Errorf("creating dimension filter for dimension %q failed: %w", dimension, err)
+            }
+            dimension.valueMatcher = matcher
+        }
+    }
+
+    // Initialize statistics-type filter
+    c.statFilter, err = filter.NewIncludeExcludeFilter(c.StatisticInclude, c.StatisticExclude)
+    if err != nil {
+        return fmt.Errorf("creating statistics filter failed: %w", err)
+    }
+
+    // Initialize namespace filter
+    c.nsFilter, err = filter.Compile(c.Namespaces)
+    if err != nil {
+        return fmt.Errorf("creating namespace filter failed: %w", err)
     }
 
     return nil
 }
 
 func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
-    filteredMetrics, err := getFilteredMetrics(c)
+    filteredMetrics, err := c.getFilteredMetrics()
     if err != nil {
         return err
     }
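The namespace filter created in `Init()` uses the shared `filter` package, so namespace globs behave like other Telegraf include filters. A small standalone sketch of that behaviour follows; it is not code from this commit and only assumes `filter.Compile` and `Filter.Match` as used above. An empty pattern list yields no filter, which is consistent with the plugin guarding with `c.nsFilter != nil` before matching.

    package main

    import (
        "fmt"

        "github.com/influxdata/telegraf/filter"
    )

    func main() {
        // Compile namespace patterns the same way Init() does for c.Namespaces.
        nsFilter, err := filter.Compile([]string{"AWS/*", "MyCompany/Custom"})
        if err != nil {
            panic(err)
        }

        // Literal names match exactly; "*" and other globs match by pattern.
        for _, ns := range []string{"AWS/ELB", "AWS/EC2", "MyCompany/Custom", "Other/Namespace"} {
            fmt.Printf("%-20s matched=%v\n", ns, nsFilter.Match(ns))
        }
    }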
@@ -156,7 +207,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
             <-lmtr.C
             go func(n string, inm []types.MetricDataQuery) {
                 defer wg.Done()
-                result, err := c.gatherMetrics(c.getDataInputs(inm))
+                result, err := c.gatherMetrics(inm)
                 if err != nil {
                     acc.AddError(err)
                     return
@@ -174,182 +225,107 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
     return nil
 }
 
-func (c *CloudWatch) initializeCloudWatch() error {
-    proxyFunc, err := c.HTTPProxy.Proxy()
-    if err != nil {
-        return err
-    }
-
-    awsCreds, err := c.CredentialConfig.Credentials()
-    if err != nil {
-        return err
-    }
-
-    c.client = cloudwatch.NewFromConfig(awsCreds, func(options *cloudwatch.Options) {
-        if c.CredentialConfig.EndpointURL != "" && c.CredentialConfig.Region != "" {
-            options.BaseEndpoint = &c.CredentialConfig.EndpointURL
-        }
-
-        options.ClientLogMode = 0
-        options.HTTPClient = &http.Client{
-            // use values from DefaultTransport
-            Transport: &http.Transport{
-                Proxy: proxyFunc,
-                DialContext: (&net.Dialer{
-                    Timeout:   30 * time.Second,
-                    KeepAlive: 30 * time.Second,
-                    DualStack: true,
-                }).DialContext,
-                MaxIdleConns:          100,
-                IdleConnTimeout:       90 * time.Second,
-                TLSHandshakeTimeout:   10 * time.Second,
-                ExpectContinueTimeout: 1 * time.Second,
-            },
-            Timeout: time.Duration(c.Timeout),
-        }
-    })
-
-    // Initialize regex matchers for each dimension value.
-    for _, m := range c.Metrics {
-        for _, dimension := range m.Dimensions {
-            matcher, err := filter.NewIncludeExcludeFilter([]string{dimension.Value}, nil)
-            if err != nil {
-                return err
-            }
-
-            dimension.valueMatcher = matcher
-        }
-    }
-
-    return nil
-}
-
-type filteredMetric struct {
-    metrics    []types.Metric
-    accounts   []string
-    statFilter filter.Filter
-}
-
-// getFilteredMetrics returns metrics specified in the config file or metrics listed from Cloudwatch.
-func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
-    if c.metricCache != nil && c.metricCache.isValid() {
-        return c.metricCache.metrics, nil
-    }
-
-    fMetrics := make([]filteredMetric, 0)
-
-    // check for provided metric filter
-    if c.Metrics != nil {
-        for _, m := range c.Metrics {
-            metrics := make([]types.Metric, 0)
-            var accounts []string
-            if !hasWildcard(m.Dimensions) {
-                dimensions := make([]types.Dimension, 0, len(m.Dimensions))
-                for _, d := range m.Dimensions {
-                    dimensions = append(dimensions, types.Dimension{
-                        Name:  aws.String(d.Name),
-                        Value: aws.String(d.Value),
-                    })
-                }
-                for _, name := range m.MetricNames {
-                    for _, namespace := range c.Namespaces {
-                        metrics = append(metrics, types.Metric{
-                            Namespace:  aws.String(namespace),
-                            MetricName: aws.String(name),
-                            Dimensions: dimensions,
-                        })
-                    }
-                }
-                if c.IncludeLinkedAccounts {
-                    _, allAccounts := c.fetchNamespaceMetrics()
-                    accounts = append(accounts, allAccounts...)
-                }
-            } else {
-                allMetrics, allAccounts := c.fetchNamespaceMetrics()
-                for _, name := range m.MetricNames {
-                    for i, singleMetric := range allMetrics {
-                        if isSelected(name, singleMetric, m.Dimensions) {
-                            for _, namespace := range c.Namespaces {
-                                metrics = append(metrics, types.Metric{
-                                    Namespace:  aws.String(namespace),
-                                    MetricName: aws.String(name),
-                                    Dimensions: singleMetric.Dimensions,
-                                })
-                            }
-                            if c.IncludeLinkedAccounts {
-                                accounts = append(accounts, allAccounts[i])
-                            }
-                        }
-                    }
-                }
-            }
-
-            if m.StatisticExclude == nil {
-                m.StatisticExclude = &c.StatisticExclude
-            }
-            if m.StatisticInclude == nil {
-                m.StatisticInclude = &c.StatisticInclude
-            }
-            statFilter, err := filter.NewIncludeExcludeFilter(*m.StatisticInclude, *m.StatisticExclude)
-            if err != nil {
-                return nil, err
-            }
-
-            fMetrics = append(fMetrics, filteredMetric{
-                metrics:    metrics,
-                statFilter: statFilter,
-                accounts:   accounts,
-            })
-        }
-    } else {
-        metrics, accounts := c.fetchNamespaceMetrics()
-        fMetrics = []filteredMetric{
-            {
-                metrics:    metrics,
-                statFilter: c.statFilter,
-                accounts:   accounts,
-            },
-        }
-    }
-
-    c.metricCache = &metricCache{
-        metrics: fMetrics,
-        built:   time.Now(),
-        ttl:     time.Duration(c.CacheTTL),
-    }
-
-    return fMetrics, nil
-}
-
-// fetchNamespaceMetrics retrieves available metrics for a given CloudWatch namespace.
-func (c *CloudWatch) fetchNamespaceMetrics() ([]types.Metric, []string) {
-    metrics := make([]types.Metric, 0)
-    var accounts []string
-    for _, namespace := range c.Namespaces {
-        params := &cloudwatch.ListMetricsInput{
-            Dimensions:            make([]types.DimensionFilter, 0),
-            Namespace:             aws.String(namespace),
-            IncludeLinkedAccounts: &c.IncludeLinkedAccounts,
-        }
-        if c.RecentlyActive == "PT3H" {
-            params.RecentlyActive = types.RecentlyActivePt3h
-        }
-
-        for {
-            resp, err := c.client.ListMetrics(context.Background(), params)
-            if err != nil {
-                c.Log.Errorf("failed to list metrics with namespace %s: %v", namespace, err)
-                // skip problem namespace on error and continue to next namespace
-                break
-            }
-            metrics = append(metrics, resp.Metrics...)
-            accounts = append(accounts, resp.OwningAccounts...)
-
-            if resp.NextToken == nil {
-                break
-            }
-            params.NextToken = resp.NextToken
-        }
-    }
-    return metrics, accounts
+func (c *CloudWatch) getFilteredMetrics() ([]filteredMetric, error) {
+    if c.cache != nil && c.cache.metrics != nil && time.Since(c.cache.built) < c.cache.ttl {
+        return c.cache.metrics, nil
+    }
+
+    // Get all metrics from cloudwatch for filtering
+    params := &cloudwatch.ListMetricsInput{
+        IncludeLinkedAccounts: &c.IncludeLinkedAccounts,
+    }
+    if c.RecentlyActive == "PT3H" {
+        params.RecentlyActive = types.RecentlyActivePt3h
+    }
+
+    // Return the subset of metrics matching the namespace and at one of the
+    // metric definitions if any
+    var metrics []types.Metric
+    var accounts []string
+    for {
+        resp, err := c.client.ListMetrics(context.Background(), params)
+        if err != nil {
+            c.Log.Errorf("failed to list metrics: %v", err)
+            break
+        }
+        c.Log.Tracef("got %d metrics with %d accounts", len(resp.Metrics), len(resp.OwningAccounts))
+        for i, m := range resp.Metrics {
+            if c.Log.Level().Includes(telegraf.Trace) {
+                dims := make([]string, 0, len(m.Dimensions))
+                for _, d := range m.Dimensions {
+                    dims = append(dims, *d.Name+"="+*d.Value)
+                }
+                a := "none"
+                if len(resp.OwningAccounts) > 0 {
+                    a = resp.OwningAccounts[i]
+                }
+                c.Log.Tracef(" metric %3d: %s (%s): %s [%s]\n", i, *m.MetricName, *m.Namespace, strings.Join(dims, ", "), a)
+            }
+            if c.nsFilter != nil && !c.nsFilter.Match(*m.Namespace) {
+                c.Log.Trace(" -> rejected by namespace")
+                continue
+            }
+            if len(c.Metrics) > 0 && !slices.ContainsFunc(c.Metrics, func(cm *cloudwatchMetric) bool {
+                return metricMatch(cm, m)
+            }) {
+                c.Log.Trace(" -> rejected by metric mismatch")
+                continue
+            }
+            c.Log.Trace(" -> keeping metric")
+            metrics = append(metrics, m)
+            if len(resp.OwningAccounts) > 0 {
+                accounts = append(accounts, resp.OwningAccounts[i])
+            }
+        }
+
+        if resp.NextToken == nil {
+            break
+        }
+        params.NextToken = resp.NextToken
+    }
+
+    var filtered []filteredMetric
+    if len(c.Metrics) == 0 {
+        filtered = append(filtered, filteredMetric{
+            metrics:    metrics,
+            accounts:   accounts,
+            statFilter: c.statFilter,
+        })
+    } else {
+        for idx, cm := range c.Metrics {
+            var entry filteredMetric
+            if cm.StatisticInclude == nil && cm.StatisticExclude == nil {
+                entry.statFilter = c.statFilter
+            } else {
+                f, err := filter.NewIncludeExcludeFilter(*cm.StatisticInclude, *cm.StatisticExclude)
+                if err != nil {
+                    return nil, fmt.Errorf("creating statistics filter for metric %d failed: %w", idx+1, err)
+                }
+                entry.statFilter = f
+            }
+
+            for i, m := range metrics {
+                if metricMatch(cm, m) {
+                    entry.metrics = append(entry.metrics, m)
+                    if len(accounts) > 0 {
+                        entry.accounts = append(entry.accounts, accounts[i])
+                    }
+                }
+            }
+            filtered = append(filtered, entry)
+        }
+    }
+    c.cache = &metricCache{
+        metrics: filtered,
+        built:   time.Now(),
+        ttl:     time.Duration(c.CacheTTL),
+    }
+
+    return filtered, nil
 }
 
 func (c *CloudWatch) updateWindow(relativeTo time.Time) {
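The rewritten selection loop relies on `slices.ContainsFunc` (hence the new `slices` import) to ask whether any configured metric definition accepts a listed metric. A minimal standalone illustration of that standard-library call follows, using a stand-in type and predicate rather than the plugin's `metricMatch` helper, whose body is not part of this hunk:

    package main

    import (
        "fmt"
        "slices"
    )

    // metricDef is a stand-in for the plugin's *cloudwatchMetric definitions.
    type metricDef struct {
        names []string
    }

    func main() {
        defs := []metricDef{
            {names: []string{"Latency", "RequestCount"}},
            {names: []string{"HealthyHostCount"}},
        }
        listed := "RequestCount"

        // slices.ContainsFunc reports whether any element satisfies the predicate,
        // mirroring how the plugin keeps a listed metric only if some definition matches it.
        keep := slices.ContainsFunc(defs, func(d metricDef) bool {
            return slices.Contains(d.names, listed)
        })
        fmt.Println(keep) // true
    }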
@@ -368,8 +344,8 @@ func (c *CloudWatch) updateWindow(relativeTo time.Time) {
 
 // getDataQueries gets all of the possible queries so we can maximize the request payload.
 func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string][]types.MetricDataQuery {
-    if c.metricCache != nil && c.metricCache.queries != nil && c.metricCache.isValid() {
-        return c.metricCache.queries
+    if c.cache != nil && c.cache.queries != nil && c.cache.metrics != nil && time.Since(c.cache.built) < c.cache.ttl {
+        return c.cache.queries
     }
 
     c.queryDimensions = make(map[string]*map[string]string)
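With the old `isValid()` helper removed further down, the cache checks are now spelled out inline as `time.Since(cache.built) < cache.ttl`. A reduced sketch of the idiom (field names follow the `metricCache` struct shown earlier; the helper function here is purely illustrative, not part of this commit):

    package main

    import (
        "fmt"
        "time"
    )

    type metricCache struct {
        ttl   time.Duration
        built time.Time
    }

    // stillValid mirrors the inlined check `time.Since(cache.built) < cache.ttl`.
    func stillValid(c *metricCache) bool {
        return c != nil && time.Since(c.built) < c.ttl
    }

    func main() {
        c := &metricCache{ttl: time.Hour, built: time.Now().Add(-30 * time.Minute)}
        fmt.Println(stillValid(c)) // true: built 30m ago, TTL is 1h
        c.built = time.Now().Add(-2 * time.Hour)
        fmt.Println(stillValid(c)) // false: older than the TTL
    }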
@@ -417,25 +393,27 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) map[string][]types.MetricDataQuery {
         return nil
     }
 
-    if c.metricCache == nil {
-        c.metricCache = &metricCache{
+    if c.cache == nil {
+        c.cache = &metricCache{
             queries: dataQueries,
             built:   time.Now(),
             ttl:     time.Duration(c.CacheTTL),
         }
     } else {
-        c.metricCache.queries = dataQueries
+        c.cache.queries = dataQueries
     }
 
     return dataQueries
 }
 
-// gatherMetrics gets metric data from Cloudwatch.
-func (c *CloudWatch) gatherMetrics(
-    params *cloudwatch.GetMetricDataInput,
-) ([]types.MetricDataResult, error) {
-    results := make([]types.MetricDataResult, 0)
+func (c *CloudWatch) gatherMetrics(queries []types.MetricDataQuery) ([]types.MetricDataResult, error) {
+    params := &cloudwatch.GetMetricDataInput{
+        StartTime:         aws.Time(c.windowStart),
+        EndTime:           aws.Time(c.windowEnd),
+        MetricDataQueries: queries,
+    }
+
+    results := make([]types.MetricDataResult, 0)
     for {
         resp, err := c.client.GetMetricData(context.Background(), params)
         if err != nil {
@@ -489,84 +467,13 @@ func (c *CloudWatch) aggregateMetrics(acc telegraf.Accumulator, metricDataResult
         }
     }
 }
 
-func sanitizeMeasurement(namespace string) string {
-    namespace = strings.ReplaceAll(namespace, "/", "_")
-    namespace = snakeCase(namespace)
-    return "cloudwatch_" + namespace
-}
-
-func snakeCase(s string) string {
-    s = internal.SnakeCase(s)
-    s = strings.ReplaceAll(s, " ", "_")
-    s = strings.ReplaceAll(s, "__", "_")
-    return s
-}
-
-// ctod converts cloudwatch dimensions to regular dimensions.
-func ctod(cDimensions []types.Dimension) *map[string]string {
-    dimensions := make(map[string]string, len(cDimensions))
-    for i := range cDimensions {
-        dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value
-    }
-    return &dimensions
-}
-
-func (c *CloudWatch) getDataInputs(dataQueries []types.MetricDataQuery) *cloudwatch.GetMetricDataInput {
-    return &cloudwatch.GetMetricDataInput{
-        StartTime:         aws.Time(c.windowStart),
-        EndTime:           aws.Time(c.windowEnd),
-        MetricDataQueries: dataQueries,
-    }
-}
-
-// isValid checks the validity of the metric cache.
-func (f *metricCache) isValid() bool {
-    return f.metrics != nil && time.Since(f.built) < f.ttl
-}
-
-func hasWildcard(dimensions []*dimension) bool {
-    for _, d := range dimensions {
-        if d.Value == "" || strings.ContainsAny(d.Value, "*?[") {
-            return true
-        }
-    }
-    return false
-}
-
-func isSelected(name string, cloudwatchMetric types.Metric, dimensions []*dimension) bool {
-    if name != *cloudwatchMetric.MetricName {
-        return false
-    }
-    if len(cloudwatchMetric.Dimensions) != len(dimensions) {
-        return false
-    }
-    for _, d := range dimensions {
-        selected := false
-        for _, d2 := range cloudwatchMetric.Dimensions {
-            if d.Name == *d2.Name {
-                if d.Value == "" || d.valueMatcher.Match(*d2.Value) {
-                    selected = true
-                }
-            }
-        }
-        if !selected {
-            return false
-        }
-    }
-    return true
-}
-
-func newCloudWatch() *CloudWatch {
-    return &CloudWatch{
-        CacheTTL:  config.Duration(time.Hour),
-        RateLimit: 25,
-        Timeout:   config.Duration(time.Second * 5),
-        BatchSize: 500,
-    }
-}
-
 func init() {
     inputs.Add("cloudwatch", func() telegraf.Input {
-        return newCloudWatch()
+        return &CloudWatch{
+            CacheTTL:  config.Duration(time.Hour),
+            RateLimit: 25,
+            Timeout:   config.Duration(time.Second * 5),
+            BatchSize: 500,
+        }
     })
 }
@ -10,26 +10,482 @@ import (
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/influxdata/telegraf"
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/filter"
|
"github.com/influxdata/telegraf/filter"
|
||||||
|
"github.com/influxdata/telegraf/metric"
|
||||||
common_aws "github.com/influxdata/telegraf/plugins/common/aws"
|
common_aws "github.com/influxdata/telegraf/plugins/common/aws"
|
||||||
"github.com/influxdata/telegraf/plugins/common/proxy"
|
"github.com/influxdata/telegraf/plugins/common/proxy"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockGatherCloudWatchClient struct{}
|
func TestSnakeCase(t *testing.T) {
|
||||||
|
require.Equal(t, "cluster_name", snakeCase("Cluster Name"))
|
||||||
|
require.Equal(t, "broker_id", snakeCase("Broker ID"))
|
||||||
|
}
|
||||||
|
|
||||||
func (*mockGatherCloudWatchClient) ListMetrics(
|
func TestGather(t *testing.T) {
|
||||||
_ context.Context,
|
plugin := &CloudWatch{
|
||||||
params *cloudwatch.ListMetricsInput,
|
CredentialConfig: common_aws.CredentialConfig{
|
||||||
_ ...func(*cloudwatch.Options),
|
Region: "us-east-1",
|
||||||
) (*cloudwatch.ListMetricsOutput, error) {
|
},
|
||||||
response := &cloudwatch.ListMetricsOutput{
|
Namespace: "AWS/ELB",
|
||||||
Metrics: []types.Metric{
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
plugin.client = defaultMockClient("AWS/ELB")
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example1",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 123.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example2",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 124.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherDenseMetric(t *testing.T) {
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
CredentialConfig: common_aws.CredentialConfig{
|
||||||
|
Region: "us-east-1",
|
||||||
|
},
|
||||||
|
Namespace: "AWS/ELB",
|
||||||
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
|
MetricFormat: "dense",
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
plugin.client = defaultMockClient("AWS/ELB")
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example1",
|
||||||
|
"metric_name": "latency",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"minimum": 0.1,
|
||||||
|
"maximum": 0.3,
|
||||||
|
"average": 0.2,
|
||||||
|
"sum": 123.0,
|
||||||
|
"sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example2",
|
||||||
|
"metric_name": "latency",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"minimum": 0.1,
|
||||||
|
"maximum": 0.3,
|
||||||
|
"average": 0.2,
|
||||||
|
"sum": 124.0,
|
||||||
|
"sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultiAccountGather(t *testing.T) {
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
CredentialConfig: common_aws.CredentialConfig{
|
||||||
|
Region: "us-east-1",
|
||||||
|
},
|
||||||
|
Namespace: "AWS/ELB",
|
||||||
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
IncludeLinkedAccounts: true,
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
plugin.client = defaultMockClient("AWS/ELB")
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example1",
|
||||||
|
"account": "123456789012",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 123.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example2",
|
||||||
|
"account": "923456789017",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 124.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGatherMultipleNamespaces(t *testing.T) {
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
CredentialConfig: common_aws.CredentialConfig{
|
||||||
|
Region: "us-east-1",
|
||||||
|
},
|
||||||
|
Namespaces: []string{"AWS/ELB", "AWS/EC2"},
|
||||||
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
plugin.client = defaultMockClient("AWS/ELB", "AWS/EC2")
|
||||||
|
|
||||||
|
var acc testutil.Accumulator
|
||||||
|
require.NoError(t, acc.GatherError(plugin.Gather))
|
||||||
|
|
||||||
|
expected := []telegraf.Metric{
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example1",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 123.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_elb",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example2",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 124.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_ec2",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example1",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 123.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
metric.New(
|
||||||
|
"cloudwatch_aws_ec2",
|
||||||
|
map[string]string{
|
||||||
|
"region": "us-east-1",
|
||||||
|
"load_balancer_name": "p-example2",
|
||||||
|
},
|
||||||
|
map[string]interface{}{
|
||||||
|
"latency_minimum": 0.1,
|
||||||
|
"latency_maximum": 0.3,
|
||||||
|
"latency_average": 0.2,
|
||||||
|
"latency_sum": 124.0,
|
||||||
|
"latency_sample_count": 100.0,
|
||||||
|
},
|
||||||
|
time.Unix(0, 0),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
option := []cmp.Option{
|
||||||
|
testutil.IgnoreTime(),
|
||||||
|
testutil.SortMetrics(),
|
||||||
|
}
|
||||||
|
|
||||||
|
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), option...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSelectMetrics(t *testing.T) {
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
CredentialConfig: common_aws.CredentialConfig{
|
||||||
|
Region: "us-east-1",
|
||||||
|
},
|
||||||
|
Namespace: "AWS/ELB",
|
||||||
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
|
Metrics: []*cloudwatchMetric{
|
||||||
{
|
{
|
||||||
Namespace: params.Namespace,
|
MetricNames: []string{"Latency", "RequestCount"},
|
||||||
|
Dimensions: []*dimension{
|
||||||
|
{
|
||||||
|
Name: "LoadBalancerName",
|
||||||
|
Value: "lb*",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "AvailabilityZone",
|
||||||
|
Value: "us-east*",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
plugin.client = selectedMockClient()
|
||||||
|
filtered, err := plugin.getFilteredMetrics()
|
||||||
|
// We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2
|
||||||
|
// AZs. We should get 12 metrics.
|
||||||
|
require.Len(t, filtered[0].metrics, 12)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGenerateStatisticsInputParams(t *testing.T) {
|
||||||
|
d := types.Dimension{
|
||||||
|
Name: aws.String("LoadBalancerName"),
|
||||||
|
Value: aws.String("p-example"),
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace := "AWS/ELB"
|
||||||
|
m := types.Metric{
|
||||||
|
MetricName: aws.String("Latency"),
|
||||||
|
Dimensions: []types.Dimension{d},
|
||||||
|
Namespace: aws.String(namespace),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
Namespaces: []string{namespace},
|
||||||
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
BatchSize: 500,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
plugin.updateWindow(now)
|
||||||
|
|
||||||
|
statFilter, err := filter.NewIncludeExcludeFilter(nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queries := plugin.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}})
|
||||||
|
params := &cloudwatch.GetMetricDataInput{
|
||||||
|
StartTime: aws.Time(plugin.windowStart),
|
||||||
|
EndTime: aws.Time(plugin.windowEnd),
|
||||||
|
MetricDataQueries: queries[namespace],
|
||||||
|
}
|
||||||
|
|
||||||
|
require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(plugin.Delay)))
|
||||||
|
require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(plugin.Period)).Add(-time.Duration(plugin.Delay)))
|
||||||
|
require.Len(t, params.MetricDataQueries, 5)
|
||||||
|
require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
|
||||||
|
require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
|
||||||
|
d := types.Dimension{
|
||||||
|
Name: aws.String("LoadBalancerName"),
|
||||||
|
Value: aws.String("p-example"),
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace := "AWS/ELB"
|
||||||
|
m := types.Metric{
|
||||||
|
MetricName: aws.String("Latency"),
|
||||||
|
Dimensions: []types.Dimension{d},
|
||||||
|
Namespace: aws.String(namespace),
|
||||||
|
}
|
||||||
|
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
Namespaces: []string{namespace},
|
||||||
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
BatchSize: 500,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
plugin.updateWindow(now)
|
||||||
|
|
||||||
|
statFilter, err := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
queries := plugin.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}})
|
||||||
|
params := &cloudwatch.GetMetricDataInput{
|
||||||
|
StartTime: aws.Time(plugin.windowStart),
|
||||||
|
EndTime: aws.Time(plugin.windowEnd),
|
||||||
|
MetricDataQueries: queries[namespace],
|
||||||
|
}
|
||||||
|
|
||||||
|
require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(plugin.Delay)))
|
||||||
|
require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(plugin.Period)).Add(-time.Duration(plugin.Delay)))
|
||||||
|
require.Len(t, params.MetricDataQueries, 2)
|
||||||
|
require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
|
||||||
|
require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMetricsCacheTimeout(t *testing.T) {
|
||||||
|
cache := &metricCache{
|
||||||
|
metrics: make([]filteredMetric, 0),
|
||||||
|
built: time.Now(),
|
||||||
|
ttl: time.Minute,
|
||||||
|
}
|
||||||
|
|
||||||
|
require.True(t, cache.metrics != nil && time.Since(cache.built) < cache.ttl)
|
||||||
|
cache.built = time.Now().Add(-time.Minute)
|
||||||
|
require.False(t, cache.metrics != nil && time.Since(cache.built) < cache.ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateWindow(t *testing.T) {
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
Namespace: "AWS/ELB",
|
||||||
|
Delay: config.Duration(1 * time.Minute),
|
||||||
|
Period: config.Duration(1 * time.Minute),
|
||||||
|
BatchSize: 500,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
require.True(t, plugin.windowEnd.IsZero())
|
||||||
|
require.True(t, plugin.windowStart.IsZero())
|
||||||
|
|
||||||
|
plugin.updateWindow(now)
|
||||||
|
|
||||||
|
newStartTime := plugin.windowEnd
|
||||||
|
|
||||||
|
// initial window just has a single period
|
||||||
|
require.EqualValues(t, plugin.windowEnd, now.Add(-time.Duration(plugin.Delay)))
|
||||||
|
require.EqualValues(t, plugin.windowStart, now.Add(-time.Duration(plugin.Delay)).Add(-time.Duration(plugin.Period)))
|
||||||
|
|
||||||
|
now = time.Now()
|
||||||
|
plugin.updateWindow(now)
|
||||||
|
|
||||||
|
// subsequent window uses previous end time as start time
|
||||||
|
require.EqualValues(t, plugin.windowEnd, now.Add(-time.Duration(plugin.Delay)))
|
||||||
|
require.EqualValues(t, plugin.windowStart, newStartTime)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProxyFunction(t *testing.T) {
|
||||||
|
proxyCfg := proxy.HTTPProxy{HTTPProxyURL: "http://www.penguins.com"}
|
||||||
|
|
||||||
|
proxyFunction, err := proxyCfg.Proxy()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
u, err := url.Parse("https://monitoring.us-west-1.amazonaws.com/")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
proxyResult, err := proxyFunction(&http.Request{URL: u})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "www.penguins.com", proxyResult.Host)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCombineNamespaces(t *testing.T) {
|
||||||
|
plugin := &CloudWatch{
|
||||||
|
Namespace: "AWS/ELB",
|
||||||
|
Namespaces: []string{"AWS/EC2", "AWS/Billing"},
|
||||||
|
BatchSize: 500,
|
||||||
|
Log: testutil.Logger{},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, plugin.Init())
|
||||||
|
require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, plugin.Namespaces)
|
||||||
|
}
|
||||||
|
|
||||||
|
// INTERNAL mock client implementation
|
||||||
|
type mockClient struct {
|
||||||
|
metrics []types.Metric
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultMockClient(namespaces ...string) *mockClient {
|
||||||
|
c := &mockClient{
|
||||||
|
metrics: make([]types.Metric, 0, len(namespaces)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, namespace := range namespaces {
|
||||||
|
c.metrics = append(c.metrics,
|
||||||
|
types.Metric{
|
||||||
|
Namespace: aws.String(namespace),
|
||||||
MetricName: aws.String("Latency"),
|
MetricName: aws.String("Latency"),
|
||||||
Dimensions: []types.Dimension{
|
Dimensions: []types.Dimension{
|
||||||
{
|
{
|
||||||
|
|
@ -38,8 +494,8 @@ func (*mockGatherCloudWatchClient) ListMetrics(
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
types.Metric{
|
||||||
Namespace: params.Namespace,
|
Namespace: aws.String(namespace),
|
||||||
MetricName: aws.String("Latency"),
|
MetricName: aws.String("Latency"),
|
||||||
Dimensions: []types.Dimension{
|
Dimensions: []types.Dimension{
|
||||||
{
|
{
|
||||||
|
|
@ -47,16 +503,68 @@ func (*mockGatherCloudWatchClient) ListMetrics(
|
||||||
Value: aws.String("p-example2"),
|
Value: aws.String("p-example2"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
})
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func selectedMockClient() *mockClient {
|
||||||
|
c := &mockClient{
|
||||||
|
metrics: make([]types.Metric, 0, 4*3*2),
|
||||||
|
}
|
||||||
|
// 4 metrics for 3 ELBs in 2 AZs
|
||||||
|
for _, m := range []string{"Latency", "RequestCount", "HealthyHostCount", "UnHealthyHostCount"} {
|
||||||
|
for _, lb := range []string{"lb-1", "lb-2", "lb-3"} {
|
||||||
|
// For each metric/ELB pair, we get an aggregate value across all AZs.
|
||||||
|
c.metrics = append(c.metrics, types.Metric{
|
||||||
|
Namespace: aws.String("AWS/ELB"),
|
||||||
|
MetricName: aws.String(m),
|
||||||
|
Dimensions: []types.Dimension{
|
||||||
|
{
|
||||||
|
Name: aws.String("LoadBalancerName"),
|
||||||
|
Value: aws.String(lb),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
for _, az := range []string{"us-east-1a", "us-east-1b"} {
|
||||||
|
// We get a metric for each metric/ELB/AZ triplet.
|
||||||
|
c.metrics = append(c.metrics, types.Metric{
|
||||||
|
Namespace: aws.String("AWS/ELB"),
|
||||||
|
MetricName: aws.String(m),
|
||||||
|
Dimensions: []types.Dimension{
|
||||||
|
{
|
||||||
|
Name: aws.String("LoadBalancerName"),
|
||||||
|
Value: aws.String(lb),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: aws.String("AvailabilityZone"),
|
||||||
|
Value: aws.String(az),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockClient) ListMetrics(
|
||||||
|
_ context.Context,
|
||||||
|
params *cloudwatch.ListMetricsInput,
|
||||||
|
_ ...func(*cloudwatch.Options),
|
||||||
|
) (*cloudwatch.ListMetricsOutput, error) {
|
||||||
|
response := &cloudwatch.ListMetricsOutput{
|
||||||
|
Metrics: c.metrics,
|
||||||
|
}
|
||||||
|
|
||||||
if params.IncludeLinkedAccounts != nil && *params.IncludeLinkedAccounts {
|
if params.IncludeLinkedAccounts != nil && *params.IncludeLinkedAccounts {
|
||||||
(*response).OwningAccounts = []string{"123456789012", "923456789017"}
|
response.OwningAccounts = []string{"123456789012", "923456789017"}
|
||||||
}
|
}
|
||||||
return response, nil
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*mockGatherCloudWatchClient) GetMetricData(
|
func (*mockClient) GetMetricData(
|
||||||
_ context.Context,
|
_ context.Context,
|
||||||
params *cloudwatch.GetMetricDataInput,
|
params *cloudwatch.GetMetricDataInput,
|
||||||
_ ...func(*cloudwatch.Options),
|
_ ...func(*cloudwatch.Options),
|
||||||
|
|
@ -156,415 +664,3 @@ func (*mockGatherCloudWatchClient) GetMetricData(
|
||||||
},
|
},
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSnakeCase(t *testing.T) {
|
|
||||||
require.Equal(t, "cluster_name", snakeCase("Cluster Name"))
|
|
||||||
require.Equal(t, "broker_id", snakeCase("Broker ID"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGather(t *testing.T) {
|
|
||||||
duration, err := time.ParseDuration("1m")
|
|
||||||
require.NoError(t, err)
|
|
||||||
internalDuration := config.Duration(duration)
|
|
||||||
c := &CloudWatch{
|
|
||||||
CredentialConfig: common_aws.CredentialConfig{
|
|
||||||
Region: "us-east-1",
|
|
||||||
},
|
|
||||||
Namespace: "AWS/ELB",
|
|
||||||
Delay: internalDuration,
|
|
||||||
Period: internalDuration,
|
|
||||||
RateLimit: 200,
|
|
||||||
BatchSize: 500,
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
}
|
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
|
|
||||||
require.NoError(t, c.Init())
|
|
||||||
c.client = &mockGatherCloudWatchClient{}
|
|
||||||
require.NoError(t, acc.GatherError(c.Gather))
|
|
||||||
|
|
||||||
fields := map[string]interface{}{}
|
|
||||||
fields["latency_minimum"] = 0.1
|
|
||||||
fields["latency_maximum"] = 0.3
|
|
||||||
fields["latency_average"] = 0.2
|
|
||||||
fields["latency_sum"] = 123.0
|
|
||||||
fields["latency_sample_count"] = 100.0
|
|
||||||
|
|
||||||
tags := map[string]string{}
|
|
||||||
tags["region"] = "us-east-1"
|
|
||||||
tags["load_balancer_name"] = "p-example1"
|
|
||||||
|
|
||||||
require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
|
|
||||||
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGatherDenseMetric(t *testing.T) {
|
|
||||||
duration, err := time.ParseDuration("1m")
|
|
||||||
require.NoError(t, err)
|
|
||||||
internalDuration := config.Duration(duration)
|
|
||||||
c := &CloudWatch{
|
|
||||||
CredentialConfig: common_aws.CredentialConfig{
|
|
||||||
Region: "us-east-1",
|
|
||||||
},
|
|
||||||
Namespace: "AWS/ELB",
|
|
||||||
Delay: internalDuration,
|
|
||||||
Period: internalDuration,
|
|
||||||
RateLimit: 200,
|
|
||||||
BatchSize: 500,
|
|
||||||
MetricFormat: "dense",
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
}
|
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
|
|
||||||
require.NoError(t, c.Init())
|
|
||||||
c.client = &mockGatherCloudWatchClient{}
|
|
||||||
require.NoError(t, acc.GatherError(c.Gather))
|
|
||||||
|
|
||||||
fields := map[string]interface{}{}
|
|
||||||
fields["minimum"] = 0.1
|
|
||||||
fields["maximum"] = 0.3
|
|
||||||
fields["average"] = 0.2
|
|
||||||
fields["sum"] = 123.0
|
|
||||||
fields["sample_count"] = 100.0
|
|
||||||
|
|
||||||
tags := map[string]string{}
|
|
||||||
tags["region"] = "us-east-1"
|
|
||||||
tags["load_balancer_name"] = "p-example1"
|
|
||||||
tags["metric_name"] = "latency"
|
|
||||||
|
|
||||||
require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
|
|
||||||
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestMultiAccountGather(t *testing.T) {
|
|
||||||
duration, err := time.ParseDuration("1m")
|
|
||||||
require.NoError(t, err)
|
|
||||||
internalDuration := config.Duration(duration)
|
|
||||||
c := &CloudWatch{
|
|
||||||
CredentialConfig: common_aws.CredentialConfig{
|
|
||||||
Region: "us-east-1",
|
|
||||||
},
|
|
||||||
Namespace: "AWS/ELB",
|
|
||||||
Delay: internalDuration,
|
|
||||||
Period: internalDuration,
|
|
||||||
RateLimit: 200,
|
|
||||||
BatchSize: 500,
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
IncludeLinkedAccounts: true,
|
|
||||||
}
|
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
|
|
||||||
require.NoError(t, c.Init())
|
|
||||||
c.client = &mockGatherCloudWatchClient{}
|
|
||||||
require.NoError(t, acc.GatherError(c.Gather))
|
|
||||||
|
|
||||||
fields := map[string]interface{}{}
|
|
||||||
fields["latency_minimum"] = 0.1
|
|
||||||
fields["latency_maximum"] = 0.3
|
|
||||||
fields["latency_average"] = 0.2
|
|
||||||
fields["latency_sum"] = 123.0
|
|
||||||
fields["latency_sample_count"] = 100.0
|
|
||||||
|
|
||||||
tags := map[string]string{}
|
|
||||||
tags["region"] = "us-east-1"
|
|
||||||
tags["load_balancer_name"] = "p-example1"
|
|
||||||
tags["account"] = "123456789012"
|
|
||||||
|
|
||||||
require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
|
|
||||||
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
|
|
||||||
|
|
||||||
tags["load_balancer_name"] = "p-example2"
|
|
||||||
tags["account"] = "923456789017"
|
|
||||||
fields["latency_sum"] = 124.0
|
|
||||||
acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGather_MultipleNamespaces(t *testing.T) {
|
|
||||||
duration, err := time.ParseDuration("1m")
|
|
||||||
require.NoError(t, err)
|
|
||||||
internalDuration := config.Duration(duration)
|
|
||||||
c := &CloudWatch{
|
|
||||||
Namespaces: []string{"AWS/ELB", "AWS/EC2"},
|
|
||||||
Delay: internalDuration,
|
|
||||||
Period: internalDuration,
|
|
||||||
RateLimit: 200,
|
|
||||||
BatchSize: 500,
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
}
|
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
|
||||||
|
|
||||||
require.NoError(t, c.Init())
|
|
||||||
c.client = &mockGatherCloudWatchClient{}
|
|
||||||
require.NoError(t, acc.GatherError(c.Gather))
|
|
||||||
|
|
||||||
require.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
|
|
||||||
require.True(t, acc.HasMeasurement("cloudwatch_aws_ec2"))
|
|
||||||
}
|
|
||||||
|
|
||||||
type mockSelectMetricsCloudWatchClient struct{}
|
|
||||||
|
|
||||||
func (*mockSelectMetricsCloudWatchClient) ListMetrics(
|
|
||||||
context.Context,
|
|
||||||
*cloudwatch.ListMetricsInput,
|
|
||||||
...func(*cloudwatch.Options),
|
|
||||||
) (*cloudwatch.ListMetricsOutput, error) {
|
|
||||||
metrics := make([]types.Metric, 0)
|
|
||||||
// 4 metrics are available
|
|
||||||
metricNames := []string{"Latency", "RequestCount", "HealthyHostCount", "UnHealthyHostCount"}
|
|
||||||
// for 3 ELBs
|
|
||||||
loadBalancers := []string{"lb-1", "lb-2", "lb-3"}
|
|
||||||
// in 2 AZs
|
|
||||||
availabilityZones := []string{"us-east-1a", "us-east-1b"}
|
|
||||||
for _, m := range metricNames {
|
|
||||||
for _, lb := range loadBalancers {
|
|
||||||
// For each metric/ELB pair, we get an aggregate value across all AZs.
|
|
||||||
metrics = append(metrics, types.Metric{
|
|
||||||
Namespace: aws.String("AWS/ELB"),
|
|
||||||
MetricName: aws.String(m),
|
|
||||||
Dimensions: []types.Dimension{
|
|
||||||
{
|
|
||||||
Name: aws.String("LoadBalancerName"),
|
|
||||||
Value: aws.String(lb),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
for _, az := range availabilityZones {
|
|
||||||
// We get a metric for each metric/ELB/AZ triplet.
|
|
||||||
metrics = append(metrics, types.Metric{
|
|
||||||
Namespace: aws.String("AWS/ELB"),
|
|
||||||
MetricName: aws.String(m),
|
|
||||||
Dimensions: []types.Dimension{
|
|
||||||
{
|
|
||||||
Name: aws.String("LoadBalancerName"),
|
|
||||||
Value: aws.String(lb),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: aws.String("AvailabilityZone"),
|
|
||||||
Value: aws.String(az),
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
result := &cloudwatch.ListMetricsOutput{
|
|
||||||
Metrics: metrics,
|
|
||||||
}
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*mockSelectMetricsCloudWatchClient) GetMetricData(
|
|
||||||
context.Context,
|
|
||||||
*cloudwatch.GetMetricDataInput,
|
|
||||||
...func(*cloudwatch.Options),
|
|
||||||
) (*cloudwatch.GetMetricDataOutput, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSelectMetrics(t *testing.T) {
|
|
||||||
duration, err := time.ParseDuration("1m")
|
|
||||||
require.NoError(t, err)
|
|
||||||
internalDuration := config.Duration(duration)
|
|
||||||
c := &CloudWatch{
|
|
||||||
CredentialConfig: common_aws.CredentialConfig{
|
|
||||||
Region: "us-east-1",
|
|
||||||
},
|
|
||||||
Namespace: "AWS/ELB",
|
|
||||||
Delay: internalDuration,
|
|
||||||
Period: internalDuration,
|
|
||||||
RateLimit: 200,
|
|
||||||
BatchSize: 500,
|
|
||||||
Metrics: []*cloudwatchMetric{
|
|
||||||
{
|
|
||||||
MetricNames: []string{"Latency", "RequestCount"},
|
|
||||||
Dimensions: []*dimension{
|
|
||||||
{
|
|
||||||
Name: "LoadBalancerName",
|
|
||||||
Value: "lb*",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
Name: "AvailabilityZone",
|
|
||||||
Value: "us-east*",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Log: testutil.Logger{},
|
|
||||||
}
|
|
||||||
require.NoError(t, c.Init())
|
|
||||||
c.client = &mockSelectMetricsCloudWatchClient{}
|
|
||||||
filtered, err := getFilteredMetrics(c)
|
|
||||||
// We've asked for 2 (out of 4) metrics, over all 3 load balancers in all 2
|
|
||||||
// AZs. We should get 12 metrics.
|
|
||||||
require.Len(t, filtered[0].metrics, 12)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}

func TestGenerateStatisticsInputParams(t *testing.T) {
	d := types.Dimension{
		Name:  aws.String("LoadBalancerName"),
		Value: aws.String("p-example"),
	}

	namespace := "AWS/ELB"
	m := types.Metric{
		MetricName: aws.String("Latency"),
		Dimensions: []types.Dimension{d},
		Namespace:  aws.String(namespace),
	}

	duration, err := time.ParseDuration("1m")
	require.NoError(t, err)
	internalDuration := config.Duration(duration)

	c := &CloudWatch{
		Namespaces: []string{namespace},
		Delay:      internalDuration,
		Period:     internalDuration,
		BatchSize:  500,
		Log:        testutil.Logger{},
	}

	require.NoError(t, c.initializeCloudWatch())

	now := time.Now()

	c.updateWindow(now)

	statFilter, err := filter.NewIncludeExcludeFilter(nil, nil)
	require.NoError(t, err)
	queries := c.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}})
	params := c.getDataInputs(queries[namespace])

	require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
	require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
	// With an unrestricted statistic filter, one data query is generated per
	// statistic (average, maximum, minimum, sum, sample_count) for the metric.
	require.Len(t, params.MetricDataQueries, 5)
	require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
	require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period)
}

func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
	d := types.Dimension{
		Name:  aws.String("LoadBalancerName"),
		Value: aws.String("p-example"),
	}

	namespace := "AWS/ELB"
	m := types.Metric{
		MetricName: aws.String("Latency"),
		Dimensions: []types.Dimension{d},
		Namespace:  aws.String(namespace),
	}

	duration, err := time.ParseDuration("1m")
	require.NoError(t, err)
	internalDuration := config.Duration(duration)

	c := &CloudWatch{
		Namespaces: []string{namespace},
		Delay:      internalDuration,
		Period:     internalDuration,
		BatchSize:  500,
		Log:        testutil.Logger{},
	}

	require.NoError(t, c.initializeCloudWatch())

	now := time.Now()

	c.updateWindow(now)

	statFilter, err := filter.NewIncludeExcludeFilter([]string{"average", "sample_count"}, nil)
	require.NoError(t, err)
	queries := c.getDataQueries([]filteredMetric{{metrics: []types.Metric{m}, statFilter: statFilter}})
	params := c.getDataInputs(queries[namespace])

	require.EqualValues(t, *params.EndTime, now.Add(-time.Duration(c.Delay)))
	require.EqualValues(t, *params.StartTime, now.Add(-time.Duration(c.Period)).Add(-time.Duration(c.Delay)))
	// Only the two statistics allowed by the include filter produce queries.
	require.Len(t, params.MetricDataQueries, 2)
	require.Len(t, params.MetricDataQueries[0].MetricStat.Metric.Dimensions, 1)
	require.EqualValues(t, 60, *params.MetricDataQueries[0].MetricStat.Period)
}

func TestMetricsCacheTimeout(t *testing.T) {
	cache := &metricCache{
		metrics: make([]filteredMetric, 0),
		built:   time.Now(),
		ttl:     time.Minute,
	}

	require.True(t, cache.isValid())
	cache.built = time.Now().Add(-time.Minute)
	require.False(t, cache.isValid())
}

func TestUpdateWindow(t *testing.T) {
	duration, err := time.ParseDuration("1m")
	require.NoError(t, err)
	internalDuration := config.Duration(duration)

	c := &CloudWatch{
		Namespace: "AWS/ELB",
		Delay:     internalDuration,
		Period:    internalDuration,
		BatchSize: 500,
		Log:       testutil.Logger{},
	}

	now := time.Now()

	require.True(t, c.windowEnd.IsZero())
	require.True(t, c.windowStart.IsZero())

	c.updateWindow(now)

	newStartTime := c.windowEnd

	// initial window just has a single period
	require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay)))
	require.EqualValues(t, c.windowStart, now.Add(-time.Duration(c.Delay)).Add(-time.Duration(c.Period)))

	now = time.Now()
	c.updateWindow(now)

	// subsequent window uses previous end time as start time
	require.EqualValues(t, c.windowEnd, now.Add(-time.Duration(c.Delay)))
	require.EqualValues(t, c.windowStart, newStartTime)
}

func TestProxyFunction(t *testing.T) {
	c := &CloudWatch{
		HTTPProxy: proxy.HTTPProxy{
			HTTPProxyURL: "http://www.penguins.com",
		},
		BatchSize: 500,
		Log:       testutil.Logger{},
	}

	proxyFunction, err := c.HTTPProxy.Proxy()
	require.NoError(t, err)

	u, err := url.Parse("https://monitoring.us-west-1.amazonaws.com/")
	require.NoError(t, err)

	proxyResult, err := proxyFunction(&http.Request{URL: u})
	require.NoError(t, err)
	require.Equal(t, "www.penguins.com", proxyResult.Host)
}

func TestCombineNamespaces(t *testing.T) {
	c := &CloudWatch{
		Namespace:  "AWS/ELB",
		Namespaces: []string{"AWS/EC2", "AWS/Billing"},
		BatchSize:  500,
		Log:        testutil.Logger{},
	}

	require.NoError(t, c.Init())
	require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, c.Namespaces)
}
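
As a side note on the wildcard support this commit introduces: the sketch below is not part of the commit, but shows how a namespace glob can be matched with Telegraf's filter package, which is presumably what the plugin's new nsFilter does once Init has run. The test name and patterns are invented for illustration.

func TestNamespaceWildcardSketch(t *testing.T) {
	// Hypothetical example only: compile the configured namespace globs the
	// same way the plugin is assumed to build its nsFilter.
	nsGlob, err := filter.Compile([]string{"AWS/*"})
	require.NoError(t, err)

	require.True(t, nsGlob.Match("AWS/EC2"))
	require.True(t, nsGlob.Match("AWS/ELB"))
	require.False(t, nsGlob.Match("CustomApp"))
}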

@@ -65,13 +65,13 @@
 ## Do not enable if "period" or "delay" is longer than 3 hours, as it will
 ## not return data more than 3 hours old.
 ## See https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_ListMetrics.html
-#recently_active = "PT3H"
+# recently_active = "PT3H"

 ## Configure the TTL for the internal cache of metrics.
 # cache_ttl = "1h"

-## Metric Statistic Namespaces (required)
-namespaces = ["AWS/ELB"]
+## Metric Statistic Namespaces, wildcards are allowed
+# namespaces = ["*"]

 ## Metric Format
 ## This determines the format of the produced metrics. 'sparse', the default

@@ -0,0 +1,53 @@
package cloudwatch

import (
	"slices"
	"strings"

	"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"

	"github.com/influxdata/telegraf/internal"
)
// dimensionsMatch reports whether every configured dimension in ref has a
// counterpart in values; a dimension matches on name and, if a valueMatcher
// was compiled, on its (possibly wildcarded) value as well.
func dimensionsMatch(ref []*dimension, values []types.Dimension) bool {
	for _, rd := range ref {
		var found bool
		for _, vd := range values {
			if rd.Name == *vd.Name && (rd.valueMatcher == nil || rd.valueMatcher.Match(*vd.Value)) {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return true
}

// metricMatch reports whether a CloudWatch metric is selected by the
// configured metric definition: its name must be listed and all configured
// dimensions must match.
func metricMatch(cm *cloudwatchMetric, m types.Metric) bool {
	if !slices.Contains(cm.MetricNames, *m.MetricName) {
		return false
	}
	return dimensionsMatch(cm.Dimensions, m.Dimensions)
}
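
For orientation, here is a minimal in-package sketch (not part of the commit) of how these matchers are expected to behave once a dimension's glob Value has been compiled into its valueMatcher, which is assumed to happen during Init; it would need the aws and filter imports in addition to those of this file.

func exampleMetricMatch() bool {
	// Hypothetical helper, illustration only.
	lbMatcher, err := filter.Compile([]string{"lb-*"})
	if err != nil {
		panic(err)
	}
	cm := &cloudwatchMetric{
		MetricNames: []string{"Latency"},
		Dimensions: []*dimension{
			{Name: "LoadBalancerName", Value: "lb-*", valueMatcher: lbMatcher},
		},
	}
	m := types.Metric{
		Namespace:  aws.String("AWS/ELB"),
		MetricName: aws.String("Latency"),
		Dimensions: []types.Dimension{
			{Name: aws.String("LoadBalancerName"), Value: aws.String("lb-1")},
		},
	}
	// Expected to be true: "Latency" is listed and the LoadBalancerName glob matches "lb-1".
	return metricMatch(cm, m)
}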

// sanitizeMeasurement converts a CloudWatch namespace into a snake_cased
// Telegraf measurement name prefixed with "cloudwatch_".
func sanitizeMeasurement(namespace string) string {
	namespace = strings.ReplaceAll(namespace, "/", "_")
	namespace = snakeCase(namespace)
	return "cloudwatch_" + namespace
}

func snakeCase(s string) string {
	s = internal.SnakeCase(s)
	s = strings.ReplaceAll(s, " ", "_")
	s = strings.ReplaceAll(s, "__", "_")
	return s
}
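
A quick illustration of the naming this produces (my own sketch, not in the commit, assuming internal.SnakeCase behaves as it does elsewhere in Telegraf):

func exampleSanitizeMeasurement() string {
	// "AWS/ELB" -> "AWS_ELB" (slash replaced) -> "aws_elb" (snake cased)
	// -> "cloudwatch_aws_elb" (prefixed); illustration only.
	return sanitizeMeasurement("AWS/ELB")
}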

// ctod converts CloudWatch dimensions into a map of snake_cased tag names to
// dimension values.
func ctod(cDimensions []types.Dimension) *map[string]string {
	dimensions := make(map[string]string, len(cDimensions))
	for i := range cDimensions {
		dimensions[snakeCase(*cDimensions[i].Name)] = *cDimensions[i].Value
	}
	return &dimensions
}
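
And a similar sketch for the dimension-to-tag conversion (again not part of the commit; the aws import is assumed):

func exampleCtod() *map[string]string {
	dims := []types.Dimension{
		{Name: aws.String("LoadBalancerName"), Value: aws.String("lb-1")},
		{Name: aws.String("AvailabilityZone"), Value: aws.String("us-east-1a")},
	}
	// Expected tags: "load_balancer_name" -> "lb-1", "availability_zone" -> "us-east-1a".
	return ctod(dims)
}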