feat(inputs.statsd): add pending messages stat and allow to configure number of threads (#12318)

This commit is contained in:
Sven Rebhan 2023-02-17 21:46:23 +01:00 committed by GitHub
parent 1e04e3822c
commit c2854204ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 39 deletions

View File

@ -75,6 +75,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## the statsd server will start dropping packets
allowed_pending_messages = 10000
## Number of worker threads used to parse the incoming messages.
# number_workers_threads = 5
## Number of timing/histogram values to track per-measurement in the
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.

View File

@ -59,6 +59,9 @@
## the statsd server will start dropping packets
allowed_pending_messages = 10000
## Number of worker threads used to parse the incoming messages.
# number_workers_threads = 5
## Number of timing/histogram values to track per-measurement in the
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.

View File

@ -38,8 +38,6 @@ const (
defaultSeparator = "_"
defaultAllowPendingMessage = 10000
parserGoRoutines = 5
)
var errParsing = errors.New("error parsing statsd line")
@ -63,25 +61,25 @@ type Statsd struct {
Protocol string `toml:"protocol"`
// Address & Port to serve from
ServiceAddress string
ServiceAddress string `toml:"service_address"`
// Number of messages allowed to queue up in between calls to Gather. If this
// fills up, packets will get dropped until the next Gather interval is run.
AllowedPendingMessages int
AllowedPendingMessages int `toml:"allowed_pending_messages"`
NumberWorkerThreads int `toml:"number_workers_threads"`
// Percentiles specifies the percentiles that will be calculated for timing
// and histogram stats.
Percentiles []Number
PercentileLimit int
DeleteGauges bool
DeleteCounters bool
DeleteSets bool
DeleteTimings bool
ConvertNames bool `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"`
Percentiles []Number `toml:"percentiles"`
PercentileLimit int `toml:"percentile_limit"`
DeleteGauges bool `toml:"delete_gauges"`
DeleteCounters bool `toml:"delete_counters"`
DeleteSets bool `toml:"delete_sets"`
DeleteTimings bool `toml:"delete_timings"`
ConvertNames bool `toml:"convert_names" deprecated:"0.12.0;2.0.0;use 'metric_separator' instead"`
// MetricSeparator is the separator between parts of the metric name.
MetricSeparator string
MetricSeparator string `toml:"metric_separator"`
// This flag enables parsing of tags in the dogstatsd extension to the
// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
ParseDataDogTags bool `toml:"parse_data_dog_tags" deprecated:"1.10.0;use 'datadog_extensions' instead"`
@ -102,9 +100,16 @@ type Statsd struct {
// see https://github.com/influxdata/telegraf/pull/992
UDPPacketSize int `toml:"udp_packet_size" deprecated:"0.12.1;2.0.0;option is ignored"`
ReadBufferSize int `toml:"read_buffer_size"`
ReadBufferSize int `toml:"read_buffer_size"`
SanitizeNamesMethod string `toml:"sanitize_name_method"`
Templates []string `toml:"templates"` // bucket -> influx templates
MaxTCPConnections int `toml:"max_tcp_connections"`
TCPKeepAlive bool `toml:"tcp_keep_alive"`
TCPKeepAlivePeriod *config.Duration `toml:"tcp_keep_alive_period"`
SanitizeNamesMethod string `toml:"sanitize_name_method"`
// Max duration for each metric to stay cached without being updated.
MaxTTL config.Duration `toml:"max_ttl"`
Log telegraf.Logger `toml:"-"`
sync.Mutex
// Lock for preventing a data race during resource cleanup
@ -131,28 +136,17 @@ type Statsd struct {
timings map[string]cachedtimings
distributions []cacheddistributions
// bucket -> influx templates
Templates []string
// Protocol listeners
UDPlistener *net.UDPConn
TCPlistener *net.TCPListener
// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn
MaxTCPConnections int `toml:"max_tcp_connections"`
TCPKeepAlive bool `toml:"tcp_keep_alive"`
TCPKeepAlivePeriod *config.Duration `toml:"tcp_keep_alive_period"`
// Max duration for each metric to stay cached without being updated.
MaxTTL config.Duration `toml:"max_ttl"`
conns map[string]*net.TCPConn
graphiteParser *graphite.Parser
acc telegraf.Accumulator
bufPool sync.Pool // pool of byte slices to handle parsing
acc telegraf.Accumulator
// Internal statistics counters
MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
TotalConnections selfstat.Stat
@ -162,11 +156,7 @@ type Statsd struct {
UDPPacketsDrop selfstat.Stat
UDPBytesRecv selfstat.Stat
ParseTimeNS selfstat.Stat
Log telegraf.Logger `toml:"-"`
// A pool of byte slices to handle parsing
bufPool sync.Pool
PendingMessages selfstat.Stat
}
type input struct {
@ -330,6 +320,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)
s.PendingMessages = selfstat.Register("statsd", "pending_messages", tags)
s.in = make(chan input, s.AllowedPendingMessages)
s.done = make(chan struct{})
@ -391,7 +382,7 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
}()
}
for i := 1; i <= parserGoRoutines; i++ {
for i := 1; i <= s.NumberWorkerThreads; i++ {
// Start the line parser
s.wg.Add(1)
go func() {
@ -487,6 +478,7 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
Buffer: b,
Time: time.Now(),
Addr: addr.IP.String()}:
s.PendingMessages.Set(int64(len(s.in)))
default:
s.UDPPacketsDrop.Incr(1)
s.drops++
@ -509,6 +501,7 @@ func (s *Statsd) parser() error {
case <-s.done:
return nil
case in := <-s.in:
s.PendingMessages.Set(int64(len(s.in)))
start := time.Now()
lines := strings.Split(in.Buffer.String(), "\n")
s.bufPool.Put(in.Buffer)
@ -913,6 +906,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
select {
case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}:
s.PendingMessages.Set(int64(len(s.in)))
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
@ -1022,14 +1016,13 @@ func init() {
Protocol: defaultProtocol,
ServiceAddress: ":8125",
MaxTCPConnections: 250,
TCPKeepAlive: false,
MetricSeparator: "_",
AllowedPendingMessages: defaultAllowPendingMessage,
DeleteCounters: true,
DeleteGauges: true,
DeleteSets: true,
DeleteTimings: true,
SanitizeNamesMethod: "",
NumberWorkerThreads: 5,
}
})
}

View File

@ -20,7 +20,10 @@ const (
)
func NewTestStatsd() *Statsd {
s := Statsd{Log: testutil.Logger{}}
s := Statsd{
Log: testutil.Logger{},
NumberWorkerThreads: 5,
}
// Make data structures
s.done = make(chan struct{})
@ -44,6 +47,7 @@ func TestConcurrentConns(t *testing.T) {
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
NumberWorkerThreads: 5,
}
acc := &testutil.Accumulator{}
@ -75,6 +79,7 @@ func TestConcurrentConns1(t *testing.T) {
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 1,
NumberWorkerThreads: 5,
}
acc := &testutil.Accumulator{}
@ -104,6 +109,7 @@ func TestCloseConcurrentConns(t *testing.T) {
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
NumberWorkerThreads: 5,
}
acc := &testutil.Accumulator{}
@ -118,6 +124,27 @@ func TestCloseConcurrentConns(t *testing.T) {
listener.Stop()
}
// BenchmarkParser measures the time to parse a single statsd line,
// amortized over b.N iterations.
func BenchmarkParser(b *testing.B) {
	plugin := Statsd{
		Log:                    testutil.Logger{},
		Protocol:               "udp",
		ServiceAddress:         "localhost:8125",
		AllowedPendingMessages: 250000,
		NumberWorkerThreads:    5,
	}

	acc := &testutil.Accumulator{Discard: true}
	require.NoError(b, plugin.Start(acc))
	// Defer the shutdown so the listener and worker goroutines are
	// cleaned up even if an assertion below aborts the benchmark early.
	defer plugin.Stop()

	// Exclude the plugin start-up cost from the measured time.
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		require.NoError(b, plugin.parseStatsdLine(testMsg))
	}
}
// benchmark how long it takes to accept & process 100,000 metrics:
func BenchmarkUDP(b *testing.B) {
listener := Statsd{
@ -125,6 +152,7 @@ func BenchmarkUDP(b *testing.B) {
Protocol: "udp",
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 250000,
NumberWorkerThreads: 5,
}
acc := &testutil.Accumulator{Discard: true}
@ -151,6 +179,117 @@ func BenchmarkUDP(b *testing.B) {
}
}
// benchmarkUDPThreads runs the UDP ingestion benchmark with the given
// number of parser worker threads. Each of b.N producer goroutines
// writes 1000 statsd messages to the listener's UDP socket.
func benchmarkUDPThreads(b *testing.B, workers int) {
	listener := Statsd{
		Log:                    testutil.Logger{},
		Protocol:               "udp",
		ServiceAddress:         "localhost:8125",
		AllowedPendingMessages: 250000,
		NumberWorkerThreads:    workers,
	}

	acc := &testutil.Accumulator{Discard: true}
	require.NoError(b, listener.Start(acc))
	// Defer the shutdown so the listener is stopped even if an
	// assertion below aborts the benchmark early.
	defer listener.Stop()
	time.Sleep(time.Millisecond * 250)

	conn, err := net.Dial("udp", "127.0.0.1:8125")
	require.NoError(b, err)
	defer conn.Close()

	var wg sync.WaitGroup
	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				// Use b.Error, not require: FailNow must not be called
				// from a goroutine other than the benchmark's own.
				if _, err := conn.Write([]byte(testMsg)); err != nil {
					b.Error(err)
					return
				}
			}
		}()
	}
	wg.Wait()

	// Wait for the parser workers to drain the input channel before
	// stopping, so all b.N*1000 sent messages are accounted for.
	for len(listener.in) > 0 {
		time.Sleep(time.Millisecond)
	}
}

func BenchmarkUDPThreads4(b *testing.B)  { benchmarkUDPThreads(b, 4) }
func BenchmarkUDPThreads8(b *testing.B)  { benchmarkUDPThreads(b, 8) }
func BenchmarkUDPThreads16(b *testing.B) { benchmarkUDPThreads(b, 16) }
func sendRequests(conn net.Conn, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 25000; i++ {
@ -166,6 +305,7 @@ func BenchmarkTCP(b *testing.B) {
ServiceAddress: "localhost:8125",
AllowedPendingMessages: 250000,
MaxTCPConnections: 250,
NumberWorkerThreads: 5,
}
acc := &testutil.Accumulator{Discard: true}
@ -1586,6 +1726,7 @@ func TestTCP(t *testing.T) {
ServiceAddress: "localhost:0",
AllowedPendingMessages: 10000,
MaxTCPConnections: 2,
NumberWorkerThreads: 5,
}
var acc testutil.Accumulator
require.NoError(t, statsd.Start(&acc))
@ -1633,6 +1774,7 @@ func TestUdp(t *testing.T) {
Protocol: "udp",
ServiceAddress: "localhost:14223",
AllowedPendingMessages: 250000,
NumberWorkerThreads: 5,
}
var acc testutil.Accumulator
require.NoError(t, statsd.Start(&acc))
@ -1671,6 +1813,36 @@ func TestUdp(t *testing.T) {
)
}
// TestUdpFillQueue fills the pending-message queue exactly to capacity
// and verifies that no packets are dropped (no errors are logged).
func TestUdpFillQueue(t *testing.T) {
	logger := testutil.CaptureLogger{}
	plugin := &Statsd{
		Log:                    &logger,
		Protocol:               "udp",
		ServiceAddress:         "localhost:0",
		AllowedPendingMessages: 10,
		NumberWorkerThreads:    5,
	}
	var acc testutil.Accumulator
	require.NoError(t, plugin.Start(&acc))
	// Register the cleanup immediately after a successful start so the
	// listener is shut down even if an assertion below fails.
	defer plugin.Stop()

	conn, err := net.Dial("udp", plugin.UDPlistener.LocalAddr().String())
	require.NoError(t, err)

	// Send exactly as many messages as the queue can hold; none of
	// these packets should be dropped.
	numberToSend := plugin.AllowedPendingMessages
	for i := 0; i < numberToSend; i++ {
		_, _ = fmt.Fprintf(conn, "cpu.time_idle:%d|c\n", i)
	}
	require.NoError(t, conn.Close())

	require.Eventually(t, func() bool {
		return plugin.UDPPacketsRecv.Get() >= int64(numberToSend)
	}, 1*time.Second, 100*time.Millisecond)

	errs := logger.Errors()
	require.Lenf(t, errs, 0, "got errors: %v", errs)
}
func TestParse_Ints(t *testing.T) {
s := NewTestStatsd()
s.Percentiles = []Number{90}
@ -1779,6 +1951,7 @@ func TestParse_InvalidAndRecoverIntegration(t *testing.T) {
AllowedPendingMessages: 10000,
MaxTCPConnections: 250,
TCPKeepAlive: true,
NumberWorkerThreads: 5,
}
acc := &testutil.Accumulator{}