feat(processors.aws_ec2): Allow to use instance metadata (#15795)

This commit is contained in:
Sven Rebhan 2024-09-05 18:56:39 +02:00 committed by GitHub
parent a0755797f5
commit 895f96f21b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 192 additions and 136 deletions

View File

@ -37,7 +37,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## * ramdiskId ## * ramdiskId
## * region ## * region
## * version ## * version
imds_tags = [] # imds_tags = []
## EC2 instance tags retrieved with DescribeTags action. ## EC2 instance tags retrieved with DescribeTags action.
## In case tag is empty upon retrieval it's omitted when tagging metrics. ## In case tag is empty upon retrieval it's omitted when tagging metrics.
@ -47,10 +47,22 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## ##
## For more information see: ## For more information see:
## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html ## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html
ec2_tags = [] # ec2_tags = []
## Paths to instance metadata information to attach to the metrics.
## Specify the full path without the base-path e.g. `tags/instance/Name`.
##
## For more information see:
## https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
# metadata_paths = []
## Allows converting metadata tag-names to canonical names representing the
## full path with slashes ('/') being replaced with underscores. By default,
## only the last path element is used to name the tag.
# canonical_metadata_tags = false
## Timeout for http requests made against the aws ec2 metadata endpoint. ## Timeout for http requests made against the aws ec2 metadata endpoint.
timeout = "10s" # timeout = "10s"
## ordered controls whether or not the metrics need to stay in the same order ## ordered controls whether or not the metrics need to stay in the same order
## this plugin received them in. If false, this plugin will change the order ## this plugin received them in. If false, this plugin will change the order
@ -58,12 +70,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## waiting on slower lookups. This may cause issues for you if you are ## waiting on slower lookups. This may cause issues for you if you are
## depending on the order of metrics staying the same. If so, set this to true. ## depending on the order of metrics staying the same. If so, set this to true.
## Keeping the metrics ordered may be slightly slower. ## Keeping the metrics ordered may be slightly slower.
ordered = false # ordered = false
## max_parallel_calls is the maximum number of AWS API calls to be in flight ## max_parallel_calls is the maximum number of AWS API calls to be in flight
## at the same time. ## at the same time.
## It's probably best to keep this number fairly low. ## It's probably best to keep this number fairly low.
max_parallel_calls = 10 # max_parallel_calls = 10
## cache_ttl determines how long each cached item will remain in the cache before ## cache_ttl determines how long each cached item will remain in the cache before
## it is removed and subsequently needs to be queried for from the AWS API. By ## it is removed and subsequently needs to be queried for from the AWS API. By

View File

@ -6,6 +6,8 @@ import (
_ "embed" _ "embed"
"errors" "errors"
"fmt" "fmt"
"io"
"slices"
"strings" "strings"
"time" "time"
@ -29,18 +31,19 @@ var sampleConfig string
type AwsEc2Processor struct { type AwsEc2Processor struct {
ImdsTags []string `toml:"imds_tags"` ImdsTags []string `toml:"imds_tags"`
EC2Tags []string `toml:"ec2_tags"` EC2Tags []string `toml:"ec2_tags"`
MetadataPaths []string `toml:"metadata_paths"`
CanonicalMetadataTags bool `toml:"canonical_metadata_tags"`
Timeout config.Duration `toml:"timeout"` Timeout config.Duration `toml:"timeout"`
CacheTTL config.Duration `toml:"cache_ttl"` CacheTTL config.Duration `toml:"cache_ttl"`
Ordered bool `toml:"ordered"` Ordered bool `toml:"ordered"`
MaxParallelCalls int `toml:"max_parallel_calls"` MaxParallelCalls int `toml:"max_parallel_calls"`
Log telegraf.Logger `toml:"-"`
TagCacheSize int `toml:"tag_cache_size"` TagCacheSize int `toml:"tag_cache_size"`
LogCacheStats bool `toml:"log_cache_stats"` LogCacheStats bool `toml:"log_cache_stats"`
Log telegraf.Logger `toml:"-"`
tagCache *freecache.Cache tagCache *freecache.Cache
imdsClient *imds.Client imdsClient *imds.Client
imdsTagsMap map[string]struct{}
ec2Client *ec2.Client ec2Client *ec2.Client
parallel parallel.Parallel parallel parallel.Parallel
instanceID string instanceID string
@ -56,20 +59,20 @@ const (
DefaultLogCacheStats = false DefaultLogCacheStats = false
) )
var allowedImdsTags = map[string]struct{}{ var allowedImdsTags = []string{
"accountId": {}, "accountId",
"architecture": {}, "architecture",
"availabilityZone": {}, "availabilityZone",
"billingProducts": {}, "billingProducts",
"imageId": {}, "imageId",
"instanceId": {}, "instanceId",
"instanceType": {}, "instanceType",
"kernelId": {}, "kernelId",
"pendingTime": {}, "pendingTime",
"privateIp": {}, "privateIp",
"ramdiskId": {}, "ramdiskId",
"region": {}, "region",
"version": {}, "version",
} }
func (*AwsEc2Processor) SampleConfig() string { func (*AwsEc2Processor) SampleConfig() string {
@ -81,43 +84,17 @@ func (r *AwsEc2Processor) Add(metric telegraf.Metric, _ telegraf.Accumulator) er
return nil return nil
} }
// logCacheStatistics writes tag-cache statistics to the debug log every 30
// seconds until ctx is cancelled. It returns immediately if no tag cache has
// been set up. After each report ResetStatistics is called on the cache so
// that every log line covers only the most recent interval.
func (r *AwsEc2Processor) logCacheStatistics(ctx context.Context) {
	if r.tagCache == nil {
		return
	}

	ticker := time.NewTicker(30 * time.Second)
	// Release the ticker's resources once the worker exits.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r.Log.Debugf("cache: size=%d hit=%d miss=%d full=%d\n",
				r.tagCache.EntryCount(),
				r.tagCache.HitCount(),
				r.tagCache.MissCount(),
				r.tagCache.EvacuateCount(),
			)
			r.tagCache.ResetStatistics()
		}
	}
}
func (r *AwsEc2Processor) Init() error { func (r *AwsEc2Processor) Init() error {
r.Log.Debug("Initializing AWS EC2 Processor") r.Log.Debug("Initializing AWS EC2 Processor")
if len(r.EC2Tags) == 0 && len(r.ImdsTags) == 0 {
if len(r.ImdsTags) == 0 && len(r.MetadataPaths) == 0 && len(r.EC2Tags) == 0 {
return errors.New("no tags specified in configuration") return errors.New("no tags specified in configuration")
} }
for _, tag := range r.ImdsTags { for _, tag := range r.ImdsTags {
if len(tag) == 0 || !isImdsTagAllowed(tag) { if tag == "" || !slices.Contains(allowedImdsTags, tag) {
return fmt.Errorf("not allowed metadata tag specified in configuration: %s", tag) return fmt.Errorf("invalid imds tag %q", tag)
} }
r.imdsTagsMap[tag] = struct{}{}
}
if len(r.imdsTagsMap) == 0 && len(r.EC2Tags) == 0 {
return errors.New("no allowed metadata tags specified in configuration")
} }
return nil return nil
@ -189,13 +166,36 @@ func (r *AwsEc2Processor) Stop() {
r.cancelCleanupWorker() r.cancelCleanupWorker()
} }
func (r *AwsEc2Processor) LookupIMDSTags(metric telegraf.Metric) telegraf.Metric { func (r *AwsEc2Processor) logCacheStatistics(ctx context.Context) {
if r.tagCache == nil {
return
}
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.Log.Debugf("cache: size=%d hit=%d miss=%d full=%d\n",
r.tagCache.EntryCount(),
r.tagCache.HitCount(),
r.tagCache.MissCount(),
r.tagCache.EvacuateCount(),
)
r.tagCache.ResetStatistics()
}
}
}
func (r *AwsEc2Processor) lookupIMDSTags(metric telegraf.Metric) telegraf.Metric {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
defer cancel() defer cancel()
var tagsNotFound []string var tagsNotFound []string
for tag := range r.imdsTagsMap { for _, tag := range r.ImdsTags {
val, err := r.tagCache.Get([]byte(tag)) val, err := r.tagCache.Get([]byte(tag))
if err != nil { if err != nil {
tagsNotFound = append(tagsNotFound, tag) tagsNotFound = append(tagsNotFound, tag)
@ -208,31 +208,102 @@ func (r *AwsEc2Processor) LookupIMDSTags(metric telegraf.Metric) telegraf.Metric
return metric return metric
} }
iido, err := r.imdsClient.GetInstanceIdentityDocument( doc, err := r.imdsClient.GetInstanceIdentityDocument(ctx, &imds.GetInstanceIdentityDocumentInput{})
ctx,
&imds.GetInstanceIdentityDocumentInput{},
)
if err != nil { if err != nil {
r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err) r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err)
return metric return metric
} }
for _, tag := range tagsNotFound { for _, tag := range tagsNotFound {
if v := getTagFromInstanceIdentityDocument(iido, tag); v != "" { var v string
switch tag {
case "accountId":
v = doc.AccountID
case "architecture":
v = doc.Architecture
case "availabilityZone":
v = doc.AvailabilityZone
case "billingProducts":
v = strings.Join(doc.BillingProducts, ",")
case "imageId":
v = doc.ImageID
case "instanceId":
v = doc.InstanceID
case "instanceType":
v = doc.InstanceType
case "kernelId":
v = doc.KernelID
case "pendingTime":
v = doc.PendingTime.String()
case "privateIp":
v = doc.PrivateIP
case "ramdiskId":
v = doc.RamdiskID
case "region":
v = doc.Region
case "version":
v = doc.Version
default:
continue
}
metric.AddTag(tag, v) metric.AddTag(tag, v)
expiration := int(time.Duration(r.CacheTTL).Seconds()) expiration := int(time.Duration(r.CacheTTL).Seconds())
err = r.tagCache.Set([]byte(tag), []byte(v), expiration) if err := r.tagCache.Set([]byte(tag), []byte(v), expiration); err != nil {
if err != nil {
r.Log.Errorf("Error when setting IMDS tag cache value: %v", err) r.Log.Errorf("Error when setting IMDS tag cache value: %v", err)
} continue
} }
} }
return metric return metric
} }
func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric { func (r *AwsEc2Processor) lookupMetadata(metric telegraf.Metric) telegraf.Metric {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
defer cancel()
for _, path := range r.MetadataPaths {
key := strings.Trim(path, "/ ")
if r.CanonicalMetadataTags {
key = strings.ReplaceAll(key, "/", "_")
} else {
if idx := strings.LastIndex(key, "/"); idx > 0 {
key = key[idx:]
}
}
// Try to lookup the tag in cache
if value, err := r.tagCache.Get([]byte("metadata/" + path)); err == nil {
metric.AddTag(key, string(value))
continue
}
// Query the tag with the full path
resp, err := r.imdsClient.GetMetadata(ctx, &imds.GetMetadataInput{Path: path})
if err != nil {
r.Log.Errorf("Getting metadata %q failed: %v", path, err)
continue
}
value, err := io.ReadAll(resp.Content)
if err != nil {
r.Log.Errorf("Reading metadata reponse for %+v failed: %v", path, err)
continue
}
if len(value) > 0 {
metric.AddTag(key, string(value))
}
expiration := int(time.Duration(r.CacheTTL).Seconds())
if err = r.tagCache.Set([]byte("metadata/"+path), value, expiration); err != nil {
r.Log.Errorf("Updating metadata cache for %q failed: %v", path, err)
continue
}
}
return metric
}
func (r *AwsEc2Processor) lookupEC2Tags(metric telegraf.Metric) telegraf.Metric {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
defer cancel() defer cancel()
@ -252,7 +323,16 @@ func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric
} }
dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{ dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{
Filters: createFilterFromTags(r.instanceID, r.EC2Tags), Filters: []types.Filter{
{
Name: aws.String("resource-id"),
Values: []string{r.instanceID},
},
{
Name: aws.String("key"),
Values: r.EC2Tags,
},
},
}) })
if err != nil { if err != nil {
@ -276,13 +356,18 @@ func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric
func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric { func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
// Add IMDS Instance Identity Document tags. // Add IMDS Instance Identity Document tags.
if len(r.imdsTagsMap) > 0 { if len(r.ImdsTags) > 0 {
metric = r.LookupIMDSTags(metric) metric = r.lookupIMDSTags(metric)
}
// Add instance metadata tags.
if len(r.MetadataPaths) > 0 {
metric = r.lookupMetadata(metric)
} }
// Add EC2 instance tags. // Add EC2 instance tags.
if len(r.EC2Tags) > 0 { if len(r.EC2Tags) > 0 {
metric = r.LookupEC2Tags(metric) metric = r.lookupEC2Tags(metric)
} }
return []telegraf.Metric{metric} return []telegraf.Metric{metric}
@ -300,20 +385,6 @@ func newAwsEc2Processor() *AwsEc2Processor {
TagCacheSize: DefaultCacheSize, TagCacheSize: DefaultCacheSize,
Timeout: config.Duration(DefaultTimeout), Timeout: config.Duration(DefaultTimeout),
CacheTTL: config.Duration(DefaultCacheTTL), CacheTTL: config.Duration(DefaultCacheTTL),
imdsTagsMap: make(map[string]struct{}),
}
}
func createFilterFromTags(instanceID string, tagNames []string) []types.Filter {
return []types.Filter{
{
Name: aws.String("resource-id"),
Values: []string{instanceID},
},
{
Name: aws.String("key"),
Values: tagNames,
},
} }
} }
@ -325,41 +396,3 @@ func getTagFromDescribeTags(o *ec2.DescribeTagsOutput, tag string) string {
} }
return "" return ""
} }
// getTagFromInstanceIdentityDocument returns the field of the instance
// identity document o that is selected by tag. Billing products are joined
// with commas and the pending time is rendered via its String method. An
// empty string is returned for tag names that match none of the known fields.
func getTagFromInstanceIdentityDocument(o *imds.GetInstanceIdentityDocumentOutput, tag string) string {
	var value string

	switch tag {
	case "accountId":
		value = o.AccountID
	case "architecture":
		value = o.Architecture
	case "availabilityZone":
		value = o.AvailabilityZone
	case "billingProducts":
		value = strings.Join(o.BillingProducts, ",")
	case "imageId":
		value = o.ImageID
	case "instanceId":
		value = o.InstanceID
	case "instanceType":
		value = o.InstanceType
	case "kernelId":
		value = o.KernelID
	case "pendingTime":
		value = o.PendingTime.String()
	case "privateIp":
		value = o.PrivateIP
	case "ramdiskId":
		value = o.RamdiskID
	case "region":
		value = o.Region
	case "version":
		value = o.Version
	}

	return value
}
// isImdsTagAllowed reports whether tag is present as a key in allowedImdsTags.
func isImdsTagAllowed(tag string) bool {
	if _, found := allowedImdsTags[tag]; found {
		return true
	}
	return false
}

View File

@ -145,7 +145,6 @@ func TestTracking(t *testing.T) {
CacheTTL: config.Duration(DefaultCacheTTL), CacheTTL: config.Duration(DefaultCacheTTL),
ImdsTags: []string{"accountId", "instanceId"}, ImdsTags: []string{"accountId", "instanceId"},
Log: &testutil.Logger{}, Log: &testutil.Logger{},
imdsTagsMap: make(map[string]struct{}),
} }
require.NoError(t, plugin.Init()) require.NoError(t, plugin.Init())

View File

@ -18,7 +18,7 @@
## * ramdiskId ## * ramdiskId
## * region ## * region
## * version ## * version
imds_tags = [] # imds_tags = []
## EC2 instance tags retrieved with DescribeTags action. ## EC2 instance tags retrieved with DescribeTags action.
## In case tag is empty upon retrieval it's omitted when tagging metrics. ## In case tag is empty upon retrieval it's omitted when tagging metrics.
@ -28,10 +28,22 @@
## ##
## For more information see: ## For more information see:
## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html ## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html
ec2_tags = [] # ec2_tags = []
## Paths to instance metadata information to attach to the metrics.
## Specify the full path without the base-path e.g. `tags/instance/Name`.
##
## For more information see:
## https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
# metadata_paths = []
## Allows converting metadata tag-names to canonical names representing the
## full path with slashes ('/') being replaced with underscores. By default,
## only the last path element is used to name the tag.
# canonical_metadata_tags = false
## Timeout for http requests made against the aws ec2 metadata endpoint. ## Timeout for http requests made against the aws ec2 metadata endpoint.
timeout = "10s" # timeout = "10s"
## ordered controls whether or not the metrics need to stay in the same order ## ordered controls whether or not the metrics need to stay in the same order
## this plugin received them in. If false, this plugin will change the order ## this plugin received them in. If false, this plugin will change the order
@ -39,12 +51,12 @@
## waiting on slower lookups. This may cause issues for you if you are ## waiting on slower lookups. This may cause issues for you if you are
## depending on the order of metrics staying the same. If so, set this to true. ## depending on the order of metrics staying the same. If so, set this to true.
## Keeping the metrics ordered may be slightly slower. ## Keeping the metrics ordered may be slightly slower.
ordered = false # ordered = false
## max_parallel_calls is the maximum number of AWS API calls to be in flight ## max_parallel_calls is the maximum number of AWS API calls to be in flight
## at the same time. ## at the same time.
## It's probably best to keep this number fairly low. ## It's probably best to keep this number fairly low.
max_parallel_calls = 10 # max_parallel_calls = 10
## cache_ttl determines how long each cached item will remain in the cache before ## cache_ttl determines how long each cached item will remain in the cache before
## it is removed and subsequently needs to be queried for from the AWS API. By ## it is removed and subsequently needs to be queried for from the AWS API. By