Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/adr/002-bridge-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

## Status

**In Progress** (Core infrastructure complete, bridge migration pending)
**Accepted** (Fully implemented)

### Implementation Progress

- ✅ Phase 1: SafeCodec (TypeScript + Python)
- ✅ Phase 2: Transport interface + ProcessIO, HttpIO, PyodideIO
- ✅ Phase 3: WorkerPool
- ✅ Phase 3: WorkerPool (PooledTransport)
- ✅ BridgeProtocol base class
- Phase 4: Bridge migration (NodeBridge, HttpBridge, PyodideBridge)
- Phase 5: Cleanup and deprecations
- Phase 4: Bridge migration (NodeBridge, HttpBridge, PyodideBridge)
- Phase 5: Cleanup and documentation

## Context

Expand Down
9 changes: 1 addition & 8 deletions examples/living-app/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,7 @@ async function main(): Promise<void> {
setRuntimeBridge(bridge);

if (codec === 'arrow') {
const info = await bridge.getBridgeInfo();
if (!info.arrowAvailable) {
// Why: fail fast with a clear message; otherwise the bridge will emit Arrow envelopes and the
// caller will see confusing decode errors.
throw new Error(
'Arrow mode requested but pyarrow is not installed in the Python environment. Install pyarrow or run with --json.'
);
}
// Enable Arrow decoder - will fail at decode time if pyarrow is not installed
await enableArrowDecoder();
}

Expand Down
12 changes: 7 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ export { BridgeProtocol, type BridgeProtocolOptions } from './runtime/bridge-pro
export { SafeCodec, type CodecOptions } from './runtime/safe-codec.js';
// Transport - abstract I/O channel interface
export type { Transport, TransportOptions, ProtocolMessage, ProtocolResponse } from './runtime/transport.js';
export { isTransport, isProtocolMessage, isProtocolResponse } from './runtime/transport.js';
export { PROTOCOL_ID, isTransport, isProtocolMessage, isProtocolResponse } from './runtime/transport.js';
// Transport implementations
export { ProcessIO, type ProcessIOOptions } from './runtime/process-io.js';
export { HttpIO, type HttpIOOptions } from './runtime/http-io.js';
export { PyodideIO, type PyodideIOOptions } from './runtime/pyodide-io.js';
// WorkerPool - concurrent transport management
export { WorkerPool, type WorkerPoolOptions, type PooledWorker } from './runtime/worker-pool.js';
// PooledTransport - Transport adapter that wraps WorkerPool
export { PooledTransport, type PooledTransportOptions } from './runtime/pooled-transport.js';
export type { Disposable } from './runtime/disposable.js';
export { isDisposable, safeDispose, disposeAll } from './runtime/disposable.js';
export {
Expand Down Expand Up @@ -60,10 +62,10 @@ export {
} from './runtime/errors.js';
export { getRuntimeBridge, setRuntimeBridge, clearRuntimeBridge } from './runtime/index.js';

// Runtime-specific exports
export { NodeBridge } from './runtime/node.js';
export { PyodideBridge } from './runtime/pyodide.js';
export { HttpBridge } from './runtime/http.js';
// Runtime-specific exports (Bridges using new BridgeProtocol architecture)
export { NodeBridge, type NodeBridgeOptions } from './runtime/node.js';
export { PyodideBridge, type PyodideBridgeOptions } from './runtime/pyodide.js';
export { HttpBridge, type HttpBridgeOptions } from './runtime/http.js';

// Core types
export type {
Expand Down
60 changes: 34 additions & 26 deletions src/runtime/bridge-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import { BoundedContext, type ExecuteOptions } from './bounded-context.js';
import { SafeCodec, type CodecOptions } from './safe-codec.js';
import type { Transport, ProtocolMessage } from './transport.js';
import { PROTOCOL_ID, type Transport, type ProtocolMessage } from './transport.js';

// =============================================================================
// TYPES
Expand Down Expand Up @@ -143,12 +143,13 @@ export class BridgeProtocol extends BoundedContext {
* @throws BridgeTimeoutError if the operation times out
*/
protected async sendMessage<T>(
message: Omit<ProtocolMessage, 'id'>,
message: Omit<ProtocolMessage, 'id' | 'protocol'>,
options?: ExecuteOptions<T>
): Promise<T> {
const fullMessage: ProtocolMessage = {
...message,
id: this.generateId(),
protocol: PROTOCOL_ID,
};

return this.execute(async () => {
Expand Down Expand Up @@ -182,12 +183,13 @@ export class BridgeProtocol extends BoundedContext {
* @throws BridgeTimeoutError if the operation times out
*/
protected async sendMessageAsync<T>(
message: Omit<ProtocolMessage, 'id'>,
message: Omit<ProtocolMessage, 'id' | 'protocol'>,
options?: ExecuteOptions<T>
): Promise<T> {
const fullMessage: ProtocolMessage = {
...message,
id: this.generateId(),
protocol: PROTOCOL_ID,
};

return this.execute(async () => {
Expand All @@ -209,12 +211,11 @@ export class BridgeProtocol extends BoundedContext {
/**
* Generate a unique request ID.
*
* Format: req_{timestamp}_{counter}
* The combination of timestamp and monotonically increasing counter
* ensures uniqueness within a process lifetime.
* Returns a monotonically increasing integer that ensures uniqueness
* within a process lifetime.
*/
private generateId(): string {
return `req_${Date.now()}_${++this.requestId}`;
private generateId(): number {
return ++this.requestId;
}

// ===========================================================================
Expand All @@ -237,11 +238,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'call',
module,
functionName,
args,
kwargs,
method: 'call',
params: {
module,
functionName,
args,
kwargs,
},
});
}

Expand All @@ -261,11 +264,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'instantiate',
module,
className,
args,
kwargs,
method: 'instantiate',
params: {
module,
className,
args,
kwargs,
},
});
}

Expand All @@ -285,11 +290,13 @@ export class BridgeProtocol extends BoundedContext {
kwargs?: Record<string, unknown>
): Promise<T> {
return this.sendMessageAsync<T>({
type: 'call_method',
handle,
methodName,
args,
kwargs,
method: 'call_method',
params: {
handle,
methodName,
args,
kwargs,
},
});
}

Expand All @@ -303,9 +310,10 @@ export class BridgeProtocol extends BoundedContext {
*/
async disposeInstance(handle: string): Promise<void> {
await this.sendMessageAsync<void>({
type: 'dispose_instance',
handle,
args: [],
method: 'dispose_instance',
params: {
handle,
},
});
}
}
Loading
Loading