-
Notifications
You must be signed in to change notification settings - Fork 53
Token bucket unification #582
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
edefa13
d63a1bc
eec4f03
a7f008f
1c85269
451ecd6
7f65929
b9cf3d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,55 +14,109 @@ import timer | |
| export timer | ||
|
|
||
| type | ||
| ReplenishMode* = enum | ||
| # Strict mode allows replenish tokens only after the fill duration has elapsed. | ||
| Strict | ||
|
|
||
| # For better utilization of available tokens and tolerate burst periods. | ||
| # Balanced mode allows minting tokens based on elapsed time in between consuming of tokens up to budget capacity. | ||
| Balanced | ||
|
|
||
| BucketWaiter = object | ||
| future: Future[void] | ||
| value: int | ||
| alreadyConsumed: int | ||
|
|
||
| TokenBucket* = ref object | ||
| budget: int | ||
| budgetCap: int | ||
| budgetCapacity: int | ||
NagyZoltanPeter marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| lastUpdate: Moment | ||
| fillDuration: Duration | ||
| workFuture: Future[void] | ||
| pendingRequests: seq[BucketWaiter] | ||
| manuallyReplenished: AsyncEvent | ||
| replenishMode: ReplenishMode | ||
|
|
||
| proc update(bucket: TokenBucket, currentTime: Moment) = | ||
| func fullPeriodElapsed(bucket: TokenBucket, currentTime: Moment): bool = | ||
| return currentTime - bucket.lastUpdate >= bucket.fillDuration | ||
NagyZoltanPeter marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| proc calcUpdateStrict(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] = | ||
| if bucket.fillDuration == default(Duration): | ||
| bucket.budget = min(bucket.budgetCap, bucket.budget) | ||
| return | ||
| # with zero fillDuration we only allow manual replenish till capacity | ||
| return (min(bucket.budgetCapacity, bucket.budget), bucket.lastUpdate) | ||
|
|
||
| if not fullPeriodElapsed(bucket, currentTime): | ||
| return (bucket.budget, bucket.lastUpdate) | ||
|
|
||
| if currentTime < bucket.lastUpdate: | ||
| return | ||
| return (bucket.budgetCapacity, currentTime) | ||
|
||
|
|
||
| let | ||
| timeDelta = currentTime - bucket.lastUpdate | ||
| fillPercent = timeDelta.milliseconds.float / bucket.fillDuration.milliseconds.float | ||
| replenished = | ||
| int(bucket.budgetCap.float * fillPercent) | ||
| deltaFromReplenished = | ||
| int(bucket.fillDuration.milliseconds.float * | ||
| replenished.float / bucket.budgetCap.float) | ||
| proc calcUpdateBalanced(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] = | ||
| if bucket.fillDuration == default(Duration): | ||
NagyZoltanPeter marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # with zero fillDuration we only allow manual replenish till capacity | ||
| return (min(bucket.budgetCapacity, bucket.budget), bucket.lastUpdate) | ||
|
|
||
| if currentTime <= bucket.lastUpdate: | ||
| # don't allow backward timing | ||
| return (bucket.budget, bucket.lastUpdate) | ||
|
|
||
| let timeDelta = currentTime - bucket.lastUpdate | ||
| let capacity = bucket.budgetCapacity | ||
| let periodNs = bucket.fillDuration.nanoseconds.int64 | ||
| let deltaNs = timeDelta.nanoseconds.int64 | ||
|
|
||
| # How many whole tokens could be produced by the elapsed time. | ||
| let possibleTokens = int((deltaNs * capacity.int64) div periodNs) | ||
| if possibleTokens <= 0: | ||
| return (bucket.budget, bucket.lastUpdate) | ||
|
|
||
| let budgetLeft = capacity - bucket.budget | ||
| if budgetLeft <= 0: | ||
| # Bucket already full the entire elapsed time: burn the elapsed time | ||
| # so we do not accumulate implicit credit and do not allow over budgeting | ||
| return (capacity, currentTime) | ||
|
|
||
| let toAdd = min(possibleTokens, budgetLeft) | ||
|
|
||
| # Advance lastUpdate only by the fraction of time actually “spent” to mint toAdd tokens. | ||
| # (toAdd / capacity) * period = time used | ||
| let usedNs = (periodNs * toAdd.int64) div capacity.int64 | ||
| let newbudget = bucket.budget + toAdd | ||
| var newLastUpdate = bucket.lastUpdate + nanoseconds(usedNs) | ||
| if toAdd == budgetLeft and possibleTokens > budgetLeft: | ||
| # We hit the capacity; discard leftover elapsed time to prevent multi-call burst inflation | ||
| newLastUpdate = currentTime | ||
|
|
||
| return (newbudget, newLastUpdate) | ||
|
|
||
| proc calcUpdate(bucket: TokenBucket, currentTime: Moment): tuple[budget: int, lastUpdate: Moment] = | ||
| if bucket.replenishMode == ReplenishMode.Strict: | ||
| return bucket.calcUpdateStrict(currentTime) | ||
| else: | ||
| return bucket.calcUpdateBalanced(currentTime) | ||
|
|
||
| bucket.lastUpdate += milliseconds(deltaFromReplenished) | ||
| bucket.budget = min(bucket.budgetCap, bucket.budget + replenished) | ||
| proc update(bucket: TokenBucket, currentTime: Moment) = | ||
| let (newBudget, newLastUpdate) = bucket.calcUpdate(currentTime) | ||
| bucket.budget = newBudget | ||
| bucket.lastUpdate = newLastUpdate | ||
|
|
||
| proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = | ||
| ## If `tokens` are available, consume them, | ||
| ## Otherwhise, return false. | ||
| ## Otherwise, return false. | ||
|
|
||
| if bucket.budget >= tokens: | ||
| # If bucket is full, consider this point as period start, drop silent periods before | ||
| if bucket.budget == bucket.budgetCapacity: | ||
| bucket.lastUpdate = now | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how does this change the behavior? |
||
| bucket.budget -= tokens | ||
| return true | ||
|
|
||
| bucket.update(now) | ||
|
|
||
| if bucket.budget >= tokens: | ||
| bucket.budget -= tokens | ||
| true | ||
| return true | ||
NagyZoltanPeter marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| else: | ||
| false | ||
| return false | ||
|
|
||
| proc worker(bucket: TokenBucket) {.async.} = | ||
| while bucket.pendingRequests.len > 0: | ||
|
|
@@ -80,8 +134,8 @@ proc worker(bucket: TokenBucket) {.async.} = | |
| let eventWaiter = bucket.manuallyReplenished.wait() | ||
| if bucket.fillDuration.milliseconds > 0: | ||
| let | ||
| nextCycleValue = float(min(waiter.value, bucket.budgetCap)) | ||
| budgetRatio = nextCycleValue.float / bucket.budgetCap.float | ||
| nextCycleValue = float(min(waiter.value, bucket.budgetCapacity)) | ||
| budgetRatio = nextCycleValue.float / bucket.budgetCapacity.float | ||
| timeToTarget = int(budgetRatio * bucket.fillDuration.milliseconds.float) + 1 | ||
| #TODO this will create a timer for each blocked bucket, | ||
| #which may cause performance issue when creating many | ||
|
|
@@ -127,16 +181,33 @@ proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) = | |
| bucket.update(now) | ||
| bucket.manuallyReplenished.fire() | ||
|
|
||
| proc getAvailableCapacity*( | ||
plopezlpz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| bucket: TokenBucket, currentTime: Moment = Moment.now() | ||
| ): tuple[budget: int, budgetCapacity: int, lastUpdate: Moment] = | ||
| let (assumedBudget, assumedLastUpdate) = bucket.calcUpdate(currentTime) | ||
| return (assumedBudget, bucket.budgetCapacity, assumedLastUpdate) | ||
|
|
||
| proc new*( | ||
| T: type[TokenBucket], | ||
| budgetCap: int, | ||
| fillDuration: Duration = 1.seconds): T = | ||
| budgetCapacity: int, | ||
| fillDuration: Duration = 1.seconds, | ||
| replenishMode: ReplenishMode = ReplenishMode.Balanced): T = | ||
|
|
||
| ## Create a TokenBucket | ||
| T( | ||
| budget: budgetCap, | ||
| budgetCap: budgetCap, | ||
| budget: budgetCapacity, | ||
| budgetCapacity: budgetCapacity, | ||
| fillDuration: fillDuration, | ||
| lastUpdate: Moment.now(), | ||
| manuallyReplenished: newAsyncEvent() | ||
| manuallyReplenished: newAsyncEvent(), | ||
| replenishMode: replenishMode | ||
| ) | ||
|
|
||
| proc setState*(bucket: TokenBucket, budget: int, lastUpdate: Moment) = | ||
plopezlpz marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this function would have to take into account waiting requests for budget - basically, setting the values here is a special version of a manual replenish and needs to be treated as such to maintain internal consistency. This begs the question what the use case is that isn't covered by a manual replenish? |
||
| bucket.budget = budget | ||
| bucket.lastUpdate = lastUpdate | ||
|
|
||
| func `$`*(b: TokenBucket): string {.inline.} = | ||
| if isNil(b): | ||
| return "nil" | ||
| return $b.budgetCapacity & "/" & $b.fillDuration | ||
Uh oh!
There was an error while loading. Please reload this page.