From 1d4f08954ec190a11f77e5d463580a060fd1ca9c Mon Sep 17 00:00:00 2001 From: Syed Muhammad Hashim Date: Wed, 22 Jan 2025 20:29:00 +0500 Subject: [PATCH] feat(inputs.firehose): Add new plugin (#15988) --- plugins/inputs/all/firehose.go | 5 + plugins/inputs/firehose/README.md | 122 ++++++ plugins/inputs/firehose/firehose.go | 239 +++++++++++ .../inputs/firehose/firehose_request_test.go | 374 ++++++++++++++++++ plugins/inputs/firehose/firehose_test.go | 1 + plugins/inputs/firehose/message.go | 125 ++++++ plugins/inputs/firehose/sample.conf | 42 ++ .../testcases/common-attributes/body.json | 5 + .../testcases/common-attributes/expected.out | 1 + .../testcases/common-attributes/headers.json | 5 + .../testcases/common-attributes/telegraf.conf | 4 + 11 files changed, 923 insertions(+) create mode 100644 plugins/inputs/all/firehose.go create mode 100644 plugins/inputs/firehose/README.md create mode 100644 plugins/inputs/firehose/firehose.go create mode 100644 plugins/inputs/firehose/firehose_request_test.go create mode 100644 plugins/inputs/firehose/firehose_test.go create mode 100644 plugins/inputs/firehose/message.go create mode 100644 plugins/inputs/firehose/sample.conf create mode 100644 plugins/inputs/firehose/testcases/common-attributes/body.json create mode 100644 plugins/inputs/firehose/testcases/common-attributes/expected.out create mode 100644 plugins/inputs/firehose/testcases/common-attributes/headers.json create mode 100644 plugins/inputs/firehose/testcases/common-attributes/telegraf.conf diff --git a/plugins/inputs/all/firehose.go b/plugins/inputs/all/firehose.go new file mode 100644 index 000000000..bf13d189b --- /dev/null +++ b/plugins/inputs/all/firehose.go @@ -0,0 +1,5 @@ +//go:build !custom || inputs || inputs.firehose + +package all + +import _ "github.com/influxdata/telegraf/plugins/inputs/firehose" // register plugin diff --git a/plugins/inputs/firehose/README.md b/plugins/inputs/firehose/README.md new file mode 100644 index 000000000..01e30631a --- 
/dev/null +++ b/plugins/inputs/firehose/README.md @@ -0,0 +1,122 @@ +# AWS Data Firehose Input Plugin + +This plugin listens for metrics sent via HTTP from [AWS Data Firehose][firehose] +in one of the supported [data formats][data_formats]. +The plugin strictly follows the request-response schema as described in the +official [documentation][response_spec]. + +⭐ Telegraf v1.34.0 +🏷️ cloud, messaging +💻 all + +[firehose]: https://aws.amazon.com/firehose/ +[data_formats]: /docs/DATA_FORMATS_INPUT.md +[response_spec]: https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html + +## Service Input + +This plugin is a service input. Normal plugins gather metrics determined by the +interval setting. Service plugins start a service that listens and waits for +metrics or events to occur. Service plugins have two key differences from +normal plugins: + +1. The global or plugin specific `interval` setting may not apply +2. The CLI options of `--test`, `--test-wait`, and `--once` may not produce + output for this plugin + +## Global configuration options + +In addition to the plugin-specific configuration settings, plugins support +additional global and plugin configuration settings. These settings are used to +modify metrics, tags, and fields or create aliases and configure ordering, etc. +See the [CONFIGURATION.md][CONFIGURATION.md] for more details. + +[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins + +## Configuration + +```toml @sample.conf +# AWS Data Firehose listener +[[inputs.firehose]] + ## Address and port to host HTTP listener on + service_address = ":8080" + + ## Paths to listen to. 
+ # paths = ["/telegraf"] + + ## maximum duration before timing out read of the request + # read_timeout = "5s" + ## maximum duration before timing out write of the response + # write_timeout = "5s" + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Minimal TLS version accepted by the server + # tls_min_version = "TLS12" + + ## Optional access key to accept for authentication. + ## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key. + ## If no access_key is provided (default), authentication is completely disabled and + ## this plugin will accept all request ignoring the provided access-key in the request! + # access_key = "foobar" + + ## Optional setting to add parameters as tags + ## If the http header "x-amz-firehose-common-attributes" is not present on the + ## request, no corresponding tag will be added. The header value should be a + ## json and should follow the schema as describe in the official documentation: + ## https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat + # parameter_tags = ["env"] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" +``` + +## Metrics + +Metrics are collected from the `records.[*].data` field in the request body. +The data must be base64 encoded and may be sent in any supported +[data format][data_formats]. 
+ +## Example Output + +When run with this configuration: + +```toml +[[inputs.firehose]] + service_address = ":8080" + paths = ["/telegraf"] + data_format = "value" + data_type = "string" +``` + +the following curl command, which sends the base64-encoded text "hello world": + +```sh +curl -i -XPOST 'localhost:8080/telegraf' \ +--header 'x-amz-firehose-request-id: ed4acda5-034f-9f42-bba1-f29aea6d7d8f' \ +--header 'Content-Type: application/json' \ +--data '{ + "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f", + "timestamp": 1578090901599, + "records": [ + { + "data": "aGVsbG8gd29ybGQK" + } + ] +}' +``` + +produces: + +```text +firehose,path=/telegraf value="hello world" 1725001851000000000 +``` diff --git a/plugins/inputs/firehose/firehose.go b/plugins/inputs/firehose/firehose.go new file mode 100644 index 000000000..987864e0b --- /dev/null +++ b/plugins/inputs/firehose/firehose.go @@ -0,0 +1,239 @@ +//go:generate ../../../tools/readme_config_includer/generator +package firehose + +import ( + "context" + "crypto/tls" + _ "embed" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "slices" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal" + common_tls "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +//go:embed sample.conf +var sampleConfig string + +type Firehose struct { + ServiceAddress string `toml:"service_address"` + Paths []string `toml:"paths"` + ReadTimeout config.Duration `toml:"read_timeout"` + WriteTimeout config.Duration `toml:"write_timeout"` + AccessKey config.Secret `toml:"access_key"` + ParameterTags []string `toml:"parameter_tags"` + Log telegraf.Logger `toml:"-"` + + common_tls.ServerConfig + tlsConf *tls.Config + + once sync.Once + + listener net.Listener + server http.Server + + parser telegraf.Parser + acc telegraf.Accumulator +} + +func (*Firehose) SampleConfig() string { + return sampleConfig +} + +func 
(f *Firehose) SetParser(parser telegraf.Parser) { + f.parser = parser +} + +func (f *Firehose) Init() error { + if f.ServiceAddress == "" { + f.ServiceAddress = ":8080" + } + if len(f.Paths) == 0 { + f.Paths = []string{"/telegraf"} + } + + var err error + f.tlsConf, err = f.ServerConfig.TLSConfig() + return err +} + +// Start starts the http listener service. +func (f *Firehose) Start(acc telegraf.Accumulator) error { + f.acc = acc + + var err error + if f.tlsConf != nil { + f.listener, err = tls.Listen("tcp", f.ServiceAddress, f.tlsConf) + } else { + f.listener, err = net.Listen("tcp", f.ServiceAddress) + } + if err != nil { + return fmt.Errorf("creating listener failed: %w", err) + } + + f.server = http.Server{ + Addr: f.ServiceAddress, + Handler: f, + ReadTimeout: time.Duration(f.ReadTimeout), + WriteTimeout: time.Duration(f.WriteTimeout), + TLSConfig: f.tlsConf, + } + + go func() { + if err := f.server.Serve(f.listener); err != nil { + if !errors.Is(err, net.ErrClosed) { + f.Log.Errorf("Server failed: %v", err) + } + } + }() + + f.Log.Infof("Listening on %s", f.listener.Addr().String()) + + return nil +} + +// Stop cleans up all resources +func (f *Firehose) Stop() { + if err := f.server.Shutdown(context.Background()); err != nil { + f.Log.Errorf("Shutting down server failed: %v", err) + } +} + +func (*Firehose) Gather(telegraf.Accumulator) error { + return nil +} + +func (f *Firehose) ServeHTTP(res http.ResponseWriter, req *http.Request) { + if !slices.Contains(f.Paths, req.URL.Path) { + res.WriteHeader(http.StatusNotFound) + return + } + + msg, err := f.handleRequest(req) + if err != nil { + f.acc.AddError(err) + } + if err := msg.sendResponse(res); err != nil { + f.acc.AddError(fmt.Errorf("sending response failed: %w", err)) + } +} + +func (f *Firehose) handleRequest(req *http.Request) (*message, error) { + // Create a request with a default response status code + msg := &message{ + responseCode: http.StatusInternalServerError, + } + + // Extract the request 
ID used to reference the request + msg.id = req.Header.Get("x-amz-firehose-request-id") + if msg.id == "" { + msg.responseCode = http.StatusBadRequest + return msg, errors.New("x-amz-firehose-request-id header is not set") + } + + // Check the maximum body size which can be up to 64 MiB according to + // https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html + if req.ContentLength > int64(64*1024*1024) { + msg.responseCode = http.StatusRequestEntityTooLarge + return msg, errors.New("content length is too large") + } + + // Check the HTTP method used + switch req.Method { + case http.MethodPost, http.MethodPut: + // Do nothing, those methods are allowed + default: + msg.responseCode = http.StatusMethodNotAllowed + return msg, fmt.Errorf("method %q is not allowed", req.Method) + } + + if req.Header.Get("content-type") != "application/json" { + msg.responseCode = http.StatusUnsupportedMediaType + return msg, fmt.Errorf("content type %q is not allowed", req.Header.Get("content-type")) + } + + // Decode the content if necessary and parse the JSON message + encoding := req.Header.Get("content-encoding") + body, err := internal.NewStreamContentDecoder(encoding, req.Body) + if err != nil { + msg.responseCode = http.StatusUnsupportedMediaType + return msg, fmt.Errorf("creating %q decoder for request %q failed: %w", encoding, msg.id, err) + } + defer req.Body.Close() + + var reqbody requestBody + if err := json.NewDecoder(body).Decode(&reqbody); err != nil { + msg.responseCode = http.StatusBadRequest + return msg, fmt.Errorf("decode body for request %q failed: %w", msg.id, err) + } + + // Validate the body content + if msg.id != reqbody.RequestID { + msg.responseCode = http.StatusBadRequest + return msg, fmt.Errorf("mismatch between request ID in header (%q) and body (%q)", msg.id, reqbody.RequestID) + } + + // Authenticate the request + if err := msg.authenticate(req, f.AccessKey); err != nil { + return msg, fmt.Errorf("authentication for request 
%q failed: %w", msg.id, err) + } + + // Extract the records and parameters for tagging + records, err := msg.decodeData(&reqbody) + if err != nil { + return msg, fmt.Errorf("decode base64 data from request %q failed: %w", msg.id, err) + } + + tags, err := msg.extractTagsFromCommonAttributes(req, f.ParameterTags) + if err != nil { + return msg, fmt.Errorf("extracting parameter tags for request %q failed: %w", msg.id, err) + } + + // Parse the metrics + var metrics []telegraf.Metric + for _, record := range records { + m, err := f.parser.Parse(record) + if err != nil { + // respond with bad request status code to inform firehose about the failure + msg.responseCode = http.StatusBadRequest + return msg, fmt.Errorf("parsing data of request %q failed: %w", msg.id, err) + } + metrics = append(metrics, m...) + } + + if len(metrics) == 0 { + f.once.Do(func() { + f.Log.Info(internal.NoMetricsCreatedMsg) + }) + } + + // Add the extracted tags and the path + for _, m := range metrics { + for k, v := range tags { + m.AddTag(k, v) + } + m.AddTag("path", req.URL.Path) + f.acc.AddMetric(m) + } + + msg.responseCode = http.StatusOK + return msg, nil +} + +func init() { + inputs.Add("firehose", func() telegraf.Input { + return &Firehose{ + ReadTimeout: config.Duration(time.Second * 5), + WriteTimeout: config.Duration(time.Second * 5), + } + }) +} diff --git a/plugins/inputs/firehose/firehose_request_test.go b/plugins/inputs/firehose/firehose_request_test.go new file mode 100644 index 000000000..e954c2be8 --- /dev/null +++ b/plugins/inputs/firehose/firehose_request_test.go @@ -0,0 +1,374 @@ +package firehose + +import ( + "bytes" + "encoding/json" + "net/http" + "os" + "path/filepath" + "sort" + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/testutil" + 
"github.com/stretchr/testify/require" +) + +func TestInvalidRequests(t *testing.T) { + tests := []struct { + name string + headers map[string]string + body string + method string + expectedMsg string + expectedCode int + }{ + { + name: "missing request id", + headers: map[string]string{"x-amz-firehose-request-id": ""}, + body: `{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`, + expectedMsg: "x-amz-firehose-request-id header is not set", + expectedCode: 400, + }, + { + name: "request id mismatch", + headers: map[string]string{"x-amz-firehose-request-id": "test-id"}, + body: `{"requestId":"some-other-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`, + expectedMsg: "mismatch between request ID", + expectedCode: 400, + }, + { + name: "invalid body", + headers: map[string]string{"x-amz-firehose-request-id": "test-id"}, + body: "not a json", + expectedMsg: `decode body for request "test-id" failed`, + expectedCode: 400, + }, + { + name: "invalid data encoding", + headers: map[string]string{"x-amz-firehose-request-id": "test-id"}, + body: `{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"not a base64 encoded text"}]}`, + expectedMsg: `ecode base64 data from request "test-id" failed: illegal base64 data`, + expectedCode: 400, + }, + { + name: "content too large", + headers: map[string]string{"x-amz-firehose-request-id": "test-id"}, + body: strings.Repeat("x", 65*1024*1024), + expectedMsg: `content length is too large`, + expectedCode: 413, + }, + { + name: "invalid content type", + headers: map[string]string{ + "x-amz-firehose-request-id": "test-id", + "content-type": "application/text", + }, + body: `{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`, + expectedMsg: `content type "application/text" is not allowed`, + expectedCode: 415, + }, + { + name: "invalid method", + headers: map[string]string{"x-amz-firehose-request-id": "test-id"}, + body: 
`{"requestId":"test-id","timestamp":1578090901599,"records":[{"data":"dGVzdA=="}]}`, + method: "GET", + expectedMsg: `method "GET" is not allowed`, + expectedCode: 405, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup plugin and start it + plugin := &Firehose{ + ServiceAddress: "127.0.0.1:0", + Log: &testutil.Logger{}, + } + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Get the listening address + addr := plugin.listener.Addr().String() + + // Create a request with the data defined in the test case + method := "POST" + if tt.method != "" { + method = tt.method + } + req, err := http.NewRequest(method, "http://"+addr+"/telegraf", bytes.NewBufferString(tt.body)) + require.NoError(t, err) + req.Header.Set("content-type", "application/json") + for k, v := range tt.headers { + req.Header.Set(k, v) + } + + // Execute the request + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + // Check the result + require.ErrorContains(t, acc.FirstError(), tt.expectedMsg) + require.Equal(t, tt.expectedCode, resp.StatusCode) + }) + } +} + +func TestAuthentication(t *testing.T) { + tests := []struct { + name string + body string + headers map[string]string + key string + expectedMsg string + expectedCode int + }{ + { + name: "no auth required", + headers: map[string]string{ + "x-amz-firehose-request-id": "test-id", + }, + body: ` + { + "requestId": "test-id", + "timestamp":1734625715000000000, + "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}] + }`, + expectedCode: 200, + }, + { + name: "no auth required but key sent", + headers: map[string]string{ + "x-amz-firehose-request-id": "test-id", + "x-amz-firehose-access-key": "test-key", + }, + body: ` + { + "requestId": "test-id", + "timestamp":1734625715000000000, + "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}] + }`, 
+ expectedCode: 200, + }, + { + name: "auth required success", + headers: map[string]string{ + "x-amz-firehose-request-id": "test-id", + "x-amz-firehose-access-key": "test-key", + }, + body: ` + { + "requestId": "test-id", + "timestamp":1734625715000000000, + "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}] + }`, + key: "test-key", + expectedCode: 200, + }, + { + name: "auth required wrong key", + headers: map[string]string{ + "x-amz-firehose-request-id": "test-id", + "x-amz-firehose-access-key": "foo bar", + }, + body: ` + { + "requestId": "test-id", + "timestamp":1734625715000000000, + "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}] + }`, + key: "test-key", + expectedMsg: "unauthorized request", + expectedCode: 401, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup plugin + plugin := &Firehose{ + ServiceAddress: "127.0.0.1:0", + AccessKey: config.NewSecret([]byte(tt.key)), + Log: &testutil.Logger{}, + } + + // Setup a parser + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + plugin.SetParser(parser) + + // Start the plugin + require.NoError(t, plugin.Init()) + + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Get the listening address + addr := plugin.listener.Addr().String() + + // Create a request with the data defined in the test case + req, err := http.NewRequest("POST", "http://"+addr+"/telegraf", bytes.NewBufferString(tt.body)) + require.NoError(t, err) + req.Header.Set("content-type", "application/json") + for k, v := range tt.headers { + req.Header.Set(k, v) + } + + // Execute the request + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + // Check the result + if tt.expectedMsg == "" { + require.NoError(t, acc.FirstError()) + } else { + require.ErrorContains(t, acc.FirstError(), tt.expectedMsg) + } + require.Equal(t, tt.expectedCode, resp.StatusCode) + }) + } 
+} + +func TestCases(t *testing.T) { + // Get all directories in testcases + folders, err := os.ReadDir("testcases") + require.NoError(t, err) + + // Register the plugin + inputs.Add("firehose", func() telegraf.Input { + return &Firehose{ + ReadTimeout: config.Duration(time.Second * 5), + WriteTimeout: config.Duration(time.Second * 5), + } + }) + + // Prepare the influx parser for expectations + parser := &influx.Parser{} + require.NoError(t, parser.Init()) + + for _, f := range folders { + // Only handle folders + if !f.IsDir() { + continue + } + testcasePath := filepath.Join("testcases", f.Name()) + configFilename := filepath.Join(testcasePath, "telegraf.conf") + expectedFilename := filepath.Join(testcasePath, "expected.out") + expectedErrorFilename := filepath.Join(testcasePath, "expected.err") + + t.Run(f.Name(), func(t *testing.T) { + // Read the input data + headers, bodies, err := readInputData(testcasePath) + require.NoError(t, err) + + // Read the expected output if any + var expected []telegraf.Metric + if _, err := os.Stat(expectedFilename); err == nil { + var err error + expected, err = testutil.ParseMetricsFromFile(expectedFilename, parser) + require.NoError(t, err) + } + + // Read the expected errors if any + var expectedErrors []string + if _, err := os.Stat(expectedErrorFilename); err == nil { + var err error + expectedErrors, err = testutil.ParseLinesFromFile(expectedErrorFilename) + require.NoError(t, err) + require.NotEmpty(t, expectedErrors) + } + + // Configure and initialize the plugin + cfg := config.NewConfig() + require.NoError(t, cfg.LoadConfig(configFilename)) + require.Len(t, cfg.Inputs, 1) + + plugin := cfg.Inputs[0].Input.(*Firehose) + plugin.ServiceAddress = "127.0.0.1:0" + require.NoError(t, plugin.Init()) + + // Start the plugin + var acc testutil.Accumulator + require.NoError(t, plugin.Start(&acc)) + defer plugin.Stop() + + // Get the listening address + addr := plugin.listener.Addr().String() + + // Send all message bodies + 
endpoint := "http://" + addr + plugin.Paths[0] + for i, body := range bodies { + req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer(body)) + require.NoErrorf(t, err, "creating request for body %d", i) + req.Header.Set("content-type", "application/json") + for k, v := range headers { + req.Header.Set(k, v) + } + + // Execute the request + resp, err := http.DefaultClient.Do(req) + require.NoErrorf(t, err, "executing request for body %d", i) + resp.Body.Close() + + if len(expectedErrors) == 0 { + require.Equalf(t, 200, resp.StatusCode, "result for body %d: %v", i, acc.Errors) + } else { + require.NotEqualf(t, 200, resp.StatusCode, "result for body %d: %v", i, acc.Errors) + } + } + + // Check the result + var actualErrorMsgs []string + if len(acc.Errors) > 0 { + for _, err := range acc.Errors { + actualErrorMsgs = append(actualErrorMsgs, err.Error()) + } + } + require.ElementsMatch(t, actualErrorMsgs, expectedErrors) + + // Check the metric nevertheless as we might get some metrics despite errors. 
+ actual := acc.GetTelegrafMetrics() + testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics()) + }) + } +} + +func readInputData(path string) (map[string]string, [][]byte, error) { + // Reading the headers file + var headers map[string]string + headersBuf, err := os.ReadFile(filepath.Join(path, "headers.json")) + if err != nil { + return nil, nil, err + } + if err := json.Unmarshal(headersBuf, &headers); err != nil { + return nil, nil, err + } + + // Read all bodies + bodyFiles, err := filepath.Glob(filepath.Join(path, "body*.json")) + if err != nil { + return nil, nil, err + } + sort.Strings(bodyFiles) + bodies := make([][]byte, 0, len(bodyFiles)) + for _, fn := range bodyFiles { + buf, err := os.ReadFile(fn) + if err != nil { + return nil, nil, err + } + bodies = append(bodies, buf) + } + + return headers, bodies, nil +} diff --git a/plugins/inputs/firehose/firehose_test.go b/plugins/inputs/firehose/firehose_test.go new file mode 100644 index 000000000..b46ed233c --- /dev/null +++ b/plugins/inputs/firehose/firehose_test.go @@ -0,0 +1 @@ +package firehose diff --git a/plugins/inputs/firehose/message.go b/plugins/inputs/firehose/message.go new file mode 100644 index 000000000..7575cec0a --- /dev/null +++ b/plugins/inputs/firehose/message.go @@ -0,0 +1,125 @@ +package firehose + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + + "github.com/influxdata/telegraf/config" +) + +// Firehose request data-structures according to +// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat +type record struct { + EncodedData string `json:"data"` +} + +type requestBody struct { + RequestID string `json:"requestId"` + Timestamp int64 `json:"timestamp"` + Records []record `json:"records"` +} + +// Required response data structure according to +// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#responseformat +type responseBody struct { + RequestID 
string `json:"requestId"` + Timestamp int64 `json:"timestamp"` + ErrorMessage string `json:"errorMessage,omitempty"` +} + +type message struct { + id string + responseCode int +} + +func (m *message) authenticate(req *http.Request, expected config.Secret) error { + // We completely switch off authentication if no 'access_key' was provided in the config, it's intended! + if expected.Empty() { + return nil + } + key := req.Header.Get("x-amz-firehose-access-key") + match, err := expected.EqualTo([]byte(key)) + if err != nil { + m.responseCode = http.StatusInternalServerError + return fmt.Errorf("comparing keys failed: %w", err) + } + if !match { + m.responseCode = http.StatusUnauthorized + return fmt.Errorf("unauthorized request from %v", req.RemoteAddr) + } + + return nil +} + +func (m *message) decodeData(r *requestBody) ([][]byte, error) { + // Decode base64-encoded data and return them as a slice of byte slices + decodedData := make([][]byte, 0, len(r.Records)) + for _, record := range r.Records { + data, err := base64.StdEncoding.DecodeString(record.EncodedData) + if err != nil { + m.responseCode = http.StatusBadRequest + return nil, err + } + decodedData = append(decodedData, data) + } + return decodedData, nil +} + +func (m *message) extractTagsFromCommonAttributes(req *http.Request, tagkeys []string) (map[string]string, error) { + tags := make(map[string]string, len(tagkeys)) + + h := req.Header.Get("x-amz-firehose-common-attributes") + if len(tagkeys) == 0 || h == "" { + return tags, nil + } + + var params map[string]interface{} + if err := json.Unmarshal([]byte(h), &params); err != nil { + m.responseCode = http.StatusBadRequest + return nil, fmt.Errorf("decoding x-amz-firehose-common-attributes header failed: %w", err) + } + + raw, ok := params["commonAttributes"] + if !ok { + m.responseCode = http.StatusBadRequest + return nil, errors.New("commonAttributes not found in x-amz-firehose-common-attributes header") + } + + attributes, ok := raw.(map[string]interface{}) + if !ok { + 
m.responseCode = http.StatusBadRequest + return nil, errors.New("parse parameters data failed") + } + for _, k := range tagkeys { + if v, isString := attributes[k].(string); isString { + tags[k] = v // missing or non-string attributes are skipped instead of panicking + } + } + return tags, nil +} + +func (m *message) sendResponse(w http.ResponseWriter) error { + var errorMessage string + if m.responseCode != http.StatusOK { + errorMessage = http.StatusText(m.responseCode) + } + + response, err := json.Marshal(responseBody{ + RequestID: m.id, + Timestamp: time.Now().UnixMilli(), // Firehose response format expects epoch milliseconds + ErrorMessage: errorMessage, + }) + if err != nil { + return err + } + w.Header().Set("content-type", "application/json") + w.WriteHeader(m.responseCode) + if _, err := w.Write(response); err != nil { + return fmt.Errorf("writing response to request %s failed: %w", m.id, err) + } + return nil +} diff --git a/plugins/inputs/firehose/sample.conf b/plugins/inputs/firehose/sample.conf new file mode 100644 index 000000000..e0d740108 --- /dev/null +++ b/plugins/inputs/firehose/sample.conf @@ -0,0 +1,42 @@ +# AWS Data Firehose listener +[[inputs.firehose]] + ## Address and port to host HTTP listener on + service_address = ":8080" + + ## Paths to listen to. + # paths = ["/telegraf"] + + ## maximum duration before timing out read of the request + # read_timeout = "5s" + ## maximum duration before timing out write of the response + # write_timeout = "5s" + + ## Set one or more allowed client CA certificate file names to + ## enable mutually authenticated TLS connections + # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"] + + ## Add service certificate and key + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + + ## Minimal TLS version accepted by the server + # tls_min_version = "TLS12" + + ## Optional access key to accept for authentication. + ## AWS Data Firehose uses "x-amz-firehose-access-key" header to set the access key. 
+ ## If no access_key is provided (default), authentication is completely disabled and + ## this plugin will accept all request ignoring the provided access-key in the request! + # access_key = "foobar" + + ## Optional setting to add parameters as tags + ## If the http header "x-amz-firehose-common-attributes" is not present on the + ## request, no corresponding tag will be added. The header value should be a + ## json and should follow the schema as describe in the official documentation: + ## https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html#requestformat + # parameter_tags = ["env"] + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + # data_format = "influx" diff --git a/plugins/inputs/firehose/testcases/common-attributes/body.json b/plugins/inputs/firehose/testcases/common-attributes/body.json new file mode 100644 index 000000000..b6a0a8e72 --- /dev/null +++ b/plugins/inputs/firehose/testcases/common-attributes/body.json @@ -0,0 +1,5 @@ +{ + "requestId": "telegraf-test-id", + "timestamp":1734625715000000000, + "records":[{"data":"dGVzdCB2YWx1ZT00MmkgMTczNDYyNTcxNTAwMDAwMDAwMAo="}] +} \ No newline at end of file diff --git a/plugins/inputs/firehose/testcases/common-attributes/expected.out b/plugins/inputs/firehose/testcases/common-attributes/expected.out new file mode 100644 index 000000000..c538502d1 --- /dev/null +++ b/plugins/inputs/firehose/testcases/common-attributes/expected.out @@ -0,0 +1 @@ +test,device=pc,deployment=prod,path=/telegraf value=42i 1734625715000000000 diff --git a/plugins/inputs/firehose/testcases/common-attributes/headers.json b/plugins/inputs/firehose/testcases/common-attributes/headers.json new file mode 100644 index 000000000..231121dfe --- /dev/null +++ b/plugins/inputs/firehose/testcases/common-attributes/headers.json @@ -0,0 +1,5 @@ +{ + 
"x-amz-firehose-request-id": "telegraf-test-id", + "x-amz-firehose-access-key": "secret", + "x-amz-firehose-common-attributes": "{\"commonAttributes\": {\"deployment\": \"prod\", \"device\": \"pc\"}}" +} \ No newline at end of file diff --git a/plugins/inputs/firehose/testcases/common-attributes/telegraf.conf b/plugins/inputs/firehose/testcases/common-attributes/telegraf.conf new file mode 100644 index 000000000..ac01f1585 --- /dev/null +++ b/plugins/inputs/firehose/testcases/common-attributes/telegraf.conf @@ -0,0 +1,4 @@ +[[inputs.firehose]] + service_address = "dummy" + access_key = "secret" + parameter_tags = ["deployment", "device"]