diff --git a/README.md b/README.md index 58254877c..0702d6b4d 100644 --- a/README.md +++ b/README.md @@ -471,5 +471,6 @@ For documentation on the latest development code see the [documentation index][d * [udp](./plugins/outputs/socket_writer) * [warp10](./plugins/outputs/warp10) * [wavefront](./plugins/outputs/wavefront) +* [websocket](./plugins/outputs/websocket) * [sumologic](./plugins/outputs/sumologic) * [yandex_cloud_monitoring](./plugins/outputs/yandex_cloud_monitoring) diff --git a/go.mod b/go.mod index 810f61613..d3f08990d 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/google/go-github/v32 v32.1.0 github.com/gopcua/opcua v0.1.13 github.com/gorilla/mux v1.7.3 + github.com/gorilla/websocket v1.4.2 github.com/gosnmp/gosnmp v1.32.0 github.com/grid-x/modbus v0.0.0-20210224155242-c4a3d042e99b github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 893ac91f4..8fc5f8b75 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -47,5 +47,6 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/timestream" _ "github.com/influxdata/telegraf/plugins/outputs/warp10" _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" + _ "github.com/influxdata/telegraf/plugins/outputs/websocket" _ "github.com/influxdata/telegraf/plugins/outputs/yandex_cloud_monitoring" ) diff --git a/plugins/outputs/websocket/README.md b/plugins/outputs/websocket/README.md new file mode 100644 index 000000000..577c10e6b --- /dev/null +++ b/plugins/outputs/websocket/README.md @@ -0,0 +1,39 @@ +# Websocket Output Plugin + +This plugin can write to a WebSocket endpoint. + +It can output data in any of the [supported output formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md). + +### Configuration: + +```toml +# A plugin that can transmit metrics over WebSocket. +[[outputs.websocket]] + ## URL is the address to send metrics to. Make sure ws or wss scheme is used. + url = "ws://127.0.0.1:3000/telegraf" + + ## Timeouts (make sure read_timeout is larger than server ping interval or set to zero). + # connect_timeout = "30s" + # write_timeout = "30s" + # read_timeout = "30s" + + ## Optionally turn on using text data frames (binary by default). + # use_text_frames = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + # data_format = "influx" + + ## Additional HTTP Upgrade headers + # [outputs.websocket.headers] + # Authorization = "Bearer " +``` diff --git a/plugins/outputs/websocket/websocket.go b/plugins/outputs/websocket/websocket.go new file mode 100644 index 000000000..17aea0542 --- /dev/null +++ b/plugins/outputs/websocket/websocket.go @@ -0,0 +1,225 @@ +package websocket + +import ( + "errors" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/plugins/common/proxy" + "github.com/influxdata/telegraf/plugins/common/tls" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" + + ws "github.com/gorilla/websocket" +) + +var sampleConfig = ` + ## URL is the address to send metrics to. Make sure ws or wss scheme is used. + url = "ws://127.0.0.1:8080/telegraf" + + ## Timeouts (make sure read_timeout is larger than server ping interval or set to zero). + # connect_timeout = "30s" + # write_timeout = "30s" + # read_timeout = "30s" + + ## Optionally turn on using text data frames (binary by default). + # use_text_frames = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false + + ## Data format to output. + ## Each data format has it's own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + # data_format = "influx" + + ## Additional HTTP Upgrade headers + # [outputs.websocket.headers] + # Authorization = "Bearer " +` + +const ( + defaultConnectTimeout = 30 * time.Second + defaultWriteTimeout = 30 * time.Second + defaultReadTimeout = 30 * time.Second +) + +// WebSocket can output to WebSocket endpoint. +type WebSocket struct { + URL string `toml:"url"` + ConnectTimeout config.Duration `toml:"connect_timeout"` + WriteTimeout config.Duration `toml:"write_timeout"` + ReadTimeout config.Duration `toml:"read_timeout"` + Headers map[string]string `toml:"headers"` + UseTextFrames bool `toml:"use_text_frames"` + Log telegraf.Logger `toml:"-"` + proxy.HTTPProxy + tls.ClientConfig + + conn *ws.Conn + serializer serializers.Serializer +} + +// SetSerializer implements serializers.SerializerOutput. +func (w *WebSocket) SetSerializer(serializer serializers.Serializer) { + w.serializer = serializer +} + +// Description of plugin. +func (w *WebSocket) Description() string { + return "Generic WebSocket output writer." +} + +// SampleConfig returns plugin config sample. +func (w *WebSocket) SampleConfig() string { + return sampleConfig +} + +var errInvalidURL = errors.New("invalid websocket URL") + +// Init the output plugin. +func (w *WebSocket) Init() error { + if parsedURL, err := url.Parse(w.URL); err != nil || (parsedURL.Scheme != "ws" && parsedURL.Scheme != "wss") { + return fmt.Errorf("%w: \"%s\"", errInvalidURL, w.URL) + } + return nil +} + +// Connect to the output endpoint. +func (w *WebSocket) Connect() error { + tlsCfg, err := w.ClientConfig.TLSConfig() + if err != nil { + return fmt.Errorf("error creating TLS config: %v", err) + } + + dialProxy, err := w.HTTPProxy.Proxy() + if err != nil { + return fmt.Errorf("error creating proxy: %v", err) + } + + dialer := &ws.Dialer{ + Proxy: dialProxy, + HandshakeTimeout: time.Duration(w.ConnectTimeout), + TLSClientConfig: tlsCfg, + } + + headers := http.Header{} + for k, v := range w.Headers { + headers.Set(k, v) + } + + conn, resp, err := dialer.Dial(w.URL, headers) + if err != nil { + return fmt.Errorf("error dial: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusSwitchingProtocols { + return fmt.Errorf("wrong status code while connecting to server: %d", resp.StatusCode) + } + + w.conn = conn + go w.read(conn) + + return nil +} + +func (w *WebSocket) read(conn *ws.Conn) { + defer func() { _ = conn.Close() }() + if w.ReadTimeout > 0 { + if err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout))); err != nil { + w.Log.Errorf("error setting read deadline: %v", err) + return + } + conn.SetPingHandler(func(string) error { + err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout))) + if err != nil { + w.Log.Errorf("error setting read deadline: %v", err) + return err + } + return conn.WriteControl(ws.PongMessage, nil, time.Now().Add(time.Duration(w.WriteTimeout))) + }) + } + for { + // Need to read a connection (to properly process pings from a server). + _, _, err := conn.ReadMessage() + if err != nil { + // Websocket connection is not readable after first error, it's going to error state. + // In the beginning of this goroutine we have defer section that closes such connection. + // After that connection will be tried to reestablish on next Write. + if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) { + w.Log.Errorf("error reading websocket connection: %v", err) + } + return + } + if w.ReadTimeout > 0 { + if err := conn.SetReadDeadline(time.Now().Add(time.Duration(w.ReadTimeout))); err != nil { + return + } + } + } +} + +// Write writes the given metrics to the destination. Not thread-safe. +func (w *WebSocket) Write(metrics []telegraf.Metric) error { + if w.conn == nil { + // Previous write failed with error and ws conn was closed. + if err := w.Connect(); err != nil { + return err + } + } + + messageData, err := w.serializer.SerializeBatch(metrics) + if err != nil { + return err + } + + if w.WriteTimeout > 0 { + if err := w.conn.SetWriteDeadline(time.Now().Add(time.Duration(w.WriteTimeout))); err != nil { + return fmt.Errorf("error setting write deadline: %v", err) + } + } + messageType := ws.BinaryMessage + if w.UseTextFrames { + messageType = ws.TextMessage + } + err = w.conn.WriteMessage(messageType, messageData) + if err != nil { + _ = w.conn.Close() + w.conn = nil + return fmt.Errorf("error writing to connection: %v", err) + } + return nil +} + +// Close closes the connection. Noop if already closed. +func (w *WebSocket) Close() error { + if w.conn == nil { + return nil + } + err := w.conn.Close() + w.conn = nil + return err +} + +func newWebSocket() *WebSocket { + return &WebSocket{ + ConnectTimeout: config.Duration(defaultConnectTimeout), + WriteTimeout: config.Duration(defaultWriteTimeout), + ReadTimeout: config.Duration(defaultReadTimeout), + } +} + +func init() { + outputs.Add("websocket", func() telegraf.Output { + return newWebSocket() + }) +} diff --git a/plugins/outputs/websocket/websocket_test.go b/plugins/outputs/websocket/websocket_test.go new file mode 100644 index 000000000..a6c74a77d --- /dev/null +++ b/plugins/outputs/websocket/websocket_test.go @@ -0,0 +1,221 @@ +package websocket + +import ( + "net/http" + "net/http/httptest" + "strconv" + "strings" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/testutil" + + ws "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +// testSerializer serializes to a number of metrics to simplify tests here. +type testSerializer struct{} + +func newTestSerializer() *testSerializer { + return &testSerializer{} +} + +func (t testSerializer) Serialize(_ telegraf.Metric) ([]byte, error) { + return []byte("1"), nil +} + +func (t testSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) { + return []byte(strconv.Itoa(len(metrics))), nil +} + +type testServer struct { + *httptest.Server + t *testing.T + messages chan []byte + upgradeDelay time.Duration + expectTextFrames bool +} + +func newTestServer(t *testing.T, messages chan []byte, tls bool) *testServer { + s := &testServer{} + s.t = t + if tls { + s.Server = httptest.NewTLSServer(s) + } else { + s.Server = httptest.NewServer(s) + } + s.URL = makeWsProto(s.Server.URL) + s.messages = messages + return s +} + +func makeWsProto(s string) string { + return "ws" + strings.TrimPrefix(s, "http") +} + +const ( + testHeaderName = "X-Telegraf-Test" + testHeaderValue = "1" +) + +var testUpgrader = ws.Upgrader{} + +func (s *testServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Header.Get(testHeaderName) != testHeaderValue { + s.t.Fatalf("expected test header found in request, got: %#v", r.Header) + } + if s.upgradeDelay > 0 { + // Emulate long handshake. + select { + case <-r.Context().Done(): + return + case <-time.After(s.upgradeDelay): + } + } + conn, err := testUpgrader.Upgrade(w, r, http.Header{}) + if err != nil { + return + } + defer func() { _ = conn.Close() }() + + for { + messageType, data, err := conn.ReadMessage() + if err != nil { + break + } + if s.expectTextFrames && messageType != ws.TextMessage { + s.t.Fatalf("unexpected frame type: %d", messageType) + } + select { + case s.messages <- data: + case <-time.After(5 * time.Second): + s.t.Fatal("timeout writing to messages channel, make sure there are readers") + } + } +} + +func initWebSocket(s *testServer) *WebSocket { + w := newWebSocket() + w.Log = testutil.Logger{} + w.URL = s.URL + w.Headers = map[string]string{testHeaderName: testHeaderValue} + w.SetSerializer(newTestSerializer()) + return w +} + +func connect(t *testing.T, w *WebSocket) { + err := w.Connect() + require.NoError(t, err) +} + +func TestWebSocket_NoURL(t *testing.T) { + w := newWebSocket() + err := w.Init() + require.ErrorIs(t, err, errInvalidURL) +} + +func TestWebSocket_Connect_Timeout(t *testing.T) { + s := newTestServer(t, nil, false) + s.upgradeDelay = time.Second + defer s.Close() + w := initWebSocket(s) + w.ConnectTimeout = config.Duration(10 * time.Millisecond) + err := w.Connect() + require.Error(t, err) +} + +func TestWebSocket_Connect_OK(t *testing.T) { + s := newTestServer(t, nil, false) + defer s.Close() + w := initWebSocket(s) + connect(t, w) +} + +func TestWebSocket_ConnectTLS_OK(t *testing.T) { + s := newTestServer(t, nil, true) + defer s.Close() + w := initWebSocket(s) + w.ClientConfig.InsecureSkipVerify = true + connect(t, w) +} + +func TestWebSocket_Write_OK(t *testing.T) { + messages := make(chan []byte, 1) + + s := newTestServer(t, messages, false) + defer s.Close() + + w := initWebSocket(s) + connect(t, w) + + var metrics []telegraf.Metric + metrics = append(metrics, testutil.TestMetric(0.4, "test")) + metrics = append(metrics, testutil.TestMetric(0.5, "test")) + err := w.Write(metrics) + require.NoError(t, err) + + select { + case data := <-messages: + require.Equal(t, []byte("2"), data) + case <-time.After(time.Second): + t.Fatal("timeout receiving data") + } +} + +func TestWebSocket_Write_Error(t *testing.T) { + s := newTestServer(t, nil, false) + defer s.Close() + + w := initWebSocket(s) + connect(t, w) + + require.NoError(t, w.conn.Close()) + + metrics := []telegraf.Metric{testutil.TestMetric(0.4, "test")} + err := w.Write(metrics) + require.Error(t, err) + require.Nil(t, w.conn) +} + +func TestWebSocket_Write_Reconnect(t *testing.T) { + messages := make(chan []byte, 1) + s := newTestServer(t, messages, false) + s.expectTextFrames = true // Also use text frames in this test. + defer s.Close() + + w := initWebSocket(s) + w.UseTextFrames = true + connect(t, w) + + metrics := []telegraf.Metric{testutil.TestMetric(0.4, "test")} + + require.NoError(t, w.conn.Close()) + + err := w.Write(metrics) + require.Error(t, err) + require.Nil(t, w.conn) + + err = w.Write(metrics) + require.NoError(t, err) + + select { + case data := <-messages: + require.Equal(t, []byte("1"), data) + case <-time.After(time.Second): + t.Fatal("timeout receiving data") + } +} + +func TestWebSocket_Close(t *testing.T) { + s := newTestServer(t, nil, false) + defer s.Close() + + w := initWebSocket(s) + connect(t, w) + require.NoError(t, w.Close()) + // Check no error on second close. + require.NoError(t, w.Close()) +}