feat(processors.aws_ec2): Add caching of imds and ec2 tags (#13075)

This commit is contained in:
Tim Rupp 2023-04-14 12:57:04 -07:00 committed by GitHub
parent ba16eeb495
commit 2fed77e02a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 215 additions and 32 deletions

View File

@ -64,6 +64,25 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## at the same time.
## It's probably best to keep this number fairly low.
max_parallel_calls = 10
## cache_ttl determines how long each cached item will remain in the cache before
## it is removed and subsequently needs to be queried for from the AWS API. With
## the default of "0s", cached items are stored without an expiration time.
# cache_ttl = "0s"
## tag_cache_size determines how many of the values which are found in imds_tags
## or ec2_tags will be kept in memory for faster lookup on successive processing
## of metrics. You may want to adjust this if you have excessively large numbers
## of tags on your EC2 instances, and you are using the ec2_tags field. This
## typically does not need to be changed when using the imds_tags field.
# tag_cache_size = 1000
## log_cache_stats will periodically emit a debug-level log line with details of
## cache entries, hits, misses, and evacuations since the last time stats were
## emitted. This can be helpful in determining whether caching is being effective
## in your environment. Stats are emitted every 30 seconds. By default, this
## setting is disabled.
# log_cache_stats = false
```
## Example
@ -79,3 +98,17 @@ Append `accountId` and `instanceId` to metrics tags:
- cpu,hostname=localhost time_idle=42
+ cpu,hostname=localhost,accountId=123456789,instanceId=i-123456789123 time_idle=42
```
## Notes
We use a single cache because telegraf's `AddTag` function models this.
A user can specify a list of both EC2 tags and IMDS tags. The items in these
lists can, technically, be the same. This will result in a situation where the
EC2 tag's value will override the IMDS tag's value.
Though this is undesirable, it is unavoidable because the `AddTag` function does
not support this case.
You should avoid using IMDS tags as EC2 tags because the EC2 tags will always
"win" due to them being processed in this plugin *after* IMDS tags.

View File

@ -15,6 +15,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/smithy-go"
"github.com/coocood/freecache"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
@ -29,21 +30,30 @@ type AwsEc2Processor struct {
ImdsTags []string `toml:"imds_tags"`
EC2Tags []string `toml:"ec2_tags"`
Timeout config.Duration `toml:"timeout"`
CacheTTL config.Duration `toml:"cache_ttl"`
Ordered bool `toml:"ordered"`
MaxParallelCalls int `toml:"max_parallel_calls"`
Log telegraf.Logger `toml:"-"`
TagCacheSize int `toml:"tag_cache_size"`
LogCacheStats bool `toml:"log_cache_stats"`
imdsClient *imds.Client
imdsTagsMap map[string]struct{}
ec2Client *ec2.Client
parallel parallel.Parallel
instanceID string
tagCache *freecache.Cache
imdsClient *imds.Client
imdsTagsMap map[string]struct{}
ec2Client *ec2.Client
parallel parallel.Parallel
instanceID string
cancelCleanupWorker context.CancelFunc
}
// Default values for the processor's tunable settings.
const (
	// DefaultMaxOrderedQueueSize is the default queue size for ordered processing.
	DefaultMaxOrderedQueueSize = 10000
	// DefaultMaxParallelCalls is the default for the max_parallel_calls option.
	DefaultMaxParallelCalls = 10
	// DefaultTimeout is the default for the timeout option.
	DefaultTimeout = time.Second * 10
	// DefaultCacheTTL is the default for the cache_ttl option.
	DefaultCacheTTL = time.Duration(0)
	// DefaultCacheSize is the default for the tag_cache_size option.
	DefaultCacheSize = 1_000
	// DefaultLogCacheStats is the default for the log_cache_stats option.
	DefaultLogCacheStats = false
)
var allowedImdsTags = map[string]struct{}{
@ -71,6 +81,29 @@ func (r *AwsEc2Processor) Add(metric telegraf.Metric, _ telegraf.Accumulator) er
return nil
}
// logCacheStatistics writes the tag cache's entry, hit, miss and evacuation
// counters to the debug log every 30 seconds and resets the counters after
// each report. It returns immediately when no cache has been created, and
// otherwise blocks until ctx is cancelled, so it is meant to be run in its own
// goroutine.
func (r *AwsEc2Processor) logCacheStatistics(ctx context.Context) {
	if r.tagCache == nil {
		return
	}

	ticker := time.NewTicker(30 * time.Second)
	// Stop the ticker when the worker exits so it does not keep firing after
	// the context has been cancelled.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r.Log.Debugf("cache: size=%d hit=%d miss=%d full=%d\n",
				r.tagCache.EntryCount(),
				r.tagCache.HitCount(),
				r.tagCache.MissCount(),
				r.tagCache.EvacuateCount(),
			)
			// Counters are reset so every report covers only the last interval.
			r.tagCache.ResetStatistics()
		}
	}
}
func (r *AwsEc2Processor) Init() error {
r.Log.Debug("Initializing AWS EC2 Processor")
if len(r.EC2Tags) == 0 && len(r.ImdsTags) == 0 {
@ -91,6 +124,18 @@ func (r *AwsEc2Processor) Init() error {
}
func (r *AwsEc2Processor) Start(acc telegraf.Accumulator) error {
r.tagCache = freecache.NewCache(r.TagCacheSize)
if r.LogCacheStats {
ctx, cancel := context.WithCancel(context.Background())
r.cancelCleanupWorker = cancel
go r.logCacheStatistics(ctx)
}
r.Log.Debugf("cache: size=%d\n", r.TagCacheSize)
if r.CacheTTL > 0 {
r.Log.Debugf("cache timeout: seconds=%d\n", int(time.Duration(r.CacheTTL).Seconds()))
}
ctx := context.Background()
cfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
@ -114,7 +159,7 @@ func (r *AwsEc2Processor) Start(acc telegraf.Accumulator) error {
r.ec2Client = ec2.NewFromConfig(cfg)
// Chceck if instance is allowed to call DescribeTags.
// Check if instance is allowed to call DescribeTags.
_, err = r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{
DryRun: aws.Bool(true),
})
@ -141,47 +186,105 @@ func (r *AwsEc2Processor) Stop() {
if r.parallel != nil {
r.parallel.Stop()
}
r.cancelCleanupWorker()
}
func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
func (r *AwsEc2Processor) LookupIMDSTags(metric telegraf.Metric) telegraf.Metric {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
defer cancel()
// Add IMDS Instance Identity Document tags.
if len(r.imdsTagsMap) > 0 {
iido, err := r.imdsClient.GetInstanceIdentityDocument(
ctx,
&imds.GetInstanceIdentityDocumentInput{},
)
if err != nil {
r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err)
return []telegraf.Metric{metric}
}
var tagsNotFound []string
for tag := range r.imdsTagsMap {
if v := getTagFromInstanceIdentityDocument(iido, tag); v != "" {
metric.AddTag(tag, v)
for tag := range r.imdsTagsMap {
val, err := r.tagCache.Get([]byte(tag))
if err != nil {
tagsNotFound = append(tagsNotFound, tag)
} else {
metric.AddTag(tag, string(val))
}
}
if len(tagsNotFound) == 0 {
return metric
}
iido, err := r.imdsClient.GetInstanceIdentityDocument(
ctx,
&imds.GetInstanceIdentityDocumentInput{},
)
if err != nil {
r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err)
return metric
}
for _, tag := range tagsNotFound {
if v := getTagFromInstanceIdentityDocument(iido, tag); v != "" {
metric.AddTag(tag, v)
expiration := int(time.Duration(r.CacheTTL).Seconds())
err = r.tagCache.Set([]byte(tag), []byte(v), expiration)
if err != nil {
r.Log.Errorf("Error when setting IMDS tag cache value: %v", err)
}
}
}
// Add EC2 instance tags.
if len(r.EC2Tags) > 0 {
dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{
Filters: createFilterFromTags(r.instanceID, r.EC2Tags),
})
if err != nil {
r.Log.Errorf("Error during EC2 DescribeTags: %v", err)
return []telegraf.Metric{metric}
}
return metric
}
for _, tag := range r.EC2Tags {
if v := getTagFromDescribeTags(dto, tag); v != "" {
metric.AddTag(tag, v)
// LookupEC2Tags adds the tags listed in r.EC2Tags to metric and returns it.
//
// Each tag is first looked up in the tag cache. Only the tags that are not
// cached are requested through the EC2 DescribeTags API; every non-empty value
// returned is added to the metric and written to the cache with the configured
// cache TTL. If the API call fails, the error is logged and the metric is
// returned with whatever cached tags were already applied.
func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric {
	var tagsNotFound []string
	for _, tag := range r.EC2Tags {
		val, err := r.tagCache.Get([]byte(tag))
		if err != nil {
			// Any cache error is treated as a miss and resolved via the API.
			tagsNotFound = append(tagsNotFound, tag)
			continue
		}
		metric.AddTag(tag, string(val))
	}

	if len(tagsNotFound) == 0 {
		return metric
	}

	// The timeout context is only needed when the API is actually called.
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
	defer cancel()

	// Ask only for the missing tags so that entries which are still cached are
	// neither re-fetched nor have their TTL reset.
	dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{
		Filters: createFilterFromTags(r.instanceID, tagsNotFound),
	})
	if err != nil {
		r.Log.Errorf("Error during EC2 DescribeTags: %v", err)
		return metric
	}

	// NOTE(review): freecache appears to treat an expiration <= 0 as "never
	// expires" - confirm this is the intended behavior for cache_ttl = "0s".
	expiration := int(time.Duration(r.CacheTTL).Seconds())
	for _, tag := range tagsNotFound {
		v := getTagFromDescribeTags(dto, tag)
		if v == "" {
			continue
		}
		metric.AddTag(tag, v)
		if err := r.tagCache.Set([]byte(tag), []byte(v), expiration); err != nil {
			r.Log.Errorf("Error when setting EC2Tags tag cache value: %v", err)
		}
	}

	return metric
}
// asyncAdd enriches a single metric with the configured IMDS and EC2 tags and
// returns it wrapped in a one-element slice. IMDS tags are applied first and
// EC2 tags second; each lookup is skipped when no tags of that kind are
// configured.
func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
	enriched := metric

	// IMDS Instance Identity Document tags.
	if len(r.imdsTagsMap) != 0 {
		enriched = r.LookupIMDSTags(enriched)
	}

	// EC2 instance tags.
	if len(r.EC2Tags) != 0 {
		enriched = r.LookupEC2Tags(enriched)
	}

	return []telegraf.Metric{enriched}
}
@ -194,7 +297,9 @@ func init() {
// newAwsEc2Processor returns a processor populated with the package defaults
// and an empty, ready-to-use IMDS tag set.
func newAwsEc2Processor() *AwsEc2Processor {
	return &AwsEc2Processor{
		MaxParallelCalls: DefaultMaxParallelCalls,
		TagCacheSize:     DefaultCacheSize,
		// Set explicitly so the declared default is the single source of truth.
		LogCacheStats: DefaultLogCacheStats,
		Timeout:       config.Duration(DefaultTimeout),
		CacheTTL:      config.Duration(DefaultCacheTTL),
		imdsTagsMap:   make(map[string]struct{}),
	}
}

View File

@ -2,7 +2,9 @@ package ec2
import (
"testing"
"time"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
@ -30,6 +32,30 @@ func TestBasicStartupWithEC2Tags(t *testing.T) {
require.Len(t, acc.Errors, 0)
}
func TestBasicStartupWithCacheTTL(t *testing.T) {
p := newAwsEc2Processor()
p.Log = &testutil.Logger{}
p.ImdsTags = []string{"accountId", "instanceId"}
p.CacheTTL = config.Duration(12 * time.Hour)
acc := &testutil.Accumulator{}
require.NoError(t, p.Init())
require.Len(t, acc.GetTelegrafMetrics(), 0)
require.Len(t, acc.Errors, 0)
}
func TestBasicStartupWithTagCacheSize(t *testing.T) {
p := newAwsEc2Processor()
p.Log = &testutil.Logger{}
p.ImdsTags = []string{"accountId", "instanceId"}
p.TagCacheSize = 100
acc := &testutil.Accumulator{}
require.NoError(t, p.Init())
require.Len(t, acc.GetTelegrafMetrics(), 0)
require.Len(t, acc.Errors, 0)
}
func TestBasicInitNoTagsReturnAnError(t *testing.T) {
p := newAwsEc2Processor()
p.Log = &testutil.Logger{}

View File

@ -45,3 +45,22 @@
## at the same time.
## It's probably best to keep this number fairly low.
max_parallel_calls = 10
## cache_ttl determines how long each cached item will remain in the cache before
## it is removed and subsequently needs to be queried for from the AWS API. With
## the default of "0s", cached items are stored without an expiration time.
# cache_ttl = "0s"
## tag_cache_size determines how many of the values which are found in imds_tags
## or ec2_tags will be kept in memory for faster lookup on successive processing
## of metrics. You may want to adjust this if you have excessively large numbers
## of tags on your EC2 instances, and you are using the ec2_tags field. This
## typically does not need to be changed when using the imds_tags field.
# tag_cache_size = 1000
## log_cache_stats will periodically emit a debug-level log line with details of
## cache entries, hits, misses, and evacuations since the last time stats were
## emitted. This can be helpful in determining whether caching is being effective
## in your environment. Stats are emitted every 30 seconds. By default, this
## setting is disabled.
# log_cache_stats = false