From ee088a50cbf5f66e054ce264695c69e378333add Mon Sep 17 00:00:00 2001 From: Pete Date: Wed, 22 Apr 2026 09:16:55 +0100 Subject: [PATCH 1/3] fix: Wait for response handler to run before processing the next message --- src/acp.test.ts | 133 ++++++++++++++++++++++++++++++++++++++++++++++++ src/acp.ts | 2 + 2 files changed, 135 insertions(+) diff --git a/src/acp.test.ts b/src/acp.test.ts index 54e6e22..e937f3d 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -476,6 +476,139 @@ describe("Connection", () => { }); }); + it("handles requests from inside a request handler without deadlocking", async () => { + let agentConnection: ClientSideConnection | null = null; + + class TestClient implements Client { + async writeTextFile(_: WriteTextFileRequest): Promise { + return {}; + } + async readTextFile(_: ReadTextFileRequest): Promise { + return { content: "" }; + } + async requestPermission(_: RequestPermissionRequest): Promise { + const listResponse = await agentConnection!.listSessions({}); + expect(listResponse.sessions).toEqual([]); + return { outcome: { outcome: "selected", optionId: "allow" } }; + } + async sessionUpdate(_: SessionNotification): Promise {} + } + + class TestAgent implements Agent { + async initialize(_: InitializeRequest): Promise { + return { + protocolVersion: 1, + agentCapabilities: { loadSession: false }, + authMethods: [], + }; + } + async newSession(_: NewSessionRequest): Promise { + return { sessionId: "test-session" }; + } + async authenticate(_: AuthenticateRequest): Promise {} + async prompt(_: PromptRequest): Promise { + return { stopReason: "end_turn" }; + } + async cancel(_: CancelNotification): Promise {} + async listSessions(_: ListSessionsRequest): Promise { + return { sessions: [] }; + } + } + + agentConnection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const clientConnection = new AgentSideConnection( + () => new TestAgent(), + ndJsonStream(agentToClient.writable, clientToAgent.readable), + ); + + const permissionResponse = await clientConnection.requestPermission({ + sessionId: "test-session", + toolCall: { + title: "Execute command", + kind: "execute", + status: "pending", + toolCallId: "tool-123", + content: [{ type: "content", content: { type: "text", text: "ls -la" } }], + }, + options: [ + { kind: "allow_once", name: "Allow", optionId: "allow" }, + { kind: "reject_once", name: "Reject", optionId: "reject" }, + ], + }); + + expect(permissionResponse.outcome.outcome).toBe("selected"); + }); + + it("processes notification after response when both arrive in quick succession", async () => { + const events: string[] = []; + const { promise: sessionNotification, resolve: resolveSessionNotification } = Promise.withResolvers(); + + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_params: SessionNotification): Promise { + // Record the session notification + events.push("SessionNotification") + resolveSessionNotification(); + } + } + + const connection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const newSessionResponse = connection + .newSession({ cwd: "/test", mcpServers: [] }) + .then((result) => { + // Record the new session response event + events.push("NewSessionResponse"); + return result; + }); + + // Get the NewSessionRequest ID + const requestReader = clientToAgent.readable.getReader(); + const { value: requestChunk } = await requestReader.read(); + requestReader.releaseLock(); + const { id: requestId } = JSON.parse(new TextDecoder().decode(requestChunk)); + + // Write response and notification in quick succession + const sessionId = "test-session"; + const writer = agentToClient.writable.getWriter(); + await writer.write(new TextEncoder().encode(JSON.stringify( + { jsonrpc: "2.0", id: requestId, result: { sessionId } }) + "\n")); + await writer.write(new TextEncoder().encode(JSON.stringify( + { jsonrpc: "2.0", method: "session/update", params: { sessionId, update: { sessionUpdate: "available_commands_update", availableCommands: [] } } }) + "\n")); + writer.releaseLock(); + + await newSessionResponse; + await sessionNotification; + + expect(events).toEqual(["NewSessionResponse", "SessionNotification"]); + }); + it("handles initialize method", async () => { // Create client class TestClient implements Client { diff --git a/src/acp.ts b/src/acp.ts index 0693a72..00fda23 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -1294,6 +1294,8 @@ class Connection { try { this.processMessage(message); + // Ensure any response handlers are called before processing the next message + await new Promise(resolve => setTimeout(resolve, 0)); } catch (err) { console.error( "Unexpected error during message processing:", From 3de01cb984a0d1927fef3d353c1ff166a9156b82 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 23 Apr 2026 16:14:44 +0200 Subject: [PATCH 2/3] Avoid awaiting request sends in Connection Send requests without awaiting `sendMessage` and remove the extra zero-delay yield after processing each message. This prevents unnecessary event-loop stalls while preserving pending response rejection handling if the transport closes. --- src/acp.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/acp.ts b/src/acp.ts index 00fda23..6708b3a 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -1294,8 +1294,6 @@ class Connection { try { this.processMessage(message); - // Ensure any response handlers are called before processing the next message - await new Promise(resolve => setTimeout(resolve, 0)); } catch (err) { console.error( "Unexpected error during message processing:", @@ -1464,21 +1462,21 @@ class Connection { } } - async sendRequest(method: string, params?: Req): Promise { + sendRequest(method: string, params?: Req): Promise { this.throwIfClosed(); const id = this.nextRequestId++; const responsePromise = new Promise((resolve, reject) => { this.pendingResponses.set(id, { resolve, reject }); }); // If the transport fails (or receive observes stream closure) during the - // `await sendMessage` below, close() will reject `responsePromise` + // `sendMessage` below, close() will reject `responsePromise` // before the caller has had a chance to attach its own handler. Node then // reports a spurious `unhandledRejection`, even though the caller's // subsequent `await` does observe the rejection. Attach a noop catch so // the rejection is considered handled at the time it's raised; the // rejection is still delivered to the caller via `return responsePromise`. responsePromise.catch(() => {}); - await this.sendMessage({ jsonrpc: "2.0", id, method, params }); + void this.sendMessage({ jsonrpc: "2.0", id, method, params }); return responsePromise as Promise; } From 6d371b875385a068353099952cc1a567448ef0ec Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 23 Apr 2026 16:17:14 +0200 Subject: [PATCH 3/3] Format --- src/acp.test.ts | 61 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/src/acp.test.ts b/src/acp.test.ts index e937f3d..99e80b9 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -480,13 +480,19 @@ describe("Connection", () => { let agentConnection: ClientSideConnection | null = null; class TestClient implements Client { - async writeTextFile(_: WriteTextFileRequest): Promise { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { return {}; } - async readTextFile(_: ReadTextFileRequest): Promise { + async readTextFile( + _: ReadTextFileRequest, + ): Promise { return { content: "" }; } - async requestPermission(_: RequestPermissionRequest): Promise { + async requestPermission( + _: RequestPermissionRequest, + ): Promise { const listResponse = await agentConnection!.listSessions({}); expect(listResponse.sessions).toEqual([]); return { outcome: { outcome: "selected", optionId: "allow" } }; @@ -510,7 +516,9 @@ describe("Connection", () => { return { stopReason: "end_turn" }; } async cancel(_: CancelNotification): Promise {} - async listSessions(_: ListSessionsRequest): Promise { + async listSessions( + _: ListSessionsRequest, + ): Promise { return { sessions: [] }; } } @@ -532,7 +540,9 @@ describe("Connection", () => { kind: "execute", status: "pending", toolCallId: "tool-123", - content: [{ type: "content", content: { type: "text", text: "ls -la" } }], + content: [ + { type: "content", content: { type: "text", text: "ls -la" } }, + ], }, options: [ { kind: "allow_once", name: "Allow", optionId: "allow" }, @@ -542,10 +552,13 @@ describe("Connection", () => { expect(permissionResponse.outcome.outcome).toBe("selected"); }); - + it("processes notification after response when both arrive in quick succession", async () => { const events: string[] = []; - const { promise: sessionNotification, resolve: resolveSessionNotification } = Promise.withResolvers(); + const { + promise: sessionNotification, + resolve: resolveSessionNotification, + } = Promise.withResolvers(); class TestClient implements Client { async writeTextFile( @@ -570,7 +583,7 @@ describe("Connection", () => { } async sessionUpdate(_params: SessionNotification): Promise { // Record the session notification - events.push("SessionNotification") + events.push("SessionNotification"); resolveSessionNotification(); } } @@ -592,15 +605,37 @@ describe("Connection", () => { const requestReader = clientToAgent.readable.getReader(); const { value: requestChunk } = await requestReader.read(); requestReader.releaseLock(); - const { id: requestId } = JSON.parse(new TextDecoder().decode(requestChunk)); + const { id: requestId } = JSON.parse( + new TextDecoder().decode(requestChunk), + ); // Write response and notification in quick succession const sessionId = "test-session"; const writer = agentToClient.writable.getWriter(); - await writer.write(new TextEncoder().encode(JSON.stringify( - { jsonrpc: "2.0", id: requestId, result: { sessionId } }) + "\n")); - await writer.write(new TextEncoder().encode(JSON.stringify( - { jsonrpc: "2.0", method: "session/update", params: { sessionId, update: { sessionUpdate: "available_commands_update", availableCommands: [] } } }) + "\n")); + await writer.write( + new TextEncoder().encode( + JSON.stringify({ + jsonrpc: "2.0", + id: requestId, + result: { sessionId }, + }) + "\n", + ), + ); + await writer.write( + new TextEncoder().encode( + JSON.stringify({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId, + update: { + sessionUpdate: "available_commands_update", + availableCommands: [], + }, + }, + }) + "\n", + ), + ); writer.releaseLock(); await newSessionResponse;