-
Notifications
You must be signed in to change notification settings - Fork 53
Token bucket unification #582
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
edefa13
d63a1bc
eec4f03
a7f008f
1c85269
451ecd6
7f65929
b9cf3d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,45 +14,101 @@ import timer | |
| export timer | ||
|
|
||
| type | ||
| ReplenishMode* = enum | ||
| Continuous | ||
| # Tokens are continuously replenished at a rate of `capacity / fillDuration`, up to the configured capacity | ||
| Discrete | ||
| # Up to `capacity` tokens are replenished once every `fillDuration`, in discrete steps, such that at the beginning of every `fillDuration` period, there are `capacity` tokens available | ||
|
|
||
| BucketWaiter = object | ||
| future: Future[void] | ||
| value: int | ||
| alreadyConsumed: int | ||
|
|
||
| TokenBucket* = ref object | ||
| budget: int | ||
| budgetCap: int | ||
| capacity: int | ||
| lastUpdate: Moment | ||
| fillDuration: Duration | ||
| workFuture: Future[void] | ||
| pendingRequests: seq[BucketWaiter] | ||
| manuallyReplenished: AsyncEvent | ||
| replenishMode: ReplenishMode | ||
|
|
||
| proc update(bucket: TokenBucket, currentTime: Moment) = | ||
| func periodDistance(bucket: TokenBucket, currentTime: Moment): float = | ||
| if currentTime <= bucket.lastUpdate or bucket.fillDuration == default(Duration): | ||
| return 0.0 | ||
|
|
||
| nanoseconds(currentTime - bucket.lastUpdate).float / nanoseconds(bucket.fillDuration).float | ||
|
|
||
| proc calcUpdateDiscrete(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] = | ||
| if bucket.fillDuration == default(Duration): | ||
| bucket.budget = min(bucket.budgetCap, bucket.budget) | ||
| return | ||
| # with zero fillDuration we only allow manual replenish till capacity | ||
| return (min(bucket.capacity, bucket.budget), bucket.lastUpdate) | ||
|
|
||
| if currentTime < bucket.lastUpdate: | ||
| return | ||
| let distance = periodDistance(bucket, currentTime) | ||
| if distance < 1.0: | ||
| return (bucket.budget, bucket.lastUpdate) | ||
|
|
||
| let | ||
| timeDelta = currentTime - bucket.lastUpdate | ||
| fillPercent = timeDelta.milliseconds.float / bucket.fillDuration.milliseconds.float | ||
| replenished = | ||
| int(bucket.budgetCap.float * fillPercent) | ||
| deltaFromReplenished = | ||
| int(bucket.fillDuration.milliseconds.float * | ||
| replenished.float / bucket.budgetCap.float) | ||
| (bucket.capacity, bucket.lastUpdate + (bucket.fillDuration * int(distance))) | ||
|
|
||
| bucket.lastUpdate += milliseconds(deltaFromReplenished) | ||
| bucket.budget = min(bucket.budgetCap, bucket.budget + replenished) | ||
| proc calcUpdateContinuous(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] = | ||
| if bucket.fillDuration == default(Duration): | ||
NagyZoltanPeter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # with zero fillDuration we only allow manual replenish till capacity | ||
| return (min(bucket.capacity, bucket.budget), bucket.lastUpdate) | ||
|
|
||
| if currentTime <= bucket.lastUpdate: | ||
| # don't allow backward timing | ||
| return (bucket.budget, bucket.lastUpdate) | ||
|
|
||
| let timeDelta = currentTime - bucket.lastUpdate | ||
| let capacity = bucket.capacity | ||
| let periodNs = bucket.fillDuration.nanoseconds.int64 | ||
| let deltaNs = timeDelta.nanoseconds.int64 | ||
|
|
||
| # How many whole tokens could be produced by the elapsed time. | ||
| let possibleTokens = int((deltaNs * capacity.int64) div periodNs) | ||
| if possibleTokens <= 0: | ||
| return (bucket.budget, bucket.lastUpdate) | ||
|
|
||
| let budgetLeft = capacity - bucket.budget | ||
| if budgetLeft <= 0: | ||
| # Bucket already full the entire elapsed time: burn the elapsed time | ||
| # so we do not accumulate implicit credit and do not allow over budgeting | ||
| return (capacity, currentTime) | ||
|
|
||
| let toAdd = min(possibleTokens, budgetLeft) | ||
|
|
||
| # Advance lastUpdate only by the fraction of time actually “spent” to mint toAdd tokens. | ||
| # (toAdd / capacity) * period = time used | ||
| let usedNs = (periodNs * toAdd.int64) div capacity.int64 | ||
| let newbudget = bucket.budget + toAdd | ||
| var newLastUpdate = bucket.lastUpdate + nanoseconds(usedNs) | ||
| if toAdd == budgetLeft and possibleTokens > budgetLeft: | ||
| # We hit the capacity; discard leftover elapsed time to prevent multi-call burst inflation | ||
| newLastUpdate = currentTime | ||
|
|
||
| (newbudget, newLastUpdate) | ||
|
|
||
| proc calcUpdate(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] = | ||
| if bucket.replenishMode == ReplenishMode.Discrete: | ||
| return bucket.calcUpdateDiscrete(currentTime) | ||
| else: | ||
| return bucket.calcUpdateContinuous(currentTime) | ||
|
|
||
| proc update(bucket: TokenBucket, currentTime: Moment) = | ||
| let (newBudget, newLastUpdate) = bucket.calcUpdate(currentTime) | ||
| bucket.budget = newBudget | ||
| bucket.lastUpdate = newLastUpdate | ||
|
|
||
| proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = | ||
| ## If `tokens` are available, consume them, | ||
| ## Otherwhise, return false. | ||
| ## Otherwise, return false. | ||
|
|
||
| if bucket.budget >= tokens: | ||
| # If bucket is full, consider this point as period start, drop silent periods before | ||
| if bucket.budget == bucket.capacity: | ||
| bucket.lastUpdate = now | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How does this change the behavior? |
||
| bucket.budget -= tokens | ||
| return true | ||
|
|
||
|
|
@@ -80,8 +136,8 @@ proc worker(bucket: TokenBucket) {.async.} = | |
| let eventWaiter = bucket.manuallyReplenished.wait() | ||
| if bucket.fillDuration.milliseconds > 0: | ||
| let | ||
| nextCycleValue = float(min(waiter.value, bucket.budgetCap)) | ||
| budgetRatio = nextCycleValue.float / bucket.budgetCap.float | ||
| nextCycleValue = float(min(waiter.value, bucket.capacity)) | ||
| budgetRatio = nextCycleValue.float / bucket.capacity.float | ||
| timeToTarget = int(budgetRatio * bucket.fillDuration.milliseconds.float) + 1 | ||
| #TODO this will create a timer for each blocked bucket, | ||
| #which may cause performance issue when creating many | ||
|
|
@@ -119,24 +175,41 @@ proc consume*(bucket: TokenBucket, tokens: int, now = Moment.now()): Future[void | |
| if isNil(bucket.workFuture) or bucket.workFuture.finished(): | ||
| bucket.workFuture = worker(bucket) | ||
|
|
||
| return retFuture | ||
| retFuture | ||
|
|
||
| proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) = | ||
| ## Add `tokens` to the budget (capped to the bucket capacity) | ||
| bucket.budget += tokens | ||
| bucket.update(now) | ||
| bucket.manuallyReplenished.fire() | ||
|
|
||
| proc getAvailableCapacity*( | ||
plopezlpz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| bucket: TokenBucket, currentTime: Moment = Moment.now() | ||
| ): tuple[budget: int, capacity: int, lastUpdate: Moment] = | ||
| let (assumedBudget, assumedLastUpdate) = bucket.calcUpdate(currentTime) | ||
| (assumedBudget, bucket.capacity, assumedLastUpdate) | ||
|
|
||
| proc new*( | ||
| T: type[TokenBucket], | ||
| budgetCap: int, | ||
| fillDuration: Duration = 1.seconds): T = | ||
| capacity: int, | ||
| fillDuration: Duration = 1.seconds, | ||
| replenishMode: ReplenishMode = ReplenishMode.Continuous): T = | ||
|
|
||
| ## Create a TokenBucket | ||
| T( | ||
| budget: budgetCap, | ||
| budgetCap: budgetCap, | ||
| budget: capacity, | ||
| capacity: capacity, | ||
| fillDuration: fillDuration, | ||
| lastUpdate: Moment.now(), | ||
| manuallyReplenished: newAsyncEvent() | ||
| manuallyReplenished: newAsyncEvent(), | ||
| replenishMode: replenishMode | ||
| ) | ||
|
|
||
| proc setState*(bucket: TokenBucket, budget: int, lastUpdate: Moment) = | ||
plopezlpz marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This function would have to take into account requests that are waiting for budget - basically, setting the values here is a special version of a manual replenish and needs to be treated as such to maintain internal consistency. This raises the question: what is the use case that isn't covered by a manual replenish? |
||
| bucket.budget = budget | ||
| bucket.lastUpdate = lastUpdate | ||
|
|
||
| func `$`*(b: TokenBucket): string {.inline.} = | ||
| if isNil(b): | ||
| return "nil" | ||
| $b.capacity & "/" & $b.fillDuration | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| # TokenBucket — Usage Modes (Overview) | ||
|
|
||
| TokenBucket provides several usage modes and patterns depending on how you want to rate-limit: | ||
|
|
||
| - Continuous mode (default): | ||
| - Mints tokens proportionally to elapsed time at a constant rate (`capacity / fillDuration`), adding only whole tokens. | ||
| - When the bucket is full for an interval, the elapsed time is burned (no “credit banking”). | ||
| - If an update would overfill, budget is clamped to capacity and leftover elapsed time is discarded; `lastUpdate` is set to the current time. | ||
| - Nanosecond-level accounting for precise behavior. | ||
|
|
||
| - Discrete mode: | ||
| - Replenishes only after a full `fillDuration` has elapsed (step-like refill behavior). | ||
| - Before the period boundary, budget does not increase; after the boundary, budget jumps to capacity. | ||
| - Use when you need hard period boundaries rather than proportional accrual. | ||
|
|
||
| - Manual-only replenish (fillDuration = 0): | ||
| - Disables automatic minting; tokens can only be added via `replenish(tokens)`. | ||
| - Replenish is capped at capacity and wakes pending consumers. | ||
|
|
||
| - Synchronous consumption: `tryConsume(tokens, now)` | ||
| - Attempts to consume immediately; returns `true` on success, `false` otherwise. | ||
| - When a successful consume starts from a full bucket, `lastUpdate` is set to `now` (prevents idle-at-full credit banking in Continuous mode). | ||
|
|
||
| - Asynchronous consumption: `consume(tokens, now) -> Future[void]` | ||
| - Returns a future that completes when tokens become available (or can be cancelled). | ||
| - Internally, the waiter is woken around the time enough tokens are expected to accrue, or earlier if `replenish()` is called. | ||
|
|
||
| - Capacity and timing introspection: `getAvailableCapacity(now)` | ||
| - Computes the budget as of `now` without mutating bucket state. | ||
|
|
||
| - Manual replenishment: `replenish(tokens, now)` | ||
| - Adds tokens (capped to capacity), updates timing, and wakes waiters. | ||
|
|
||
| The sections below illustrate Continuous semantics with concrete timelines and compare them with the older algorithm for context. | ||
|
|
||
| # Example Scenarios for Continuous Mode | ||
| ## TokenBucket Continuous Mode — Scenario 1 Timeline | ||
|
|
||
| Assumptions: | ||
| - Capacity `C = 10` | ||
| - `fillDuration = 1s` (per-token time: 100ms) | ||
| - Start: `t = 0ms`, `budget = 10`, `lastUpdate = 0ms` | ||
|
|
||
| Legend: | ||
| - Minted tokens (TA): tokens actually added by the Continuous update at that step (`toAdd` in the implementation) | ||
| - Budget after mint: budget after minting, before the consume at that row | ||
| - Budget after consume: budget left after processing the request at that row | ||
| - LU set?: whether `lastUpdate` changes at that step (reason) | ||
|
|
||
| Only request events are listed below (no passive availability checks): | ||
|
|
||
| | Time | Elapsed from LU | Budget (in) | Request tokens | Minted tokens (TA) | Budget after mint | Budget after consume | LU set? | | ||
| |---------|------------------|-------------|----------------|--------------------|-------------------|----------------------|-----------------------------------| | ||
| | 0 ms | n/a | 10 | 7 | 0 | 10 | 3 | yes (consume/full → 0 ms) | | ||
| | 200 ms | 200 ms | 3 | 5 | 2 | 5 | 0 | yes (update → 200 ms) | | ||
| | 650 ms | 450 ms | 0 | 3 | 4 | 4 | 1 | yes (update → 600 ms) | | ||
| | 1200 ms | 600 ms | 1 | 6 | 6 | 7 | 1 | yes (update → 1200 ms) | | ||
| | 1800 ms | 600 ms | 1 | 5 | 6 | 7 | 2 | yes (update → 1800 ms) | | ||
| | 2100 ms | 300 ms | 2 | 10 | 3 | 5 | 5 (insufficient) | yes (update → 2100 ms) | | ||
| | 2600 ms | 500 ms | 5 | 10 | 5 (to cap) | 10 (hit cap) | 0 | yes (update hit cap → 2600 ms); yes (consume/full → 2600 ms) | | ||
|
|
||
| Notes: | ||
| - When an update would overfill the bucket, it is clamped to capacity and `lastUpdate` is set to the current time; leftover elapsed time is discarded. | ||
| - Consuming from a full bucket sets `lastUpdate` to the consume time (prevents idle-at-full credit banking). | ||
|
|
||
| ### Consumption Summary (0–3s window) | ||
|
|
||
| Per `fillDuration` period (1s each): | ||
|
|
||
| | Period | Requests within period | Tokens consumed | | ||
| |----------------|-------------------------------------------------|-----------------| | ||
| | 0–1000 ms | 0ms:7, 200ms:5, 650ms:3 | 15 | | ||
| | 1000–2000 ms | 1200ms:6, 1800ms:5 | 11 | | ||
| | 2000–3000 ms | 2100ms:10 (insufficient), 2600ms:10 (consumed) | 10 | | ||
|
|
||
| Total consumed over 3 seconds: 15 + 11 + 10 = 36 tokens. | ||
|
|
||
| ## High-rate single-token requests (Continuous) | ||
|
|
||
| Settings: | ||
| - Capacity `C = 10`, `fillDuration = 10ms` (per-token time: 1ms) | ||
| - Window to observe: `0–40ms` (4 full periods) | ||
| - Requests are 1 token each; batches occur at specific timestamps. | ||
|
|
||
| We show how the bucket rejects attempts that exceed the available budget at each instant, ensuring no more than `capacity + minted` tokens are usable in any time frame. Over `0–40ms`, at most `10 (initial capacity) + 4 × 10 (mint) = 50` tokens can be consumed. | ||
|
|
||
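| Request batches and outcomes (PT = `possibleTokens`, the whole tokens the elapsed time could produce; TA = `toAdd`, the tokens actually minted after clamping to the remaining capacity): | ||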
|
|
||
| | Time | Elapsed from LU | Budget before | Minted (PT→TA) | Budget after mint | Requests (×1) | Accepted | Rejected | Budget after consume | LU after | | ||
| |--------|------------------|---------------|-----------------|-------------------|---------------|----------|----------|----------------------|---------| | ||
| | 0 ms | n/a | 10 | 0 | 10 | 12 | 10 | 2 | 0 | 0 ms | | ||
| | 5 ms | 5 ms | 0 | 5 → 5 | 5 | 7 | 5 | 2 | 0 | 5 ms | | ||
| | 10 ms | 5 ms | 0 | 5 → 5 | 5 | 15 | 5 | 10 | 0 | 10 ms | | ||
| | 12 ms | 2 ms | 0 | 2 → 2 | 2 | 3 | 2 | 1 | 0 | 12 ms | | ||
| | 20 ms | 8 ms | 0 | 8 → 8 | 8 | 25 | 8 | 17 | 0 | 20 ms | | ||
| | 30 ms | 10 ms | 0 | 10 → 10 | 10 | 9 | 9 | 0 | 1 | 30 ms | | ||
| | 31 ms | 1 ms | 1 | 1 → 1 | 2 | 3 | 2 | 1 | 0 | 31 ms | | ||
| | 40 ms | 9 ms | 0 | 9 → 9 | 9 | 20 | 9 | 11 | 0 | 40 ms | | ||
|
|
||
| Totals over 0–40ms: | ||
| - Attempted: 12 + 7 + 15 + 3 + 25 + 9 + 3 + 20 = 94 requests | ||
| - Accepted: 10 + 5 + 5 + 2 + 8 + 9 + 2 + 9 = 50 tokens (matches `10 + 4×10`) | ||
| - Rejected: 94 − 50 = 44 requests | ||
|
|
||
| Why the rejections happen (preventing overuse): | ||
| - At any given instant, you can only consume up to the tokens currently in the bucket. | ||
| - Between instants, tokens mint continuously at `capacity / fillDuration = 1 token/ms`; the table shows how many become available just before each batch. | ||
| - When a batch demands more than available, the excess is rejected (or would be queued with `consume()`), enforcing the rate limit. | ||
| - Over any observation window, the maximum consumable tokens = initial available (up to capacity) + tokens minted during that window; here, that cap is `10 + (40ms × 1/ms) = 50`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a better factoring would be to have an `elapsed(bucket, now)` function that returns a Duration,
that would be used for both discrete and continuous mode - it would clamp the elapsed time to values >= 0, which is what both modes do.