Add v3 metadata support to ecs input (#7154)

This commit is contained in:
Sergey 2020-07-07 11:14:05 -07:00 committed by GitHub
parent 07f601f304
commit 55b672e4fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 277 additions and 33 deletions

View File

@ -1,8 +1,8 @@
# Amazon ECS Input Plugin # Amazon ECS Input Plugin
Amazon ECS, Fargate compatible, input plugin which uses the [Amazon ECS v2 metadata and Amazon ECS, Fargate compatible, input plugin which uses the Amazon ECS metadata and
stats API][task-metadata-endpoint-v2] endpoints to gather stats on running stats [v2][task-metadata-endpoint-v2] or [v3][task-metadata-endpoint-v3] API endpoints
containers in a Task. to gather stats on running containers in a Task.
The telegraf container must be run in the same Task as the workload it is The telegraf container must be run in the same Task as the workload it is
inspecting. inspecting.
@ -19,8 +19,41 @@ present in the metadata/stats endpoints.
```toml ```toml
# Read metrics about ECS containers # Read metrics about ECS containers
[[inputs.ecs]] [[inputs.ecs]]
## ECS metadata url ## ECS metadata url.
# endpoint_url = "http://169.254.170.2" ## Metadata v2 API is used if set explicitly. Otherwise,
## v3 metadata endpoint API is used if available.
# endpoint_url = ""
## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers
# container_name_include = []
# container_name_exclude = []
## Container states to include and exclude. Globs accepted.
## When empty only containers in the "RUNNING" state will be captured.
## Possible values are "NONE", "PULLED", "CREATED", "RUNNING",
## "RESOURCES_PROVISIONED", "STOPPED".
# container_status_include = []
# container_status_exclude = []
## ecs labels to include and exclude as tags. Globs accepted.
## Note that an empty array for both will include all labels as tags
ecs_label_include = [ "com.amazonaws.ecs.*" ]
ecs_label_exclude = []
## Timeout for queries.
# timeout = "5s"
```
### Configuration (enforce v2 metadata)
```toml
# Read metrics about ECS containers
[[inputs.ecs]]
## ECS metadata url.
## Metadata v2 API is used if set explicitly. Otherwise,
## v3 metadata endpoint API is used if available.
endpoint_url = "http://169.254.170.2"
## Containers to include and exclude. Globs accepted. ## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers ## Note that an empty array for both will include all containers
@ -210,3 +243,4 @@ ecs_container_meta,cluster=test,com.amazonaws.ecs.cluster=test,com.amazonaws.ecs
[docker-input]: /plugins/inputs/docker/README.md [docker-input]: /plugins/inputs/docker/README.md
[task-metadata-endpoint-v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html [task-metadata-endpoint-v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
[task-metadata-endpoint-v3] https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html

View File

@ -12,8 +12,13 @@ import (
) )
var ( var (
ecsMetadataPath, _ = url.Parse("/v2/metadata") // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
ecsMetaStatsPath, _ = url.Parse("/v2/stats") ecsMetadataPath = "/v2/metadata"
ecsMetaStatsPath = "/v2/stats"
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
ecsMetadataPathV3 = "/task"
ecsMetaStatsPathV3 = "/task/stats"
) )
// Client is the ECS client contract // Client is the ECS client contract
@ -27,30 +32,78 @@ type httpClient interface {
} }
// NewClient constructs an ECS client with the passed configuration params // NewClient constructs an ECS client with the passed configuration params
func NewClient(timeout time.Duration) (*EcsClient, error) { func NewClient(timeout time.Duration, endpoint string, version int) (*EcsClient, error) {
if version != 2 && version != 3 {
const msg = "expected metadata version 2 or 3, got %d"
return nil, fmt.Errorf(msg, version)
}
baseURL, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
c := &http.Client{ c := &http.Client{
Timeout: timeout, Timeout: timeout,
} }
return &EcsClient{ return &EcsClient{
client: c, client: c,
baseURL: baseURL,
taskURL: resolveTaskURL(baseURL, version),
statsURL: resolveStatsURL(baseURL, version),
version: version,
}, nil }, nil
} }
func resolveTaskURL(base *url.URL, version int) string {
var path string
switch version {
case 2:
path = ecsMetadataPath
case 3:
path = ecsMetadataPathV3
default:
// Should never happen.
const msg = "resolveTaskURL: unexpected version %d"
panic(fmt.Errorf(msg, version))
}
return resolveURL(base, path)
}
func resolveStatsURL(base *url.URL, version int) string {
var path string
switch version {
case 2:
path = ecsMetaStatsPath
case 3:
path = ecsMetaStatsPathV3
default:
// Should never happen.
const msg = "resolveStatsURL: unexpected version %d"
panic(fmt.Errorf(msg, version))
}
return resolveURL(base, path)
}
// resolveURL returns a URL string by concatenating the string representation of base
// and path. This is consistent with AWS metadata documentation:
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html#task-metadata-endpoint-v3-paths
func resolveURL(base *url.URL, path string) string {
return base.String() + path
}
// EcsClient contains ECS connection config // EcsClient contains ECS connection config
type EcsClient struct { type EcsClient struct {
client httpClient client httpClient
BaseURL *url.URL version int
baseURL *url.URL
taskURL string taskURL string
statsURL string statsURL string
} }
// Task calls the ECS metadata endpoint and returns a populated Task // Task calls the ECS metadata endpoint and returns a populated Task
func (c *EcsClient) Task() (*Task, error) { func (c *EcsClient) Task() (*Task, error) {
if c.taskURL == "" {
c.taskURL = c.BaseURL.ResolveReference(ecsMetadataPath).String()
}
req, _ := http.NewRequest("GET", c.taskURL, nil) req, _ := http.NewRequest("GET", c.taskURL, nil)
resp, err := c.client.Do(req) resp, err := c.client.Do(req)
if err != nil { if err != nil {
@ -74,10 +127,6 @@ func (c *EcsClient) Task() (*Task, error) {
// ContainerStats calls the ECS stats endpoint and returns a populated container stats map // ContainerStats calls the ECS stats endpoint and returns a populated container stats map
func (c *EcsClient) ContainerStats() (map[string]types.StatsJSON, error) { func (c *EcsClient) ContainerStats() (map[string]types.StatsJSON, error) {
if c.statsURL == "" {
c.statsURL = c.BaseURL.ResolveReference(ecsMetaStatsPath).String()
}
req, _ := http.NewRequest("GET", c.statsURL, nil) req, _ := http.NewRequest("GET", c.statsURL, nil)
resp, err := c.client.Do(req) resp, err := c.client.Do(req)
if err != nil { if err != nil {

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url"
"os" "os"
"testing" "testing"
@ -238,3 +239,77 @@ func TestEcsClient_ContainerStats(t *testing.T) {
}) })
} }
} }
func TestResolveTaskURL(t *testing.T) {
tests := []struct {
name string
base string
ver int
exp string
}{
{
name: "default v2 endpoint",
base: v2Endpoint,
ver: 2,
exp: "http://169.254.170.2/v2/metadata",
},
{
name: "custom v2 endpoint",
base: "http://192.168.0.1",
ver: 2,
exp: "http://192.168.0.1/v2/metadata",
},
{
name: "theoretical v3 endpoint",
base: "http://169.254.170.2/v3/metadata",
ver: 3,
exp: "http://169.254.170.2/v3/metadata/task",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
baseURL, err := url.Parse(tt.base)
assert.NoError(t, err)
act := resolveTaskURL(baseURL, tt.ver)
assert.Equal(t, tt.exp, act)
})
}
}
func TestResolveStatsURL(t *testing.T) {
tests := []struct {
name string
base string
ver int
exp string
}{
{
name: "default v2 endpoint",
base: v2Endpoint,
ver: 2,
exp: "http://169.254.170.2/v2/stats",
},
{
name: "custom v2 endpoint",
base: "http://192.168.0.1",
ver: 2,
exp: "http://192.168.0.1/v2/stats",
},
{
name: "theoretical v3 endpoint",
base: "http://169.254.170.2/v3/metadata",
ver: 3,
exp: "http://169.254.170.2/v3/metadata/task/stats",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
baseURL, err := url.Parse(tt.base)
assert.NoError(t, err)
act := resolveStatsURL(baseURL, tt.ver)
assert.Equal(t, tt.exp, act)
})
}
}

View File

@ -1,7 +1,7 @@
package ecs package ecs
import ( import (
"net/url" "os"
"strings" "strings"
"time" "time"
@ -25,13 +25,14 @@ type Ecs struct {
LabelInclude []string `toml:"ecs_label_include"` LabelInclude []string `toml:"ecs_label_include"`
LabelExclude []string `toml:"ecs_label_exclude"` LabelExclude []string `toml:"ecs_label_exclude"`
newClient func(timeout time.Duration) (*EcsClient, error) newClient func(timeout time.Duration, endpoint string, version int) (*EcsClient, error)
client Client client Client
filtersCreated bool filtersCreated bool
labelFilter filter.Filter labelFilter filter.Filter
containerNameFilter filter.Filter containerNameFilter filter.Filter
statusFilter filter.Filter statusFilter filter.Filter
metadataVersion int
} }
const ( const (
@ -40,11 +41,15 @@ const (
GB = 1000 * MB GB = 1000 * MB
TB = 1000 * GB TB = 1000 * GB
PB = 1000 * TB PB = 1000 * TB
v2Endpoint = "http://169.254.170.2"
) )
var sampleConfig = ` var sampleConfig = `
## ECS metadata url ## ECS metadata url.
# endpoint_url = "http://169.254.170.2" ## Metadata v2 API is used if set explicitly. Otherwise,
## v3 metadata endpoint API is used if available.
# endpoint_url = ""
## Containers to include and exclude. Globs accepted. ## Containers to include and exclude. Globs accepted.
## Note that an empty array for both will include all containers ## Note that an empty array for both will include all containers
@ -69,7 +74,7 @@ var sampleConfig = `
// Description describes ECS plugin // Description describes ECS plugin
func (ecs *Ecs) Description() string { func (ecs *Ecs) Description() string {
return "Read metrics about docker containers from Fargate/ECS v2 meta endpoints." return "Read metrics about docker containers from Fargate/ECS v2, v3 meta endpoints."
} }
// SampleConfig returns the ECS example config // SampleConfig returns the ECS example config
@ -107,18 +112,12 @@ func (ecs *Ecs) Gather(acc telegraf.Accumulator) error {
func initSetup(ecs *Ecs) error { func initSetup(ecs *Ecs) error {
if ecs.client == nil { if ecs.client == nil {
var err error resolveEndpoint(ecs)
var c *EcsClient
c, err = ecs.newClient(ecs.Timeout.Duration) c, err := ecs.newClient(ecs.Timeout.Duration, ecs.EndpointURL, ecs.metadataVersion)
if err != nil { if err != nil {
return err return err
} }
c.BaseURL, err = url.Parse(ecs.EndpointURL)
if err != nil {
return err
}
ecs.client = c ecs.client = c
} }
@ -142,6 +141,29 @@ func initSetup(ecs *Ecs) error {
return nil return nil
} }
func resolveEndpoint(ecs *Ecs) {
if ecs.EndpointURL != "" {
// Use metadata v2 API since endpoint is set explicitly.
ecs.metadataVersion = 2
return
}
// Auto-detect metadata endpoint version.
// Use metadata v3 if available.
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
v3Endpoint := os.Getenv("ECS_CONTAINER_METADATA_URI")
if v3Endpoint != "" {
ecs.EndpointURL = v3Endpoint
ecs.metadataVersion = 3
return
}
// Use v2 endpoint if nothing else is available.
ecs.EndpointURL = v2Endpoint
ecs.metadataVersion = 2
}
func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) { func (ecs *Ecs) accTask(task *Task, tags map[string]string, acc telegraf.Accumulator) {
taskFields := map[string]interface{}{ taskFields := map[string]interface{}{
"revision": task.Revision, "revision": task.Revision,
@ -240,7 +262,7 @@ func (ecs *Ecs) createContainerStatusFilters() error {
func init() { func init() {
inputs.Add("ecs", func() telegraf.Input { inputs.Add("ecs", func() telegraf.Input {
return &Ecs{ return &Ecs{
EndpointURL: "http://169.254.170.2", EndpointURL: "",
Timeout: internal.Duration{Duration: 5 * time.Second}, Timeout: internal.Duration{Duration: 5 * time.Second},
newClient: NewClient, newClient: NewClient,
filtersCreated: false, filtersCreated: false,

View File

@ -1,9 +1,12 @@
package ecs package ecs
import ( import (
"os"
"testing"
"time" "time"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/stretchr/testify/assert"
) )
// codified golden objects for tests // codified golden objects for tests
@ -765,3 +768,64 @@ var validMeta = Task{
PullStartedAt: metaPullStart, PullStartedAt: metaPullStart,
PullStoppedAt: metaPullStop, PullStoppedAt: metaPullStop,
} }
func TestResolveEndpoint(t *testing.T) {
tests := []struct {
name string
given Ecs
exp Ecs
preF func()
afterF func()
}{
{
name: "Endpoint is explicitly set => use v2 metadata",
given: Ecs{
EndpointURL: "192.162.0.1/custom_endpoint",
},
exp: Ecs{
EndpointURL: "192.162.0.1/custom_endpoint",
metadataVersion: 2,
},
},
{
name: "Endpoint is not set, ECS_CONTAINER_METADATA_URI is not set => use v2 metadata",
given: Ecs{
EndpointURL: "",
},
exp: Ecs{
EndpointURL: v2Endpoint,
metadataVersion: 2,
},
},
{
name: "Endpoint is not set, ECS_CONTAINER_METADATA_URI is set => use v3 metadata",
preF: func() {
os.Setenv("ECS_CONTAINER_METADATA_URI", "v3-endpoint.local")
},
afterF: func() {
os.Unsetenv("ECS_CONTAINER_METADATA_URI")
},
given: Ecs{
EndpointURL: "",
},
exp: Ecs{
EndpointURL: "v3-endpoint.local",
metadataVersion: 3,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.preF != nil {
tt.preF()
}
if tt.afterF != nil {
defer tt.afterF()
}
act := tt.given
resolveEndpoint(&act)
assert.Equal(t, tt.exp, act)
})
}
}