diff --git a/plugins/common/ratelimiter/limiters.go b/plugins/common/ratelimiter/limiters.go index f24d08b62..a4c70092b 100644 --- a/plugins/common/ratelimiter/limiters.go +++ b/plugins/common/ratelimiter/limiters.go @@ -3,6 +3,7 @@ package ratelimiter import ( "errors" "math" + "sync" "time" ) @@ -14,10 +15,17 @@ type RateLimiter struct { limit int64 period time.Duration periodStart time.Time - remaining int64 + + remaining int64 + reserved int64 + + sync.Mutex } func (r *RateLimiter) Remaining(t time.Time) int64 { + r.Lock() + defer r.Unlock() + if r.limit == 0 { return math.MaxInt64 } @@ -33,10 +41,27 @@ func (r *RateLimiter) Remaining(t time.Time) int64 { return r.limit } - return r.remaining + return r.remaining - r.reserved +} + +func (r *RateLimiter) Reserve(used int64) { + r.Lock() + defer r.Unlock() + + r.reserved = max(r.reserved+used, used) +} + +func (r *RateLimiter) Release() { + r.Lock() + defer r.Unlock() + + r.reserved = 0 } func (r *RateLimiter) Accept(t time.Time, used int64) { + r.Lock() + defer r.Unlock() + if r.limit == 0 || r.periodStart.After(t) { return } @@ -52,9 +77,13 @@ func (r *RateLimiter) Accept(t time.Time, used int64) { // Update the state r.remaining = max(r.remaining-used, 0) + r.reserved = max(r.reserved-used, 0) } func (r *RateLimiter) Undo(t time.Time, used int64) { + r.Lock() + defer r.Unlock() + // Do nothing if we are not in the current period or unlimited because we // already reset the limit on a new window. if r.limit == 0 || r.periodStart.IsZero() || r.periodStart.After(t) || t.Sub(r.periodStart) >= r.period {