cleanup cloudwatch plugin (#7928)
This commit is contained in:
parent
df93825112
commit
4d11d76bb1
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||||
"github.com/influxdata/telegraf"
|
"github.com/influxdata/telegraf"
|
||||||
|
"github.com/influxdata/telegraf/config"
|
||||||
internalaws "github.com/influxdata/telegraf/config/aws"
|
internalaws "github.com/influxdata/telegraf/config/aws"
|
||||||
"github.com/influxdata/telegraf/filter"
|
"github.com/influxdata/telegraf/filter"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
|
@ -20,65 +21,63 @@ import (
|
||||||
"github.com/influxdata/telegraf/plugins/inputs"
|
"github.com/influxdata/telegraf/plugins/inputs"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
|
||||||
// CloudWatch contains the configuration and cache for the cloudwatch plugin.
|
type CloudWatch struct {
|
||||||
CloudWatch struct {
|
Region string `toml:"region"`
|
||||||
Region string `toml:"region"`
|
AccessKey string `toml:"access_key"`
|
||||||
AccessKey string `toml:"access_key"`
|
SecretKey string `toml:"secret_key"`
|
||||||
SecretKey string `toml:"secret_key"`
|
RoleARN string `toml:"role_arn"`
|
||||||
RoleARN string `toml:"role_arn"`
|
Profile string `toml:"profile"`
|
||||||
Profile string `toml:"profile"`
|
CredentialPath string `toml:"shared_credential_file"`
|
||||||
CredentialPath string `toml:"shared_credential_file"`
|
Token string `toml:"token"`
|
||||||
Token string `toml:"token"`
|
EndpointURL string `toml:"endpoint_url"`
|
||||||
EndpointURL string `toml:"endpoint_url"`
|
StatisticExclude []string `toml:"statistic_exclude"`
|
||||||
StatisticExclude []string `toml:"statistic_exclude"`
|
StatisticInclude []string `toml:"statistic_include"`
|
||||||
StatisticInclude []string `toml:"statistic_include"`
|
Timeout config.Duration `toml:"timeout"`
|
||||||
Timeout internal.Duration `toml:"timeout"`
|
|
||||||
|
|
||||||
Period internal.Duration `toml:"period"`
|
Period config.Duration `toml:"period"`
|
||||||
Delay internal.Duration `toml:"delay"`
|
Delay config.Duration `toml:"delay"`
|
||||||
Namespace string `toml:"namespace"`
|
Namespace string `toml:"namespace"`
|
||||||
Metrics []*Metric `toml:"metrics"`
|
Metrics []*Metric `toml:"metrics"`
|
||||||
CacheTTL internal.Duration `toml:"cache_ttl"`
|
CacheTTL config.Duration `toml:"cache_ttl"`
|
||||||
RateLimit int `toml:"ratelimit"`
|
RateLimit int `toml:"ratelimit"`
|
||||||
|
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
client cloudwatchClient
|
client cloudwatchClient
|
||||||
statFilter filter.Filter
|
statFilter filter.Filter
|
||||||
metricCache *metricCache
|
metricCache *metricCache
|
||||||
queryDimensions map[string]*map[string]string
|
queryDimensions map[string]*map[string]string
|
||||||
windowStart time.Time
|
windowStart time.Time
|
||||||
windowEnd time.Time
|
windowEnd time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metric defines a simplified Cloudwatch metric.
|
// Metric defines a simplified Cloudwatch metric.
|
||||||
Metric struct {
|
type Metric struct {
|
||||||
StatisticExclude *[]string `toml:"statistic_exclude"`
|
StatisticExclude *[]string `toml:"statistic_exclude"`
|
||||||
StatisticInclude *[]string `toml:"statistic_include"`
|
StatisticInclude *[]string `toml:"statistic_include"`
|
||||||
MetricNames []string `toml:"names"`
|
MetricNames []string `toml:"names"`
|
||||||
Dimensions []*Dimension `toml:"dimensions"`
|
Dimensions []*Dimension `toml:"dimensions"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dimension defines a simplified Cloudwatch dimension (provides metric filtering).
|
// Dimension defines a simplified Cloudwatch dimension (provides metric filtering).
|
||||||
Dimension struct {
|
type Dimension struct {
|
||||||
Name string `toml:"name"`
|
Name string `toml:"name"`
|
||||||
Value string `toml:"value"`
|
Value string `toml:"value"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// metricCache caches metrics, their filters, and generated queries.
|
// metricCache caches metrics, their filters, and generated queries.
|
||||||
metricCache struct {
|
type metricCache struct {
|
||||||
ttl time.Duration
|
ttl time.Duration
|
||||||
built time.Time
|
built time.Time
|
||||||
metrics []filteredMetric
|
metrics []filteredMetric
|
||||||
queries []*cloudwatch.MetricDataQuery
|
queries []*cloudwatch.MetricDataQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
cloudwatchClient interface {
|
type cloudwatchClient interface {
|
||||||
ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
|
ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
|
||||||
GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error)
|
GetMetricData(*cloudwatch.GetMetricDataInput) (*cloudwatch.GetMetricDataOutput, error)
|
||||||
}
|
}
|
||||||
)
|
|
||||||
|
|
||||||
// SampleConfig returns the default configuration of the Cloudwatch input plugin.
|
// SampleConfig returns the default configuration of the Cloudwatch input plugin.
|
||||||
func (c *CloudWatch) SampleConfig() string {
|
func (c *CloudWatch) SampleConfig() string {
|
||||||
|
|
@ -270,7 +269,7 @@ func (c *CloudWatch) initializeCloudWatch() {
|
||||||
TLSHandshakeTimeout: 10 * time.Second,
|
TLSHandshakeTimeout: 10 * time.Second,
|
||||||
ExpectContinueTimeout: 1 * time.Second,
|
ExpectContinueTimeout: 1 * time.Second,
|
||||||
},
|
},
|
||||||
Timeout: c.Timeout.Duration,
|
Timeout: time.Duration(c.Timeout),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -359,7 +358,7 @@ func getFilteredMetrics(c *CloudWatch) ([]filteredMetric, error) {
|
||||||
c.metricCache = &metricCache{
|
c.metricCache = &metricCache{
|
||||||
metrics: fMetrics,
|
metrics: fMetrics,
|
||||||
built: time.Now(),
|
built: time.Now(),
|
||||||
ttl: c.CacheTTL.Duration,
|
ttl: time.Duration(c.CacheTTL),
|
||||||
}
|
}
|
||||||
|
|
||||||
return fMetrics, nil
|
return fMetrics, nil
|
||||||
|
|
@ -395,11 +394,11 @@ func (c *CloudWatch) fetchNamespaceMetrics() ([]*cloudwatch.Metric, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *CloudWatch) updateWindow(relativeTo time.Time) {
|
func (c *CloudWatch) updateWindow(relativeTo time.Time) {
|
||||||
windowEnd := relativeTo.Add(-c.Delay.Duration)
|
windowEnd := relativeTo.Add(-time.Duration(c.Delay))
|
||||||
|
|
||||||
if c.windowEnd.IsZero() {
|
if c.windowEnd.IsZero() {
|
||||||
// this is the first run, no window info, so just get a single period
|
// this is the first run, no window info, so just get a single period
|
||||||
c.windowStart = windowEnd.Add(-c.Period.Duration)
|
c.windowStart = windowEnd.Add(-time.Duration(c.Period))
|
||||||
} else {
|
} else {
|
||||||
// subsequent window, start where last window left off
|
// subsequent window, start where last window left off
|
||||||
c.windowStart = c.windowEnd
|
c.windowStart = c.windowEnd
|
||||||
|
|
@ -428,7 +427,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
|
||||||
Label: aws.String(snakeCase(*metric.MetricName + "_average")),
|
Label: aws.String(snakeCase(*metric.MetricName + "_average")),
|
||||||
MetricStat: &cloudwatch.MetricStat{
|
MetricStat: &cloudwatch.MetricStat{
|
||||||
Metric: metric,
|
Metric: metric,
|
||||||
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
|
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
|
||||||
Stat: aws.String(cloudwatch.StatisticAverage),
|
Stat: aws.String(cloudwatch.StatisticAverage),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
@ -440,7 +439,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
|
||||||
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
|
Label: aws.String(snakeCase(*metric.MetricName + "_maximum")),
|
||||||
MetricStat: &cloudwatch.MetricStat{
|
MetricStat: &cloudwatch.MetricStat{
|
||||||
Metric: metric,
|
Metric: metric,
|
||||||
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
|
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
|
||||||
Stat: aws.String(cloudwatch.StatisticMaximum),
|
Stat: aws.String(cloudwatch.StatisticMaximum),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
@ -452,7 +451,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
|
||||||
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
|
Label: aws.String(snakeCase(*metric.MetricName + "_minimum")),
|
||||||
MetricStat: &cloudwatch.MetricStat{
|
MetricStat: &cloudwatch.MetricStat{
|
||||||
Metric: metric,
|
Metric: metric,
|
||||||
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
|
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
|
||||||
Stat: aws.String(cloudwatch.StatisticMinimum),
|
Stat: aws.String(cloudwatch.StatisticMinimum),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
@ -464,7 +463,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
|
||||||
Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
|
Label: aws.String(snakeCase(*metric.MetricName + "_sum")),
|
||||||
MetricStat: &cloudwatch.MetricStat{
|
MetricStat: &cloudwatch.MetricStat{
|
||||||
Metric: metric,
|
Metric: metric,
|
||||||
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
|
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
|
||||||
Stat: aws.String(cloudwatch.StatisticSum),
|
Stat: aws.String(cloudwatch.StatisticSum),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
@ -476,7 +475,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
|
||||||
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
|
Label: aws.String(snakeCase(*metric.MetricName + "_sample_count")),
|
||||||
MetricStat: &cloudwatch.MetricStat{
|
MetricStat: &cloudwatch.MetricStat{
|
||||||
Metric: metric,
|
Metric: metric,
|
||||||
Period: aws.Int64(int64(c.Period.Duration.Seconds())),
|
Period: aws.Int64(int64(time.Duration(c.Period).Seconds())),
|
||||||
Stat: aws.String(cloudwatch.StatisticSampleCount),
|
Stat: aws.String(cloudwatch.StatisticSampleCount),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
@ -493,7 +492,7 @@ func (c *CloudWatch) getDataQueries(filteredMetrics []filteredMetric) ([]*cloudw
|
||||||
c.metricCache = &metricCache{
|
c.metricCache = &metricCache{
|
||||||
queries: dataQueries,
|
queries: dataQueries,
|
||||||
built: time.Now(),
|
built: time.Now(),
|
||||||
ttl: c.CacheTTL.Duration,
|
ttl: time.Duration(c.CacheTTL),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.metricCache.queries = dataQueries
|
c.metricCache.queries = dataQueries
|
||||||
|
|
@ -555,14 +554,19 @@ func (c *CloudWatch) aggregateMetrics(
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("cloudwatch", func() telegraf.Input {
|
inputs.Add("cloudwatch", func() telegraf.Input {
|
||||||
return &CloudWatch{
|
return New()
|
||||||
CacheTTL: internal.Duration{Duration: time.Hour},
|
|
||||||
RateLimit: 25,
|
|
||||||
Timeout: internal.Duration{Duration: time.Second * 5},
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// New instance of the cloudwatch plugin
|
||||||
|
func New() *CloudWatch {
|
||||||
|
return &CloudWatch{
|
||||||
|
CacheTTL: config.Duration(time.Hour),
|
||||||
|
RateLimit: 25,
|
||||||
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func sanitizeMeasurement(namespace string) string {
|
func sanitizeMeasurement(namespace string) string {
|
||||||
namespace = strings.Replace(namespace, "/", "_", -1)
|
namespace = strings.Replace(namespace, "/", "_", -1)
|
||||||
namespace = snakeCase(namespace)
|
namespace = snakeCase(namespace)
|
||||||
|
|
|
||||||
|
|
@ -135,14 +135,14 @@ func (w *Wavefront) Connect() error {
|
||||||
}
|
}
|
||||||
w.sender = sender
|
w.sender = sender
|
||||||
} else {
|
} else {
|
||||||
w.Log.Debug("connecting over tcp using Host: %s and Port: %d", w.Host, w.Port)
|
w.Log.Debugf("connecting over tcp using Host: %q and Port: %d", w.Host, w.Port)
|
||||||
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
|
sender, err := wavefront.NewProxySender(&wavefront.ProxyConfiguration{
|
||||||
Host: w.Host,
|
Host: w.Host,
|
||||||
MetricsPort: w.Port,
|
MetricsPort: w.Port,
|
||||||
FlushIntervalSeconds: 5,
|
FlushIntervalSeconds: 5,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %s and Port: %d", w.Host, w.Port)
|
return fmt.Errorf("Wavefront: Could not create Wavefront Sender for Host: %q and Port: %d", w.Host, w.Port)
|
||||||
}
|
}
|
||||||
w.sender = sender
|
w.sender = sender
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ func TestSimpleReverseLookup(t *testing.T) {
|
||||||
}, now)
|
}, now)
|
||||||
|
|
||||||
dns := newReverseDNS()
|
dns := newReverseDNS()
|
||||||
|
dns.Log = &testutil.Logger{}
|
||||||
dns.Lookups = []lookupEntry{
|
dns.Lookups = []lookupEntry{
|
||||||
{
|
{
|
||||||
Field: "source_ip",
|
Field: "source_ip",
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue