Influxdb v2 listener (#7828)

This commit is contained in:
John Ibsen 2020-09-14 18:41:46 -04:00 committed by GitHub
parent 77b36e0eb7
commit d764f862b5
7 changed files with 1039 additions and 0 deletions

View File

@ -48,6 +48,41 @@ func (h *basicAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request)
	h.next.ServeHTTP(rw, req)
}

type GenericAuthErrorFunc func(rw http.ResponseWriter)

// GenericAuthHandler returns a http handler that requires `Authorization: <credentials>`
func GenericAuthHandler(credentials string, onError GenericAuthErrorFunc) func(h http.Handler) http.Handler {
	return func(h http.Handler) http.Handler {
		return &genericAuthHandler{
			credentials: credentials,
			onError:     onError,
			next:        h,
		}
	}
}

// Generic auth scheme handler - exact match on `Authorization: <credentials>`
type genericAuthHandler struct {
	credentials string
	onError     GenericAuthErrorFunc
	next        http.Handler
}

func (h *genericAuthHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	if h.credentials != "" {
		// Scheme checking
		authorization := req.Header.Get("Authorization")
		if subtle.ConstantTimeCompare([]byte(authorization), []byte(h.credentials)) != 1 {
			h.onError(rw)
			http.Error(rw, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
			return
		}
	}

	h.next.ServeHTTP(rw, req)
}

// ErrorFunc is a callback for writing an error response.
type ErrorFunc func(rw http.ResponseWriter, code int)
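For orientation, here is a minimal sketch of how this helper is meant to be used; the standalone `package main`, the port, and the secret value are illustrative and not part of the commit. The influxdb_v2_listener plugin below wires it up the same way in its `routes()` method:

```go
package main

import (
	"log"
	"net/http"

	"github.com/influxdata/telegraf/internal"
)

func main() {
	// Only requests whose Authorization header exactly matches
	// "Token some-long-shared-secret-token" reach the wrapped handler.
	authHandler := internal.GenericAuthHandler("Token some-long-shared-secret-token",
		func(_ http.ResponseWriter) {
			// Called on an auth failure, e.g. to increment a counter.
		},
	)

	ok := http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
		rw.WriteHeader(http.StatusNoContent)
	})

	log.Fatal(http.ListenAndServe(":8080", authHandler(ok)))
}
```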

View File

@ -62,6 +62,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/infiniband"
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb"
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/influxdb_v2_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/internal"
_ "github.com/influxdata/telegraf/plugins/inputs/interrupts"
_ "github.com/influxdata/telegraf/plugins/inputs/ipmi_sensor"

View File

@ -0,0 +1,52 @@
# InfluxDB V2 Listener Input Plugin
InfluxDB V2 Listener is a service input plugin that listens for requests sent
according to the [InfluxDB HTTP API][influxdb_http_api]. The intent of the
plugin is to allow Telegraf to serve as a proxy/router for the `/api/v2/write`
endpoint of the InfluxDB HTTP API.
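As an illustration of that proxy role, the following is a minimal sketch of a Telegraf configuration that accepts v2 writes and forwards them with Telegraf's `influxdb_v2` output; the URL, organization, and bucket values are placeholders:

```toml
[[inputs.influxdb_v2_listener]]
  service_address = ":9999"

[[outputs.influxdb_v2]]
  urls = ["http://influxdb.example.com:8086"]
  token = "$INFLUX_TOKEN"
  organization = "example-org"
  bucket = "example-bucket"
```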
The `/api/v2/write` endpoint supports the `precision` query parameter, which can be set
to one of `ns`, `us`, `ms`, or `s`. All other query parameters are ignored; their
handling is deferred to the output plugins' configuration.
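For example, a client writing millisecond timestamps could indicate that as follows (assuming the default `service_address` of `:9999`):

```
curl -i -XPOST 'http://localhost:9999/api/v2/write?precision=ms' --data-binary 'cpu_load_short,host=server01 value=0.64 1434055562000'
```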
### Configuration:
```toml
[[inputs.influxdb_v2_listener]]
  ## Address and port to host InfluxDB listener on
  service_address = ":9999"

  ## Maximum allowed HTTP request body size in bytes.
  ## 0 means to use the default of 32MiB.
  # max_body_size = "32MiB"

  ## Optional tag name used to store the bucket.
  ## If the write request has a bucket in the query string, it will be stored in this tag.
  ## This tag can be used in downstream outputs.
  ## If left empty (the default), this is disabled and the bucket will not be recorded.
  # bucket_tag = ""

  ## Set one or more allowed client CA certificate file names to
  ## enable mutually authenticated TLS connections
  # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

  ## Add service certificate and key
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

  ## Optional token to accept for HTTP authentication.
  ## You probably want to make sure you have TLS configured above for this.
  # token = "some-long-shared-secret-token"
```
### Metrics:
Metrics are created from InfluxDB Line Protocol in the request body.
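For instance, a request body containing the line below produces a single `cpu_load_short` metric with tags `host` and `region`, a `value` field, and the given nanosecond timestamp:

```
cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000
```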
### Troubleshooting:
**Example Query:**
```
curl -i -XPOST 'http://localhost:9999/api/v2/write' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'
```
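If a `token` is configured, the client must echo it back in the `Authorization` header, and gzip-compressed bodies are accepted when `Content-Encoding: gzip` is set. A sketch (the token value and file name are placeholders):

```
curl -i -XPOST 'http://localhost:9999/api/v2/write?bucket=mybucket' \
  -H 'Authorization: Token some-long-shared-secret-token' \
  -H 'Content-Encoding: gzip' \
  --data-binary @metrics.lp.gz
```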
[influxdb_http_api]: https://v2.docs.influxdata.com/v2.0/api/

View File

@ -0,0 +1,325 @@
package influxdb_v2_listener

import (
	"compress/gzip"
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	tlsint "github.com/influxdata/telegraf/plugins/common/tls"
	"github.com/influxdata/telegraf/plugins/inputs"
	"github.com/influxdata/telegraf/plugins/parsers/influx"
	"github.com/influxdata/telegraf/selfstat"
)

const (
	// defaultMaxBodySize is the default maximum request body size, in bytes.
	// if the request body is over this size, we will return an HTTP 413 error.
	defaultMaxBodySize = 32 * 1024 * 1024
)

// The BadRequestCode constants keep standard error messages
// see: https://v2.docs.influxdata.com/v2.0/api/#operation/PostWrite
type BadRequestCode string

const (
	InternalError BadRequestCode = "internal error"
	Invalid       BadRequestCode = "invalid"
)

type InfluxDBV2Listener struct {
	ServiceAddress string `toml:"service_address"`
	port           int
	tlsint.ServerConfig

	MaxBodySize internal.Size `toml:"max_body_size"`
	Token       string        `toml:"token"`
	BucketTag   string        `toml:"bucket_tag"`

	timeFunc influx.TimeFunc

	listener net.Listener
	server   http.Server

	acc telegraf.Accumulator

	bytesRecv       selfstat.Stat
	requestsServed  selfstat.Stat
	writesServed    selfstat.Stat
	requestsRecv    selfstat.Stat
	notFoundsServed selfstat.Stat
	authFailures    selfstat.Stat

	startTime time.Time

	Log telegraf.Logger `toml:"-"`

	mux http.ServeMux
}
const sampleConfig = `
  ## Address and port to host InfluxDB listener on
  service_address = ":9999"

  ## Maximum allowed HTTP request body size in bytes.
  ## 0 means to use the default of 32MiB.
  # max_body_size = "32MiB"

  ## Optional tag name used to store the bucket.
  ## If the write request has a bucket in the query string, it will be stored in this tag.
  ## This tag can be used in downstream outputs.
  ## If left empty (the default), this is disabled and the bucket will not be recorded.
  # bucket_tag = ""

  ## Set one or more allowed client CA certificate file names to
  ## enable mutually authenticated TLS connections
  # tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]

  ## Add service certificate and key
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"

  ## Optional token to accept for HTTP authentication.
  ## You probably want to make sure you have TLS configured above for this.
  # token = "some-long-shared-secret-token"
`
func (h *InfluxDBV2Listener) SampleConfig() string {
	return sampleConfig
}

func (h *InfluxDBV2Listener) Description() string {
	return "Accept metrics over InfluxDB 2.x HTTP API"
}

func (h *InfluxDBV2Listener) Gather(_ telegraf.Accumulator) error {
	return nil
}

func (h *InfluxDBV2Listener) routes() {
	credentials := ""
	if h.Token != "" {
		credentials = fmt.Sprintf("Token %s", h.Token)
	}

	authHandler := internal.GenericAuthHandler(credentials,
		func(_ http.ResponseWriter) {
			h.authFailures.Incr(1)
		},
	)

	h.mux.Handle("/api/v2/write", authHandler(h.handleWrite()))
	h.mux.Handle("/", authHandler(h.handleDefault()))
}

func (h *InfluxDBV2Listener) Init() error {
	tags := map[string]string{
		"address": h.ServiceAddress,
	}
	h.bytesRecv = selfstat.Register("influxdb_v2_listener", "bytes_received", tags)
	h.requestsServed = selfstat.Register("influxdb_v2_listener", "requests_served", tags)
	h.writesServed = selfstat.Register("influxdb_v2_listener", "writes_served", tags)
	h.requestsRecv = selfstat.Register("influxdb_v2_listener", "requests_received", tags)
	h.notFoundsServed = selfstat.Register("influxdb_v2_listener", "not_founds_served", tags)
	h.authFailures = selfstat.Register("influxdb_v2_listener", "auth_failures", tags)
	h.routes()

	if h.MaxBodySize.Size == 0 {
		h.MaxBodySize.Size = defaultMaxBodySize
	}

	return nil
}
// Start starts the InfluxDB listener service.
func (h *InfluxDBV2Listener) Start(acc telegraf.Accumulator) error {
	h.acc = acc

	tlsConf, err := h.ServerConfig.TLSConfig()
	if err != nil {
		return err
	}

	h.server = http.Server{
		Addr:      h.ServiceAddress,
		Handler:   h,
		TLSConfig: tlsConf,
	}

	var listener net.Listener
	if tlsConf != nil {
		listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
		if err != nil {
			return err
		}
	} else {
		listener, err = net.Listen("tcp", h.ServiceAddress)
		if err != nil {
			return err
		}
	}
	h.listener = listener
	h.port = listener.Addr().(*net.TCPAddr).Port

	go func() {
		err = h.server.Serve(h.listener)
		if err != http.ErrServerClosed {
			h.Log.Infof("Error serving HTTP on %s", h.ServiceAddress)
		}
	}()

	h.startTime = h.timeFunc()

	h.Log.Infof("Started HTTP listener service on %s", h.ServiceAddress)

	return nil
}

// Stop cleans up all resources
func (h *InfluxDBV2Listener) Stop() {
	err := h.server.Shutdown(context.Background())
	if err != nil {
		h.Log.Infof("Error shutting down HTTP server: %v", err.Error())
	}
}
func (h *InfluxDBV2Listener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
	h.requestsRecv.Incr(1)
	h.mux.ServeHTTP(res, req)
	h.requestsServed.Incr(1)
}

func (h *InfluxDBV2Listener) handleDefault() http.HandlerFunc {
	return func(res http.ResponseWriter, req *http.Request) {
		defer h.notFoundsServed.Incr(1)
		http.NotFound(res, req)
	}
}

func (h *InfluxDBV2Listener) handleWrite() http.HandlerFunc {
	return func(res http.ResponseWriter, req *http.Request) {
		defer h.writesServed.Incr(1)
		// Check that the content length is not too large for us to handle.
		if req.ContentLength > h.MaxBodySize.Size {
			tooLarge(res, h.MaxBodySize.Size)
			return
		}

		bucket := req.URL.Query().Get("bucket")

		body := req.Body
		body = http.MaxBytesReader(res, body, h.MaxBodySize.Size)
		// Handle gzip request bodies
		if req.Header.Get("Content-Encoding") == "gzip" {
			var err error
			body, err = gzip.NewReader(body)
			if err != nil {
				h.Log.Debugf("Error decompressing request body: %v", err.Error())
				badRequest(res, Invalid, err.Error())
				return
			}
			defer body.Close()
		}
		var readErr error
		var bytes []byte
		bytes, readErr = ioutil.ReadAll(body)
		if readErr != nil {
			h.Log.Debugf("Error reading the request body: %v", readErr.Error())
			badRequest(res, InternalError, readErr.Error())
			return
		}
		metricHandler := influx.NewMetricHandler()
		parser := influx.NewParser(metricHandler)
		parser.SetTimeFunc(h.timeFunc)

		precisionStr := req.URL.Query().Get("precision")
		if precisionStr != "" {
			precision := getPrecisionMultiplier(precisionStr)
			metricHandler.SetTimePrecision(precision)
		}

		var metrics []telegraf.Metric
		var err error
		metrics, err = parser.Parse(bytes)
		if err != influx.EOF && err != nil {
			h.Log.Debugf("Error parsing the request body: %v", err.Error())
			badRequest(res, Invalid, err.Error())
			return
		}

		for _, m := range metrics {
			// Handle bucket_tag override
			if h.BucketTag != "" && bucket != "" {
				m.AddTag(h.BucketTag, bucket)
			}

			h.acc.AddMetric(m)
		}

		// http request success
		res.WriteHeader(http.StatusNoContent)
	}
}
func tooLarge(res http.ResponseWriter, maxLength int64) {
	res.Header().Set("Content-Type", "application/json")
	res.Header().Set("X-Influxdb-Error", "http: request body too large")
	res.WriteHeader(http.StatusRequestEntityTooLarge)
	b, _ := json.Marshal(map[string]string{
		"code":      fmt.Sprint(Invalid),
		"message":   "http: request body too large",
		"maxLength": fmt.Sprint(maxLength)})
	res.Write(b)
}

func badRequest(res http.ResponseWriter, code BadRequestCode, errString string) {
	res.Header().Set("Content-Type", "application/json")
	if errString == "" {
		errString = "http: bad request"
	}
	res.Header().Set("X-Influxdb-Error", errString)
	res.WriteHeader(http.StatusBadRequest)
	b, _ := json.Marshal(map[string]string{
		"code":    fmt.Sprint(code),
		"message": errString,
		"op":      "",
		"err":     errString,
	})
	res.Write(b)
}

func getPrecisionMultiplier(precision string) time.Duration {
	// Influxdb defaults silently to nanoseconds if precision isn't
	// one of the following:
	var d time.Duration
	switch precision {
	case "us":
		d = time.Microsecond
	case "ms":
		d = time.Millisecond
	case "s":
		d = time.Second
	default:
		d = time.Nanosecond
	}
	return d
}

func init() {
	inputs.Add("influxdb_v2_listener", func() telegraf.Input {
		return &InfluxDBV2Listener{
			ServiceAddress: ":9999",
			timeFunc:       time.Now,
		}
	})
}

View File

@ -0,0 +1,108 @@
package influxdb_v2_listener

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/selfstat"
	"github.com/influxdata/telegraf/testutil"
)

// newListener is the minimal InfluxDBV2Listener construction to serve writes.
func newListener() *InfluxDBV2Listener {
	listener := &InfluxDBV2Listener{
		timeFunc:     time.Now,
		acc:          &testutil.NopAccumulator{},
		bytesRecv:    selfstat.Register("influxdb_v2_listener", "bytes_received", map[string]string{}),
		writesServed: selfstat.Register("influxdb_v2_listener", "writes_served", map[string]string{}),
		MaxBodySize: internal.Size{
			Size: defaultMaxBodySize,
		},
	}
	return listener
}

func BenchmarkInfluxDBV2Listener_serveWrite(b *testing.B) {
	res := httptest.NewRecorder()
	addr := "http://localhost/api/v2/write?bucket=mybucket"

	benchmarks := []struct {
		name  string
		lines string
	}{
		{
			name:  "single line, tag, and field",
			lines: lines(1, 1, 1),
		},
		{
			name:  "single line, 10 tags and fields",
			lines: lines(1, 10, 10),
		},
		{
			name:  "single line, 100 tags and fields",
			lines: lines(1, 100, 100),
		},
		{
			name:  "1k lines, single tag and field",
			lines: lines(1000, 1, 1),
		},
		{
			name:  "1k lines, 10 tags and fields",
			lines: lines(1000, 10, 10),
		},
		{
			name:  "10k lines, 10 tags and fields",
			lines: lines(10000, 10, 10),
		},
		{
			name:  "100k lines, 10 tags and fields",
			lines: lines(100000, 10, 10),
		},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			listener := newListener()
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				req, err := http.NewRequest("POST", addr, strings.NewReader(bm.lines))
				if err != nil {
					b.Error(err)
				}
				listener.handleWrite()(res, req)
				if res.Code != http.StatusNoContent {
					b.Errorf("unexpected status %d", res.Code)
				}
			}
		})
	}
}

func lines(lines, numTags, numFields int) string {
	lp := make([]string, lines)
	for i := 0; i < lines; i++ {
		tags := make([]string, numTags)
		for j := 0; j < numTags; j++ {
			tags[j] = fmt.Sprintf("t%d=v%d", j, j)
		}
		fields := make([]string, numFields)
		for k := 0; k < numFields; k++ {
			fields[k] = fmt.Sprintf("f%d=%d", k, k)
		}
		lp[i] = fmt.Sprintf("m%d,%s %s",
			i,
			strings.Join(tags, ","),
			strings.Join(fields, ","),
		)
	}
	return strings.Join(lp, "\n")
}
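A note on running these benchmarks: from the Telegraf source tree they can be invoked with the standard Go tooling, for example:

```
go test -bench=BenchmarkInfluxDBV2Listener_serveWrite -run='^$' ./plugins/inputs/influxdb_v2_listener/
```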

File diff suppressed because one or more lines are too long

Binary file not shown.