Skip to content
123 changes: 97 additions & 26 deletions chronos/ratelimit.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,55 +14,109 @@ import timer
export timer

type
ReplenishMode* = enum
# Strict mode replenishes tokens only after the full fill duration has elapsed.
Strict

# Balanced mode gives better utilization of available tokens and tolerates burst periods.
# It mints tokens based on the time elapsed between token consumptions, up to the budget capacity.
Balanced

BucketWaiter = object
future: Future[void]
value: int
alreadyConsumed: int

TokenBucket* = ref object
budget: int
budgetCap: int
budgetCapacity: int
lastUpdate: Moment
fillDuration: Duration
workFuture: Future[void]
pendingRequests: seq[BucketWaiter]
manuallyReplenished: AsyncEvent
replenishMode: ReplenishMode

proc update(bucket: TokenBucket, currentTime: Moment) =
func fullPeriodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
  ## Tells whether the time between the bucket's `lastUpdate` and
  ## `currentTime` is at least one `fillDuration`.
  let elapsed = currentTime - bucket.lastUpdate
  elapsed >= bucket.fillDuration

proc calcUpdateStrict(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
# with zero fillDuration we only allow manual replenish till capacity
return (min(bucket.budgetCapacity, bucket.budget), bucket.lastUpdate)

if not fullPeriodElapsed(bucket, currentTime):
return (bucket.budget, bucket.lastUpdate)

if currentTime < bucket.lastUpdate:
return
return (bucket.budgetCapacity, currentTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will introduce drift, since the current time will likely not land on a fillDuration boundary. The net result of the drift will be a lower average throughput, even if updates happen on a regular basis (i.e. at least once per fill duration).

What you're looking to return here is max(bucket.lastUpdate + bucket.fillDuration, currentTime - fillDuration) or something like that to ensure that all tokens are handed out correctly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got your point, but given that this calculation is for Discrete mode, I think we need to calculate the number of distinct replenish periods elapsed since lastUpdate, because the replenish might not happen that often. That way we can be precise with fillDuration boundaries. WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

precise with fillDuration boundaries

This is only meaningful if the initial "update tick" is set to a specific value. However, the downside of this approach is that you again reduce the effective rate - another way to describe this approach is that you're rounding the last update up to the next fillDuration boundary, and this rounding is effectively "time that no tokens are created".


let
timeDelta = currentTime - bucket.lastUpdate
fillPercent = timeDelta.milliseconds.float / bucket.fillDuration.milliseconds.float
replenished =
int(bucket.budgetCap.float * fillPercent)
deltaFromReplenished =
int(bucket.fillDuration.milliseconds.float *
replenished.float / bucket.budgetCap.float)
proc calcUpdateBalanced(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] =
  ## Computes the `(budget, lastUpdate)` pair the bucket would have at
  ## `currentTime` when tokens are minted proportionally to elapsed time
  ## (`budgetCapacity` tokens per `fillDuration`). The bucket itself is not
  ## written to; the caller decides whether to store the result.
  ##
  ## Rules:
  ## * zero `fillDuration`: no time based minting, budget is only clamped
  ##   to the capacity;
  ## * `currentTime` not after `lastUpdate`: state is returned unchanged;
  ## * less than one whole token elapsed: state is returned unchanged so the
  ##   fractional time keeps accumulating;
  ## * the elapsed time would mint more than the free space: budget becomes
  ##   `budgetCapacity` and the surplus time is discarded
  ##   (`lastUpdate = currentTime`);
  ## * otherwise the minted tokens are added and `lastUpdate` advances only
  ##   by the time needed to mint exactly those tokens.
  ##
  ## The elapsed time is split into whole fill periods and a remainder so
  ## that every intermediate product is bounded by
  ## `fillDuration (ns) * budgetCapacity`. A long idle period therefore
  ## cannot overflow the 64-bit arithmetic, unlike a direct
  ## `elapsed * capacity` multiplication.
  if bucket.fillDuration == default(Duration):
    # with zero fillDuration we only allow manual replenish till capacity
    return (min(bucket.budgetCapacity, bucket.budget), bucket.lastUpdate)

  if currentTime <= bucket.lastUpdate:
    # don't allow backward timing
    return (bucket.budget, bucket.lastUpdate)

  let capacity = bucket.budgetCapacity
  if capacity <= 0:
    # A non-positive capacity can never mint a token; returning here also
    # keeps the divisions by `capacity` below well defined.
    return (bucket.budget, bucket.lastUpdate)

  let
    periodNs = bucket.fillDuration.nanoseconds.int64
    deltaNs = (currentTime - bucket.lastUpdate).nanoseconds.int64
    # Number of complete fill periods contained in the elapsed time.
    wholePeriods = deltaNs div periodNs
    # Whole tokens produced by the trailing incomplete period (< capacity).
    partialTokens = ((deltaNs mod periodNs) * capacity.int64) div periodNs

  if wholePeriods == 0 and partialTokens == 0:
    # Not enough time elapsed to produce a single whole token.
    return (bucket.budget, bucket.lastUpdate)

  let budgetLeft = capacity - bucket.budget
  if budgetLeft <= 0:
    # Bucket already full the entire elapsed time: burn the elapsed time
    # so we do not accumulate implicit credit and do not allow over budgeting
    return (capacity, currentTime)

  # When the whole periods alone already produce more than the free space
  # the exact token count is irrelevant, so `wholePeriods * capacity` is
  # never evaluated for large `wholePeriods`.
  let saturated = wholePeriods > int64(budgetLeft div capacity)
  let possibleTokens =
    if saturated: budgetLeft
    else: int(wholePeriods) * capacity + int(partialTokens)

  if saturated or possibleTokens > budgetLeft:
    # We hit the capacity; discard leftover elapsed time to prevent
    # multi-call burst inflation
    return (capacity, currentTime)

  # Advance lastUpdate only by the time actually "spent" minting the tokens:
  # (tokens / capacity) * period, computed per component so that no product
  # exceeds periodNs * capacity.
  let usedNs =
    wholePeriods * periodNs + (periodNs * partialTokens) div capacity.int64

  return (bucket.budget + possibleTokens, bucket.lastUpdate + nanoseconds(usedNs))

proc calcUpdate(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] =
  ## Dispatches to the replenish calculation selected by the bucket's
  ## `replenishMode` and returns the resulting `(budget, lastUpdate)` pair.
  case bucket.replenishMode
  of ReplenishMode.Strict:
    bucket.calcUpdateStrict(currentTime)
  of ReplenishMode.Balanced:
    bucket.calcUpdateBalanced(currentTime)

bucket.lastUpdate += milliseconds(deltaFromReplenished)
bucket.budget = min(bucket.budgetCap, bucket.budget + replenished)
proc update(bucket: TokenBucket, currentTime: Moment) =
  ## Stores into the bucket the budget and `lastUpdate` computed by
  ## `calcUpdate` for `currentTime`.
  let state = bucket.calcUpdate(currentTime)
  bucket.budget = state.budget
  bucket.lastUpdate = state.lastUpdate

proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
## If `tokens` are available, consume them,
## Otherwhise, return false.
## Otherwise, return false.

if bucket.budget >= tokens:
# If the bucket is full, treat this moment as the start of the period and discard any idle time before it
if bucket.budget == bucket.budgetCapacity:
bucket.lastUpdate = now
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this change the behavior?

bucket.budget -= tokens
return true

bucket.update(now)

if bucket.budget >= tokens:
bucket.budget -= tokens
true
return true
else:
false
return false

proc worker(bucket: TokenBucket) {.async.} =
while bucket.pendingRequests.len > 0:
Expand All @@ -80,8 +134,8 @@ proc worker(bucket: TokenBucket) {.async.} =
let eventWaiter = bucket.manuallyReplenished.wait()
if bucket.fillDuration.milliseconds > 0:
let
nextCycleValue = float(min(waiter.value, bucket.budgetCap))
budgetRatio = nextCycleValue.float / bucket.budgetCap.float
nextCycleValue = float(min(waiter.value, bucket.budgetCapacity))
budgetRatio = nextCycleValue.float / bucket.budgetCapacity.float
timeToTarget = int(budgetRatio * bucket.fillDuration.milliseconds.float) + 1
#TODO this will create a timer for each blocked bucket,
#which may cause performance issue when creating many
Expand Down Expand Up @@ -127,16 +181,33 @@ proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
bucket.update(now)
bucket.manuallyReplenished.fire()

proc getAvailableCapacity*(
    bucket: TokenBucket, currentTime: Moment = Moment.now()
): tuple[budget: int, budgetCapacity: int, lastUpdate: Moment] =
  ## Returns the budget and `lastUpdate` that `calcUpdate` yields for
  ## `currentTime`, together with the bucket's `budgetCapacity`.
  ## This proc does not assign any field of `bucket`.
  let projected = bucket.calcUpdate(currentTime)
  (projected.budget, bucket.budgetCapacity, projected.lastUpdate)

proc new*(
T: type[TokenBucket],
budgetCap: int,
fillDuration: Duration = 1.seconds): T =
budgetCapacity: int,
fillDuration: Duration = 1.seconds,
replenishMode: ReplenishMode = ReplenishMode.Balanced): T =

## Create a TokenBucket
T(
budget: budgetCap,
budgetCap: budgetCap,
budget: budgetCapacity,
budgetCapacity: budgetCapacity,
fillDuration: fillDuration,
lastUpdate: Moment.now(),
manuallyReplenished: newAsyncEvent()
manuallyReplenished: newAsyncEvent(),
replenishMode: replenishMode
)

proc setState*(bucket: TokenBucket, budget: int, lastUpdate: Moment) =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function would have to take into account waiting requests for budget - basically, setting the values here is a special version of a manual replenish and needs to be treated as such to maintain internal consistency. This begs the question what the use case is that isn't covered by a manual replenish?

bucket.budget = budget
bucket.lastUpdate = lastUpdate

func `$`*(b: TokenBucket): string {.inline.} =
  ## Renders the bucket as `<budgetCapacity>/<fillDuration>`,
  ## or as `nil` when `b` is a nil reference.
  if b.isNil:
    "nil"
  else:
    $b.budgetCapacity & "/" & $b.fillDuration
86 changes: 76 additions & 10 deletions tests/testratelimit.nim
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,80 @@ suite "Token Bucket":
check bucket.tryConsume(1, fakeNow) == true

test "Short replenish":
skip()
# TODO (cheatfate): This test was disabled, because it continuously fails in
# Github Actions Windows x64 CI when using Nim 1.6.14 version.
# Unable to reproduce failure locally.

# var bucket = TokenBucket.new(15000, 1.milliseconds)
# let start = Moment.now()
# check bucket.tryConsume(15000, start)
# check bucket.tryConsume(1, start) == false
var bucket = TokenBucket.new(15000, 1.milliseconds)
let start = Moment.now()
check bucket.tryConsume(15000, start)
check bucket.tryConsume(1, start) == false

# check bucket.tryConsume(15000, start + 1.milliseconds) == true
check bucket.tryConsume(15000, start + 1.milliseconds) == true

# Edge case: only one refill may occur for a given timestamp, even when
# several tryConsume calls are made after a long idle period that would
# otherwise look like a large amount of elapsed-time credit. This prevents
# multi-call burst inflation at a single point in time.
test "No double refill at same timestamp":
  # Capacity 10 per 100 ms; no replenish mode is passed, so the constructor
  # default applies.
  var bucket = TokenBucket.new(10, 100.milliseconds)
  let t0 = Moment.now()
  # Consume from a full bucket so lastUpdate is stamped at t0
  check bucket.tryConsume(5, t0) == true # budget now 5
  # Long idle period (simulates a large elapsed time: 50 fill periods)
  let idle = t0 + 5.seconds
  # The first request exceeds the current budget, which triggers an update;
  # the refill is limited by the free space (5), not by the elapsed time.
  check bucket.tryConsume(6, idle) == true # budget after = 4 (5 minted -> 10 then -6)
  # A second request at the SAME timestamp cannot refill again
  check bucket.tryConsume(5, idle) == false
  # Prove only 4 remain: consuming 4 succeeds, then 1 more fails at the same timestamp
  check bucket.tryConsume(4, idle) == true
  check bucket.tryConsume(1, idle) == false

# Edge-case fairness: after partial usage only the available space may be
# minted (never more than the capacity), and the leftover elapsed time is
# burned once the capacity is reached.
test "Refill limited by available space":
  # Capacity 10 per 100 ms; no replenish mode is passed, so the constructor
  # default applies.
  var bucket = TokenBucket.new(10, 100.milliseconds)
  let t0 = Moment.now()
  # Spend a portion (from full) -> lastUpdate = t0, budget 4
  check bucket.tryConsume(6, t0) == true
  # Mid-period small consume that fits the current budget, so no update is triggered
  let mid = t0 + 50.milliseconds
  check bucket.tryConsume(1, mid) == true # budget 3
  # At the 100ms boundary request more than the remaining budget to force an update
  let boundary = t0 + 100.milliseconds
  # Free space is 7; even though 100ms elapsed corresponds to 10 possible tokens,
  # only 7 are minted and the leftover elapsed-time credit is discarded.
  check bucket.tryConsume(6, boundary) == true # leaves 4
  # A second consume at the identical boundary timestamp cannot mint anything more
  check bucket.tryConsume(5, boundary) == false
  # The 4 tokens left over from the boundary are enough for this request,
  # so it succeeds without triggering an update.
  let late = boundary + 40.milliseconds
  check bucket.tryConsume(4, late) == true # should deplete
  # The bucket is now empty, so this call triggers an update: the 40ms elapsed
  # since the boundary mints floor(40/100 * 10) = 4 tokens (fair catch-up),
  # therefore a small consume still succeeds.
  check bucket.tryConsume(1, late) == true

test "Strict replenish mode does not refill before period elapsed":
  # Capacity 10 per 100 ms in Strict mode.
  var bucket = TokenBucket.new(10, 100.milliseconds, ReplenishMode.Strict)
  let t0 = Moment.now()
  # Spend a portion from the full bucket (budget 10) -> lastUpdate = t0
  check bucket.tryConsume(9, t0) == true # leaves 1

  # Querying at t0 reports the state unchanged.
  var cap = bucket.getAvailableCapacity(t0)
  check cap.budget == 1
  check cap.lastUpdate == t0
  check cap.budgetCapacity == 10

  let mid = t0 + 50.milliseconds

  # Half a period later nothing has been replenished yet.
  cap = bucket.getAvailableCapacity(mid)
  check cap.budget == 1
  check cap.lastUpdate == t0
  check cap.budgetCapacity == 10

  check bucket.tryConsume(2, mid) == false # no update before period boundary passed, budget 1

  let boundary = t0 + 100.milliseconds

  # Once a full period has elapsed the projected budget is the full capacity
  # and lastUpdate moves to the query time.
  cap = bucket.getAvailableCapacity(boundary)
  check cap.budget == 10
  check cap.lastUpdate == boundary
  check cap.budgetCapacity == 10

  check bucket.tryConsume(2, boundary) == true # ok, we passed the period boundary now, leaves 8
Loading