AWS EC2 metadata processor Using StreamingProcessor (#8707)

This commit is contained in:
Patryk Małek 2021-02-04 23:02:27 +01:00 committed by GitHub
parent 3b8df55b9c
commit 7e78a08eba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 460 additions and 4 deletions

View File

@ -25,6 +25,15 @@ following works:
- github.com/aristanetworks/glog [Apache License 2.0](https://github.com/aristanetworks/glog/blob/master/LICENSE)
- github.com/aristanetworks/goarista [Apache License 2.0](https://github.com/aristanetworks/goarista/blob/master/COPYING)
- github.com/aws/aws-sdk-go [Apache License 2.0](https://github.com/aws/aws-sdk-go/blob/master/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/config [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/config/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/credentials [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/credentials/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/feature/ec2/imds [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/feature/ec2/imds/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/ec2 [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/ec2/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/internal/presigned-url [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/internal/presigned-url/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/sso [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/ec2/LICENSE.txt)
- github.com/aws/aws-sdk-go-v2/service/sts [Apache License 2.0](https://github.com/aws/aws-sdk-go-v2/blob/main/service/sts/LICENSE.txt)
- github.com/aws/smithy-go [Apache License 2.0](https://github.com/aws/smithy-go/blob/main/LICENSE)
- github.com/benbjohnson/clock [MIT License](https://github.com/benbjohnson/clock/blob/master/LICENSE)
- github.com/beorn7/perks [MIT License](https://github.com/beorn7/perks/blob/master/LICENSE)
- github.com/caio/go-tdigest [MIT License](https://github.com/caio/go-tdigest/blob/master/LICENSE)

7
go.mod
View File

@ -27,6 +27,11 @@ require (
github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740
github.com/armon/go-metrics v0.3.0 // indirect
github.com/aws/aws-sdk-go v1.34.34
github.com/aws/aws-sdk-go-v2 v1.1.0
github.com/aws/aws-sdk-go-v2/config v1.1.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.1
github.com/aws/aws-sdk-go-v2/service/ec2 v1.1.0
github.com/aws/smithy-go v1.0.0
github.com/benbjohnson/clock v1.0.3
github.com/bitly/go-hostpool v0.1.0 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869
@ -62,7 +67,7 @@ require (
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/golang/protobuf v1.3.5
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.5.2
github.com/google/go-cmp v0.5.4
github.com/google/go-github/v32 v32.1.0
github.com/gopcua/opcua v0.1.12
github.com/gorilla/mux v1.6.2

20
go.sum
View File

@ -115,6 +115,24 @@ github.com/armon/go-metrics v0.3.0 h1:B7AQgHi8QSEi4uHu7Sbsga+IJDU+CENgjxoo81vDUq
github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs=
github.com/aws/aws-sdk-go v1.34.34 h1:5dC0ZU0xy25+UavGNEkQ/5MOQwxXDA2YXtjCL1HfYKI=
github.com/aws/aws-sdk-go v1.34.34/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/aws/aws-sdk-go-v2 v1.1.0 h1:sKP6QWxdN1oRYjl+k6S3bpgBI+XUx/0mqVOLIw4lR/Q=
github.com/aws/aws-sdk-go-v2 v1.1.0/go.mod h1:smfAbmpW+tcRVuNUjo3MOArSZmW72t62rkCzc2i0TWM=
github.com/aws/aws-sdk-go-v2/config v1.1.0 h1:f3QVGpAcKrWpYNhKB8hE/buMjcfei95buQ5xdr/xYcU=
github.com/aws/aws-sdk-go-v2/config v1.1.0/go.mod h1:zfTyI6wH8yiZEvb6hGVza+S5oIB2lts2M7TDB4zMoeo=
github.com/aws/aws-sdk-go-v2/credentials v1.1.0 h1:RV0yzjGSNnJhTBco+01lwvWlc2m8gqBfha3D9dQDk78=
github.com/aws/aws-sdk-go-v2/credentials v1.1.0/go.mod h1:cV0qgln5tz/76IxAV0EsJVmmR5ZzKSQwWixsIvzk6lY=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.1 h1:eoT5e1jJf8Vcacu+mkEe1cgsgEAkuabpjhgq03GiXKc=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.1/go.mod h1:b+8dhYiS3m1xpzTZWk5EuQml/vSmPhKlzM/bAm/fttY=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.1.0 h1:+VnEgB1yp+7KlOsk6FXX/v/fU9uL5oSujIMkKQBBmp8=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.1.0/go.mod h1:/6514fU/SRcY3+ousB1zjUqiXjruSuti2qcfE70osOc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.1 h1:E7zGGgca12s7jA3VqirtaltXj5Wwe5eUIsUlNl1v+d8=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.1/go.mod h1:PISaKWylTYAyruocNk4Lr9miOOJjOcVBd7twCPbydDk=
github.com/aws/aws-sdk-go-v2/service/sso v1.1.0 h1:oQ/FE7bk1MldOs6RBTr+D7uMv1RfQ8WxxBRuH4lYEEo=
github.com/aws/aws-sdk-go-v2/service/sso v1.1.0/go.mod h1:VnS0vieB4YxutHFP9ROJ3ciT3T/XJZjxxv9L39eo8OQ=
github.com/aws/aws-sdk-go-v2/service/sts v1.1.0 h1:X9oTTSm14wc0ef4dit7aIB02UIw1kVi/imV7zLhFDdM=
github.com/aws/aws-sdk-go-v2/service/sts v1.1.0/go.mod h1:A15vQm/MsXL3a410CxwKQ5IBoSvIg+cr10fEFzPgEYs=
github.com/aws/smithy-go v1.0.0 h1:hkhcRKG9rJ4Fn+RbfXY7Tz7b3ITLDyolBnLLBhwbg/c=
github.com/aws/smithy-go v1.0.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
@ -287,6 +305,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-github/v32 v32.1.0 h1:GWkQOdXqviCPx7Q7Fj+KyPoGm4SwHRh8rheoPhd27II=
github.com/google/go-github/v32 v32.1.0/go.mod h1:rIEpZD9CTDQwDK9GDrtMTycQNA4JU3qBsCizh3q2WCI=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=

View File

@ -7,7 +7,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
"github.com/influxdata/telegraf/plugins/common/parallel"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

View File

@ -1,6 +1,7 @@
package all
import (
_ "github.com/influxdata/telegraf/plugins/processors/aws/ec2"
_ "github.com/influxdata/telegraf/plugins/processors/clone"
_ "github.com/influxdata/telegraf/plugins/processors/converter"
_ "github.com/influxdata/telegraf/plugins/processors/date"

View File

@ -0,0 +1,52 @@
# AWS EC2 Metadata Processor Plugin
AWS EC2 Metadata processor plugin appends metadata gathered from [AWS IMDS][]
to metrics associated with EC2 instances.
[AWS IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
## Configuration
```toml
[[processors.aws_ec2]]
## Tags to attach to metrics. Available tags:
## * accountId
## * architecture
## * availabilityZone
## * billingProducts
## * imageId
## * instanceId
## * instanceType
## * kernelId
## * pendingTime
## * privateIp
## * ramdiskId
## * region
## * version
tags = []
## Timeout for http requests made by against AWS EC2 metadata endpoint.
timeout = "10s"
## ordered controls whether or not the metrics need to stay in the same order
## this plugin received them in. If false, this plugin will change the order
## with requests hitting cached results moving through immediately and not
## waiting on slower lookups. This may cause issues for you if you are
## depending on the order of metrics staying the same. If so, set this to true.
## Keeping the metrics ordered may be slightly slower.
ordered = false
```
## Example
Append `accountId` and `instanceId` to metrics tags:
```toml
[[processors.aws_ec2]]
tags = [ "accountId", "instanceId"]
```
```diff
- cpu,hostname=localhost time_idle=42
+ cpu,hostname=localhost,accountId=123456789,instanceId=i-123456789123 time_idle=42
```

View File

@ -0,0 +1,310 @@
package ec2
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/smithy-go"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/parallel"
"github.com/influxdata/telegraf/plugins/processors"
)
type AwsEc2Processor struct {
ImdsTags []string `toml:"imds_tags"`
EC2Tags []string `toml:"ec2_tags"`
Timeout config.Duration `toml:"timeout"`
Ordered bool `toml:"ordered"`
MaxParallelCalls int `toml:"max_parallel_calls"`
Log telegraf.Logger `toml:"-"`
imdsClient *imds.Client `toml:"-"`
imdsTags map[string]struct{} `toml:"-"`
ec2Client *ec2.Client `toml:"-"`
parallel parallel.Parallel `toml:"-"`
instanceID string `toml:"-"`
}
const sampleConfig = `
## Instance identity document tags to attach to metrics.
## For more information see:
## https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html
##
## Available tags:
## * accountId
## * architecture
## * availabilityZone
## * billingProducts
## * imageId
## * instanceId
## * instanceType
## * kernelId
## * pendingTime
## * privateIp
## * ramdiskId
## * region
## * version
imds_tags = []
## EC2 instance tags retrieved with DescribeTags action.
## In case tag is empty upon retrieval it's omitted when tagging metrics.
## Note that in order for this to work, role attached to EC2 instance or AWS
## credentials available from the environment must have a policy attached, that
## allows ec2:DescribeTags.
##
## For more information see:
## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html
ec2_tags = []
## Timeout for http requests made by against aws ec2 metadata endpoint.
timeout = "10s"
## ordered controls whether or not the metrics need to stay in the same order
## this plugin received them in. If false, this plugin will change the order
## with requests hitting cached results moving through immediately and not
## waiting on slower lookups. This may cause issues for you if you are
## depending on the order of metrics staying the same. If so, set this to true.
## Keeping the metrics ordered may be slightly slower.
ordered = false
## max_parallel_calls is the maximum number of AWS API calls to be in flight
## at the same time.
## It's probably best to keep this number fairly low.
max_parallel_calls = 10
`
const (
DefaultMaxOrderedQueueSize = 10_000
DefaultMaxParallelCalls = 10
DefaultTimeout = 10 * time.Second
)
var allowedImdsTags = map[string]struct{}{
"accountId": {},
"architecture": {},
"availabilityZone": {},
"billingProducts": {},
"imageId": {},
"instanceId": {},
"instanceType": {},
"kernelId": {},
"pendingTime": {},
"privateIp": {},
"ramdiskId": {},
"region": {},
"version": {},
}
func (r *AwsEc2Processor) SampleConfig() string {
return sampleConfig
}
func (r *AwsEc2Processor) Description() string {
return "Attach AWS EC2 metadata to metrics"
}
func (r *AwsEc2Processor) Add(metric telegraf.Metric, acc telegraf.Accumulator) error {
r.parallel.Enqueue(metric)
return nil
}
func (r *AwsEc2Processor) Init() error {
r.Log.Debug("Initializing AWS EC2 Processor")
if len(r.EC2Tags) == 0 && len(r.ImdsTags) == 0 {
return errors.New("no tags specified in configuration")
}
ctx := context.Background()
cfg, err := awsconfig.LoadDefaultConfig(ctx)
if err != nil {
return fmt.Errorf("failed loading default AWS config: %w", err)
}
r.imdsClient = imds.NewFromConfig(cfg)
iido, err := r.imdsClient.GetInstanceIdentityDocument(
ctx,
&imds.GetInstanceIdentityDocumentInput{},
)
if err != nil {
return fmt.Errorf("failed getting instance identity document: %w", err)
}
r.instanceID = iido.InstanceID
if len(r.EC2Tags) > 0 {
// Add region to AWS config when creating EC2 service client since it's required.
cfg.Region = iido.Region
r.ec2Client = ec2.NewFromConfig(cfg)
// Chceck if instance is allowed to call DescribeTags.
_, err = r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{
DryRun: true,
})
var ae smithy.APIError
if errors.As(err, &ae) {
if ae.ErrorCode() != "DryRunOperation" {
return fmt.Errorf("instance doesn't have permissions to call DescribeTags: %w", err)
}
} else if err != nil {
return fmt.Errorf("error calling DescribeTags: %w", err)
}
}
for _, tag := range r.ImdsTags {
if len(tag) > 0 && isImdsTagAllowed(tag) {
r.imdsTags[tag] = struct{}{}
} else {
return fmt.Errorf("not allowed metadata tag specified in configuration: %s", tag)
}
}
if len(r.imdsTags) == 0 && len(r.EC2Tags) == 0 {
return errors.New("no allowed metadata tags specified in configuration")
}
return nil
}
func (r *AwsEc2Processor) Start(acc telegraf.Accumulator) error {
if r.Ordered {
r.parallel = parallel.NewOrdered(acc, r.asyncAdd, DefaultMaxOrderedQueueSize, r.MaxParallelCalls)
} else {
r.parallel = parallel.NewUnordered(acc, r.asyncAdd, r.MaxParallelCalls)
}
return nil
}
func (r *AwsEc2Processor) Stop() error {
if r.parallel == nil {
return errors.New("Trying to stop unstarted AWS EC2 Processor")
}
r.parallel.Stop()
return nil
}
func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout))
defer cancel()
// Add IMDS Instance Identity Document tags.
if len(r.imdsTags) > 0 {
iido, err := r.imdsClient.GetInstanceIdentityDocument(
ctx,
&imds.GetInstanceIdentityDocumentInput{},
)
if err != nil {
r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err)
return []telegraf.Metric{metric}
}
for tag := range r.imdsTags {
if v := getTagFromInstanceIdentityDocument(iido, tag); v != "" {
metric.AddTag(tag, v)
}
}
}
// Add EC2 instance tags.
if len(r.EC2Tags) > 0 {
dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{
Filters: createFilterFromTags(r.instanceID, r.EC2Tags),
})
if err != nil {
r.Log.Errorf("Error during EC2 DescribeTags: %v", err)
return []telegraf.Metric{metric}
}
for _, tag := range r.EC2Tags {
if v := getTagFromDescribeTags(dto, tag); v != "" {
metric.AddTag(tag, v)
}
}
}
return []telegraf.Metric{metric}
}
func init() {
processors.AddStreaming("aws_ec2", func() telegraf.StreamingProcessor {
return newAwsEc2Processor()
})
}
func newAwsEc2Processor() *AwsEc2Processor {
return &AwsEc2Processor{
MaxParallelCalls: DefaultMaxParallelCalls,
Timeout: config.Duration(DefaultTimeout),
imdsTags: make(map[string]struct{}),
}
}
func createFilterFromTags(instanceID string, tagNames []string) []types.Filter {
return []types.Filter{
{
Name: aws.String("resource-id"),
Values: []string{instanceID},
},
{
Name: aws.String("key"),
Values: tagNames,
},
}
}
func getTagFromDescribeTags(o *ec2.DescribeTagsOutput, tag string) string {
for _, t := range o.Tags {
if *t.Key == tag {
return *t.Value
}
}
return ""
}
func getTagFromInstanceIdentityDocument(o *imds.GetInstanceIdentityDocumentOutput, tag string) string {
switch tag {
case "accountId":
return o.AccountID
case "architecture":
return o.Architecture
case "availabilityZone":
return o.AvailabilityZone
case "billingProducts":
return strings.Join(o.BillingProducts, ",")
case "imageId":
return o.ImageID
case "instanceId":
return o.InstanceID
case "instanceType":
return o.InstanceType
case "kernelId":
return o.KernelID
case "pendingTime":
return o.PendingTime.String()
case "privateIp":
return o.PrivateIP
case "ramdiskId":
return o.RamdiskID
case "region":
return o.Region
case "version":
return o.Version
default:
return ""
}
}
func isImdsTagAllowed(tag string) bool {
_, ok := allowedImdsTags[tag]
return ok
}

View File

@ -0,0 +1,59 @@
package ec2
import (
"testing"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)
func TestBasicStartup(t *testing.T) {
p := newAwsEc2Processor()
p.Log = &testutil.Logger{}
p.ImdsTags = []string{"accountId", "instanceId"}
acc := &testutil.Accumulator{}
require.NoError(t, p.Start(acc))
require.NoError(t, p.Stop())
require.Len(t, acc.GetTelegrafMetrics(), 0)
require.Len(t, acc.Errors, 0)
}
func TestBasicStartupWithEC2Tags(t *testing.T) {
p := newAwsEc2Processor()
p.Log = &testutil.Logger{}
p.ImdsTags = []string{"accountId", "instanceId"}
p.EC2Tags = []string{"Name"}
acc := &testutil.Accumulator{}
require.NoError(t, p.Start(acc))
require.NoError(t, p.Stop())
require.Len(t, acc.GetTelegrafMetrics(), 0)
require.Len(t, acc.Errors, 0)
}
func TestBasicInitNoTagsReturnAnError(t *testing.T) {
p := newAwsEc2Processor()
p.Log = &testutil.Logger{}
p.ImdsTags = []string{}
err := p.Init()
require.Error(t, err)
}
func TestBasicInitInvalidTagsReturnAnError(t *testing.T) {
p := newAwsEc2Processor()
p.Log = &testutil.Logger{}
p.ImdsTags = []string{"dummy", "qwerty"}
err := p.Init()
require.Error(t, err)
}
func TestLoadingConfig(t *testing.T) {
confFile := []byte("[[processors.aws_ec2]]" + "\n" + sampleConfig)
c := config.NewConfig()
err := c.LoadConfigData(confFile)
require.NoError(t, err)
require.Len(t, c.Processors, 1)
}

View File

@ -10,9 +10,9 @@ import (
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/snmp"
"github.com/influxdata/telegraf/plugins/common/parallel"
si "github.com/influxdata/telegraf/plugins/inputs/snmp"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
)
var sampleConfig = `

View File

@ -5,8 +5,8 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/parallel"
"github.com/influxdata/telegraf/plugins/processors"
"github.com/influxdata/telegraf/plugins/processors/reverse_dns/parallel"
)
const sampleConfig = `