From c2854204aeeec7766623fd266a5b6792c8653f53 Mon Sep 17 00:00:00 2001
From: Sven Rebhan <36194019+srebhan@users.noreply.github.com>
Date: Fri, 17 Feb 2023 21:46:23 +0100
Subject: [PATCH] feat(inputs.statsd): add pending messages stat and allow to
 configure number of threads (#12318)

---
 plugins/inputs/statsd/README.md      |   3 +
 plugins/inputs/statsd/sample.conf    |   3 +
 plugins/inputs/statsd/statsd.go      |  69 +++++------
 plugins/inputs/statsd/statsd_test.go | 175 ++++++++++++++++++++++++++-
 4 files changed, 211 insertions(+), 39 deletions(-)

diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md
index 552517e95..3a4cdc5c1 100644
--- a/plugins/inputs/statsd/README.md
+++ b/plugins/inputs/statsd/README.md
@@ -75,6 +75,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
   ## the statsd server will start dropping packets
   allowed_pending_messages = 10000
 
+  ## Number of worker threads used to parse the incoming messages.
+  # number_workers_threads = 5
+
   ## Number of timing/histogram values to track per-measurement in the
   ## calculation of percentiles. Raising this limit increases the accuracy
   ## of percentiles but also increases the memory usage and cpu time.
diff --git a/plugins/inputs/statsd/sample.conf b/plugins/inputs/statsd/sample.conf
index c60e7b17b..8b3da152e 100644
--- a/plugins/inputs/statsd/sample.conf
+++ b/plugins/inputs/statsd/sample.conf
@@ -59,6 +59,9 @@
   ## the statsd server will start dropping packets
   allowed_pending_messages = 10000
 
+  ## Number of worker threads used to parse the incoming messages.
+  # number_workers_threads = 5
+
   ## Number of timing/histogram values to track per-measurement in the
   ## calculation of percentiles. Raising this limit increases the accuracy
   ## of percentiles but also increases the memory usage and cpu time.
diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go
index 16f7e92b7..d5006927c 100644
--- a/plugins/inputs/statsd/statsd.go
+++ b/plugins/inputs/statsd/statsd.go
@@ -38,8 +38,6 @@ const (
 
 	defaultSeparator           = "_"
 	defaultAllowPendingMessage = 10000
-
-	parserGoRoutines = 5
 )
 
 var errParsing = errors.New("error parsing statsd line")
@@ -63,25 +61,25 @@ type Statsd struct {
 	Protocol string `toml:"protocol"`
 
 	// Address & Port to serve from
-	ServiceAddress string
+	ServiceAddress string `toml:"service_address"`
 
 	// Number of messages allowed to queue up in between calls to Gather. If this
 	// fills up, packets will get dropped until the next Gather interval is ran.
-	AllowedPendingMessages int
+	AllowedPendingMessages int `toml:"allowed_pending_messages"`
+	NumberWorkerThreads    int `toml:"number_workers_threads"`
 
 	// Percentiles specifies the percentiles that will be calculated for timing
 	// and histogram stats.
-	Percentiles     []Number
-	PercentileLimit int
-
-	DeleteGauges   bool
-	DeleteCounters bool
-	DeleteSets     bool
-	DeleteTimings  bool
-	ConvertNames   bool `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"`
+	Percentiles     []Number `toml:"percentiles"`
+	PercentileLimit int      `toml:"percentile_limit"`
+	DeleteGauges    bool     `toml:"delete_gauges"`
+	DeleteCounters  bool     `toml:"delete_counters"`
+	DeleteSets      bool     `toml:"delete_sets"`
+	DeleteTimings   bool     `toml:"delete_timings"`
+	ConvertNames    bool     `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"`
 
 	// MetricSeparator is the separator between parts of the metric name.
-	MetricSeparator string
+	MetricSeparator string `toml:"metric_separator"`
 	// This flag enables parsing of tags in the dogstatsd extension to the
 	// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
 	ParseDataDogTags bool `toml:"parse_data_dog_tags" deprecated:"1.10.0;use 'datadog_extensions' instead"`
@@ -102,9 +100,16 @@ type Statsd struct {
 	// see https://github.com/influxdata/telegraf/pull/992
 	UDPPacketSize int `toml:"udp_packet_size" deprecated:"0.12.1;2.0.0;option is ignored"`
 
-	ReadBufferSize int `toml:"read_buffer_size"`
+	ReadBufferSize      int              `toml:"read_buffer_size"`
+	SanitizeNamesMethod string           `toml:"sanitize_name_method"`
+	Templates           []string         `toml:"templates"` // bucket -> influx templates
+	MaxTCPConnections   int              `toml:"max_tcp_connections"`
+	TCPKeepAlive        bool             `toml:"tcp_keep_alive"`
+	TCPKeepAlivePeriod  *config.Duration `toml:"tcp_keep_alive_period"`
 
-	SanitizeNamesMethod string `toml:"sanitize_name_method"`
+	// Max duration for each metric to stay cached without being updated.
+	MaxTTL config.Duration `toml:"max_ttl"`
+	Log    telegraf.Logger `toml:"-"`
 
 	sync.Mutex
 	// Lock for preventing a data race during resource cleanup
@@ -131,28 +136,17 @@ type Statsd struct {
 	timings       map[string]cachedtimings
 	distributions []cacheddistributions
 
-	// bucket -> influx templates
-	Templates []string
-
 	// Protocol listeners
 	UDPlistener *net.UDPConn
 	TCPlistener *net.TCPListener
 
 	// track current connections so we can close them in Stop()
-	conns map[string]*net.TCPConn
-
-	MaxTCPConnections int `toml:"max_tcp_connections"`
-
-	TCPKeepAlive       bool             `toml:"tcp_keep_alive"`
-	TCPKeepAlivePeriod *config.Duration `toml:"tcp_keep_alive_period"`
-
-	// Max duration for each metric to stay cached without being updated.
-	MaxTTL config.Duration `toml:"max_ttl"`
-
+	conns          map[string]*net.TCPConn
 	graphiteParser *graphite.Parser
+	acc            telegraf.Accumulator
+	bufPool        sync.Pool // pool of byte slices to handle parsing
 
-	acc telegraf.Accumulator
-
+	// Internal statistics counters
 	MaxConnections     selfstat.Stat
 	CurrentConnections selfstat.Stat
 	TotalConnections   selfstat.Stat
@@ -162,11 +156,7 @@ type Statsd struct {
 	UDPPacketsDrop     selfstat.Stat
 	UDPBytesRecv       selfstat.Stat
 	ParseTimeNS        selfstat.Stat
-
-	Log telegraf.Logger `toml:"-"`
-
-	// A pool of byte slices to handle parsing
-	bufPool sync.Pool
+	PendingMessages    selfstat.Stat
 }
 
 type input struct {
@@ -330,6 +320,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
 	s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
 	s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
 	s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)
+	s.PendingMessages = selfstat.Register("statsd", "pending_messages", tags)
 
 	s.in = make(chan input, s.AllowedPendingMessages)
 	s.done = make(chan struct{})
@@ -391,7 +382,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
 		}()
 	}
 
-	for i := 1; i <= parserGoRoutines; i++ {
+	for i := 1; i <= s.NumberWorkerThreads; i++ {
 		// Start the line parser
 		s.wg.Add(1)
 		go func() {
@@ -487,6 +478,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
 				Buffer: b,
 				Time:   time.Now(),
 				Addr:   addr.IP.String()}:
+				s.PendingMessages.Set(int64(len(s.in)))
 			default:
 				s.UDPPacketsDrop.Incr(1)
 				s.drops++
@@ -509,6 +501,7 @@ func (s *Statsd) parser() error {
 		case <-s.done:
 			return nil
 		case in := <-s.in:
+			s.PendingMessages.Set(int64(len(s.in)))
 			start := time.Now()
 			lines := strings.Split(in.Buffer.String(), "\n")
 			s.bufPool.Put(in.Buffer)
@@ -913,6 +906,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
 
 		select {
 		case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}:
+			s.PendingMessages.Set(int64(len(s.in)))
 		default:
 			s.drops++
 			if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
@@ -1022,14 +1016,13 @@ func init() {
 			Protocol:               defaultProtocol,
 			ServiceAddress:         ":8125",
 			MaxTCPConnections:      250,
-			TCPKeepAlive:           false,
 			MetricSeparator:        "_",
 			AllowedPendingMessages: defaultAllowPendingMessage,
 			DeleteCounters:         true,
 			DeleteGauges:           true,
 			DeleteSets:             true,
 			DeleteTimings:          true,
-			SanitizeNamesMethod:    "",
+			NumberWorkerThreads:    5,
 		}
 	})
 }
diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go
index 3661160e6..f1d2c0099 100644
--- a/plugins/inputs/statsd/statsd_test.go
+++ b/plugins/inputs/statsd/statsd_test.go
@@ -20,7 +20,10 @@ const (
 )
 
 func NewTestStatsd() *Statsd {
-	s := Statsd{Log: testutil.Logger{}}
+	s := Statsd{
+		Log:                 testutil.Logger{},
+		NumberWorkerThreads: 5,
+	}
 
 	// Make data structures
 	s.done = make(chan struct{})
@@ -44,6 +47,7 @@ func TestConcurrentConns(t *testing.T) {
 		ServiceAddress:         "localhost:8125",
 		AllowedPendingMessages: 10000,
 		MaxTCPConnections:      2,
+		NumberWorkerThreads:    5,
 	}
 
 	acc := &testutil.Accumulator{}
@@ -75,6 +79,7 @@ func TestConcurrentConns1(t *testing.T) {
 		ServiceAddress:         "localhost:8125",
 		AllowedPendingMessages: 10000,
 		MaxTCPConnections:      1,
+		NumberWorkerThreads:    5,
 	}
 
 	acc := &testutil.Accumulator{}
@@ -104,6 +109,7 @@ func TestCloseConcurrentConns(t *testing.T) {
 		ServiceAddress:         "localhost:8125",
 		AllowedPendingMessages: 10000,
 		MaxTCPConnections:      2,
+		NumberWorkerThreads:    5,
 	}
 
 	acc := &testutil.Accumulator{}
@@ -118,6 +124,27 @@ func TestCloseConcurrentConns(t *testing.T) {
 	listener.Stop()
 }
 
+// benchmark how long it takes to parse metrics:
+func BenchmarkParser(b *testing.B) {
+	plugin := Statsd{
+		Log:                    testutil.Logger{},
+		Protocol:               "udp",
+		ServiceAddress:         "localhost:8125",
+		AllowedPendingMessages: 250000,
+		NumberWorkerThreads:    5,
+	}
+	acc := &testutil.Accumulator{Discard: true}
+
+	require.NoError(b, plugin.Start(acc))
+
+	// send multiple messages to socket
+	for n := 0; n < b.N; n++ {
+		require.NoError(b, plugin.parseStatsdLine(testMsg))
+	}
+
+	plugin.Stop()
+}
+
 // benchmark how long it takes to accept & process 100,000 metrics:
 func BenchmarkUDP(b *testing.B) {
 	listener := Statsd{
@@ -125,6 +152,7 @@ func BenchmarkUDP(b *testing.B) {
 		Protocol:               "udp",
 		ServiceAddress:         "localhost:8125",
 		AllowedPendingMessages: 250000,
+		NumberWorkerThreads:    5,
 	}
 
 	acc := &testutil.Accumulator{Discard: true}
@@ -151,6 +179,117 @@ func BenchmarkUDP(b *testing.B) {
 	}
 }
 
+func BenchmarkUDPThreads4(b *testing.B) {
+	listener := Statsd{
+		Log:                    testutil.Logger{},
+		Protocol:               "udp",
+		ServiceAddress:         "localhost:8125",
+		AllowedPendingMessages: 250000,
+		NumberWorkerThreads:    4,
+	}
+
+	acc := &testutil.Accumulator{Discard: true}
+	require.NoError(b, listener.Start(acc))
+
+	time.Sleep(time.Millisecond * 250)
+	conn, err := net.Dial("udp", "127.0.0.1:8125")
+	require.NoError(b, err)
+	defer conn.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 1000; i++ {
+				_, err := conn.Write([]byte(testMsg))
+				require.NoError(b, err)
+			}
+		}()
+	}
+	wg.Wait()
+
+	// wait for 250,000 metrics to get added to accumulator
+	for len(listener.in) > 0 {
+		time.Sleep(time.Millisecond)
+	}
+	listener.Stop()
+}
+
+func BenchmarkUDPThreads8(b *testing.B) {
+	listener := Statsd{
+		Log:                    testutil.Logger{},
+		Protocol:               "udp",
+		ServiceAddress:         "localhost:8125",
+		AllowedPendingMessages: 250000,
+		NumberWorkerThreads:    8,
+	}
+
+	acc := &testutil.Accumulator{Discard: true}
+	require.NoError(b, listener.Start(acc))
+
+	time.Sleep(time.Millisecond * 250)
+	conn, err := net.Dial("udp", "127.0.0.1:8125")
+	require.NoError(b, err)
+	defer conn.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 1000; i++ {
+				_, err := conn.Write([]byte(testMsg))
+				require.NoError(b, err)
+			}
+		}()
+	}
+	wg.Wait()
+
+	// wait for 250,000 metrics to get added to accumulator
+	for len(listener.in) > 0 {
+		time.Sleep(time.Millisecond)
+	}
+	listener.Stop()
+}
+
+func BenchmarkUDPThreads16(b *testing.B) {
+	listener := Statsd{
+		Log:                    testutil.Logger{},
+		Protocol:               "udp",
+		ServiceAddress:         "localhost:8125",
+		AllowedPendingMessages: 250000,
+		NumberWorkerThreads:    16,
+	}
+
+	acc := &testutil.Accumulator{Discard: true}
+	require.NoError(b, listener.Start(acc))
+
+	time.Sleep(time.Millisecond * 250)
+	conn, err := net.Dial("udp", "127.0.0.1:8125")
+	require.NoError(b, err)
+	defer conn.Close()
+
+	var wg sync.WaitGroup
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < 1000; i++ {
+				_, err := conn.Write([]byte(testMsg))
+				require.NoError(b, err)
+			}
+		}()
+	}
+	wg.Wait()
+
+	// wait for 250,000 metrics to get added to accumulator
+	for len(listener.in) > 0 {
+		time.Sleep(time.Millisecond)
+	}
+	listener.Stop()
+}
+
 func sendRequests(conn net.Conn, wg *sync.WaitGroup) {
 	defer wg.Done()
 	for i := 0; i < 25000; i++ {
@@ -166,6 +305,7 @@ func BenchmarkTCP(b *testing.B) {
 		ServiceAddress:         "localhost:8125",
 		AllowedPendingMessages: 250000,
 		MaxTCPConnections:      250,
+		NumberWorkerThreads:    5,
 	}
 
 	acc := &testutil.Accumulator{Discard: true}
@@ -1586,6 +1726,7 @@ func TestTCP(t *testing.T) {
 		ServiceAddress:         "localhost:0",
 		AllowedPendingMessages: 10000,
 		MaxTCPConnections:      2,
+		NumberWorkerThreads:    5,
 	}
 	var acc testutil.Accumulator
 	require.NoError(t, statsd.Start(&acc))
@@ -1633,6 +1774,7 @@ func TestUdp(t *testing.T) {
 		Protocol:               "udp",
 		ServiceAddress:         "localhost:14223",
 		AllowedPendingMessages: 250000,
+		NumberWorkerThreads:    5,
 	}
 	var acc testutil.Accumulator
 	require.NoError(t, statsd.Start(&acc))
@@ -1671,6 +1813,36 @@ func TestUdp(t *testing.T) {
 	)
 }
 
+func TestUdpFillQueue(t *testing.T) {
+	logger := testutil.CaptureLogger{}
+	plugin := &Statsd{
+		Log:                    &logger,
+		Protocol:               "udp",
+		ServiceAddress:         "localhost:0",
+		AllowedPendingMessages: 10,
+		NumberWorkerThreads:    5,
+	}
+
+	var acc testutil.Accumulator
+	require.NoError(t, plugin.Start(&acc))
+
+	conn, err := net.Dial("udp", plugin.UDPlistener.LocalAddr().String())
+	require.NoError(t, err)
+	numberToSend := plugin.AllowedPendingMessages
+	for i := 0; i < numberToSend; i++ {
+		_, _ = fmt.Fprintf(conn, "cpu.time_idle:%d|c\n", i)
+	}
+	require.NoError(t, conn.Close())
+
+	require.Eventually(t, func() bool {
+		return plugin.UDPPacketsRecv.Get() >= int64(numberToSend)
+	}, 1*time.Second, 100*time.Millisecond)
+	defer plugin.Stop()
+
+	errs := logger.Errors()
+	require.Lenf(t, errs, 0, "got errors: %v", errs)
+}
+
 func TestParse_Ints(t *testing.T) {
 	s := NewTestStatsd()
 	s.Percentiles = []Number{90}
@@ -1779,6 +1951,7 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) {
 		AllowedPendingMessages: 10000,
 		MaxTCPConnections:      250,
 		TCPKeepAlive:           true,
+		NumberWorkerThreads:    5,
 	}
 
 	acc := &testutil.Accumulator{}
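
A minimal [[inputs.statsd]] section exercising the options touched by this patch might look like the sketch below; the thread count of 10 is an illustrative value rather than the default of 5 set in init():

[[inputs.statsd]]
  protocol = "udp"
  service_address = ":8125"

  ## Number of messages allowed to queue up between calls to Gather; once
  ## full, the listener starts dropping packets.
  allowed_pending_messages = 10000

  ## Number of worker threads used to parse the incoming messages.
  number_workers_threads = 10

Since pending_messages is registered through selfstat next to the existing udp_packets_dropped counter, it should appear alongside the plugin's other internal statistics when Telegraf's internal metrics collection is enabled.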