feat: add graylog plugin TCP support (#9644)

This commit is contained in:
alespour 2021-09-21 23:02:36 +02:00 committed by GitHub
parent 027647e3ed
commit a9898f179b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 297 additions and 58 deletions

View File

@ -762,8 +762,8 @@
# # Send telegraf metrics to graylog # # Send telegraf metrics to graylog
# [[outputs.graylog]] # [[outputs.graylog]]
# ## UDP endpoint for your graylog instance. # ## Endpoints for your graylog instances.
# servers = ["127.0.0.1:12201"] # servers = ["udp://127.0.0.1:12201"]
# #
# ## The field to use as the GELF short_message, if unset the static string # ## The field to use as the GELF short_message, if unset the static string
# ## "telegraf" will be used. # ## "telegraf" will be used.

View File

@ -8,11 +8,17 @@ This plugin writes to a Graylog instance using the "[GELF][]" format.
```toml ```toml
[[outputs.graylog]] [[outputs.graylog]]
## UDP endpoint for your graylog instances. ## Endpoints for your graylog instances.
servers = ["127.0.0.1:12201"] servers = ["udp://127.0.0.1:12201"]
## Connection timeout.
# timeout = "5s"
## The field to use as the GELF short_message, if unset the static string ## The field to use as the GELF short_message, if unset the static string
## "telegraf" will be used. ## "telegraf" will be used.
## example: short_message_field = "message" ## example: short_message_field = "message"
# short_message_field = "" # short_message_field = ""
``` ```
A server endpoint may be specified without a UDP or TCP scheme (e.g. "127.0.0.1:12201").
In that case, the UDP protocol is assumed.

View File

@ -11,8 +11,11 @@ import (
"math" "math"
"net" "net"
"os" "os"
"strings"
"time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs" "github.com/influxdata/telegraf/plugins/outputs"
) )
@ -21,45 +24,78 @@ const (
defaultConnection = "wan" defaultConnection = "wan"
defaultMaxChunkSizeWan = 1420 defaultMaxChunkSizeWan = 1420
defaultMaxChunkSizeLan = 8154 defaultMaxChunkSizeLan = 8154
defaultScheme = "udp"
defaultTimeout = 5 * time.Second
) )
type GelfConfig struct { type gelfConfig struct {
GraylogEndpoint string GraylogEndpoint string
Connection string Connection string
MaxChunkSizeWan int MaxChunkSizeWan int
MaxChunkSizeLan int MaxChunkSizeLan int
} }
type Gelf struct { type gelf interface {
GelfConfig io.WriteCloser
} }
func NewGelfWriter(config GelfConfig) *Gelf { type gelfCommon struct {
if config.GraylogEndpoint == "" { gelfConfig
config.GraylogEndpoint = defaultGraylogEndpoint dialer *net.Dialer
conn net.Conn
}
// gelfUDP is the gelf writer variant whose send method dials the configured
// endpoint with the "udp" network. It embeds gelfCommon for the shared
// configuration, dialer and connection.
type gelfUDP struct {
gelfCommon
}
// gelfTCP is the gelf writer variant whose send method dials the configured
// endpoint with the "tcp" network. It embeds gelfCommon for the shared
// configuration, dialer and connection.
type gelfTCP struct {
gelfCommon
}
func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf {
if cfg.GraylogEndpoint == "" {
cfg.GraylogEndpoint = defaultGraylogEndpoint
} }
if config.Connection == "" { if cfg.Connection == "" {
config.Connection = defaultConnection cfg.Connection = defaultConnection
} }
if config.MaxChunkSizeWan == 0 { if cfg.MaxChunkSizeWan == 0 {
config.MaxChunkSizeWan = defaultMaxChunkSizeWan cfg.MaxChunkSizeWan = defaultMaxChunkSizeWan
} }
if config.MaxChunkSizeLan == 0 { if cfg.MaxChunkSizeLan == 0 {
config.MaxChunkSizeLan = defaultMaxChunkSizeLan cfg.MaxChunkSizeLan = defaultMaxChunkSizeLan
} }
g := &Gelf{GelfConfig: config} scheme := defaultScheme
parts := strings.SplitN(cfg.GraylogEndpoint, "://", 2)
if len(parts) == 2 {
scheme = strings.ToLower(parts[0])
cfg.GraylogEndpoint = parts[1]
}
common := gelfCommon{
gelfConfig: cfg,
dialer: dialer,
}
var g gelf
switch scheme {
case "tcp":
g = &gelfTCP{gelfCommon: common}
default:
g = &gelfUDP{gelfCommon: common}
}
return g return g
} }
func (g *Gelf) Write(message []byte) (n int, err error) { func (g *gelfUDP) Write(message []byte) (n int, err error) {
compressed := g.compress(message) compressed := g.compress(message)
chunksize := g.GelfConfig.MaxChunkSizeWan chunksize := g.gelfConfig.MaxChunkSizeWan
length := compressed.Len() length := compressed.Len()
if length > chunksize { if length > chunksize {
@ -84,10 +120,19 @@ func (g *Gelf) Write(message []byte) (n int, err error) {
n = len(message) n = len(message)
return return n, nil
} }
func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer { func (g *gelfUDP) Close() (err error) {
if g.conn != nil {
err = g.conn.Close()
g.conn = nil
}
return err
}
func (g *gelfUDP) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) bytes.Buffer {
var packet bytes.Buffer var packet bytes.Buffer
chunksize := g.getChunksize() chunksize := g.getChunksize()
@ -104,26 +149,26 @@ func (g *Gelf) createChunkedMessage(index int, chunkCountInt int, id []byte, com
return packet return packet
} }
func (g *Gelf) getChunksize() int { func (g *gelfUDP) getChunksize() int {
if g.GelfConfig.Connection == "wan" { if g.gelfConfig.Connection == "wan" {
return g.GelfConfig.MaxChunkSizeWan return g.gelfConfig.MaxChunkSizeWan
} }
if g.GelfConfig.Connection == "lan" { if g.gelfConfig.Connection == "lan" {
return g.GelfConfig.MaxChunkSizeLan return g.gelfConfig.MaxChunkSizeLan
} }
return g.GelfConfig.MaxChunkSizeWan return g.gelfConfig.MaxChunkSizeWan
} }
func (g *Gelf) intToBytes(i int) []byte { func (g *gelfUDP) intToBytes(i int) []byte {
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, int8(i)) binary.Write(buf, binary.LittleEndian, int8(i))
return buf.Bytes() return buf.Bytes()
} }
func (g *Gelf) compress(b []byte) bytes.Buffer { func (g *gelfUDP) compress(b []byte) bytes.Buffer {
var buf bytes.Buffer var buf bytes.Buffer
comp := zlib.NewWriter(&buf) comp := zlib.NewWriter(&buf)
@ -133,30 +178,83 @@ func (g *Gelf) compress(b []byte) bytes.Buffer {
return buf return buf
} }
func (g *Gelf) send(b []byte) error { func (g *gelfUDP) send(b []byte) error {
udpAddr, err := net.ResolveUDPAddr("udp", g.GelfConfig.GraylogEndpoint) if g.conn == nil {
if err != nil { conn, err := g.dialer.Dial("udp", g.gelfConfig.GraylogEndpoint)
return err if err != nil {
return err
}
g.conn = conn
} }
conn, err := net.DialUDP("udp", nil, udpAddr) _, err := g.conn.Write(b)
if err != nil { if err != nil {
return err _ = g.conn.Close()
g.conn = nil
}
return err
}
// Write hands message to g.send unchanged. On success it reports
// len(message) bytes written; on failure it reports zero bytes together with
// the error returned by send.
func (g *gelfTCP) Write(message []byte) (n int, err error) {
	if err = g.send(message); err != nil {
		return 0, err
	}
	return len(message), nil
}
// Close closes the current connection, if any, and clears it so that the
// writer no longer references the closed connection. It returns the error
// from closing the connection, or nil when there was no connection.
func (g *gelfTCP) Close() (err error) {
	if g.conn == nil {
		return nil
	}
	err = g.conn.Close()
	g.conn = nil
	return err
}
func (g *gelfTCP) send(b []byte) error {
if g.conn == nil {
conn, err := g.dialer.Dial("tcp", g.gelfConfig.GraylogEndpoint)
if err != nil {
return err
}
g.conn = conn
}
_, err := g.conn.Write(b)
if err != nil {
_ = g.conn.Close()
g.conn = nil
} else {
_, err = g.conn.Write([]byte{0}) // message delimiter
if err != nil {
_ = g.conn.Close()
g.conn = nil
}
} }
_, err = conn.Write(b)
return err return err
} }
type Graylog struct { type Graylog struct {
Servers []string `toml:"servers"` Servers []string `toml:"servers"`
ShortMessageField string `toml:"short_message_field"` ShortMessageField string `toml:"short_message_field"`
writer io.Writer Timeout config.Duration `toml:"timeout"`
writer io.Writer
closers []io.WriteCloser
} }
var sampleConfig = ` var sampleConfig = `
## UDP endpoint for your graylog instance. ## Endpoints for your graylog instances.
servers = ["127.0.0.1:12201"] servers = ["udp://127.0.0.1:12201"]
## Connection timeout.
# timeout = "5s"
## The field to use as the GELF short_message, if unset the static string ## The field to use as the GELF short_message, if unset the static string
## "telegraf" will be used. ## "telegraf" will be used.
@ -166,14 +264,16 @@ var sampleConfig = `
func (g *Graylog) Connect() error { func (g *Graylog) Connect() error {
writers := []io.Writer{} writers := []io.Writer{}
dialer := net.Dialer{Timeout: time.Duration(g.Timeout)}
if len(g.Servers) == 0 { if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:12201") g.Servers = append(g.Servers, "localhost:12201")
} }
for _, server := range g.Servers { for _, server := range g.Servers {
w := NewGelfWriter(GelfConfig{GraylogEndpoint: server}) w := newGelfWriter(gelfConfig{GraylogEndpoint: server}, &dialer)
writers = append(writers, w) writers = append(writers, w)
g.closers = append(g.closers, w)
} }
g.writer = io.MultiWriter(writers...) g.writer = io.MultiWriter(writers...)
@ -181,6 +281,9 @@ func (g *Graylog) Connect() error {
} }
func (g *Graylog) Close() error { func (g *Graylog) Close() error {
for _, closer := range g.closers {
_ = closer.Close()
}
return nil return nil
} }
@ -253,6 +356,8 @@ func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
func init() { func init() {
outputs.Add("graylog", func() telegraf.Output { outputs.Add("graylog", func() telegraf.Output {
return &Graylog{} return &Graylog{
Timeout: config.Duration(defaultTimeout),
}
}) })
} }

View File

@ -11,9 +11,22 @@ import (
"github.com/influxdata/telegraf/testutil" "github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestWrite(t *testing.T) { func TestWriteDefault(t *testing.T) {
scenarioUDP(t, "127.0.0.1:12201")
}
// TestWriteUDP runs the UDP scenario against an endpoint that carries an
// explicit "udp://" scheme.
func TestWriteUDP(t *testing.T) {
	const server = "udp://127.0.0.1:12201"
	scenarioUDP(t, server)
}
// TestWriteTCP runs the TCP scenario against an endpoint that carries an
// explicit "tcp://" scheme.
func TestWriteTCP(t *testing.T) {
	const server = "tcp://127.0.0.1:12201"
	scenarioTCP(t, server)
}
func scenarioUDP(t *testing.T, server string) {
var wg sync.WaitGroup var wg sync.WaitGroup
var wg2 sync.WaitGroup var wg2 sync.WaitGroup
wg.Add(1) wg.Add(1)
@ -22,13 +35,62 @@ func TestWrite(t *testing.T) {
wg2.Wait() wg2.Wait()
i := Graylog{ i := Graylog{
Servers: []string{"127.0.0.1:12201"}, Servers: []string{server},
} }
i.Connect() err := i.Connect()
require.NoError(t, err)
metrics := testutil.MockMetrics() metrics := testutil.MockMetrics()
i.Write(metrics) // UDP scenario:
// 4 messages are sent
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
wg.Wait()
i.Close()
}
func scenarioTCP(t *testing.T, server string) {
var wg sync.WaitGroup
var wg2 sync.WaitGroup
var wg3 sync.WaitGroup
wg.Add(1)
wg2.Add(1)
wg3.Add(1)
go TCPServer(t, &wg, &wg2, &wg3)
wg2.Wait()
i := Graylog{
Servers: []string{server},
}
err := i.Connect()
require.NoError(t, err)
metrics := testutil.MockMetrics()
// TCP scenario:
// 4 messages are sent
// -> connection gets broken after the 2nd message (server closes connection)
// -> the 3rd write ends with error
// -> in the 4th write connection is restored and write is successful
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
wg3.Wait()
err = i.Write(metrics)
require.Error(t, err)
err = i.Write(metrics)
require.NoError(t, err)
wg.Wait() wg.Wait()
i.Close() i.Close()
@ -37,22 +99,88 @@ func TestWrite(t *testing.T) {
type GelfObject map[string]interface{} type GelfObject map[string]interface{}
func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) { func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) {
serverAddr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12201") serverAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:12201")
udpServer, _ := net.ListenUDP("udp", serverAddr) require.NoError(t, err)
udpServer, err := net.ListenUDP("udp", serverAddr)
require.NoError(t, err)
defer udpServer.Close()
defer wg.Done() defer wg.Done()
bufR := make([]byte, 1024) bufR := make([]byte, 1024)
wg2.Done() wg2.Done()
n, _, _ := udpServer.ReadFromUDP(bufR)
b := bytes.NewReader(bufR[0:n]) recv := func() {
r, _ := zlib.NewReader(b) n, _, err := udpServer.ReadFromUDP(bufR)
require.NoError(t, err)
bufW := bytes.NewBuffer(nil) b := bytes.NewReader(bufR[0:n])
io.Copy(bufW, r) r, _ := zlib.NewReader(b)
r.Close()
var obj GelfObject bufW := bytes.NewBuffer(nil)
json.Unmarshal(bufW.Bytes(), &obj) _, _ = io.Copy(bufW, r)
assert.Equal(t, obj["_value"], float64(1)) _ = r.Close()
var obj GelfObject
_ = json.Unmarshal(bufW.Bytes(), &obj)
require.NoError(t, err)
assert.Equal(t, obj["_value"], float64(1))
}
// in UDP scenario all 4 messages are received
recv()
recv()
recv()
recv()
}
func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.WaitGroup) {
serverAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12201")
require.NoError(t, err)
tcpServer, err := net.ListenTCP("tcp", serverAddr)
require.NoError(t, err)
defer tcpServer.Close()
defer wg.Done()
bufR := make([]byte, 1)
bufW := bytes.NewBuffer(nil)
wg2.Done()
accept := func() *net.TCPConn {
conn, err := tcpServer.AcceptTCP()
require.NoError(t, err)
_ = conn.SetLinger(0)
return conn
}
conn := accept()
defer conn.Close()
recv := func() {
bufW.Reset()
for {
n, err := conn.Read(bufR)
require.NoError(t, err)
if n > 0 {
if bufR[0] == 0 { // message delimiter found
break
}
_, _ = bufW.Write(bufR)
}
}
var obj GelfObject
err = json.Unmarshal(bufW.Bytes(), &obj)
require.NoError(t, err)
assert.Equal(t, obj["_value"], float64(1))
}
// in the TCP scenario only 3 messages are received (1st, 2nd and 4th) because the connection is broken after the 2nd
recv()
recv()
_ = conn.Close()
wg3.Done()
conn = accept()
defer conn.Close()
recv()
} }