diff --git a/src/README.md b/src/README.md new file mode 100644 index 00000000..8c7e566b --- /dev/null +++ b/src/README.md @@ -0,0 +1,91 @@ +## Environment variables + +### `API_TOKEN` +Optional token to protect the API endpoints (used by [`server.ts`](../server.ts)). + +- Type: string +- Default: unset (no token enforcement, depending on server implementation) +- Behavior: + - If set, clients must provide the expected token (e.g., via header/query — see [`server.ts`](../server.ts)) to access the API. + +### `HOST` +IP address the HTTP server binds to (used by [`server.ts`](../server.ts)). + +- Type: string +- Default: `0.0.0.0` + +### `PORT` +Port the HTTP server listens on (used by [`server.ts`](../server.ts)). + +- Type: integer +- Default: `8001` + +### `HOME` +Home directory, used only as a fallback when `XDG_CACHE_HOME` is unset. + +- Type: string (path) +- Default: platform-dependent + +### `IGNORE_SCRIPT_REGION` +Controls whether the player script region is ignored when caching player scripts (used by [`src/playerCache.ts`](./playerCache.ts)). + +- Type: boolean-like string +- Default: `false` +- Behavior: + - Set to `"true"` (string) to enable. + +### `XDG_CACHE_HOME` +Base directory for caches (used by [`src/playerCache.ts`](./playerCache.ts) for `CACHE_HOME`). + +- Type: string (path) +- Default: `$HOME/.cache` + +### `PREPROCESSED_CACHE_SIZE` +Max size (or entry limit, depending on implementation) for the preprocessed cache (used by [`src/preprocessedCache.ts`](./preprocessedCache.ts)). + +- Type: integer +- Default: implementation-defined if unset (see [`src/preprocessedCache.ts`](./preprocessedCache.ts)) + +### `SOLVER_CACHE_SIZE` +Max size (or entry limit) for the solver cache (used by [`src/solverCache.ts`](./solverCache.ts)). + +- Type: integer +- Default: implementation-defined if unset (see [`src/solverCache.ts`](./solverCache.ts)) + +### `STS_CACHE_SIZE` +Max size (or entry limit) for the STS cache (used by [`src/stsCache.ts`](./stsCache.ts)). 
+ +- Type: integer +- Default: implementation-defined if unset (see [`src/stsCache.ts`](./stsCache.ts)) + +### `TASK_QUEUE_DEQUE_IMPL` +Selects which deque implementation backs the internal task queue (read by [`src/taskQueueDeque.ts`](./taskQueueDeque.ts); used by the worker pool). + +- Type: string +- Default: `alg` +- Allowed: `alg`, `korkje`, `native` (case-insensitive; any other value falls back to `alg`) +- Behavior: + - `alg`: uses `jsr:@alg/deque` + - `korkje`: uses `jsr:@korkje/deque` + - `native`: uses a head-indexed `Array` (`unshift` is O(n) when there is no free space before the head) + +### `MAX_THREADS` +Controls the maximum number of workers used by the pool (used by [`src/workerPool.ts`](./workerPool.ts)). + +- Type: integer +- Default: `navigator.hardwareConcurrency` (or `1` if unavailable) +- Behavior: + - If set to a positive integer, the pool will create up to that many workers. + - If unset, invalid, or `0`, the pool falls back to `navigator.hardwareConcurrency`, then `1`. + +### `MESSAGES_LIMIT` +Controls how many tasks (messages) a single worker will process before being retired and replaced (used by [`src/workerPool.ts`](./workerPool.ts)). + +This helps prevent long-lived workers from accumulating memory/garbage-collection pressure over time. + +- Type: integer +- Default: `10000` +- Behavior: + - If set to a positive integer, each worker starts with that message budget (values below `10` are raised to `10`). + - When a worker reaches `0` remaining messages, it is terminated and replaced. 
+ diff --git a/src/taskQueueDeque.ts b/src/taskQueueDeque.ts new file mode 100644 index 00000000..d1239151 --- /dev/null +++ b/src/taskQueueDeque.ts @@ -0,0 +1,286 @@ +import type { TaskQueue } from "./types.ts"; + +import { Deque as AlgDeque } from "jsr:@alg/deque"; +import { Deque as KorkjeDeque } from "jsr:@korkje/deque"; + +type DequeImpl = "alg" | "korkje" | "native"; + +function getDequeImpl(): DequeImpl { + const v = (Deno.env.get("TASK_QUEUE_DEQUE_IMPL") || "").trim().toLowerCase(); + if (v === "native") return "native"; + if (v === "korkje") return "korkje"; + return "alg"; /* default: unset, empty, or unrecognized values all select "alg" */ +} + +export function createTaskQueue(): TaskQueue { + const impl = getDequeImpl(); /* the env variable is read on every call; the choice is not cached */ + + if ("native" === impl) return new ArrayTaskQueue(); + if ("korkje" === impl) return new KorkjeTaskQueueAdapter(); + return new AlgTaskQueueAdapter(); +} + +/** + * Abstract base for TaskQueue implementations. + * + * Default behavior: + * - empty: derived from `length` (`0 === this.length`) + * - clear(): removes items by repeatedly calling `pop()` + * - Expected performance: + * - O(n) calls to `pop()` + * - If `pop()` is O(1), overall clear is O(n) + * - Why pop (not shift): avoids O(n^2) behavior for array-backed queues where `shift()` is O(n). + * + * Concrete adapters SHOULD override `clear()` when a more efficient mechanism exists. + * + */ +abstract class AbstractTaskQueue implements TaskQueue { + public abstract get length(): number; + + public get empty(): boolean { + return 0 === this.length; + } + + public abstract push(...items: T[]): number; + public abstract pop(): T | undefined; + public abstract shift(): T | undefined; + public abstract unshift(...items: T[]): number; + + /** + * Important for head-indexed arrays: + * Do NOT rely on AbstractTaskQueue.clear. + * This default implementation will not clear items + * before the head index. + * + */ + public clear(): void { + // O(n) pop() calls; + // this relies on pop() being O(1) to keep total O(n). 
+ while (!this.empty) { + this.pop(); + } + } +} + +/** + * Native array-backed TaskQueue implementation; shift() advances a head index instead of reindexing. + * + * Performance notes: + * - push/pop: amortized O(1) + * - shift: O(1) index bump, with an occasional splice() compaction once head > 1024 and more than half of the backing array is already consumed; unshift: O(k) when the k items fit before `head`, otherwise compacts and falls back to Array#unshift + * - clear(): O(1) by setting length = 0 + * + */ +class ArrayTaskQueue extends AbstractTaskQueue implements TaskQueue { + private readonly items: T[] = []; + private head = 0; + + // Match Array#length usage (logical length: slots before `head` are not counted) + public get length(): number { + return this.items.length - this.head; + } + + // Match Array#push(...items) usage + public push(...items: T[]): number { + this.items.push(...items); + return this.length; + } + + // Match Array#pop() usage + public pop(): T | undefined { + if (this.empty) return undefined; + + const v = this.items.pop(); + // If the queue is now logically empty, reset to release any already-shifted slots. + if (this.empty) this.clear(); + return v; + } + + // Match Array#shift() usage + public shift(): T | undefined { + if (this.empty) return undefined; + + const v = this.items[this.head++]; + // If the queue is now logically empty, reset to release any already-shifted slots. + if (this.empty) this.clear(); + // Periodically compact to avoid unbounded growth (shifted slots keep their element references until compaction or clear()) + if (this.head > 1024 && this.head * 2 > this.items.length) { + this.items.splice(0, this.head); + this.head = 0; + } + return v; + } + + // Match Array#unshift(...items) usage + public unshift(...items: T[]): number { + const k = items.length; + if (0 === k) return this.length; + + // Fast-path: there is enough unused space before `head` + // so we can place items into [head-k, head) and move head back. + if (this.head >= k) { + const start = this.head - k; + // Intentionally avoided: + // this.items.splice(start, k, ...items); + // The tight loop was considered less problematic overall. + for (let i = 0; i < k; i++) { + this.items[start + i] = items[i]; + } + this.head = start; + return this.length; + } + + // Not enough head-gap: compact then use native unshift. 
+ // Compact only if we actually have skipped space. + if (this.head > 0) { + this.items.splice(0, this.head); + this.head = 0; + } + + this.items.unshift(...items); + return this.length; + } + + // Match "clear the queue" semantics + public override clear(): void { + // O(1) clear for arrays; also resets the head index. + this.items.length = 0; + this.head = 0; + } +} + +/** + * Adapter for jsr:@alg/deque + * + * Performance notes (expected): + * - pushBack/popBack/pushFront/popFront: O(1) + * - clear(): O(1) by re-initializing the deque + * + * Safety: + * - AlgDeque reportedly throws on popBack/popFront when empty (library behavior; not verifiable from this file); we guard and return undefined. + * + */ +class AlgTaskQueueAdapter extends AbstractTaskQueue implements TaskQueue { + // AlgDeque presumably implements linked blocks of arrays (library detail; not verifiable from this file) + private dq: AlgDeque = new AlgDeque(); + + public get length(): number { + return this.dq.length; + } + + /** + * TaskQueue API is Array-like and variadic: + * push(...items: T[]): number + * + * jsr:@alg/deque provides: + * pushBack(item: T): void // single item + * pushAllBack(items: Iterable): void // one iterable, not variadic + * + * We forward the rest-parameter array (`items`) to `pushAllBack` since + * arrays are Iterable. This preserves `queue.push(a, b, c)` semantics. + * + */ + public push(...items: T[]): number { + // Match Array#push, which returns the new length; the underlying deque doesn’t, so return length ourselves. + // items: T[] is treated as Iterable + this.dq.pushAllBack(items); + + return this.length; + } + + public pop(): T | undefined { + // AlgDeque throws on empty; we avoid exceptions by guarding. + if (this.empty) { + return undefined; + } + + return this.dq.popBack(); + } + + public shift(): T | undefined { + // AlgDeque throws on empty; we avoid exceptions by guarding. + if (this.empty) { + return undefined; + } + + return this.dq.popFront(); + } + + public unshift(...items: T[]): number { + // Preserve Array#unshift order: + // unshift(a, b) => a becomes index 0, then b becomes index 1. 
+ // Using pushFront repeatedly per-item needs reverse iteration. + for (let i = items.length - 1; i >= 0; i--) { + this.dq.pushFront(items[i]); + } + return this.length; + } + + public override clear(): void { + // O(1) clear by replacing the underlying deque instance (the old instance is simply dropped). + this.dq = new AlgDeque(); + } +} + +/** + * Adapter for jsr:@korkje/deque + * + * Performance notes (expected): + * - push/pop/shift/unshift: O(1) amortized (circular buffer) + * - clear(): delegates to dq.clear() (presumably O(1); library detail, not verifiable from this file) + * + * Safety: + * - We guard pop/shift to ensure undefined when empty regardless of library behavior. + * + */ +class KorkjeTaskQueueAdapter extends AbstractTaskQueue implements TaskQueue { + // KorkjeDeque presumably manages an underlying array as a circular buffer (library detail; not verifiable from this file) + private readonly dq = new KorkjeDeque(); + + public get length(): number { + return this.dq.length; + } + + public override get empty(): boolean { + return this.dq.isEmpty(); // uses the library's own emptiness check instead of the length-based default + } + + public push(...items: T[]): number { + for (const item of items) { + this.dq.push(item); + } + return this.length; + } + + public pop(): T | undefined { + // KorkjeDeque is not expected to throw; we guard anyway. + if (this.empty) { + return undefined; + } + + return this.dq.pop(); + } + + public shift(): T | undefined { + // KorkjeDeque is not expected to throw; we guard anyway. + if (this.empty) { + return undefined; + } + + return this.dq.shift(); + } + + public unshift(...items: T[]): number { + // Preserve Array#unshift order. + // unshift(a, b) => a becomes index 0, then b becomes index 1. + // Using unshift repeatedly per-item needs reverse iteration. 
+ for (let i = items.length - 1; i >= 0; i--) { + this.dq.unshift(items[i]); + } + return this.length; + } + + public override clear(): void { + this.dq.clear(); + } +} diff --git a/src/types.ts b/src/types.ts index 1f8c5e93..ae7e451f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -36,8 +36,9 @@ export interface ResolveUrlResponse { resolved_url: string; } -export interface WorkerWithStatus extends Worker { - isIdle?: boolean; +export interface WorkerWithLimit extends Worker { + messagesRemaining: number; /* dispatches left before the worker is retired; the pool decrements it once per dispatched task and zeroes it to force retirement */ + messagesLimit: number; /* initial budget; the pool defines it as non-writable when the worker is created */ } export interface Task { @@ -46,6 +47,46 @@ export interface Task { reject: (error: any) => void; } +export interface TaskQueue { + /** Match Array#length usage */ + readonly length: number; + + /** Match Array#push(...items) usage */ + push(...items: T[]): number; + + /** Match Array#pop() usage */ + pop(): T | undefined; + + /** Match Array#shift() usage */ + shift(): T | undefined; + + /** Match Array#unshift(...items) usage */ + unshift(...items: T[]): number; + + /** Convenience boolean; must be equivalent to `0 === this.length` (implementations may compute it differently) */ + readonly empty: boolean; + + /** Remove all items ("clear the queue" semantics) */ + clear(): void; +} + +export type InFlight = { task: Task; timeoutId: number }; /* the task currently assigned to a worker plus the id of its in-flight timeout timer */ + +export type SafeCallOptions = { + /** + * Optional label used when logging errors (safeCall falls back to "safeCall" when omitted). + */ + label?: string; + /** + * If `true`, logs to console.error. If a function, called with (label, err). + */ + log?: boolean | ((label: string, err: unknown) => void); + /** + * Optional callback invoked when the call throws. 
+ */ + onError?: (err: unknown) => void; +}; + export type ApiRequest = SignatureRequest | StsRequest | ResolveUrlRequest; // Parsing into this context helps avoid multi copies of requests @@ -53,4 +94,4 @@ export type ApiRequest = SignatureRequest | StsRequest | ResolveUrlRequest; export interface RequestContext { req: Request; body: ApiRequest; -} \ No newline at end of file +} diff --git a/src/utils.ts b/src/utils.ts index 43405f1c..05ce945c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,3 +1,5 @@ +import type { SafeCallOptions } from "./types.ts"; + const ALLOWED_HOSTNAMES = ["youtube.com", "www.youtube.com", "m.youtube.com"]; export function validateAndNormalizePlayerUrl(playerUrl: string): string { @@ -38,4 +40,95 @@ export function extractPlayerId(playerUrl: string): string { } } return 'unknown'; -} \ No newline at end of file +} +function looksLikeSafeCallOptions(v: unknown): v is SafeCallOptions { + if (v === null || typeof v !== "object") return false; + const o = v as Record; + + // If a known key is present and not undefined, it must have the expected type + if ("label" in o && o.label !== undefined && typeof o.label !== "string") return false; + + if ("log" in o && o.log !== undefined) { + const log = o.log; + if (typeof log !== "boolean" && typeof log !== "function") return false; + } + + if ("onError" in o && o.onError !== undefined && typeof o.onError !== "function") return false; + + // Only consider it options if it has at least one of the known keys (a key that is present but undefined still counts; unknown extra keys are ignored) + return ("label" in o) || ("log" in o) || ("onError" in o); +} + +/** + * Calls `fn` safely (swallows exceptions), optionally logging and/or invoking `onError`; returns `fn`'s result, or `undefined` when `fn` is not a function or throws. If the LAST argument passes looksLikeSafeCallOptions it is consumed as options and is NOT forwarded to `fn`. + * + * Preserves `this` by applying the function with the caller's `this`. 
+ * - Typical usage: safeCall(task.resolve, data) + * - If you need to preserve a specific receiver: safeCall.call(obj, obj.method, arg) + */ +export function safeCall(this:unknown, fn: unknown, ...args: unknown[]): unknown { + if (typeof fn !== "function") return undefined; + + let options: SafeCallOptions | undefined; + if (args.length > 0 && looksLikeSafeCallOptions(args[args.length - 1])) { + options = args.pop() as SafeCallOptions; /* the trailing options object is removed from args, so fn never receives it */ + } + + const label = options?.label ?? "safeCall"; + + try { + // Preserve the caller-provided receiver exactly. + // In module/strict mode, `this` is typically `undefined` for plain calls. + const receiver = this as unknown; + return Reflect.apply(fn, receiver, args); + } catch (err) { + try { + options?.onError?.(err); + } catch { + // ignore onError throws + } + + const log = options?.log; + if (typeof log === "function") { + try { + log(label, err); + } catch { + // ignore logger throws + } + } else if (log) { + const prefix = options?.label ? `[safeCall:${label}]` : "[safeCall]"; + console.error(prefix, err); + } + + return undefined; + } +} + +export function normalizeError(err: unknown, message?: string): Error { // returns err itself when it is an Error and no (non-empty) message is given; otherwise a new Error with { cause: err } + if (err instanceof Error) { + return message ? new Error(message, { cause: err }) : err; // truthiness check: an empty-string message returns err unchanged + } + + let derived: string; + if (message !== undefined) { + derived = message; // here an empty-string message IS used as-is (unlike the Error branch above) + } else if (typeof err === "string") { + derived = err; + } else if (err && typeof err === "object") { + const maybeMessage = (err as { message?: unknown }).message; + if (typeof maybeMessage === "string" && maybeMessage.length > 0) { + derived = maybeMessage; + } else { + try { + derived = JSON.stringify(err); + } catch { + derived = String(err); + } + } + } else { + derived = String(err); + } + + // Preserve the original thrown value for debugging. 
+ return new Error(derived, { cause: err }); +} diff --git a/src/workerPool.ts b/src/workerPool.ts index 79b6a9b2..a2de4bc2 100644 --- a/src/workerPool.ts +++ b/src/workerPool.ts @@ -1,51 +1,484 @@ -import type { WorkerWithStatus, Task } from "./types.ts"; +import type { InFlight, Task, WorkerWithLimit } from "./types.ts"; +import { createTaskQueue } from "./taskQueueDeque.ts"; +import { normalizeError, safeCall } from "./utils.ts"; const CONCURRENCY = parseInt(Deno.env.get("MAX_THREADS") || "", 10) || navigator.hardwareConcurrency || 1; -const workers: WorkerWithStatus[] = []; -const taskQueue: Task[] = []; +// Keep the per-worker message budget consistent across the module. +// (Optional MESSAGES_LIMIT env override for testing/tuning: positive values are raised to at least 10; unset, invalid, or non-positive values fall back to 10_000.) +const parsedMessagesLimit = parseInt(Deno.env.get("MESSAGES_LIMIT") || "", 10); +const MESSAGES_LIMIT = Number.isFinite(parsedMessagesLimit) && parsedMessagesLimit > 0 + ? Math.max(10, Math.floor(parsedMessagesLimit)) + : 10_000; -function dispatch() { - const idleWorker = workers.find(w => w.isIdle); - if (!idleWorker || taskQueue.length === 0) { +const workers: WorkerWithLimit[] = []; +const idleWorkerSet = new Set(); +const idleWorkerStack: WorkerWithLimit[] = []; +const taskQueue = createTaskQueue(); +// Track enqueue timestamps without extending `Task`'s type surface. +const taskEnqueuedAt = new WeakMap(); +const inFlightTask = new WeakMap(); +// Workers that currently have an assigned task (purely "in-flight" tracking). +const inFlightWorker = new Set(); +// Workers that must not accept new tasks and must be retired after their current in-flight work finishes. +const retireAfterFlight: Set = new Set(); + +// Backpressure / liveness policies +const MAX_TASK_AGE_MS = 30 * 60 * 1000; // 30 minutes; checked when a task is dequeued in dispatch() +const IN_FLIGHT_TIMEOUT_MS = 60 * 60 * 1000; // 60 minutes; per-task timer armed in setInFlight() + +// When set, the pool is considered "fatally" broken and new work should fail fast. 
+let poolInitError: Error | null = null; + +// Recovery with exponential backoff (RECOVERY_FAILURE_THRESHOLD only appears in log output while the failure latch further down stays commented out) +const RECOVERY_BACKOFF_BASE_MS = 25; +const RECOVERY_BACKOFF_MAX_MS = 5_000; +const RECOVERY_FAILURE_THRESHOLD = 5; +let recoveryBackoffMs = RECOVERY_BACKOFF_BASE_MS; +let recoveryFailures = 0; + +let recoveryTimerId: number | null = null; + +let refillScheduled = false; + +function removeWorkerFromTracking(worker: WorkerWithLimit) { + clearIdle(worker); + const queueIdx = workers.indexOf(worker); + if (queueIdx >= 0) workers.splice(queueIdx, 1); +} + +function retireWorker(worker: WorkerWithLimit) { + // Defensive: clear any stale in-flight record (and its timeout) and reject its task before terminating + const inFlight = clearInFlight(worker); + if (inFlight) { + safeCall(inFlight.task.reject, new Error("Worker was retired while task was in-flight"), { + label: "inFlight.task.reject(retireWorker)", + log: true, + }); + } + + removeWorkerFromTracking(worker); + safeCall(worker.terminate.bind(worker), { + label: "worker.terminate(retireWorker)", + log: true, + }); + retireAfterFlight.delete(worker); +} + +function scheduleRefillAndDispatch() { + // If we've latched a fatal failure, fail fast and drain anything still queued. + if (poolInitError) { + drainAndRejectQueuedTasks(poolInitError); + return; + } - const task = taskQueue.shift()!; - idleWorker.isIdle = false; + // Avoid immediate recursive microtask rescheduling and avoid piling up timers. + if (refillScheduled) return; + refillScheduled = true; + + queueMicrotask(() => { + refillScheduled = false; + + // If a backoff timer is already pending, let it drive the next attempt. + if (recoveryTimerId !== null) return; + + try { + fillWorkers(); + // Opportunistically compact the stack. + // Extra pops for stale entries may eventually slow down dispatch. 
+ if (idleWorkerStack.length > (16 + idleWorkerSet.size * 2)) { + idleWorkerStack.length = 0; + idleWorkerStack.push(...idleWorkerSet); + } + dispatch(); + + // Successful run: reset failure counter/backoff. + recoveryFailures = 0; + recoveryBackoffMs = RECOVERY_BACKOFF_BASE_MS; + } catch (err) { + const e = err instanceof Error ? err : new Error(String(err)); + recoveryFailures += 1; + + console.error( + `Worker pool refill/dispatch failed (attempt ${recoveryFailures}/${RECOVERY_FAILURE_THRESHOLD}); will retry with backoff:`, + e, + ); + + // Suspicious state recovery: + // - Keep queued tasks intact so they can be processed after recovery. + // - Do NOT clear in-flight handlers for tracked workers here; allow in-flight work to complete normally. + // - Quarantine all tracked workers by draining `workers` and setting their budget to 0. + // - Retire all currently-idle workers immediately. + // - Any quarantined worker that is not actually in-flight has no path to be observed/released, + // so terminate it now to avoid "zombie" workers. + + const quarantined: WorkerWithLimit[] = []; + const retireImmediately = new Set(); + while (workers.length > 0) { + const w = workers.pop()!; + w.messagesRemaining = 0; + quarantined.push(w); + // Track the workers being allowed to finish their assignments. + // retireWorker removes a worker from both retireAfterFlight and inFlightWorker (the latter via clearInFlight). + retireAfterFlight.add(w); + if (inFlightTask.has(w)) { + inFlightWorker.add(w); + } else { + retireImmediately.add(w); + } + } + const quarantinedSet = new Set(quarantined); + + while (idleWorkerStack.length > 0) { + const idleWorker = idleWorkerStack.pop()!; + if (!idleWorkerSet.has(idleWorker)) continue; // stale entry + clearIdle(idleWorker); // no longer idle; queued for immediate retirement + retireImmediately.add(idleWorker); + } + // Defensive: after quarantine/recovery, discard any lingering idle markers + // to avoid stale bookkeeping blocking future scheduling. 
+ idleWorkerSet.clear(); + + // Terminate quarantined workers that are not actually in-flight. + for (const w of retireImmediately) { + retireWorker(w); + } + + // Consistency check: every worker we believe is in-flight must have been quarantined. + // If an unexpected in-flight worker exists, reject/cleanup/terminate it. + for (const w of [...inFlightWorker]) { + if (quarantinedSet.has(w)) continue; + console.error("Found unexpected in-flight worker during recovery; rejecting and terminating it."); + try { + const inFlight = clearInFlight(w); + if (inFlight) { + safeCall(inFlight.task.reject, new Error("Worker was unexpectedly found in-flight"), { + label: "inFlight.task.reject(recovery)", + log: true + }); + } + } finally { + retireWorker(w); + // No scheduleRefillAndDispatch() call here: we are already inside it and the backoff timer below schedules the next attempt. + } + } + + /** + * TODO: Enable this after decisions are made or remove it (disabled code below; until then retries continue indefinitely with capped backoff). + // If we've exceeded our bounded recovery threshold, latch the pool failure and drain queued tasks. + if (recoveryFailures >= RECOVERY_FAILURE_THRESHOLD) { + poolInitError ??= new Error( + `Worker pool failed to recover after ${recoveryFailures} attempts: ${e.message}`, + ); + drainAndRejectQueuedTasks(poolInitError); + return; + } + */ + + // Exponential backoff with cap (no immediate recursive microtasks). + const delay = Math.min(recoveryBackoffMs, RECOVERY_BACKOFF_MAX_MS); + recoveryBackoffMs = Math.min(recoveryBackoffMs * 2, RECOVERY_BACKOFF_MAX_MS); + + recoveryTimerId = setTimeout(() => { + recoveryTimerId = null; + scheduleRefillAndDispatch(); + }, delay) as unknown as number; + } + }); +} + +function releaseWorker( + worker: WorkerWithLimit, + overrideSchedule: boolean = false, // when true, skip the follow-up scheduleRefillAndDispatch() call +) { + let schedule = (workers.length < CONCURRENCY); + + // Quarantine marker takes precedence: never return to idle once quarantined. 
+ if (retireAfterFlight.has(worker)) { + // This was likely already zero, but set it anyway + worker.messagesRemaining = 0; + retireWorker(worker); + } else if (worker.messagesRemaining > 0) { + // Worker can take more work + setIdle(worker); + } else { + // Budget exhausted (or zeroed because the worker was marked unhealthy); remove & replace + retireWorker(worker); + schedule = true; + } + + if (overrideSchedule) return; + // Keep the pool healthy and keep draining the queue + if (schedule || taskQueue.length > 0) + scheduleRefillAndDispatch(); +} + +function setIdle(worker: WorkerWithLimit) { + if (idleWorkerSet.has(worker)) return; // already idle; avoid a second live stack entry + idleWorkerStack.push(worker); + idleWorkerSet.add(worker); +} + +function setInFlight(worker: WorkerWithLimit, task: Task) { + inFlightWorker.add(worker); + + const timeoutId = setTimeout(() => { + try { + const inFlight = clearInFlight(worker); + if (inFlight) { + safeCall(inFlight.task.reject, new Error("Worker task timed out"), { + label: "inFlight.task.reject(timeout)", + log: true, + }); + } + } finally { + retireWorker(worker); + scheduleRefillAndDispatch(); + } + }, IN_FLIGHT_TIMEOUT_MS) as unknown as number; + + inFlightTask.set(worker, { task, timeoutId } as InFlight); +} + +function clearIdle(worker: WorkerWithLimit): boolean { + const wasIdle = idleWorkerSet.delete(worker); // only the set is updated; any idleWorkerStack entry is left behind as a stale entry + return wasIdle; +} + +function clearInFlight(worker: WorkerWithLimit): InFlight | undefined { + const inFlight = inFlightTask.get(worker); + try { + if (inFlight) { + const timeoutId = inFlight.timeoutId; + if (typeof timeoutId === "number") clearTimeout(timeoutId); + } + } finally { + inFlightTask.delete(worker); + inFlightWorker.delete(worker); + } + return inFlight; +} + +const workerMessageHandlerAttached = new WeakSet(); +function attachPermanentHandlers(worker: WorkerWithLimit) { + if (workerMessageHandlerAttached.has(worker)) return; + workerMessageHandlerAttached.add(worker); + + worker.addEventListener("message", (e: MessageEvent) => { + const { type, data } = 
+ (e.data ?? {}) as { type?: string; data?: any }; + + // Look up the current in-flight record for THIS worker. + const inFlight = inFlightTask.get(worker); + if (!inFlight) { + // Stray message: worker responded but we don't think it has a task. + // Treat as unhealthy to avoid corrupting queue semantics. + console.error("Worker sent message but no in-flight task was tracked; retiring worker."); + worker.messagesRemaining = 0; + retireWorker(worker); + scheduleRefillAndDispatch(); + return; + } + + const task = inFlight.task; + + try { + // Clear in-flight first to avoid re-entrancy / duplicate settle. + clearInFlight(worker); + + if (type === "success") { + if (typeof data === "string") { + safeCall(task.resolve, data, { label: "task.resolve", log: true }); + } else { + // Malformed response: mark worker as unhealthy + worker.messagesRemaining = 0; + safeCall(task.reject, new Error("Worker returned non-string success payload"), { + label: "task.reject(nonStringSuccess)", + log: true, + }); + } + } else { + // Any non-"success" message is treated as a worker-reported error; zero the budget so releaseWorker retires the worker + worker.messagesRemaining = 0; + + console.error("Received error from worker:", data); + const err = new Error(data?.message ?? 
+ "Worker error"); + err.stack = data?.stack; /* NOTE(review): when data.stack is undefined this overwrites the locally captured stack with undefined */ + safeCall(task.reject, err, { label: "task.reject(workerError)", log: true }); + } + } finally { + releaseWorker(worker); + } + }); + + worker.addEventListener("messageerror", () => { + console.error("Worker message deserialization failed"); + try { + const inFlight = inFlightTask.get(worker); + if (inFlight) { + clearInFlight(worker); + safeCall(inFlight.task.reject, new Error("Worker message deserialization failed"), { + label: "inFlight.task.reject(messageerror)", + log: true, + }); + } + } finally { + retireWorker(worker); + scheduleRefillAndDispatch(); + } + }); + + worker.addEventListener("error", (ev: ErrorEvent) => { + console.error("Worker crashed:", ev.message); + try { + const inFlight = inFlightTask.get(worker); + if (inFlight) { + clearInFlight(worker); + safeCall(inFlight.task.reject, new Error(`Worker crashed: ${ev.message}`), { + label: "inFlight.task.reject(workerCrash)", + log: true, + }); + } + } finally { + retireWorker(worker); + scheduleRefillAndDispatch(); + } + }); +} + +function createWorker(): WorkerWithLimit { + const url = new URL("../worker.ts", import.meta.url); + const worker = new Worker(url.href, { type: "module" }) as WorkerWithLimit; + + if (!Object.isExtensible(worker)) { + safeCall(worker.terminate.bind(worker), { + label: "worker.terminate(createWorker)", + log: true, + }); + throw new Error("Worker instance is not extensible; cannot attach pool metadata"); + } + + try { + // Set and lock the limit (non-writable, non-configurable) + Object.defineProperty(worker, "messagesLimit", { + value: MESSAGES_LIMIT, + configurable: false, + writable: false, + enumerable: true, + }); - const messageHandler = (e: MessageEvent) => { - idleWorker.removeEventListener("message", messageHandler); - idleWorker.isIdle = true; + // Mutable, but not removable/re-definable pool metadata. 
+ Object.defineProperty(worker, "messagesRemaining", { + value: MESSAGES_LIMIT, + configurable: false, + writable: true, + enumerable: true, + }); - const { type, data } = e.data; - if (type === 'success') { - task.resolve(data); - } else { - console.error("Received error from worker:", data); - const err = new Error(data.message); - err.stack = data.stack; - task.reject(err); + // Attach the message/messageerror/error listeners before the worker is handed to the pool + attachPermanentHandlers(worker); + } catch (e) { + safeCall(worker.terminate.bind(worker), { + label: "worker.terminate(createWorker)", + log: true, + }); + throw normalizeError(e); + } + + return worker; +} + +function takeIdleWorker(): WorkerWithLimit | undefined { + while (idleWorkerStack.length > 0) { + const w = idleWorkerStack.pop()!; + if (!idleWorkerSet.has(w)) continue; // stale entry (the worker left the idle set after this push) + if (!Number.isFinite(w.messagesRemaining) || w.messagesRemaining <= 0 || retireAfterFlight.has(w)) { + w.messagesRemaining = 0; + releaseWorker(w, true); // retires the worker (budget is 0) without scheduling a refill from here + continue; + } + clearIdle(w); // now reserved (not idle) + return w; + } + if (workers.length < CONCURRENCY) + scheduleRefillAndDispatch(); + return undefined; +} + +function dispatch() { + const now = Date.now(); + let idleWorker: ReturnType; + while (taskQueue.length > 0 && undefined !== (idleWorker = takeIdleWorker())) { + const worker = idleWorker; // stable const binding (idleWorker is reassigned by the loop condition) + const task = taskQueue.shift()!; + const enqueuedAt = taskEnqueuedAt.get(task) ?? 
+ now; + if (enqueuedAt < (now - MAX_TASK_AGE_MS)) { + try { + safeCall(task.reject, new Error("Task was queued longer than allowed"), { label: "task.reject(maxAge)", log: true }); + } finally { + releaseWorker(worker); /* the reserved worker was never used for this task; hand it back */ + } + continue; } - dispatch(); // keep checking - }; - idleWorker.addEventListener("message", messageHandler); - idleWorker.postMessage(task.data); + try { + worker.messagesRemaining -= 1; // budget is charged at dispatch time, before postMessage + setInFlight(worker, task); + worker.postMessage(task.data); + } catch (err) { + // Worker may be unusable; replace it to avoid pool deadlocks. + worker.messagesRemaining = 0; + try { + const inFlight = clearInFlight(worker); + if (inFlight) { + safeCall(inFlight.task.reject, err, { label: "task.reject(postMessageFailure)", log: true }); + } + } finally { + releaseWorker(worker); + } + } + } +} + +function drainAndRejectQueuedTasks(err: Error) { + while (taskQueue.length > 0) { + const t = taskQueue.shift()!; + safeCall(t.reject, err, { label: "t.reject(drain)", log: true }); + } } export function execInPool(data: string): Promise { return new Promise((resolve, reject) => { - taskQueue.push({ data, resolve, reject }); - dispatch(); + // TODO: Before enabling `poolInitError` by setting it anywhere: + // - Define what counts as a *fatal* pool failure (only worker construction? repeated crashes? permissions?). + // - Decide if/when it should be cleared (never vs. on later successful `fillWorkers()`). + // - Decide whether callers should always fail fast or whether retry/backoff should be attempted. + // Until those decisions are made, `poolInitError` should remain unset (null). 
+ if (poolInitError) { + safeCall(reject, poolInitError, { + label: "reject(execInPool)", + log: true, + }); + return; + } + + const task: Task = { data, resolve, reject }; + taskEnqueuedAt.set(task, Date.now()); /* timestamp consulted by the max-age check in dispatch() */ + taskQueue.push(task); + scheduleRefillAndDispatch(); /* refill and dispatch run in a microtask, not synchronously */ }); } -export function initializeWorkers() { - for (let i = 0; i < CONCURRENCY; i++) { - const worker: WorkerWithStatus = new Worker(new URL("../worker.ts", import.meta.url).href, { type: "module" }); - worker.isIdle = true; +function fillWorkers() { + while (workers.length < CONCURRENCY) { + const worker = createWorker(); /* may throw; the exception propagates to the caller */ + workers.push(worker); + setIdle(worker); } +} + +export function initializeWorkers() { + fillWorkers(); /* synchronous; throws if a worker cannot be created */ console.log(`Initialized ${CONCURRENCY} workers`); -} \ No newline at end of file +} +