diff --git a/src/acp.test.ts b/src/acp.test.ts index 54e6e22..99e80b9 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -476,6 +476,174 @@ describe("Connection", () => { }); }); + it("handles requests from inside a request handler without deadlocking", async () => { + let agentConnection: ClientSideConnection | null = null; + + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + const listResponse = await agentConnection!.listSessions({}); + expect(listResponse.sessions).toEqual([]); + return { outcome: { outcome: "selected", optionId: "allow" } }; + } + async sessionUpdate(_: SessionNotification): Promise {} + } + + class TestAgent implements Agent { + async initialize(_: InitializeRequest): Promise { + return { + protocolVersion: 1, + agentCapabilities: { loadSession: false }, + authMethods: [], + }; + } + async newSession(_: NewSessionRequest): Promise { + return { sessionId: "test-session" }; + } + async authenticate(_: AuthenticateRequest): Promise {} + async prompt(_: PromptRequest): Promise { + return { stopReason: "end_turn" }; + } + async cancel(_: CancelNotification): Promise {} + async listSessions( + _: ListSessionsRequest, + ): Promise { + return { sessions: [] }; + } + } + + agentConnection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const clientConnection = new AgentSideConnection( + () => new TestAgent(), + ndJsonStream(agentToClient.writable, clientToAgent.readable), + ); + + const permissionResponse = await clientConnection.requestPermission({ + sessionId: "test-session", + toolCall: { + title: "Execute command", + kind: "execute", + status: "pending", + toolCallId: "tool-123", + content: [ + { type: "content", content: { type: "text", text: "ls -la" } }, + ], + }, + options: [ + { kind: "allow_once", name: "Allow", optionId: "allow" }, + { kind: "reject_once", name: "Reject", optionId: "reject" }, + ], + }); + + expect(permissionResponse.outcome.outcome).toBe("selected"); + }); + + it("processes notification after response when both arrive in quick succession", async () => { + const events: string[] = []; + const { + promise: sessionNotification, + resolve: resolveSessionNotification, + } = Promise.withResolvers(); + + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_params: SessionNotification): Promise { + // Record the session notification + events.push("SessionNotification"); + resolveSessionNotification(); + } + } + + const connection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const newSessionResponse = connection + .newSession({ cwd: "/test", mcpServers: [] }) + .then((result) => { + // Record the new session response event + events.push("NewSessionResponse"); + return result; + }); + + // Get the NewSessionRequest ID + const requestReader = clientToAgent.readable.getReader(); + const { value: requestChunk } = await requestReader.read(); + requestReader.releaseLock(); + const { id: requestId } = JSON.parse( + new TextDecoder().decode(requestChunk), + ); + + // Write response and notification in quick succession + const sessionId = "test-session"; + const writer = agentToClient.writable.getWriter(); + await writer.write( + new TextEncoder().encode( + JSON.stringify({ + jsonrpc: "2.0", + id: requestId, + result: { sessionId }, + }) + "\n", + ), + ); + await writer.write( + new TextEncoder().encode( + JSON.stringify({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId, + update: { + sessionUpdate: "available_commands_update", + availableCommands: [], + }, + }, + }) + "\n", + ), + ); + writer.releaseLock(); + + await newSessionResponse; + await sessionNotification; + + expect(events).toEqual(["NewSessionResponse", "SessionNotification"]); + }); + it("handles initialize method", async () => { // Create client class TestClient implements Client { diff --git a/src/acp.ts b/src/acp.ts index 0693a72..6708b3a 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -1462,21 +1462,21 @@ class Connection { } } - async sendRequest(method: string, params?: Req): Promise { + sendRequest(method: string, params?: Req): Promise { this.throwIfClosed(); const id = this.nextRequestId++; const responsePromise = new Promise((resolve, reject) => { this.pendingResponses.set(id, { resolve, reject }); }); // If the transport fails (or receive observes stream closure) during the - // `await sendMessage` below, close() will reject `responsePromise` + // `sendMessage` below, close() will reject `responsePromise` // before the caller has had a chance to attach its own handler. Node then // reports a spurious `unhandledRejection`, even though the caller's // subsequent `await` does observe the rejection. Attach a noop catch so // the rejection is considered handled at the time it's raised; the // rejection is still delivered to the caller via `return responsePromise`. responsePromise.catch(() => {}); - await this.sendMessage({ jsonrpc: "2.0", id, method, params }); + void this.sendMessage({ jsonrpc: "2.0", id, method, params }); return responsePromise as Promise; }