Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/adr/002-bridge-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

## Status

**In Progress** (Core infrastructure complete, bridge migration pending)
**Accepted** (Fully implemented)

### Implementation Progress

- ✅ Phase 1: SafeCodec (TypeScript + Python)
- ✅ Phase 2: Transport interface + ProcessIO, HttpIO, PyodideIO
- ✅ Phase 3: WorkerPool
- ✅ Phase 3: WorkerPool (PooledTransport)
- ✅ BridgeProtocol base class
- Phase 4: Bridge migration (NodeBridge, HttpBridge, PyodideBridge)
- Phase 5: Cleanup and deprecations
- Phase 4: Bridge migration (NodeBridge, HttpBridge, PyodideBridge)
- Phase 5: Cleanup and documentation

## Context

Expand Down
12 changes: 7 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ export { BridgeProtocol, type BridgeProtocolOptions } from './runtime/bridge-pro
export { SafeCodec, type CodecOptions } from './runtime/safe-codec.js';
// Transport - abstract I/O channel interface
export type { Transport, TransportOptions, ProtocolMessage, ProtocolResponse } from './runtime/transport.js';
export { isTransport, isProtocolMessage, isProtocolResponse } from './runtime/transport.js';
export { PROTOCOL_ID, isTransport, isProtocolMessage, isProtocolResponse } from './runtime/transport.js';
// Transport implementations
export { ProcessIO, type ProcessIOOptions } from './runtime/process-io.js';
export { HttpIO, type HttpIOOptions } from './runtime/http-io.js';
export { PyodideIO, type PyodideIOOptions } from './runtime/pyodide-io.js';
// WorkerPool - concurrent transport management
export { WorkerPool, type WorkerPoolOptions, type PooledWorker } from './runtime/worker-pool.js';
// PooledTransport - Transport adapter that wraps WorkerPool
export { PooledTransport, type PooledTransportOptions } from './runtime/pooled-transport.js';
export type { Disposable } from './runtime/disposable.js';
export { isDisposable, safeDispose, disposeAll } from './runtime/disposable.js';
export {
Expand Down Expand Up @@ -60,10 +62,10 @@ export {
} from './runtime/errors.js';
export { getRuntimeBridge, setRuntimeBridge, clearRuntimeBridge } from './runtime/index.js';

// Runtime-specific exports
export { NodeBridge } from './runtime/node.js';
export { PyodideBridge } from './runtime/pyodide.js';
export { HttpBridge } from './runtime/http.js';
// Runtime-specific exports (Bridges using new BridgeProtocol architecture)
export { NodeBridge, type NodeBridgeOptions } from './runtime/node.js';
export { PyodideBridge, type PyodideBridgeOptions } from './runtime/pyodide.js';
export { HttpBridge, type HttpBridgeOptions } from './runtime/http.js';

// Core types
export type {
Expand Down
60 changes: 34 additions & 26 deletions src/runtime/bridge-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import { BoundedContext, type ExecuteOptions } from './bounded-context.js';
import { SafeCodec, type CodecOptions } from './safe-codec.js';
import type { Transport, ProtocolMessage } from './transport.js';
import { PROTOCOL_ID, type Transport, type ProtocolMessage } from './transport.js';

// =============================================================================
// TYPES
Expand Down Expand Up @@ -143,12 +143,13 @@ export class BridgeProtocol extends BoundedContext {
* @throws BridgeTimeoutError if the operation times out
*/
protected async sendMessage<T>(
message: Omit<ProtocolMessage, 'id'>,
message: Omit<ProtocolMessage, 'id' | 'protocol'>,
options?: ExecuteOptions<T>
): Promise<T> {
const fullMessage: ProtocolMessage = {
...message,
id: this.generateId(),
protocol: PROTOCOL_ID,
};

return this.execute(async () => {
Expand Down Expand Up @@ -182,12 +183,13 @@ export class BridgeProtocol extends BoundedContext {
* @throws BridgeTimeoutError if the operation times out
*/
protected async sendMessageAsync<T>(
message: Omit<ProtocolMessage, 'id'>,
message: Omit<ProtocolMessage, 'id' | 'protocol'>,
options?: ExecuteOptions<T>
): Promise<T> {
const fullMessage: ProtocolMessage = {
...message,
id: this.generateId(),
protocol: PROTOCOL_ID,
};

return this.execute(async () => {
Expand All @@ -209,12 +211,11 @@ export class BridgeProtocol extends BoundedContext {
/**
* Generate a unique request ID.
*
* Format: req_{timestamp}_{counter}
* The combination of timestamp and monotonically increasing counter
* ensures uniqueness within a process lifetime.
* Returns a monotonically increasing integer that ensures uniqueness
* within a process lifetime.
*/
private generateId(): string {
return `req_${Date.now()}_${++this.requestId}`;
private generateId(): number {
return ++this.requestId;
}

// ===========================================================================
Expand All @@ -237,11 +238,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'call',
module,
functionName,
args,
kwargs,
method: 'call',
params: {
module,
functionName,
args,
kwargs,
},
});
}

Expand All @@ -261,11 +264,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'instantiate',
module,
className,
args,
kwargs,
method: 'instantiate',
params: {
module,
className,
args,
kwargs,
},
});
}

Expand All @@ -285,11 +290,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'call_method',
handle,
methodName,
args,
kwargs,
method: 'call_method',
params: {
handle,
methodName,
args,
kwargs,
},
});
}

Expand All @@ -303,9 +310,10 @@ export class BridgeProtocol extends BoundedContext {
*/
async disposeInstance(handle: string): Promise<void> {
await this.sendMessageAsync<void>({
type: 'dispose_instance',
handle,
args: [],
method: 'dispose_instance',
params: {
handle,
},
});
}
}
201 changes: 69 additions & 132 deletions src/runtime/http.ts
Original file line number Diff line number Diff line change
@@ -1,149 +1,86 @@
/**
* HTTP runtime bridge
* HTTP runtime bridge for BridgeProtocol.
*
* HttpBridge extends BridgeProtocol and uses HttpIO transport for
* stateless HTTP POST-based communication with a Python server.
*
* @see https://github.com/bbopen/tywrap/issues/149
*/

import { decodeValueAsync } from '../utils/codec.js';
import { BridgeProtocol, type BridgeProtocolOptions } from './bridge-protocol.js';
import { HttpIO, type HttpIOOptions } from './http-io.js';

Check warning on line 11 in src/runtime/http.ts

View workflow job for this annotation

GitHub Actions / lint

'HttpIOOptions' is defined but never used
import type { CodecOptions } from './safe-codec.js';

import { BoundedContext } from './bounded-context.js';
import { BridgeExecutionError, BridgeTimeoutError } from './errors.js';
// =============================================================================
// OPTIONS
// =============================================================================

/**
* Configuration options for HttpBridge.
*/
export interface HttpBridgeOptions {
/** Base URL for the Python server (e.g., 'http://localhost:8000') */
baseURL: string;
headers?: Record<string, string>;
timeoutMs?: number;
}

interface HttpCallPayload {
module: string;
functionName: string;
args: unknown[];
kwargs?: Record<string, unknown>;
}

interface HttpInstantiatePayload {
module: string;
className: string;
args: unknown[];
kwargs?: Record<string, unknown>;
}
/** Additional headers to include in each request */
headers?: Record<string, string>;

interface HttpCallMethodPayload {
handle: string;
methodName: string;
args: unknown[];
kwargs?: Record<string, unknown>;
}
/** Timeout in ms for requests. Default: 30000 (30 seconds) */
timeoutMs?: number;

interface HttpDisposePayload {
handle: string;
/** Codec options for validation/serialization */
codec?: CodecOptions;
}

export class HttpBridge extends BoundedContext {
private readonly baseURL: string;
private readonly headers: Record<string, string>;
private readonly timeoutMs: number;

constructor(options: HttpBridgeOptions = { baseURL: 'http://localhost:8000' }) {
super();
this.baseURL = options.baseURL.replace(/\/$/, '');
this.headers = { 'content-type': 'application/json', ...(options.headers ?? {}) };
this.timeoutMs = options.timeoutMs ?? 30000;
}

/**
* HttpBridge is stateless, so init is a no-op.
*/
protected async doInit(): Promise<void> {
// Stateless - no initialization required
}
// =============================================================================
// HTTP BRIDGE
// =============================================================================

/**
* HTTP-based runtime bridge for executing Python code.
*
* HttpBridge provides a stateless HTTP transport for communication with
* a Python server. Each request is independent - no connection state is
* maintained between calls.
*
* Features:
* - Stateless HTTP POST communication
* - Timeout handling via AbortController
* - Full SafeCodec validation (NaN/Infinity rejection, key validation)
* - Automatic Arrow decoding for DataFrames/ndarrays
*
* @example
* ```typescript
* const bridge = new HttpBridge({ baseURL: 'http://localhost:8000' });
* await bridge.init();
*
* const result = await bridge.call('math', 'sqrt', [16]);
* console.log(result); // 4.0
*
* await bridge.dispose();
* ```
*/
export class HttpBridge extends BridgeProtocol {
/**
* HttpBridge is stateless, so dispose is a no-op.
* Create a new HttpBridge instance.
*
* @param options - Configuration options for the bridge
*/
protected async doDispose(): Promise<void> {
// Stateless - no cleanup required
}

async call<T = unknown>(
module: string,
functionName: string,
args: unknown[],
kwargs?: Record<string, unknown>
): Promise<T> {
const payload: HttpCallPayload = { module, functionName, args, kwargs };
const res = await this.post(`${this.baseURL}/call`, payload);
return (await decodeValueAsync(res)) as T;
}

async instantiate<T = unknown>(
module: string,
className: string,
args: unknown[],
kwargs?: Record<string, unknown>
): Promise<T> {
const payload: HttpInstantiatePayload = { module, className, args, kwargs };
const res = await this.post(`${this.baseURL}/instantiate`, payload);
return (await decodeValueAsync(res)) as T;
}

async callMethod<T = unknown>(
handle: string,
methodName: string,
args: unknown[],
kwargs?: Record<string, unknown>
): Promise<T> {
const payload: HttpCallMethodPayload = { handle, methodName, args, kwargs };
const res = await this.post(`${this.baseURL}/call_method`, payload);
return (await decodeValueAsync(res)) as T;
}

async disposeInstance(handle: string): Promise<void> {
const payload: HttpDisposePayload = { handle };
await this.post(`${this.baseURL}/dispose_instance`, payload);
}

private async post(url: string, body: unknown): Promise<unknown> {
const controller = typeof AbortController !== 'undefined' ? new AbortController() : undefined;
const timer = controller ? setTimeout(() => controller.abort(), this.timeoutMs) : undefined;
try {
const resp = await fetch(url, {
method: 'POST',
headers: this.headers,
body: JSON.stringify(body),
signal: controller?.signal,
});
if (!resp.ok) {
const text = await safeText(resp);
throw new BridgeExecutionError(`HTTP ${resp.status}: ${text || resp.statusText}`);
}
const ct = resp.headers.get('content-type') ?? '';
if (ct.includes('application/json')) {
return (await resp.json()) as unknown;
}
const text = await resp.text();
try {
return JSON.parse(text) as unknown;
} catch {
return text as unknown;
}
} catch (error) {
// Handle abort/timeout errors
if (error instanceof Error && error.name === 'AbortError') {
throw new BridgeTimeoutError(`Request timed out after ${this.timeoutMs}ms`);
}
throw error;
} finally {
if (timer) {
clearTimeout(timer);
}
}
}
}

async function safeText(resp: Response): Promise<string> {
try {
return await resp.text();
} catch {
return '';
constructor(options: HttpBridgeOptions) {
// Create HTTP transport
const transport = new HttpIO({
baseURL: options.baseURL,
headers: options.headers,
defaultTimeoutMs: options.timeoutMs,
});

// Initialize BridgeProtocol with transport and codec options
const protocolOptions: BridgeProtocolOptions = {
transport,
codec: options.codec,
defaultTimeoutMs: options.timeoutMs,
};

super(protocolOptions);
}
}
Loading
Loading