chore(inputs.socket_listener): Remove parser from init (#11462)
This commit is contained in:
parent
fbccc71abb
commit
1c8c057984
|
|
@ -388,14 +388,6 @@ func (sl *SocketListener) Stop() {
|
||||||
sl.wg.Wait()
|
sl.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSocketListener() *SocketListener {
|
|
||||||
parser, _ := parsers.NewInfluxParser()
|
|
||||||
|
|
||||||
return &SocketListener{
|
|
||||||
Parser: parser,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type unixCloser struct {
|
type unixCloser struct {
|
||||||
path string
|
path string
|
||||||
closer io.Closer
|
closer io.Closer
|
||||||
|
|
@ -410,5 +402,5 @@ func (uc unixCloser) Close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
inputs.Add("socket_listener", func() telegraf.Input { return newSocketListener() })
|
inputs.Add("socket_listener", func() telegraf.Input { return &SocketListener{} })
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import (
|
||||||
|
|
||||||
"github.com/influxdata/telegraf/config"
|
"github.com/influxdata/telegraf/config"
|
||||||
"github.com/influxdata/telegraf/internal"
|
"github.com/influxdata/telegraf/internal"
|
||||||
|
"github.com/influxdata/telegraf/plugins/parsers"
|
||||||
"github.com/influxdata/telegraf/testutil"
|
"github.com/influxdata/telegraf/testutil"
|
||||||
"github.com/influxdata/wlog"
|
"github.com/influxdata/wlog"
|
||||||
)
|
)
|
||||||
|
|
@ -48,13 +49,16 @@ func TestSocketListener_tcp_tls(t *testing.T) {
|
||||||
testEmptyLog := prepareLog(t)
|
testEmptyLog := prepareLog(t)
|
||||||
defer testEmptyLog()
|
defer testEmptyLog()
|
||||||
|
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "tcp://127.0.0.1:0"
|
sl.ServiceAddress = "tcp://127.0.0.1:0"
|
||||||
sl.ServerConfig = *pki.TLSServerConfig()
|
sl.ServerConfig = *pki.TLSServerConfig()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
err := sl.Start(acc)
|
err = sl.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer sl.Stop()
|
defer sl.Stop()
|
||||||
|
|
||||||
|
|
@ -70,13 +74,16 @@ func TestSocketListener_tcp_tls(t *testing.T) {
|
||||||
func TestSocketListener_unix_tls(t *testing.T) {
|
func TestSocketListener_unix_tls(t *testing.T) {
|
||||||
sock := testutil.TempSocket(t)
|
sock := testutil.TempSocket(t)
|
||||||
|
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "unix://" + sock
|
sl.ServiceAddress = "unix://" + sock
|
||||||
sl.ServerConfig = *pki.TLSServerConfig()
|
sl.ServerConfig = *pki.TLSServerConfig()
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
err := sl.Start(acc)
|
err = sl.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer sl.Stop()
|
defer sl.Stop()
|
||||||
|
|
||||||
|
|
@ -94,13 +101,16 @@ func TestSocketListener_tcp(t *testing.T) {
|
||||||
testEmptyLog := prepareLog(t)
|
testEmptyLog := prepareLog(t)
|
||||||
defer testEmptyLog()
|
defer testEmptyLog()
|
||||||
|
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "tcp://127.0.0.1:0"
|
sl.ServiceAddress = "tcp://127.0.0.1:0"
|
||||||
sl.ReadBufferSize = config.Size(1024)
|
sl.ReadBufferSize = config.Size(1024)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
err := sl.Start(acc)
|
err = sl.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer sl.Stop()
|
defer sl.Stop()
|
||||||
|
|
||||||
|
|
@ -114,13 +124,16 @@ func TestSocketListener_udp(t *testing.T) {
|
||||||
testEmptyLog := prepareLog(t)
|
testEmptyLog := prepareLog(t)
|
||||||
defer testEmptyLog()
|
defer testEmptyLog()
|
||||||
|
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "udp://127.0.0.1:0"
|
sl.ServiceAddress = "udp://127.0.0.1:0"
|
||||||
sl.ReadBufferSize = config.Size(1024)
|
sl.ReadBufferSize = config.Size(1024)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
err := sl.Start(acc)
|
err = sl.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer sl.Stop()
|
defer sl.Stop()
|
||||||
|
|
||||||
|
|
@ -138,13 +151,16 @@ func TestSocketListener_unix(t *testing.T) {
|
||||||
|
|
||||||
f, _ := os.Create(sock)
|
f, _ := os.Create(sock)
|
||||||
require.NoError(t, f.Close())
|
require.NoError(t, f.Close())
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "unix://" + sock
|
sl.ServiceAddress = "unix://" + sock
|
||||||
sl.ReadBufferSize = config.Size(1024)
|
sl.ReadBufferSize = config.Size(1024)
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
err := sl.Start(acc)
|
err = sl.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer sl.Stop()
|
defer sl.Stop()
|
||||||
|
|
||||||
|
|
@ -168,7 +184,10 @@ func TestSocketListener_unixgram(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(func() { require.NoError(t, f.Close()) })
|
t.Cleanup(func() { require.NoError(t, f.Close()) })
|
||||||
|
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "unixgram://" + sock
|
sl.ServiceAddress = "unixgram://" + sock
|
||||||
sl.ReadBufferSize = config.Size(1024)
|
sl.ReadBufferSize = config.Size(1024)
|
||||||
|
|
@ -188,14 +207,17 @@ func TestSocketListenerDecode_tcp(t *testing.T) {
|
||||||
testEmptyLog := prepareLog(t)
|
testEmptyLog := prepareLog(t)
|
||||||
defer testEmptyLog()
|
defer testEmptyLog()
|
||||||
|
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "tcp://127.0.0.1:0"
|
sl.ServiceAddress = "tcp://127.0.0.1:0"
|
||||||
sl.ReadBufferSize = config.Size(1024)
|
sl.ReadBufferSize = config.Size(1024)
|
||||||
sl.ContentEncoding = "gzip"
|
sl.ContentEncoding = "gzip"
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
err := sl.Start(acc)
|
err = sl.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer sl.Stop()
|
defer sl.Stop()
|
||||||
|
|
||||||
|
|
@ -209,14 +231,17 @@ func TestSocketListenerDecode_udp(t *testing.T) {
|
||||||
testEmptyLog := prepareLog(t)
|
testEmptyLog := prepareLog(t)
|
||||||
defer testEmptyLog()
|
defer testEmptyLog()
|
||||||
|
|
||||||
sl := newSocketListener()
|
sl := &SocketListener{}
|
||||||
|
parser, err := parsers.NewInfluxParser()
|
||||||
|
require.NoError(t, err)
|
||||||
|
sl.SetParser(parser)
|
||||||
sl.Log = testutil.Logger{}
|
sl.Log = testutil.Logger{}
|
||||||
sl.ServiceAddress = "udp://127.0.0.1:0"
|
sl.ServiceAddress = "udp://127.0.0.1:0"
|
||||||
sl.ReadBufferSize = config.Size(1024)
|
sl.ReadBufferSize = config.Size(1024)
|
||||||
sl.ContentEncoding = "gzip"
|
sl.ContentEncoding = "gzip"
|
||||||
|
|
||||||
acc := &testutil.Accumulator{}
|
acc := &testutil.Accumulator{}
|
||||||
err := sl.Start(acc)
|
err = sl.Start(acc)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer sl.Stop()
|
defer sl.Stop()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue