Add WebSocket output plugin (#9188)

This commit is contained in:
Alexander Emelin 2021-06-04 07:53:38 +03:00 committed by GitHub
parent e289612ff3
commit 0fd0ae0953
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 488 additions and 0 deletions

View File

@ -471,5 +471,6 @@ For documentation on the latest development code see the [documentation index][d
* [udp](./plugins/outputs/socket_writer)
* [warp10](./plugins/outputs/warp10)
* [wavefront](./plugins/outputs/wavefront)
* [websocket](./plugins/outputs/websocket)
* [sumologic](./plugins/outputs/sumologic)
* [yandex_cloud_monitoring](./plugins/outputs/yandex_cloud_monitoring)

1
go.mod
View File

@ -63,6 +63,7 @@ require (
github.com/google/go-github/v32 v32.1.0
github.com/gopcua/opcua v0.1.13
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.2
github.com/gosnmp/gosnmp v1.32.0
github.com/grid-x/modbus v0.0.0-20210224155242-c4a3d042e99b
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect

View File

@ -47,5 +47,6 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/timestream"
_ "github.com/influxdata/telegraf/plugins/outputs/warp10"
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
_ "github.com/influxdata/telegraf/plugins/outputs/websocket"
_ "github.com/influxdata/telegraf/plugins/outputs/yandex_cloud_monitoring"
)

View File

@ -0,0 +1,39 @@
# Websocket Output Plugin
This plugin can write to a WebSocket endpoint.
It can output data in any of the [supported output formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md).
### Configuration:
```toml
# A plugin that can transmit metrics over WebSocket.
[[outputs.websocket]]
## URL is the address to send metrics to. Make sure ws or wss scheme is used.
url = "ws://127.0.0.1:3000/telegraf"
## Timeouts (make sure read_timeout is larger than server ping interval or set to zero).
# connect_timeout = "30s"
# write_timeout = "30s"
# read_timeout = "30s"
## Optionally turn on using text data frames (binary by default).
# use_text_frames = false
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"
## Additional HTTP Upgrade headers
# [outputs.websocket.headers]
# Authorization = "Bearer <TOKEN>"
```

View File

@ -0,0 +1,225 @@
package websocket
import (
"errors"
"fmt"
"net/http"
"net/url"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/common/proxy"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
ws "github.com/gorilla/websocket"
)
var sampleConfig = `
## URL is the address to send metrics to. Make sure ws or wss scheme is used.
url = "ws://127.0.0.1:8080/telegraf"
## Timeouts (make sure read_timeout is larger than server ping interval or set to zero).
# connect_timeout = "30s"
# write_timeout = "30s"
# read_timeout = "30s"
## Optionally turn on using text data frames (binary by default).
# use_text_frames = false
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Data format to output.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
# data_format = "influx"
## Additional HTTP Upgrade headers
# [outputs.websocket.headers]
# Authorization = "Bearer <TOKEN>"
`
const (
defaultConnectTimeout = 30 * time.Second
defaultWriteTimeout = 30 * time.Second
defaultReadTimeout = 30 * time.Second
)
// WebSocket can output to WebSocket endpoint.
type WebSocket struct {
URL string `toml:"url"`
ConnectTimeout config.Duration `toml:"connect_timeout"`
WriteTimeout config.Duration `toml:"write_timeout"`
ReadTimeout config.Duration `toml:"read_timeout"`
Headers map[string]string `toml:"headers"`
UseTextFrames bool `toml:"use_text_frames"`
Log telegraf.Logger `toml:"-"`
proxy.HTTPProxy
tls.ClientConfig
conn *ws.Conn
serializer serializers.Serializer
}
// SetSerializer implements serializers.SerializerOutput.
func (w *WebSocket) SetSerializer(serializer serializers.Serializer) {
w.serializer = serializer
}
// Description of plugin.
func (w *WebSocket) Description() string {
return "Generic WebSocket output writer."
}
// SampleConfig returns plugin config sample.
func (w *WebSocket) SampleConfig() string {
return sampleConfig
}
var errInvalidURL = errors.New("invalid websocket URL")
// Init the output plugin.
func (w *WebSocket) Init() error {
if parsedURL, err := url.Parse(w.URL); err != nil || (parsedURL.Scheme != "ws" && parsedURL.Scheme != "wss") {
return fmt.Errorf("%w: \"%s\"", errInvalidURL, w.URL)
}
return nil
}
// Connect to the output endpoint.
func (w *WebSocket) Connect() error {
tlsCfg, err := w.ClientConfig.TLSConfig()
if err != nil {
return fmt.Errorf("error creating TLS config: %v", err)
}
dialProxy, err := w.HTTPProxy.Proxy()
if err != nil {
return fmt.Errorf("error creating proxy: %v", err)
}
dialer := &ws.Dialer{
Proxy: dialProxy,
HandshakeTimeout: time.Duration(w.ConnectTimeout),
TLSClientConfig: tlsCfg,
}
headers := http.Header{}
for k, v := range w.Headers {
headers.Set(k, v)
}
conn, resp, err := dialer.Dial(w.URL, headers)
if err != nil {
return fmt.Errorf("error dial: %v", err)
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusSwitchingProtocols {
return fmt.Errorf("wrong status code while connecting to server: %d", resp.StatusCode)
}
w.conn = conn
go w.read(conn)
return nil
}
func (w *WebSocket) read(conn *ws.Conn) {
defer func() { _ = conn.Close() }()
if w.ReadTimeout > 0 {
if err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout))); err != nil {
w.Log.Errorf("error setting read deadline: %v", err)
return
}
conn.SetPingHandler(func(string) error {
err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout)))
if err != nil {
w.Log.Errorf("error setting read deadline: %v", err)
return err
}
return conn.WriteControl(ws.PongMessage, nil, time.Now().Add(time.Duration(w.WriteTimeout)))
})
}
for {
// Need to read a connection (to properly process pings from a server).
_, _, err := conn.ReadMessage()
if err != nil {
// Websocket connection is not readable after first error, it's going to error state.
// In the beginning of this goroutine we have defer section that closes such connection.
// After that connection will be tried to reestablish on next Write.
if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
w.Log.Errorf("error reading websocket connection: %v", err)
}
return
}
if w.ReadTimeout > 0 {
if err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout))); err != nil {
return
}
}
}
}
// Write writes the given metrics to the destination. Not thread-safe.
func (w *WebSocket) Write(metrics []telegraf.Metric) error {
if w.conn == nil {
// Previous write failed with error and ws conn was closed.
if err := w.Connect(); err != nil {
return err
}
}
messageData, err := w.serializer.SerializeBatch(metrics)
if err != nil {
return err
}
if w.WriteTimeout > 0 {
if err := w.conn.SetWriteDeadline(time.Now().Add(time.Duration(w.WriteTimeout))); err != nil {
return fmt.Errorf("error setting write deadline: %v", err)
}
}
messageType := ws.BinaryMessage
if w.UseTextFrames {
messageType = ws.TextMessage
}
err = w.conn.WriteMessage(messageType, messageData)
if err != nil {
_ = w.conn.Close()
w.conn = nil
return fmt.Errorf("error writing to connection: %v", err)
}
return nil
}
// Close closes the connection. Noop if already closed.
func (w *WebSocket) Close() error {
if w.conn == nil {
return nil
}
err := w.conn.Close()
w.conn = nil
return err
}
func newWebSocket() *WebSocket {
return &WebSocket{
ConnectTimeout: config.Duration(defaultConnectTimeout),
WriteTimeout: config.Duration(defaultWriteTimeout),
ReadTimeout: config.Duration(defaultReadTimeout),
}
}
func init() {
outputs.Add("websocket", func() telegraf.Output {
return newWebSocket()
})
}

View File

@ -0,0 +1,221 @@
package websocket
import (
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/testutil"
ws "github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
// testSerializer serializes to a number of metrics to simplify tests here.
type testSerializer struct{}
func newTestSerializer() *testSerializer {
return &testSerializer{}
}
func (t testSerializer) Serialize(_ telegraf.Metric) ([]byte, error) {
return []byte("1"), nil
}
func (t testSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
return []byte(strconv.Itoa(len(metrics))), nil
}
type testServer struct {
*httptest.Server
t *testing.T
messages chan []byte
upgradeDelay time.Duration
expectTextFrames bool
}
func newTestServer(t *testing.T, messages chan []byte, tls bool) *testServer {
s := &testServer{}
s.t = t
if tls {
s.Server = httptest.NewTLSServer(s)
} else {
s.Server = httptest.NewServer(s)
}
s.URL = makeWsProto(s.Server.URL)
s.messages = messages
return s
}
func makeWsProto(s string) string {
return "ws" + strings.TrimPrefix(s, "http")
}
const (
testHeaderName = "X-Telegraf-Test"
testHeaderValue = "1"
)
var testUpgrader = ws.Upgrader{}
func (s *testServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Header.Get(testHeaderName) != testHeaderValue {
s.t.Fatalf("expected test header found in request, got: %#v", r.Header)
}
if s.upgradeDelay > 0 {
// Emulate long handshake.
select {
case <-r.Context().Done():
return
case <-time.After(s.upgradeDelay):
}
}
conn, err := testUpgrader.Upgrade(w, r, http.Header{})
if err != nil {
return
}
defer func() { _ = conn.Close() }()
for {
messageType, data, err := conn.ReadMessage()
if err != nil {
break
}
if s.expectTextFrames && messageType != ws.TextMessage {
s.t.Fatalf("unexpected frame type: %d", messageType)
}
select {
case s.messages <- data:
case <-time.After(5 * time.Second):
s.t.Fatal("timeout writing to messages channel, make sure there are readers")
}
}
}
func initWebSocket(s *testServer) *WebSocket {
w := newWebSocket()
w.Log = testutil.Logger{}
w.URL = s.URL
w.Headers = map[string]string{testHeaderName: testHeaderValue}
w.SetSerializer(newTestSerializer())
return w
}
func connect(t *testing.T, w *WebSocket) {
err := w.Connect()
require.NoError(t, err)
}
func TestWebSocket_NoURL(t *testing.T) {
w := newWebSocket()
err := w.Init()
require.ErrorIs(t, err, errInvalidURL)
}
func TestWebSocket_Connect_Timeout(t *testing.T) {
s := newTestServer(t, nil, false)
s.upgradeDelay = time.Second
defer s.Close()
w := initWebSocket(s)
w.ConnectTimeout = config.Duration(10 * time.Millisecond)
err := w.Connect()
require.Error(t, err)
}
func TestWebSocket_Connect_OK(t *testing.T) {
s := newTestServer(t, nil, false)
defer s.Close()
w := initWebSocket(s)
connect(t, w)
}
func TestWebSocket_ConnectTLS_OK(t *testing.T) {
s := newTestServer(t, nil, true)
defer s.Close()
w := initWebSocket(s)
w.ClientConfig.InsecureSkipVerify = true
connect(t, w)
}
func TestWebSocket_Write_OK(t *testing.T) {
messages := make(chan []byte, 1)
s := newTestServer(t, messages, false)
defer s.Close()
w := initWebSocket(s)
connect(t, w)
var metrics []telegraf.Metric
metrics = append(metrics, testutil.TestMetric(0.4, "test"))
metrics = append(metrics, testutil.TestMetric(0.5, "test"))
err := w.Write(metrics)
require.NoError(t, err)
select {
case data := <-messages:
require.Equal(t, []byte("2"), data)
case <-time.After(time.Second):
t.Fatal("timeout receiving data")
}
}
func TestWebSocket_Write_Error(t *testing.T) {
s := newTestServer(t, nil, false)
defer s.Close()
w := initWebSocket(s)
connect(t, w)
require.NoError(t, w.conn.Close())
metrics := []telegraf.Metric{testutil.TestMetric(0.4, "test")}
err := w.Write(metrics)
require.Error(t, err)
require.Nil(t, w.conn)
}
func TestWebSocket_Write_Reconnect(t *testing.T) {
messages := make(chan []byte, 1)
s := newTestServer(t, messages, false)
s.expectTextFrames = true // Also use text frames in this test.
defer s.Close()
w := initWebSocket(s)
w.UseTextFrames = true
connect(t, w)
metrics := []telegraf.Metric{testutil.TestMetric(0.4, "test")}
require.NoError(t, w.conn.Close())
err := w.Write(metrics)
require.Error(t, err)
require.Nil(t, w.conn)
err = w.Write(metrics)
require.NoError(t, err)
select {
case data := <-messages:
require.Equal(t, []byte("1"), data)
case <-time.After(time.Second):
t.Fatal("timeout receiving data")
}
}
func TestWebSocket_Close(t *testing.T) {
s := newTestServer(t, nil, false)
defer s.Close()
w := initWebSocket(s)
connect(t, w)
require.NoError(t, w.Close())
// Check no error on second close.
require.NoError(t, w.Close())
}