Skip to content

Commit 205747a

Browse files
authored
Add signal and closed properties to connection classes (#11)
Expose AbortSignal and closed Promise on AgentSideConnection and ClientSideConnection to allow detection of connection closure. The signal can be used with event listeners or passed to other APIs for cancellation, while the closed promise enables async/await style cleanup when the stream ends.
1 parent 895532d commit 205747a

File tree

2 files changed

+316
-0
lines changed

2 files changed

+316
-0
lines changed

src/acp.test.ts

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,184 @@ describe("Connection", () => {
758758
});
759759
});
760760

761+
it("resolves closed promise when stream ends", async () => {
762+
const closeLog: string[] = [];
763+
764+
// Create simple client and agent
765+
class TestClient implements Client {
766+
async writeTextFile(
767+
_: WriteTextFileRequest,
768+
): Promise<WriteTextFileResponse> {
769+
return {};
770+
}
771+
async readTextFile(
772+
_: ReadTextFileRequest,
773+
): Promise<ReadTextFileResponse> {
774+
return { content: "test" };
775+
}
776+
async requestPermission(
777+
_: RequestPermissionRequest,
778+
): Promise<RequestPermissionResponse> {
779+
return {
780+
outcome: {
781+
outcome: "selected",
782+
optionId: "allow",
783+
},
784+
};
785+
}
786+
async sessionUpdate(_: SessionNotification): Promise<void> {
787+
// no-op
788+
}
789+
}
790+
791+
class TestAgent implements Agent {
792+
async initialize(_: InitializeRequest): Promise<InitializeResponse> {
793+
return {
794+
protocolVersion: PROTOCOL_VERSION,
795+
agentCapabilities: { loadSession: false },
796+
};
797+
}
798+
async newSession(_: NewSessionRequest): Promise<NewSessionResponse> {
799+
return { sessionId: "test-session" };
800+
}
801+
async authenticate(_: AuthenticateRequest): Promise<void> {
802+
// no-op
803+
}
804+
async prompt(_: PromptRequest): Promise<PromptResponse> {
805+
return { stopReason: "end_turn" };
806+
}
807+
async cancel(_: CancelNotification): Promise<void> {
808+
// no-op
809+
}
810+
}
811+
812+
// Set up connections
813+
const agentConnection = new ClientSideConnection(
814+
() => new TestClient(),
815+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
816+
);
817+
818+
const clientConnection = new AgentSideConnection(
819+
() => new TestAgent(),
820+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
821+
);
822+
823+
// Listen for close via signal
824+
agentConnection.signal.addEventListener("abort", () => {
825+
closeLog.push("agent connection closed (signal)");
826+
});
827+
828+
clientConnection.signal.addEventListener("abort", () => {
829+
closeLog.push("client connection closed (signal)");
830+
});
831+
832+
// Verify connections are not closed yet
833+
expect(agentConnection.signal.aborted).toBe(false);
834+
expect(clientConnection.signal.aborted).toBe(false);
835+
expect(closeLog).toHaveLength(0);
836+
837+
// Close the streams by closing the writable ends
838+
await clientToAgent.writable.close();
839+
await agentToClient.writable.close();
840+
841+
// Wait for closed promises to resolve
842+
await agentConnection.closed;
843+
await clientConnection.closed;
844+
845+
// Verify connections are now closed
846+
expect(agentConnection.signal.aborted).toBe(true);
847+
expect(clientConnection.signal.aborted).toBe(true);
848+
expect(closeLog).toContain("agent connection closed (signal)");
849+
expect(closeLog).toContain("client connection closed (signal)");
850+
});
851+
852+
it("supports removing signal event listeners", async () => {
853+
const closeLog: string[] = [];
854+
855+
// Create simple client and agent
856+
class TestClient implements Client {
857+
async writeTextFile(
858+
_: WriteTextFileRequest,
859+
): Promise<WriteTextFileResponse> {
860+
return {};
861+
}
862+
async readTextFile(
863+
_: ReadTextFileRequest,
864+
): Promise<ReadTextFileResponse> {
865+
return { content: "test" };
866+
}
867+
async requestPermission(
868+
_: RequestPermissionRequest,
869+
): Promise<RequestPermissionResponse> {
870+
return {
871+
outcome: {
872+
outcome: "selected",
873+
optionId: "allow",
874+
},
875+
};
876+
}
877+
async sessionUpdate(_: SessionNotification): Promise<void> {
878+
// no-op
879+
}
880+
}
881+
882+
class TestAgent implements Agent {
883+
async initialize(_: InitializeRequest): Promise<InitializeResponse> {
884+
return {
885+
protocolVersion: PROTOCOL_VERSION,
886+
agentCapabilities: { loadSession: false },
887+
};
888+
}
889+
async newSession(_: NewSessionRequest): Promise<NewSessionResponse> {
890+
return { sessionId: "test-session" };
891+
}
892+
async authenticate(_: AuthenticateRequest): Promise<void> {
893+
// no-op
894+
}
895+
async prompt(_: PromptRequest): Promise<PromptResponse> {
896+
return { stopReason: "end_turn" };
897+
}
898+
async cancel(_: CancelNotification): Promise<void> {
899+
// no-op
900+
}
901+
}
902+
903+
// Set up connections
904+
const agentConnection = new ClientSideConnection(
905+
() => new TestClient(),
906+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
907+
);
908+
909+
new AgentSideConnection(
910+
() => new TestAgent(),
911+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
912+
);
913+
914+
// Register and then remove a listener
915+
const listener = () => {
916+
closeLog.push("this should not be called");
917+
};
918+
919+
agentConnection.signal.addEventListener("abort", listener);
920+
agentConnection.signal.removeEventListener("abort", listener);
921+
922+
// Register another listener that should be called
923+
agentConnection.signal.addEventListener("abort", () => {
924+
closeLog.push("agent connection closed");
925+
});
926+
927+
// Close the streams
928+
await clientToAgent.writable.close();
929+
await agentToClient.writable.close();
930+
931+
// Wait for closed promise
932+
await agentConnection.closed;
933+
934+
// Verify only the non-removed listener was called
935+
expect(closeLog).toEqual(["agent connection closed"]);
936+
expect(closeLog).not.toContain("this should not be called");
937+
});
938+
761939
it("handles methods returning response objects with _meta or void", async () => {
762940
// Create client that returns both response objects and void
763941
class TestClient implements Client {

src/acp.ts

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,57 @@ export class AgentSideConnection {
261261
): Promise<void> {
262262
return await this.#connection.sendNotification(`_${method}`, params);
263263
}
264+
265+
/**
266+
* AbortSignal that aborts when the connection closes.
267+
*
268+
* This signal can be used to:
269+
* - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})`
270+
* - Check connection status synchronously: `if (connection.signal.aborted) {...}`
271+
* - Pass to other APIs (fetch, setTimeout) for automatic cancellation
272+
*
273+
* The connection closes when the underlying stream ends, either normally or due to an error.
274+
*
275+
* @example
276+
* ```typescript
277+
* const connection = new AgentSideConnection(agent, stream);
278+
*
279+
* // Listen for closure
280+
* connection.signal.addEventListener('abort', () => {
281+
* console.log('Connection closed - performing cleanup');
282+
* });
283+
*
284+
* // Check status
285+
* if (connection.signal.aborted) {
286+
* console.log('Connection is already closed');
287+
* }
288+
*
289+
* // Pass to other APIs
290+
* fetch(url, { signal: connection.signal });
291+
* ```
292+
*/
293+
get signal(): AbortSignal {
294+
return this.#connection.signal;
295+
}
296+
297+
/**
298+
* Promise that resolves when the connection closes.
299+
*
300+
* The connection closes when the underlying stream ends, either normally or due to an error.
301+
* Once closed, the connection cannot send or receive any more messages.
302+
*
303+
* This is useful for async/await style cleanup:
304+
*
305+
* @example
306+
* ```typescript
307+
* const connection = new AgentSideConnection(agent, stream);
308+
* await connection.closed;
309+
* console.log('Connection closed - performing cleanup');
310+
* ```
311+
*/
312+
get closed(): Promise<void> {
313+
return this.#connection.closed;
314+
}
264315
}
265316

266317
/**
@@ -686,6 +737,57 @@ export class ClientSideConnection implements Agent {
686737
): Promise<void> {
687738
return await this.#connection.sendNotification(`_${method}`, params);
688739
}
740+
741+
/**
742+
* AbortSignal that aborts when the connection closes.
743+
*
744+
* This signal can be used to:
745+
* - Listen for connection closure: `connection.signal.addEventListener('abort', () => {...})`
746+
* - Check connection status synchronously: `if (connection.signal.aborted) {...}`
747+
* - Pass to other APIs (fetch, setTimeout) for automatic cancellation
748+
*
749+
* The connection closes when the underlying stream ends, either normally or due to an error.
750+
*
751+
* @example
752+
* ```typescript
753+
* const connection = new ClientSideConnection(client, stream);
754+
*
755+
* // Listen for closure
756+
* connection.signal.addEventListener('abort', () => {
757+
* console.log('Connection closed - performing cleanup');
758+
* });
759+
*
760+
* // Check status
761+
* if (connection.signal.aborted) {
762+
* console.log('Connection is already closed');
763+
* }
764+
*
765+
* // Pass to other APIs
766+
* fetch(url, { signal: connection.signal });
767+
* ```
768+
*/
769+
get signal(): AbortSignal {
770+
return this.#connection.signal;
771+
}
772+
773+
/**
774+
* Promise that resolves when the connection closes.
775+
*
776+
* The connection closes when the underlying stream ends, either normally or due to an error.
777+
* Once closed, the connection cannot send or receive any more messages.
778+
*
779+
* This is useful for async/await style cleanup:
780+
*
781+
* @example
782+
* ```typescript
783+
* const connection = new ClientSideConnection(client, stream);
784+
* await connection.closed;
785+
* console.log('Connection closed - performing cleanup');
786+
* ```
787+
*/
788+
get closed(): Promise<void> {
789+
return this.#connection.closed;
790+
}
689791
}
690792

691793
export type { AnyMessage } from "./jsonrpc.js";
@@ -697,6 +799,8 @@ class Connection {
697799
#notificationHandler: NotificationHandler;
698800
#stream: Stream;
699801
#writeQueue: Promise<void> = Promise.resolve();
802+
#abortController = new AbortController();
803+
#closedPromise: Promise<void>;
700804

701805
constructor(
702806
requestHandler: RequestHandler,
@@ -706,9 +810,42 @@ class Connection {
706810
this.#requestHandler = requestHandler;
707811
this.#notificationHandler = notificationHandler;
708812
this.#stream = stream;
813+
this.#closedPromise = new Promise((resolve) => {
814+
this.#abortController.signal.addEventListener("abort", () => resolve());
815+
});
709816
this.#receive();
710817
}
711818

819+
/**
820+
* AbortSignal that aborts when the connection closes.
821+
*
822+
* This signal can be used to:
823+
* - Listen for connection closure via event listeners
824+
* - Check connection status synchronously with `signal.aborted`
825+
* - Pass to other APIs (fetch, setTimeout) for automatic cancellation
826+
*/
827+
get signal(): AbortSignal {
828+
return this.#abortController.signal;
829+
}
830+
831+
/**
832+
* Promise that resolves when the connection closes.
833+
*
834+
* The connection closes when the underlying stream ends, either normally
835+
* or due to an error. Once closed, the connection cannot send or receive
836+
* any more messages.
837+
*
838+
* @example
839+
* ```typescript
840+
* const connection = new ClientSideConnection(client, stream);
841+
* await connection.closed;
842+
* console.log('Connection closed - performing cleanup');
843+
* ```
844+
*/
845+
get closed(): Promise<void> {
846+
return this.#closedPromise;
847+
}
848+
712849
async #receive() {
713850
const reader = this.#stream.readable.getReader();
714851
try {
@@ -744,6 +881,7 @@ class Connection {
744881
}
745882
} finally {
746883
reader.releaseLock();
884+
this.#abortController.abort();
747885
}
748886
}
749887

0 commit comments

Comments
 (0)