From a9898f179bbcbbfbc126b5f604c3357b990c80c3 Mon Sep 17 00:00:00 2001 From: alespour <42931850+alespour@users.noreply.github.com> Date: Tue, 21 Sep 2021 23:02:36 +0200 Subject: [PATCH] feat: add graylog plugin TCP support (#9644) --- etc/telegraf.conf | 4 +- plugins/outputs/graylog/README.md | 10 +- plugins/outputs/graylog/graylog.go | 183 +++++++++++++++++++----- plugins/outputs/graylog/graylog_test.go | 158 ++++++++++++++++++-- 4 files changed, 297 insertions(+), 58 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index beb228214..0ed5ba8eb 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -762,8 +762,8 @@ # # Send telegraf metrics to graylog # [[outputs.graylog]] -# ## UDP endpoint for your graylog instance. -# servers = ["127.0.0.1:12201"] +# ## Endpoints for your graylog instances. +# servers = ["udp://127.0.0.1:12201"] # # ## The field to use as the GELF short_message, if unset the static string # ## "telegraf" will be used. diff --git a/plugins/outputs/graylog/README.md b/plugins/outputs/graylog/README.md index 4945ce46f..600312289 100644 --- a/plugins/outputs/graylog/README.md +++ b/plugins/outputs/graylog/README.md @@ -8,11 +8,17 @@ This plugin writes to a Graylog instance using the "[GELF][]" format. ```toml [[outputs.graylog]] - ## UDP endpoint for your graylog instances. - servers = ["127.0.0.1:12201"] + ## Endpoints for your graylog instances. + servers = ["udp://127.0.0.1:12201"] + + ## Connection timeout. + # timeout = "5s" ## The field to use as the GELF short_message, if unset the static string ## "telegraf" will be used. ## example: short_message_field = "message" # short_message_field = "" ``` + +Server endpoint may be specified without UDP or TCP scheme (eg. "127.0.0.1:12201"). +In such case, UDP protocol is assumed. diff --git a/plugins/outputs/graylog/graylog.go b/plugins/outputs/graylog/graylog.go index cf5dc6dc5..951273e2e 100644 --- a/plugins/outputs/graylog/graylog.go +++ b/plugins/outputs/graylog/graylog.go @@ -11,8 +11,11 @@ import ( "math" "net" "os" + "strings" + "time" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/outputs" ) @@ -21,45 +24,78 @@ const ( defaultConnection = "wan" defaultMaxChunkSizeWan = 1420 defaultMaxChunkSizeLan = 8154 + defaultScheme = "udp" + defaultTimeout = 5 * time.Second ) -type GelfConfig struct { +type gelfConfig struct { GraylogEndpoint string Connection string MaxChunkSizeWan int MaxChunkSizeLan int } -type Gelf struct { - GelfConfig +type gelf interface { + io.WriteCloser } -func NewGelfWriter(config GelfConfig) *Gelf { - if config.GraylogEndpoint == "" { - config.GraylogEndpoint = defaultGraylogEndpoint +type gelfCommon struct { + gelfConfig + dialer *net.Dialer + conn net.Conn +} + +type gelfUDP struct { + gelfCommon +} + +type gelfTCP struct { + gelfCommon +} + +func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf { + if cfg.GraylogEndpoint == "" { + cfg.GraylogEndpoint = defaultGraylogEndpoint } - if config.Connection == "" { - config.Connection = defaultConnection + if cfg.Connection == "" { + cfg.Connection = defaultConnection } - if config.MaxChunkSizeWan == 0 { - config.MaxChunkSizeWan = defaultMaxChunkSizeWan + if cfg.MaxChunkSizeWan == 0 { + cfg.MaxChunkSizeWan = defaultMaxChunkSizeWan } - if config.MaxChunkSizeLan == 0 { - config.MaxChunkSizeLan = defaultMaxChunkSizeLan + if cfg.MaxChunkSizeLan == 0 { + cfg.MaxChunkSizeLan = defaultMaxChunkSizeLan } - g := &Gelf{GelfConfig: config} + scheme := defaultScheme + parts := strings.SplitN(cfg.GraylogEndpoint, "://", 2) + if len(parts) == 2 { + scheme = strings.ToLower(parts[0]) + cfg.GraylogEndpoint = parts[1] + } + common := gelfCommon{ + gelfConfig: cfg, + dialer: dialer, + } + + var g gelf + switch scheme { + case "tcp": + g = &gelfTCP{gelfCommon: common} + default: + g = &gelfUDP{gelfCommon: common} + } return g } -func (g *Gelf) Write(message []byte) (n int, err error) { +func (g *gelfUDP) Write(message []byte) (n int, err error) { compressed := g.compress(message) - chunksize := g.GelfConfig.MaxChunkSizeWan + chunksize := g.gelfConfig.MaxChunkSizeWan length := compressed.Len() if length > chunksize { @@ -84,10 +120,19 @@ func (g *Gelf) Write(message []byte) (n int, err error) { n = len(message) - return + return n, nil } -func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer { +func (g *gelfUDP) Close() (err error) { + if g.conn != nil { + err = g.conn.Close() + g.conn = nil + } + + return err +} + +func (g *gelfUDP) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer { var packet bytes.Buffer chunksize := g.getChunksize() @@ -104,26 +149,26 @@ func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, com return packet } -func (g *Gelf) getChunksize() int { - if g.GelfConfig.Connection == "wan" { - return g.GelfConfig.MaxChunkSizeWan +func (g *gelfUDP) getChunksize() int { + if g.gelfConfig.Connection == "wan" { + return g.gelfConfig.MaxChunkSizeWan } - if g.GelfConfig.Connection == "lan" { - return g.GelfConfig.MaxChunkSizeLan + if g.gelfConfig.Connection == "lan" { + return g.gelfConfig.MaxChunkSizeLan } - return g.GelfConfig.MaxChunkSizeWan + return g.gelfConfig.MaxChunkSizeWan } -func (g *Gelf) intToBytes(i int) []byte { +func (g *gelfUDP) intToBytes(i int) []byte { buf := new(bytes.Buffer) binary.Write(buf, binary.LittleEndian, int8(i)) return buf.Bytes() } -func (g *Gelf) compress(b []byte) bytes.Buffer { +func (g *gelfUDP) compress(b []byte) bytes.Buffer { var buf bytes.Buffer comp := zlib.NewWriter(&buf) @@ -133,30 +178,83 @@ func (g *Gelf) compress(b []byte) bytes.Buffer { return buf } -func (g *Gelf) send(b []byte) error { - udpAddr, err := net.ResolveUDPAddr("udp", g.GelfConfig.GraylogEndpoint) - if err != nil { - return err +func (g *gelfUDP) send(b []byte) error { + if g.conn == nil { + conn, err := g.dialer.Dial("udp", g.gelfConfig.GraylogEndpoint) + if err != nil { + return err + } + g.conn = conn } - conn, err := net.DialUDP("udp", nil, udpAddr) + _, err := g.conn.Write(b) if err != nil { - return err + _ = g.conn.Close() + g.conn = nil + } + + return err +} + +func (g *gelfTCP) Write(message []byte) (n int, err error) { + err = g.send(message) + if err != nil { + return 0, err + } + + n = len(message) + + return n, nil +} + +func (g *gelfTCP) Close() (err error) { + if g.conn != nil { + err = g.conn.Close() + g.conn = nil + } + + return err +} + +func (g *gelfTCP) send(b []byte) error { + if g.conn == nil { + conn, err := g.dialer.Dial("tcp", g.gelfConfig.GraylogEndpoint) + if err != nil { + return err + } + g.conn = conn + } + + _, err := g.conn.Write(b) + if err != nil { + _ = g.conn.Close() + g.conn = nil + } else { + _, err = g.conn.Write([]byte{0}) // message delimiter + if err != nil { + _ = g.conn.Close() + g.conn = nil + } } - _, err = conn.Write(b) return err } type Graylog struct { - Servers []string `toml:"servers"` - ShortMessageField string `toml:"short_message_field"` - writer io.Writer + Servers []string `toml:"servers"` + ShortMessageField string `toml:"short_message_field"` + Timeout config.Duration `toml:"timeout"` + + writer io.Writer + closers []io.WriteCloser } var sampleConfig = ` - ## UDP endpoint for your graylog instance. - servers = ["127.0.0.1:12201"] + ## Endpoints for your graylog instances. + servers = ["udp://127.0.0.1:12201"] + + ## Connection timeout. + # timeout = "5s" ## The field to use as the GELF short_message, if unset the static string ## "telegraf" will be used. @@ -166,14 +264,16 @@ var sampleConfig = ` func (g *Graylog) Connect() error { writers := []io.Writer{} + dialer := net.Dialer{Timeout: time.Duration(g.Timeout)} if len(g.Servers) == 0 { g.Servers = append(g.Servers, "localhost:12201") } for _, server := range g.Servers { - w := NewGelfWriter(GelfConfig{GraylogEndpoint: server}) + w := newGelfWriter(gelfConfig{GraylogEndpoint: server}, &dialer) writers = append(writers, w) + g.closers = append(g.closers, w) } g.writer = io.MultiWriter(writers...) @@ -181,6 +281,9 @@ func (g *Graylog) Connect() error { } func (g *Graylog) Close() error { + for _, closer := range g.closers { + _ = closer.Close() + } return nil } @@ -253,6 +356,8 @@ func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) { func init() { outputs.Add("graylog", func() telegraf.Output { - return &Graylog{} + return &Graylog{ + Timeout: config.Duration(defaultTimeout), + } }) } diff --git a/plugins/outputs/graylog/graylog_test.go b/plugins/outputs/graylog/graylog_test.go index 37816a7a2..faa5b34b9 100644 --- a/plugins/outputs/graylog/graylog_test.go +++ b/plugins/outputs/graylog/graylog_test.go @@ -11,9 +11,22 @@ import ( "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func TestWrite(t *testing.T) { +func TestWriteDefault(t *testing.T) { + scenarioUDP(t, "127.0.0.1:12201") +} + +func TestWriteUDP(t *testing.T) { + scenarioUDP(t, "udp://127.0.0.1:12201") +} + +func TestWriteTCP(t *testing.T) { + scenarioTCP(t, "tcp://127.0.0.1:12201") +} + +func scenarioUDP(t *testing.T, server string) { var wg sync.WaitGroup var wg2 sync.WaitGroup wg.Add(1) @@ -22,13 +35,62 @@ func TestWrite(t *testing.T) { wg2.Wait() i := Graylog{ - Servers: []string{"127.0.0.1:12201"}, + Servers: []string{server}, } - i.Connect() + err := i.Connect() + require.NoError(t, err) metrics := testutil.MockMetrics() - i.Write(metrics) + // UDP scenario: + // 4 messages are send + + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + + wg.Wait() + i.Close() +} + +func scenarioTCP(t *testing.T, server string) { + var wg sync.WaitGroup + var wg2 sync.WaitGroup + var wg3 sync.WaitGroup + wg.Add(1) + wg2.Add(1) + wg3.Add(1) + go TCPServer(t, &wg, &wg2, &wg3) + wg2.Wait() + + i := Graylog{ + Servers: []string{server}, + } + err := i.Connect() + require.NoError(t, err) + + metrics := testutil.MockMetrics() + + // TCP scenario: + // 4 messages are send + // -> connection gets broken after the 2nd message (server closes connection) + // -> the 3rd write ends with error + // -> in the 4th write connection is restored and write is successful + + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + wg3.Wait() + err = i.Write(metrics) + require.Error(t, err) + err = i.Write(metrics) + require.NoError(t, err) wg.Wait() i.Close() @@ -37,22 +99,88 @@ func TestWrite(t *testing.T) { type GelfObject map[string]interface{} func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) { - serverAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12201") - udpServer, _ := net.ListenUDP("udp", serverAddr) + serverAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:12201") + require.NoError(t, err) + udpServer, err := net.ListenUDP("udp", serverAddr) + require.NoError(t, err) + defer udpServer.Close() defer wg.Done() bufR := make([]byte, 1024) wg2.Done() - n, _, _ := udpServer.ReadFromUDP(bufR) - b := bytes.NewReader(bufR[0:n]) - r, _ := zlib.NewReader(b) + recv := func() { + n, _, err := udpServer.ReadFromUDP(bufR) + require.NoError(t, err) - bufW := bytes.NewBuffer(nil) - io.Copy(bufW, r) - r.Close() + b := bytes.NewReader(bufR[0:n]) + r, _ := zlib.NewReader(b) - var obj GelfObject - json.Unmarshal(bufW.Bytes(), &obj) - assert.Equal(t, obj["_value"], float64(1)) + bufW := bytes.NewBuffer(nil) + _, _ = io.Copy(bufW, r) + _ = r.Close() + + var obj GelfObject + _ = json.Unmarshal(bufW.Bytes(), &obj) + require.NoError(t, err) + assert.Equal(t, obj["_value"], float64(1)) + } + + // in UDP scenario all 4 messages are received + + recv() + recv() + recv() + recv() +} + +func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.WaitGroup) { + serverAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12201") + require.NoError(t, err) + tcpServer, err := net.ListenTCP("tcp", serverAddr) + require.NoError(t, err) + defer tcpServer.Close() + defer wg.Done() + + bufR := make([]byte, 1) + bufW := bytes.NewBuffer(nil) + wg2.Done() + + accept := func() *net.TCPConn { + conn, err := tcpServer.AcceptTCP() + require.NoError(t, err) + _ = conn.SetLinger(0) + return conn + } + conn := accept() + defer conn.Close() + + recv := func() { + bufW.Reset() + for { + n, err := conn.Read(bufR) + require.NoError(t, err) + if n > 0 { + if bufR[0] == 0 { // message delimiter found + break + } + _, _ = bufW.Write(bufR) + } + } + + var obj GelfObject + err = json.Unmarshal(bufW.Bytes(), &obj) + require.NoError(t, err) + assert.Equal(t, obj["_value"], float64(1)) + } + + // in TCP scenario only 3 messages are received (1st, 2dn and 4th) due to connection break after the 2nd + + recv() + recv() + _ = conn.Close() + wg3.Done() + conn = accept() + defer conn.Close() + recv() }