Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,12 @@ const EnvironmentSchema = z
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
// Number of master queue shards for horizontal scaling
BATCH_QUEUE_SHARD_COUNT: z.coerce.number().int().default(1),
// Maximum queues to fetch from master queue per iteration
BATCH_QUEUE_MASTER_QUEUE_LIMIT: z.coerce.number().int().default(1000),
// Worker queue blocking timeout in seconds (for two-stage processing)
BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
// Global rate limit: max items processed per second across all consumers
// If not set, no global rate limiting is applied
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ function createRunEngine() {
drr: {
quantum: env.BATCH_QUEUE_DRR_QUANTUM,
maxDeficit: env.BATCH_QUEUE_MAX_DEFICIT,
masterQueueLimit: env.BATCH_QUEUE_MASTER_QUEUE_LIMIT,
},
shardCount: env.BATCH_QUEUE_SHARD_COUNT,
workerQueueBlockingTimeoutSeconds: env.BATCH_QUEUE_WORKER_QUEUE_TIMEOUT_SECONDS,
consumerCount: env.BATCH_QUEUE_CONSUMER_COUNT,
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
// Default processing concurrency when no specific limit is set
Expand Down
11 changes: 10 additions & 1 deletion internal-packages/run-engine/src/batch-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export class BatchQueue {
keys: keyProducer,
quantum: options.drr.quantum,
maxDeficit: options.drr.maxDeficit,
masterQueueLimit: options.drr.masterQueueLimit,
logger: {
debug: (msg, ctx) => this.logger.debug(msg, ctx),
error: (msg, ctx) => this.logger.error(msg, ctx),
Expand All @@ -121,7 +122,7 @@ export class BatchQueue {
scheduler,
payloadSchema: BatchItemPayloadSchema,
validateOnEnqueue: false, // We control the payload
shardCount: 1, // Batches don't need sharding
shardCount: options.shardCount ?? 1,
consumerCount: options.consumerCount,
consumerIntervalMs: options.consumerIntervalMs,
visibilityTimeoutMs: 60_000, // 1 minute for batch item processing
Expand All @@ -131,6 +132,14 @@ export class BatchQueue {
threshold: 5,
periodMs: 5_000,
},
// Enable two-stage processing with worker queues for better parallelism (when configured)
// Worker queues provide better concurrency by separating queue selection from message processing
workerQueue: options.workerQueueBlockingTimeoutSeconds
? {
enabled: true,
blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds,
}
: undefined,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check BatchQueueOptions type definition
echo "=== BatchQueueOptions type definition ==="
if [ -f "internal-packages/run-engine/src/batch-queue/types.ts" ]; then
  cat -n internal-packages/run-engine/src/batch-queue/types.ts | head -100
else
  echo "File not found, searching for it..."
  fd "types.ts" internal-packages/run-engine/src/batch-queue/
fi

Repository: triggerdotdev/trigger.dev

Length of output: 4666


🏁 Script executed:

#!/bin/bash
# Search for workerQueueBlockingTimeoutSeconds usage and documentation
echo "=== Searching for workerQueueBlockingTimeoutSeconds ==="
rg -n "workerQueueBlockingTimeoutSeconds" --type ts -B 3 -A 3

echo -e "\n=== Searching for blockingTimeoutSeconds in type definitions ==="
rg -n "blockingTimeoutSeconds" --type ts -B 3 -A 3

Repository: triggerdotdev/trigger.dev

Length of output: 8707


🏁 Script executed:

#!/bin/bash
# Check the actual file to see context around lines 135-142
echo "=== Context around lines 135-142 in index.ts ==="
sed -n '125,150p' internal-packages/run-engine/src/batch-queue/index.ts | cat -n

Repository: triggerdotdev/trigger.dev

Length of output: 1497


Use explicit undefined check to allow 0 as a valid timeout value.

The truthiness check on options.workerQueueBlockingTimeoutSeconds prevents 0 from being used as a valid timeout value. In timeout contexts (Redis BLPOP), 0 is valid and typically means block indefinitely. If the timeout is set via environment variable to "0", it will be coerced to the number 0 and incorrectly disable the worker queue.

Change the condition to:

-      workerQueue: options.workerQueueBlockingTimeoutSeconds
+      workerQueue: options.workerQueueBlockingTimeoutSeconds !== undefined
         ? {
             enabled: true,
             blockingTimeoutSeconds: options.workerQueueBlockingTimeoutSeconds,
           }
         : undefined,
🤖 Prompt for AI Agents
In @internal-packages/run-engine/src/batch-queue/index.ts around lines 135 -
142, The current truthiness check on options.workerQueueBlockingTimeoutSeconds
disables workerQueue when the value is 0; change the condition to explicitly
check for undefined (e.g., options.workerQueueBlockingTimeoutSeconds !==
undefined) so 0 is treated as a valid blockingTimeoutSeconds; update the object
creation for workerQueue (referencing workerQueue,
options.workerQueueBlockingTimeoutSeconds, and blockingTimeoutSeconds) to use
this explicit undefined check.

// Concurrency group based on tenant (environment)
// This limits how many batch items can be processed concurrently per environment
// Items wait in queue until capacity frees up
Expand Down
6 changes: 6 additions & 0 deletions internal-packages/run-engine/src/batch-queue/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ export type DRRConfig = {
quantum: number;
/** Maximum accumulated deficit (prevents starvation) */
maxDeficit: number;
/** Maximum queues to fetch from master queue (default: 1000) */
masterQueueLimit?: number;
};

// ============================================================================
Expand Down Expand Up @@ -196,6 +198,10 @@ export type BatchQueueOptions = {
consumerCount: number;
/** Interval between consumer iterations (ms) */
consumerIntervalMs: number;
/** Number of master queue shards (default: 1) */
shardCount?: number;
/** Worker queue blocking timeout in seconds (enables two-stage processing) */
workerQueueBlockingTimeoutSeconds?: number;
/** Whether to start consumers on initialization */
startConsumers?: boolean;
/**
Expand Down
3 changes: 3 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,10 @@ export class RunEngine {
drr: {
quantum: options.batchQueue?.drr?.quantum ?? 5,
maxDeficit: options.batchQueue?.drr?.maxDeficit ?? 50,
masterQueueLimit: options.batchQueue?.drr?.masterQueueLimit,
},
shardCount: options.batchQueue?.shardCount,
workerQueueBlockingTimeoutSeconds: options.batchQueue?.workerQueueBlockingTimeoutSeconds,
consumerCount: options.batchQueue?.consumerCount ?? 2,
consumerIntervalMs: options.batchQueue?.consumerIntervalMs ?? 100,
defaultConcurrency: options.batchQueue?.defaultConcurrency ?? 10,
Expand Down
4 changes: 4 additions & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export type RunEngineOptions = {
batchQueue?: {
redis: RedisOptions;
drr?: Partial<DRRConfig>;
/** Number of master queue shards (default: 1) */
shardCount?: number;
/** Worker queue blocking timeout in seconds (enables two-stage processing) */
workerQueueBlockingTimeoutSeconds?: number;
consumerCount?: number;
consumerIntervalMs?: number;
/** Default processing concurrency per environment when no specific limit is set */
Expand Down
39 changes: 39 additions & 0 deletions packages/redis-worker/src/fair-queue/concurrency.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,45 @@ export class ConcurrencyManager {
return await this.redis.scard(key);
}

/**
* Get available capacity for a queue across all concurrency groups.
* Returns the minimum available capacity across all groups.
*/
async getAvailableCapacity(queue: QueueDescriptor): Promise<number> {
if (this.groups.length === 0) {
return 0;
}

// Build group data for parallel fetching
const groupData = this.groups.map((group) => ({
group,
groupId: group.extractGroupId(queue),
}));

// Fetch all current counts and limits in parallel
const [currents, limits] = await Promise.all([
Promise.all(
groupData.map(({ group, groupId }) =>
this.redis.scard(this.keys.concurrencyKey(group.name, groupId))
)
),
Promise.all(
groupData.map(({ group, groupId }) =>
group.getLimit(groupId).then((limit) => limit || group.defaultLimit)
)
),
]);

// Calculate minimum available capacity across all groups
let minCapacity = Infinity;
for (let i = 0; i < groupData.length; i++) {
const available = Math.max(0, limits[i]! - currents[i]!);
minCapacity = Math.min(minCapacity, available);
}

return minCapacity === Infinity ? 0 : minCapacity;
}

/**
* Get concurrency limit for a specific group.
*/
Expand Down
Loading
Loading