Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9268acf
feat: add file watching capabilities with inotify support
whoiskatrin Jan 2, 2026
bf0a692
Create empty-poets-serve.md
whoiskatrin Jan 2, 2026
87d89fb
Potential fix for code scanning alert no. 42: Incomplete string escap…
whoiskatrin Jan 2, 2026
a81a31e
fixes for claude review
whoiskatrin Jan 2, 2026
a878bbd
update tests to verify regex format for default and custom excludes
whoiskatrin Jan 2, 2026
d8171b0
Fix error handling and type safety in WatchService and FileWatch clas…
whoiskatrin Jan 3, 2026
775fe4e
Refactor WatchService tests to validate combined regex patterns for d…
whoiskatrin Jan 3, 2026
5f87bbe
Added timeouts for event propagation
whoiskatrin Jan 3, 2026
aaebe90
Refactored and cleaned
whoiskatrin Jan 14, 2026
2084ad8
Merge main into feature/file-watching
whoiskatrin Jan 14, 2026
c7b8556
Small ws transport related fixes
whoiskatrin Jan 14, 2026
a5cc961
Timing changes help account for the additional buffering
whoiskatrin Jan 14, 2026
70f7e6b
Fix WebSocket blocking issue for SSE streaming responses
whoiskatrin Jan 14, 2026
e7209f8
Add debug logging and stream tracking for WebSocket streaming
whoiskatrin Jan 14, 2026
f06d880
Acquire stream reader synchronously before async execution
whoiskatrin Jan 14, 2026
526cd80
Wait for inotifywait watches to be established before signaling ready
whoiskatrin Jan 14, 2026
2e8e7e8
Add timeout to waitForWatchesEstablished to prevent hanging
whoiskatrin Jan 14, 2026
fb3531a
Wait for first message before returning WebSocket stream
whoiskatrin Jan 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/empty-poets-serve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@cloudflare/sandbox": patch
---

feat: add file watching capabilities with inotify support
8 changes: 8 additions & 0 deletions packages/sandbox-container/src/core/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { MiscHandler } from '../handlers/misc-handler';
import { PortHandler } from '../handlers/port-handler';
import { ProcessHandler } from '../handlers/process-handler';
import { SessionHandler } from '../handlers/session-handler';
import { WatchHandler } from '../handlers/watch-handler';
import { CorsMiddleware } from '../middleware/cors';
import { LoggingMiddleware } from '../middleware/logging';
import { SecurityServiceAdapter } from '../security/security-adapter';
Expand All @@ -19,6 +20,7 @@ import { InMemoryPortStore, PortService } from '../services/port-service';
import { ProcessService } from '../services/process-service';
import { ProcessStore } from '../services/process-store';
import { SessionManager } from '../services/session-manager';
import { WatchService } from '../services/watch-service';

export interface Dependencies {
// Services
Expand All @@ -27,6 +29,7 @@ export interface Dependencies {
portService: PortService;
gitService: GitService;
interpreterService: InterpreterService;
watchService: WatchService;

// Infrastructure
logger: Logger;
Expand All @@ -41,6 +44,7 @@ export interface Dependencies {
interpreterHandler: InterpreterHandler;
sessionHandler: SessionHandler;
miscHandler: MiscHandler;
watchHandler: WatchHandler;

// Middleware
corsMiddleware: CorsMiddleware;
Expand Down Expand Up @@ -112,6 +116,7 @@ export class Container {
sessionManager
);
const interpreterService = new InterpreterService(logger);
const watchService = new WatchService(logger);

// Initialize handlers
const sessionHandler = new SessionHandler(sessionManager, logger);
Expand All @@ -125,6 +130,7 @@ export class Container {
logger
);
const miscHandler = new MiscHandler(logger);
const watchHandler = new WatchHandler(watchService, logger);

// Initialize middleware
const corsMiddleware = new CorsMiddleware();
Expand All @@ -138,6 +144,7 @@ export class Container {
portService,
gitService,
interpreterService,
watchService,

// Infrastructure
logger,
Expand All @@ -152,6 +159,7 @@ export class Container {
interpreterHandler,
sessionHandler,
miscHandler,
watchHandler,

// Middleware
corsMiddleware,
Expand Down
120 changes: 120 additions & 0 deletions packages/sandbox-container/src/handlers/watch-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import type { Logger, WatchRequest, WatchStopResult } from '@repo/shared';
import { ErrorCode } from '@repo/shared/errors';
import type { RequestContext } from '../core/types';
import type { WatchService } from '../services/watch-service';
import { BaseHandler } from './base-handler';

/**
* Handler for file watch operations
*/
export class WatchHandler extends BaseHandler<Request, Response> {
constructor(
private watchService: WatchService,
logger: Logger
) {
super(logger);
}

async handle(request: Request, context: RequestContext): Promise<Response> {
const url = new URL(request.url);
const pathname = url.pathname;

switch (pathname) {
case '/api/watch':
return this.handleWatch(request, context);
case '/api/watch/stop':
return this.handleStopWatch(request, context);
case '/api/watch/list':
return this.handleListWatches(request, context);
default:
return this.createErrorResponse(
{
message: 'Invalid watch endpoint',
code: ErrorCode.UNKNOWN_ERROR
},
context
);
}
}

/**
* Start watching a directory
* Returns an SSE stream of file change events
*/
private async handleWatch(
request: Request,
context: RequestContext
): Promise<Response> {
const body = await this.parseRequestBody<WatchRequest>(request);

// Resolve path - if relative, resolve from /workspace
let watchPath = body.path;
if (!watchPath.startsWith('/')) {
watchPath = `/workspace/${watchPath}`;
}

const result = await this.watchService.watchDirectory(watchPath, {
...body,
path: watchPath
});

if (result.success) {
return new Response(result.data, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
...context.corsHeaders
}
});
} else {
return this.createErrorResponse(result.error, context);
}
}

/**
* Stop a specific watch by ID
*/
private async handleStopWatch(
request: Request,
context: RequestContext
): Promise<Response> {
const body = await this.parseRequestBody<{ watchId: string }>(request);

const result = await this.watchService.stopWatch(body.watchId);

if (result.success) {
const response: WatchStopResult = {
success: true,
watchId: body.watchId,
timestamp: new Date().toISOString()
};
return this.createTypedResponse(response, context);
} else {
return this.createErrorResponse(result.error, context);
}
}

/**
* List all active watches
*/
private async handleListWatches(
_request: Request,
context: RequestContext
): Promise<Response> {
const watches = this.watchService.getActiveWatches();

const response = {
success: true,
watches: watches.map((w) => ({
id: w.id,
path: w.path,
startedAt: w.startedAt.toISOString()
})),
count: watches.length,
timestamp: new Date().toISOString()
};

return this.createTypedResponse(response, context);
}
}
94 changes: 74 additions & 20 deletions packages/sandbox-container/src/handlers/ws-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ export interface WSData {
export class WebSocketAdapter {
private router: Router;
private logger: Logger;
/** Track active streaming responses to prevent garbage collection */
private activeStreams: Map<string, Promise<void>> = new Map();

constructor(router: Router, logger: Logger) {
this.router = router;
Expand Down Expand Up @@ -162,7 +164,29 @@ export class WebSocketAdapter {

if (isStreaming && httpResponse.body) {
// Handle SSE streaming response
await this.handleStreamingResponse(ws, request.id, httpResponse);
// CRITICAL: We must capture the Response body reader BEFORE the promise starts executing
// asynchronously. If we call getReader() inside handleStreamingResponse after an await,
// Bun's WebSocket handler may GC or invalidate the Response body when onMessage returns.
// By getting the reader synchronously here, we ensure the stream remains valid.
const reader =
httpResponse.body.getReader() as ReadableStreamDefaultReader<Uint8Array>;
const streamPromise = this.handleStreamingResponseWithReader(
ws,
request.id,
httpResponse.status,
reader
)
.catch((error: unknown) => {
this.logger.error(
'Error in streaming response',
error instanceof Error ? error : new Error(String(error)),
{ requestId: request.id }
);
})
.finally(() => {
this.activeStreams.delete(request.id);
});
this.activeStreams.set(request.id, streamPromise);
} else {
// Handle regular response
await this.handleRegularResponse(ws, request.id, httpResponse);
Expand Down Expand Up @@ -198,39 +222,65 @@ export class WebSocketAdapter {
}

/**
* Handle a streaming (SSE) HTTP response
* Handle a streaming (SSE) HTTP response with a pre-acquired reader
*
* This variant receives the reader instead of the Response, allowing the caller
* to acquire the reader synchronously before any await points. This is critical
* for WebSocket streaming because Bun's message handler may invalidate the
* Response body if the reader is acquired after the handler returns.
*/
private async handleStreamingResponse(
private async handleStreamingResponseWithReader(
ws: ServerWebSocket<WSData>,
requestId: string,
response: Response
status: number,
reader: ReadableStreamDefaultReader<Uint8Array>
): Promise<void> {
if (!response.body) {
this.sendError(ws, requestId, 'STREAM_ERROR', 'No response body', 500);
return;
}
this.logger.debug('Starting streaming response handler', { requestId });

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
// Track partial event state across chunks
let currentEvent: { event?: string; data: string[] } = { data: [] };
let chunkCount = 0;

try {
while (true) {
this.logger.debug('Waiting for stream chunk', {
requestId,
chunkCount
});
const { done, value } = await reader.read();

if (done) {
this.logger.debug('Stream ended', { requestId, chunkCount });
break;
}

chunkCount++;

// Decode chunk and add to buffer
buffer += decoder.decode(value, { stream: true });
const chunkText = decoder.decode(value, { stream: true });
this.logger.debug('Received stream chunk', {
requestId,
chunkCount,
chunkLength: chunkText.length,
chunkPreview: chunkText.substring(0, 100)
});
buffer += chunkText;

// Parse SSE events from buffer
const events = this.parseSSEEvents(buffer);
buffer = events.remaining;
// Parse SSE events from buffer, preserving partial event state
const result = this.parseSSEEvents(buffer, currentEvent);
buffer = result.remaining;
currentEvent = result.currentEvent;

this.logger.debug('Parsed SSE events', {
requestId,
eventCount: result.events.length,
remainingBuffer: result.remaining.length
});

// Send each parsed event as a stream chunk
for (const event of events.events) {
for (const event of result.events) {
const chunk: WSStreamChunk = {
type: 'stream',
id: requestId,
Expand All @@ -247,7 +297,7 @@ export class WebSocketAdapter {
const wsResponse: WSResponse = {
type: 'response',
id: requestId,
status: response.status,
status,
done: true
};
this.send(ws, wsResponse);
Expand All @@ -272,8 +322,8 @@ export class WebSocketAdapter {
/**
* Parse SSE events from a buffer
*
* Returns parsed events and any remaining unparsed content (incomplete lines
* waiting for more data from the next chunk).
* Returns parsed events, remaining unparsed content, and current partial event state.
* The currentEvent parameter allows preserving state across chunk boundaries.
*
* Note: This is a minimal SSE parser that only handles `event:` and `data:`
* fields - sufficient for our streaming handlers which only emit these.
Expand All @@ -282,12 +332,15 @@ export class WebSocketAdapter {
* - `retry:` field (reconnection timing hints)
* - Comment lines (starting with `:`)
*/
private parseSSEEvents(buffer: string): {
private parseSSEEvents(
buffer: string,
currentEvent: { event?: string; data: string[] } = { data: [] }
): {
events: Array<{ event?: string; data: string }>;
remaining: string;
currentEvent: { event?: string; data: string[] };
} {
const events: Array<{ event?: string; data: string }> = [];
let currentEvent: { event?: string; data: string[] } = { data: [] };
let i = 0;

while (i < buffer.length) {
Expand Down Expand Up @@ -317,7 +370,8 @@ export class WebSocketAdapter {

return {
events,
remaining: buffer.substring(i)
remaining: buffer.substring(i),
currentEvent
};
}

Expand Down
22 changes: 22 additions & 0 deletions packages/sandbox-container/src/routes/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,28 @@ export function setupRoutes(router: Router, container: Container): void {
middleware: [container.get('loggingMiddleware')]
});

// File watch routes
router.register({
method: 'POST',
path: '/api/watch',
handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx),
middleware: [container.get('loggingMiddleware')]
});

router.register({
method: 'POST',
path: '/api/watch/stop',
handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx),
middleware: [container.get('loggingMiddleware')]
});

router.register({
method: 'GET',
path: '/api/watch/list',
handler: async (req, ctx) => container.get('watchHandler').handle(req, ctx),
middleware: [container.get('loggingMiddleware')]
});

// Miscellaneous routes
router.register({
method: 'GET',
Expand Down
Loading
Loading