apps/webapp/app/env.server.ts (6 additions, 0 deletions)
@@ -951,6 +951,12 @@ const EnvironmentSchema = z
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
// Number of master queue shards for horizontal scaling
BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
// Maximum queues to fetch from master queue per iteration
BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
// Worker queue blocking timeout in seconds (for two-stage processing)
BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
// Global rate limit: max items processed per second across all consumers
// If not set, no global rate limiting is applied
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
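These are ordinary environment variables coerced by zod, so string values from `process.env` become integers and unset variables fall back to the defaults. A minimal sketch of that behavior, reusing the schema fragment above (the sample input is hypothetical):

```ts
import { z } from "zod";

// Same coercion pattern as the schema above: env strings are coerced to
// integers, and defaults are applied when a variable is unset.
const BatchQueueEnv = z.object({
  BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
  BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
  BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
  BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
});

// Hypothetical input: "4" is coerced to 4; the other keys take their defaults,
// and the optional rate limit stays undefined (no global rate limiting).
const parsed = BatchQueueEnv.parse({ BATCH_QUEUE_SHARD_COUNT: "4" });
// { BATCH_QUEUE_SHARD_COUNT: 4, BATCH_QUEUE_MASTER_QUEUE_LIMIT: 1000,
//   BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: 10 }
```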
apps/webapp/app/v3/runEngine.server.ts (3 additions, 0 deletions)
@@ -171,7 +171,10 @@ function createRunEngine() {
drr: {
quantum: env.BATCH_QUEUE_DRR_QUANTUM,
maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
masterQueueLimit: env.BATCH_QUEUE_MASTER_QUEUE_LIMIT,
},
shardCount: env.BATCH_QUEUE_SHARD_COUNT,
workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS,
consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
// Default processing concurrency when no specific limit is set
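For reference, the batch-queue configuration this wiring produces has roughly the following shape (field names are taken from the diff; the interface name is illustrative only):

```ts
// Illustrative aggregate of the options wired above; not the actual
// exported type from the run-engine package.
interface BatchQueueEngineOptions {
  drr: {
    quantum: number; // BATCH_QUEUE_DRR_QUANTUM
    maxDeficit: number; // BATCH_QUEUE_MAX_DEFICIT
    masterQueueLimit: number; // BATCH_QUEUE_MASTER_QUEUE_LIMIT (new in this PR)
  };
  shardCount: number; // BATCH_QUEUE_SHARD_COUNT (new)
  workerQueueBlockingTimeoutSeconds: number; // BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS (new)
  consumerCount: number; // BATCH_QUEUE_CONSUMER_COUNT
  consumerIntervalMs: number; // BATCH_QUEUE_CONSUMER_INTERVAL_MS
}
```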
internal-packages/run-engine/src/batch-queue/index.ts (179 additions, 31 deletions)
@@ -11,7 +11,10 @@ import {
FairQueue,
DRRScheduler,
CallbackFairQueueKeyProducer,
WorkerQueueManager,
BatchedSpanManager,
type FairQueueOptions,
type StoredMessage,
} from "@trigger.dev/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import type {
@@ -48,8 +51,14 @@ export { BatchCompletionTracker } from "./completionTracker.js";
// Redis key for environment concurrency limits
const ENV_CONCURRENCY_KEY_PREFIX = "batch:env_concurrency";

// Single worker queue ID for all batch items
// BatchQueue uses a single shared worker queue - FairQueue handles fair scheduling,
// then all messages are routed to this queue for BatchQueue's own consumer loop.
const BATCH_WORKER_QUEUE_ID = "batch-worker-queue";

export class BatchQueue {
private fairQueue: FairQueue<typeof BatchItemPayloadSchema>;
private workerQueueManager: WorkerQueueManager;
private completionTracker: BatchCompletionTracker;
private logger: Logger;
private tracer?: Tracer;
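The comments above describe a two-stage pipeline: FairQueue's master consumers fairly claim messages from per-tenant queues and push their keys onto one shared worker queue, and BatchQueue's own loops pop from that queue. A minimal sketch of the handoff (illustrative only, with an in-memory array standing in for the Redis list):

```ts
// Stage 1 output / stage 2 input: the shared worker queue.
const workerQueue: string[] = []; // stands in for the "batch-worker-queue" Redis list

// Stage 1: the fair scheduler claims a message and routes only its key here.
function routeToWorkerQueue(messageId: string, queueId: string): void {
  workerQueue.push(`${messageId}:${queueId}`); // same key format the consumer loop parses
}

// Stage 2: a consumer loop pops keys; payloads are fetched separately
// from in-flight storage (see #handleMessage below).
function popFromWorkerQueue(): string | undefined {
  return workerQueue.shift();
}

routeToWorkerQueue("msg_1", "env_123:batch_abc"); // hypothetical IDs
console.log(popFromWorkerQueue()); // "msg_1:env_123:batch_abc"
```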
@@ -59,6 +68,13 @@ export class BatchQueue {
private processItemCallback?: ProcessBatchItemCallback;
private completionCallback?: BatchCompletionCallback;

// Consumer loop state
private isRunning = false;
private abortController: AbortController;
private workerQueueConsumerLoops: Promise<void>[] = [];
private workerQueueBlockingTimeoutSeconds: number;
private batchedSpanManager: BatchedSpanManager;

// Metrics
private batchesEnqueuedCounter?: Counter;
private itemsEnqueuedCounter?: Counter;
@@ -72,6 +88,8 @@
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
this.tracer = options.tracer;
this.defaultConcurrency = options.defaultConcurrency ?? 10;
this.abortController = new AbortController();
this.workerQueueBlockingTimeoutSeconds = options.workerQueueBlockingTimeoutSeconds ?? 10;

// Initialize metrics if meter is provided
if (options.meter) {
@@ -108,20 +126,23 @@
keys: keyProducer,
quantum: options.drr.quantum,
maxDeficit: options.drr.maxDeficit,
masterQueueLimit: options.drr.masterQueueLimit,
logger: {
debug: (msg, ctx) => this.logger.debug(msg, ctx),
error: (msg, ctx) => this.logger.error(msg, ctx),
},
});

// Create FairQueue with telemetry and environment-based concurrency limiting
// FairQueue handles fair scheduling and routes messages to the batch worker queue
// BatchQueue runs its own consumer loop to process messages from the worker queue
const fairQueueOptions: FairQueueOptions<typeof BatchItemPayloadSchema> = {
redis: options.redis,
keys: keyProducer,
scheduler,
payloadSchema: BatchItemPayloadSchema,
validateOnEnqueue: false, // We control the payload
-      shardCount: 1, // Batches don't need sharding
+      shardCount: options.shardCount ?? 1,
consumerCount: options.consumerCount,
consumerIntervalMs: options.consumerIntervalMs,
visibilityTimeoutMs: 60_000, // 1 minute for batch item processing
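The `quantum` and `maxDeficit` options parameterize deficit round-robin scheduling, and `masterQueueLimit` caps how many queues one scheduling pass pulls from the master queue. A minimal sketch of the DRR idea itself (a generic illustration, not the `DRRScheduler` implementation):

```ts
// Generic deficit round-robin: each queue earns `quantum` credit per visit and
// may dequeue items while it has credit; unused credit is capped at `maxDeficit`
// so an idle queue cannot accumulate unbounded priority.
type DrrQueue = { id: string; deficit: number; items: number[] }; // items hold per-item costs

function drrRound(queues: DrrQueue[], quantum: number, maxDeficit: number): number[] {
  const dequeued: number[] = [];
  for (const q of queues) {
    q.deficit = Math.min(q.deficit + quantum, maxDeficit);
    while (q.items.length > 0 && q.items[0] <= q.deficit) {
      const cost = q.items.shift()!;
      q.deficit -= cost;
      dequeued.push(cost);
    }
  }
  return dequeued;
}

// Two rounds with quantum 10: the expensive queue makes progress without
// starving the cheap one.
const queues: DrrQueue[] = [
  { id: "a", deficit: 0, items: [15, 15] },
  { id: "b", deficit: 0, items: [5, 5, 5] },
];
drrRound(queues, 10, 100); // "a" waits (15 > 10 credit), "b" drains two items
drrRound(queues, 10, 100); // "a" now has 20 credit and dequeues a 15-cost item
```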
@@ -131,6 +152,11 @@
threshold: 5,
periodMs: 5_000,
},
// Worker queue configuration - FairQueue routes all messages to our single worker queue
workerQueue: {
// All batch items go to the same worker queue - BatchQueue handles consumption
resolveWorkerQueue: () => BATCH_WORKER_QUEUE_ID,
},
// Concurrency group based on tenant (environment)
// This limits how many batch items can be processed concurrently per environment
// Items wait in queue until capacity frees up
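`resolveWorkerQueue` is the routing hook: here every message resolves to the single shared queue. Purely for contrast, a sharded variant could fan out across several worker queues; this is hypothetical, not what the PR does, and it assumes the hook would receive some message context (the actual hook above takes no arguments):

```ts
// Hypothetical alternative routing (NOT used by this PR): hash some stable
// message attribute, e.g. a tenant ID, across N worker queues.
const WORKER_QUEUE_COUNT = 4;

function hashString(s: string): number {
  let h = 0;
  for (let i = 0; i < s.length; i++) h = (h * 31 + s.charCodeAt(i)) | 0;
  return Math.abs(h);
}

const resolveShardedWorkerQueue = (tenantId: string): string =>
  `batch-worker-queue-${hashString(tenantId) % WORKER_QUEUE_COUNT}`;

resolveShardedWorkerQueue("env_123"); // e.g. "batch-worker-queue-2"
```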
@@ -157,6 +183,24 @@

this.fairQueue = new FairQueue(fairQueueOptions);

// Create worker queue manager for consuming from the batch worker queue
this.workerQueueManager = new WorkerQueueManager({
redis: options.redis,
keys: keyProducer,
logger: {
debug: (msg, ctx) => this.logger.debug(msg, ctx),
error: (msg, ctx) => this.logger.error(msg, ctx),
},
});

// Initialize batched span manager for worker queue consumer tracing
this.batchedSpanManager = new BatchedSpanManager({
tracer: options.tracer,
name: "batch-queue-worker",
maxIterations: options.consumerTraceMaxIterations ?? 1000,
timeoutSeconds: options.consumerTraceTimeoutSeconds ?? 60,
});

// Create completion tracker
this.completionTracker = new BatchCompletionTracker({
redis: options.redis,
@@ -167,11 +211,6 @@
},
});

-    // Set up message handler
-    this.fairQueue.onMessage(async (ctx) => {
-      await this.#handleMessage(ctx);
-    });

// Register telemetry gauge callbacks for observable metrics
// Note: observedTenants is not provided since tenant list is dynamic
this.fairQueue.registerTelemetryGauges();
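`BatchedSpanManager` (consumed in the worker loop below via `withBatchedSpan`, `incrementStat`, and `markForRotation`) exists so that a hot polling loop does not emit one trace span per iteration. A sketch of the underlying pattern, assuming OpenTelemetry-style spans; this is not the redis-worker implementation:

```ts
import { trace, type Span } from "@opentelemetry/api";

// Batched-span idea: reuse one span across many loop iterations and rotate it
// after maxIterations, so tracing overhead and span size stay bounded.
class SimpleBatchedSpan {
  private span?: Span;
  private iterations = 0;

  constructor(private name: string, private maxIterations: number) {}

  async run<T>(fn: (span: Span) => Promise<T>): Promise<T> {
    if (!this.span || this.iterations >= this.maxIterations) {
      this.span?.end(); // close the previous batch of iterations
      this.span = trace.getTracer("batch-queue").startSpan(this.name);
      this.iterations = 0;
    }
    this.iterations++;
    return fn(this.span);
  }
}
```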
@@ -410,29 +449,62 @@

/**
* Start the consumer loops.
* FairQueue runs the master queue consumer loop (claim and push to worker queue).
* BatchQueue runs its own worker queue consumer loops to process messages.
*/
start(): void {
if (this.isRunning) {
return;
}

this.isRunning = true;
this.abortController = new AbortController();

// Start FairQueue's master queue consumers (routes messages to worker queue)
this.fairQueue.start();

// Start worker queue consumer loops
for (let consumerId = 0; consumerId < this.options.consumerCount; consumerId++) {
const loop = this.#runWorkerQueueConsumerLoop(consumerId);
this.workerQueueConsumerLoops.push(loop);
}

this.logger.info("BatchQueue consumers started", {
consumerCount: this.options.consumerCount,
intervalMs: this.options.consumerIntervalMs,
drrQuantum: this.options.drr.quantum,
workerQueueId: BATCH_WORKER_QUEUE_ID,
});
}

/**
* Stop the consumer loops gracefully.
*/
async stop(): Promise<void> {
if (!this.isRunning) {
return;
}

this.isRunning = false;
this.abortController.abort();

// Stop FairQueue's master queue consumers
await this.fairQueue.stop();

// Wait for worker queue consumer loops to finish
await Promise.allSettled(this.workerQueueConsumerLoops);
this.workerQueueConsumerLoops = [];

this.logger.info("BatchQueue consumers stopped");
}

/**
* Close the BatchQueue and all Redis connections.
*/
async close(): Promise<void> {
await this.stop();
await this.fairQueue.close();
await this.workerQueueManager.close();
await this.completionTracker.close();
await this.concurrencyRedis.quit();
}
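Putting the lifecycle together, usage looks roughly like this (options abbreviated; the values and the `redisOptions` placeholder are hypothetical):

```ts
declare const redisOptions: any; // assumed Redis connection options

const batchQueue = new BatchQueue({
  redis: redisOptions,
  drr: { quantum: 10, maxDeficit: 100, masterQueueLimit: 1000 },
  shardCount: 2,
  workerQueueBlockingTimeoutSeconds: 10,
  consumerCount: 3,
  consumerIntervalMs: 50,
} as any); // remaining required options omitted for brevity

batchQueue.start(); // FairQueue master consumers + worker queue loops
// ... enqueue batches, process items ...
await batchQueue.stop(); // abort loops and await Promise.allSettled
await batchQueue.close(); // stop() plus closing Redis connections
```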
@@ -516,56 +588,132 @@
});
}

// ============================================================================
// Private - Worker Queue Consumer Loop
// ============================================================================

/**
* Run a worker queue consumer loop.
* This pops messages from the batch worker queue and processes them.
*/
async #runWorkerQueueConsumerLoop(consumerId: number): Promise<void> {
const loopId = `batch-worker-${consumerId}`;

// Initialize batched span tracking for this loop
this.batchedSpanManager.initializeLoop(loopId);

try {
while (this.isRunning) {
if (!this.processItemCallback) {
await new Promise((resolve) => setTimeout(resolve, 100));
continue;
}

try {
await this.batchedSpanManager.withBatchedSpan(
loopId,
async (span) => {
span.setAttribute("consumer_id", consumerId);

// Blocking pop from worker queue
const messageKey = await this.workerQueueManager.blockingPop(
BATCH_WORKER_QUEUE_ID,
this.workerQueueBlockingTimeoutSeconds,
this.abortController.signal
);

if (!messageKey) {
this.batchedSpanManager.incrementStat(loopId, "empty_iterations");
return false; // Timeout, no work
}

// Parse message key (format: "messageId:queueId")
const colonIndex = messageKey.indexOf(":");
if (colonIndex === -1) {
this.logger.error("Invalid message key format", { messageKey });
this.batchedSpanManager.incrementStat(loopId, "invalid_message_keys");
return false;
}

const messageId = messageKey.substring(0, colonIndex);
const queueId = messageKey.substring(colonIndex + 1);

await this.#handleMessage(loopId, messageId, queueId);
this.batchedSpanManager.incrementStat(loopId, "messages_processed");
return true; // Had work
},
{
iterationSpanName: "processWorkerQueueMessage",
attributes: { consumer_id: consumerId },
}
);
} catch (error) {
if (this.abortController.signal.aborted) {
break;
}
this.logger.error("Worker queue consumer error", {
loopId,
error: error instanceof Error ? error.message : String(error),
});
this.batchedSpanManager.markForRotation(loopId);
}
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
this.logger.debug("Worker queue consumer aborted", { loopId });
this.batchedSpanManager.cleanup(loopId);
return;
}
throw error;
} finally {
this.batchedSpanManager.cleanup(loopId);
}
}
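One subtlety in the key parsing above: splitting at the first colon means a `messageId` must never contain `:`, while the `queueId` may contain as many as it likes. A quick check of that behavior (sample IDs are hypothetical):

```ts
// Split "messageId:queueId" at the FIRST colon only, since queue IDs can
// themselves be colon-delimited key paths.
const messageKey = "msg_abc123:env_456:batch:queue_789"; // hypothetical key
const colonIndex = messageKey.indexOf(":");
const messageId = messageKey.substring(0, colonIndex); // "msg_abc123"
const queueId = messageKey.substring(colonIndex + 1); // "env_456:batch:queue_789"
```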

// ============================================================================
// Private - Message Handling
// ============================================================================

-  async #handleMessage(ctx: {
-    message: {
-      id: string;
-      queueId: string;
-      payload: BatchItemPayload;
-      timestamp: number;
-      attempt: number;
-    };
-    queue: { id: string; tenantId: string };
-    consumerId: string;
-    heartbeat: () => Promise<boolean>;
-    complete: () => Promise<void>;
-    release: () => Promise<void>;
-    fail: (error?: Error) => Promise<void>;
-  }): Promise<void> {
-    const { batchId, friendlyId, itemIndex, item } = ctx.message.payload;
+  async #handleMessage(consumerId: string, messageId: string, queueId: string): Promise<void> {
// Get message data from FairQueue's in-flight storage
const storedMessage = await this.fairQueue.getMessageData(messageId, queueId);

if (!storedMessage) {
this.logger.error("Message not found in in-flight data", { messageId, queueId });
return;
}

const { batchId, friendlyId, itemIndex, item } = storedMessage.payload;

return this.#startSpan("BatchQueue.handleMessage", async (span) => {
span?.setAttributes({
"batch.id": batchId,
"batch.friendlyId": friendlyId,
"batch.itemIndex": itemIndex,
"batch.task": item.task,
-      "batch.consumerId": ctx.consumerId,
-      "batch.attempt": ctx.message.attempt,
+      "batch.consumerId": consumerId,
+      "batch.attempt": storedMessage.attempt,
});

// Record queue time metric (time from enqueue to processing)
-      const queueTimeMs = Date.now() - ctx.message.timestamp;
-      this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: ctx.queue.tenantId });
+      const queueTimeMs = Date.now() - storedMessage.timestamp;
+      this.itemQueueTimeHistogram?.record(queueTimeMs, { envId: storedMessage.tenantId });
span?.setAttribute("batch.queueTimeMs", queueTimeMs);

this.logger.debug("Processing batch item", {
batchId,
friendlyId,
itemIndex,
task: item.task,
-        consumerId: ctx.consumerId,
-        attempt: ctx.message.attempt,
+        consumerId,
+        attempt: storedMessage.attempt,
queueTimeMs,
});

if (!this.processItemCallback) {
this.logger.error("No process item callback set", { batchId, itemIndex });
// Still complete the message to avoid blocking
-        await ctx.complete();
+        await this.fairQueue.completeMessage(messageId, queueId);
return;
}

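The handler above leans on a claim/complete contract with FairQueue: the worker queue carries only keys, and the payload stays in in-flight storage until `completeMessage` acknowledges it. A condensed sketch of that contract; `getMessageData` and `completeMessage` are the calls the diff makes, but the control flow here is deliberately simplified (the real handler records success or failure before completing):

```ts
import type { FairQueue } from "@trigger.dev/redis-worker";

// Simplified consumer-side contract: pop key -> fetch payload -> process ->
// acknowledge. If the payload is gone, the claim expired or was completed.
async function processKey(fairQueue: FairQueue<any>, messageKey: string) {
  const i = messageKey.indexOf(":");
  const messageId = messageKey.slice(0, i);
  const queueId = messageKey.slice(i + 1);

  const stored = await fairQueue.getMessageData(messageId, queueId);
  if (!stored) return; // nothing to do: message no longer in flight

  // ... run the process-item callback on stored.payload ...

  await fairQueue.completeMessage(messageId, queueId); // remove from in-flight storage
}
```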
@@ -576,7 +724,7 @@

if (!meta) {
this.logger.error("Batch metadata not found", { batchId, itemIndex });
-        await ctx.complete();
+        await this.fairQueue.completeMessage(messageId, queueId);
return;
}

@@ -712,7 +860,7 @@
// This must happen after recording success/failure to ensure the counter
// is updated before the message is considered done
await this.#startSpan("BatchQueue.completeMessage", async () => {
-        return ctx.complete();
+        return this.fairQueue.completeMessage(messageId, queueId);
});

// Check if all items have been processed using atomic counter