diff --git a/plugins/processors/aws/ec2/README.md b/plugins/processors/aws/ec2/README.md index 5bb5b8243..69e2e4cad 100644 --- a/plugins/processors/aws/ec2/README.md +++ b/plugins/processors/aws/ec2/README.md @@ -64,6 +64,25 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## at the same time. ## It's probably best to keep this number fairly low. max_parallel_calls = 10 + + ## cache_ttl determines how long each cached item will remain in the cache before + ## it is removed and subsequently needs to be queried for from the AWS API. By + ## default, no items are cached. + # cache_ttl = "0s" + + ## tag_cache_size determines how many of the values which are found in imds_tags + ## or ec2_tags will be kept in memory for faster lookup on successive processing + ## of metrics. You may want to adjust this if you have excessively large numbers + ## of tags on your EC2 instances, and you are using the ec2_tags field. This + ## typically does not need to be changed when using the imds_tags field. + # tag_cache_size = 1000 + + ## log_cache_stats will emit a log line periodically to stdout with details of + ## cache entries, hits, misses, and evacuations since the last time stats were + ## emitted. This can be helpful in determining whether caching is being effective + ## in your environment. Stats are emitted every 30 seconds. By default, this + ## setting is disabled. + # log_cache_stats = false ``` ## Example @@ -79,3 +98,17 @@ Append `accountId` and `instanceId` to metrics tags: - cpu,hostname=localhost time_idle=42 + cpu,hostname=localhost,accountId=123456789,instanceId=i-123456789123 time_idle=42 ``` + +## Notes + +We use a single cache because telegraf's `AddTag` function models this. + +A user can specify a list of both EC2 tags and IMDS tags. The items in this list +can, technically, be the same. This will result in a situation where the EC2 +Tag's value will override the IMDS tags value. + +Though this is undesirable, it is unavoidable because the `AddTag` function does +not support this case. + +You should avoid using IMDS tags as EC2 tags because the EC2 tags will always +"win" due to them being processed in this plugin *after* IMDS tags. diff --git a/plugins/processors/aws/ec2/ec2.go b/plugins/processors/aws/ec2/ec2.go index 480441220..4090d8b52 100644 --- a/plugins/processors/aws/ec2/ec2.go +++ b/plugins/processors/aws/ec2/ec2.go @@ -15,6 +15,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/aws/smithy-go" + "github.com/coocood/freecache" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" @@ -29,21 +30,30 @@ type AwsEc2Processor struct { ImdsTags []string `toml:"imds_tags"` EC2Tags []string `toml:"ec2_tags"` Timeout config.Duration `toml:"timeout"` + CacheTTL config.Duration `toml:"cache_ttl"` Ordered bool `toml:"ordered"` MaxParallelCalls int `toml:"max_parallel_calls"` Log telegraf.Logger `toml:"-"` + TagCacheSize int `toml:"tag_cache_size"` + LogCacheStats bool `toml:"log_cache_stats"` - imdsClient *imds.Client - imdsTagsMap map[string]struct{} - ec2Client *ec2.Client - parallel parallel.Parallel - instanceID string + tagCache *freecache.Cache + + imdsClient *imds.Client + imdsTagsMap map[string]struct{} + ec2Client *ec2.Client + parallel parallel.Parallel + instanceID string + cancelCleanupWorker context.CancelFunc } const ( DefaultMaxOrderedQueueSize = 10_000 DefaultMaxParallelCalls = 10 DefaultTimeout = 10 * time.Second + DefaultCacheTTL = 0 * time.Hour + DefaultCacheSize = 1000 + DefaultLogCacheStats = false ) var allowedImdsTags = map[string]struct{}{ @@ -71,6 +81,29 @@ func (r *AwsEc2Processor) Add(metric telegraf.Metric, _ telegraf.Accumulator) er return nil } +func (r *AwsEc2Processor) logCacheStatistics(ctx context.Context) { + if r.tagCache == nil { + return + } + + ticker := time.NewTicker(30 * time.Second) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.Log.Debugf("cache: size=%d hit=%d miss=%d full=%d\n", + r.tagCache.EntryCount(), + r.tagCache.HitCount(), + r.tagCache.MissCount(), + r.tagCache.EvacuateCount(), + ) + r.tagCache.ResetStatistics() + } + } +} + func (r *AwsEc2Processor) Init() error { r.Log.Debug("Initializing AWS EC2 Processor") if len(r.EC2Tags) == 0 && len(r.ImdsTags) == 0 { @@ -91,6 +124,18 @@ func (r *AwsEc2Processor) Init() error { } func (r *AwsEc2Processor) Start(acc telegraf.Accumulator) error { + r.tagCache = freecache.NewCache(r.TagCacheSize) + if r.LogCacheStats { + ctx, cancel := context.WithCancel(context.Background()) + r.cancelCleanupWorker = cancel + go r.logCacheStatistics(ctx) + } + + r.Log.Debugf("cache: size=%d\n", r.TagCacheSize) + if r.CacheTTL > 0 { + r.Log.Debugf("cache timeout: seconds=%d\n", int(time.Duration(r.CacheTTL).Seconds())) + } + ctx := context.Background() cfg, err := awsconfig.LoadDefaultConfig(ctx) if err != nil { @@ -114,7 +159,7 @@ func (r *AwsEc2Processor) Start(acc telegraf.Accumulator) error { r.ec2Client = ec2.NewFromConfig(cfg) - // Chceck if instance is allowed to call DescribeTags. + // Check if instance is allowed to call DescribeTags. _, err = r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{ DryRun: aws.Bool(true), }) @@ -141,47 +186,105 @@ func (r *AwsEc2Processor) Stop() { if r.parallel != nil { r.parallel.Stop() } + r.cancelCleanupWorker() } -func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric { +func (r *AwsEc2Processor) LookupIMDSTags(metric telegraf.Metric) telegraf.Metric { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)) defer cancel() - // Add IMDS Instance Identity Document tags. - if len(r.imdsTagsMap) > 0 { - iido, err := r.imdsClient.GetInstanceIdentityDocument( - ctx, - &imds.GetInstanceIdentityDocumentInput{}, - ) - if err != nil { - r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err) - return []telegraf.Metric{metric} - } + var tagsNotFound []string - for tag := range r.imdsTagsMap { - if v := getTagFromInstanceIdentityDocument(iido, tag); v != "" { - metric.AddTag(tag, v) + for tag := range r.imdsTagsMap { + val, err := r.tagCache.Get([]byte(tag)) + if err != nil { + tagsNotFound = append(tagsNotFound, tag) + } else { + metric.AddTag(tag, string(val)) + } + } + + if len(tagsNotFound) == 0 { + return metric + } + + iido, err := r.imdsClient.GetInstanceIdentityDocument( + ctx, + &imds.GetInstanceIdentityDocumentInput{}, + ) + + if err != nil { + r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err) + return metric + } + + for _, tag := range tagsNotFound { + if v := getTagFromInstanceIdentityDocument(iido, tag); v != "" { + metric.AddTag(tag, v) + expiration := int(time.Duration(r.CacheTTL).Seconds()) + err = r.tagCache.Set([]byte(tag), []byte(v), expiration) + if err != nil { + r.Log.Errorf("Error when setting IMDS tag cache value: %v", err) } } } - // Add EC2 instance tags. - if len(r.EC2Tags) > 0 { - dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{ - Filters: createFilterFromTags(r.instanceID, r.EC2Tags), - }) - if err != nil { - r.Log.Errorf("Error during EC2 DescribeTags: %v", err) - return []telegraf.Metric{metric} - } + return metric +} - for _, tag := range r.EC2Tags { - if v := getTagFromDescribeTags(dto, tag); v != "" { - metric.AddTag(tag, v) +func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)) + defer cancel() + + var tagsNotFound []string + + for _, tag := range r.EC2Tags { + val, err := r.tagCache.Get([]byte(tag)) + if err != nil { + tagsNotFound = append(tagsNotFound, tag) + } else { + metric.AddTag(tag, string(val)) + } + } + + if len(tagsNotFound) == 0 { + return metric + } + + dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{ + Filters: createFilterFromTags(r.instanceID, r.EC2Tags), + }) + + if err != nil { + r.Log.Errorf("Error during EC2 DescribeTags: %v", err) + return metric + } + + for _, tag := range r.EC2Tags { + if v := getTagFromDescribeTags(dto, tag); v != "" { + metric.AddTag(tag, v) + expiration := int(time.Duration(r.CacheTTL).Seconds()) + err = r.tagCache.Set([]byte(tag), []byte(v), expiration) + if err != nil { + r.Log.Errorf("Error when setting EC2Tags tag cache value: %v", err) } } } + return metric +} + +func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric { + // Add IMDS Instance Identity Document tags. + if len(r.imdsTagsMap) > 0 { + metric = r.LookupIMDSTags(metric) + } + + // Add EC2 instance tags. + if len(r.EC2Tags) > 0 { + metric = r.LookupEC2Tags(metric) + } + return []telegraf.Metric{metric} } @@ -194,7 +297,9 @@ func init() { func newAwsEc2Processor() *AwsEc2Processor { return &AwsEc2Processor{ MaxParallelCalls: DefaultMaxParallelCalls, + TagCacheSize: DefaultCacheSize, Timeout: config.Duration(DefaultTimeout), + CacheTTL: config.Duration(DefaultCacheTTL), imdsTagsMap: make(map[string]struct{}), } } diff --git a/plugins/processors/aws/ec2/ec2_test.go b/plugins/processors/aws/ec2/ec2_test.go index daaf950d8..03dfa9df1 100644 --- a/plugins/processors/aws/ec2/ec2_test.go +++ b/plugins/processors/aws/ec2/ec2_test.go @@ -2,7 +2,9 @@ package ec2 import ( "testing" + "time" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" ) @@ -30,6 +32,30 @@ func TestBasicStartupWithEC2Tags(t *testing.T) { require.Len(t, acc.Errors, 0) } +func TestBasicStartupWithCacheTTL(t *testing.T) { + p := newAwsEc2Processor() + p.Log = &testutil.Logger{} + p.ImdsTags = []string{"accountId", "instanceId"} + p.CacheTTL = config.Duration(12 * time.Hour) + acc := &testutil.Accumulator{} + require.NoError(t, p.Init()) + + require.Len(t, acc.GetTelegrafMetrics(), 0) + require.Len(t, acc.Errors, 0) +} + +func TestBasicStartupWithTagCacheSize(t *testing.T) { + p := newAwsEc2Processor() + p.Log = &testutil.Logger{} + p.ImdsTags = []string{"accountId", "instanceId"} + p.TagCacheSize = 100 + acc := &testutil.Accumulator{} + require.NoError(t, p.Init()) + + require.Len(t, acc.GetTelegrafMetrics(), 0) + require.Len(t, acc.Errors, 0) +} + func TestBasicInitNoTagsReturnAnError(t *testing.T) { p := newAwsEc2Processor() p.Log = &testutil.Logger{} diff --git a/plugins/processors/aws/ec2/sample.conf b/plugins/processors/aws/ec2/sample.conf index 270ddcbe4..198ab6a84 100644 --- a/plugins/processors/aws/ec2/sample.conf +++ b/plugins/processors/aws/ec2/sample.conf @@ -45,3 +45,22 @@ ## at the same time. ## It's probably best to keep this number fairly low. max_parallel_calls = 10 + + ## cache_ttl determines how long each cached item will remain in the cache before + ## it is removed and subsequently needs to be queried for from the AWS API. By + ## default, no items are cached. + # cache_ttl = "0s" + + ## tag_cache_size determines how many of the values which are found in imds_tags + ## or ec2_tags will be kept in memory for faster lookup on successive processing + ## of metrics. You may want to adjust this if you have excessively large numbers + ## of tags on your EC2 instances, and you are using the ec2_tags field. This + ## typically does not need to be changed when using the imds_tags field. + # tag_cache_size = 1000 + + ## log_cache_stats will emit a log line periodically to stdout with details of + ## cache entries, hits, misses, and evacuations since the last time stats were + ## emitted. This can be helpful in determining whether caching is being effective + ## in your environment. Stats are emitted every 30 seconds. By default, this + ## setting is disabled. + # log_cache_stats = false \ No newline at end of file