Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions src/acp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,174 @@ describe("Connection", () => {
});
});

it("handles requests from inside a request handler without deadlocking", async () => {
let agentConnection: ClientSideConnection | null = null;

class TestClient implements Client {
async writeTextFile(
_: WriteTextFileRequest,
): Promise<WriteTextFileResponse> {
return {};
}
async readTextFile(
_: ReadTextFileRequest,
): Promise<ReadTextFileResponse> {
return { content: "" };
}
async requestPermission(
_: RequestPermissionRequest,
): Promise<RequestPermissionResponse> {
const listResponse = await agentConnection!.listSessions({});
expect(listResponse.sessions).toEqual([]);
return { outcome: { outcome: "selected", optionId: "allow" } };
}
async sessionUpdate(_: SessionNotification): Promise<void> {}
}

class TestAgent implements Agent {
async initialize(_: InitializeRequest): Promise<InitializeResponse> {
return {
protocolVersion: 1,
agentCapabilities: { loadSession: false },
authMethods: [],
};
}
async newSession(_: NewSessionRequest): Promise<NewSessionResponse> {
return { sessionId: "test-session" };
}
async authenticate(_: AuthenticateRequest): Promise<void> {}
async prompt(_: PromptRequest): Promise<PromptResponse> {
return { stopReason: "end_turn" };
}
async cancel(_: CancelNotification): Promise<void> {}
async listSessions(
_: ListSessionsRequest,
): Promise<ListSessionsResponse> {
return { sessions: [] };
}
}

agentConnection = new ClientSideConnection(
() => new TestClient(),
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgent(),
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

const permissionResponse = await clientConnection.requestPermission({
sessionId: "test-session",
toolCall: {
title: "Execute command",
kind: "execute",
status: "pending",
toolCallId: "tool-123",
content: [
{ type: "content", content: { type: "text", text: "ls -la" } },
],
},
options: [
{ kind: "allow_once", name: "Allow", optionId: "allow" },
{ kind: "reject_once", name: "Reject", optionId: "reject" },
],
});

expect(permissionResponse.outcome.outcome).toBe("selected");
});

it("processes notification after response when both arrive in quick succession", async () => {
const events: string[] = [];
const {
promise: sessionNotification,
resolve: resolveSessionNotification,
} = Promise.withResolvers<void>();

class TestClient implements Client {
async writeTextFile(
_: WriteTextFileRequest,
): Promise<WriteTextFileResponse> {
return {};
}
async readTextFile(
_: ReadTextFileRequest,
): Promise<ReadTextFileResponse> {
return { content: "test" };
}
async requestPermission(
_: RequestPermissionRequest,
): Promise<RequestPermissionResponse> {
return {
outcome: {
outcome: "selected",
optionId: "allow",
},
};
}
async sessionUpdate(_params: SessionNotification): Promise<void> {
// Record the session notification
events.push("SessionNotification");
resolveSessionNotification();
}
}

const connection = new ClientSideConnection(
() => new TestClient(),
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const newSessionResponse = connection
.newSession({ cwd: "/test", mcpServers: [] })
.then((result) => {
// Record the new session response event
events.push("NewSessionResponse");
return result;
});

// Get the NewSessionRequest ID
const requestReader = clientToAgent.readable.getReader();
const { value: requestChunk } = await requestReader.read();
requestReader.releaseLock();
const { id: requestId } = JSON.parse(
new TextDecoder().decode(requestChunk),
);

// Write response and notification in quick succession
const sessionId = "test-session";
const writer = agentToClient.writable.getWriter();
await writer.write(
new TextEncoder().encode(
JSON.stringify({
jsonrpc: "2.0",
id: requestId,
result: { sessionId },
}) + "\n",
),
);
await writer.write(
new TextEncoder().encode(
JSON.stringify({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId,
update: {
sessionUpdate: "available_commands_update",
availableCommands: [],
},
},
}) + "\n",
),
);
writer.releaseLock();

await newSessionResponse;
await sessionNotification;

expect(events).toEqual(["NewSessionResponse", "SessionNotification"]);
});

it("handles initialize method", async () => {
// Create client
class TestClient implements Client {
Expand Down
6 changes: 3 additions & 3 deletions src/acp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1462,21 +1462,21 @@ class Connection {
}
}

async sendRequest<Req, Resp>(method: string, params?: Req): Promise<Resp> {
sendRequest<Req, Resp>(method: string, params?: Req): Promise<Resp> {
this.throwIfClosed();
const id = this.nextRequestId++;
const responsePromise = new Promise((resolve, reject) => {
this.pendingResponses.set(id, { resolve, reject });
});
// If the transport fails (or receive observes stream closure) during the
// `await sendMessage` below, close() will reject `responsePromise`
// `sendMessage` below, close() will reject `responsePromise`
// before the caller has had a chance to attach its own handler. Node then
// reports a spurious `unhandledRejection`, even though the caller's
// subsequent `await` does observe the rejection. Attach a noop catch so
// the rejection is considered handled at the time it's raised; the
// rejection is still delivered to the caller via `return responsePromise`.
responsePromise.catch(() => {});
await this.sendMessage({ jsonrpc: "2.0", id, method, params });
void this.sendMessage({ jsonrpc: "2.0", id, method, params });
return responsePromise as Promise<Resp>;
}

Expand Down