Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 63 additions & 3 deletions src/runtime/process-io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const DEFAULT_MAX_LINE_LENGTH = 100 * 1024 * 1024;
/** Maximum stderr bytes to retain for diagnostics: 8KB */
const MAX_STDERR_BYTES = 8 * 1024;

/** Default write queue timeout: 30 seconds */
const DEFAULT_WRITE_QUEUE_TIMEOUT_MS = 30_000;

/** Regex for ANSI escape sequences */
const ANSI_ESCAPE_RE = /\u001b\[[0-9;]*[A-Za-z]/g;

Expand Down Expand Up @@ -64,6 +67,9 @@ export interface ProcessIOOptions {

/** Restart process after N requests (0 = never). Default: 0 */
restartAfterRequests?: number;

/** Write queue timeout in milliseconds. Default: 30000ms */
writeQueueTimeoutMs?: number;
}

/**
Expand All @@ -82,6 +88,8 @@ interface QueuedWrite {
data: string;
resolve: () => void;
reject: (error: Error) => void;
/** Timestamp when the write was queued */
queuedAt: number;
}

// =============================================================================
Expand Down Expand Up @@ -142,6 +150,7 @@ export class ProcessIO extends BoundedContext implements Transport {
private readonly cwd: string | undefined;
private readonly maxLineLength: number;
private readonly restartAfterRequests: number;
private readonly writeQueueTimeoutMs: number;

// Process state
private process: ChildProcess | null = null;
Expand Down Expand Up @@ -174,6 +183,7 @@ export class ProcessIO extends BoundedContext implements Transport {
this.cwd = options.cwd;
this.maxLineLength = options.maxLineLength ?? DEFAULT_MAX_LINE_LENGTH;
this.restartAfterRequests = options.restartAfterRequests ?? 0;
this.writeQueueTimeoutMs = options.writeQueueTimeoutMs ?? DEFAULT_WRITE_QUEUE_TIMEOUT_MS;
}

// ===========================================================================
Expand Down Expand Up @@ -417,10 +427,12 @@ export class ProcessIO extends BoundedContext implements Transport {

if (this.process.stdout) {
this.process.stdout.on('data', this.handleStdoutData.bind(this));
this.process.stdout.on('error', this.handleStdoutError.bind(this));
}

if (this.process.stderr) {
this.process.stderr.on('data', this.handleStderrData.bind(this));
this.process.stderr.on('error', this.handleStderrError.bind(this));
}

if (this.process.stdin) {
Expand Down Expand Up @@ -502,6 +514,17 @@ export class ProcessIO extends BoundedContext implements Transport {
await this.spawnProcess();
}

/**
* Mark the process for restart on the next send.
* This is called after stream errors to ensure the next request uses a fresh process.
*/
private markForRestart(): void {
if (this.restartAfterRequests > 0) {
// Set requestCount to trigger restart on next send
this.requestCount = this.restartAfterRequests;
}
}

// ===========================================================================
// STREAM HANDLERS
// ===========================================================================
Expand Down Expand Up @@ -660,6 +683,30 @@ export class ProcessIO extends BoundedContext implements Transport {

// Reject all pending requests
this.rejectAllPending(error);

// Mark for restart on next send
this.markForRestart();
}

/**
* Handle stdout error event.
* This can occur during pipe errors or when the process crashes.
*/
private handleStdoutError(err: Error): void {
const error = new BridgeProtocolError(`stdout error: ${err.message}`);
this.rejectAllPending(error);
this.markForRestart();
}

/**
* Handle stderr error event.
* This can occur during pipe errors or when the process crashes.
*/
private handleStderrError(err: Error): void {
// Stderr errors are less critical but still indicate process health issues
const error = new BridgeProtocolError(`stderr error: ${err.message}`);
this.rejectAllPending(error);
this.markForRestart();
}

// ===========================================================================
Expand All @@ -677,8 +724,8 @@ export class ProcessIO extends BoundedContext implements Transport {
}

if (this.draining || this.writeQueue.length > 0) {
// Queue the write
this.writeQueue.push({ data, resolve, reject });
// Queue the write with timestamp
this.writeQueue.push({ data, resolve, reject, queuedAt: Date.now() });
return;
}

Expand All @@ -691,10 +738,11 @@ export class ProcessIO extends BoundedContext implements Transport {
} else {
// Backpressure - queue this write and set draining flag
this.draining = true;
this.writeQueue.push({ data, resolve, reject });
this.writeQueue.push({ data, resolve, reject, queuedAt: Date.now() });
}
} catch (err) {
// Synchronous write error (e.g., EPIPE)
this.markForRestart();
reject(new BridgeProtocolError(`Write error: ${err instanceof Error ? err.message : 'unknown'}`));
}
});
Expand All @@ -704,13 +752,16 @@ export class ProcessIO extends BoundedContext implements Transport {
* Flush queued writes when backpressure clears.
*/
private flushWriteQueue(): void {
const now = Date.now();

while (this.writeQueue.length > 0 && !this.draining) {
if (!this.process?.stdin || this.processExited) {
// Process died - reject all queued writes
for (const q of this.writeQueue) {
q.reject(new BridgeProtocolError('Process stdin not available'));
}
this.writeQueue.length = 0;
this.markForRestart();
return;
}

Expand All @@ -719,6 +770,14 @@ export class ProcessIO extends BoundedContext implements Transport {
return;
}

// Check for write queue timeout
if (now - queued.queuedAt > this.writeQueueTimeoutMs) {
queued.reject(new BridgeTimeoutError(
`Write queue timeout: entry waited ${now - queued.queuedAt}ms (limit: ${this.writeQueueTimeoutMs}ms)`
));
continue; // Process next entry
}

try {
const canWrite = this.process.stdin.write(queued.data);

Expand All @@ -740,6 +799,7 @@ export class ProcessIO extends BoundedContext implements Transport {
q.reject(error);
}
this.writeQueue.length = 0;
this.markForRestart();
return;
}
}
Expand Down
Loading