1- import pLimit , { type LimitFunction } from 'p-limit'
21import { CancelReason } from './CancelReason'
2+ import { PromiseWithResolvers } from './PromiseWithResolvers'
33import { TaskRecord , type TaskOptions } from './TaskRecord'
44
55export interface QueueOptions {
@@ -29,27 +29,44 @@ export class TaskQueue {
2929 private readonly burstLimit : number
3030 private readonly sustainRate : number
3131 private readonly queueCostLimit : number
32- private readonly concurrencyLimiter : LimitFunction
33- private readonly boundRunTasks = this . runTasks . bind ( this )
32+ private readonly concurrencyLimit : number
3433 private tokenCount : number
3534
35+ private runningTasks = 0
3636 private pendingTaskRecords : TaskRecord < unknown > [ ] = [ ]
37- private timeout : number | null = null
3837 private lastRefillTime : number = Date . now ( )
38+ private onTaskAdded = PromiseWithResolvers < void > ( ) . resolve // start with a no-op of correct type
39+ private onTaskFinished = PromiseWithResolvers < void > ( ) . resolve // start with a no-op of correct type
3940
4041 constructor ( options : QueueOptions ) {
4142 this . burstLimit = options . burstLimit
4243 this . sustainRate = options . sustainRate
4344 this . tokenCount = options . startingTokens ?? options . burstLimit
4445 this . queueCostLimit = options . queueCostLimit ?? Infinity
45- this . concurrencyLimiter = pLimit ( options . concurrency ?? 1 )
46+ this . concurrencyLimit = options . concurrency ?? 1
47+ void this . runTasks ( )
4648 }
4749
4850 /** @returns The number of tasks currently in the queue */
4951 get length ( ) : number {
5052 return this . pendingTaskRecords . length
5153 }
5254
55+ /**
56+ * @returns The current configuration options of the queue. Used primarily for testing and inspection.
57+ * Note that the `startingTokens` value returned here reflects the current token count, which is only guaranteed to
58+ * match the originally configured starting tokens value if no time has passed and no tasks have been processed.
59+ */
60+ get options ( ) : Readonly < QueueOptions > {
61+ return {
62+ burstLimit : this . burstLimit ,
63+ sustainRate : this . sustainRate ,
64+ startingTokens : this . tokenCount ,
65+ queueCostLimit : this . queueCostLimit ,
66+ concurrency : this . concurrencyLimit ,
67+ }
68+ }
69+
5370 /**
5471 * Adds a task to the queue. The task will first wait until enough tokens are available, then will wait its turn in
5572 * the concurrency queue.
@@ -77,10 +94,7 @@ export class TaskQueue {
7794 this . cancel ( taskRecord . promise , new Error ( CancelReason . Aborted ) )
7895 } )
7996
80- // If the queue was empty, we need to prime the pump
81- if ( this . pendingTaskRecords . length === 1 ) {
82- void this . runTasks ( )
83- }
97+ this . onTaskAdded ( )
8498
8599 return taskRecord . promise
86100 }
@@ -96,9 +110,6 @@ export class TaskQueue {
96110 if ( taskIndex !== - 1 ) {
97111 const [ taskRecord ] = this . pendingTaskRecords . splice ( taskIndex , 1 )
98112 taskRecord . cancel ( reason ?? new Error ( CancelReason . Cancel ) )
99- if ( taskIndex === 0 && this . pendingTaskRecords . length > 0 ) {
100- void this . runTasks ( )
101- }
102113 return true
103114 }
104115 return false
@@ -110,10 +121,6 @@ export class TaskQueue {
110121 * @returns The number of tasks that were cancelled.
111122 */
112123 cancelAll ( reason ?: Error ) : number {
113- if ( this . timeout !== null ) {
114- clearTimeout ( this . timeout )
115- this . timeout = null
116- }
117124 const oldTasks = this . pendingTaskRecords
118125 this . pendingTaskRecords = [ ]
119126 reason = reason ?? new Error ( CancelReason . Cancel )
@@ -164,17 +171,15 @@ export class TaskQueue {
164171 /**
165172 * Run tasks from the queue as tokens become available.
166173 */
167- private runTasks ( ) : void {
168- if ( this . timeout !== null ) {
169- clearTimeout ( this . timeout )
170- this . timeout = null
171- }
172-
174+ private async runTasks ( ) : Promise < void > {
173175 for ( ; ; ) {
174176 const nextRecord = this . pendingTaskRecords . shift ( )
175177 if ( ! nextRecord ) {
176178 // No more tasks to run
177- return
179+ const { promise, resolve } = PromiseWithResolvers < void > ( )
180+ this . onTaskAdded = resolve
181+ await promise // wait until a task is added
182+ continue // then try again
178183 }
179184
180185 if ( nextRecord . cost > this . burstLimit ) {
@@ -185,16 +190,34 @@ export class TaskQueue {
185190
186191 // Refill before each task in case the time it took for the last task to run was enough to afford the next.
187192 if ( this . refillAndSpend ( nextRecord . cost ) ) {
188- // Run the task within the concurrency limiter
189- void this . concurrencyLimiter ( nextRecord . run )
193+ if ( this . runningTasks >= this . concurrencyLimit ) {
194+ const { promise, resolve } = PromiseWithResolvers < void > ( )
195+ this . onTaskFinished = resolve
196+ await promise // wait until a task finishes
197+ // then we know there's room for at least one more task
198+ }
199+ void this . runTask ( nextRecord )
190200 } else {
191201 // We can't currently afford this task. Put it back and wait until we can, then try again.
192202 this . pendingTaskRecords . unshift ( nextRecord )
193203 const tokensNeeded = Math . max ( nextRecord . cost - this . tokenCount , 0 )
194204 const estimatedWait = Math . ceil ( ( 1000 * tokensNeeded ) / this . sustainRate )
195- this . timeout = setTimeout ( this . boundRunTasks , estimatedWait )
196- return
205+ await new Promise ( resolve => setTimeout ( resolve , estimatedWait ) )
197206 }
198207 }
199208 }
209+
210+ /**
211+ * Run a task record right now, managing the running tasks count.
212+ * @param taskRecord The task that should run.
213+ */
214+ private async runTask ( taskRecord : TaskRecord < unknown > ) : Promise < void > {
215+ this . runningTasks ++
216+ try {
217+ await taskRecord . run ( )
218+ } finally {
219+ this . runningTasks --
220+ this . onTaskFinished ( )
221+ }
222+ }
200223}
0 commit comments