fix(inputs.cloudwatch): customizable batch size when querying (#10851)
This commit is contained in:
parent
e7e3926710
commit
196abb74cf
|
|
@ -91,6 +91,12 @@ API endpoint. In the following order the plugin will attempt to authenticate.
|
||||||
## Timeout for http requests made by the cloudwatch client.
|
## Timeout for http requests made by the cloudwatch client.
|
||||||
# timeout = "5s"
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Batch Size
|
||||||
|
## The size of each batch to send requests to Cloudwatch. 500 is the
|
||||||
|
## suggested largest size. If a request gets to large (413 errors), consider
|
||||||
|
## reducing this amount.
|
||||||
|
# batch_size = 500
|
||||||
|
|
||||||
## Namespace-wide statistic filters. These allow fewer queries to be made to
|
## Namespace-wide statistic filters. These allow fewer queries to be made to
|
||||||
## cloudwatch.
|
## cloudwatch.
|
||||||
# statistic_include = [ "average", "sum", "minimum", "maximum", sample_count" ]
|
# statistic_include = [ "average", "sum", "minimum", "maximum", sample_count" ]
|
||||||
|
|
|
||||||
|
|
@ -55,6 +55,7 @@ type CloudWatch struct {
|
||||||
CacheTTL config.Duration `toml:"cache_ttl"`
|
CacheTTL config.Duration `toml:"cache_ttl"`
|
||||||
RateLimit int `toml:"ratelimit"`
|
RateLimit int `toml:"ratelimit"`
|
||||||
RecentlyActive string `toml:"recently_active"`
|
RecentlyActive string `toml:"recently_active"`
|
||||||
|
BatchSize int `toml:"batch_size"`
|
||||||
|
|
||||||
Log telegraf.Logger `toml:"-"`
|
Log telegraf.Logger `toml:"-"`
|
||||||
|
|
||||||
|
|
@ -146,12 +147,10 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
|
||||||
results := map[string][]types.MetricDataResult{}
|
results := map[string][]types.MetricDataResult{}
|
||||||
|
|
||||||
for namespace, namespacedQueries := range queries {
|
for namespace, namespacedQueries := range queries {
|
||||||
// 500 is the maximum number of metric data queries a `GetMetricData` request can contain.
|
|
||||||
batchSize := 500
|
|
||||||
var batches [][]types.MetricDataQuery
|
var batches [][]types.MetricDataQuery
|
||||||
|
|
||||||
for batchSize < len(namespacedQueries) {
|
for c.BatchSize < len(namespacedQueries) {
|
||||||
namespacedQueries, batches = namespacedQueries[batchSize:], append(batches, namespacedQueries[0:batchSize:batchSize])
|
namespacedQueries, batches = namespacedQueries[c.BatchSize:], append(batches, namespacedQueries[0:c.BatchSize:c.BatchSize])
|
||||||
}
|
}
|
||||||
batches = append(batches, namespacedQueries)
|
batches = append(batches, namespacedQueries)
|
||||||
|
|
||||||
|
|
@ -530,6 +529,7 @@ func New() *CloudWatch {
|
||||||
CacheTTL: config.Duration(time.Hour),
|
CacheTTL: config.Duration(time.Hour),
|
||||||
RateLimit: 25,
|
RateLimit: 25,
|
||||||
Timeout: config.Duration(time.Second * 5),
|
Timeout: config.Duration(time.Second * 5),
|
||||||
|
BatchSize: 500,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -106,6 +106,7 @@ func TestGather(t *testing.T) {
|
||||||
Delay: internalDuration,
|
Delay: internalDuration,
|
||||||
Period: internalDuration,
|
Period: internalDuration,
|
||||||
RateLimit: 200,
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
@ -137,6 +138,7 @@ func TestGather_MultipleNamespaces(t *testing.T) {
|
||||||
Delay: internalDuration,
|
Delay: internalDuration,
|
||||||
Period: internalDuration,
|
Period: internalDuration,
|
||||||
RateLimit: 200,
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
var acc testutil.Accumulator
|
var acc testutil.Accumulator
|
||||||
|
|
@ -213,6 +215,7 @@ func TestSelectMetrics(t *testing.T) {
|
||||||
Delay: internalDuration,
|
Delay: internalDuration,
|
||||||
Period: internalDuration,
|
Period: internalDuration,
|
||||||
RateLimit: 200,
|
RateLimit: 200,
|
||||||
|
BatchSize: 500,
|
||||||
Metrics: []*Metric{
|
Metrics: []*Metric{
|
||||||
{
|
{
|
||||||
MetricNames: []string{"Latency", "RequestCount"},
|
MetricNames: []string{"Latency", "RequestCount"},
|
||||||
|
|
@ -258,6 +261,7 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
|
||||||
Namespaces: []string{namespace},
|
Namespaces: []string{namespace},
|
||||||
Delay: internalDuration,
|
Delay: internalDuration,
|
||||||
Period: internalDuration,
|
Period: internalDuration,
|
||||||
|
BatchSize: 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, c.initializeCloudWatch())
|
require.NoError(t, c.initializeCloudWatch())
|
||||||
|
|
@ -297,6 +301,7 @@ func TestGenerateStatisticsInputParamsFiltered(t *testing.T) {
|
||||||
Namespaces: []string{namespace},
|
Namespaces: []string{namespace},
|
||||||
Delay: internalDuration,
|
Delay: internalDuration,
|
||||||
Period: internalDuration,
|
Period: internalDuration,
|
||||||
|
BatchSize: 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, c.initializeCloudWatch())
|
require.NoError(t, c.initializeCloudWatch())
|
||||||
|
|
@ -336,6 +341,7 @@ func TestUpdateWindow(t *testing.T) {
|
||||||
Namespace: "AWS/ELB",
|
Namespace: "AWS/ELB",
|
||||||
Delay: internalDuration,
|
Delay: internalDuration,
|
||||||
Period: internalDuration,
|
Period: internalDuration,
|
||||||
|
BatchSize: 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
@ -364,6 +370,7 @@ func TestProxyFunction(t *testing.T) {
|
||||||
HTTPProxy: proxy.HTTPProxy{
|
HTTPProxy: proxy.HTTPProxy{
|
||||||
HTTPProxyURL: "http://www.penguins.com",
|
HTTPProxyURL: "http://www.penguins.com",
|
||||||
},
|
},
|
||||||
|
BatchSize: 500,
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyFunction, err := c.HTTPProxy.Proxy()
|
proxyFunction, err := c.HTTPProxy.Proxy()
|
||||||
|
|
@ -378,7 +385,11 @@ func TestProxyFunction(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCombineNamespaces(t *testing.T) {
|
func TestCombineNamespaces(t *testing.T) {
|
||||||
c := &CloudWatch{Namespace: "AWS/ELB", Namespaces: []string{"AWS/EC2", "AWS/Billing"}}
|
c := &CloudWatch{
|
||||||
|
Namespace: "AWS/ELB",
|
||||||
|
Namespaces: []string{"AWS/EC2", "AWS/Billing"},
|
||||||
|
BatchSize: 500,
|
||||||
|
}
|
||||||
|
|
||||||
require.NoError(t, c.Init())
|
require.NoError(t, c.Init())
|
||||||
require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, c.Namespaces)
|
require.Equal(t, []string{"AWS/EC2", "AWS/Billing", "AWS/ELB"}, c.Namespaces)
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,12 @@
|
||||||
## Timeout for http requests made by the cloudwatch client.
|
## Timeout for http requests made by the cloudwatch client.
|
||||||
# timeout = "5s"
|
# timeout = "5s"
|
||||||
|
|
||||||
|
## Batch Size
|
||||||
|
## The size of each batch to send requests to Cloudwatch. 500 is the
|
||||||
|
## suggested largest size. If a request gets to large (413 errors), consider
|
||||||
|
## reducing this amount.
|
||||||
|
# batch_size = 500
|
||||||
|
|
||||||
## Namespace-wide statistic filters. These allow fewer queries to be made to
|
## Namespace-wide statistic filters. These allow fewer queries to be made to
|
||||||
## cloudwatch.
|
## cloudwatch.
|
||||||
# statistic_include = [ "average", "sum", "minimum", "maximum", sample_count" ]
|
# statistic_include = [ "average", "sum", "minimum", "maximum", sample_count" ]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue