fix: Graylog plugin TLS support and message format (#9862)

This commit is contained in:
alespour 2021-10-21 16:39:24 +02:00 committed by GitHub
parent c669874750
commit e685f3be46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 253 additions and 119 deletions

View File

@ -769,6 +769,13 @@
# ## "telegraf" will be used.
# ## example: short_message_field = "message"
# # short_message_field = ""
#
# ## Optional TLS Config
# # tls_ca = "/etc/telegraf/ca.pem"
# # tls_cert = "/etc/telegraf/cert.pem"
# # tls_key = "/etc/telegraf/key.pem"
# ## Use TLS but skip chain & host verification
# # insecure_skip_verify = false
# # Configurable HTTP health check resource based on metrics

View File

@ -18,7 +18,19 @@ This plugin writes to a Graylog instance using the "[GELF][]" format.
## "telegraf" will be used.
## example: short_message_field = "message"
# short_message_field = ""
## According to GELF payload specification, additional fields names must be prefixed
## with an underscore. Previous versions did not prefix custom field 'name' with underscore.
## Set to true for backward compatibility.
# name_field_no_prefix = false
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
```
Server endpoint may be specified without UDP or TCP scheme (eg. "127.0.0.1:12201").
In such case, UDP protocol is assumed.
In such case, UDP protocol is assumed. TLS config is ignored for UDP endpoints.

View File

@ -4,6 +4,7 @@ import (
"bytes"
"compress/zlib"
"crypto/rand"
"crypto/tls"
"encoding/binary"
ejson "encoding/json"
"fmt"
@ -16,11 +17,12 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
)
const (
defaultGraylogEndpoint = "127.0.0.1:12201"
defaultEndpoint = "127.0.0.1:12201"
defaultConnection = "wan"
defaultMaxChunkSizeWan = 1420
defaultMaxChunkSizeLan = 8154
@ -29,7 +31,7 @@ const (
)
type gelfConfig struct {
GraylogEndpoint string
Endpoint string
Connection string
MaxChunkSizeWan int
MaxChunkSizeLan int
@ -37,6 +39,7 @@ type gelfConfig struct {
type gelf interface {
io.WriteCloser
Connect() error
}
type gelfCommon struct {
@ -51,11 +54,12 @@ type gelfUDP struct {
type gelfTCP struct {
gelfCommon
tlsConfig *tls.Config
}
func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf {
if cfg.GraylogEndpoint == "" {
cfg.GraylogEndpoint = defaultGraylogEndpoint
func newGelfWriter(cfg gelfConfig, dialer *net.Dialer, tlsConfig *tls.Config) gelf {
if cfg.Endpoint == "" {
cfg.Endpoint = defaultEndpoint
}
if cfg.Connection == "" {
@ -71,10 +75,10 @@ func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf {
}
scheme := defaultScheme
parts := strings.SplitN(cfg.GraylogEndpoint, "://", 2)
parts := strings.SplitN(cfg.Endpoint, "://", 2)
if len(parts) == 2 {
scheme = strings.ToLower(parts[0])
cfg.GraylogEndpoint = parts[1]
cfg.Endpoint = parts[1]
}
common := gelfCommon{
gelfConfig: cfg,
@ -84,7 +88,7 @@ func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf {
var g gelf
switch scheme {
case "tcp":
g = &gelfTCP{gelfCommon: common}
g = &gelfTCP{gelfCommon: common, tlsConfig: tlsConfig}
default:
g = &gelfUDP{gelfCommon: common}
}
@ -178,13 +182,21 @@ func (g *gelfUDP) compress(b []byte) bytes.Buffer {
return buf
}
func (g *gelfUDP) Connect() error {
conn, err := g.dialer.Dial("udp", g.gelfConfig.Endpoint)
if err != nil {
return err
}
g.conn = conn
return nil
}
func (g *gelfUDP) send(b []byte) error {
if g.conn == nil {
conn, err := g.dialer.Dial("udp", g.gelfConfig.GraylogEndpoint)
err := g.Connect()
if err != nil {
return err
}
g.conn = conn
}
_, err := g.conn.Write(b)
@ -216,13 +228,27 @@ func (g *gelfTCP) Close() (err error) {
return err
}
func (g *gelfTCP) Connect() error {
var err error
var conn net.Conn
if g.tlsConfig == nil {
conn, err = g.dialer.Dial("tcp", g.gelfConfig.Endpoint)
} else {
conn, err = tls.DialWithDialer(g.dialer, "tcp", g.gelfConfig.Endpoint, g.tlsConfig)
}
if err != nil {
return err
}
g.conn = conn
return nil
}
func (g *gelfTCP) send(b []byte) error {
if g.conn == nil {
conn, err := g.dialer.Dial("tcp", g.gelfConfig.GraylogEndpoint)
err := g.Connect()
if err != nil {
return err
}
g.conn = conn
}
_, err := g.conn.Write(b)
@ -243,7 +269,9 @@ func (g *gelfTCP) send(b []byte) error {
type Graylog struct {
Servers []string `toml:"servers"`
ShortMessageField string `toml:"short_message_field"`
NameFieldNoPrefix bool `toml:"name_field_noprefix"`
Timeout config.Duration `toml:"timeout"`
tlsint.ClientConfig
writer io.Writer
closers []io.WriteCloser
@ -260,18 +288,39 @@ var sampleConfig = `
## "telegraf" will be used.
## example: short_message_field = "message"
# short_message_field = ""
## According to GELF payload specification, additional fields names must be prefixed
## with an underscore. Previous versions did not prefix custom field 'name' with underscore.
## Set to true for backward compatibility.
# name_field_no_prefix = false
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
func (g *Graylog) Connect() error {
writers := []io.Writer{}
dialer := net.Dialer{Timeout: time.Duration(g.Timeout)}
var writers []io.Writer
dialer := &net.Dialer{Timeout: time.Duration(g.Timeout)}
if len(g.Servers) == 0 {
g.Servers = append(g.Servers, "localhost:12201")
}
tlsCfg, err := g.ClientConfig.TLSConfig()
if err != nil {
return err
}
for _, server := range g.Servers {
w := newGelfWriter(gelfConfig{GraylogEndpoint: server}, &dialer)
w := newGelfWriter(gelfConfig{Endpoint: server}, dialer, tlsCfg)
err := w.Connect()
if err != nil {
return fmt.Errorf("failed to connect to server [%s]: %v", server, err)
}
writers = append(writers, w)
g.closers = append(g.closers, w)
}
@ -319,7 +368,11 @@ func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
m["version"] = "1.1"
m["timestamp"] = float64(metric.Time().UnixNano()) / 1_000_000_000
m["short_message"] = "telegraf"
m["name"] = metric.Name()
if g.NameFieldNoPrefix {
m["name"] = metric.Name()
} else {
m["_name"] = metric.Name()
}
if host, ok := metric.GetTag("host"); ok {
m["host"] = host

View File

@ -3,125 +3,174 @@ package graylog
import (
"bytes"
"compress/zlib"
"crypto/tls"
"encoding/json"
"io"
"net"
"sync"
"testing"
"time"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWriteDefault(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
scenarioUDP(t, "127.0.0.1:12201")
}
func TestWriteUDP(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
tests := []struct {
name string
instance Graylog
}{
{
name: "default without scheme",
instance: Graylog{
Servers: []string{"127.0.0.1:12201"},
},
},
{
name: "UDP",
instance: Graylog{
Servers: []string{"udp://127.0.0.1:12201"},
},
},
{
name: "UDP non-standard name field",
instance: Graylog{
Servers: []string{"udp://127.0.0.1:12201"},
NameFieldNoPrefix: true,
},
},
}
scenarioUDP(t, "udp://127.0.0.1:12201")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var wg sync.WaitGroup
var wg2 sync.WaitGroup
wg.Add(1)
wg2.Add(1)
go UDPServer(t, &wg, &wg2, &tt.instance)
wg2.Wait()
i := tt.instance
err := i.Connect()
require.NoError(t, err)
defer i.Close()
defer wg.Wait()
metrics := testutil.MockMetrics()
// UDP scenario:
// 4 messages are send
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
})
}
}
func TestWriteTCP(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
pki := testutil.NewPKI("../../../testutil/pki")
tlsClientConfig := pki.TLSClientConfig()
tlsServerConfig, err := pki.TLSServerConfig().TLSConfig()
require.NoError(t, err)
tests := []struct {
name string
instance Graylog
tlsServerConfig *tls.Config
}{
{
name: "TCP",
instance: Graylog{
Servers: []string{"tcp://127.0.0.1:12201"},
},
},
{
name: "TLS",
instance: Graylog{
Servers: []string{"tcp://127.0.0.1:12201"},
ClientConfig: tlsint.ClientConfig{
ServerName: "localhost",
TLSCA: tlsClientConfig.TLSCA,
TLSKey: tlsClientConfig.TLSKey,
TLSCert: tlsClientConfig.TLSCert,
},
},
tlsServerConfig: tlsServerConfig,
},
{
name: "TLS no validation",
instance: Graylog{
Servers: []string{"tcp://127.0.0.1:12201"},
ClientConfig: tlsint.ClientConfig{
InsecureSkipVerify: true,
ServerName: "localhost",
TLSKey: tlsClientConfig.TLSKey,
TLSCert: tlsClientConfig.TLSCert,
},
},
tlsServerConfig: tlsServerConfig,
},
}
scenarioTCP(t, "tcp://127.0.0.1:12201")
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var wg sync.WaitGroup
var wg2 sync.WaitGroup
var wg3 sync.WaitGroup
wg.Add(1)
wg2.Add(1)
wg3.Add(1)
go TCPServer(t, &wg, &wg2, &wg3, tt.tlsServerConfig)
wg2.Wait()
func scenarioUDP(t *testing.T, server string) {
var wg sync.WaitGroup
var wg2 sync.WaitGroup
wg.Add(1)
wg2.Add(1)
go UDPServer(t, &wg, &wg2)
wg2.Wait()
i := tt.instance
err = i.Connect()
require.NoError(t, err)
defer i.Close()
defer wg.Wait()
i := Graylog{
Servers: []string{server},
metrics := testutil.MockMetrics()
// TCP scenario:
// 4 messages are send
// -> connection gets forcefully broken after the 2nd message (server closes connection)
// -> the 3rd write fails with error
// -> during the 4th write connection is restored and write is successful
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
wg3.Wait()
err = i.Write(metrics)
require.Error(t, err)
err = i.Write(metrics)
require.NoError(t, err)
})
}
err := i.Connect()
require.NoError(t, err)
metrics := testutil.MockMetrics()
// UDP scenario:
// 4 messages are send
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
wg.Wait()
i.Close()
}
func scenarioTCP(t *testing.T, server string) {
var wg sync.WaitGroup
var wg2 sync.WaitGroup
var wg3 sync.WaitGroup
wg.Add(1)
wg2.Add(1)
wg3.Add(1)
go TCPServer(t, &wg, &wg2, &wg3)
wg2.Wait()
i := Graylog{
Servers: []string{server},
}
err := i.Connect()
require.NoError(t, err)
metrics := testutil.MockMetrics()
// TCP scenario:
// 4 messages are send
// -> connection gets broken after the 2nd message (server closes connection)
// -> the 3rd write ends with error
// -> in the 4th write connection is restored and write is successful
err = i.Write(metrics)
require.NoError(t, err)
err = i.Write(metrics)
require.NoError(t, err)
wg3.Wait()
err = i.Write(metrics)
require.Error(t, err)
err = i.Write(metrics)
require.NoError(t, err)
wg.Wait()
i.Close()
}
type GelfObject map[string]interface{}
func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) {
func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, config *Graylog) {
serverAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:12201")
require.NoError(t, err)
udpServer, err := net.ListenUDP("udp", serverAddr)
require.NoError(t, err)
defer udpServer.Close()
defer wg.Done()
bufR := make([]byte, 1024)
wg2.Done()
recv := func() {
bufR := make([]byte, 1024)
n, _, err := udpServer.ReadFromUDP(bufR)
require.NoError(t, err)
@ -135,6 +184,13 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) {
var obj GelfObject
_ = json.Unmarshal(bufW.Bytes(), &obj)
require.NoError(t, err)
assert.Equal(t, obj["short_message"], "telegraf")
if config.NameFieldNoPrefix {
assert.Equal(t, obj["name"], "test1")
} else {
assert.Equal(t, obj["_name"], "test1")
}
assert.Equal(t, obj["_tag1"], "value1")
assert.Equal(t, obj["_value"], float64(1))
}
@ -146,29 +202,29 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) {
recv()
}
func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.WaitGroup) {
serverAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12201")
require.NoError(t, err)
tcpServer, err := net.ListenTCP("tcp", serverAddr)
func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.WaitGroup, tlsConfig *tls.Config) {
tcpServer, err := net.Listen("tcp", "127.0.0.1:12201")
require.NoError(t, err)
defer tcpServer.Close()
defer wg.Done()
bufR := make([]byte, 1)
bufW := bytes.NewBuffer(nil)
wg2.Done()
accept := func() *net.TCPConn {
conn, err := tcpServer.AcceptTCP()
accept := func() net.Conn {
conn, err := tcpServer.Accept()
require.NoError(t, err)
_ = conn.SetLinger(0)
if tcpConn, ok := conn.(*net.TCPConn); ok {
_ = tcpConn.SetLinger(0)
}
_ = conn.SetDeadline(time.Now().Add(15 * time.Second))
if tlsConfig != nil {
conn = tls.Server(conn, tlsConfig)
}
return conn
}
conn := accept()
defer conn.Close()
recv := func() {
bufW.Reset()
recv := func(conn net.Conn) {
bufR := make([]byte, 1)
bufW := bytes.NewBuffer(nil)
for {
n, err := conn.Read(bufR)
require.NoError(t, err)
@ -183,16 +239,22 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.
var obj GelfObject
err = json.Unmarshal(bufW.Bytes(), &obj)
require.NoError(t, err)
assert.Equal(t, obj["short_message"], "telegraf")
assert.Equal(t, obj["_name"], "test1")
assert.Equal(t, obj["_tag1"], "value1")
assert.Equal(t, obj["_value"], float64(1))
}
// in TCP scenario only 3 messages are received (1st, 2dn and 4th) due to connection break after the 2nd
conn := accept()
defer conn.Close()
recv()
recv()
// in TCP scenario only 3 messages are received, the 3rd is lost due to simulated connection break after the 2nd
recv(conn)
recv(conn)
_ = conn.Close()
wg3.Done()
conn = accept()
defer conn.Close()
recv()
recv(conn)
}