@@ -11,7 +11,11 @@ import {
1111 FairQueue ,
1212 DRRScheduler ,
1313 CallbackFairQueueKeyProducer ,
14+ WorkerQueueManager ,
15+ BatchedSpanManager ,
16+ isAbortError ,
1417 type FairQueueOptions ,
18+ type StoredMessage ,
1519} from "@trigger.dev/redis-worker" ;
1620import { Logger } from "@trigger.dev/core/logger" ;
1721import type {
@@ -48,8 +52,14 @@ export { BatchCompletionTracker } from "./completionTracker.js";
4852// Redis key for environment concurrency limits
4953const ENV_CONCURRENCY_KEY_PREFIX = "batch:env_concurrency" ;
5054
55+ // Single worker queue ID for all batch items
56+ // BatchQueue uses a single shared worker queue - FairQueue handles fair scheduling,
57+ // then all messages are routed to this queue for BatchQueue's own consumer loop.
58+ const BATCH_WORKER_QUEUE_ID = "batch-worker-queue" ;
59+
5160export class BatchQueue {
5261 private fairQueue : FairQueue < typeof BatchItemPayloadSchema > ;
62+ private workerQueueManager : WorkerQueueManager ;
5363 private completionTracker : BatchCompletionTracker ;
5464 private logger : Logger ;
5565 private tracer ?: Tracer ;
@@ -59,6 +69,13 @@ export class BatchQueue {
5969 private processItemCallback ?: ProcessBatchItemCallback ;
6070 private completionCallback ?: BatchCompletionCallback ;
6171
72+ // Consumer loop state
73+ private isRunning = false ;
74+ private abortController : AbortController ;
75+ private workerQueueConsumerLoops : Promise < void > [ ] = [ ] ;
76+ private workerQueueBlockingTimeoutSeconds : number ;
77+ private batchedSpanManager : BatchedSpanManager ;
78+
6279 // Metrics
6380 private batchesEnqueuedCounter ?: Counter ;
6481 private itemsEnqueuedCounter ?: Counter ;
@@ -72,6 +89,8 @@ export class BatchQueue {
7289 this . logger = options . logger ?? new Logger ( "BatchQueue" , options . logLevel ?? "info" ) ;
7390 this . tracer = options . tracer ;
7491 this . defaultConcurrency = options . defaultConcurrency ?? 10 ;
92+ this . abortController = new AbortController ( ) ;
93+ this . workerQueueBlockingTimeoutSeconds = options . workerQueueBlockingTimeoutSeconds ?? 10 ;
7594
7695 // Initialize metrics if meter is provided
7796 if ( options . meter ) {
@@ -108,20 +127,23 @@ export class BatchQueue {
108127 keys : keyProducer ,
109128 quantum : options . drr . quantum ,
110129 maxDeficit : options . drr . maxDeficit ,
130+ masterQueueLimit : options . drr . masterQueueLimit ,
111131 logger : {
112132 debug : ( msg , ctx ) => this . logger . debug ( msg , ctx ) ,
113133 error : ( msg , ctx ) => this . logger . error ( msg , ctx ) ,
114134 } ,
115135 } ) ;
116136
117137 // Create FairQueue with telemetry and environment-based concurrency limiting
138+ // FairQueue handles fair scheduling and routes messages to the batch worker queue
139+ // BatchQueue runs its own consumer loop to process messages from the worker queue
118140 const fairQueueOptions : FairQueueOptions < typeof BatchItemPayloadSchema > = {
119141 redis : options . redis ,
120142 keys : keyProducer ,
121143 scheduler,
122144 payloadSchema : BatchItemPayloadSchema ,
123145 validateOnEnqueue : false , // We control the payload
124- shardCount : 1 , // Batches don't need sharding
146+ shardCount : options . shardCount ?? 1 ,
125147 consumerCount : options . consumerCount ,
126148 consumerIntervalMs : options . consumerIntervalMs ,
127149 visibilityTimeoutMs : 60_000 , // 1 minute for batch item processing
@@ -131,6 +153,11 @@ export class BatchQueue {
131153 threshold : 5 ,
132154 periodMs : 5_000 ,
133155 } ,
156+ // Worker queue configuration - FairQueue routes all messages to our single worker queue
157+ workerQueue : {
158+ // All batch items go to the same worker queue - BatchQueue handles consumption
159+ resolveWorkerQueue : ( ) => BATCH_WORKER_QUEUE_ID ,
160+ } ,
134161 // Concurrency group based on tenant (environment)
135162 // This limits how many batch items can be processed concurrently per environment
136163 // Items wait in queue until capacity frees up
@@ -157,6 +184,24 @@ export class BatchQueue {
157184
158185 this . fairQueue = new FairQueue ( fairQueueOptions ) ;
159186
187+ // Create worker queue manager for consuming from the batch worker queue
188+ this . workerQueueManager = new WorkerQueueManager ( {
189+ redis : options . redis ,
190+ keys : keyProducer ,
191+ logger : {
192+ debug : ( msg , ctx ) => this . logger . debug ( msg , ctx ) ,
193+ error : ( msg , ctx ) => this . logger . error ( msg , ctx ) ,
194+ } ,
195+ } ) ;
196+
197+ // Initialize batched span manager for worker queue consumer tracing
198+ this . batchedSpanManager = new BatchedSpanManager ( {
199+ tracer : options . tracer ,
200+ name : "batch-queue-worker" ,
201+ maxIterations : options . consumerTraceMaxIterations ?? 1000 ,
202+ timeoutSeconds : options . consumerTraceTimeoutSeconds ?? 60 ,
203+ } ) ;
204+
160205 // Create completion tracker
161206 this . completionTracker = new BatchCompletionTracker ( {
162207 redis : options . redis ,
@@ -167,11 +212,6 @@ export class BatchQueue {
167212 } ,
168213 } ) ;
169214
170- // Set up message handler
171- this . fairQueue . onMessage ( async ( ctx ) => {
172- await this . #handleMessage( ctx ) ;
173- } ) ;
174-
175215 // Register telemetry gauge callbacks for observable metrics
176216 // Note: observedTenants is not provided since tenant list is dynamic
177217 this . fairQueue . registerTelemetryGauges ( ) ;
@@ -410,29 +450,66 @@ export class BatchQueue {
410450
/**
 * Start the consumer loops.
 *
 * FairQueue runs the master queue consumers (claim and push to the worker queue),
 * while BatchQueue launches `options.consumerCount` worker queue consumer loops of
 * its own to process the routed messages. Calling this while already running is a
 * no-op.
 */
start(): void {
  // Guard against double starts.
  if (this.isRunning) return;

  this.isRunning = true;
  // Fresh controller for this run; stop() aborts the current one.
  this.abortController = new AbortController();

  // Master queue consumers first: they route messages to the worker queue.
  this.fairQueue.start();

  // Then one worker queue consumer loop per configured consumer.
  const { options } = this;
  let nextConsumerId = 0;
  while (nextConsumerId < options.consumerCount) {
    this.workerQueueConsumerLoops.push(this.#runWorkerQueueConsumerLoop(nextConsumerId));
    nextConsumerId += 1;
  }

  const startupDetails = {
    consumerCount: options.consumerCount,
    intervalMs: options.consumerIntervalMs,
    drrQuantum: options.drr.quantum,
    workerQueueId: BATCH_WORKER_QUEUE_ID,
  };
  this.logger.info("BatchQueue consumers started", startupDetails);
}
422480
/**
 * Stop the consumer loops gracefully.
 *
 * Marks the queue as not running, aborts the shared signal used by the worker
 * queue consumers, stops FairQueue's master queue consumers and then waits for
 * every worker queue consumer loop to settle. Calling this while not running is
 * a no-op.
 *
 * If stopping FairQueue rejects, the worker loops are still awaited and the
 * tracked loop promises are still cleared before the error propagates.
 *
 * @returns Resolves once all consumer loops have finished.
 */
async stop(): Promise<void> {
  if (!this.isRunning) {
    return;
  }

  this.isRunning = false;
  this.abortController.abort();

  try {
    // Stop FairQueue's master queue consumers
    await this.fairQueue.stop();
  } finally {
    // The loops were already told to exit above, so always wait for them and
    // drop the settled promises, even when fairQueue.stop() failed. allSettled
    // never rejects, so it cannot mask the original error.
    await Promise.allSettled(this.workerQueueConsumerLoops);
    this.workerQueueConsumerLoops = [];
  }

  this.logger.info("BatchQueue consumers stopped");
}
430501
/**
 * Close the BatchQueue and all Redis connections.
 *
 * Stops the consumers first, flushes any batched spans still held by the span
 * manager, then shuts down each owned component one after another.
 */
async close(): Promise<void> {
  await this.stop();

  // Safety net: release batched spans the consumer loops did not clean up themselves.
  this.batchedSpanManager.cleanupAll();

  // Shutdown steps, executed strictly in this order.
  const shutdownSteps: ReadonlyArray<() => unknown> = [
    () => this.fairQueue.close(),
    () => this.workerQueueManager.close(),
    () => this.completionTracker.close(),
    () => this.concurrencyRedis.quit(),
  ];

  for (const runStep of shutdownSteps) {
    await runStep();
  }
}
@@ -516,56 +593,133 @@ export class BatchQueue {
516593 } ) ;
517594 }
518595
596+ // ============================================================================
597+ // Private - Worker Queue Consumer Loop
598+ // ============================================================================
599+
/**
 * Run a worker queue consumer loop.
 *
 * While the queue is running, each iteration blocking-pops one message key from
 * the batch worker queue, splits it into a message ID and a queue ID, and passes
 * them to `#handleMessage`. Every iteration runs inside a batched span tracked
 * under this loop's ID.
 *
 * Errors raised by an iteration are logged and the loop continues after a short
 * backoff, unless the abort signal has fired, in which case the loop exits.
 *
 * @param consumerId - Index of this consumer; used to build the loop ID and set
 *   as the `consumer_id` span attribute.
 * @returns Resolves when the loop exits.
 */
async #runWorkerQueueConsumerLoop(consumerId: number): Promise<void> {
  const loopId = `batch-worker-${consumerId}`;

  // Delay while no process-item callback is registered yet.
  const idleDelayMs = 100;
  // Delay after a failed iteration, so a persistent failure cannot turn the
  // loop into a hot spin that floods the error log.
  const errorBackoffMs = 100;
  const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => setTimeout(resolve, ms));

  // Initialize batched span tracking for this loop
  this.batchedSpanManager.initializeLoop(loopId);

  try {
    while (this.isRunning) {
      // Nothing can be processed until a callback is registered; poll for it.
      if (!this.processItemCallback) {
        await sleep(idleDelayMs);
        continue;
      }

      try {
        await this.batchedSpanManager.withBatchedSpan(
          loopId,
          async (span) => {
            span.setAttribute("consumer_id", consumerId);

            // Blocking pop from worker queue
            const messageKey = await this.workerQueueManager.blockingPop(
              BATCH_WORKER_QUEUE_ID,
              this.workerQueueBlockingTimeoutSeconds,
              this.abortController.signal
            );

            if (!messageKey) {
              this.batchedSpanManager.incrementStat(loopId, "empty_iterations");
              return false; // Timeout, no work
            }

            // Parse message key (format: "messageId:queueId").
            // Only the first colon separates the two parts; the remainder is the queue ID.
            const colonIndex = messageKey.indexOf(":");
            if (colonIndex === -1) {
              this.logger.error("Invalid message key format", { messageKey });
              this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
              return false;
            }

            const messageId = messageKey.substring(0, colonIndex);
            const queueId = messageKey.substring(colonIndex + 1);

            await this.#handleMessage(loopId, messageId, queueId);
            this.batchedSpanManager.incrementStat(loopId, "messages_processed");
            return true; // Had work
          },
          {
            iterationSpanName: "processWorkerQueueMessage",
            attributes: { consumer_id: consumerId },
          }
        );
      } catch (error) {
        // An error after abort is the expected way out of a blocking pop.
        if (this.abortController.signal.aborted) {
          break;
        }
        this.logger.error("Worker queue consumer error", {
          loopId,
          error: error instanceof Error ? error.message : String(error),
        });
        this.batchedSpanManager.markForRotation(loopId);

        await sleep(errorBackoffMs);
      }
    }
  } catch (error) {
    if (isAbortError(error)) {
      // Span cleanup is handled by the finally block below.
      this.logger.debug("Worker queue consumer aborted", { loopId });
      return;
    }
    throw error;
  } finally {
    // Runs exactly once on every exit path (normal exit, abort, or rethrow).
    this.batchedSpanManager.cleanup(loopId);
  }
}
677+
519678 // ============================================================================
520679 // Private - Message Handling
521680 // ============================================================================
522681
523- async #handleMessage( ctx : {
524- message : {
525- id : string ;
526- queueId : string ;
527- payload : BatchItemPayload ;
528- timestamp : number ;
529- attempt : number ;
530- } ;
531- queue : { id : string ; tenantId : string } ;
532- consumerId : string ;
533- heartbeat : ( ) => Promise < boolean > ;
534- complete : ( ) => Promise < void > ;
535- release : ( ) => Promise < void > ;
536- fail : ( error ?: Error ) => Promise < void > ;
537- } ) : Promise < void > {
538- const { batchId, friendlyId, itemIndex, item } = ctx . message . payload ;
682+ async #handleMessage( consumerId : string , messageId : string , queueId : string ) : Promise < void > {
683+ // Get message data from FairQueue's in-flight storage
684+ const storedMessage = await this . fairQueue . getMessageData ( messageId , queueId ) ;
685+
686+ if ( ! storedMessage ) {
687+ this . logger . error ( "Message not found in in-flight data" , { messageId, queueId } ) ;
688+ await this . fairQueue . completeMessage ( messageId , queueId ) ;
689+ return ;
690+ }
691+
692+ const { batchId, friendlyId, itemIndex, item } = storedMessage . payload ;
539693
540694 return this . #startSpan( "BatchQueue.handleMessage" , async ( span ) => {
541695 span ?. setAttributes ( {
542696 "batch.id" : batchId ,
543697 "batch.friendlyId" : friendlyId ,
544698 "batch.itemIndex" : itemIndex ,
545699 "batch.task" : item . task ,
546- "batch.consumerId" : ctx . consumerId ,
547- "batch.attempt" : ctx . message . attempt ,
700+ "batch.consumerId" : consumerId ,
701+ "batch.attempt" : storedMessage . attempt ,
548702 } ) ;
549703
550704 // Record queue time metric (time from enqueue to processing)
551- const queueTimeMs = Date . now ( ) - ctx . message . timestamp ;
552- this . itemQueueTimeHistogram ?. record ( queueTimeMs , { envId : ctx . queue . tenantId } ) ;
705+ const queueTimeMs = Date . now ( ) - storedMessage . timestamp ;
706+ this . itemQueueTimeHistogram ?. record ( queueTimeMs , { envId : storedMessage . tenantId } ) ;
553707 span ?. setAttribute ( "batch.queueTimeMs" , queueTimeMs ) ;
554708
555709 this . logger . debug ( "Processing batch item" , {
556710 batchId,
557711 friendlyId,
558712 itemIndex,
559713 task : item . task ,
560- consumerId : ctx . consumerId ,
561- attempt : ctx . message . attempt ,
714+ consumerId,
715+ attempt : storedMessage . attempt ,
562716 queueTimeMs,
563717 } ) ;
564718
565719 if ( ! this . processItemCallback ) {
566720 this . logger . error ( "No process item callback set" , { batchId, itemIndex } ) ;
567721 // Still complete the message to avoid blocking
568- await ctx . complete ( ) ;
722+ await this . fairQueue . completeMessage ( messageId , queueId ) ;
569723 return ;
570724 }
571725
@@ -576,7 +730,7 @@ export class BatchQueue {
576730
577731 if ( ! meta ) {
578732 this . logger . error ( "Batch metadata not found" , { batchId, itemIndex } ) ;
579- await ctx . complete ( ) ;
733+ await this . fairQueue . completeMessage ( messageId , queueId ) ;
580734 return ;
581735 }
582736
@@ -712,7 +866,7 @@ export class BatchQueue {
712866 // This must happen after recording success/failure to ensure the counter
713867 // is updated before the message is considered done
714868 await this . #startSpan( "BatchQueue.completeMessage" , async ( ) => {
715- return ctx . complete ( ) ;
869+ return this . fairQueue . completeMessage ( messageId , queueId ) ;
716870 } ) ;
717871
718872 // Check if all items have been processed using atomic counter
0 commit comments