Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions src/CodexAcpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ import type {EmbeddedResourceResource} from "@agentclientprotocol/sdk";
import * as acp from "@agentclientprotocol/sdk";
import {type McpServer, RequestError} from "@agentclientprotocol/sdk";
import type {
ApprovalHandler,
CodexAppServerClient,
ElicitationHandler,
McpStartupResult,
SessionHandler,
} from "./CodexAppServerClient";
import open from "open";
import type {Disposable} from "vscode-jsonrpc";
import type {
ClientInfo,
ReasoningEffort,
ServerNotification
} from "./app-server";
import type {JsonValue} from "./app-server/serde_json/JsonValue";
import {ModelId} from "./ModelId";
Expand All @@ -36,6 +33,7 @@ import type {
} from "./app-server/v2";
import packageJson from "../package.json";
import type {AuthenticationLogoutResponse, AuthenticationStatusResponse} from "./AcpExtensions";
import type {Disposable} from "vscode-jsonrpc";

/**
* API for accessing the Codex App Server using ACP requests.
Expand All @@ -49,7 +47,6 @@ export class CodexAcpClient {
private pendingLoginCompleted: Promise<AccountLoginCompletedNotification> | null = null;
private pendingAccountUpdated: Promise<AccountUpdatedNotification> | null = null;


constructor(codexClient: CodexAppServerClient, codexConfig?: JsonObject, modelProvider?: string) {
this.codexClient = codexClient;
this.config = codexConfig ?? {};
Expand Down Expand Up @@ -364,17 +361,6 @@ export class CodexAcpClient {
return ModelId.create(selectedModel.id, reasoningEffort ?? selectedModel.defaultReasoningEffort);
}

async subscribeToSessionEvents(
sessionId: string,
eventHandler: (result: ServerNotification) => void,
approvalHandler: ApprovalHandler,
elicitationHandler: ElicitationHandler
) {
this.codexClient.onServerNotification(sessionId, eventHandler);
this.codexClient.onApprovalRequest(sessionId, approvalHandler);
this.codexClient.onElicitationRequest(sessionId, elicitationHandler);
}

async sendPrompt(
request: acp.PromptRequest,
agentMode: AgentMode,
Expand Down Expand Up @@ -402,7 +388,13 @@ export class CodexAcpClient {

// Wait for turn completion
// If turnInterrupt() was called, Codex will send turn/completed event with status "interrupted"
return await this.codexClient.awaitTurnCompleted();
const turnCompleted = await this.codexClient.awaitTurnCompleted();
await this.codexClient.awaitSessionIdle(request.sessionId);
return turnCompleted;
}

subscribeSession(sessionId: string, handler: SessionHandler): void {
this.codexClient.subscribeSession(sessionId, handler);
}

async listSkills(params?: SkillsListParams): Promise<SkillsListResponse> {
Expand Down
16 changes: 3 additions & 13 deletions src/CodexAcpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import {
type SessionModeState
} from "@agentclientprotocol/sdk";
import {CodexEventHandler} from "./CodexEventHandler";
import {CodexApprovalHandler} from "./CodexApprovalHandler";
import {CodexElicitationHandler} from "./CodexElicitationHandler";
import {getCodexAuthMethods, type CodexAuthRequest} from "./CodexAuthMethod";
import {CodexAcpClient, type SessionMetadata, type SessionMetadataWithThread} from "./CodexAcpClient";
import type {McpStartupResult} from "./CodexAppServerClient";
Expand Down Expand Up @@ -717,16 +715,8 @@ export class CodexAcpServer implements acp.Agent {
sessionState.lastTokenUsage = null;

try {
const eventHandler = new CodexEventHandler(this.connection, sessionState);
const approvalHandler = new CodexApprovalHandler(this.connection, sessionState);
const elicitationHandler = new CodexElicitationHandler(this.connection, sessionState);
await this.codexAcpClient.subscribeToSessionEvents(params.sessionId,
(event) => {
elicitationHandler.handleNotification(event);
return eventHandler.handleNotification(event);
},
approvalHandler,
elicitationHandler);
const sessionHandler = new CodexEventHandler(this.connection, sessionState);
this.codexAcpClient.subscribeSession(params.sessionId, sessionHandler);

if (await this.availableCommands.tryHandle(params.prompt, sessionState)) {
logger.log("Prompt handled by a command");
Expand Down Expand Up @@ -775,7 +765,7 @@ export class CodexAcpServer implements acp.Agent {
};
}

const error = eventHandler.getFailure()
const error = sessionHandler.getFailure()
if (error) {
// noinspection ExceptionCaughtLocallyJS
throw error;
Expand Down
90 changes: 63 additions & 27 deletions src/CodexAppServerClient.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type {Disposable} from "vscode-jsonrpc";
import {type MessageConnection, RequestType} from "vscode-jsonrpc/node";
import type {
ClientRequest,
Expand Down Expand Up @@ -44,15 +45,14 @@ import type {
FileChangeRequestApprovalParams,
FileChangeRequestApprovalResponse,
} from "./app-server/v2";
import { logger } from "./Logger";

export interface ApprovalHandler {
export type SessionHandler = {
handleNotification(notification: ServerNotification): Promise<void>;
handleCommandExecution(params: CommandExecutionRequestApprovalParams): Promise<CommandExecutionRequestApprovalResponse>;
handleFileChange(params: FileChangeRequestApprovalParams): Promise<FileChangeRequestApprovalResponse>;
}

export interface ElicitationHandler {
handleElicitation(params: McpServerElicitationRequestParams): Promise<McpServerElicitationRequestResponse>;
}
};

export type McpStartupFailure = {
server: string;
Expand Down Expand Up @@ -89,8 +89,7 @@ const McpServerElicitationRequest = new RequestType<
*/
export class CodexAppServerClient {
readonly connection: MessageConnection;
private approvalHandlers = new Map<string, ApprovalHandler>();
private elicitationHandlers = new Map<string, ElicitationHandler>();
private readonly sessionHandlers = new Map<string, SessionHandlerState>();
private mcpServerStartupVersion = 0;
private readonly mcpServerStartupStates = new Map<string, McpServerStartupSnapshot>();
private readonly mcpServerStartupResolvers: Array<McpServerStartupResolver> = [];
Expand All @@ -108,43 +107,51 @@ export class CodexAppServerClient {
});
this.resolveMcpServerStartupResolvers();
}
this.notify(serverNotification);
this.enqueueSessionNotification(serverNotification);
for (const callback of this.codexEventHandlers) {
callback({ eventType: "notification", ...serverNotification });
}
});

this.connection.onRequest(CommandExecutionApprovalRequest, async (params) => {
const handler = this.approvalHandlers.get(params.threadId);
const handler = this.sessionHandlers.get(params.threadId)?.handler;
if (!handler) {
return { decision: "cancel" };
}
return await handler.handleCommandExecution(params);
});

this.connection.onRequest(FileChangeApprovalRequest, async (params) => {
const handler = this.approvalHandlers.get(params.threadId);
const handler = this.sessionHandlers.get(params.threadId)?.handler;
if (!handler) {
return { decision: "cancel" };
}
return await handler.handleFileChange(params);
});

this.connection.onRequest(McpServerElicitationRequest, async (params) => {
const handler = this.elicitationHandlers.get(params.threadId);
const handler = this.sessionHandlers.get(params.threadId)?.handler;
if (!handler) {
return { action: "cancel", content: null, _meta: null };
}
return await handler.handleElicitation(params);
});
}

onApprovalRequest(threadId: string, handler: ApprovalHandler): void {
this.approvalHandlers.set(threadId, handler);
}

onElicitationRequest(threadId: string, handler: ElicitationHandler): void {
this.elicitationHandlers.set(threadId, handler);
subscribeSession(threadId: string, handler: SessionHandler): Disposable {
const previousState = this.sessionHandlers.get(threadId);
const state: SessionHandlerState = {
handler,
pending: previousState?.pending ?? Promise.resolve(),
};
this.sessionHandlers.set(threadId, state);
return {
dispose: () => {
if (this.sessionHandlers.get(threadId) === state) {
this.sessionHandlers.delete(threadId);
}
}
};
}

async initialize(params: InitializeParams): Promise<InitializeResponse> {
Expand Down Expand Up @@ -240,24 +247,38 @@ export class CodexAppServerClient {
return await this.sendRequest({ method: "skills/list", params });
}

/**
* Registers a notification handler for a specific session.
* Replaces any existing handler for the same session, preventing handler accumulation.
*/
onServerNotification(sessionId: string, callback: (event: ServerNotification) => void) {
this.notificationHandlers.set(sessionId, callback);
async awaitSessionIdle(threadId: string): Promise<void> {
await (this.sessionHandlers.get(threadId)?.pending ?? Promise.resolve());
}

private codexEventHandlers: Array<(event: CodexConnectionEvent) => void> = [];
onClientTransportEvent(callback: (event: CodexConnectionEvent) => void){
this.codexEventHandlers.push(callback);
}

private notificationHandlers = new Map<string, (event: ServerNotification) => void>();
private notify(notification: ServerNotification) {
for (const notificationHandler of this.notificationHandlers.values()) {
notificationHandler(notification);
private enqueueSessionNotification(notification: ServerNotification): void {
const threadId = getNotificationThreadId(notification);
if (threadId !== null && this.sessionHandlers.has(threadId)) {
this.enqueueNotificationForSession(threadId, notification);
return;
}

for (const sessionId of this.sessionHandlers.keys()) {
this.enqueueNotificationForSession(sessionId, notification);
}
}

private enqueueNotificationForSession(threadId: string, notification: ServerNotification): void {
const state = this.sessionHandlers.get(threadId);
if (!state) {
return;
}

state.pending = state.pending
.then(() => state.handler.handleNotification(notification))
.catch((error) => {
logger.error("Error handling server notification", error);
});
}

private resolveMcpServerStartupResolvers(): void {
Expand Down Expand Up @@ -330,6 +351,21 @@ export type CodexConnectionEvent =

type CodexRequest = DistributiveOmit<ClientRequest, "id">

type SessionHandlerState = {
handler: SessionHandler;
pending: Promise<void>;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify why do we need state management to handle notifications instead of calling "handler" when event arrives?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need the per-session pending promise because notification handling is async: handleNotification() awaits createUpdateEvent() and session.update(), and file-change events may read files to reconstruct ACP diffs. The stored promise chain preserves event order and lets awaitSessionIdle() wait for all pending ACP updates before prompt completion.

};

function getNotificationThreadId(notification: ServerNotification): string | null {
const params = notification.params;
if (!params || typeof params !== "object") {
return null;
}

const threadId = (params as Record<string, unknown>)["threadId"];
return typeof threadId === "string" ? threadId : null;
}

type DistributiveOmit<T, K extends keyof any> = T extends any
? Omit<T, K>
: never;
Expand Down
Loading
Loading