diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 2f2dce2f6..8b5fe63d1 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -769,6 +769,13 @@ # ## "telegraf" will be used. # ## example: short_message_field = "message" # # short_message_field = "" +# +# ## Optional TLS Config +# # tls_ca = "/etc/telegraf/ca.pem" +# # tls_cert = "/etc/telegraf/cert.pem" +# # tls_key = "/etc/telegraf/key.pem" +# ## Use TLS but skip chain & host verification +# # insecure_skip_verify = false # # Configurable HTTP health check resource based on metrics diff --git a/plugins/outputs/graylog/README.md b/plugins/outputs/graylog/README.md index 600312289..96e290b09 100644 --- a/plugins/outputs/graylog/README.md +++ b/plugins/outputs/graylog/README.md @@ -18,7 +18,19 @@ This plugin writes to a Graylog instance using the "[GELF][]" format. ## "telegraf" will be used. ## example: short_message_field = "message" # short_message_field = "" + + ## According to GELF payload specification, additional fields names must be prefixed + ## with an underscore. Previous versions did not prefix custom field 'name' with underscore. + ## Set to true for backward compatibility. + # name_field_no_prefix = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false ``` Server endpoint may be specified without UDP or TCP scheme (eg. "127.0.0.1:12201"). -In such case, UDP protocol is assumed. +In such a case, the UDP protocol is assumed. TLS config applies only to endpoints using the `tcp://` scheme and is ignored for UDP endpoints.
diff --git a/plugins/outputs/graylog/graylog.go b/plugins/outputs/graylog/graylog.go index 951273e2e..16b744f35 100644 --- a/plugins/outputs/graylog/graylog.go +++ b/plugins/outputs/graylog/graylog.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/zlib" "crypto/rand" + "crypto/tls" "encoding/binary" ejson "encoding/json" "fmt" @@ -16,11 +17,12 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/plugins/outputs" ) const ( - defaultGraylogEndpoint = "127.0.0.1:12201" + defaultEndpoint = "127.0.0.1:12201" defaultConnection = "wan" defaultMaxChunkSizeWan = 1420 defaultMaxChunkSizeLan = 8154 @@ -29,7 +31,7 @@ const ( ) type gelfConfig struct { - GraylogEndpoint string + Endpoint string Connection string MaxChunkSizeWan int MaxChunkSizeLan int @@ -37,6 +39,7 @@ type gelfConfig struct { type gelf interface { io.WriteCloser + Connect() error } type gelfCommon struct { @@ -51,11 +54,12 @@ type gelfUDP struct { type gelfTCP struct { gelfCommon + tlsConfig *tls.Config } -func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf { - if cfg.GraylogEndpoint == "" { - cfg.GraylogEndpoint = defaultGraylogEndpoint +func newGelfWriter(cfg gelfConfig, dialer *net.Dialer, tlsConfig *tls.Config) gelf { + if cfg.Endpoint == "" { + cfg.Endpoint = defaultEndpoint } if cfg.Connection == "" { @@ -71,10 +75,10 @@ func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf { } scheme := defaultScheme - parts := strings.SplitN(cfg.GraylogEndpoint, "://", 2) + parts := strings.SplitN(cfg.Endpoint, "://", 2) if len(parts) == 2 { scheme = strings.ToLower(parts[0]) - cfg.GraylogEndpoint = parts[1] + cfg.Endpoint = parts[1] } common := gelfCommon{ gelfConfig: cfg, @@ -84,7 +88,7 @@ func newGelfWriter(cfg gelfConfig, dialer *net.Dialer) gelf { var g gelf switch scheme { case "tcp": - g = &gelfTCP{gelfCommon: common} + g = &gelfTCP{gelfCommon: common, tlsConfig: tlsConfig} 
default: g = &gelfUDP{gelfCommon: common} } @@ -178,13 +182,21 @@ func (g *gelfUDP) compress(b []byte) bytes.Buffer { return buf } +func (g *gelfUDP) Connect() error { + conn, err := g.dialer.Dial("udp", g.gelfConfig.Endpoint) + if err != nil { + return err + } + g.conn = conn + return nil +} + func (g *gelfUDP) send(b []byte) error { if g.conn == nil { - conn, err := g.dialer.Dial("udp", g.gelfConfig.GraylogEndpoint) + err := g.Connect() if err != nil { return err } - g.conn = conn } _, err := g.conn.Write(b) @@ -216,13 +228,27 @@ func (g *gelfTCP) Close() (err error) { return err } +func (g *gelfTCP) Connect() error { + var err error + var conn net.Conn + if g.tlsConfig == nil { + conn, err = g.dialer.Dial("tcp", g.gelfConfig.Endpoint) + } else { + conn, err = tls.DialWithDialer(g.dialer, "tcp", g.gelfConfig.Endpoint, g.tlsConfig) + } + if err != nil { + return err + } + g.conn = conn + return nil +} + func (g *gelfTCP) send(b []byte) error { if g.conn == nil { - conn, err := g.dialer.Dial("tcp", g.gelfConfig.GraylogEndpoint) + err := g.Connect() if err != nil { return err } - g.conn = conn } _, err := g.conn.Write(b) @@ -243,7 +269,9 @@ func (g *gelfTCP) send(b []byte) error { type Graylog struct { Servers []string `toml:"servers"` ShortMessageField string `toml:"short_message_field"` + NameFieldNoPrefix bool `toml:"name_field_no_prefix"` Timeout config.Duration `toml:"timeout"` + tlsint.ClientConfig writer io.Writer closers []io.WriteCloser @@ -260,18 +288,39 @@ var sampleConfig = ` ## "telegraf" will be used. ## example: short_message_field = "message" # short_message_field = "" + + ## According to GELF payload specification, additional fields names must be prefixed + ## with an underscore. Previous versions did not prefix custom field 'name' with underscore. + ## Set to true for backward compatibility. 
+ # name_field_no_prefix = false + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false ` func (g *Graylog) Connect() error { - writers := []io.Writer{} - dialer := net.Dialer{Timeout: time.Duration(g.Timeout)} + var writers []io.Writer + dialer := &net.Dialer{Timeout: time.Duration(g.Timeout)} if len(g.Servers) == 0 { g.Servers = append(g.Servers, "localhost:12201") } + tlsCfg, err := g.ClientConfig.TLSConfig() + if err != nil { + return err + } + for _, server := range g.Servers { - w := newGelfWriter(gelfConfig{GraylogEndpoint: server}, &dialer) + w := newGelfWriter(gelfConfig{Endpoint: server}, dialer, tlsCfg) + err := w.Connect() + if err != nil { + return fmt.Errorf("failed to connect to server [%s]: %w", server, err) + } writers = append(writers, w) g.closers = append(g.closers, w) } @@ -319,7 +368,11 @@ func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) { m["version"] = "1.1" m["timestamp"] = float64(metric.Time().UnixNano()) / 1_000_000_000 m["short_message"] = "telegraf" - m["name"] = metric.Name() + if g.NameFieldNoPrefix { + m["name"] = metric.Name() + } else { + m["_name"] = metric.Name() + } if host, ok := metric.GetTag("host"); ok { m["host"] = host diff --git a/plugins/outputs/graylog/graylog_test.go b/plugins/outputs/graylog/graylog_test.go index fcf61ae77..3932c736c 100644 --- a/plugins/outputs/graylog/graylog_test.go +++ b/plugins/outputs/graylog/graylog_test.go @@ -3,125 +3,174 @@ package graylog import ( "bytes" "compress/zlib" + "crypto/tls" "encoding/json" "io" "net" "sync" "testing" + "time" + tlsint "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestWriteDefault(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in 
short mode.") - } - - scenarioUDP(t, "127.0.0.1:12201") -} - func TestWriteUDP(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") + tests := []struct { + name string + instance Graylog + }{ + { + name: "default without scheme", + instance: Graylog{ + Servers: []string{"127.0.0.1:12201"}, + }, + }, + { + name: "UDP", + instance: Graylog{ + Servers: []string{"udp://127.0.0.1:12201"}, + }, + }, + { + name: "UDP non-standard name field", + instance: Graylog{ + Servers: []string{"udp://127.0.0.1:12201"}, + NameFieldNoPrefix: true, + }, + }, } - scenarioUDP(t, "udp://127.0.0.1:12201") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var wg sync.WaitGroup + var wg2 sync.WaitGroup + wg.Add(1) + wg2.Add(1) + go UDPServer(t, &wg, &wg2, &tt.instance) + wg2.Wait() + + i := tt.instance + err := i.Connect() + require.NoError(t, err) + defer i.Close() + defer wg.Wait() + + metrics := testutil.MockMetrics() + + // UDP scenario: + // 4 messages are send + + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + }) + } } func TestWriteTCP(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") + pki := testutil.NewPKI("../../../testutil/pki") + tlsClientConfig := pki.TLSClientConfig() + tlsServerConfig, err := pki.TLSServerConfig().TLSConfig() + require.NoError(t, err) + + tests := []struct { + name string + instance Graylog + tlsServerConfig *tls.Config + }{ + { + name: "TCP", + instance: Graylog{ + Servers: []string{"tcp://127.0.0.1:12201"}, + }, + }, + { + name: "TLS", + instance: Graylog{ + Servers: []string{"tcp://127.0.0.1:12201"}, + ClientConfig: tlsint.ClientConfig{ + ServerName: "localhost", + TLSCA: tlsClientConfig.TLSCA, + TLSKey: tlsClientConfig.TLSKey, + TLSCert: tlsClientConfig.TLSCert, + }, + }, + tlsServerConfig: tlsServerConfig, + }, + { + name: 
"TLS no validation", + instance: Graylog{ + Servers: []string{"tcp://127.0.0.1:12201"}, + ClientConfig: tlsint.ClientConfig{ + InsecureSkipVerify: true, + ServerName: "localhost", + TLSKey: tlsClientConfig.TLSKey, + TLSCert: tlsClientConfig.TLSCert, + }, + }, + tlsServerConfig: tlsServerConfig, + }, } - scenarioTCP(t, "tcp://127.0.0.1:12201") -} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var wg sync.WaitGroup + var wg2 sync.WaitGroup + var wg3 sync.WaitGroup + wg.Add(1) + wg2.Add(1) + wg3.Add(1) + go TCPServer(t, &wg, &wg2, &wg3, tt.tlsServerConfig) + wg2.Wait() -func scenarioUDP(t *testing.T, server string) { - var wg sync.WaitGroup - var wg2 sync.WaitGroup - wg.Add(1) - wg2.Add(1) - go UDPServer(t, &wg, &wg2) - wg2.Wait() + i := tt.instance + err = i.Connect() + require.NoError(t, err) + defer i.Close() + defer wg.Wait() - i := Graylog{ - Servers: []string{server}, + metrics := testutil.MockMetrics() + + // TCP scenario: + // 4 messages are send + // -> connection gets forcefully broken after the 2nd message (server closes connection) + // -> the 3rd write fails with error + // -> during the 4th write connection is restored and write is successful + + err = i.Write(metrics) + require.NoError(t, err) + err = i.Write(metrics) + require.NoError(t, err) + wg3.Wait() + err = i.Write(metrics) + require.Error(t, err) + err = i.Write(metrics) + require.NoError(t, err) + }) } - err := i.Connect() - require.NoError(t, err) - - metrics := testutil.MockMetrics() - - // UDP scenario: - // 4 messages are send - - err = i.Write(metrics) - require.NoError(t, err) - err = i.Write(metrics) - require.NoError(t, err) - err = i.Write(metrics) - require.NoError(t, err) - err = i.Write(metrics) - require.NoError(t, err) - - wg.Wait() - i.Close() -} - -func scenarioTCP(t *testing.T, server string) { - var wg sync.WaitGroup - var wg2 sync.WaitGroup - var wg3 sync.WaitGroup - wg.Add(1) - wg2.Add(1) - wg3.Add(1) - go TCPServer(t, &wg, &wg2, &wg3) - wg2.Wait() - - 
i := Graylog{ - Servers: []string{server}, - } - err := i.Connect() - require.NoError(t, err) - - metrics := testutil.MockMetrics() - - // TCP scenario: - // 4 messages are send - // -> connection gets broken after the 2nd message (server closes connection) - // -> the 3rd write ends with error - // -> in the 4th write connection is restored and write is successful - - err = i.Write(metrics) - require.NoError(t, err) - err = i.Write(metrics) - require.NoError(t, err) - wg3.Wait() - err = i.Write(metrics) - require.Error(t, err) - err = i.Write(metrics) - require.NoError(t, err) - - wg.Wait() - i.Close() } type GelfObject map[string]interface{} -func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) { +func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, config *Graylog) { serverAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:12201") require.NoError(t, err) udpServer, err := net.ListenUDP("udp", serverAddr) require.NoError(t, err) defer udpServer.Close() defer wg.Done() - - bufR := make([]byte, 1024) wg2.Done() recv := func() { + bufR := make([]byte, 1024) n, _, err := udpServer.ReadFromUDP(bufR) require.NoError(t, err) @@ -135,6 +184,13 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) { var obj GelfObject _ = json.Unmarshal(bufW.Bytes(), &obj) require.NoError(t, err) + assert.Equal(t, obj["short_message"], "telegraf") + if config.NameFieldNoPrefix { + assert.Equal(t, obj["name"], "test1") + } else { + assert.Equal(t, obj["_name"], "test1") + } + assert.Equal(t, obj["_tag1"], "value1") assert.Equal(t, obj["_value"], float64(1)) } @@ -146,29 +202,29 @@ func UDPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup) { recv() } -func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.WaitGroup) { - serverAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12201") - require.NoError(t, err) - tcpServer, err := net.ListenTCP("tcp", serverAddr) +func TCPServer(t *testing.T, wg 
*sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync.WaitGroup, tlsConfig *tls.Config) { + tcpServer, err := net.Listen("tcp", "127.0.0.1:12201") require.NoError(t, err) defer tcpServer.Close() defer wg.Done() - - bufR := make([]byte, 1) - bufW := bytes.NewBuffer(nil) wg2.Done() - accept := func() *net.TCPConn { - conn, err := tcpServer.AcceptTCP() + accept := func() net.Conn { + conn, err := tcpServer.Accept() require.NoError(t, err) - _ = conn.SetLinger(0) + if tcpConn, ok := conn.(*net.TCPConn); ok { + _ = tcpConn.SetLinger(0) + } + _ = conn.SetDeadline(time.Now().Add(15 * time.Second)) + if tlsConfig != nil { + conn = tls.Server(conn, tlsConfig) + } return conn } - conn := accept() - defer conn.Close() - recv := func() { - bufW.Reset() + recv := func(conn net.Conn) { + bufR := make([]byte, 1) + bufW := bytes.NewBuffer(nil) for { n, err := conn.Read(bufR) require.NoError(t, err) @@ -183,16 +239,22 @@ func TCPServer(t *testing.T, wg *sync.WaitGroup, wg2 *sync.WaitGroup, wg3 *sync. var obj GelfObject err = json.Unmarshal(bufW.Bytes(), &obj) require.NoError(t, err) + assert.Equal(t, obj["short_message"], "telegraf") + assert.Equal(t, obj["_name"], "test1") + assert.Equal(t, obj["_tag1"], "value1") assert.Equal(t, obj["_value"], float64(1)) } - // in TCP scenario only 3 messages are received (1st, 2dn and 4th) due to connection break after the 2nd + conn := accept() + defer conn.Close() - recv() - recv() + // in TCP scenario only 3 messages are received, the 3rd is lost due to simulated connection break after the 2nd + + recv(conn) + recv(conn) _ = conn.Close() wg3.Done() conn = accept() defer conn.Close() - recv() + recv(conn) }