Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/adr/002-bridge-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

## Status

**In Progress** (Core infrastructure complete, bridge migration pending)
**Accepted** (Fully implemented)

### Implementation Progress

- ✅ Phase 1: SafeCodec (TypeScript + Python)
- ✅ Phase 2: Transport interface + ProcessIO, HttpIO, PyodideIO
- ✅ Phase 3: WorkerPool
- ✅ Phase 3: WorkerPool (PooledTransport)
- ✅ BridgeProtocol base class
- Phase 4: Bridge migration (NodeBridge, HttpBridge, PyodideBridge)
- Phase 5: Cleanup and deprecations
- Phase 4: Bridge migration (NodeBridge, HttpBridge, PyodideBridge)
- Phase 5: Cleanup and documentation

## Context

Expand Down
9 changes: 1 addition & 8 deletions examples/living-app/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,7 @@ async function main(): Promise<void> {
setRuntimeBridge(bridge);

if (codec === 'arrow') {
const info = await bridge.getBridgeInfo();
if (!info.arrowAvailable) {
// Why: fail fast with a clear message; otherwise the bridge will emit Arrow envelopes and the
// caller will see confusing decode errors.
throw new Error(
'Arrow mode requested but pyarrow is not installed in the Python environment. Install pyarrow or run with --json.'
);
}
// Enable Arrow decoder - will fail at decode time if pyarrow is not installed
await enableArrowDecoder();
}

Expand Down
12 changes: 7 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ export { BridgeProtocol, type BridgeProtocolOptions } from './runtime/bridge-pro
export { SafeCodec, type CodecOptions } from './runtime/safe-codec.js';
// Transport - abstract I/O channel interface
export type { Transport, TransportOptions, ProtocolMessage, ProtocolResponse } from './runtime/transport.js';
export { isTransport, isProtocolMessage, isProtocolResponse } from './runtime/transport.js';
export { PROTOCOL_ID, isTransport, isProtocolMessage, isProtocolResponse } from './runtime/transport.js';
// Transport implementations
export { ProcessIO, type ProcessIOOptions } from './runtime/process-io.js';
export { HttpIO, type HttpIOOptions } from './runtime/http-io.js';
export { PyodideIO, type PyodideIOOptions } from './runtime/pyodide-io.js';
// WorkerPool - concurrent transport management
export { WorkerPool, type WorkerPoolOptions, type PooledWorker } from './runtime/worker-pool.js';
// PooledTransport - Transport adapter that wraps WorkerPool
export { PooledTransport, type PooledTransportOptions } from './runtime/pooled-transport.js';
export type { Disposable } from './runtime/disposable.js';
export { isDisposable, safeDispose, disposeAll } from './runtime/disposable.js';
export {
Expand Down Expand Up @@ -60,10 +62,10 @@ export {
} from './runtime/errors.js';
export { getRuntimeBridge, setRuntimeBridge, clearRuntimeBridge } from './runtime/index.js';

// Runtime-specific exports
export { NodeBridge } from './runtime/node.js';
export { PyodideBridge } from './runtime/pyodide.js';
export { HttpBridge } from './runtime/http.js';
// Runtime-specific exports (Bridges using new BridgeProtocol architecture)
export { NodeBridge, type NodeBridgeOptions } from './runtime/node.js';
export { PyodideBridge, type PyodideBridgeOptions } from './runtime/pyodide.js';
export { HttpBridge, type HttpBridgeOptions } from './runtime/http.js';

// Core types
export type {
Expand Down
60 changes: 34 additions & 26 deletions src/runtime/bridge-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import { BoundedContext, type ExecuteOptions } from './bounded-context.js';
import { SafeCodec, type CodecOptions } from './safe-codec.js';
import type { Transport, ProtocolMessage } from './transport.js';
import { PROTOCOL_ID, type Transport, type ProtocolMessage } from './transport.js';

// =============================================================================
// TYPES
Expand Down Expand Up @@ -143,12 +143,13 @@ export class BridgeProtocol extends BoundedContext {
* @throws BridgeTimeoutError if the operation times out
*/
protected async sendMessage<T>(
message: Omit<ProtocolMessage, 'id'>,
message: Omit<ProtocolMessage, 'id' | 'protocol'>,
options?: ExecuteOptions<T>
): Promise<T> {
const fullMessage: ProtocolMessage = {
...message,
id: this.generateId(),
protocol: PROTOCOL_ID,
};

return this.execute(async () => {
Expand Down Expand Up @@ -182,12 +183,13 @@ export class BridgeProtocol extends BoundedContext {
* @throws BridgeTimeoutError if the operation times out
*/
protected async sendMessageAsync<T>(
message: Omit<ProtocolMessage, 'id'>,
message: Omit<ProtocolMessage, 'id' | 'protocol'>,
options?: ExecuteOptions<T>
): Promise<T> {
const fullMessage: ProtocolMessage = {
...message,
id: this.generateId(),
protocol: PROTOCOL_ID,
};

return this.execute(async () => {
Expand All @@ -209,12 +211,11 @@ export class BridgeProtocol extends BoundedContext {
/**
* Generate a unique request ID.
*
* Format: req_{timestamp}_{counter}
* The combination of timestamp and monotonically increasing counter
* ensures uniqueness within a process lifetime.
* Returns a monotonically increasing integer that ensures uniqueness
* within a process lifetime.
*/
private generateId(): string {
return `req_${Date.now()}_${++this.requestId}`;
private generateId(): number {
return ++this.requestId;
}

// ===========================================================================
Expand All @@ -237,11 +238,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'call',
module,
functionName,
args,
kwargs,
method: 'call',
params: {
module,
functionName,
args,
kwargs,
},
});
}

Expand All @@ -261,11 +264,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'instantiate',
module,
className,
args,
kwargs,
method: 'instantiate',
params: {
module,
className,
args,
kwargs,
},
});
}

Expand All @@ -285,11 +290,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'call_method',
handle,
methodName,
args,
kwargs,
method: 'call_method',
params: {
handle,
methodName,
args,
kwargs,
},
});
}

Expand All @@ -303,9 +310,10 @@ export class BridgeProtocol extends BoundedContext {
*/
async disposeInstance(handle: string): Promise<void> {
await this.sendMessageAsync<void>({
type: 'dispose_instance',
handle,
args: [],
method: 'dispose_instance',
params: {
handle,
},
});
}
}
Loading
Loading