diff --git a/CHANGELOG.md b/CHANGELOG.md index abbb1f44a..af9444b81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ### Features +- [#1340](https://github.com/influxdata/telegraf/issues/1340): statsd: do not log every dropped metric. + ### Bugfixes - [#1330](https://github.com/influxdata/telegraf/issues/1330): Fix exec plugin panic when using single binary. diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 69638af06..fb191974f 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -27,7 +27,8 @@ const ( defaultSeparator = "_" ) -var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + +var dropwarn = "ERROR: statsd message queue full. " + + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" var prevInstance *Statsd @@ -65,6 +66,8 @@ type Statsd struct { sync.Mutex wg sync.WaitGroup + // drops tracks the number of dropped metrics. + drops int // Channel for all incoming statsd packets in chan []byte @@ -291,7 +294,10 @@ func (s *Statsd) udpListen() error { select { case s.in <- bufCopy: default: - log.Printf(dropwarn, string(buf[:n])) + s.drops++ + if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 { + log.Printf(dropwarn, s.drops) + } } } } diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go index a420ed759..053fc927e 100644 --- a/plugins/inputs/tcp_listener/tcp_listener.go +++ b/plugins/inputs/tcp_listener/tcp_listener.go @@ -29,6 +29,8 @@ type TcpListener struct { // is an available bool in accept, then we are below the maximum and can // accept the connection accept chan bool + // drops tracks the number of dropped metrics. + drops int // track the listener here so we can close it in Stop() listener *net.TCPListener @@ -39,7 +41,8 @@ type TcpListener struct { acc telegraf.Accumulator } -var dropwarn = "ERROR: Message queue full. Discarding metric [%s], " + +var dropwarn = "ERROR: tcp_listener message queue full. " + + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" const sampleConfig = ` @@ -212,7 +215,10 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) { select { case t.in <- bufCopy: default: - log.Printf(dropwarn, scanner.Text()) + t.drops++ + if t.drops == 1 || t.drops%t.AllowedPendingMessages == 0 { + log.Printf(dropwarn, t.drops) + } } } } diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go index 8e2637ce7..a20a5583f 100644 --- a/plugins/inputs/udp_listener/udp_listener.go +++ b/plugins/inputs/udp_listener/udp_listener.go @@ -25,6 +25,8 @@ type UdpListener struct { in chan []byte done chan struct{} + // drops tracks the number of dropped metrics. + drops int parser parsers.Parser @@ -38,7 +40,8 @@ type UdpListener struct { // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure const UDP_MAX_PACKET_SIZE int = 64 * 1024 -var dropwarn = "ERROR: Message queue full. Discarding line [%s] " + +var dropwarn = "ERROR: udp_listener message queue full. " + + "We have dropped %d messages so far. " + "You may want to increase allowed_pending_messages in the config\n" const sampleConfig = ` @@ -125,7 +128,10 @@ func (u *UdpListener) udpListen() error { select { case u.in <- bufCopy: default: - log.Printf(dropwarn, string(bufCopy)) + u.drops++ + if u.drops == 1 || u.drops%u.AllowedPendingMessages == 0 { + log.Printf(dropwarn, u.drops) + } } } }