feat: support aws managed service for prometheus (#10202)
This commit is contained in:
parent
31cbd2e504
commit
8976483471
|
|
@ -68,6 +68,27 @@ batch format by default.
|
|||
## Maximum amount of time before idle connection is closed.
|
||||
## Zero means no limit.
|
||||
# idle_conn_timeout = 0
|
||||
|
||||
## Amazon Region
|
||||
#region = "us-east-1"
|
||||
|
||||
## Amazon Credentials
|
||||
## Credentials are loaded in the following order
|
||||
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
|
||||
## 2) Assumed credentials via STS if role_arn is specified
|
||||
## 3) explicit credentials from 'access_key' and 'secret_key'
|
||||
## 4) shared profile from 'profile'
|
||||
## 5) environment variables
|
||||
## 6) shared credentials file
|
||||
## 7) EC2 Instance Profile
|
||||
#access_key = ""
|
||||
#secret_key = ""
|
||||
#token = ""
|
||||
#role_arn = ""
|
||||
#web_identity_token_file = ""
|
||||
#role_session_name = ""
|
||||
#profile = ""
|
||||
#shared_credential_file = ""
|
||||
```
|
||||
|
||||
### Optional Cookie Authentication Settings
|
||||
|
|
|
|||
|
|
@ -4,12 +4,17 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
awsV2 "github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
||||
"github.com/influxdata/telegraf"
|
||||
internalaws "github.com/influxdata/telegraf/config/aws"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
|
||||
"github.com/influxdata/telegraf/plugins/outputs"
|
||||
|
|
@ -81,6 +86,27 @@ var sampleConfig = `
|
|||
## Maximum amount of time before idle connection is closed.
|
||||
## Zero means no limit.
|
||||
# idle_conn_timeout = 0
|
||||
|
||||
## Amazon Region
|
||||
#region = "us-east-1"
|
||||
|
||||
## Amazon Credentials
|
||||
## Credentials are loaded in the following order
|
||||
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
|
||||
## 2) Assumed credentials via STS if role_arn is specified
|
||||
## 3) explicit credentials from 'access_key' and 'secret_key'
|
||||
## 4) shared profile from 'profile'
|
||||
## 5) environment variables
|
||||
## 6) shared credentials file
|
||||
## 7) EC2 Instance Profile
|
||||
#access_key = ""
|
||||
#secret_key = ""
|
||||
#token = ""
|
||||
#role_arn = ""
|
||||
#web_identity_token_file = ""
|
||||
#role_session_name = ""
|
||||
#profile = ""
|
||||
#shared_credential_file = ""
|
||||
`
|
||||
|
||||
const (
|
||||
|
|
@ -97,11 +123,15 @@ type HTTP struct {
|
|||
Headers map[string]string `toml:"headers"`
|
||||
ContentEncoding string `toml:"content_encoding"`
|
||||
UseBatchFormat bool `toml:"use_batch_format"`
|
||||
AwsService string `toml:"aws_service"`
|
||||
httpconfig.HTTPClientConfig
|
||||
Log telegraf.Logger `toml:"-"`
|
||||
|
||||
client *http.Client
|
||||
serializer serializers.Serializer
|
||||
|
||||
awsCfg *awsV2.Config
|
||||
internalaws.CredentialConfig
|
||||
}
|
||||
|
||||
func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
|
||||
|
|
@ -109,6 +139,13 @@ func (h *HTTP) SetSerializer(serializer serializers.Serializer) {
|
|||
}
|
||||
|
||||
func (h *HTTP) Connect() error {
|
||||
if h.AwsService != "" {
|
||||
cfg, err := h.CredentialConfig.Credentials()
|
||||
if err == nil {
|
||||
h.awsCfg = &cfg
|
||||
}
|
||||
}
|
||||
|
||||
if h.Method == "" {
|
||||
h.Method = http.MethodPost
|
||||
}
|
||||
|
|
@ -180,11 +217,43 @@ func (h *HTTP) writeMetric(reqBody []byte) error {
|
|||
reqBodyBuffer = rc
|
||||
}
|
||||
|
||||
var payloadHash *string
|
||||
if h.awsCfg != nil {
|
||||
// We need a local copy of the full buffer, the signature scheme requires a sha256 of the request body.
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = io.Copy(buf, reqBodyBuffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sum := sha256.Sum256(buf.Bytes())
|
||||
reqBodyBuffer = buf
|
||||
|
||||
// sha256 is hex encoded
|
||||
hash := fmt.Sprintf("%x", sum)
|
||||
payloadHash = &hash
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(h.Method, h.URL, reqBodyBuffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if h.awsCfg != nil {
|
||||
signer := v4.NewSigner()
|
||||
ctx := context.Background()
|
||||
|
||||
credentials, err := h.awsCfg.Credentials.Retrieve(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = signer.SignHTTP(ctx, credentials, req, *payloadHash, h.AwsService, h.Region, time.Now().UTC())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if h.Username != "" || h.Password != "" {
|
||||
req.SetBasicAuth(h.Username, h.Password)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/influxdata/telegraf"
|
||||
internalaws "github.com/influxdata/telegraf/config/aws"
|
||||
"github.com/influxdata/telegraf/internal"
|
||||
"github.com/influxdata/telegraf/metric"
|
||||
httpconfig "github.com/influxdata/telegraf/plugins/common/http"
|
||||
|
|
@ -516,3 +517,53 @@ func TestBatchedUnbatched(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAwsCredentials(t *testing.T) {
|
||||
ts := httptest.NewServer(http.NotFoundHandler())
|
||||
defer ts.Close()
|
||||
|
||||
u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
|
||||
require.NoError(t, err)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
plugin *HTTP
|
||||
tokenHandler TestHandlerFunc
|
||||
handler TestHandlerFunc
|
||||
}{
|
||||
{
|
||||
name: "simple credentials",
|
||||
plugin: &HTTP{
|
||||
URL: u.String(),
|
||||
AwsService: "aps",
|
||||
CredentialConfig: internalaws.CredentialConfig{
|
||||
Region: "us-east-1",
|
||||
AccessKey: "dummy",
|
||||
SecretKey: "dummy",
|
||||
},
|
||||
},
|
||||
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
|
||||
require.Contains(t, r.Header["Authorization"][0], "AWS4-HMAC-SHA256")
|
||||
require.Contains(t, r.Header["Authorization"][0], "=dummy/")
|
||||
require.Contains(t, r.Header["Authorization"][0], "/us-east-1/")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
tt.handler(t, w, r)
|
||||
})
|
||||
|
||||
serializer := influx.NewSerializer()
|
||||
tt.plugin.SetSerializer(serializer)
|
||||
err = tt.plugin.Connect()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = tt.plugin.Write([]telegraf.Metric{getMetric()})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue