diff --git a/.changeset/empty-poets-serve.md b/.changeset/empty-poets-serve.md new file mode 100644 index 00000000..dfcd320c --- /dev/null +++ b/.changeset/empty-poets-serve.md @@ -0,0 +1,5 @@ +--- +"@cloudflare/sandbox": patch +--- + +feat: add file watching capabilities with inotify support diff --git a/packages/sandbox-container/src/core/container.ts b/packages/sandbox-container/src/core/container.ts index db3a6392..8e4814ef 100644 --- a/packages/sandbox-container/src/core/container.ts +++ b/packages/sandbox-container/src/core/container.ts @@ -8,6 +8,7 @@ import { MiscHandler } from '../handlers/misc-handler'; import { PortHandler } from '../handlers/port-handler'; import { ProcessHandler } from '../handlers/process-handler'; import { SessionHandler } from '../handlers/session-handler'; +import { WatchHandler } from '../handlers/watch-handler'; import { CorsMiddleware } from '../middleware/cors'; import { LoggingMiddleware } from '../middleware/logging'; import { SecurityServiceAdapter } from '../security/security-adapter'; @@ -19,6 +20,7 @@ import { InMemoryPortStore, PortService } from '../services/port-service'; import { ProcessService } from '../services/process-service'; import { ProcessStore } from '../services/process-store'; import { SessionManager } from '../services/session-manager'; +import { WatchService } from '../services/watch-service'; export interface Dependencies { // Services @@ -27,6 +29,7 @@ export interface Dependencies { portService: PortService; gitService: GitService; interpreterService: InterpreterService; + watchService: WatchService; // Infrastructure logger: Logger; @@ -41,6 +44,7 @@ export interface Dependencies { interpreterHandler: InterpreterHandler; sessionHandler: SessionHandler; miscHandler: MiscHandler; + watchHandler: WatchHandler; // Middleware corsMiddleware: CorsMiddleware; @@ -112,6 +116,7 @@ export class Container { sessionManager ); const interpreterService = new InterpreterService(logger); + const watchService = new WatchService(logger); // Initialize handlers const sessionHandler = new SessionHandler(sessionManager, logger); @@ -125,6 +130,7 @@ export class Container { logger ); const miscHandler = new MiscHandler(logger); + const watchHandler = new WatchHandler(watchService, logger); // Initialize middleware const corsMiddleware = new CorsMiddleware(); @@ -138,6 +144,7 @@ export class Container { portService, gitService, interpreterService, + watchService, // Infrastructure logger, @@ -152,6 +159,7 @@ export class Container { interpreterHandler, sessionHandler, miscHandler, + watchHandler, // Middleware corsMiddleware, diff --git a/packages/sandbox-container/src/handlers/watch-handler.ts b/packages/sandbox-container/src/handlers/watch-handler.ts new file mode 100644 index 00000000..757651ac --- /dev/null +++ b/packages/sandbox-container/src/handlers/watch-handler.ts @@ -0,0 +1,120 @@ +import type { Logger, WatchRequest, WatchStopResult } from '@repo/shared'; +import { ErrorCode } from '@repo/shared/errors'; +import type { RequestContext } from '../core/types'; +import type { WatchService } from '../services/watch-service'; +import { BaseHandler } from './base-handler'; + +/** + * Handler for file watch operations + */ +export class WatchHandler extends BaseHandler { + constructor( + private watchService: WatchService, + logger: Logger + ) { + super(logger); + } + + async handle(request: Request, context: RequestContext): Promise { + const url = new URL(request.url); + const pathname = url.pathname; + + switch (pathname) { + case '/api/watch': + return this.handleWatch(request, context); + case '/api/watch/stop': + return this.handleStopWatch(request, context); + case '/api/watch/list': + return this.handleListWatches(request, context); + default: + return this.createErrorResponse( + { + message: 'Invalid watch endpoint', + code: ErrorCode.UNKNOWN_ERROR + }, + context + ); + } + } + + /** + * Start watching a directory + * Returns an SSE stream of file change events + */ + private async handleWatch( + request: Request, + context: RequestContext + ): Promise { + const body = await this.parseRequestBody(request); + + // Resolve path - if relative, resolve from /workspace + let watchPath = body.path; + if (!watchPath.startsWith('/')) { + watchPath = `/workspace/${watchPath}`; + } + + const result = await this.watchService.watchDirectory(watchPath, { + ...body, + path: watchPath + }); + + if (result.success) { + return new Response(result.data, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + ...context.corsHeaders + } + }); + } else { + return this.createErrorResponse(result.error, context); + } + } + + /** + * Stop a specific watch by ID + */ + private async handleStopWatch( + request: Request, + context: RequestContext + ): Promise { + const body = await this.parseRequestBody<{ watchId: string }>(request); + + const result = await this.watchService.stopWatch(body.watchId); + + if (result.success) { + const response: WatchStopResult = { + success: true, + watchId: body.watchId, + timestamp: new Date().toISOString() + }; + return this.createTypedResponse(response, context); + } else { + return this.createErrorResponse(result.error, context); + } + } + + /** + * List all active watches + */ + private async handleListWatches( + _request: Request, + context: RequestContext + ): Promise { + const watches = this.watchService.getActiveWatches(); + + const response = { + success: true, + watches: watches.map((w) => ({ + id: w.id, + path: w.path, + startedAt: w.startedAt.toISOString() + })), + count: watches.length, + timestamp: new Date().toISOString() + }; + + return this.createTypedResponse(response, context); + } +} diff --git a/packages/sandbox-container/src/handlers/ws-adapter.ts b/packages/sandbox-container/src/handlers/ws-adapter.ts index 23d74bfb..fbe9d692 100644 --- a/packages/sandbox-container/src/handlers/ws-adapter.ts +++ b/packages/sandbox-container/src/handlers/ws-adapter.ts @@ -38,6 +38,8 @@ export interface WSData { export class WebSocketAdapter { private router: Router; private logger: Logger; + /** Track active streaming responses to prevent garbage collection */ + private activeStreams: Map> = new Map(); constructor(router: Router, logger: Logger) { this.router = router; @@ -162,7 +164,29 @@ export class WebSocketAdapter { if (isStreaming && httpResponse.body) { // Handle SSE streaming response - await this.handleStreamingResponse(ws, request.id, httpResponse); + // CRITICAL: We must capture the Response body reader BEFORE the promise starts executing + // asynchronously. If we call getReader() inside handleStreamingResponse after an await, + // Bun's WebSocket handler may GC or invalidate the Response body when onMessage returns. + // By getting the reader synchronously here, we ensure the stream remains valid. + const reader = + httpResponse.body.getReader() as ReadableStreamDefaultReader; + const streamPromise = this.handleStreamingResponseWithReader( + ws, + request.id, + httpResponse.status, + reader + ) + .catch((error: unknown) => { + this.logger.error( + 'Error in streaming response', + error instanceof Error ? error : new Error(String(error)), + { requestId: request.id } + ); + }) + .finally(() => { + this.activeStreams.delete(request.id); + }); + this.activeStreams.set(request.id, streamPromise); } else { // Handle regular response await this.handleRegularResponse(ws, request.id, httpResponse); @@ -198,39 +222,65 @@ export class WebSocketAdapter { } /** - * Handle a streaming (SSE) HTTP response + * Handle a streaming (SSE) HTTP response with a pre-acquired reader + * + * This variant receives the reader instead of the Response, allowing the caller + * to acquire the reader synchronously before any await points. This is critical + * for WebSocket streaming because Bun's message handler may invalidate the + * Response body if the reader is acquired after the handler returns. */ - private async handleStreamingResponse( + private async handleStreamingResponseWithReader( ws: ServerWebSocket, requestId: string, - response: Response + status: number, + reader: ReadableStreamDefaultReader ): Promise { - if (!response.body) { - this.sendError(ws, requestId, 'STREAM_ERROR', 'No response body', 500); - return; - } + this.logger.debug('Starting streaming response handler', { requestId }); - const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; + // Track partial event state across chunks + let currentEvent: { event?: string; data: string[] } = { data: [] }; + let chunkCount = 0; try { while (true) { + this.logger.debug('Waiting for stream chunk', { + requestId, + chunkCount + }); const { done, value } = await reader.read(); if (done) { + this.logger.debug('Stream ended', { requestId, chunkCount }); break; } + chunkCount++; + // Decode chunk and add to buffer - buffer += decoder.decode(value, { stream: true }); + const chunkText = decoder.decode(value, { stream: true }); + this.logger.debug('Received stream chunk', { + requestId, + chunkCount, + chunkLength: chunkText.length, + chunkPreview: chunkText.substring(0, 100) + }); + buffer += chunkText; - // Parse SSE events from buffer - const events = this.parseSSEEvents(buffer); - buffer = events.remaining; + // Parse SSE events from buffer, preserving partial event state + const result = this.parseSSEEvents(buffer, currentEvent); + buffer = result.remaining; + currentEvent = result.currentEvent; + + this.logger.debug('Parsed SSE events', { + requestId, + eventCount: result.events.length, + remainingBuffer: result.remaining.length + }); // Send each parsed event as a stream chunk - for (const event of events.events) { + for (const event of result.events) { const chunk: WSStreamChunk = { type: 'stream', id: requestId, @@ -247,7 +297,7 @@ export class WebSocketAdapter { const wsResponse: WSResponse = { type: 'response', id: requestId, - status: response.status, + status, done: true }; this.send(ws, wsResponse); @@ -272,8 +322,8 @@ export class WebSocketAdapter { /** * Parse SSE events from a buffer * - * Returns parsed events and any remaining unparsed content (incomplete lines - * waiting for more data from the next chunk). + * Returns parsed events, remaining unparsed content, and current partial event state. + * The currentEvent parameter allows preserving state across chunk boundaries. * * Note: This is a minimal SSE parser that only handles `event:` and `data:` * fields - sufficient for our streaming handlers which only emit these. @@ -282,12 +332,15 @@ export class WebSocketAdapter { * - `retry:` field (reconnection timing hints) * - Comment lines (starting with `:`) */ - private parseSSEEvents(buffer: string): { + private parseSSEEvents( + buffer: string, + currentEvent: { event?: string; data: string[] } = { data: [] } + ): { events: Array<{ event?: string; data: string }>; remaining: string; + currentEvent: { event?: string; data: string[] }; } { const events: Array<{ event?: string; data: string }> = []; - let currentEvent: { event?: string; data: string[] } = { data: [] }; let i = 0; while (i < buffer.length) { @@ -317,7 +370,8 @@ export class WebSocketAdapter { return { events, - remaining: buffer.substring(i) + remaining: buffer.substring(i), + currentEvent }; } diff --git a/packages/sandbox-container/src/routes/setup.ts b/packages/sandbox-container/src/routes/setup.ts index 16e2b999..d52dd027 100644 --- a/packages/sandbox-container/src/routes/setup.ts +++ b/packages/sandbox-container/src/routes/setup.ts @@ -274,6 +274,28 @@ export function setupRoutes(router: Router, container: Container): void { middleware: [container.get('loggingMiddleware')] }); + // File watch routes + router.register({ + method: 'POST', + path: '/api/watch', + handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx), + middleware: [container.get('loggingMiddleware')] + }); + + router.register({ + method: 'POST', + path: '/api/watch/stop', + handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx), + middleware: [container.get('loggingMiddleware')] + }); + + router.register({ + method: 'GET', + path: '/api/watch/list', + handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx), + middleware: [container.get('loggingMiddleware')] + }); + // Miscellaneous routes router.register({ method: 'GET', diff --git a/packages/sandbox-container/src/services/watch-service.ts b/packages/sandbox-container/src/services/watch-service.ts new file mode 100644 index 00000000..6e71aecc --- /dev/null +++ b/packages/sandbox-container/src/services/watch-service.ts @@ -0,0 +1,636 @@ +import type { + FileWatchEventType, + FileWatchSSEEvent, + Logger, + WatchRequest +} from '@repo/shared'; +import { ErrorCode } from '@repo/shared/errors'; +import type { Subprocess } from 'bun'; +import type { ServiceResult } from '../core/types'; +import { serviceError, serviceSuccess } from '../core/types'; + +interface ActiveWatch { + id: string; + path: string; + process: Subprocess; + options: WatchRequest; + startedAt: Date; +} + +/** + * Service for watching filesystem changes using inotifywait + */ +export class WatchService { + private activeWatches: Map = new Map(); + private watchCounter = 0; + + constructor(private logger: Logger) {} + + /** + * Start watching a directory for changes + * Returns a ReadableStream of SSE events + * + * @param path - Absolute path to watch + * @param options - Watch options + */ + async watchDirectory( + path: string, + options: WatchRequest = { path } + ): Promise>> { + const watchId = `watch-${++this.watchCounter}-${Date.now()}`; + const watchLogger = this.logger.child({ watchId, path }); + + // Verify path exists + const pathCheck = Bun.spawnSync(['test', '-e', path]); + if (pathCheck.exitCode !== 0) { + return serviceError({ + message: `Path does not exist: ${path}`, + code: ErrorCode.FILE_NOT_FOUND, + details: { path } + }); + } + + // Build inotifywait command + const args = this.buildInotifyArgs(path, options); + watchLogger.debug('Starting inotifywait', { args }); + + try { + const proc = Bun.spawn(['inotifywait', ...args], { + stdout: 'pipe', + stderr: 'pipe' + }); + + // Store active watch + this.activeWatches.set(watchId, { + id: watchId, + path, + process: proc, + options, + startedAt: new Date() + }); + + // Create SSE stream from inotifywait output + const stream = this.createWatchStream( + watchId, + path, + proc, + options, + watchLogger + ); + + return serviceSuccess(stream); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + watchLogger.error('Failed to start inotifywait', err); + return serviceError({ + message: `Failed to start file watcher: ${error instanceof Error ? error.message : 'Unknown error'}`, + code: ErrorCode.WATCH_START_ERROR, + details: { path } + }); + } + } + + /** + * Stop a specific watch + */ + async stopWatch(watchId: string): Promise> { + const watch = this.activeWatches.get(watchId); + if (!watch) { + return serviceError({ + message: `Watch not found: ${watchId}`, + code: ErrorCode.WATCH_NOT_FOUND, + details: { watchId } + }); + } + + try { + watch.process.kill(); + this.activeWatches.delete(watchId); + this.logger.info('Watch stopped', { watchId, path: watch.path }); + return serviceSuccess(undefined); + } catch (error) { + return serviceError({ + message: `Failed to stop watch: ${error instanceof Error ? error.message : 'Unknown error'}`, + code: ErrorCode.UNKNOWN_ERROR, + details: { watchId } + }); + } + } + + /** + * Stop all active watches + */ + async stopAllWatches(): Promise { + let count = 0; + for (const [watchId, watch] of this.activeWatches) { + try { + watch.process.kill(); + count++; + } catch (error) { + // Log the actual error for debugging + // Expected case: process already exited (no longer running) + // Unexpected: permission errors, system issues + this.logger.warn('Failed to kill watch process', { + watchId, + error: error instanceof Error ? error.message : String(error), + path: watch.path + }); + } + } + this.activeWatches.clear(); + return count; + } + + /** + * Get list of active watches + */ + getActiveWatches(): Array<{ id: string; path: string; startedAt: Date }> { + return Array.from(this.activeWatches.values()).map((w) => ({ + id: w.id, + path: w.path, + startedAt: w.startedAt + })); + } + + private buildInotifyArgs(path: string, options: WatchRequest): string[] { + const args: string[] = [ + '-m', // Monitor mode (continuous) + '--format', + '%e|%w%f' // event|path (ISDIR is part of event flags) + ]; + + // Recursive watching + if (options.recursive !== false) { + args.push('-r'); + } + + // Event types + const events: FileWatchEventType[] = options.events || [ + 'create', + 'modify', + 'delete', + 'move_from', + 'move_to' + ]; + const inotifyEvents = events + .map((e) => this.mapEventType(e)) + .filter((e): e is string => e !== undefined); + if (inotifyEvents.length > 0) { + args.push('-e', inotifyEvents.join(',')); + } + + // Exclude patterns - convert to regex for inotifywait + // inotifywait --exclude uses POSIX extended regex matching against full path + // NOTE: inotifywait only supports a single --exclude option, so we combine all patterns + const excludes = options.exclude || ['.git', 'node_modules', '.DS_Store']; + if (excludes.length > 0) { + // Escape regex metacharacters and wrap each pattern to match anywhere in path + const escapedPatterns = excludes.map((pattern) => { + const escaped = pattern.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + return `(^|/)${escaped}(/|$)`; + }); + // Combine all patterns with OR operator + args.push('--exclude', escapedPatterns.join('|')); + } + + // Add path last + args.push(path); + + return args; + } + + private mapEventType(type: FileWatchEventType): string | undefined { + const mapping: Record = { + create: 'create', + modify: 'modify', + delete: 'delete', + move_from: 'moved_from', + move_to: 'moved_to', + attrib: 'attrib' + }; + return mapping[type]; + } + + private parseInotifyEvent(line: string): { + eventType: FileWatchEventType; + path: string; + isDirectory: boolean; + } | null { + // Format: EVENT|/path/to/file|EVENT_FLAGS + // The third part (%:e) contains colon-separated flags like CREATE:ISDIR + const parts = line.trim().split('|'); + if (parts.length < 2) return null; + + const [rawEvent, filePath, flagsPart] = parts; + // Check if ISDIR appears in either the event or the flags + const isDirectory = + rawEvent.includes('ISDIR') || (flagsPart?.includes('ISDIR') ?? false); + + // Map inotify event back to our type + const eventType = this.parseEventType(rawEvent); + if (!eventType) return null; + + return { eventType, path: filePath, isDirectory }; + } + + private parseEventType(rawEvent: string): FileWatchEventType | null { + // inotify can emit multiple events like "CREATE,ISDIR" + const events = rawEvent.split(','); + const primary = events[0].toLowerCase(); + + const mapping: Record = { + create: 'create', + modify: 'modify', + delete: 'delete', + moved_from: 'move_from', + moved_to: 'move_to', + attrib: 'attrib', + // Handle close_write as modify (common for editors) + close_write: 'modify' + }; + + return mapping[primary] || null; + } + + private createWatchStream( + watchId: string, + path: string, + proc: Subprocess, + options: WatchRequest, + logger: Logger + ): ReadableStream { + const encoder = new TextEncoder(); + const includePatterns = options.include; + const self = this; + const stdout = proc.stdout; + const stderr = proc.stderr; + + if (!stdout || typeof stdout === 'number') { + // Return a stream that immediately errors + return new ReadableStream({ + start(controller) { + const errorEvent: FileWatchSSEEvent = { + type: 'error', + error: 'Failed to capture process output' + }; + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) + ); + controller.close(); + } + }); + } + + return new ReadableStream({ + async start(controller) { + // Wait for inotifywait to establish watches before sending watching event + // This ensures clients know the watch is truly ready to detect changes + if (stderr && typeof stderr !== 'number') { + await self.waitForWatchesEstablished( + stderr, + controller, + encoder, + logger + ); + } + + // Send watching event only after watches are established + const watchingEvent: FileWatchSSEEvent = { + type: 'watching', + path, + watchId + }; + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(watchingEvent)}\n\n`) + ); + + // Read stdout line by line + const reader = stdout.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + const processLine = (line: string) => { + const parsed = self.parseInotifyEvent(line); + if (!parsed) return; + + // Apply include filter if specified + if (includePatterns && includePatterns.length > 0) { + const matches = includePatterns.some((pattern: string) => + self.matchGlob(parsed.path, pattern) + ); + if (!matches) return; + } + + const event: FileWatchSSEEvent = { + type: 'event', + eventType: parsed.eventType, + path: parsed.path, + isDirectory: parsed.isDirectory, + timestamp: new Date().toISOString() + }; + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(event)}\n\n`) + ); + }; + + try { + logger.debug('Starting to read inotifywait stdout'); + while (true) { + const { done, value } = await reader.read(); + if (done) { + logger.debug('inotifywait stdout stream ended'); + break; + } + + const chunk = decoder.decode(value, { stream: true }); + logger.debug('Received chunk from inotifywait', { + chunkLength: chunk.length, + chunk: chunk.substring(0, 200) + }); + buffer += chunk; + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line.trim()) { + logger.debug('Processing inotifywait line', { line }); + processLine(line); + } + } + } + + // Process any remaining buffer + if (buffer.trim()) { + processLine(buffer); + } + + // Send stopped event + const stoppedEvent: FileWatchSSEEvent = { + type: 'stopped', + reason: 'Watch process ended' + }; + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(stoppedEvent)}\n\n`) + ); + controller.close(); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + logger.error('Error reading watch output', err); + const errorEvent: FileWatchSSEEvent = { + type: 'error', + error: error instanceof Error ? error.message : 'Unknown error' + }; + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) + ); + controller.close(); + } finally { + // Ensure process is killed on any exit path + try { + proc.kill(); + } catch { + // Process may already be dead + } + self.activeWatches.delete(watchId); + } + }, + + cancel() { + // Clean up when stream is cancelled + try { + proc.kill(); + } catch (error) { + logger.warn('Failed to kill watch process on cancel', { + watchId, + error: error instanceof Error ? error.message : 'Unknown error' + }); + } + self.activeWatches.delete(watchId); + logger.info('Watch cancelled', { watchId }); + } + }); + } + + /** + * Wait for inotifywait to output "Watches established" on stderr. + * This ensures the watch is ready to detect file changes before we signal readiness to clients. + * After watches are established, continues monitoring stderr for errors in background. + * + * Has a timeout to prevent hanging if inotifywait behaves unexpectedly. + */ + private async waitForWatchesEstablished( + stderr: ReadableStream, + controller: ReadableStreamDefaultController, + encoder: TextEncoder, + logger: Logger + ): Promise { + const reader = stderr.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + let established = false; + + // Timeout after 10 seconds - inotifywait should establish watches quickly + const timeoutMs = 10000; + const timeoutPromise = new Promise<'timeout'>((resolve) => + setTimeout(() => resolve('timeout'), timeoutMs) + ); + + const readLoop = async (): Promise<'done' | 'established'> => { + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + logger.debug('stderr ended before watches established'); + return 'done'; + } + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + if (trimmed.includes('Watches established')) { + logger.debug('inotifywait watches established'); + established = true; + return 'established'; + } + if (trimmed.includes('Setting up watches')) { + logger.debug('inotifywait setting up watches', { + message: trimmed + }); + continue; + } + // Actual error from inotifywait + logger.warn('inotifywait stderr during setup', { + message: trimmed + }); + const errorEvent: FileWatchSSEEvent = { + type: 'error', + error: trimmed + }; + try { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) + ); + } catch { + // Controller may be closed + return 'done'; + } + } + } + } + } catch (error) { + logger.debug('stderr reading ended during setup', { + error: error instanceof Error ? error.message : 'Unknown' + }); + return 'done'; + } + }; + + const result = await Promise.race([readLoop(), timeoutPromise]); + + if (result === 'timeout') { + logger.warn('Timeout waiting for inotifywait to establish watches'); + // Continue anyway - watches might still work + } + + if (established) { + // Continue monitoring stderr for errors in background + this.continueStderrMonitoring( + reader, + decoder, + buffer, + controller, + encoder, + logger + ); + } else { + // Release the reader if we're not continuing to monitor + try { + reader.releaseLock(); + } catch { + // Reader may already be released + } + } + } + + /** + * Continue monitoring stderr for errors after watches are established. + * Runs in background without blocking. + */ + private continueStderrMonitoring( + reader: { read(): Promise<{ done: boolean; value?: Uint8Array }> }, + decoder: TextDecoder, + initialBuffer: string, + controller: ReadableStreamDefaultController, + encoder: TextEncoder, + logger: Logger + ): void { + (async () => { + let buffer = initialBuffer; + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed) { + // Skip info messages + if ( + trimmed.includes('Watches established') || + trimmed.includes('Setting up watches') + ) { + logger.debug('inotifywait info', { message: trimmed }); + continue; + } + + logger.warn('inotifywait stderr', { message: trimmed }); + const errorEvent: FileWatchSSEEvent = { + type: 'error', + error: trimmed + }; + try { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify(errorEvent)}\n\n`) + ); + } catch { + // Controller may be closed + break; + } + } + } + } + } catch (error) { + // Stream closed or other error - expected when process terminates + logger.debug('stderr monitoring ended', { + error: error instanceof Error ? error.message : 'Unknown' + }); + } + })(); + } + + /** + * Simple glob matching for include patterns + * Converts glob pattern to regex character-by-character for security + */ + private matchGlob(filePath: string, pattern: string): boolean { + const regexParts: string[] = ['^']; + + for (let i = 0; i < pattern.length; i++) { + const char = pattern[i]; + + switch (char) { + case '*': + // ** matches any path segments, * matches any chars except / + if (pattern[i + 1] === '*') { + regexParts.push('.*'); + i++; // Skip next * + } else { + regexParts.push('[^/]*'); + } + break; + case '?': + // ? matches single char except / + regexParts.push('[^/]'); + break; + case '.': + case '+': + case '^': + case '$': + case '{': + case '}': + case '(': + case ')': + case '|': + case '\\': + // Escape regex metacharacters + regexParts.push(`\\${char}`); + break; + case '[': + // Character classes - find matching ] and treat literally + // to prevent [a-z] from being interpreted as regex range + regexParts.push('\\['); + break; + case ']': + regexParts.push('\\]'); + break; + default: + regexParts.push(char); + } + } + + regexParts.push('$'); + const regex = new RegExp(regexParts.join('')); + + // Match against filename only (for patterns like *.ts) + const filename = filePath.split('/').pop() || ''; + return regex.test(filename); + } +} diff --git a/packages/sandbox-container/tests/handlers/ws-adapter.test.ts b/packages/sandbox-container/tests/handlers/ws-adapter.test.ts index 93d91ab0..68d5ea5c 100644 --- a/packages/sandbox-container/tests/handlers/ws-adapter.test.ts +++ b/packages/sandbox-container/tests/handlers/ws-adapter.test.ts @@ -229,6 +229,21 @@ describe('WebSocketAdapter', () => { await adapter.onMessage(mockWs as any, JSON.stringify(request)); + // Streaming runs in background to avoid blocking the message handler. + // Wait for the final response which indicates streaming completed. + const waitForFinalResponse = async (maxWait = 1000) => { + const start = Date.now(); + while (Date.now() - start < maxWait) { + const messages = mockWs.getSentMessages(); + const finalResponse = messages.find( + (m) => m.type === 'response' && m.done === true + ); + if (finalResponse) return; + await new Promise((resolve) => setTimeout(resolve, 10)); + } + }; + await waitForFinalResponse(); + // Should have received stream chunks and final response const messages = mockWs.getSentMessages(); diff --git a/packages/sandbox-container/tests/services/watch-service.test.ts b/packages/sandbox-container/tests/services/watch-service.test.ts new file mode 100644 index 00000000..127a909f --- /dev/null +++ b/packages/sandbox-container/tests/services/watch-service.test.ts @@ -0,0 +1,386 @@ +import { beforeEach, describe, expect, it, vi } from 'bun:test'; +import type { WatchRequest } from '@repo/shared'; +import { createNoOpLogger } from '@repo/shared'; +import { ErrorCode } from '@repo/shared/errors'; +import { WatchService } from '@sandbox-container/services/watch-service'; + +const mockLogger = createNoOpLogger(); + +/** + * Type-safe accessor for testing private WatchService methods. + * Uses module augmentation to expose private methods for testing only. + */ +interface WatchServiceTestAccessor { + matchGlob(filePath: string, pattern: string): boolean; + parseInotifyEvent(line: string): { + eventType: string; + path: string; + isDirectory: boolean; + } | null; + buildInotifyArgs(path: string, options: WatchRequest): string[]; +} + +describe('WatchService', () => { + let watchService: WatchService; + + beforeEach(() => { + vi.clearAllMocks(); + watchService = new WatchService(mockLogger); + }); + + describe('matchGlob', () => { + // Access private method for testing via type assertion + const testMatchGlob = ( + service: WatchService, + path: string, + pattern: string + ): boolean => { + return (service as unknown as WatchServiceTestAccessor).matchGlob( + path, + pattern + ); + }; + + describe('basic patterns', () => { + it('should match exact filename', () => { + expect(testMatchGlob(watchService, '/app/file.ts', 'file.ts')).toBe( + true + ); + expect(testMatchGlob(watchService, '/app/other.ts', 'file.ts')).toBe( + false + ); + }); + + it('should match wildcard extension', () => { + expect(testMatchGlob(watchService, '/app/code.ts', '*.ts')).toBe(true); + expect(testMatchGlob(watchService, '/app/code.js', '*.ts')).toBe(false); + expect(testMatchGlob(watchService, '/app/code.tsx', '*.ts')).toBe( + false + ); + }); + + it('should match wildcard prefix', () => { + expect( + testMatchGlob(watchService, '/app/test.spec.ts', '*.spec.ts') + ).toBe(true); + expect(testMatchGlob(watchService, '/app/test.ts', '*.spec.ts')).toBe( + false + ); + }); + + it('should match single character wildcard', () => { + expect(testMatchGlob(watchService, '/app/file1.ts', 'file?.ts')).toBe( + true + ); + expect(testMatchGlob(watchService, '/app/file2.ts', 'file?.ts')).toBe( + true + ); + expect(testMatchGlob(watchService, '/app/file12.ts', 'file?.ts')).toBe( + false + ); + }); + }); + + describe('security - regex metacharacters', () => { + it('should escape dots literally', () => { + // Pattern *.ts should NOT match file.tsx (dot is literal, not regex any-char) + expect(testMatchGlob(watchService, '/app/code.ts', '*.ts')).toBe(true); + expect(testMatchGlob(watchService, '/app/codexts', '*.ts')).toBe(false); + }); + + it('should escape plus literally', () => { + expect( + testMatchGlob(watchService, '/app/file+name.ts', 'file+name.ts') + ).toBe(true); + expect( + testMatchGlob(watchService, '/app/filename.ts', 'file+name.ts') + ).toBe(false); + }); + + it('should escape caret and dollar literally', () => { + expect(testMatchGlob(watchService, '/app/$file.ts', '$file.ts')).toBe( + true + ); + expect(testMatchGlob(watchService, '/app/^file.ts', '^file.ts')).toBe( + true + ); + }); + + it('should escape parentheses literally', () => { + expect( + testMatchGlob(watchService, '/app/file(1).ts', 'file(1).ts') + ).toBe(true); + expect(testMatchGlob(watchService, '/app/file1.ts', 'file(1).ts')).toBe( + false + ); + }); + + it('should escape brackets literally (prevent character classes)', () => { + // Pattern [a-z].ts should match literal "[a-z].ts", not "a.ts" through "z.ts" + expect(testMatchGlob(watchService, '/app/[a-z].ts', '[a-z].ts')).toBe( + true + ); + expect(testMatchGlob(watchService, '/app/a.ts', '[a-z].ts')).toBe( + false + ); + expect(testMatchGlob(watchService, '/app/m.ts', '[a-z].ts')).toBe( + false + ); + }); + + it('should escape pipe literally', () => { + expect(testMatchGlob(watchService, '/app/a|b.ts', 'a|b.ts')).toBe(true); + expect(testMatchGlob(watchService, '/app/a.ts', 'a|b.ts')).toBe(false); + }); + + it('should escape backslash literally', () => { + expect(testMatchGlob(watchService, '/app/file\\.ts', 'file\\.ts')).toBe( + true + ); + }); + }); + + describe('security - ReDoS prevention', () => { + it('should handle potentially malicious patterns safely', () => { + // These patterns should complete quickly, not cause catastrophic backtracking + const start = Date.now(); + + // Pattern that could cause ReDoS if not properly escaped + testMatchGlob( + watchService, + '/app/aaaaaaaaaaaaaaaaaaaaaaaaa.ts', + '*.ts' + ); + testMatchGlob(watchService, '/app/test.ts', '*.*.*.*'); + + const elapsed = Date.now() - start; + // Should complete in under 100ms, not hang + expect(elapsed).toBeLessThan(100); + }); + }); + + describe('edge cases', () => { + it('should match files in any directory', () => { + expect(testMatchGlob(watchService, '/a/b/c/d/file.ts', '*.ts')).toBe( + true + ); + }); + + it('should handle empty pattern', () => { + expect(testMatchGlob(watchService, '/app/file.ts', '')).toBe(false); + }); + + it('should handle pattern with only wildcards', () => { + expect(testMatchGlob(watchService, '/app/anything', '*')).toBe(true); + }); + }); + }); + + describe('stopWatch', () => { + it('should return error for non-existent watch', async () => { + const result = await watchService.stopWatch('non-existent-watch-id'); + + expect(result.success).toBe(false); + if (!result.success) { + expect(result.error.code).toBe(ErrorCode.WATCH_NOT_FOUND); + expect(result.error.message).toContain('non-existent-watch-id'); + } + }); + }); + + describe('getActiveWatches', () => { + it('should return empty array initially', () => { + const watches = watchService.getActiveWatches(); + expect(watches).toEqual([]); + }); + }); + + describe('stopAllWatches', () => { + it('should return 0 when no watches active', async () => { + const count = await watchService.stopAllWatches(); + expect(count).toBe(0); + }); + }); + + describe('watchDirectory', () => { + it('should return error for non-existent path', async () => { + const result = await watchService.watchDirectory( + '/non/existent/path/12345' + ); + + expect(result.success).toBe(false); + if (!result.success) { + expect(result.error.code).toBe(ErrorCode.FILE_NOT_FOUND); + } + }); + }); + + describe('parseInotifyEvent', () => { + // Access private method for testing via type assertion + const testParseEvent = (service: WatchService, line: string) => { + return (service as unknown as WatchServiceTestAccessor).parseInotifyEvent( + line + ); + }; + + it('should parse CREATE event', () => { + const result = testParseEvent(watchService, 'CREATE|/app/file.ts|'); + expect(result).toEqual({ + eventType: 'create', + path: '/app/file.ts', + isDirectory: false + }); + }); + + it('should parse CREATE,ISDIR event', () => { + const result = testParseEvent( + watchService, + 'CREATE,ISDIR|/app/newdir|ISDIR' + ); + expect(result).toEqual({ + eventType: 'create', + path: '/app/newdir', + isDirectory: true + }); + }); + + it('should parse CREATE,ISDIR with colon-separated flags from %:e format', () => { + // This is the actual output format from inotifywait with --format '%e|%w%f|%:e' + const result = testParseEvent( + watchService, + 'CREATE,ISDIR|/app/newdir|CREATE:ISDIR' + ); + expect(result).toEqual({ + eventType: 'create', + path: '/app/newdir', + isDirectory: true + }); + }); + + it('should parse MODIFY event', () => { + const result = testParseEvent(watchService, 'MODIFY|/app/file.ts|'); + expect(result).toEqual({ + eventType: 'modify', + path: '/app/file.ts', + isDirectory: false + }); + }); + + it('should parse DELETE event', () => { + const result = testParseEvent(watchService, 'DELETE|/app/file.ts|'); + expect(result).toEqual({ + eventType: 'delete', + path: '/app/file.ts', + isDirectory: false + }); + }); + + it('should parse MOVED_FROM event', () => { + const result = testParseEvent(watchService, 'MOVED_FROM|/app/old.ts|'); + expect(result).toEqual({ + eventType: 'move_from', + path: '/app/old.ts', + isDirectory: false + }); + }); + + it('should parse MOVED_TO event', () => { + const result = testParseEvent(watchService, 'MOVED_TO|/app/new.ts|'); + expect(result).toEqual({ + eventType: 'move_to', + path: '/app/new.ts', + isDirectory: false + }); + }); + + it('should parse CLOSE_WRITE as modify', () => { + const result = testParseEvent(watchService, 'CLOSE_WRITE|/app/file.ts|'); + expect(result).toEqual({ + eventType: 'modify', + path: '/app/file.ts', + isDirectory: false + }); + }); + + it('should return null for malformed line', () => { + expect(testParseEvent(watchService, 'invalid')).toBeNull(); + expect(testParseEvent(watchService, '')).toBeNull(); + expect(testParseEvent(watchService, '|')).toBeNull(); + }); + + it('should return null for unknown event type', () => { + const result = testParseEvent( + watchService, + 'UNKNOWN_EVENT|/app/file.ts|' + ); + expect(result).toBeNull(); + }); + }); + + describe('buildInotifyArgs', () => { + // Access private method for testing via type assertion + const testBuildArgs = ( + service: WatchService, + path: string, + options: WatchRequest + ) => { + return (service as unknown as WatchServiceTestAccessor).buildInotifyArgs( + path, + options + ); + }; + + it('should include monitor mode and format', () => { + const args = testBuildArgs(watchService, '/app', { path: '/app' }); + expect(args).toContain('-m'); + expect(args).toContain('--format'); + expect(args).toContain('%e|%w%f'); + }); + + it('should include recursive flag by default', () => { + const args = testBuildArgs(watchService, '/app', { path: '/app' }); + expect(args).toContain('-r'); + }); + + it('should exclude recursive flag when disabled', () => { + const args = testBuildArgs(watchService, '/app', { + path: '/app', + recursive: false + }); + expect(args).not.toContain('-r'); + }); + + it('should include default excludes as combined regex pattern', () => { + const args = testBuildArgs(watchService, '/app', { path: '/app' }); + expect(args).toContain('--exclude'); + // inotifywait only supports a single --exclude, so patterns are combined with OR + const excludeIndex = args.indexOf('--exclude'); + expect(excludeIndex).toBeGreaterThan(-1); + const excludePattern = args[excludeIndex + 1]; + // Verify combined regex format: (^|/)pattern(/|$)|(^|/)pattern(/|$)|... + expect(excludePattern).toContain('(^|/)\\.git(/|$)'); + expect(excludePattern).toContain('(^|/)node_modules(/|$)'); + expect(excludePattern).toContain('(^|/)\\.DS_Store(/|$)'); + // Patterns are joined with | + expect(excludePattern.split('|').length).toBeGreaterThanOrEqual(3); + }); + + it('should convert custom excludes to combined regex pattern', () => { + const args = testBuildArgs(watchService, '/app', { + path: '/app', + exclude: ['*.log', 'temp'] + }); + expect(args).toContain('--exclude'); + const excludeIndex = args.indexOf('--exclude'); + const excludePattern = args[excludeIndex + 1]; + // Patterns are escaped and wrapped in regex anchors, combined with OR + expect(excludePattern).toContain('(^|/)\\*\\.log(/|$)'); + expect(excludePattern).toContain('(^|/)temp(/|$)'); + }); + + it('should add path as last argument', () => { + const args = testBuildArgs(watchService, '/app', { path: '/app' }); + expect(args[args.length - 1]).toBe('/app'); + }); + }); +}); diff --git a/packages/sandbox/Dockerfile b/packages/sandbox/Dockerfile index 2caaf741..4606035f 100644 --- a/packages/sandbox/Dockerfile +++ b/packages/sandbox/Dockerfile @@ -105,6 +105,7 @@ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \ apt-get update && apt-get install -y --no-install-recommends \ s3fs fuse \ ca-certificates curl wget procps git unzip zip jq file \ + inotify-tools \ libssl3 zlib1g libbz2-1.0 libreadline8 libsqlite3-0 \ libncursesw6 libtinfo6 libxml2 libxmlsec1 libffi8 liblzma5 libtk8.6 && \ update-ca-certificates diff --git a/packages/sandbox/src/clients/index.ts b/packages/sandbox/src/clients/index.ts index 83b636f4..3b6e5a01 100644 --- a/packages/sandbox/src/clients/index.ts +++ b/packages/sandbox/src/clients/index.ts @@ -16,6 +16,7 @@ export { InterpreterClient } from './interpreter-client'; export { PortClient } from './port-client'; export { ProcessClient } from './process-client'; export { UtilityClient } from './utility-client'; +export { WatchClient } from './watch-client'; // ============================================================================= // Transport layer @@ -38,6 +39,7 @@ export { // Client types and interfaces // ============================================================================= +export type { WatchRequest } from '@repo/shared'; // Command client types export type { ExecuteRequest, ExecuteResponse } from './command-client'; // File client types @@ -79,7 +81,6 @@ export type { ResponseHandler, SessionRequest } from './types'; - // Utility client types export type { CommandsResponse, @@ -90,3 +91,5 @@ export type { PingResponse, VersionResponse } from './utility-client'; +// Watch client types +export type { WatchListResponse } from './watch-client'; diff --git a/packages/sandbox/src/clients/sandbox-client.ts b/packages/sandbox/src/clients/sandbox-client.ts index 67aca6f1..8b4dce73 100644 --- a/packages/sandbox/src/clients/sandbox-client.ts +++ b/packages/sandbox/src/clients/sandbox-client.ts @@ -11,6 +11,7 @@ import { } from './transport'; import type { HttpClientOptions } from './types'; import { UtilityClient } from './utility-client'; +import { WatchClient } from './watch-client'; /** * Main sandbox client that composes all domain-specific clients @@ -30,6 +31,7 @@ export class SandboxClient { public readonly git: GitClient; public readonly interpreter: InterpreterClient; public readonly utils: UtilityClient; + public readonly watch: WatchClient; private transport: ITransport | null = null; @@ -62,6 +64,7 @@ export class SandboxClient { this.git = new GitClient(clientOptions); this.interpreter = new InterpreterClient(clientOptions); this.utils = new UtilityClient(clientOptions); + this.watch = new WatchClient(clientOptions); } /** diff --git a/packages/sandbox/src/clients/transport/ws-transport.ts b/packages/sandbox/src/clients/transport/ws-transport.ts index ec5b0620..a5b32589 100644 --- a/packages/sandbox/src/clients/transport/ws-transport.ts +++ b/packages/sandbox/src/clients/transport/ws-transport.ts @@ -21,6 +21,8 @@ interface PendingRequest { streamController?: ReadableStreamDefaultController; isStreaming: boolean; timeoutId?: ReturnType; + /** Called when first stream chunk is received (for deferred stream return) */ + onFirstChunk?: () => void; } /** @@ -348,6 +350,10 @@ export class WebSocketTransport extends BaseTransport { * * The stream will receive data chunks as they arrive over the WebSocket. * Format matches SSE for compatibility with existing streaming code. + * + * This method waits for the first message before returning. If the server + * responds with an error (non-streaming response), it throws immediately + * rather than returning a stream that will error later. */ private async requestStream( method: WSMethod, @@ -365,58 +371,104 @@ export class WebSocketTransport extends BaseTransport { body }; - return new ReadableStream({ - start: (controller) => { - const timeoutMs = this.config.requestTimeoutMs ?? 120000; - const timeoutId = setTimeout(() => { + // We need to wait for the first message to determine if this is a streaming + // response or an immediate error. This prevents returning a stream that will + // error on first read. + return new Promise((resolveStream, rejectStream) => { + let streamController: ReadableStreamDefaultController; + let firstMessageReceived = false; + + const timeoutMs = this.config.requestTimeoutMs ?? 120000; + const timeoutId = setTimeout(() => { + this.pendingRequests.delete(id); + const error = new Error( + `Stream timeout after ${timeoutMs}ms: ${method} ${path}` + ); + if (firstMessageReceived) { + streamController?.error(error); + } else { + rejectStream(error); + } + }, timeoutMs); + + // Create the stream but don't return it until we get the first message + const stream = new ReadableStream({ + start: (controller) => { + streamController = controller; + }, + cancel: () => { + const pending = this.pendingRequests.get(id); + if (pending?.timeoutId) { + clearTimeout(pending.timeoutId); + } + this.pendingRequests.delete(id); + } + }); + + this.pendingRequests.set(id, { + resolve: (response: WSResponse) => { + clearTimeout(timeoutId); this.pendingRequests.delete(id); - controller.error( - new Error(`Stream timeout after ${timeoutMs}ms: ${method} ${path}`) - ); - }, timeoutMs); - this.pendingRequests.set(id, { - resolve: (response: WSResponse) => { - clearTimeout(timeoutId); - this.pendingRequests.delete(id); - // Final response - close the stream + if (!firstMessageReceived) { + // First message is a final response (not streaming) - this is an error case + firstMessageReceived = true; if (response.status >= 400) { - controller.error( + rejectStream( new Error( `Stream error: ${response.status} - ${JSON.stringify(response.body)}` ) ); } else { - controller.close(); + // Successful non-streaming response - close immediately + streamController?.close(); + resolveStream(stream); } - }, - reject: (error: Error) => { - clearTimeout(timeoutId); - this.pendingRequests.delete(id); - controller.error(error); - }, - streamController: controller, - isStreaming: true, - timeoutId - }); - - try { - this.send(request); - } catch (error) { + } else { + // Stream was already returned, now closing + if (response.status >= 400) { + streamController?.error( + new Error( + `Stream error: ${response.status} - ${JSON.stringify(response.body)}` + ) + ); + } else { + streamController?.close(); + } + } + }, + reject: (error: Error) => { clearTimeout(timeoutId); this.pendingRequests.delete(id); - controller.error( - error instanceof Error ? error : new Error(String(error)) - ); - } - }, - cancel: () => { - const pending = this.pendingRequests.get(id); - if (pending?.timeoutId) { - clearTimeout(pending.timeoutId); + if (firstMessageReceived) { + streamController?.error(error); + } else { + rejectStream(error); + } + }, + streamController: undefined, // Set after first chunk + isStreaming: true, + timeoutId, + // Custom handler for first stream chunk + onFirstChunk: () => { + if (!firstMessageReceived) { + firstMessageReceived = true; + // Update the pending request with the actual controller + const pending = this.pendingRequests.get(id); + if (pending) { + pending.streamController = streamController; + } + resolveStream(stream); + } } + }); + + try { + this.send(request); + } catch (error) { + clearTimeout(timeoutId); this.pendingRequests.delete(id); - // Could send a cancel message to server if needed + rejectStream(error instanceof Error ? error : new Error(String(error))); } }); } @@ -490,13 +542,27 @@ export class WebSocketTransport extends BaseTransport { */ private handleStreamChunk(chunk: WSStreamChunk): void { const pending = this.pendingRequests.get(chunk.id); - if (!pending || !pending.streamController) { + if (!pending) { this.logger.warn('Received stream chunk for unknown request', { id: chunk.id }); return; } + // Call onFirstChunk if this is the first chunk (triggers stream return) + if (pending.onFirstChunk) { + pending.onFirstChunk(); + pending.onFirstChunk = undefined; // Only call once + } + + // streamController may not be set yet if onFirstChunk just resolved + if (!pending.streamController) { + this.logger.warn('Stream chunk received but controller not ready', { + id: chunk.id + }); + return; + } + // Convert to SSE format for compatibility with existing parsers const encoder = new TextEncoder(); let sseData: string; diff --git a/packages/sandbox/src/clients/watch-client.ts b/packages/sandbox/src/clients/watch-client.ts new file mode 100644 index 00000000..3bdf116e --- /dev/null +++ b/packages/sandbox/src/clients/watch-client.ts @@ -0,0 +1,78 @@ +import type { WatchRequest, WatchStopResult } from '@repo/shared'; +import { BaseHttpClient } from './base-client'; + +/** + * Response for listing active watches + */ +export interface WatchListResponse { + success: boolean; + watches: Array<{ + id: string; + path: string; + startedAt: string; + }>; + count: number; + timestamp: string; +} + +/** + * Client for file watch operations + * Uses inotify under the hood for native filesystem event notifications + * + * @internal This client is used internally by the SDK. + * Users should use `sandbox.files.watch()` instead. + */ +export class WatchClient extends BaseHttpClient { + /** + * Start watching a directory for changes + * Returns a stream of SSE events for file changes + * + * @param request - Watch request with path and options + */ + async watch(request: WatchRequest): Promise> { + try { + // Use doStreamFetch which handles both WebSocket and HTTP streaming + const stream = await this.doStreamFetch('/api/watch', request); + + this.logSuccess('File watch started', request.path); + return stream; + } catch (error) { + this.logError('watch', error); + throw error; + } + } + + /** + * Stop a specific watch by ID + * + * @param watchId - The watch ID returned in the 'watching' event + */ + async stopWatch(watchId: string): Promise { + try { + const response = await this.post('/api/watch/stop', { + watchId + }); + + this.logSuccess('Watch stopped', watchId); + return response; + } catch (error) { + this.logError('stopWatch', error); + throw error; + } + } + + /** + * List all active watches + */ + async listWatches(): Promise { + try { + const response = await this.get('/api/watch/list'); + + this.logSuccess('Watches listed', `${response.count} active`); + return response; + } catch (error) { + this.logError('listWatches', error); + throw error; + } + } +} diff --git a/packages/sandbox/src/file-watch.ts b/packages/sandbox/src/file-watch.ts new file mode 100644 index 00000000..a741924c --- /dev/null +++ b/packages/sandbox/src/file-watch.ts @@ -0,0 +1,279 @@ +import type { + createLogger, + FileWatchSSEEvent, + WatchEvent, + WatchEventType, + WatchHandle +} from '@repo/shared'; +import type { SandboxClient } from './clients'; + +/** Watch lifecycle state */ +type WatchState = 'establishing' | 'active' | 'stopped'; + +/** + * Encapsulates the entire file watch lifecycle with a single-loop state machine. + * + * States: + * establishing -> active -> stopped + * + * The same read loop handles both establishment and event processing, + * transitioning state when the 'watching' confirmation arrives. + * + * @internal This class is not part of the public API. + */ +export class FileWatch implements WatchHandle { + readonly path: string; + + private readonly abortController = new AbortController(); + private readonly reader: ReadableStreamDefaultReader; + private readonly decoder = new TextDecoder(); + private readonly logger: ReturnType; + private readonly client: SandboxClient; + private readonly onEvent?: (event: WatchEvent) => void; + private readonly onError?: (error: Error) => void; + + private state: WatchState = 'establishing'; + private watchId = ''; + private buffer = ''; + private loopPromise: Promise; + + // Resolver for the establishment promise - called when state transitions + private establishedResolve?: () => void; + private establishedReject?: (error: Error) => void; + + private constructor( + stream: ReadableStream, + path: string, + client: SandboxClient, + logger: ReturnType, + externalSignal?: AbortSignal, + onEvent?: (event: WatchEvent) => void, + onError?: (error: Error) => void + ) { + this.path = path; + this.reader = stream.getReader(); + this.client = client; + this.logger = logger; + this.onEvent = onEvent; + this.onError = onError; + + // Link external abort signal + if (externalSignal) { + if (externalSignal.aborted) { + this.abortController.abort(); + } else { + externalSignal.addEventListener( + 'abort', + () => this.abortController.abort(), + { once: true } + ); + } + } + + // Start the single event loop + this.loopPromise = this.runLoop(); + } + + get id(): string { + return this.watchId; + } + + /** + * Creates a FileWatch, waiting for the watch to be established before returning. + * + * @throws Error if watch cannot be established + */ + static async create( + stream: ReadableStream, + path: string, + client: SandboxClient, + logger: ReturnType, + options?: { + signal?: AbortSignal; + onEvent?: (event: WatchEvent) => void; + onError?: (error: Error) => void; + } + ): Promise { + const watch = new FileWatch( + stream, + path, + client, + logger, + options?.signal, + options?.onEvent, + options?.onError + ); + + // Wait for establishment or failure + await watch.established(); + return watch; + } + + /** + * Returns a promise that resolves when watch is established, or rejects on failure. + */ + private established(): Promise { + // Already established + if (this.state === 'active') { + return Promise.resolve(); + } + // Already failed + if (this.state === 'stopped') { + return Promise.reject(new Error('Watch failed to establish')); + } + // Wait for state transition + return new Promise((resolve, reject) => { + this.establishedResolve = resolve; + this.establishedReject = reject; + }); + } + + /** + * Single event loop handling both establishment and event processing. + */ + private async runLoop(): Promise { + const signal = this.abortController.signal; + + try { + while (!signal.aborted) { + const { done, value } = await this.reader.read(); + if (done) { + if (this.state === 'establishing') { + throw new Error('Stream ended before watch was established'); + } + break; + } + + this.buffer += this.decoder.decode(value, { stream: true }); + const lines = this.buffer.split('\n'); + this.buffer = lines.pop() || ''; + + for (const line of lines) { + if (signal.aborted) break; + if (line.startsWith('data: ')) { + this.handleEvent(line); + } + } + } + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + if (this.state === 'establishing') { + this.state = 'stopped'; + this.establishedReject?.(err); + } + this.onError?.(err); + throw err; + } finally { + this.state = 'stopped'; + await this.reader.cancel().catch(() => {}); + } + } + + /** + * Type guard for FileWatchSSEEvent + */ + private isFileWatchSSEEvent(value: unknown): value is FileWatchSSEEvent { + if (typeof value !== 'object' || value === null) return false; + const obj = value as Record; + if (typeof obj.type !== 'string') return false; + return ['watching', 'event', 'error', 'stopped'].includes(obj.type); + } + + /** + * Handles a single SSE event based on current state. + */ + private handleEvent(line: string): void { + let parsed: unknown; + try { + parsed = JSON.parse(line.slice(6)); + } catch { + return; // Ignore malformed JSON + } + + if (!this.isFileWatchSSEEvent(parsed)) { + return; // Ignore invalid event structure + } + + const event = parsed; + + switch (event.type) { + case 'watching': + if (this.state === 'establishing') { + this.watchId = event.watchId; + this.state = 'active'; + this.establishedResolve?.(); + } + break; + + case 'event': + if (this.state === 'active') { + this.onEvent?.({ + type: this.mapEventType(event.eventType), + path: event.path, + isDirectory: event.isDirectory + }); + } + break; + + case 'error': { + const error = new Error(event.error); + if (this.state === 'establishing') { + this.state = 'stopped'; + this.abortController.abort(); + this.establishedReject?.(error); + } else { + this.logger.error('Watch error from server', error); + this.onError?.(error); + } + break; + } + + case 'stopped': + this.abortController.abort(); + break; + } + } + + private mapEventType( + sseType: 'create' | 'modify' | 'delete' | 'move_from' | 'move_to' | 'attrib' + ): WatchEventType { + switch (sseType) { + case 'create': + return 'create'; + case 'modify': + case 'attrib': + return 'modify'; + case 'delete': + return 'delete'; + case 'move_from': + case 'move_to': + return 'rename'; + } + } + + /** + * Stops watching and releases all resources. + * Safe to call multiple times. Waits for full cleanup. + */ + async stop(): Promise { + if (this.state === 'stopped') { + await this.loopPromise.catch(() => {}); + return; + } + + this.abortController.abort(); + + if (this.watchId) { + try { + await this.client.watch.stopWatch(this.watchId); + } catch (error) { + this.logger.warn('Failed to stop watch on server', { + watchId: this.watchId, + error + }); + } + } + + await this.loopPromise.catch(() => {}); + } +} diff --git a/packages/sandbox/src/index.ts b/packages/sandbox/src/index.ts index ea964d0e..93efee6e 100644 --- a/packages/sandbox/src/index.ts +++ b/packages/sandbox/src/index.ts @@ -43,7 +43,12 @@ export type { StreamOptions, // Process readiness types WaitForLogResult, - WaitForPortOptions + WaitForPortOptions, + // File watch types + WatchEvent, + WatchEventType, + WatchHandle, + WatchOptions } from '@repo/shared'; // Export type guards for runtime validation export { isExecResult, isProcess, isProcessStatus } from '@repo/shared'; diff --git a/packages/sandbox/src/opencode/opencode.ts b/packages/sandbox/src/opencode/opencode.ts index f7626428..446d0ba6 100644 --- a/packages/sandbox/src/opencode/opencode.ts +++ b/packages/sandbox/src/opencode/opencode.ts @@ -35,7 +35,7 @@ async function ensureSdkLoaded(): Promise { } catch { throw new Error( '@opencode-ai/sdk is required for OpenCode integration. ' + - 'Install it with: npm install @opencode-ai/sdk' + 'Install it with: npm install @opencode-ai/sdk' ); } } diff --git a/packages/sandbox/src/sandbox.ts b/packages/sandbox/src/sandbox.ts index 1f559eb9..7720f65d 100644 --- a/packages/sandbox/src/sandbox.ts +++ b/packages/sandbox/src/sandbox.ts @@ -22,7 +22,9 @@ import type { StreamOptions, WaitForExitResult, WaitForLogResult, - WaitForPortOptions + WaitForPortOptions, + WatchHandle, + WatchOptions } from '@repo/shared'; import { createLogger, @@ -43,6 +45,7 @@ import { ProcessReadyTimeoutError, SessionAlreadyExistsError } from './errors'; +import { FileWatch } from './file-watch'; import { CodeInterpreter } from './interpreter'; import { isLocalhostPattern } from './request-handler'; import { SecurityError, sanitizeSandboxId, validatePort } from './security'; @@ -2112,6 +2115,102 @@ export class Sandbox extends Container implements ISandbox { return this.client.files.exists(path, session); } + /** + * Watch a directory for file system changes using native inotify + * + * Returns a FileWatcher that emits events for file changes (create, modify, delete, move). + * The watcher uses inotify under the hood for efficient, real-time notifications. + * + * @param path - Path to watch (absolute or relative to /workspace) + * @param options - Watch options + * @returns FileWatcher instance for consuming events + * + * @example + * ```typescript + * // Watch for all changes in src directory + * const watcher = await sandbox.watch('/app/src', { + * recursive: true, + * onEvent: (event) => console.log(`${event.type}: ${event.path}`), + * onError: (error) => console.error('Watch error:', error) + * }); + * + * // With AbortController for cancellation + * const controller = new AbortController(); + * const watcher = await sandbox.watch('/app', { + * signal: controller.signal, + * onEvent: (e) => console.log(e) + * }); + * // Later: controller.abort(); + * + * // Stop watching when done + * await watcher.stop(); + * ``` + */ + async watch( + path: string, + options?: Omit + ): Promise { + const sessionId = await this.ensureDefaultSession(); + const stream = await this.client.watch.watch({ + path, + sessionId, + recursive: options?.recursive, + include: options?.include, + exclude: options?.exclude + }); + + return FileWatch.create(stream, path, this.client, this.logger, { + signal: options?.signal, + onEvent: options?.onEvent, + onError: options?.onError + }); + } + + /** + * Get raw SSE stream for file watching. + * This is a lower-level API for testing and advanced use cases. + * Most users should use watch() instead. + * + * @internal + */ + async watchStream( + path: string, + options?: { + recursive?: boolean; + include?: string[]; + exclude?: string[]; + } + ): Promise> { + const sessionId = await this.ensureDefaultSession(); + return this.client.watch.watch({ + path, + sessionId, + recursive: options?.recursive, + include: options?.include, + exclude: options?.exclude + }); + } + + /** + * Stop a watch by ID. + * @internal + */ + async stopWatch(watchId: string): Promise<{ success: boolean }> { + return this.client.watch.stopWatch(watchId); + } + + /** + * List all active watches. + * @internal + */ + async listWatches(): Promise<{ + success: boolean; + watches: Array<{ id: string; path: string; startedAt: string }>; + count: number; + }> { + return this.client.watch.listWatches(); + } + /** * Expose a port and get a preview URL for accessing services running in the sandbox * diff --git a/packages/shared/src/errors/codes.ts b/packages/shared/src/errors/codes.ts index 9dbc12a7..3ffc46eb 100644 --- a/packages/shared/src/errors/codes.ts +++ b/packages/shared/src/errors/codes.ts @@ -108,6 +108,12 @@ export const ErrorCode = { PROCESS_READY_TIMEOUT: 'PROCESS_READY_TIMEOUT', PROCESS_EXITED_BEFORE_READY: 'PROCESS_EXITED_BEFORE_READY', + // File Watch Errors (404) + WATCH_NOT_FOUND: 'WATCH_NOT_FOUND', + + // File Watch Errors (500) + WATCH_START_ERROR: 'WATCH_START_ERROR', + // Validation Errors (400) VALIDATION_FAILED: 'VALIDATION_FAILED', diff --git a/packages/shared/src/errors/status-map.ts b/packages/shared/src/errors/status-map.ts index 68b4e000..0b2bd6db 100644 --- a/packages/shared/src/errors/status-map.ts +++ b/packages/shared/src/errors/status-map.ts @@ -13,6 +13,7 @@ export const ERROR_STATUS_MAP: Record = { [ErrorCode.GIT_REPOSITORY_NOT_FOUND]: 404, [ErrorCode.GIT_BRANCH_NOT_FOUND]: 404, [ErrorCode.CONTEXT_NOT_FOUND]: 404, + [ErrorCode.WATCH_NOT_FOUND]: 404, // 400 Bad Request [ErrorCode.IS_DIRECTORY]: 400, @@ -75,6 +76,7 @@ export const ERROR_STATUS_MAP: Record = { [ErrorCode.CODE_EXECUTION_ERROR]: 500, [ErrorCode.BUCKET_MOUNT_ERROR]: 500, [ErrorCode.S3FS_MOUNT_ERROR]: 500, + [ErrorCode.WATCH_START_ERROR]: 500, [ErrorCode.UNKNOWN_ERROR]: 500, [ErrorCode.INTERNAL_ERROR]: 500 }; diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts index 92413a6c..c4cdf6e3 100644 --- a/packages/shared/src/index.ts +++ b/packages/shared/src/index.ts @@ -73,6 +73,9 @@ export type { FileInfo, FileMetadata, FileStreamEvent, + // File watch types + FileWatchEventType, + FileWatchSSEEvent, GitCheckoutResult, // Miscellaneous result types HealthCheckResult, @@ -118,6 +121,16 @@ export type { WaitForExitResult, WaitForLogResult, WaitForPortOptions, + // File watch types (user-facing) + WatchErrorCallback, + WatchEvent, + WatchEventCallback, + WatchEventType, + WatchHandle, + WatchOptions, + // File watch types + WatchRequest, + WatchStopResult, WriteFileResult } from './types.js'; export { diff --git a/packages/shared/src/types.ts b/packages/shared/src/types.ts index 1a70d9c4..e12dd75f 100644 --- a/packages/shared/src/types.ts +++ b/packages/shared/src/types.ts @@ -675,6 +675,144 @@ export interface FileMetadata { */ export type FileChunk = string | Uint8Array; +// File Watch Types + +/** + * File system event types + */ +export type WatchEventType = 'create' | 'modify' | 'delete' | 'rename'; + +/** + * A file system change event + */ +export interface WatchEvent { + /** The type of change that occurred */ + type: WatchEventType; + /** Absolute path to the file or directory that changed */ + path: string; + /** Whether the changed path is a directory */ + isDirectory: boolean; +} + +/** + * Callback for file watch events + */ +export type WatchEventCallback = (event: WatchEvent) => void; + +/** + * Callback for watch errors + */ +export type WatchErrorCallback = (error: Error) => void; + +/** + * Options for watching a directory + */ +export interface WatchOptions { + /** + * Watch subdirectories recursively + * @default true + */ + recursive?: boolean; + + /** + * Glob patterns to include (e.g., '*.ts', '*.js') + * If not specified, all files are included + */ + include?: string[]; + + /** + * Glob patterns to exclude (e.g., 'node_modules', '.git') + * @default ['.git', 'node_modules', '.DS_Store'] + */ + exclude?: string[]; + + /** + * AbortSignal to cancel the watch + */ + signal?: AbortSignal; + + /** + * Callback for file change events + */ + onEvent?: WatchEventCallback; + + /** + * Callback for errors (e.g., watch process died) + */ + onError?: WatchErrorCallback; +} + +/** + * Handle returned from watch() - use to stop watching + */ +export interface WatchHandle { + /** Stop watching and clean up resources */ + stop(): Promise; + /** The watch ID (for debugging) */ + readonly id: string; + /** The path being watched */ + readonly path: string; +} + +// Internal types for SSE protocol (not user-facing) + +/** + * @internal SSE event types for container communication + */ +export type FileWatchEventType = + | 'create' + | 'modify' + | 'delete' + | 'move_from' + | 'move_to' + | 'attrib'; + +/** + * @internal Request body for starting a file watch + */ +export interface WatchRequest { + path: string; + recursive?: boolean; + events?: FileWatchEventType[]; + include?: string[]; + exclude?: string[]; + sessionId?: string; +} + +/** + * @internal SSE events for file watching + */ +export type FileWatchSSEEvent = + | { + type: 'watching'; + path: string; + watchId: string; + } + | { + type: 'event'; + eventType: FileWatchEventType; + path: string; + isDirectory: boolean; + timestamp: string; + } + | { + type: 'error'; + error: string; + } + | { + type: 'stopped'; + reason: string; + }; + +/** + * @internal Result from stopping a watch + */ +export interface WatchStopResult { + success: boolean; + watchId: string; + timestamp: string; +} + // Process management result types export interface ProcessStartResult { success: boolean; diff --git a/tests/e2e/file-watch-workflow.test.ts b/tests/e2e/file-watch-workflow.test.ts new file mode 100644 index 00000000..3573ce8b --- /dev/null +++ b/tests/e2e/file-watch-workflow.test.ts @@ -0,0 +1,510 @@ +/** + * File Watch Integration Tests + * + * Tests the file watching feature end-to-end: + * - Starting a watch and receiving the 'watching' confirmation + * - Detecting file creation, modification, and deletion + * - Stopping a watch cleanly + * - Filtering with include patterns + * - Recursive vs non-recursive watching + */ + +import type { FileWatchSSEEvent } from '@repo/shared'; +import { beforeAll, describe, expect, test } from 'vitest'; +import { + createUniqueSession, + getSharedSandbox, + uniqueTestPath +} from './helpers/global-sandbox'; + +describe('File Watch Workflow', () => { + let workerUrl: string; + let headers: Record; + let testDir: string; + + beforeAll(async () => { + const sandbox = await getSharedSandbox(); + workerUrl = sandbox.workerUrl; + headers = sandbox.createHeaders(createUniqueSession()); + }, 120000); + + /** + * Helper to start a watch that allows performing actions after the watch is established. + */ + async function watchWithActions( + path: string, + options: { + recursive?: boolean; + include?: string[]; + timeoutMs?: number; + stopAfterEvents?: number; + } = {}, + actions: () => Promise + ): Promise<{ + events: FileWatchSSEEvent[]; + watchId: string | null; + actionResult: T; + }> { + const { timeoutMs = 5000, stopAfterEvents = 20 } = options; + + const response = await fetch(`${workerUrl}/api/watch`, { + method: 'POST', + headers, + body: JSON.stringify({ + path, + recursive: options.recursive ?? true, + include: options.include + }) + }); + + if (!response.ok || !response.body) { + throw new Error(`Watch request failed: ${response.status}`); + } + + const events: FileWatchSSEEvent[] = []; + let watchId: string | null = null; + let actionResult: T; + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + let actionsExecuted = false; + + const timeout = setTimeout(() => reader.cancel(), timeoutMs); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line.startsWith('data: ')) { + const event = JSON.parse(line.slice(6)) as FileWatchSSEEvent; + events.push(event); + + if (event.type === 'watching') { + watchId = event.watchId; + // Execute actions after the watch is confirmed ready + if (!actionsExecuted) { + actionsExecuted = true; + // inotifywait outputs "Setting up watches..." to stderr before it's ready + // We need to give it time to actually establish all watches before performing actions + await new Promise((r) => setTimeout(r, 1500)); + actionResult = await actions(); + // Give events time to propagate through the SSE stream + // Longer delay for WebSocket transport which has additional buffering + await new Promise((r) => setTimeout(r, 3000)); + } + } + + if ( + event.type === 'stopped' || + event.type === 'error' || + events.length >= stopAfterEvents + ) { + reader.cancel(); + break; + } + } + } + } + } catch (e) { + // Reader cancelled - expected + } finally { + clearTimeout(timeout); + } + + return { events, watchId, actionResult: actionResult! }; + } + + /** + * Helper to start a watch and collect events until stopped or timeout. + */ + async function watchAndCollect( + path: string, + options: { + recursive?: boolean; + include?: string[]; + timeoutMs?: number; + stopAfterEvents?: number; + } = {} + ): Promise<{ events: FileWatchSSEEvent[]; watchId: string | null }> { + const result = await watchWithActions(path, options, async () => {}); + return { events: result.events, watchId: result.watchId }; + } + + /** + * Helper to create a file via the API. + */ + async function createFile(path: string, content: string): Promise { + await fetch(`${workerUrl}/api/file/write`, { + method: 'POST', + headers, + body: JSON.stringify({ path, content }) + }); + } + + /** + * Helper to create a directory via the API. + */ + async function createDir(path: string): Promise { + await fetch(`${workerUrl}/api/file/mkdir`, { + method: 'POST', + headers, + body: JSON.stringify({ path, recursive: true }) + }); + } + + /** + * Helper to delete a file via the API. + */ + async function deleteFile(path: string): Promise { + await fetch(`${workerUrl}/api/file/delete`, { + method: 'DELETE', + headers, + body: JSON.stringify({ path }) + }); + } + + test('should establish watch and receive watching event', async () => { + testDir = uniqueTestPath('watch-establish'); + await createDir(testDir); + + const { events, watchId } = await watchAndCollect(testDir, { + timeoutMs: 2000, + stopAfterEvents: 1 + }); + + expect(events.length).toBeGreaterThanOrEqual(1); + expect(events[0].type).toBe('watching'); + expect(watchId).toBeTruthy(); + + if (events[0].type === 'watching') { + expect(events[0].path).toBe(testDir); + } + }, 30000); + + test('should detect file creation', async () => { + testDir = uniqueTestPath('watch-create'); + await createDir(testDir); + + // Start watch and create file after watch is confirmed ready + // timeoutMs must exceed: ~1s setup + 1500ms pre-action wait + action time + 3000ms post-action wait + const { events } = await watchWithActions( + testDir, + { timeoutMs: 8000, stopAfterEvents: 5 }, + async () => { + await createFile(`${testDir}/newfile.txt`, 'hello'); + } + ); + + console.log('[DEBUG] Collected events:', JSON.stringify(events, null, 2)); + + const createEvent = events.find( + (e) => e.type === 'event' && e.eventType === 'create' + ); + expect(createEvent).toBeDefined(); + + if (createEvent?.type === 'event') { + expect(createEvent.path).toContain('newfile.txt'); + } + }, 30000); + + test('should detect file modification', async () => { + testDir = uniqueTestPath('watch-modify'); + await createDir(testDir); + await createFile(`${testDir}/existing.txt`, 'initial'); + + // Start watch and modify file after watch is confirmed ready + // timeoutMs must exceed: ~1s setup + 1500ms pre-action wait + action time + 3000ms post-action wait + const { events } = await watchWithActions( + testDir, + { timeoutMs: 8000, stopAfterEvents: 5 }, + async () => { + await createFile(`${testDir}/existing.txt`, 'modified content'); + } + ); + + // Modification might show as 'modify' or 'create' depending on how editor writes + const modifyEvent = events.find( + (e) => + e.type === 'event' && + (e.eventType === 'modify' || e.eventType === 'create') && + e.path.includes('existing.txt') + ); + expect(modifyEvent).toBeDefined(); + }, 30000); + + test('should detect file deletion', async () => { + testDir = uniqueTestPath('watch-delete'); + await createDir(testDir); + await createFile(`${testDir}/todelete.txt`, 'delete me'); + + // Start watch and delete file after watch is confirmed ready + const { events } = await watchWithActions( + testDir, + { timeoutMs: 8000, stopAfterEvents: 5 }, + async () => { + // Small delay to ensure watch is fully ready + await new Promise((r) => setTimeout(r, 500)); + await deleteFile(`${testDir}/todelete.txt`); + } + ); + + console.log('[DEBUG] Delete test events:', JSON.stringify(events, null, 2)); + + const deleteEvent = events.find( + (e) => e.type === 'event' && e.eventType === 'delete' + ); + expect(deleteEvent).toBeDefined(); + + if (deleteEvent?.type === 'event') { + expect(deleteEvent.path).toContain('todelete.txt'); + } + }, 30000); + + test('should filter events with include pattern', async () => { + testDir = uniqueTestPath('watch-filter'); + await createDir(testDir); + + // Start watch and create files after watch is confirmed ready + const { events } = await watchWithActions( + testDir, + { include: ['*.ts'], timeoutMs: 10000, stopAfterEvents: 10 }, + async () => { + // Create both .ts and .js files + await createFile(`${testDir}/code.ts`, 'typescript'); + await createFile(`${testDir}/code.js`, 'javascript'); + await createFile(`${testDir}/another.ts`, 'more typescript'); + } + ); + + const fileEvents = events.filter((e) => e.type === 'event'); + + // Should only see .ts files + const tsEvents = fileEvents.filter( + (e) => e.type === 'event' && e.path.endsWith('.ts') + ); + const jsEvents = fileEvents.filter( + (e) => e.type === 'event' && e.path.endsWith('.js') + ); + + expect(tsEvents.length).toBeGreaterThan(0); + expect(jsEvents.length).toBe(0); + }, 30000); + + test('should stop watch via API', async () => { + testDir = uniqueTestPath('watch-stop'); + await createDir(testDir); + + // Start a watch + const response = await fetch(`${workerUrl}/api/watch`, { + method: 'POST', + headers, + body: JSON.stringify({ path: testDir }) + }); + + expect(response.body).toBeTruthy(); + if (!response.body) return; + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + + // Read until we get the watching event + let watchId: string | null = null; + while (!watchId) { + const { value } = await reader.read(); + const text = decoder.decode(value); + const match = text.match(/"watchId":"([^"]+)"/); + if (match) watchId = match[1]; + } + + expect(watchId).toBeTruthy(); + + // Stop the watch + const stopResponse = await fetch(`${workerUrl}/api/watch/stop`, { + method: 'POST', + headers, + body: JSON.stringify({ watchId }) + }); + + expect(stopResponse.ok).toBe(true); + const stopResult = (await stopResponse.json()) as { success: boolean }; + expect(stopResult.success).toBe(true); + + reader.cancel(); + }, 30000); + + test('should list active watches', async () => { + testDir = uniqueTestPath('watch-list'); + await createDir(testDir); + + // Start a watch + const response = await fetch(`${workerUrl}/api/watch`, { + method: 'POST', + headers, + body: JSON.stringify({ path: testDir }) + }); + + // Wait for watch to establish + expect(response.body).toBeTruthy(); + if (!response.body) return; + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let watchId: string | null = null; + + while (!watchId) { + const { value } = await reader.read(); + const text = decoder.decode(value); + const match = text.match(/"watchId":"([^"]+)"/); + if (match) watchId = match[1]; + } + + // List watches + const listResponse = await fetch(`${workerUrl}/api/watch/list`, { + method: 'GET', + headers + }); + + expect(listResponse.ok).toBe(true); + const listResult = (await listResponse.json()) as { + success: boolean; + watches: Array<{ id: string; path: string }>; + count: number; + }; + + expect(listResult.success).toBe(true); + expect(listResult.count).toBeGreaterThanOrEqual(1); + + const ourWatch = listResult.watches.find((w) => w.id === watchId); + expect(ourWatch).toBeDefined(); + expect(ourWatch?.path).toBe(testDir); + + // Cleanup + await fetch(`${workerUrl}/api/watch/stop`, { + method: 'POST', + headers, + body: JSON.stringify({ watchId }) + }); + + reader.cancel(); + }, 30000); + + test('should return error for non-existent path', async () => { + const response = await fetch(`${workerUrl}/api/watch`, { + method: 'POST', + headers, + body: JSON.stringify({ path: '/nonexistent/path/that/does/not/exist' }) + }); + + console.log( + '[DEBUG] Non-existent path response:', + response.status, + response.ok + ); + + // Should get an error response (either 4xx/5xx or error in body) + if (!response.ok) { + // Non-OK response is expected - pass + expect(response.ok).toBe(false); + // Verify we get meaningful error content + const body = await response.text(); + console.log('[DEBUG] Error response body:', body); + expect(body).toMatch(/error|not found|does not exist/i); + return; + } + + // If we get a 200, it should be an SSE stream with an error event + // Over WebSocket transport, errors may throw instead of returning data + if (response.body) { + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let text = ''; + let streamError: Error | null = null; + + // Read multiple chunks in case the error comes later + const timeout = setTimeout(() => reader.cancel(), 3000); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + text += decoder.decode(value, { stream: true }); + console.log('[DEBUG] Received SSE chunk:', text.substring(0, 500)); + // Check if we have an error indication + if (text.match(/error|not found|does not exist/i)) { + break; + } + } + } catch (e) { + // Stream error or cancelled - capture the error + streamError = e instanceof Error ? e : new Error(String(e)); + console.log('[DEBUG] Stream error:', streamError.message); + } finally { + clearTimeout(timeout); + reader.cancel().catch(() => {}); + } + + console.log('[DEBUG] Final SSE text:', text); + console.log('[DEBUG] Stream error:', streamError?.message); + + // Should contain an error message in either the text or the stream error + const errorContent = text || streamError?.message || ''; + expect(errorContent).toMatch(/error|not found|does not exist/i); + } + }, 30000); + + test('should exclude patterns from events', async () => { + testDir = uniqueTestPath('watch-exclude'); + await createDir(testDir); + await createDir(`${testDir}/node_modules`); + await createDir(`${testDir}/.git`); + + // Start watch and create files after watch is confirmed ready + const { events } = await watchWithActions( + testDir, + { timeoutMs: 12000, stopAfterEvents: 10 }, + async () => { + // Small delay to ensure watch is fully ready + await new Promise((r) => setTimeout(r, 500)); + // Create files in excluded and non-excluded directories + // Add delays between operations to avoid race conditions + await createFile(`${testDir}/app.ts`, 'app code'); + await new Promise((r) => setTimeout(r, 300)); + await createFile(`${testDir}/node_modules/dep.js`, 'dependency'); + await new Promise((r) => setTimeout(r, 300)); + await createFile(`${testDir}/.git/config`, 'git config'); + await new Promise((r) => setTimeout(r, 300)); + await createFile(`${testDir}/index.ts`, 'index'); + } + ); + + console.log( + '[DEBUG] Exclude test events:', + JSON.stringify(events, null, 2) + ); + + const fileEvents = events.filter((e) => e.type === 'event'); + + // Should see events for app.ts and index.ts + const appEvents = fileEvents.filter( + (e) => e.type === 'event' && e.path.includes('app.ts') + ); + expect(appEvents.length).toBeGreaterThan(0); + + // Should NOT see events for node_modules (default exclude) + const nodeModulesEvents = fileEvents.filter( + (e) => e.type === 'event' && e.path.includes('node_modules') + ); + expect(nodeModulesEvents.length).toBe(0); + + // Should NOT see events for .git (default exclude) + const gitEvents = fileEvents.filter( + (e) => e.type === 'event' && e.path.includes('.git') + ); + expect(gitEvents.length).toBe(0); + }, 30000); +}); diff --git a/tests/e2e/test-worker/index.ts b/tests/e2e/test-worker/index.ts index fb24b273..170bb449 100644 --- a/tests/e2e/test-worker/index.ts +++ b/tests/e2e/test-worker/index.ts @@ -810,6 +810,39 @@ console.log('Terminal server on port ' + port); }); } + // File Watch - Start watching (SSE stream) + // Note: watches are container-wide, not session-specific + if (url.pathname === '/api/watch' && request.method === 'POST') { + const stream = await sandbox.watchStream(body.path, { + recursive: body.recursive, + include: body.include, + exclude: body.exclude + }); + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive' + } + }); + } + + // File Watch - Stop watching + if (url.pathname === '/api/watch/stop' && request.method === 'POST') { + const result = await sandbox.stopWatch(body.watchId); + return new Response(JSON.stringify(result), { + headers: { 'Content-Type': 'application/json' } + }); + } + + // File Watch - List active watches + if (url.pathname === '/api/watch/list' && request.method === 'GET') { + const result = await sandbox.listWatches(); + return new Response(JSON.stringify(result), { + headers: { 'Content-Type': 'application/json' } + }); + } + // Cleanup endpoint - destroys the sandbox container // This is used by E2E tests to explicitly clean up after each test if (url.pathname === '/cleanup' && request.method === 'POST') {