feat(inputs.firehose): Add new plugin (#15988)
@@ -0,0 +1,5 @@
//go:build !custom || inputs || inputs.firehose

package all

import _ "github.com/influxdata/telegraf/plugins/inputs/firehose" // register plugin
@@ -0,0 +1,122 @@
# AWS Data Firehose Input Plugin

This plugin listens for metrics sent via HTTP from [AWS Data Firehose][firehose]
in one of the supported [data formats][data_formats].
The plugin strictly follows the request-response schema as described in the
official [documentation][response_spec].

⭐ Telegraf v1.34.0
🏷️ cloud, messaging
💻 all

[firehose]: https://aws.amazon.com/de/firehose/
[data_formats]: /docs/DATA_FORMATS_INPUT.md
[response_spec]: https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html

## Service Input <!-- @/docs/includes/service_input.md -->

This plugin is a service input. Normal plugins gather metrics determined by the
interval setting. Service plugins start a service to listen and wait for
metrics or events to occur. Service plugins have two key differences from
normal plugins:

1. The global or plugin specific `interval` setting may not apply
2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce
   output for this plugin

## Global configuration options <!-- @/docs/includes/plugin_config.md -->

In addition to the plugin-specific configuration settings, plugins support
additional global and plugin configuration settings. These settings are used to
modify metrics, tags, and fields or create aliases and configure ordering, etc.
See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
# AWS Data Firehose listener
[[inputs.firehose]]
  ## Address and port to host HTTP listener on
  service_address = ":8080"

  ## Paths to listen to.
  # paths = ["/telegraf"]

  ## maximum duration before timing out read of the request
  # read_timeout = "5s"
  ## maximum duration before timing out write of the response
  # write_timeout = "5s"

  ## Set one or more allowed client CA certificate file names to
  ## enable mutually authenticated TLS connections
  # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

  ## Add service certificate and key
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

  ## Minimal TLS version accepted by the server
  # tls_min_version = "TLS12"

  ## Optional access key to accept for authentication.
  ## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key.
  ## If no access_key is provided (default), authentication is completely disabled and
  ## this plugin will accept all requests, ignoring the provided access key in the request!
  # access_key = "foobar"

  ## Optional setting to add parameters as tags
  ## If the http header "x-amz-firehose-common-attributes" is not present on the
  ## request, no corresponding tag will be added. The header value should be a
  ## JSON object and should follow the schema as described in the official documentation:
  ## https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat
  # parameter_tags = ["env"]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  # data_format = "influx"
```

## Metrics

Metrics are collected from the `records.[*].data` field in the request body.
The data must be base64 encoded and may be sent in any supported
[data format][data_formats].
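For local testing it can help to see how such a request body is put together.
The sketch below is a minimal, hypothetical Go sender (not part of the plugin;
in production AWS Data Firehose builds this envelope for you): it base64
encodes a line-protocol record and wraps it in the documented request schema
(`requestId`, `timestamp`, `records[].data`). The helper name and endpoint are
illustrative only.

```go
package main

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

// buildFirehoseBody wraps raw metric payloads (in any supported data format)
// in the Firehose HTTP-endpoint request schema; each record is base64 encoded.
func buildFirehoseBody(requestID string, payloads ...[]byte) ([]byte, error) {
	type record struct {
		Data string `json:"data"`
	}
	records := make([]record, 0, len(payloads))
	for _, p := range payloads {
		records = append(records, record{Data: base64.StdEncoding.EncodeToString(p)})
	}
	return json.Marshal(map[string]interface{}{
		"requestId": requestID,
		"timestamp": time.Now().UnixMilli(),
		"records":   records,
	})
}

func main() {
	body, err := buildFirehoseBody("example-request-id", []byte("hello world"))
	if err != nil {
		panic(err)
	}
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/telegraf", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	// The plugin requires this header and expects it to match the requestId in the body.
	req.Header.Set("x-amz-firehose-request-id", "example-request-id")
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```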
## Example Output

When run with this configuration:

```toml
[[inputs.firehose]]
  service_address = ":8080"
  paths = ["/telegraf"]
  data_format = "value"
  data_type = "string"
```

the following curl command (the record `data` is the base64 encoding of
`hello world`):

```sh
curl -i -XPOST 'localhost:8080/telegraf' \
  --header 'x-amz-firehose-request-id: ed4acda5-034f-9f42-bba1-f29aea6d7d8f' \
  --header 'Content-Type: application/json' \
  --data '{
    "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
    "timestamp": 1578090901599,
    "records": [
      {
        "data": "aGVsbG8gd29ybGQK"
      }
    ]
  }'
```

produces:

```text
firehose,path=/telegraf value="hello world" 1725001851000000000
```
@@ -0,0 +1,239 @@
//go:generate ../../../tools/readme_config_includer/generator
package firehose

import (
    "context"
    "crypto/tls"
    _ "embed"
    "encoding/json"
    "errors"
    "fmt"
    "net"
    "net/http"
    "slices"
    "sync"
    "time"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/config"
    "github.com/influxdata/telegraf/internal"
    common_tls "github.com/influxdata/telegraf/plugins/common/tls"
    "github.com/influxdata/telegraf/plugins/inputs"
)

//go:embed sample.conf
var sampleConfig string
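
// Firehose is a service input that runs an HTTP listener implementing the AWS
// Data Firehose HTTP-endpoint delivery request/response schema and parses the
// received records with the configured data format.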
type Firehose struct {
    ServiceAddress string          `toml:"service_address"`
    Paths          []string        `toml:"paths"`
    ReadTimeout    config.Duration `toml:"read_timeout"`
    WriteTimeout   config.Duration `toml:"write_timeout"`
    AccessKey      config.Secret   `toml:"access_key"`
    ParameterTags  []string        `toml:"parameter_tags"`
    Log            telegraf.Logger `toml:"-"`

    common_tls.ServerConfig
    tlsConf *tls.Config

    once sync.Once

    listener net.Listener
    server   http.Server

    parser telegraf.Parser
    acc    telegraf.Accumulator
}

func (*Firehose) SampleConfig() string {
    return sampleConfig
}

func (f *Firehose) SetParser(parser telegraf.Parser) {
    f.parser = parser
}

func (f *Firehose) Init() error {
    if f.ServiceAddress == "" {
        f.ServiceAddress = ":8080"
    }
    if len(f.Paths) == 0 {
        f.Paths = []string{"/telegraf"}
    }

    var err error
    f.tlsConf, err = f.ServerConfig.TLSConfig()
    return err
}

// Start starts the http listener service.
func (f *Firehose) Start(acc telegraf.Accumulator) error {
    f.acc = acc

    var err error
    if f.tlsConf != nil {
        f.listener, err = tls.Listen("tcp", f.ServiceAddress, f.tlsConf)
    } else {
        f.listener, err = net.Listen("tcp", f.ServiceAddress)
    }
    if err != nil {
        return fmt.Errorf("creating listener failed: %w", err)
    }

    f.server = http.Server{
        Addr:         f.ServiceAddress,
        Handler:      f,
        ReadTimeout:  time.Duration(f.ReadTimeout),
        WriteTimeout: time.Duration(f.WriteTimeout),
        TLSConfig:    f.tlsConf,
    }

    go func() {
        if err := f.server.Serve(f.listener); err != nil {
            if !errors.Is(err, net.ErrClosed) {
                f.Log.Errorf("Server failed: %v", err)
            }
        }
    }()

    f.Log.Infof("Listening on %s", f.listener.Addr().String())

    return nil
}

// Stop cleans up all resources
func (f *Firehose) Stop() {
    if err := f.server.Shutdown(context.Background()); err != nil {
        f.Log.Errorf("Shutting down server failed: %v", err)
    }
}

func (*Firehose) Gather(telegraf.Accumulator) error {
    return nil
}

func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) {
    if !slices.Contains(f.Paths, req.URL.Path) {
        res.WriteHeader(http.StatusNotFound)
        return
    }

    msg, err := f.handleRequest(req)
    if err != nil {
        f.acc.AddError(err)
    }
    if err := msg.sendResponse(res); err != nil {
        f.acc.AddError(fmt.Errorf("sending response failed: %w", err))
    }
}
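
// handleRequest validates and processes a single Firehose delivery request: it
// checks the request-id header, body size, HTTP method, content type and the
// optional access key, decodes the base64 encoded records, parses them with
// the configured data format and adds the resulting metrics to the
// accumulator. The returned message carries the HTTP status code to respond with.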
func (f *Firehose) handleRequest(req *http.Request) (*message, error) {
    // Create a message with a default response status code
    msg := &message{
        responseCode: http.StatusInternalServerError,
    }

    // Extract the request ID used to reference the request
    msg.id = req.Header.Get("x-amz-firehose-request-id")
    if msg.id == "" {
        msg.responseCode = http.StatusBadRequest
        return msg, errors.New("x-amz-firehose-request-id header is not set")
    }

    // Check the maximum body size which can be up to 64 MiB according to
    // https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html
    if req.ContentLength > int64(64*1024*1024) {
        msg.responseCode = http.StatusRequestEntityTooLarge
        return msg, errors.New("content length is too large")
    }

    // Check the HTTP method used
    switch req.Method {
    case http.MethodPost, http.MethodPut:
        // Do nothing, those methods are allowed
    default:
        msg.responseCode = http.StatusMethodNotAllowed
        return msg, fmt.Errorf("method %q is not allowed", req.Method)
    }

    if req.Header.Get("content-type") != "application/json" {
        msg.responseCode = http.StatusUnsupportedMediaType
        return msg, fmt.Errorf("content type %q is not allowed", req.Header.Get("content-type"))
    }

    // Decode the content if necessary and parse the JSON message
    encoding := req.Header.Get("content-encoding")
    body, err := internal.NewStreamContentDecoder(encoding, req.Body)
    if err != nil {
        msg.responseCode = http.StatusUnsupportedMediaType
        return msg, fmt.Errorf("creating %q decoder for request %q failed: %w", encoding, msg.id, err)
    }
    defer req.Body.Close()

    var reqbody requestBody
    if err := json.NewDecoder(body).Decode(&reqbody); err != nil {
        msg.responseCode = http.StatusBadRequest
        return msg, fmt.Errorf("decode body for request %q failed: %w", msg.id, err)
    }

    // Validate the body content
    if msg.id != reqbody.RequestID {
        msg.responseCode = http.StatusBadRequest
        return msg, fmt.Errorf("mismatch between request ID in header (%q) and body (%q)", msg.id, reqbody.RequestID)
    }

    // Authenticate the request
    if err := msg.authenticate(req, f.AccessKey); err != nil {
        return msg, fmt.Errorf("authentication for request %q failed: %w", msg.id, err)
    }

    // Extract the records and parameters for tagging
    records, err := msg.decodeData(&reqbody)
    if err != nil {
        return msg, fmt.Errorf("decode base64 data from request %q failed: %w", msg.id, err)
    }

    tags, err := msg.extractTagsFromCommonAttributes(req, f.ParameterTags)
    if err != nil {
        return msg, fmt.Errorf("extracting parameter tags for request %q failed: %w", msg.id, err)
    }

    // Parse the metrics
    var metrics []telegraf.Metric
    for _, record := range records {
        m, err := f.parser.Parse(record)
        if err != nil {
            // respond with bad request status code to inform firehose about the failure
            msg.responseCode = http.StatusBadRequest
            return msg, fmt.Errorf("parsing data of request %q failed: %w", msg.id, err)
        }
        metrics = append(metrics, m...)
    }

    if len(metrics) == 0 {
        f.once.Do(func() {
            f.Log.Info(internal.NoMetricsCreatedMsg)
        })
    }

    // Add the extracted tags and the path
    for _, m := range metrics {
        for k, v := range tags {
            m.AddTag(k, v)
        }
        m.AddTag("path", req.URL.Path)
        f.acc.AddMetric(m)
    }

    msg.responseCode = http.StatusOK
    return msg, nil
}

func init() {
    inputs.Add("firehose", func() telegraf.Input {
        return &Firehose{
            ReadTimeout:  config.Duration(time.Second * 5),
            WriteTimeout: config.Duration(time.Second * 5),
        }
    })
}
@@ -0,0 +1,374 @@
package firehose

import (
    "bytes"
    "encoding/json"
    "net/http"
    "os"
    "path/filepath"
    "sort"
    "strings"
    "testing"
    "time"

    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/config"
    "github.com/influxdata/telegraf/plugins/inputs"
    "github.com/influxdata/telegraf/plugins/parsers/influx"
    "github.com/influxdata/telegraf/testutil"
    "github.com/stretchr/testify/require"
)

func TestInvalidRequests(t *testing.T) {
    tests := []struct {
        name         string
        headers      map[string]string
        body         string
        method       string
        expectedMsg  string
        expectedCode int
    }{
        {
            name:         "missing request id",
            headers:      map[string]string{"x-amz-firehose-request-id": ""},
            body:         `{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`,
            expectedMsg:  "x-amz-firehose-request-id header is not set",
            expectedCode: 400,
        },
        {
            name:         "request id mismatch",
            headers:      map[string]string{"x-amz-firehose-request-id": "test-id"},
            body:         `{"requestId":"some-other-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`,
            expectedMsg:  "mismatch between request ID",
            expectedCode: 400,
        },
        {
            name:         "invalid body",
            headers:      map[string]string{"x-amz-firehose-request-id": "test-id"},
            body:         "not a json",
            expectedMsg:  `decode body for request "test-id" failed`,
            expectedCode: 400,
        },
        {
            name:         "invalid data encoding",
            headers:      map[string]string{"x-amz-firehose-request-id": "test-id"},
            body:         `{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"not a base64 encoded text"}]}`,
            expectedMsg:  `ecode base64 data from request "test-id" failed: illegal base64 data`,
            expectedCode: 400,
        },
        {
            name:         "content too large",
            headers:      map[string]string{"x-amz-firehose-request-id": "test-id"},
            body:         strings.Repeat("x", 65*1024*1024),
            expectedMsg:  `content length is too large`,
            expectedCode: 413,
        },
        {
            name: "invalid content type",
            headers: map[string]string{
                "x-amz-firehose-request-id": "test-id",
                "content-type":              "application/text",
            },
            body:         `{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`,
            expectedMsg:  `content type "application/text" is not allowed`,
            expectedCode: 415,
        },
        {
            name:         "invalid method",
            headers:      map[string]string{"x-amz-firehose-request-id": "test-id"},
            body:         `{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`,
            method:       "GET",
            expectedMsg:  `method "GET" is not allowed`,
            expectedCode: 405,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // Setup plugin and start it
            plugin := &Firehose{
                ServiceAddress: "127.0.0.1:0",
                Log:            &testutil.Logger{},
            }
            require.NoError(t, plugin.Init())

            var acc testutil.Accumulator
            require.NoError(t, plugin.Start(&acc))
            defer plugin.Stop()

            // Get the listening address
            addr := plugin.listener.Addr().String()

            // Create a request with the data defined in the test case
            method := "POST"
            if tt.method != "" {
                method = tt.method
            }
            req, err := http.NewRequest(method, "http://"+addr+"/telegraf", bytes.NewBufferString(tt.body))
            require.NoError(t, err)
            req.Header.Set("content-type", "application/json")
            for k, v := range tt.headers {
                req.Header.Set(k, v)
            }

            // Execute the request
            resp, err := http.DefaultClient.Do(req)
            require.NoError(t, err)
            defer resp.Body.Close()

            // Check the result
            require.ErrorContains(t, acc.FirstError(), tt.expectedMsg)
            require.Equal(t, tt.expectedCode, resp.StatusCode)
        })
    }
}

func TestAuthentication(t *testing.T) {
    tests := []struct {
        name         string
        body         string
        headers      map[string]string
        key          string
        expectedMsg  string
        expectedCode int
    }{
        {
            name: "no auth required",
            headers: map[string]string{
                "x-amz-firehose-request-id": "test-id",
            },
            body: `
            {
              "requestId": "test-id",
              "timestamp":1734625715000000000,
              "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}]
            }`,
            expectedCode: 200,
        },
        {
            name: "no auth required but key sent",
            headers: map[string]string{
                "x-amz-firehose-request-id": "test-id",
                "x-amz-firehose-access-key": "test-key",
            },
            body: `
            {
              "requestId": "test-id",
              "timestamp":1734625715000000000,
              "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}]
            }`,
            expectedCode: 200,
        },
        {
            name: "auth required success",
            headers: map[string]string{
                "x-amz-firehose-request-id": "test-id",
                "x-amz-firehose-access-key": "test-key",
            },
            body: `
            {
              "requestId": "test-id",
              "timestamp":1734625715000000000,
              "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}]
            }`,
            key:          "test-key",
            expectedCode: 200,
        },
        {
            name: "auth required wrong key",
            headers: map[string]string{
                "x-amz-firehose-request-id": "test-id",
                "x-amz-firehose-access-key": "foo bar",
            },
            body: `
            {
              "requestId": "test-id",
              "timestamp":1734625715000000000,
              "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}]
            }`,
            key:          "test-key",
            expectedMsg:  "unauthorized request",
            expectedCode: 401,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            // Setup plugin
            plugin := &Firehose{
                ServiceAddress: "127.0.0.1:0",
                AccessKey:      config.NewSecret([]byte(tt.key)),
                Log:            &testutil.Logger{},
            }

            // Setup a parser
            parser := &influx.Parser{}
            require.NoError(t, parser.Init())
            plugin.SetParser(parser)

            // Start the plugin
            require.NoError(t, plugin.Init())

            var acc testutil.Accumulator
            require.NoError(t, plugin.Start(&acc))
            defer plugin.Stop()

            // Get the listening address
            addr := plugin.listener.Addr().String()

            // Create a request with the data defined in the test case
            req, err := http.NewRequest("POST", "http://"+addr+"/telegraf", bytes.NewBufferString(tt.body))
            require.NoError(t, err)
            req.Header.Set("content-type", "application/json")
            for k, v := range tt.headers {
                req.Header.Set(k, v)
            }

            // Execute the request
            resp, err := http.DefaultClient.Do(req)
            require.NoError(t, err)
            defer resp.Body.Close()

            // Check the result
            if tt.expectedMsg == "" {
                require.NoError(t, acc.FirstError())
            } else {
                require.ErrorContains(t, acc.FirstError(), tt.expectedMsg)
            }
            require.Equal(t, tt.expectedCode, resp.StatusCode)
        })
    }
}

func TestCases(t *testing.T) {
    // Get all directories in testdata
    folders, err := os.ReadDir("testcases")
    require.NoError(t, err)

    // Register the plugin
    inputs.Add("firehose", func() telegraf.Input {
        return &Firehose{
            ReadTimeout:  config.Duration(time.Second * 5),
            WriteTimeout: config.Duration(time.Second * 5),
        }
    })

    // Prepare the influx parser for expectations
    parser := &influx.Parser{}
    require.NoError(t, parser.Init())

    for _, f := range folders {
        // Only handle folders
        if !f.IsDir() {
            continue
        }
        testcasePath := filepath.Join("testcases", f.Name())
        configFilename := filepath.Join(testcasePath, "telegraf.conf")
        expectedFilename := filepath.Join(testcasePath, "expected.out")
        expectedErrorFilename := filepath.Join(testcasePath, "expected.err")

        t.Run(f.Name(), func(t *testing.T) {
            // Read the input data
            headers, bodies, err := readInputData(testcasePath)
            require.NoError(t, err)

            // Read the expected output if any
            var expected []telegraf.Metric
            if _, err := os.Stat(expectedFilename); err == nil {
                var err error
                expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser)
                require.NoError(t, err)
            }

            // Read the expected errors if any
            var expectedErrors []string
            if _, err := os.Stat(expectedErrorFilename); err == nil {
                var err error
                expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename)
                require.NoError(t, err)
                require.NotEmpty(t, expectedErrors)
            }

            // Configure and initialize the plugin
            cfg := config.NewConfig()
            require.NoError(t, cfg.LoadConfig(configFilename))
            require.Len(t, cfg.Inputs, 1)

            plugin := cfg.Inputs[0].Input.(*Firehose)
            plugin.ServiceAddress = "127.0.0.1:0"
            require.NoError(t, plugin.Init())

            // Start the plugin
            var acc testutil.Accumulator
            require.NoError(t, plugin.Start(&acc))
            defer plugin.Stop()

            // Get the listening address
            addr := plugin.listener.Addr().String()

            // Send all message bodies
            endpoint := "http://" + addr + plugin.Paths[0]
            for i, body := range bodies {
                req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(body))
                require.NoErrorf(t, err, "creating request for body %d", i)
                req.Header.Set("content-type", "application/json")
                for k, v := range headers {
                    req.Header.Set(k, v)
                }

                // Execute the request
                resp, err := http.DefaultClient.Do(req)
                require.NoErrorf(t, err, "executing request for body %d", i)
                resp.Body.Close()

                if len(expectedErrors) == 0 {
                    require.Equalf(t, 200, resp.StatusCode, "result for body %d: %v", i, acc.Errors)
                } else {
                    require.NotEqualf(t, 200, resp.StatusCode, "result for body %d: %v", i, acc.Errors)
                }
            }

            // Check the result
            var actualErrorMsgs []string
            if len(acc.Errors) > 0 {
                for _, err := range acc.Errors {
                    actualErrorMsgs = append(actualErrorMsgs, err.Error())
                }
            }
            require.ElementsMatch(t, actualErrorMsgs, expectedErrors)

            // Check the metrics nevertheless as we might get some despite errors.
            actual := acc.GetTelegrafMetrics()
            testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())
        })
    }
}

func readInputData(path string) (map[string]string, [][]byte, error) {
    // Read the headers file
    var headers map[string]string
    headersBuf, err := os.ReadFile(filepath.Join(path, "headers.json"))
    if err != nil {
        return nil, nil, err
    }
    if err := json.Unmarshal(headersBuf, &headers); err != nil {
        return nil, nil, err
    }

    // Read all bodies
    bodyFiles, err := filepath.Glob(filepath.Join(path, "body*.json"))
    if err != nil {
        return nil, nil, err
    }
    sort.Strings(bodyFiles)
    bodies := make([][]byte, 0, len(bodyFiles))
    for _, fn := range bodyFiles {
        buf, err := os.ReadFile(fn)
        if err != nil {
            return nil, nil, err
        }
        bodies = append(bodies, buf)
    }

    return headers, bodies, nil
}
@@ -0,0 +1 @@
package firehose
@@ -0,0 +1,125 @@
package firehose

import (
    "encoding/base64"
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "time"

    "github.com/influxdata/telegraf/config"
)

// Firehose request data-structures according to
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat
type record struct {
    EncodedData string `json:"data"`
}

type requestBody struct {
    RequestID string   `json:"requestId"`
    Timestamp int64    `json:"timestamp"`
    Records   []record `json:"records"`
}

// Required response data structure according to
// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat
type responseBody struct {
    RequestID    string `json:"requestId"`
    Timestamp    int64  `json:"timestamp"`
    ErrorMessage string `json:"errorMessage,omitempty"`
}
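
// message tracks the state of a single delivery request: the request ID taken
// from the "x-amz-firehose-request-id" header and the HTTP status code that
// will be used for the response.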
type message struct {
    id           string
    responseCode int
}

func (m *message) authenticate(req *http.Request, expected config.Secret) error {
    // We completely switch off authentication if no 'access_key' was provided in the config, it's intended!
    if expected.Empty() {
        return nil
    }
    key := req.Header.Get("x-amz-firehose-access-key")
    match, err := expected.EqualTo([]byte(key))
    if err != nil {
        m.responseCode = http.StatusInternalServerError
        return fmt.Errorf("comparing keys failed: %w", err)
    }
    if !match {
        m.responseCode = http.StatusUnauthorized
        return fmt.Errorf("unauthorized request from %v", req.RemoteAddr)
    }

    return nil
}

func (m *message) decodeData(r *requestBody) ([][]byte, error) {
    // Decode base64-encoded data and return them as a slice of byte slices
    decodedData := make([][]byte, 0)
    for _, record := range r.Records {
        data, err := base64.StdEncoding.DecodeString(record.EncodedData)
        if err != nil {
            m.responseCode = http.StatusBadRequest
            return nil, err
        }
        decodedData = append(decodedData, data)
    }
    return decodedData, nil
}
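
// extractTagsFromCommonAttributes reads the "x-amz-firehose-common-attributes"
// header, which is expected to hold a JSON document of the form
// {"commonAttributes": {"key": "value", ...}}, and returns the attributes
// listed in tagkeys so they can be added as tags to the parsed metrics.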
func (m *message) extractTagsFromCommonAttributes(req *http.Request, tagkeys []string) (map[string]string, error) {
    tags := make(map[string]string, len(tagkeys))

    h := req.Header.Get("x-amz-firehose-common-attributes")
    if len(tagkeys) == 0 || h == "" {
        return tags, nil
    }

    var params map[string]interface{}
    if err := json.Unmarshal([]byte(h), &params); err != nil {
        m.responseCode = http.StatusBadRequest
        return nil, fmt.Errorf("decoding x-amz-firehose-common-attributes header failed: %w", err)
    }

    raw, ok := params["commonAttributes"]
    if !ok {
        m.responseCode = http.StatusBadRequest
        return nil, errors.New("commonAttributes not found in x-amz-firehose-common-attributes header")
    }

    attributes, ok := raw.(map[string]interface{})
    if !ok {
        m.responseCode = http.StatusBadRequest
        return nil, errors.New("parse parameters data failed")
    }
    for _, k := range tagkeys {
        if v, found := attributes[k]; found {
            tags[k] = v.(string)
        }
    }
    return tags, nil
}

func (m *message) sendResponse(w http.ResponseWriter) error {
    var errorMessage string
    if m.responseCode != http.StatusOK {
        errorMessage = http.StatusText(m.responseCode)
    }

    response, err := json.Marshal(responseBody{
        RequestID:    m.id,
        Timestamp:    time.Now().Unix(),
        ErrorMessage: errorMessage,
    })
    if err != nil {
        return err
    }
    w.Header().Set("content-type", "application/json")
    w.WriteHeader(m.responseCode)
    if _, err := w.Write(response); err != nil {
        return fmt.Errorf("writing response to request %s failed: %w", m.id, err)
    }
    return nil
}
@@ -0,0 +1,42 @@
# AWS Data Firehose listener
[[inputs.firehose]]
  ## Address and port to host HTTP listener on
  service_address = ":8080"

  ## Paths to listen to.
  # paths = ["/telegraf"]

  ## maximum duration before timing out read of the request
  # read_timeout = "5s"
  ## maximum duration before timing out write of the response
  # write_timeout = "5s"

  ## Set one or more allowed client CA certificate file names to
  ## enable mutually authenticated TLS connections
  # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

  ## Add service certificate and key
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

  ## Minimal TLS version accepted by the server
  # tls_min_version = "TLS12"

  ## Optional access key to accept for authentication.
  ## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key.
  ## If no access_key is provided (default), authentication is completely disabled and
  ## this plugin will accept all requests, ignoring the provided access key in the request!
  # access_key = "foobar"

  ## Optional setting to add parameters as tags
  ## If the http header "x-amz-firehose-common-attributes" is not present on the
  ## request, no corresponding tag will be added. The header value should be a
  ## JSON object and should follow the schema as described in the official documentation:
  ## https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat
  # parameter_tags = ["env"]

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  # data_format = "influx"
@@ -0,0 +1,5 @@
{
  "requestId": "telegraf-test-id",
  "timestamp":1734625715000000000,
  "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}]
}
@@ -0,0 +1 @@
test,device=pc,deployment=prod,path=/telegraf value=42i 1734625715000000000
@@ -0,0 +1,5 @@
{
  "x-amz-firehose-request-id": "telegraf-test-id",
  "x-amz-firehose-access-key": "secret",
  "x-amz-firehose-common-attributes": "{\"commonAttributes\": {\"deployment\": \"prod\", \"device\": \"pc\"}}"
}
@@ -0,0 +1,4 @@
[[inputs.firehose]]
  service_address = "dummy"
  access_key = "secret"
  parameter_tags = ["deployment", "device"]