chore(inputs.statsd): Refactor internal stats into their own struct (#16419)

Co-authored-by: James Ribe <james.ribe@omniva.com>
This commit is contained in:
James Ribe 2025-01-22 09:05:25 -08:00 committed by GitHub
parent 0ec17e4229
commit abc3a5ed10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 28 deletions

View File

@ -139,6 +139,12 @@ type Statsd struct {
acc telegraf.Accumulator
bufPool sync.Pool // pool of byte slices to handle parsing
lastGatherTime time.Time
Stats InternalStats
}
type InternalStats struct {
// Internal statistics counters
MaxConnections selfstat.Stat
CurrentConnections selfstat.Stat
@ -151,8 +157,6 @@ type Statsd struct {
ParseTimeNS selfstat.Stat
PendingMessages selfstat.Stat
MaxPendingMessages selfstat.Stat
lastGatherTime time.Time
}
// number will get parsed as an int or float depending on what is passed
@ -249,19 +253,19 @@ func (s *Statsd) Start(ac telegraf.Accumulator) error {
tags := map[string]string{
"address": s.ServiceAddress,
}
s.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags)
s.MaxConnections.Set(int64(s.MaxTCPConnections))
s.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags)
s.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
s.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
s.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)
s.PendingMessages = selfstat.Register("statsd", "pending_messages", tags)
s.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags)
s.MaxPendingMessages.Set(int64(s.AllowedPendingMessages))
s.Stats.MaxConnections = selfstat.Register("statsd", "tcp_max_connections", tags)
s.Stats.MaxConnections.Set(int64(s.MaxTCPConnections))
s.Stats.CurrentConnections = selfstat.Register("statsd", "tcp_current_connections", tags)
s.Stats.TotalConnections = selfstat.Register("statsd", "tcp_total_connections", tags)
s.Stats.TCPPacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
s.Stats.TCPBytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
s.Stats.UDPPacketsRecv = selfstat.Register("statsd", "udp_packets_received", tags)
s.Stats.UDPPacketsDrop = selfstat.Register("statsd", "udp_packets_dropped", tags)
s.Stats.UDPBytesRecv = selfstat.Register("statsd", "udp_bytes_received", tags)
s.Stats.ParseTimeNS = selfstat.Register("statsd", "parse_time_ns", tags)
s.Stats.PendingMessages = selfstat.Register("statsd", "pending_messages", tags)
s.Stats.MaxPendingMessages = selfstat.Register("statsd", "max_pending_messages", tags)
s.Stats.MaxPendingMessages.Set(int64(s.AllowedPendingMessages))
s.in = make(chan input, s.AllowedPendingMessages)
s.done = make(chan struct{})
@ -545,8 +549,8 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
}
return nil
}
s.UDPPacketsRecv.Incr(1)
s.UDPBytesRecv.Incr(int64(n))
s.Stats.UDPPacketsRecv.Incr(1)
s.Stats.UDPBytesRecv.Incr(int64(n))
b, ok := s.bufPool.Get().(*bytes.Buffer)
if !ok {
return errors.New("bufPool is not a bytes buffer")
@ -558,9 +562,9 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
Buffer: b,
Time: time.Now(),
Addr: addr.IP.String()}:
s.PendingMessages.Set(int64(len(s.in)))
s.Stats.PendingMessages.Set(int64(len(s.in)))
default:
s.UDPPacketsDrop.Incr(1)
s.Stats.UDPPacketsDrop.Incr(1)
s.drops++
if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
s.Log.Errorf("Statsd message queue full. "+
@ -581,7 +585,7 @@ func (s *Statsd) parser() error {
case <-s.done:
return nil
case in := <-s.in:
s.PendingMessages.Set(int64(len(s.in)))
s.Stats.PendingMessages.Set(int64(len(s.in)))
start := time.Now()
lines := strings.Split(in.Buffer.String(), "\n")
s.bufPool.Put(in.Buffer)
@ -608,7 +612,7 @@ func (s *Statsd) parser() error {
}
}
elapsed := time.Since(start)
s.ParseTimeNS.Set(elapsed.Nanoseconds())
s.Stats.ParseTimeNS.Set(elapsed.Nanoseconds())
}
}
}
@ -958,8 +962,8 @@ func (s *Statsd) aggregate(m metric) {
// handler handles a single TCP Connection
func (s *Statsd) handler(conn *net.TCPConn, id string) {
s.CurrentConnections.Incr(1)
s.TotalConnections.Incr(1)
s.Stats.CurrentConnections.Incr(1)
s.Stats.TotalConnections.Incr(1)
// connection cleanup function
defer func() {
s.wg.Done()
@ -968,7 +972,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
// Add one connection potential back to channel when this one closes
s.accept <- true
s.forget(id)
s.CurrentConnections.Incr(-1)
s.Stats.CurrentConnections.Incr(-1)
}()
var remoteIP string
@ -990,8 +994,8 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
if n == 0 {
continue
}
s.TCPBytesRecv.Incr(int64(n))
s.TCPPacketsRecv.Incr(1)
s.Stats.TCPBytesRecv.Incr(int64(n))
s.Stats.TCPPacketsRecv.Incr(1)
b := s.bufPool.Get().(*bytes.Buffer)
b.Reset()
@ -1000,7 +1004,7 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
select {
case s.in <- input{Buffer: b, Time: time.Now(), Addr: remoteIP}:
s.PendingMessages.Set(int64(len(s.in)))
s.Stats.PendingMessages.Set(int64(len(s.in)))
default:
s.drops++
if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {

View File

@ -2177,7 +2177,7 @@ func TestUdpFillQueue(t *testing.T) {
require.NoError(t, conn.Close())
require.Eventually(t, func() bool {
return plugin.UDPPacketsRecv.Get() >= int64(numberToSend)
return plugin.Stats.UDPPacketsRecv.Get() >= int64(numberToSend)
}, 1*time.Second, 100*time.Millisecond)
defer plugin.Stop()