From 690b95bba11e0615301cd2f335f342e514b09ca3 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 25 Jun 2026 20:34:07 +0100 Subject: [PATCH 01/12] Project selected plugin runtime by environment availability --- codex-rs/Cargo.lock | 1 + codex-rs/Cargo.toml | 1 + .../src/request_processors/mcp_processor.rs | 23 +- .../request_processors/thread_processor.rs | 1 - codex-rs/core/src/codex_thread.rs | 20 +- codex-rs/core/src/mcp.rs | 45 +++- codex-rs/core/src/prompt_debug.rs | 4 +- codex-rs/core/src/session/mcp.rs | 196 ++++++++++++++---- codex-rs/core/src/session/mcp_runtime.rs | 8 + codex-rs/core/src/session/mod.rs | 19 +- codex-rs/core/src/session/session.rs | 10 +- codex-rs/core/src/session/tests.rs | 3 + codex-rs/core/src/session/turn.rs | 27 +-- codex-rs/core/src/state/service.rs | 14 +- codex-rs/core/src/thread_manager_tests.rs | 24 ++- .../ext/extension-api/src/contributors/mcp.rs | 47 ++++- codex-rs/ext/mcp/Cargo.toml | 1 + codex-rs/ext/mcp/src/executor_plugin.rs | 148 +++++++++---- codex-rs/ext/mcp/src/lib.rs | 7 - codex-rs/ext/mcp/tests/executor_plugin_mcp.rs | 12 +- 20 files changed, 452 insertions(+), 159 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index b147022825e4..75c02adfb500 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3359,6 +3359,7 @@ name = "codex-mcp-extension" version = "0.0.0" dependencies = [ "codex-config", + "codex-connectors-extension", "codex-core", "codex-core-plugins", "codex-exec-server", diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index ef17cb9ce5f1..0824f7db7b7f 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -166,6 +166,7 @@ codex-code-mode-protocol = { path = "code-mode-protocol" } codex-home = { path = "codex-home" } codex-config = { path = "config" } codex-connectors = { path = "connectors" } +codex-connectors-extension = { path = "ext/connectors" } codex-context-fragments = { path = "context-fragments" } codex-core = { path = "core" } codex-core-api = { path = "core-api" } diff --git a/codex-rs/app-server/src/request_processors/mcp_processor.rs b/codex-rs/app-server/src/request_processors/mcp_processor.rs index abefd260153b..ac2125c2cfcd 100644 --- a/codex-rs/app-server/src/request_processors/mcp_processor.rs +++ b/codex-rs/app-server/src/request_processors/mcp_processor.rs @@ -124,7 +124,7 @@ impl McpRequestProcessor { let (mcp_config, runtime_context) = match thread_id.as_deref() { Some(thread_id) => { let (_, thread) = self.load_thread(thread_id).await?; - let runtime = thread.current_mcp_runtime(); + let runtime = thread.current_mcp_runtime().await; (runtime.config().clone(), runtime.runtime_context().clone()) } None => { @@ -246,14 +246,21 @@ impl McpRequestProcessor { let mcp_manager = self.thread_manager.mcp_manager(); let codex_apps_tools_cache = mcp_manager.codex_apps_tools_cache(); let auth = self.auth_manager.auth().await; - let mcp_config = match thread { - Some(thread) => thread.runtime_mcp_config(&config).await, - None => mcp_manager.runtime_config(&config).await, + let (mcp_config, runtime_context) = match thread { + Some(thread) => { + let mcp_config = thread.runtime_mcp_config(&config).await; + let runtime = thread.current_mcp_runtime().await; + (mcp_config, runtime.runtime_context().clone()) + } + None => { + let mcp_config = mcp_manager.runtime_config(&config).await; + let runtime_context = McpRuntimeContext::new( + self.thread_manager.environment_manager(), + config.cwd.to_path_buf(), + ); + (mcp_config, runtime_context) + } }; - let runtime_context = McpRuntimeContext::new( - self.thread_manager.environment_manager(), - config.cwd.to_path_buf(), - ); tokio::spawn(async move { Self::list_mcp_server_status_task( diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 0fc37d1a4fbf..39283bb35ca2 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1155,7 +1155,6 @@ impl ThreadRequestProcessor { let mut thread_extension_init = ExtensionDataInit::new(); if !selected_capability_roots.is_empty() { thread_extension_init.insert(selected_capability_roots); - codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_extension_init); } let create_thread_started_at = std::time::Instant::now(); let NewThread { diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 46b38af151ca..90555db9119e 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -595,8 +595,14 @@ impl CodexThread { } /// Returns the exact MCP config, environment bindings, and manager most recently published. - pub fn current_mcp_runtime(&self) -> Arc { - self.codex.session.services.latest_mcp_runtime() + pub async fn current_mcp_runtime(&self) -> Arc { + let turn_context = self.codex.session.new_default_turn().await; + self.codex + .session + .capture_step_context(turn_context) + .await + .mcp + .clone() } pub fn multi_agent_version(&self) -> Option { @@ -620,8 +626,9 @@ impl CodexThread { uri: &str, ) -> anyhow::Result { let result = self - .codex - .session + .current_mcp_runtime() + .await + .manager_arc() .read_resource(server, ReadResourceRequestParams::new(uri)) .await?; @@ -635,8 +642,9 @@ impl CodexThread { arguments: Option, meta: Option, ) -> anyhow::Result { - self.codex - .session + self.current_mcp_runtime() + .await + .manager_arc() .call_tool(server, tool, arguments, meta) .await } diff --git a/codex-rs/core/src/mcp.rs b/codex-rs/core/src/mcp.rs index 658c5471eed7..45ca0d563d24 100644 --- a/codex-rs/core/src/mcp.rs +++ b/codex-rs/core/src/mcp.rs @@ -3,7 +3,10 @@ use std::sync::Arc; use crate::config::Config; use codex_config::McpServerConfig; +use codex_connectors::ConnectorSnapshot; +use codex_connectors::PluginConnectorSource; use codex_core_plugins::PluginsManager; +use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionRegistry; use codex_extension_api::McpServerContribution; @@ -18,6 +21,7 @@ use codex_mcp::McpServerRegistration; use codex_mcp::codex_apps_mcp_server_config; use codex_mcp::configured_mcp_servers; use codex_mcp::effective_mcp_servers; +use codex_plugin::AppConnectorId; const LEGACY_CODEX_APPS_REGISTRATION_ID: &str = "legacy_codex_apps"; @@ -69,28 +73,32 @@ impl McpManager { /// Returns the MCP config after applying compatibility built-ins and /// runtime-only extension overlays. pub async fn runtime_config(&self, config: &Config) -> McpConfig { - self.runtime_config_with_context(config, /*thread_init*/ None) + self.runtime_config_with_context(McpServerContributionContext::global(config)) .await } - pub(crate) async fn runtime_config_for_thread( + pub(crate) async fn runtime_config_for_step( &self, config: &Config, thread_init: &ExtensionDataInit, + thread_store: &ExtensionData, + available_environment_ids: &[String], ) -> McpConfig { - self.runtime_config_with_context(config, Some(thread_init)) - .await + self.runtime_config_with_context(McpServerContributionContext::for_step( + config, + thread_init, + thread_store, + available_environment_ids, + )) + .await } async fn runtime_config_with_context( &self, - config: &Config, - thread_init: Option<&ExtensionDataInit>, + context: McpServerContributionContext<'_, Config>, ) -> McpConfig { - let context = match thread_init { - Some(thread_init) => McpServerContributionContext::for_thread(config, thread_init), - None => McpServerContributionContext::global(config), - }; + let config = context.config(); + let mut selected_plugin_connector_sources = Vec::new(); let mut selected_plugin_registrations = Vec::new(); let mut overlays = Vec::new(); // A contributor can emit multiple ordered actions, so order each action globally rather @@ -121,6 +129,17 @@ impl McpManager { *config, ), ), + McpServerContribution::SelectedPluginConnectors { + plugin_id, + plugin_display_name, + connector_ids, + } => selected_plugin_connector_sources.push( + PluginConnectorSource::from_connector_ids( + plugin_id, + plugin_display_name, + connector_ids.into_iter().map(AppConnectorId), + ), + ), McpServerContribution::Remove { name } => { overlays.push(OrderedMcpOverlay::Remove { contributor_id: contributor.id(), @@ -186,6 +205,12 @@ impl McpManager { ); } mcp_config.mcp_server_catalog = catalog; + mcp_config.connector_snapshot = + mcp_config + .connector_snapshot + .merged_with(&ConnectorSnapshot::from_plugin_sources( + selected_plugin_connector_sources, + )); mcp_config } diff --git a/codex-rs/core/src/prompt_debug.rs b/codex-rs/core/src/prompt_debug.rs index a866116a2484..cac9bb41ae0d 100644 --- a/codex-rs/core/src/prompt_debug.rs +++ b/codex-rs/core/src/prompt_debug.rs @@ -64,7 +64,7 @@ pub async fn build_prompt_input( ); let thread = thread_manager.start_thread(config).await?; - let output = build_prompt_input_from_session(thread.thread.codex.session.as_ref(), input).await; + let output = build_prompt_input_from_session(&thread.thread.codex.session, input).await; let shutdown = thread.thread.shutdown_and_wait().await; let _removed = thread_manager.remove_thread(&thread.thread_id).await; @@ -73,7 +73,7 @@ pub async fn build_prompt_input( } pub(crate) async fn build_prompt_input_from_session( - sess: &Session, + sess: &Arc, input: Vec, ) -> CodexResult> { let turn_context = sess.new_default_turn().await; diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index b9e87f0c8995..50921f1591b6 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -1,7 +1,9 @@ use super::*; +use codex_exec_server::ResolvedSelectedCapabilityRoot; use codex_mcp::ElicitationReviewRequest; use codex_mcp::ElicitationReviewer; use codex_mcp::ElicitationReviewerHandle; +use codex_protocol::capabilities::CapabilityRootLocation; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::mcp_approval_meta::APPROVAL_KIND_KEY as MCP_ELICITATION_APPROVAL_KIND_KEY; use codex_protocol::mcp_approval_meta::APPROVAL_KIND_MCP_TOOL_CALL as MCP_ELICITATION_APPROVAL_KIND_MCP_TOOL_CALL; @@ -75,9 +77,20 @@ impl ElicitationReviewer for GuardianMcpElicitationReviewer { impl Session { pub(crate) async fn runtime_mcp_config(&self, config: &Config) -> McpConfig { + let environments = self.services.turn_environments.snapshot().await; + let selected_capability_roots = self + .resolve_selected_capability_roots_for_step(&environments) + .await; + let available_environment_ids = + Self::available_selected_environment_ids(&selected_capability_roots); self.services .mcp_manager - .runtime_config_for_thread(config, &self.services.mcp_thread_init) + .runtime_config_for_step( + config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await } @@ -88,6 +101,62 @@ impl Session { codex_mcp::configured_mcp_servers(&self.runtime_mcp_config(config).await) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime comparison and publication must remain serialized" + )] + pub(crate) async fn mcp_runtime_for_step( + self: &Arc, + turn_context: &TurnContext, + environments: &TurnEnvironmentSnapshot, + selected_capability_roots: &[ResolvedSelectedCapabilityRoot], + ) -> Arc { + let available_environment_ids = + Self::available_selected_environment_ids(selected_capability_roots); + let current = self.services.latest_mcp_runtime(); + if current.available_environment_ids() == available_environment_ids { + return current; + } + + let _guard = self.services.mcp_projection_lock.lock().await; + let current = self.services.latest_mcp_runtime(); + if current.available_environment_ids() == available_environment_ids { + return current; + } + let mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + &turn_context.config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) + .await; + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + environments, + &available_environment_ids, + Some(self.mcp_elicitation_reviewer()), + ) + .await + } + + pub(crate) async fn resolve_selected_capability_roots_for_step( + &self, + environments: &TurnEnvironmentSnapshot, + ) -> Vec { + self.services + .turn_environments + .environment_manager() + .resolve_selected_capability_roots( + &self.services.selected_capability_roots, + &environments.captured_environments(), + ) + .await + } + pub(crate) fn mcp_elicitation_reviewer(self: &Arc) -> ElicitationReviewerHandle { Arc::new(GuardianMcpElicitationReviewer::new(self)) } @@ -207,51 +276,22 @@ impl Session { .await } - pub async fn read_resource( - &self, - server: &str, - params: ReadResourceRequestParams, - ) -> anyhow::Result { - self.services - .latest_mcp_runtime() - .manager_arc() - .read_resource(server, params) - .await - } - - pub async fn call_tool( - &self, - server: &str, - tool: &str, - arguments: Option, - meta: Option, - ) -> anyhow::Result { - self.services - .latest_mcp_runtime() - .manager_arc() - .call_tool(server, tool, arguments, meta) - .await - } - async fn refresh_mcp_servers_inner( &self, turn_context: &TurnContext, - mut mcp_config: McpConfig, - configured_mcp_servers: HashMap, + mcp_config: McpConfig, + environments: &TurnEnvironmentSnapshot, + available_environment_ids: &[String], elicitation_reviewer: Option, - ) { - mcp_config.mcp_server_catalog = mcp_config - .mcp_server_catalog - .with_materialized_servers(configured_mcp_servers); - let mcp_config = Arc::new(mcp_config); + ) -> Arc { let auth = self.services.auth_manager.auth().await; + let mcp_config = Arc::new(mcp_config); let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config); let mcp_servers = effective_mcp_servers(&mcp_config, auth.as_ref()); let environment_manager = self.services.turn_environments.environment_manager(); // TODO(anp): Migrate MCP runtime cwd plumbing to PathUri so foreign environment cwd // values can be used without falling back to the legacy host cwd. - let cwd = turn_context - .environments + let cwd = environments .primary() .and_then(|turn_environment| turn_environment.cwd().to_abs_path().ok()) .map(|cwd| cwd.to_path_buf()) @@ -305,10 +345,18 @@ impl Session { refreshed_manager .set_elicitations_auto_deny(current_manager.manager().elicitations_auto_deny()); } - self.services - .publish_mcp_runtime(mcp_config, mcp_runtime_context, refreshed_manager); + self.services.publish_mcp_runtime( + mcp_config, + mcp_runtime_context, + available_environment_ids.to_vec(), + refreshed_manager, + ) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime refresh and publication must remain serialized" + )] pub(crate) async fn refresh_mcp_servers_if_requested( &self, turn_context: &TurnContext, @@ -365,9 +413,33 @@ impl Session { return; } - let mcp_config = self.runtime_mcp_config(&refresh_config).await; - self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer) + let _guard = self.services.mcp_projection_lock.lock().await; + let available_environment_ids = self + .services + .latest_mcp_runtime() + .available_environment_ids() + .to_vec(); + let mut mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + &refresh_config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await; + mcp_config.mcp_server_catalog = mcp_config + .mcp_server_catalog + .with_materialized_servers(mcp_servers); + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + &turn_context.environments, + &available_environment_ids, + elicitation_reviewer, + ) + .await; } pub(crate) async fn set_openai_form_elicitation_support( @@ -398,16 +470,54 @@ impl Session { Ok(()) } + #[expect( + clippy::await_holding_invalid_type, + reason = "MCP runtime refresh and publication must remain serialized" + )] pub(crate) async fn refresh_mcp_servers_now( &self, turn_context: &TurnContext, refresh_config: &Config, elicitation_reviewer: Option, ) { - let mcp_config = self.runtime_mcp_config(refresh_config).await; - let mcp_servers = codex_mcp::configured_mcp_servers(&mcp_config); - self.refresh_mcp_servers_inner(turn_context, mcp_config, mcp_servers, elicitation_reviewer) + let _guard = self.services.mcp_projection_lock.lock().await; + let available_environment_ids = self + .services + .latest_mcp_runtime() + .available_environment_ids() + .to_vec(); + let mcp_config = self + .services + .mcp_manager + .runtime_config_for_step( + refresh_config, + &self.services.mcp_thread_init, + &self.services.thread_extension_data, + &available_environment_ids, + ) .await; + self.refresh_mcp_servers_inner( + turn_context, + mcp_config, + &turn_context.environments, + &available_environment_ids, + elicitation_reviewer, + ) + .await; + } + + fn available_selected_environment_ids( + selected_capability_roots: &[ResolvedSelectedCapabilityRoot], + ) -> Vec { + let mut available = Vec::new(); + for root in selected_capability_roots { + let CapabilityRootLocation::Environment { environment_id, .. } = + &root.selected_root().location; + if !available.contains(environment_id) { + available.push(environment_id.clone()); + } + } + available } #[cfg(test)] diff --git a/codex-rs/core/src/session/mcp_runtime.rs b/codex-rs/core/src/session/mcp_runtime.rs index dc5ea011ca32..34cf07cc35a8 100644 --- a/codex-rs/core/src/session/mcp_runtime.rs +++ b/codex-rs/core/src/session/mcp_runtime.rs @@ -10,6 +10,7 @@ pub struct McpRuntimeSnapshot { config: Arc, manager: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, } impl McpRuntimeSnapshot { @@ -17,11 +18,13 @@ impl McpRuntimeSnapshot { config: Arc, manager: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, ) -> Self { Self { config, manager, runtime_context, + available_environment_ids, } } @@ -41,6 +44,10 @@ impl McpRuntimeSnapshot { &self.runtime_context } + pub(crate) fn available_environment_ids(&self) -> &[String] { + &self.available_environment_ids + } + #[cfg(test)] pub(crate) fn new_uninitialized_for_test(config: &crate::config::Config) -> Arc { use codex_exec_server::EnvironmentManager; @@ -81,6 +88,7 @@ impl McpRuntimeSnapshot { Arc::new(mcp_config), Arc::new(manager), runtime_context, + Vec::new(), )) } } diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 9421e07a6615..0008257542f3 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -100,7 +100,6 @@ use codex_protocol::dynamic_tools::DynamicToolResponse; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::items::TurnItem; use codex_protocol::items::UserMessageItem; -use codex_protocol::mcp::CallToolResult; use codex_protocol::models::ActivePermissionProfile; use codex_protocol::models::AdditionalPermissionProfile; use codex_protocol::models::BaseInstructions; @@ -158,8 +157,6 @@ use codex_utils_path_uri::PathUri; use futures::future::BoxFuture; use futures::future::Shared; use futures::prelude::*; -use rmcp::model::ReadResourceRequestParams; -use rmcp::model::ReadResourceResult; use rmcp::model::RequestId; use serde_json::Value; use tokio::sync::Mutex; @@ -2825,7 +2822,7 @@ impl Session { /// `run_turn` and pass the result down; standalone request or history boundaries may capture /// their own step. pub(crate) async fn capture_step_context( - &self, + self: &Arc, turn_context: Arc, ) -> Arc { let deferred_executor_enabled = turn_context @@ -2846,15 +2843,15 @@ impl Session { } let loaded_agents_md = self.services.agents_md_manager.get_loaded().await; let selected_capability_roots = self - .services - .turn_environments - .environment_manager() - .resolve_selected_capability_roots( - &self.services.selected_capability_roots, - &environments.captured_environments(), + .resolve_selected_capability_roots_for_step(&environments) + .await; + let mcp = self + .mcp_runtime_for_step( + turn_context.as_ref(), + &environments, + &selected_capability_roots, ) .await; - let mcp = self.services.latest_mcp_runtime(); Arc::new(StepContext::new( turn_context, environments, diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 2c9e57036df8..0e1609207a78 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -661,6 +661,7 @@ impl Session { let config_for_mcp = Arc::clone(&config); let mcp_manager_for_mcp = Arc::clone(&mcp_manager); let mcp_thread_init_for_startup = &mcp_thread_init; + let thread_extension_data_for_mcp = &thread_extension_data; let mcp_runtime_cwd = session_configuration .environment_selections() .first() @@ -673,7 +674,12 @@ impl Session { let auth_and_mcp_fut = async move { let auth = auth_manager_clone.auth().await; let mcp_config = mcp_manager_for_mcp - .runtime_config_for_thread(&config_for_mcp, mcp_thread_init_for_startup) + .runtime_config_for_step( + &config_for_mcp, + mcp_thread_init_for_startup, + thread_extension_data_for_mcp, + /*available_environment_ids*/ &[], + ) .await; let mcp_servers = codex_mcp::effective_mcp_servers(&mcp_config, auth.as_ref()); let tool_plugin_provenance = codex_mcp::tool_plugin_provenance(&mcp_config); @@ -1039,6 +1045,7 @@ impl Session { // setup is straightforward enough and performs well. mcp_connection_manager, mcp_runtime: arc_swap::ArcSwapOption::empty(), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -1210,6 +1217,7 @@ impl Session { .install_mcp_connection_manager( Arc::new(mcp_config), mcp_runtime_context, + /*available_environment_ids*/ Vec::new(), mcp_connection_manager, ) .await?; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index d0aac0424942..57f1ce9081a6 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -5372,6 +5372,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { let services = SessionServices { mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())), mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7446,6 +7447,7 @@ where let services = SessionServices { mcp_connection_manager: Arc::new(arc_swap::ArcSwap::from(mcp_runtime.manager_arc())), mcp_runtime: arc_swap::ArcSwapOption::from(Some(mcp_runtime)), + mcp_projection_lock: Mutex::new(()), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7603,6 +7605,7 @@ pub(crate) async fn make_session_and_context_with_rx() -> ( #[tokio::test] async fn refresh_mcp_servers_keeps_the_previous_runtime_alive() { let (session, turn_context) = make_session_and_context().await; + let session = Arc::new(session); let turn_context = Arc::new(turn_context); let old_runtime = session.services.latest_mcp_runtime(); let step_context = session diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 2cb1f2cada9c..7b509ae42e37 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -171,8 +171,13 @@ pub(crate) async fn run_turn( .record_context_updates_and_set_reference_context_item(first_step_context.as_ref()) .await; - let Some((injection_items, explicitly_enabled_connectors)) = - build_skills_and_plugins(&sess, turn_context.as_ref(), &input, &cancellation_token).await + let Some((injection_items, explicitly_enabled_connectors)) = build_skills_and_plugins( + &sess, + first_step_context.as_ref(), + &input, + &cancellation_token, + ) + .await else { return Ok(None); }; @@ -504,10 +509,11 @@ async fn run_hooks_and_record_inputs( #[instrument(level = "trace", skip_all)] async fn build_skills_and_plugins( sess: &Arc, - turn_context: &TurnContext, + step_context: &StepContext, input: &[TurnInput], cancellation_token: &CancellationToken, ) -> Option<(Vec, HashSet)> { + let turn_context = step_context.turn.as_ref(); // Guardian input embeds the parent transcript as untrusted evidence. Do not interpret skill or // plugin mentions from that generated prompt as requests to inject additional instructions. if crate::guardian::is_guardian_reviewer_source(&turn_context.session_source) { @@ -538,17 +544,14 @@ async fn build_skills_and_plugins( // enabled plugins, then converted into turn-scoped guidance below. let mentioned_plugins = collect_explicit_plugin_mentions(&user_input, loaded_plugins.capability_summaries()); - let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries( - loaded_plugins.capability_summaries(), - ); + let connector_snapshot = step_context.mcp.config().connector_snapshot.clone(); let mcp_tools = if turn_context.apps_enabled() || !mentioned_plugins.is_empty() { // Plugin mentions need raw MCP/app inventory even when app tools // are normally hidden so we can describe the plugin's currently // usable capabilities for this turn. - match sess - .services - .mcp_connection_manager - .load_full() + match step_context + .mcp + .manager_arc() .list_all_tools() .or_cancel(cancellation_token) .await @@ -1189,9 +1192,7 @@ pub(crate) async fn built_tools( .plugins_for_config(&turn_context.config.plugins_config_input()) .instrument(trace_span!("built_tools.load_plugins")) .await; - let connector_snapshot = codex_connectors::ConnectorSnapshot::from_plugin_capability_summaries( - loaded_plugins.capability_summaries(), - ); + let connector_snapshot = step_context.mcp.config().connector_snapshot.clone(); let apps_enabled = turn_context.apps_enabled(); let accessible_connectors = diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 1bee45cce010..a611b94887b8 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -51,6 +51,8 @@ pub(crate) struct SessionServices { pub(crate) mcp_connection_manager: Arc>, /// The latest atomically published MCP config and manager pair. pub(crate) mcp_runtime: ArcSwapOption, + /// Serializes environment-driven runtime rebuilds. + pub(crate) mcp_projection_lock: Mutex<()>, pub(crate) mcp_startup_cancellation_token: Mutex, pub(crate) unified_exec_manager: UnifiedExecProcessManager, #[cfg_attr(not(unix), allow(dead_code))] @@ -106,9 +108,11 @@ impl SessionServices { &self, config: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, manager: McpConnectionManager, ) -> Result<()> { - let runtime = self.publish_mcp_runtime(config, runtime_context, manager); + let runtime = + self.publish_mcp_runtime(config, runtime_context, available_environment_ids, manager); runtime.manager().validate_required_servers().await } @@ -116,13 +120,19 @@ impl SessionServices { &self, config: Arc, runtime_context: McpRuntimeContext, + available_environment_ids: Vec, manager: McpConnectionManager, ) -> Arc { let manager = Arc::new(manager); // Publish the manager for legacy resource clients first. Once the paired snapshot is // visible, every model-scoped consumer observes this exact manager. self.mcp_connection_manager.store(Arc::clone(&manager)); - let runtime = Arc::new(McpRuntimeSnapshot::new(config, manager, runtime_context)); + let runtime = Arc::new(McpRuntimeSnapshot::new( + config, + manager, + runtime_context, + available_environment_ids, + )); self.mcp_runtime.store(Some(Arc::clone(&runtime))); runtime } diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 9bccf8b6c13e..5148f1625962 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -584,8 +584,28 @@ async fn start_thread_seeds_extension_data_for_mcp_and_lifecycle_contributors() }) .await .expect("start second thread"); - let first_resolved = first_thread.thread.runtime_mcp_config(&config).await; - let second_resolved = second_thread.thread.runtime_mcp_config(&config).await; + let first_session = &first_thread.thread.codex.session; + let first_resolved = first_session + .services + .mcp_manager + .runtime_config_for_step( + &config, + &first_session.services.mcp_thread_init, + &first_session.services.thread_extension_data, + /*available_environment_ids*/ &[], + ) + .await; + let second_session = &second_thread.thread.codex.session; + let second_resolved = second_session + .services + .mcp_manager + .runtime_config_for_step( + &config, + &second_session.services.mcp_thread_init, + &second_session.services.thread_extension_data, + /*available_environment_ids*/ &[], + ) + .await; assert_eq!( *lifecycle_observed diff --git a/codex-rs/ext/extension-api/src/contributors/mcp.rs b/codex-rs/ext/extension-api/src/contributors/mcp.rs index 91e9cb459cca..499657f55662 100644 --- a/codex-rs/ext/extension-api/src/contributors/mcp.rs +++ b/codex-rs/ext/extension-api/src/contributors/mcp.rs @@ -1,17 +1,22 @@ use codex_config::McpServerConfig; +use crate::ExtensionData; use crate::ExtensionDataInit; /// Input supplied while resolving MCP server contributions. /// -/// Thread-scoped implementations can read the immutable host-seeded inputs -/// through [`Self::thread_init`]. Implementations should not retain borrowed -/// context after contribution completes. +/// Thread-scoped implementations can read stable host inputs through [`Self::thread_init`] and +/// keep their cache in [`Self::thread_store`]. Implementations should not retain borrowed context +/// after contribution completes. pub struct McpServerContributionContext<'a, C> { /// Host configuration visible during MCP resolution. config: &'a C, - /// Initial inputs for the active thread, when resolution is thread-scoped. + /// Extension-owned data for the active thread, when resolution is thread-scoped. + thread_store: Option<&'a ExtensionData>, + /// Stable host inputs for the active thread, when resolution is thread-scoped. thread_init: Option<&'a ExtensionDataInit>, + /// Environment IDs whose selected roots may contribute to this exact step. + available_environment_ids: Option<&'a [String]>, } impl Clone for McpServerContributionContext<'_, C> { @@ -27,15 +32,24 @@ impl<'a, C> McpServerContributionContext<'a, C> { pub fn global(config: &'a C) -> Self { Self { config, + thread_store: None, thread_init: None, + available_environment_ids: None, } } - /// Creates context for one active thread runtime. - pub fn for_thread(config: &'a C, thread_init: &'a ExtensionDataInit) -> Self { + /// Creates context for one model step using only currently available environments. + pub fn for_step( + config: &'a C, + thread_init: &'a ExtensionDataInit, + thread_store: &'a ExtensionData, + available_environment_ids: &'a [String], + ) -> Self { Self { config, + thread_store: Some(thread_store), thread_init: Some(thread_init), + available_environment_ids: Some(available_environment_ids), } } @@ -44,10 +58,23 @@ impl<'a, C> McpServerContributionContext<'a, C> { self.config } - /// Returns the frozen initial inputs when resolving for a running thread. + /// Returns extension-owned state when resolving for a running thread. + pub fn thread_store(&self) -> Option<&'a ExtensionData> { + self.thread_store + } + + /// Returns stable host inputs when resolving for a running thread. pub fn thread_init(&self) -> Option<&'a ExtensionDataInit> { self.thread_init } + + /// Returns the exact environment availability projection for a model step. + /// + /// `Some` means contributors must omit selected roots whose environment ID is absent from the + /// slice. Global resolution returns `None` because it has no thread environments. + pub fn available_environment_ids(&self) -> Option<&'a [String]> { + self.available_environment_ids + } } /// One extension-owned overlay for the runtime MCP server configuration. @@ -66,6 +93,12 @@ pub enum McpServerContribution { selection_order: usize, config: Box, }, + /// Adds connector IDs declared by a plugin selected for this thread. + SelectedPluginConnectors { + plugin_id: String, + plugin_display_name: String, + connector_ids: Vec, + }, /// Removes a named MCP server. Remove { name: String }, } diff --git a/codex-rs/ext/mcp/Cargo.toml b/codex-rs/ext/mcp/Cargo.toml index b98c0905dbc2..d918bd5bd9a5 100644 --- a/codex-rs/ext/mcp/Cargo.toml +++ b/codex-rs/ext/mcp/Cargo.toml @@ -16,6 +16,7 @@ workspace = true codex-core = { workspace = true } codex-core-plugins = { workspace = true } codex-config = { workspace = true } +codex-connectors-extension = { workspace = true } codex-exec-server = { workspace = true } codex-extension-api = { workspace = true } codex-features = { workspace = true } diff --git a/codex-rs/ext/mcp/src/executor_plugin.rs b/codex-rs/ext/mcp/src/executor_plugin.rs index 69d1a47b31b3..32637b4f47be 100644 --- a/codex-rs/ext/mcp/src/executor_plugin.rs +++ b/codex-rs/ext/mcp/src/executor_plugin.rs @@ -1,44 +1,47 @@ +use codex_connectors_extension::ExecutorPluginConnectorProvider; use codex_core::config::Config; use codex_core_plugins::ExecutorPluginProvider; use codex_exec_server::EnvironmentManager; -use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionFuture; use codex_extension_api::McpServerContribution; use codex_extension_api::McpServerContributionContext; use codex_extension_api::McpServerContributor; +use codex_protocol::capabilities::CapabilityRootLocation; use codex_protocol::capabilities::SelectedCapabilityRoot; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::OnceCell; +use std::sync::Mutex; use self::provider::ExecutorPluginMcpProvider; mod provider; -/// Frozen MCP declarations for one selected package. +/// Frozen MCP and connector declarations for one selected package. /// /// Each server config retains the stable logical environment ID. Reconnection may replace the /// concrete environment instance without changing that authority. #[derive(Clone)] -struct SelectedPluginMcpServers { +struct SelectedPluginMetadata { plugin_id: String, plugin_display_name: String, - selection_order: usize, servers: Vec<(String, codex_config::McpServerConfig)>, + connector_ids: Vec, } #[derive(Default)] pub(crate) struct SelectedExecutorPluginMcpState { - snapshot: OnceCell>, + cache: Mutex>, } -pub(crate) fn seed_thread_state(thread_init: &mut ExtensionDataInit) { - thread_init.insert(SelectedExecutorPluginMcpState::default()); +struct CachedSelectedRoot { + root: SelectedCapabilityRoot, + metadata: Option, } pub(crate) struct SelectedExecutorPluginMcpContributor { plugin_provider: ExecutorPluginProvider, mcp_provider: ExecutorPluginMcpProvider, + connector_provider: ExecutorPluginConnectorProvider, } impl SelectedExecutorPluginMcpContributor { @@ -46,46 +49,87 @@ impl SelectedExecutorPluginMcpContributor { Self { plugin_provider: ExecutorPluginProvider::new(Arc::clone(&environment_manager)), mcp_provider: ExecutorPluginMcpProvider, + connector_provider: ExecutorPluginConnectorProvider, } } - async fn resolve_snapshot( + /// Returns metadata for one stable selected root. + /// + /// Successful resolution, including a root that is not a plugin or declares no capabilities, + /// is cached until the thread state is dropped. Environment availability never invalidates + /// this cache; it only controls whether the cached metadata is projected into a model step. + async fn metadata_for_root( &self, - selected_roots: &[SelectedCapabilityRoot], - ) -> Vec { - let mut snapshot = Vec::new(); + state: &SelectedExecutorPluginMcpState, + selected_root: &SelectedCapabilityRoot, + ) -> Option { + if let Some(cached) = state + .cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .iter() + .find(|cached| cached.root == *selected_root) + { + return cached.metadata.clone(); + } - for (selection_order, selected_root) in selected_roots.iter().enumerate() { - let plugin = match self.plugin_provider.resolve_bound(selected_root).await { - Ok(Some(plugin)) => plugin, - Ok(None) => continue, - Err(err) => { + let plugin = match self.plugin_provider.resolve_bound(selected_root).await { + Ok(plugin) => plugin, + Err(err) => { + tracing::warn!( + selected_root = selected_root.id, + error = %err, + "failed to resolve selected executor plugin" + ); + return None; + } + }; + let metadata = match plugin { + Some(plugin) => { + let servers = self.mcp_provider.load(&plugin).await.unwrap_or_else(|err| { tracing::warn!( selected_root = selected_root.id, error = %err, - "failed to resolve selected executor plugin for MCP discovery" + "failed to load selected executor plugin MCP servers" ); - continue; - } - }; - match self.mcp_provider.load(&plugin).await { - Ok(servers) => snapshot.push(SelectedPluginMcpServers { + Vec::new() + }); + let connector_ids = self + .connector_provider + .load(&plugin) + .await + .unwrap_or_else(|err| { + tracing::warn!( + selected_root = selected_root.id, + error = %err, + "failed to load selected executor plugin connectors" + ); + Vec::new() + }) + .into_iter() + .map(|declaration| declaration.connector_id.0) + .collect(); + Some(SelectedPluginMetadata { plugin_id: plugin.plugin().selected_root_id().to_string(), plugin_display_name: plugin.plugin().manifest().display_name().to_string(), - selection_order, servers, - }), - Err(err) => { - tracing::warn!( - selected_root = selected_root.id, - error = %err, - "failed to load selected executor plugin MCP servers" - ); - } + connector_ids, + }) } + None => None, + }; + let mut cache = state + .cache + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + if let Some(cached) = cache.iter().find(|cached| cached.root == *selected_root) { + return cached.metadata.clone(); } - - snapshot + cache.push(CachedSelectedRoot { + root: selected_root.clone(), + metadata: metadata.clone(), + }); + metadata } } @@ -102,20 +146,31 @@ impl McpServerContributor for SelectedExecutorPluginMcpContributor { let Some(thread_init) = context.thread_init() else { return Vec::new(); }; - let Some(selected_roots) = thread_init.get::>() else { + let Some(thread_store) = context.thread_store() else { return Vec::new(); }; - let Some(state) = thread_init.get::() else { - tracing::warn!("selected executor plugin MCP state was not initialized"); + let Some(selected_roots) = thread_init.get::>() else { return Vec::new(); }; - let snapshot = state - .snapshot - .get_or_init(|| self.resolve_snapshot(selected_roots.as_ref())) - .await; + let state = thread_store.get_or_init(SelectedExecutorPluginMcpState::default); let mut contributions = Vec::new(); - for plugin in snapshot { + for (selection_order, selected_root) in selected_roots.iter().enumerate() { + let CapabilityRootLocation::Environment { environment_id, .. } = + &selected_root.location; + if context + .available_environment_ids() + .is_some_and(|available| { + !available + .iter() + .any(|available| available == environment_id) + }) + { + continue; + } + let Some(plugin) = self.metadata_for_root(&state, selected_root).await else { + continue; + }; let mut servers = plugin.servers.iter().cloned().collect::>(); context .config() @@ -127,10 +182,17 @@ impl McpServerContributor for SelectedExecutorPluginMcpContributor { name, plugin_id: plugin.plugin_id.clone(), plugin_display_name: plugin.plugin_display_name.clone(), - selection_order: plugin.selection_order, + selection_order, config: Box::new(config), } })); + if !plugin.connector_ids.is_empty() { + contributions.push(McpServerContribution::SelectedPluginConnectors { + plugin_id: plugin.plugin_id, + plugin_display_name: plugin.plugin_display_name, + connector_ids: plugin.connector_ids, + }); + } } contributions diff --git a/codex-rs/ext/mcp/src/lib.rs b/codex-rs/ext/mcp/src/lib.rs index e32316ac36fb..9c22bad5a0a1 100644 --- a/codex-rs/ext/mcp/src/lib.rs +++ b/codex-rs/ext/mcp/src/lib.rs @@ -51,10 +51,3 @@ pub fn install_executor_plugins( executor_plugin::SelectedExecutorPluginMcpContributor::new(environment_manager), )); } - -/// Seeds the per-thread snapshot used by selected executor plugin MCP discovery. -pub fn initialize_executor_plugin_thread_data( - thread_init: &mut codex_extension_api::ExtensionDataInit, -) { - executor_plugin::seed_thread_state(thread_init); -} diff --git a/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs b/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs index 7bdbc83d62e4..65a4248a32a5 100644 --- a/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs +++ b/codex-rs/ext/mcp/tests/executor_plugin_mcp.rs @@ -3,6 +3,7 @@ use codex_core::config::Config; use codex_core::config::ConfigBuilder; use codex_exec_server::EnvironmentManager; use codex_exec_server::LOCAL_ENVIRONMENT_ID; +use codex_extension_api::ExtensionData; use codex_extension_api::ExtensionDataInit; use codex_extension_api::ExtensionRegistryBuilder; use codex_extension_api::McpServerContribution; @@ -109,12 +110,15 @@ async fn selected_plugin_contributions( path: PathUri::from_host_native_path(plugin_root)?, }, }]); - codex_mcp_extension::initialize_executor_plugin_thread_data(&mut thread_init); + let thread_store = ExtensionData::new_with_init("test-thread", thread_init.clone()); + let available_environment_ids = vec![LOCAL_ENVIRONMENT_ID.to_string()]; Ok(registry.mcp_server_contributors()[0] - .contribute(McpServerContributionContext::for_thread( + .contribute(McpServerContributionContext::for_step( config, &thread_init, + &thread_store, + &available_environment_ids, )) .await .into_iter() @@ -132,7 +136,9 @@ async fn selected_plugin_contributions( selection_order, enabled: config.enabled, }, - McpServerContribution::Set { .. } | McpServerContribution::Remove { .. } => { + McpServerContribution::Set { .. } + | McpServerContribution::SelectedPluginConnectors { .. } + | McpServerContribution::Remove { .. } => { panic!("expected selected plugin contribution") } }) From e3cc8a212f68c51989779e78a7f6c5192f69a146 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 25 Jun 2026 20:47:05 +0100 Subject: [PATCH 02/12] Test selected capability stack across availability and resume --- .../app-server/tests/suite/v2/app_list.rs | 4 +- codex-rs/app-server/tests/suite/v2/mod.rs | 1 + .../suite/v2/selected_capability_stack.rs | 463 ++++++++++++++++++ 3 files changed, 466 insertions(+), 2 deletions(-) create mode 100644 codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs index 1ddf355269f8..5fad04929f7f 100644 --- a/codex-rs/app-server/tests/suite/v2/app_list.rs +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -1533,7 +1533,7 @@ impl ServerHandler for AppListMcpServer { } } -async fn start_apps_server_with_delays( +pub(super) async fn start_apps_server_with_delays( connectors: Vec, tools: Vec, directory_delay: Duration, @@ -1693,7 +1693,7 @@ async fn list_directory_connectors( } } -fn connector_tool(connector_id: &str, connector_name: &str) -> Result { +pub(super) fn connector_tool(connector_id: &str, connector_name: &str) -> Result { let schema: JsonObject = serde_json::from_value(json!({ "type": "object", "additionalProperties": false diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index bbd995275463..50c5313aaaa1 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -56,6 +56,7 @@ mod request_user_input; mod request_validation; mod review; mod safety_check_downgrade; +mod selected_capability_stack; mod skills_list; mod sleep; mod thread_archive; diff --git a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs new file mode 100644 index 000000000000..e3b86af7864a --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs @@ -0,0 +1,463 @@ +use std::process::Stdio; +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use app_test_support::ChatGptAuthFixture; +use app_test_support::TestAppServer; +use app_test_support::to_response; +use app_test_support::write_chatgpt_auth; +use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url; +use codex_app_server_protocol::AppInfo; +use codex_app_server_protocol::CapabilityRootLocation; +use codex_app_server_protocol::EnvironmentAddResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::SelectedCapabilityRoot; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnEnvironmentParams; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::UserInput; +use codex_config::types::AuthCredentialsStoreMode; +use codex_utils_absolute_path::AbsolutePathBuf; +use codex_utils_path_uri::PathUri; +use core_test_support::process::wait_for_pid_file; +use core_test_support::responses; +use core_test_support::responses::ResponsesRequest; +use core_test_support::stdio_server_bin; +use pretty_assertions::assert_eq; +use pretty_assertions::assert_ne; +use serde_json::json; +use tempfile::TempDir; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Child; +use tokio::process::Command; +use tokio::time::timeout; + +use super::app_list::connector_tool; +use super::app_list::start_apps_server_with_delays; + +const READ_TIMEOUT: Duration = Duration::from_secs(20); +const EXECUTOR_ID: &str = "executor-1"; +const EXECUTOR_ENV_NAME: &str = "MCP_EXECUTOR_MARKER"; +const EXECUTOR_ENV_VALUE: &str = "executor-only"; +const PLUGIN_ID: &str = "executor-demo@1"; +const PLUGIN_DISPLAY_NAME: &str = "Executor Demo"; +const SKILL_NAME: &str = "executor-demo:deploy"; +const SKILL_DESCRIPTION: &str = "Deploy through the selected executor."; +const SKILL_BODY_MARKER: &str = "SELECTED_EXECUTOR_SKILL_BODY"; +const LOCAL_SKILL_BODY_MARKER: &str = "COLLIDING_LOCAL_SKILL_BODY"; +const MCP_SERVER_NAME: &str = "executor_probe"; +const MCP_CALL_ID: &str = "selected-executor-mcp-call"; +const CONNECTOR_ID: &str = "calendar"; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn selected_capability_stack_tracks_environment_availability_and_resume() -> Result<()> { + let responses_server = responses::start_mock_server().await; + let (apps_url, apps_server_handle) = start_apps_server_with_delays( + vec![AppInfo { + id: CONNECTOR_ID.to_string(), + name: "Calendar".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + icon_assets: None, + icon_dark_assets: None, + distribution_channel: None, + branding: None, + app_metadata: None, + labels: None, + install_url: None, + is_accessible: false, + is_enabled: true, + plugin_display_names: Vec::new(), + }], + vec![connector_tool(CONNECTOR_ID, "Calendar")?], + Duration::ZERO, + Duration::ZERO, + ) + .await?; + + let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + &responses_server.uri(), + &apps_url, + )?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?.replacen( + "model_provider = \"mock_provider\"", + "mcp_oauth_credentials_store = \"file\"\nmodel_provider = \"mock_provider\"", + 1, + ); + std::fs::write( + config_path, + format!("{config}\n[features]\napps = true\n\n[skills]\ninclude_instructions = true\n"), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .email("selected-capability-stack@example.com") + .plan_type("pro") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + // Reserve the URL before app-server starts. The configured environment initially fails to + // connect, then environment/add points the same stable ID at the same URL once it is live. + let listener = std::net::TcpListener::bind("127.0.0.1:0")?; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + drop(listener); + std::fs::write( + codex_home.path().join("environments.toml"), + format!( + "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" + ), + )?; + + // A colliding host skill makes the authority assertion meaningful: once the executor is + // available, its body must win by name. + let local_skill_dir = codex_home.path().join("skills/local-deploy"); + std::fs::create_dir_all(&local_skill_dir)?; + std::fs::write( + local_skill_dir.join("SKILL.md"), + format!( + "---\nname: {SKILL_NAME}\ndescription: Colliding local skill.\n---\n\n{LOCAL_SKILL_BODY_MARKER}\n" + ), + )?; + + let plugin = TempDir::new()?; + let manifest_dir = plugin.path().join(".codex-plugin"); + let skill_dir = plugin.path().join("skills/deploy"); + let pid_file = plugin.path().join("executor-mcp.pid"); + std::fs::create_dir_all(&manifest_dir)?; + std::fs::create_dir_all(&skill_dir)?; + std::fs::write( + manifest_dir.join("plugin.json"), + r#"{"name":"executor-demo","apps":"./.app.json","interface":{"displayName":"Executor Demo"}}"#, + )?; + std::fs::write( + skill_dir.join("SKILL.md"), + format!( + "---\nname: deploy\ndescription: {SKILL_DESCRIPTION}\n---\n\n{SKILL_BODY_MARKER}\n" + ), + )?; + std::fs::write( + plugin.path().join(".app.json"), + format!(r#"{{"apps":{{"calendar":{{"id":"{CONNECTOR_ID}"}}}}}}"#), + )?; + std::fs::write( + plugin.path().join(".mcp.json"), + serde_json::to_vec_pretty(&json!({ + "mcpServers": { + (MCP_SERVER_NAME): { + "command": stdio_server_bin()?, + "env": { + "MCP_TEST_PID_FILE": pid_file.to_string_lossy(), + }, + "env_vars": [EXECUTOR_ENV_NAME], + "startup_timeout_sec": 10, + } + } + }))?, + )?; + + let response_mock = responses::mount_sse_sequence( + &responses_server, + vec![ + responses::sse(vec![ + responses::ev_response_created("environment-unavailable"), + responses::ev_assistant_message("unavailable-message", "Waiting"), + responses::ev_completed("environment-unavailable"), + ]), + responses::sse(vec![ + responses::ev_response_created("environment-available-call"), + responses::ev_function_call_with_namespace( + MCP_CALL_ID, + &format!("mcp__{MCP_SERVER_NAME}"), + "echo", + &json!({ + "message": "hello from the selected executor", + "env_var": EXECUTOR_ENV_NAME, + }) + .to_string(), + ), + responses::ev_completed("environment-available-call"), + ]), + responses::sse(vec![ + responses::ev_response_created("environment-available-done"), + responses::ev_assistant_message("available-message", "Done"), + responses::ev_completed("environment-available-done"), + ]), + responses::sse(vec![ + responses::ev_response_created("unchanged-step"), + responses::ev_assistant_message("unchanged-message", "Still ready"), + responses::ev_completed("unchanged-step"), + ]), + responses::sse(vec![ + responses::ev_response_created("resumed-step"), + responses::ev_assistant_message("resumed-message", "Ready after resume"), + responses::ev_completed("resumed-step"), + ]), + ], + ) + .await; + + let selected_root = SelectedCapabilityRoot { + id: PLUGIN_ID.to_string(), + location: CapabilityRootLocation::Environment { + environment_id: EXECUTOR_ID.to_string(), + path: PathUri::from_host_native_path(plugin.path())?, + }, + }; + let environment_cwd = AbsolutePathBuf::try_from(plugin.path().to_path_buf())?; + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(READ_TIMEOUT, app_server.initialize()).await??; + let thread_id = start_thread(&mut app_server, selected_root, environment_cwd.clone()).await?; + + run_turn( + &mut app_server, + &thread_id, + "Inspect the current capabilities", + environment_cwd.clone(), + ) + .await?; + let initial_requests = response_mock.requests(); + assert_selected_capabilities_absent(&initial_requests[0]); + + let mut exec_server = spawn_exec_server(codex_home.path(), &exec_server_url).await?; + add_environment(&mut app_server, &exec_server_url).await?; + tokio::time::sleep(Duration::from_millis(200)).await; + + run_turn( + &mut app_server, + &thread_id, + &format!("Use ${SKILL_NAME} and call its selected executor MCP"), + environment_cwd.clone(), + ) + .await?; + let first_mcp_pid = wait_for_pid_file(&pid_file).await?; + + run_turn( + &mut app_server, + &thread_id, + "Continue with the same selected capabilities", + environment_cwd.clone(), + ) + .await?; + assert_eq!(first_mcp_pid, wait_for_pid_file(&pid_file).await?); + + drop(app_server); + std::fs::remove_file(&pid_file)?; + + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(READ_TIMEOUT, app_server.initialize()).await??; + let request_id = app_server + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread_id.clone(), + ..Default::default() + }) + .await?; + let response = timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = to_response(response)?; + assert_eq!(thread_id, thread.id); + tokio::time::sleep(Duration::from_millis(200)).await; + + run_turn( + &mut app_server, + &thread_id, + "Continue after resuming the thread", + environment_cwd, + ) + .await?; + let resumed_mcp_pid = wait_for_pid_file(&pid_file).await?; + assert_ne!(first_mcp_pid, resumed_mcp_pid); + + let requests = response_mock.requests(); + assert_eq!(5, requests.len()); + for request in &requests[1..] { + assert_selected_skill_is_injected_once(request); + assert_selected_plugin_tools(request); + } + let output = requests[2].function_call_output(MCP_CALL_ID); + let output = output["output"] + .as_str() + .expect("MCP function output should be text"); + assert!(output.contains("ECHOING: hello from the selected executor")); + assert!(output.contains(EXECUTOR_ENV_VALUE)); + + exec_server.kill().await?; + apps_server_handle.abort(); + let _ = apps_server_handle.await; + Ok(()) +} + +fn assert_selected_capabilities_absent(request: &ResponsesRequest) { + assert!( + request + .message_input_texts("developer") + .into_iter() + .all(|text| !text.contains(SKILL_DESCRIPTION)) + ); + assert!( + request + .tool_by_name(&format!("mcp__{MCP_SERVER_NAME}"), "echo") + .is_none() + ); + let connector = request + .tool_by_name("mcp__codex_apps__calendar", "connector_calendar") + .expect("host connector should remain model-visible"); + assert!( + connector["description"] + .as_str() + .is_some_and(|description| !description.contains(PLUGIN_DISPLAY_NAME)) + ); +} + +fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { + let catalog_fragments = request + .message_input_texts("developer") + .into_iter() + .filter(|text| text.contains(SKILL_DESCRIPTION)) + .collect::>(); + assert_eq!(1, catalog_fragments.len()); + assert!(catalog_fragments[0].contains("environment resource:")); + + let skill_fragments = request + .message_input_texts("user") + .into_iter() + .filter(|text| text.starts_with("")) + .collect::>(); + assert_eq!(1, skill_fragments.len()); + assert!(skill_fragments[0].contains(&format!("{SKILL_NAME}"))); + assert!(skill_fragments[0].contains(SKILL_BODY_MARKER)); + assert!(!skill_fragments[0].contains(LOCAL_SKILL_BODY_MARKER)); +} + +fn assert_selected_plugin_tools(request: &ResponsesRequest) { + assert!( + request + .tool_by_name(&format!("mcp__{MCP_SERVER_NAME}"), "echo") + .is_some() + ); + let connector = request + .tool_by_name("mcp__codex_apps__calendar", "connector_calendar") + .expect("selected connector should be model-visible"); + assert!( + connector["description"] + .as_str() + .is_some_and(|description| description.contains(PLUGIN_DISPLAY_NAME)) + ); +} + +async fn start_thread( + app_server: &mut TestAppServer, + selected_root: SelectedCapabilityRoot, + environment_cwd: AbsolutePathBuf, +) -> Result { + let request_id = app_server + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + environments: Some(vec![TurnEnvironmentParams { + environment_id: EXECUTOR_ID.to_string(), + cwd: environment_cwd.into(), + }]), + selected_capability_roots: Some(vec![selected_root]), + ..Default::default() + }) + .await?; + let response = timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response(response)?; + Ok(thread.id) +} + +async fn run_turn( + app_server: &mut TestAppServer, + thread_id: &str, + text: &str, + environment_cwd: AbsolutePathBuf, +) -> Result<()> { + let request_id = app_server + .send_turn_start_request(TurnStartParams { + thread_id: thread_id.to_string(), + input: vec![UserInput::Text { + text: text.to_string(), + text_elements: Vec::new(), + }], + environments: Some(vec![TurnEnvironmentParams { + environment_id: EXECUTOR_ID.to_string(), + cwd: environment_cwd.into(), + }]), + ..Default::default() + }) + .await?; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_notification_message("turn/completed"), + ) + .await??; + Ok(()) +} + +async fn add_environment(app_server: &mut TestAppServer, exec_server_url: &str) -> Result<()> { + let request_id = app_server + .send_raw_request( + "environment/add", + Some(json!({ + "environmentId": EXECUTOR_ID, + "execServerUrl": exec_server_url, + "connectTimeoutMs": 10_000, + })), + ) + .await?; + let response = timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: EnvironmentAddResponse = to_response(response)?; + Ok(()) +} + +async fn spawn_exec_server(codex_home: &std::path::Path, url: &str) -> Result { + let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex")?) + .args(["exec-server", "--listen", url]) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .kill_on_drop(true) + .env("CODEX_HOME", codex_home) + .env(EXECUTOR_ENV_NAME, EXECUTOR_ENV_VALUE) + .spawn()?; + let stdout = child + .stdout + .take() + .context("exec-server stdout was not captured")?; + let mut lines = BufReader::new(stdout).lines(); + loop { + let line = timeout(READ_TIMEOUT, lines.next_line()) + .await + .context("timed out waiting for exec-server URL")?? + .context("exec-server exited before printing its URL")?; + if line.trim() == url { + return Ok(child); + } + } +} From f850a5cb1db94728159afee9b235d18c1b7e8e30 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 25 Jun 2026 22:35:19 +0100 Subject: [PATCH 03/12] Gate selected capability stack test on Windows --- codex-rs/app-server/tests/suite/v2/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 50c5313aaaa1..9963d37be5d2 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -56,6 +56,7 @@ mod request_user_input; mod request_validation; mod review; mod safety_check_downgrade; +#[cfg(not(target_os = "windows"))] mod selected_capability_stack; mod skills_list; mod sleep; From 51a5fc48cb348af25655a7275d8ed03df0683d80 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 26 Jun 2026 00:47:43 +0100 Subject: [PATCH 04/12] Cover same-turn selected capability activation --- .../suite/v2/selected_capability_stack.rs | 426 +++++++++++++----- 1 file changed, 313 insertions(+), 113 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs index e3b86af7864a..f185e679a5aa 100644 --- a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs +++ b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs @@ -13,6 +13,7 @@ use codex_app_server_protocol::CapabilityRootLocation; use codex_app_server_protocol::EnvironmentAddResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SelectedCapabilityRoot; +use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; @@ -21,6 +22,9 @@ use codex_app_server_protocol::TurnEnvironmentParams; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::UserInput; use codex_config::types::AuthCredentialsStoreMode; +use codex_protocol::config_types::CollaborationMode; +use codex_protocol::config_types::ModeKind; +use codex_protocol::config_types::Settings; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; use core_test_support::process::wait_for_pid_file; @@ -80,91 +84,7 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() Duration::ZERO, ) .await?; - - let codex_home = TempDir::new()?; - write_mock_responses_config_toml_with_chatgpt_base_url( - codex_home.path(), - &responses_server.uri(), - &apps_url, - )?; - let config_path = codex_home.path().join("config.toml"); - let config = std::fs::read_to_string(&config_path)?.replacen( - "model_provider = \"mock_provider\"", - "mcp_oauth_credentials_store = \"file\"\nmodel_provider = \"mock_provider\"", - 1, - ); - std::fs::write( - config_path, - format!("{config}\n[features]\napps = true\n\n[skills]\ninclude_instructions = true\n"), - )?; - write_chatgpt_auth( - codex_home.path(), - ChatGptAuthFixture::new("chatgpt-token") - .account_id("account-123") - .email("selected-capability-stack@example.com") - .plan_type("pro") - .chatgpt_account_id("account-123"), - AuthCredentialsStoreMode::File, - )?; - - // Reserve the URL before app-server starts. The configured environment initially fails to - // connect, then environment/add points the same stable ID at the same URL once it is live. - let listener = std::net::TcpListener::bind("127.0.0.1:0")?; - let exec_server_url = format!("ws://{}", listener.local_addr()?); - drop(listener); - std::fs::write( - codex_home.path().join("environments.toml"), - format!( - "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" - ), - )?; - - // A colliding host skill makes the authority assertion meaningful: once the executor is - // available, its body must win by name. - let local_skill_dir = codex_home.path().join("skills/local-deploy"); - std::fs::create_dir_all(&local_skill_dir)?; - std::fs::write( - local_skill_dir.join("SKILL.md"), - format!( - "---\nname: {SKILL_NAME}\ndescription: Colliding local skill.\n---\n\n{LOCAL_SKILL_BODY_MARKER}\n" - ), - )?; - - let plugin = TempDir::new()?; - let manifest_dir = plugin.path().join(".codex-plugin"); - let skill_dir = plugin.path().join("skills/deploy"); - let pid_file = plugin.path().join("executor-mcp.pid"); - std::fs::create_dir_all(&manifest_dir)?; - std::fs::create_dir_all(&skill_dir)?; - std::fs::write( - manifest_dir.join("plugin.json"), - r#"{"name":"executor-demo","apps":"./.app.json","interface":{"displayName":"Executor Demo"}}"#, - )?; - std::fs::write( - skill_dir.join("SKILL.md"), - format!( - "---\nname: deploy\ndescription: {SKILL_DESCRIPTION}\n---\n\n{SKILL_BODY_MARKER}\n" - ), - )?; - std::fs::write( - plugin.path().join(".app.json"), - format!(r#"{{"apps":{{"calendar":{{"id":"{CONNECTOR_ID}"}}}}}}"#), - )?; - std::fs::write( - plugin.path().join(".mcp.json"), - serde_json::to_vec_pretty(&json!({ - "mcpServers": { - (MCP_SERVER_NAME): { - "command": stdio_server_bin()?, - "env": { - "MCP_TEST_PID_FILE": pid_file.to_string_lossy(), - }, - "env_vars": [EXECUTOR_ENV_NAME], - "startup_timeout_sec": 10, - } - } - }))?, - )?; + let fixture = selected_capability_fixture(&responses_server.uri(), &apps_url)?; let response_mock = responses::mount_sse_sequence( &responses_server, @@ -207,54 +127,52 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() ) .await; - let selected_root = SelectedCapabilityRoot { - id: PLUGIN_ID.to_string(), - location: CapabilityRootLocation::Environment { - environment_id: EXECUTOR_ID.to_string(), - path: PathUri::from_host_native_path(plugin.path())?, - }, - }; - let environment_cwd = AbsolutePathBuf::try_from(plugin.path().to_path_buf())?; - let mut app_server = TestAppServer::new(codex_home.path()).await?; + let mut app_server = TestAppServer::new(fixture.codex_home.path()).await?; timeout(READ_TIMEOUT, app_server.initialize()).await??; - let thread_id = start_thread(&mut app_server, selected_root, environment_cwd.clone()).await?; + let thread_id = start_thread( + &mut app_server, + fixture.selected_root.clone(), + fixture.environment_cwd.clone(), + ) + .await?; run_turn( &mut app_server, &thread_id, "Inspect the current capabilities", - environment_cwd.clone(), + fixture.environment_cwd.clone(), ) .await?; let initial_requests = response_mock.requests(); assert_selected_capabilities_absent(&initial_requests[0]); - let mut exec_server = spawn_exec_server(codex_home.path(), &exec_server_url).await?; - add_environment(&mut app_server, &exec_server_url).await?; + let mut exec_server = + spawn_exec_server(fixture.codex_home.path(), &fixture.exec_server_url).await?; + add_environment(&mut app_server, &fixture.exec_server_url).await?; tokio::time::sleep(Duration::from_millis(200)).await; run_turn( &mut app_server, &thread_id, &format!("Use ${SKILL_NAME} and call its selected executor MCP"), - environment_cwd.clone(), + fixture.environment_cwd.clone(), ) .await?; - let first_mcp_pid = wait_for_pid_file(&pid_file).await?; + let first_mcp_pid = wait_for_pid_file(&fixture.pid_file).await?; run_turn( &mut app_server, &thread_id, "Continue with the same selected capabilities", - environment_cwd.clone(), + fixture.environment_cwd.clone(), ) .await?; - assert_eq!(first_mcp_pid, wait_for_pid_file(&pid_file).await?); + assert_eq!(first_mcp_pid, wait_for_pid_file(&fixture.pid_file).await?); drop(app_server); - std::fs::remove_file(&pid_file)?; + std::fs::remove_file(&fixture.pid_file)?; - let mut app_server = TestAppServer::new(codex_home.path()).await?; + let mut app_server = TestAppServer::new(fixture.codex_home.path()).await?; timeout(READ_TIMEOUT, app_server.initialize()).await??; let request_id = app_server .send_thread_resume_request(ThreadResumeParams { @@ -275,10 +193,10 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() &mut app_server, &thread_id, "Continue after resuming the thread", - environment_cwd, + fixture.environment_cwd, ) .await?; - let resumed_mcp_pid = wait_for_pid_file(&pid_file).await?; + let resumed_mcp_pid = wait_for_pid_file(&fixture.pid_file).await?; assert_ne!(first_mcp_pid, resumed_mcp_pid); let requests = response_mock.requests(); @@ -300,6 +218,284 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn selected_capabilities_become_available_between_samples_in_one_turn() -> Result<()> { + const USER_INPUT_CALL_ID: &str = "pause-for-environment"; + + let responses_server = responses::start_mock_server().await; + let (apps_url, apps_server_handle) = start_apps_server_with_delays( + vec![AppInfo { + id: CONNECTOR_ID.to_string(), + name: "Calendar".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + icon_assets: None, + icon_dark_assets: None, + distribution_channel: None, + branding: None, + app_metadata: None, + labels: None, + install_url: None, + is_accessible: false, + is_enabled: true, + plugin_display_names: Vec::new(), + }], + vec![connector_tool(CONNECTOR_ID, "Calendar")?], + Duration::ZERO, + Duration::ZERO, + ) + .await?; + let fixture = selected_capability_fixture(&responses_server.uri(), &apps_url)?; + let response_mock = responses::mount_sse_sequence( + &responses_server, + vec![ + responses::sse(vec![ + responses::ev_response_created("environment-pending"), + responses::ev_function_call( + USER_INPUT_CALL_ID, + "request_user_input", + &json!({ + "questions": [{ + "id": "continue", + "header": "Continue", + "question": "Continue after the executor is attached?", + "options": [{ + "label": "Yes (Recommended)", + "description": "Continue the same turn." + }, { + "label": "No", + "description": "Stop here." + }] + }], + "autoResolutionMs": 60_000 + }) + .to_string(), + ), + responses::ev_completed("environment-pending"), + ]), + responses::sse(vec![ + responses::ev_response_created("environment-ready-call"), + responses::ev_function_call_with_namespace( + MCP_CALL_ID, + &format!("mcp__{MCP_SERVER_NAME}"), + "echo", + &json!({ + "message": "same turn", + "env_var": EXECUTOR_ENV_NAME, + }) + .to_string(), + ), + responses::ev_completed("environment-ready-call"), + ]), + responses::sse(vec![ + responses::ev_response_created("same-turn-done"), + responses::ev_assistant_message("same-turn-message", "Done"), + responses::ev_completed("same-turn-done"), + ]), + ], + ) + .await; + + let mut app_server = TestAppServer::new(fixture.codex_home.path()).await?; + timeout(READ_TIMEOUT, app_server.initialize()).await??; + let thread_id = start_thread( + &mut app_server, + fixture.selected_root, + fixture.environment_cwd.clone(), + ) + .await?; + let turn_start_id = app_server + .send_turn_start_request(TurnStartParams { + thread_id, + input: vec![UserInput::Text { + text: "Use the executor when it becomes ready.".to_string(), + text_elements: Vec::new(), + }], + environments: Some(vec![TurnEnvironmentParams { + environment_id: EXECUTOR_ID.to_string(), + cwd: fixture.environment_cwd.into(), + }]), + collaboration_mode: Some(CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: "mock-model".to_string(), + reasoning_effort: None, + developer_instructions: None, + }, + }), + ..Default::default() + }) + .await?; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(turn_start_id)), + ) + .await??; + + let request = timeout(READ_TIMEOUT, app_server.read_stream_until_request_message()).await??; + let ServerRequest::ToolRequestUserInput { request_id, .. } = request else { + panic!("expected request_user_input, got {request:?}"); + }; + let requests = response_mock.requests(); + assert_eq!(1, requests.len()); + assert_selected_capabilities_absent(&requests[0]); + + let mut exec_server = + spawn_exec_server(fixture.codex_home.path(), &fixture.exec_server_url).await?; + add_environment(&mut app_server, &fixture.exec_server_url).await?; + tokio::time::sleep(Duration::from_millis(200)).await; + app_server + .send_response( + request_id, + json!({ + "answers": { + "continue": { "answers": ["yes"] } + } + }), + ) + .await?; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let requests = response_mock.requests(); + assert_eq!(3, requests.len()); + assert_selected_skill_catalog_available(&requests[1]); + assert_selected_plugin_tools(&requests[1]); + assert_selected_plugin_tools(&requests[2]); + let output = requests[2].function_call_output(MCP_CALL_ID); + let output = output["output"] + .as_str() + .expect("MCP function output should be text"); + assert!(output.contains("ECHOING: same turn")); + assert!(output.contains(EXECUTOR_ENV_VALUE)); + wait_for_pid_file(&fixture.pid_file).await?; + + exec_server.kill().await?; + apps_server_handle.abort(); + let _ = apps_server_handle.await; + Ok(()) +} + +struct SelectedCapabilityFixture { + codex_home: TempDir, + _plugin: TempDir, + pid_file: std::path::PathBuf, + exec_server_url: String, + selected_root: SelectedCapabilityRoot, + environment_cwd: AbsolutePathBuf, +} + +fn selected_capability_fixture( + responses_server_uri: &str, + apps_url: &str, +) -> Result { + let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + responses_server_uri, + apps_url, + )?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?.replacen( + "model_provider = \"mock_provider\"", + "mcp_oauth_credentials_store = \"file\"\nmodel_provider = \"mock_provider\"", + 1, + ); + std::fs::write( + config_path, + format!( + "{config}\n[features]\napps = true\ndeferred_executor = true\n\n[skills]\ninclude_instructions = true\n" + ), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .email("selected-capability-stack@example.com") + .plan_type("pro") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + // Reserve the URL before app-server starts. The configured environment initially fails to + // connect, then environment/add points the same stable ID at the same URL once it is live. + let listener = std::net::TcpListener::bind("127.0.0.1:0")?; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + drop(listener); + std::fs::write( + codex_home.path().join("environments.toml"), + format!( + "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" + ), + )?; + + let local_skill_dir = codex_home.path().join("skills/local-deploy"); + std::fs::create_dir_all(&local_skill_dir)?; + std::fs::write( + local_skill_dir.join("SKILL.md"), + format!( + "---\nname: {SKILL_NAME}\ndescription: Colliding local skill.\n---\n\n{LOCAL_SKILL_BODY_MARKER}\n" + ), + )?; + + let plugin = TempDir::new()?; + let manifest_dir = plugin.path().join(".codex-plugin"); + let skill_dir = plugin.path().join("skills/deploy"); + let pid_file = plugin.path().join("executor-mcp.pid"); + std::fs::create_dir_all(&manifest_dir)?; + std::fs::create_dir_all(&skill_dir)?; + std::fs::write( + manifest_dir.join("plugin.json"), + r#"{"name":"executor-demo","apps":"./.app.json","interface":{"displayName":"Executor Demo"}}"#, + )?; + std::fs::write( + skill_dir.join("SKILL.md"), + format!( + "---\nname: deploy\ndescription: {SKILL_DESCRIPTION}\n---\n\n{SKILL_BODY_MARKER}\n" + ), + )?; + std::fs::write( + plugin.path().join(".app.json"), + format!(r#"{{"apps":{{"calendar":{{"id":"{CONNECTOR_ID}"}}}}}}"#), + )?; + std::fs::write( + plugin.path().join(".mcp.json"), + serde_json::to_vec_pretty(&json!({ + "mcpServers": { + (MCP_SERVER_NAME): { + "command": stdio_server_bin()?, + "env": { + "MCP_TEST_PID_FILE": pid_file.to_string_lossy(), + }, + "env_vars": [EXECUTOR_ENV_NAME], + "startup_timeout_sec": 10, + } + } + }))?, + )?; + + let selected_root = SelectedCapabilityRoot { + id: PLUGIN_ID.to_string(), + location: CapabilityRootLocation::Environment { + environment_id: EXECUTOR_ID.to_string(), + path: PathUri::from_host_native_path(plugin.path())?, + }, + }; + let environment_cwd = AbsolutePathBuf::try_from(plugin.path().to_path_buf())?; + Ok(SelectedCapabilityFixture { + codex_home, + _plugin: plugin, + pid_file, + exec_server_url, + selected_root, + environment_cwd, + }) +} + fn assert_selected_capabilities_absent(request: &ResponsesRequest) { assert!( request @@ -323,13 +519,7 @@ fn assert_selected_capabilities_absent(request: &ResponsesRequest) { } fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { - let catalog_fragments = request - .message_input_texts("developer") - .into_iter() - .filter(|text| text.contains(SKILL_DESCRIPTION)) - .collect::>(); - assert_eq!(1, catalog_fragments.len()); - assert!(catalog_fragments[0].contains("environment resource:")); + assert_selected_skill_catalog_available(request); let skill_fragments = request .message_input_texts("user") @@ -342,6 +532,16 @@ fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { assert!(!skill_fragments[0].contains(LOCAL_SKILL_BODY_MARKER)); } +fn assert_selected_skill_catalog_available(request: &ResponsesRequest) { + let catalog_fragments = request + .message_input_texts("developer") + .into_iter() + .filter(|text| text.contains(SKILL_DESCRIPTION)) + .collect::>(); + assert_eq!(1, catalog_fragments.len()); + assert!(catalog_fragments[0].contains("environment resource:")); +} + fn assert_selected_plugin_tools(request: &ResponsesRequest) { assert!( request From 85d4bd64f90b40f3992ead91ab4a88c3cdf3d854 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 26 Jun 2026 01:24:47 +0100 Subject: [PATCH 05/12] Strengthen selected capability integration coverage --- .../suite/v2/selected_capability_stack.rs | 68 +++++++++++++++---- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs index f185e679a5aa..d312687ee386 100644 --- a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs +++ b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs @@ -11,6 +11,8 @@ use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url; use codex_app_server_protocol::AppInfo; use codex_app_server_protocol::CapabilityRootLocation; use codex_app_server_protocol::EnvironmentAddResponse; +use codex_app_server_protocol::ListMcpServerStatusParams; +use codex_app_server_protocol::ListMcpServerStatusResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SelectedCapabilityRoot; use codex_app_server_protocol::ServerRequest; @@ -22,6 +24,7 @@ use codex_app_server_protocol::TurnEnvironmentParams; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::UserInput; use codex_config::types::AuthCredentialsStoreMode; +use codex_exec_server::LOCAL_ENVIRONMENT_ID; use codex_protocol::config_types::CollaborationMode; use codex_protocol::config_types::ModeKind; use codex_protocol::config_types::Settings; @@ -133,6 +136,7 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() &mut app_server, fixture.selected_root.clone(), fixture.environment_cwd.clone(), + EXECUTOR_ID, ) .await?; @@ -149,7 +153,7 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() let mut exec_server = spawn_exec_server(fixture.codex_home.path(), &fixture.exec_server_url).await?; add_environment(&mut app_server, &fixture.exec_server_url).await?; - tokio::time::sleep(Duration::from_millis(200)).await; + wait_for_selected_mcp_server(&mut app_server, &thread_id).await?; run_turn( &mut app_server, @@ -187,12 +191,12 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() .await??; let ThreadResumeResponse { thread, .. } = to_response(response)?; assert_eq!(thread_id, thread.id); - tokio::time::sleep(Duration::from_millis(200)).await; + wait_for_selected_mcp_server(&mut app_server, &thread_id).await?; run_turn( &mut app_server, &thread_id, - "Continue after resuming the thread", + &format!("Use ${SKILL_NAME} after resuming the thread"), fixture.environment_cwd, ) .await?; @@ -201,10 +205,12 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() let requests = response_mock.requests(); assert_eq!(5, requests.len()); - for request in &requests[1..] { - assert_selected_skill_is_injected_once(request); + for request in &requests[1..4] { + assert_selected_skill_is_injected(request, /*expected_count*/ 1); assert_selected_plugin_tools(request); } + assert_selected_skill_is_injected(&requests[4], /*expected_count*/ 2); + assert_selected_plugin_tools(&requests[4]); let output = requests[2].function_call_output(MCP_CALL_ID); let output = output["output"] .as_str() @@ -303,6 +309,7 @@ async fn selected_capabilities_become_available_between_samples_in_one_turn() -> &mut app_server, fixture.selected_root, fixture.environment_cwd.clone(), + LOCAL_ENVIRONMENT_ID, ) .await?; let turn_start_id = app_server @@ -313,7 +320,7 @@ async fn selected_capabilities_become_available_between_samples_in_one_turn() -> text_elements: Vec::new(), }], environments: Some(vec![TurnEnvironmentParams { - environment_id: EXECUTOR_ID.to_string(), + environment_id: LOCAL_ENVIRONMENT_ID.to_string(), cwd: fixture.environment_cwd.into(), }]), collaboration_mode: Some(CollaborationMode { @@ -429,7 +436,7 @@ fn selected_capability_fixture( std::fs::write( codex_home.path().join("environments.toml"), format!( - "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" + "default = \"{EXECUTOR_ID}\"\ninclude_local = true\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" ), )?; @@ -518,7 +525,7 @@ fn assert_selected_capabilities_absent(request: &ResponsesRequest) { ); } -fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { +fn assert_selected_skill_is_injected(request: &ResponsesRequest, expected_count: usize) { assert_selected_skill_catalog_available(request); let skill_fragments = request @@ -526,10 +533,12 @@ fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { .into_iter() .filter(|text| text.starts_with("")) .collect::>(); - assert_eq!(1, skill_fragments.len()); - assert!(skill_fragments[0].contains(&format!("{SKILL_NAME}"))); - assert!(skill_fragments[0].contains(SKILL_BODY_MARKER)); - assert!(!skill_fragments[0].contains(LOCAL_SKILL_BODY_MARKER)); + assert_eq!(expected_count, skill_fragments.len()); + for fragment in skill_fragments { + assert!(fragment.contains(&format!("{SKILL_NAME}"))); + assert!(fragment.contains(SKILL_BODY_MARKER)); + assert!(!fragment.contains(LOCAL_SKILL_BODY_MARKER)); + } } fn assert_selected_skill_catalog_available(request: &ResponsesRequest) { @@ -562,12 +571,13 @@ async fn start_thread( app_server: &mut TestAppServer, selected_root: SelectedCapabilityRoot, environment_cwd: AbsolutePathBuf, + turn_environment_id: &str, ) -> Result { let request_id = app_server .send_thread_start_request(ThreadStartParams { model: Some("mock-model".to_string()), environments: Some(vec![TurnEnvironmentParams { - environment_id: EXECUTOR_ID.to_string(), + environment_id: turn_environment_id.to_string(), cwd: environment_cwd.into(), }]), selected_capability_roots: Some(vec![selected_root]), @@ -636,6 +646,38 @@ async fn add_environment(app_server: &mut TestAppServer, exec_server_url: &str) Ok(()) } +async fn wait_for_selected_mcp_server( + app_server: &mut TestAppServer, + thread_id: &str, +) -> Result<()> { + timeout(READ_TIMEOUT, async { + loop { + let request_id = app_server + .send_list_mcp_server_status_request(ListMcpServerStatusParams { + cursor: None, + limit: None, + detail: None, + thread_id: Some(thread_id.to_string()), + }) + .await?; + let response = app_server + .read_stream_until_response_message(RequestId::Integer(request_id)) + .await?; + let response: ListMcpServerStatusResponse = to_response(response)?; + if response + .data + .iter() + .any(|server| server.name == MCP_SERVER_NAME) + { + return Ok::<_, anyhow::Error>(()); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await??; + Ok(()) +} + async fn spawn_exec_server(codex_home: &std::path::Path, url: &str) -> Result { let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex")?) .args(["exec-server", "--listen", url]) From 7da42a5d4b8eda4383f8fa7e18f690590984d02f Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 26 Jun 2026 01:43:20 +0100 Subject: [PATCH 06/12] Reuse unchanged MCP projections --- codex-rs/codex-mcp/src/catalog.rs | 5 +++++ codex-rs/core/src/session/mcp.rs | 30 ++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/codex-rs/codex-mcp/src/catalog.rs b/codex-rs/codex-mcp/src/catalog.rs index 689f93d813a2..d60ea843df87 100644 --- a/codex-rs/codex-mcp/src/catalog.rs +++ b/codex-rs/codex-mcp/src/catalog.rs @@ -371,6 +371,11 @@ impl ResolvedMcpCatalog { .collect() } + /// Returns whether both catalogs resolve to the same winning servers and sources. + pub fn has_same_servers(&self, other: &Self) -> bool { + self.servers == other.servers + } + /// Replaces the resolved server set while preserving known server sources. /// /// Names not present in the existing catalog are treated as config-owned. diff --git a/codex-rs/core/src/session/mcp.rs b/codex-rs/core/src/session/mcp.rs index 50921f1591b6..709d3e432863 100644 --- a/codex-rs/core/src/session/mcp.rs +++ b/codex-rs/core/src/session/mcp.rs @@ -133,6 +133,36 @@ impl Session { &available_environment_ids, ) .await; + let changed_environment_is_used_by_mcp = mcp_config + .mcp_server_catalog + .configured_servers() + .values() + .any(|server| { + let was_available = current + .available_environment_ids() + .contains(&server.environment_id); + let is_available = available_environment_ids.contains(&server.environment_id); + server.enabled && was_available != is_available + }); + if !changed_environment_is_used_by_mcp + && current + .config() + .mcp_server_catalog + .has_same_servers(&mcp_config.mcp_server_catalog) + && current.config().connector_snapshot == mcp_config.connector_snapshot + { + // Availability is only an input to the MCP projection. When that input changes but + // the projected servers and connectors do not, advance the input key without + // replacing the live manager and restarting its processes. + let runtime = Arc::new(McpRuntimeSnapshot::new( + Arc::new(current.config().clone()), + current.manager_arc(), + current.runtime_context().clone(), + available_environment_ids, + )); + self.services.mcp_runtime.store(Some(Arc::clone(&runtime))); + return runtime; + } self.refresh_mcp_servers_inner( turn_context, mcp_config, From 60655e8b41fd2904cc2c2428742135774f58002f Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 26 Jun 2026 02:43:14 +0100 Subject: [PATCH 07/12] Fall back when remote filesystem walk is unavailable --- codex-rs/exec-server/src/remote_file_system.rs | 17 +++++++++++++++-- codex-rs/file-system/src/lib.rs | 16 ++++++++++++++-- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/codex-rs/exec-server/src/remote_file_system.rs b/codex-rs/exec-server/src/remote_file_system.rs index acc86f758490..0d10c3aece70 100644 --- a/codex-rs/exec-server/src/remote_file_system.rs +++ b/codex-rs/exec-server/src/remote_file_system.rs @@ -29,6 +29,7 @@ use crate::protocol::FsWalkParams; use crate::protocol::FsWriteFileParams; const INVALID_REQUEST_ERROR_CODE: i64 = -32600; +const METHOD_NOT_FOUND_ERROR_CODE: i64 = -32601; const NOT_FOUND_ERROR_CODE: i64 = -32004; #[path = "remote_file_stream.rs"] @@ -194,14 +195,26 @@ impl RemoteFileSystem { ) -> FileSystemResult { trace!("remote fs walk"); let client = self.client.get().await.map_err(map_remote_error)?; - let response = client + let response = match client .fs_walk(FsWalkParams { path: path.clone(), options, sandbox: remote_sandbox_context(sandbox), }) .await - .map_err(map_remote_error)?; + { + Ok(response) => response, + Err(ExecServerError::Server { + code: METHOD_NOT_FOUND_ERROR_CODE, + .. + }) => { + return ::walk_via_directory_reads( + self, path, options, sandbox, + ) + .await; + } + Err(error) => return Err(map_remote_error(error)), + }; Ok(response) } diff --git a/codex-rs/file-system/src/lib.rs b/codex-rs/file-system/src/lib.rs index fcdecc8cb5eb..737412f46360 100644 --- a/codex-rs/file-system/src/lib.rs +++ b/codex-rs/file-system/src/lib.rs @@ -316,7 +316,19 @@ pub trait ExecutorFileSystem: Send + Sync { options: WalkOptions, sandbox: Option<&'a FileSystemSandboxContext>, ) -> ExecutorFileSystemFuture<'a, WalkOutcome> { - Box::pin(walk(self, path, options, sandbox)) + self.walk_via_directory_reads(path, options, sandbox) + } + + /// Performs a bounded walk using the primitive filesystem operations. + /// + /// Implementations with an optimized walk transport can use this as a compatibility fallback. + fn walk_via_directory_reads<'a>( + &'a self, + path: &'a PathUri, + options: WalkOptions, + sandbox: Option<&'a FileSystemSandboxContext>, + ) -> ExecutorFileSystemFuture<'a, WalkOutcome> { + Box::pin(walk_via_directory_reads(self, path, options, sandbox)) } fn remove<'a>( @@ -335,7 +347,7 @@ pub trait ExecutorFileSystem: Send + Sync { ) -> ExecutorFileSystemFuture<'a, ()>; } -async fn walk( +async fn walk_via_directory_reads( file_system: &F, root: &PathUri, options: WalkOptions, From 4595b6d6dadd7c4f73c3612b33eebdcd0dae9130 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 25 Jun 2026 20:47:05 +0100 Subject: [PATCH 08/12] Test selected capability stack across availability and resume --- .../app-server/tests/suite/v2/app_list.rs | 4 +- codex-rs/app-server/tests/suite/v2/mod.rs | 1 + .../suite/v2/selected_capability_stack.rs | 463 ++++++++++++++++++ 3 files changed, 466 insertions(+), 2 deletions(-) create mode 100644 codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs index 1ddf355269f8..5fad04929f7f 100644 --- a/codex-rs/app-server/tests/suite/v2/app_list.rs +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -1533,7 +1533,7 @@ impl ServerHandler for AppListMcpServer { } } -async fn start_apps_server_with_delays( +pub(super) async fn start_apps_server_with_delays( connectors: Vec, tools: Vec, directory_delay: Duration, @@ -1693,7 +1693,7 @@ async fn list_directory_connectors( } } -fn connector_tool(connector_id: &str, connector_name: &str) -> Result { +pub(super) fn connector_tool(connector_id: &str, connector_name: &str) -> Result { let schema: JsonObject = serde_json::from_value(json!({ "type": "object", "additionalProperties": false diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index bbd995275463..50c5313aaaa1 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -56,6 +56,7 @@ mod request_user_input; mod request_validation; mod review; mod safety_check_downgrade; +mod selected_capability_stack; mod skills_list; mod sleep; mod thread_archive; diff --git a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs new file mode 100644 index 000000000000..e3b86af7864a --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs @@ -0,0 +1,463 @@ +use std::process::Stdio; +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use app_test_support::ChatGptAuthFixture; +use app_test_support::TestAppServer; +use app_test_support::to_response; +use app_test_support::write_chatgpt_auth; +use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url; +use codex_app_server_protocol::AppInfo; +use codex_app_server_protocol::CapabilityRootLocation; +use codex_app_server_protocol::EnvironmentAddResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::SelectedCapabilityRoot; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::TurnEnvironmentParams; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::UserInput; +use codex_config::types::AuthCredentialsStoreMode; +use codex_utils_absolute_path::AbsolutePathBuf; +use codex_utils_path_uri::PathUri; +use core_test_support::process::wait_for_pid_file; +use core_test_support::responses; +use core_test_support::responses::ResponsesRequest; +use core_test_support::stdio_server_bin; +use pretty_assertions::assert_eq; +use pretty_assertions::assert_ne; +use serde_json::json; +use tempfile::TempDir; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Child; +use tokio::process::Command; +use tokio::time::timeout; + +use super::app_list::connector_tool; +use super::app_list::start_apps_server_with_delays; + +const READ_TIMEOUT: Duration = Duration::from_secs(20); +const EXECUTOR_ID: &str = "executor-1"; +const EXECUTOR_ENV_NAME: &str = "MCP_EXECUTOR_MARKER"; +const EXECUTOR_ENV_VALUE: &str = "executor-only"; +const PLUGIN_ID: &str = "executor-demo@1"; +const PLUGIN_DISPLAY_NAME: &str = "Executor Demo"; +const SKILL_NAME: &str = "executor-demo:deploy"; +const SKILL_DESCRIPTION: &str = "Deploy through the selected executor."; +const SKILL_BODY_MARKER: &str = "SELECTED_EXECUTOR_SKILL_BODY"; +const LOCAL_SKILL_BODY_MARKER: &str = "COLLIDING_LOCAL_SKILL_BODY"; +const MCP_SERVER_NAME: &str = "executor_probe"; +const MCP_CALL_ID: &str = "selected-executor-mcp-call"; +const CONNECTOR_ID: &str = "calendar"; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn selected_capability_stack_tracks_environment_availability_and_resume() -> Result<()> { + let responses_server = responses::start_mock_server().await; + let (apps_url, apps_server_handle) = start_apps_server_with_delays( + vec![AppInfo { + id: CONNECTOR_ID.to_string(), + name: "Calendar".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + icon_assets: None, + icon_dark_assets: None, + distribution_channel: None, + branding: None, + app_metadata: None, + labels: None, + install_url: None, + is_accessible: false, + is_enabled: true, + plugin_display_names: Vec::new(), + }], + vec![connector_tool(CONNECTOR_ID, "Calendar")?], + Duration::ZERO, + Duration::ZERO, + ) + .await?; + + let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + &responses_server.uri(), + &apps_url, + )?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?.replacen( + "model_provider = \"mock_provider\"", + "mcp_oauth_credentials_store = \"file\"\nmodel_provider = \"mock_provider\"", + 1, + ); + std::fs::write( + config_path, + format!("{config}\n[features]\napps = true\n\n[skills]\ninclude_instructions = true\n"), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .email("selected-capability-stack@example.com") + .plan_type("pro") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + // Reserve the URL before app-server starts. The configured environment initially fails to + // connect, then environment/add points the same stable ID at the same URL once it is live. + let listener = std::net::TcpListener::bind("127.0.0.1:0")?; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + drop(listener); + std::fs::write( + codex_home.path().join("environments.toml"), + format!( + "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" + ), + )?; + + // A colliding host skill makes the authority assertion meaningful: once the executor is + // available, its body must win by name. + let local_skill_dir = codex_home.path().join("skills/local-deploy"); + std::fs::create_dir_all(&local_skill_dir)?; + std::fs::write( + local_skill_dir.join("SKILL.md"), + format!( + "---\nname: {SKILL_NAME}\ndescription: Colliding local skill.\n---\n\n{LOCAL_SKILL_BODY_MARKER}\n" + ), + )?; + + let plugin = TempDir::new()?; + let manifest_dir = plugin.path().join(".codex-plugin"); + let skill_dir = plugin.path().join("skills/deploy"); + let pid_file = plugin.path().join("executor-mcp.pid"); + std::fs::create_dir_all(&manifest_dir)?; + std::fs::create_dir_all(&skill_dir)?; + std::fs::write( + manifest_dir.join("plugin.json"), + r#"{"name":"executor-demo","apps":"./.app.json","interface":{"displayName":"Executor Demo"}}"#, + )?; + std::fs::write( + skill_dir.join("SKILL.md"), + format!( + "---\nname: deploy\ndescription: {SKILL_DESCRIPTION}\n---\n\n{SKILL_BODY_MARKER}\n" + ), + )?; + std::fs::write( + plugin.path().join(".app.json"), + format!(r#"{{"apps":{{"calendar":{{"id":"{CONNECTOR_ID}"}}}}}}"#), + )?; + std::fs::write( + plugin.path().join(".mcp.json"), + serde_json::to_vec_pretty(&json!({ + "mcpServers": { + (MCP_SERVER_NAME): { + "command": stdio_server_bin()?, + "env": { + "MCP_TEST_PID_FILE": pid_file.to_string_lossy(), + }, + "env_vars": [EXECUTOR_ENV_NAME], + "startup_timeout_sec": 10, + } + } + }))?, + )?; + + let response_mock = responses::mount_sse_sequence( + &responses_server, + vec![ + responses::sse(vec![ + responses::ev_response_created("environment-unavailable"), + responses::ev_assistant_message("unavailable-message", "Waiting"), + responses::ev_completed("environment-unavailable"), + ]), + responses::sse(vec![ + responses::ev_response_created("environment-available-call"), + responses::ev_function_call_with_namespace( + MCP_CALL_ID, + &format!("mcp__{MCP_SERVER_NAME}"), + "echo", + &json!({ + "message": "hello from the selected executor", + "env_var": EXECUTOR_ENV_NAME, + }) + .to_string(), + ), + responses::ev_completed("environment-available-call"), + ]), + responses::sse(vec![ + responses::ev_response_created("environment-available-done"), + responses::ev_assistant_message("available-message", "Done"), + responses::ev_completed("environment-available-done"), + ]), + responses::sse(vec![ + responses::ev_response_created("unchanged-step"), + responses::ev_assistant_message("unchanged-message", "Still ready"), + responses::ev_completed("unchanged-step"), + ]), + responses::sse(vec![ + responses::ev_response_created("resumed-step"), + responses::ev_assistant_message("resumed-message", "Ready after resume"), + responses::ev_completed("resumed-step"), + ]), + ], + ) + .await; + + let selected_root = SelectedCapabilityRoot { + id: PLUGIN_ID.to_string(), + location: CapabilityRootLocation::Environment { + environment_id: EXECUTOR_ID.to_string(), + path: PathUri::from_host_native_path(plugin.path())?, + }, + }; + let environment_cwd = AbsolutePathBuf::try_from(plugin.path().to_path_buf())?; + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(READ_TIMEOUT, app_server.initialize()).await??; + let thread_id = start_thread(&mut app_server, selected_root, environment_cwd.clone()).await?; + + run_turn( + &mut app_server, + &thread_id, + "Inspect the current capabilities", + environment_cwd.clone(), + ) + .await?; + let initial_requests = response_mock.requests(); + assert_selected_capabilities_absent(&initial_requests[0]); + + let mut exec_server = spawn_exec_server(codex_home.path(), &exec_server_url).await?; + add_environment(&mut app_server, &exec_server_url).await?; + tokio::time::sleep(Duration::from_millis(200)).await; + + run_turn( + &mut app_server, + &thread_id, + &format!("Use ${SKILL_NAME} and call its selected executor MCP"), + environment_cwd.clone(), + ) + .await?; + let first_mcp_pid = wait_for_pid_file(&pid_file).await?; + + run_turn( + &mut app_server, + &thread_id, + "Continue with the same selected capabilities", + environment_cwd.clone(), + ) + .await?; + assert_eq!(first_mcp_pid, wait_for_pid_file(&pid_file).await?); + + drop(app_server); + std::fs::remove_file(&pid_file)?; + + let mut app_server = TestAppServer::new(codex_home.path()).await?; + timeout(READ_TIMEOUT, app_server.initialize()).await??; + let request_id = app_server + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread_id.clone(), + ..Default::default() + }) + .await?; + let response = timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = to_response(response)?; + assert_eq!(thread_id, thread.id); + tokio::time::sleep(Duration::from_millis(200)).await; + + run_turn( + &mut app_server, + &thread_id, + "Continue after resuming the thread", + environment_cwd, + ) + .await?; + let resumed_mcp_pid = wait_for_pid_file(&pid_file).await?; + assert_ne!(first_mcp_pid, resumed_mcp_pid); + + let requests = response_mock.requests(); + assert_eq!(5, requests.len()); + for request in &requests[1..] { + assert_selected_skill_is_injected_once(request); + assert_selected_plugin_tools(request); + } + let output = requests[2].function_call_output(MCP_CALL_ID); + let output = output["output"] + .as_str() + .expect("MCP function output should be text"); + assert!(output.contains("ECHOING: hello from the selected executor")); + assert!(output.contains(EXECUTOR_ENV_VALUE)); + + exec_server.kill().await?; + apps_server_handle.abort(); + let _ = apps_server_handle.await; + Ok(()) +} + +fn assert_selected_capabilities_absent(request: &ResponsesRequest) { + assert!( + request + .message_input_texts("developer") + .into_iter() + .all(|text| !text.contains(SKILL_DESCRIPTION)) + ); + assert!( + request + .tool_by_name(&format!("mcp__{MCP_SERVER_NAME}"), "echo") + .is_none() + ); + let connector = request + .tool_by_name("mcp__codex_apps__calendar", "connector_calendar") + .expect("host connector should remain model-visible"); + assert!( + connector["description"] + .as_str() + .is_some_and(|description| !description.contains(PLUGIN_DISPLAY_NAME)) + ); +} + +fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { + let catalog_fragments = request + .message_input_texts("developer") + .into_iter() + .filter(|text| text.contains(SKILL_DESCRIPTION)) + .collect::>(); + assert_eq!(1, catalog_fragments.len()); + assert!(catalog_fragments[0].contains("environment resource:")); + + let skill_fragments = request + .message_input_texts("user") + .into_iter() + .filter(|text| text.starts_with("")) + .collect::>(); + assert_eq!(1, skill_fragments.len()); + assert!(skill_fragments[0].contains(&format!("{SKILL_NAME}"))); + assert!(skill_fragments[0].contains(SKILL_BODY_MARKER)); + assert!(!skill_fragments[0].contains(LOCAL_SKILL_BODY_MARKER)); +} + +fn assert_selected_plugin_tools(request: &ResponsesRequest) { + assert!( + request + .tool_by_name(&format!("mcp__{MCP_SERVER_NAME}"), "echo") + .is_some() + ); + let connector = request + .tool_by_name("mcp__codex_apps__calendar", "connector_calendar") + .expect("selected connector should be model-visible"); + assert!( + connector["description"] + .as_str() + .is_some_and(|description| description.contains(PLUGIN_DISPLAY_NAME)) + ); +} + +async fn start_thread( + app_server: &mut TestAppServer, + selected_root: SelectedCapabilityRoot, + environment_cwd: AbsolutePathBuf, +) -> Result { + let request_id = app_server + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + environments: Some(vec![TurnEnvironmentParams { + environment_id: EXECUTOR_ID.to_string(), + cwd: environment_cwd.into(), + }]), + selected_capability_roots: Some(vec![selected_root]), + ..Default::default() + }) + .await?; + let response = timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response(response)?; + Ok(thread.id) +} + +async fn run_turn( + app_server: &mut TestAppServer, + thread_id: &str, + text: &str, + environment_cwd: AbsolutePathBuf, +) -> Result<()> { + let request_id = app_server + .send_turn_start_request(TurnStartParams { + thread_id: thread_id.to_string(), + input: vec![UserInput::Text { + text: text.to_string(), + text_elements: Vec::new(), + }], + environments: Some(vec![TurnEnvironmentParams { + environment_id: EXECUTOR_ID.to_string(), + cwd: environment_cwd.into(), + }]), + ..Default::default() + }) + .await?; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_notification_message("turn/completed"), + ) + .await??; + Ok(()) +} + +async fn add_environment(app_server: &mut TestAppServer, exec_server_url: &str) -> Result<()> { + let request_id = app_server + .send_raw_request( + "environment/add", + Some(json!({ + "environmentId": EXECUTOR_ID, + "execServerUrl": exec_server_url, + "connectTimeoutMs": 10_000, + })), + ) + .await?; + let response = timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let _: EnvironmentAddResponse = to_response(response)?; + Ok(()) +} + +async fn spawn_exec_server(codex_home: &std::path::Path, url: &str) -> Result { + let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex")?) + .args(["exec-server", "--listen", url]) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .kill_on_drop(true) + .env("CODEX_HOME", codex_home) + .env(EXECUTOR_ENV_NAME, EXECUTOR_ENV_VALUE) + .spawn()?; + let stdout = child + .stdout + .take() + .context("exec-server stdout was not captured")?; + let mut lines = BufReader::new(stdout).lines(); + loop { + let line = timeout(READ_TIMEOUT, lines.next_line()) + .await + .context("timed out waiting for exec-server URL")?? + .context("exec-server exited before printing its URL")?; + if line.trim() == url { + return Ok(child); + } + } +} From 866e462d8bde64c42797067937cf03e4884e0b29 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 25 Jun 2026 22:35:19 +0100 Subject: [PATCH 09/12] Gate selected capability stack test on Windows --- codex-rs/app-server/tests/suite/v2/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 50c5313aaaa1..9963d37be5d2 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -56,6 +56,7 @@ mod request_user_input; mod request_validation; mod review; mod safety_check_downgrade; +#[cfg(not(target_os = "windows"))] mod selected_capability_stack; mod skills_list; mod sleep; From 258164021b98ccec2c7670ea48b8f54df005ffb9 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 26 Jun 2026 00:47:43 +0100 Subject: [PATCH 10/12] Cover same-turn selected capability activation --- .../suite/v2/selected_capability_stack.rs | 426 +++++++++++++----- 1 file changed, 313 insertions(+), 113 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs index e3b86af7864a..f185e679a5aa 100644 --- a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs +++ b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs @@ -13,6 +13,7 @@ use codex_app_server_protocol::CapabilityRootLocation; use codex_app_server_protocol::EnvironmentAddResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SelectedCapabilityRoot; +use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; @@ -21,6 +22,9 @@ use codex_app_server_protocol::TurnEnvironmentParams; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::UserInput; use codex_config::types::AuthCredentialsStoreMode; +use codex_protocol::config_types::CollaborationMode; +use codex_protocol::config_types::ModeKind; +use codex_protocol::config_types::Settings; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; use core_test_support::process::wait_for_pid_file; @@ -80,91 +84,7 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() Duration::ZERO, ) .await?; - - let codex_home = TempDir::new()?; - write_mock_responses_config_toml_with_chatgpt_base_url( - codex_home.path(), - &responses_server.uri(), - &apps_url, - )?; - let config_path = codex_home.path().join("config.toml"); - let config = std::fs::read_to_string(&config_path)?.replacen( - "model_provider = \"mock_provider\"", - "mcp_oauth_credentials_store = \"file\"\nmodel_provider = \"mock_provider\"", - 1, - ); - std::fs::write( - config_path, - format!("{config}\n[features]\napps = true\n\n[skills]\ninclude_instructions = true\n"), - )?; - write_chatgpt_auth( - codex_home.path(), - ChatGptAuthFixture::new("chatgpt-token") - .account_id("account-123") - .email("selected-capability-stack@example.com") - .plan_type("pro") - .chatgpt_account_id("account-123"), - AuthCredentialsStoreMode::File, - )?; - - // Reserve the URL before app-server starts. The configured environment initially fails to - // connect, then environment/add points the same stable ID at the same URL once it is live. - let listener = std::net::TcpListener::bind("127.0.0.1:0")?; - let exec_server_url = format!("ws://{}", listener.local_addr()?); - drop(listener); - std::fs::write( - codex_home.path().join("environments.toml"), - format!( - "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" - ), - )?; - - // A colliding host skill makes the authority assertion meaningful: once the executor is - // available, its body must win by name. - let local_skill_dir = codex_home.path().join("skills/local-deploy"); - std::fs::create_dir_all(&local_skill_dir)?; - std::fs::write( - local_skill_dir.join("SKILL.md"), - format!( - "---\nname: {SKILL_NAME}\ndescription: Colliding local skill.\n---\n\n{LOCAL_SKILL_BODY_MARKER}\n" - ), - )?; - - let plugin = TempDir::new()?; - let manifest_dir = plugin.path().join(".codex-plugin"); - let skill_dir = plugin.path().join("skills/deploy"); - let pid_file = plugin.path().join("executor-mcp.pid"); - std::fs::create_dir_all(&manifest_dir)?; - std::fs::create_dir_all(&skill_dir)?; - std::fs::write( - manifest_dir.join("plugin.json"), - r#"{"name":"executor-demo","apps":"./.app.json","interface":{"displayName":"Executor Demo"}}"#, - )?; - std::fs::write( - skill_dir.join("SKILL.md"), - format!( - "---\nname: deploy\ndescription: {SKILL_DESCRIPTION}\n---\n\n{SKILL_BODY_MARKER}\n" - ), - )?; - std::fs::write( - plugin.path().join(".app.json"), - format!(r#"{{"apps":{{"calendar":{{"id":"{CONNECTOR_ID}"}}}}}}"#), - )?; - std::fs::write( - plugin.path().join(".mcp.json"), - serde_json::to_vec_pretty(&json!({ - "mcpServers": { - (MCP_SERVER_NAME): { - "command": stdio_server_bin()?, - "env": { - "MCP_TEST_PID_FILE": pid_file.to_string_lossy(), - }, - "env_vars": [EXECUTOR_ENV_NAME], - "startup_timeout_sec": 10, - } - } - }))?, - )?; + let fixture = selected_capability_fixture(&responses_server.uri(), &apps_url)?; let response_mock = responses::mount_sse_sequence( &responses_server, @@ -207,54 +127,52 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() ) .await; - let selected_root = SelectedCapabilityRoot { - id: PLUGIN_ID.to_string(), - location: CapabilityRootLocation::Environment { - environment_id: EXECUTOR_ID.to_string(), - path: PathUri::from_host_native_path(plugin.path())?, - }, - }; - let environment_cwd = AbsolutePathBuf::try_from(plugin.path().to_path_buf())?; - let mut app_server = TestAppServer::new(codex_home.path()).await?; + let mut app_server = TestAppServer::new(fixture.codex_home.path()).await?; timeout(READ_TIMEOUT, app_server.initialize()).await??; - let thread_id = start_thread(&mut app_server, selected_root, environment_cwd.clone()).await?; + let thread_id = start_thread( + &mut app_server, + fixture.selected_root.clone(), + fixture.environment_cwd.clone(), + ) + .await?; run_turn( &mut app_server, &thread_id, "Inspect the current capabilities", - environment_cwd.clone(), + fixture.environment_cwd.clone(), ) .await?; let initial_requests = response_mock.requests(); assert_selected_capabilities_absent(&initial_requests[0]); - let mut exec_server = spawn_exec_server(codex_home.path(), &exec_server_url).await?; - add_environment(&mut app_server, &exec_server_url).await?; + let mut exec_server = + spawn_exec_server(fixture.codex_home.path(), &fixture.exec_server_url).await?; + add_environment(&mut app_server, &fixture.exec_server_url).await?; tokio::time::sleep(Duration::from_millis(200)).await; run_turn( &mut app_server, &thread_id, &format!("Use ${SKILL_NAME} and call its selected executor MCP"), - environment_cwd.clone(), + fixture.environment_cwd.clone(), ) .await?; - let first_mcp_pid = wait_for_pid_file(&pid_file).await?; + let first_mcp_pid = wait_for_pid_file(&fixture.pid_file).await?; run_turn( &mut app_server, &thread_id, "Continue with the same selected capabilities", - environment_cwd.clone(), + fixture.environment_cwd.clone(), ) .await?; - assert_eq!(first_mcp_pid, wait_for_pid_file(&pid_file).await?); + assert_eq!(first_mcp_pid, wait_for_pid_file(&fixture.pid_file).await?); drop(app_server); - std::fs::remove_file(&pid_file)?; + std::fs::remove_file(&fixture.pid_file)?; - let mut app_server = TestAppServer::new(codex_home.path()).await?; + let mut app_server = TestAppServer::new(fixture.codex_home.path()).await?; timeout(READ_TIMEOUT, app_server.initialize()).await??; let request_id = app_server .send_thread_resume_request(ThreadResumeParams { @@ -275,10 +193,10 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() &mut app_server, &thread_id, "Continue after resuming the thread", - environment_cwd, + fixture.environment_cwd, ) .await?; - let resumed_mcp_pid = wait_for_pid_file(&pid_file).await?; + let resumed_mcp_pid = wait_for_pid_file(&fixture.pid_file).await?; assert_ne!(first_mcp_pid, resumed_mcp_pid); let requests = response_mock.requests(); @@ -300,6 +218,284 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn selected_capabilities_become_available_between_samples_in_one_turn() -> Result<()> { + const USER_INPUT_CALL_ID: &str = "pause-for-environment"; + + let responses_server = responses::start_mock_server().await; + let (apps_url, apps_server_handle) = start_apps_server_with_delays( + vec![AppInfo { + id: CONNECTOR_ID.to_string(), + name: "Calendar".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + icon_assets: None, + icon_dark_assets: None, + distribution_channel: None, + branding: None, + app_metadata: None, + labels: None, + install_url: None, + is_accessible: false, + is_enabled: true, + plugin_display_names: Vec::new(), + }], + vec![connector_tool(CONNECTOR_ID, "Calendar")?], + Duration::ZERO, + Duration::ZERO, + ) + .await?; + let fixture = selected_capability_fixture(&responses_server.uri(), &apps_url)?; + let response_mock = responses::mount_sse_sequence( + &responses_server, + vec![ + responses::sse(vec![ + responses::ev_response_created("environment-pending"), + responses::ev_function_call( + USER_INPUT_CALL_ID, + "request_user_input", + &json!({ + "questions": [{ + "id": "continue", + "header": "Continue", + "question": "Continue after the executor is attached?", + "options": [{ + "label": "Yes (Recommended)", + "description": "Continue the same turn." + }, { + "label": "No", + "description": "Stop here." + }] + }], + "autoResolutionMs": 60_000 + }) + .to_string(), + ), + responses::ev_completed("environment-pending"), + ]), + responses::sse(vec![ + responses::ev_response_created("environment-ready-call"), + responses::ev_function_call_with_namespace( + MCP_CALL_ID, + &format!("mcp__{MCP_SERVER_NAME}"), + "echo", + &json!({ + "message": "same turn", + "env_var": EXECUTOR_ENV_NAME, + }) + .to_string(), + ), + responses::ev_completed("environment-ready-call"), + ]), + responses::sse(vec![ + responses::ev_response_created("same-turn-done"), + responses::ev_assistant_message("same-turn-message", "Done"), + responses::ev_completed("same-turn-done"), + ]), + ], + ) + .await; + + let mut app_server = TestAppServer::new(fixture.codex_home.path()).await?; + timeout(READ_TIMEOUT, app_server.initialize()).await??; + let thread_id = start_thread( + &mut app_server, + fixture.selected_root, + fixture.environment_cwd.clone(), + ) + .await?; + let turn_start_id = app_server + .send_turn_start_request(TurnStartParams { + thread_id, + input: vec![UserInput::Text { + text: "Use the executor when it becomes ready.".to_string(), + text_elements: Vec::new(), + }], + environments: Some(vec![TurnEnvironmentParams { + environment_id: EXECUTOR_ID.to_string(), + cwd: fixture.environment_cwd.into(), + }]), + collaboration_mode: Some(CollaborationMode { + mode: ModeKind::Plan, + settings: Settings { + model: "mock-model".to_string(), + reasoning_effort: None, + developer_instructions: None, + }, + }), + ..Default::default() + }) + .await?; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_response_message(RequestId::Integer(turn_start_id)), + ) + .await??; + + let request = timeout(READ_TIMEOUT, app_server.read_stream_until_request_message()).await??; + let ServerRequest::ToolRequestUserInput { request_id, .. } = request else { + panic!("expected request_user_input, got {request:?}"); + }; + let requests = response_mock.requests(); + assert_eq!(1, requests.len()); + assert_selected_capabilities_absent(&requests[0]); + + let mut exec_server = + spawn_exec_server(fixture.codex_home.path(), &fixture.exec_server_url).await?; + add_environment(&mut app_server, &fixture.exec_server_url).await?; + tokio::time::sleep(Duration::from_millis(200)).await; + app_server + .send_response( + request_id, + json!({ + "answers": { + "continue": { "answers": ["yes"] } + } + }), + ) + .await?; + timeout( + READ_TIMEOUT, + app_server.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let requests = response_mock.requests(); + assert_eq!(3, requests.len()); + assert_selected_skill_catalog_available(&requests[1]); + assert_selected_plugin_tools(&requests[1]); + assert_selected_plugin_tools(&requests[2]); + let output = requests[2].function_call_output(MCP_CALL_ID); + let output = output["output"] + .as_str() + .expect("MCP function output should be text"); + assert!(output.contains("ECHOING: same turn")); + assert!(output.contains(EXECUTOR_ENV_VALUE)); + wait_for_pid_file(&fixture.pid_file).await?; + + exec_server.kill().await?; + apps_server_handle.abort(); + let _ = apps_server_handle.await; + Ok(()) +} + +struct SelectedCapabilityFixture { + codex_home: TempDir, + _plugin: TempDir, + pid_file: std::path::PathBuf, + exec_server_url: String, + selected_root: SelectedCapabilityRoot, + environment_cwd: AbsolutePathBuf, +} + +fn selected_capability_fixture( + responses_server_uri: &str, + apps_url: &str, +) -> Result { + let codex_home = TempDir::new()?; + write_mock_responses_config_toml_with_chatgpt_base_url( + codex_home.path(), + responses_server_uri, + apps_url, + )?; + let config_path = codex_home.path().join("config.toml"); + let config = std::fs::read_to_string(&config_path)?.replacen( + "model_provider = \"mock_provider\"", + "mcp_oauth_credentials_store = \"file\"\nmodel_provider = \"mock_provider\"", + 1, + ); + std::fs::write( + config_path, + format!( + "{config}\n[features]\napps = true\ndeferred_executor = true\n\n[skills]\ninclude_instructions = true\n" + ), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .email("selected-capability-stack@example.com") + .plan_type("pro") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + // Reserve the URL before app-server starts. The configured environment initially fails to + // connect, then environment/add points the same stable ID at the same URL once it is live. + let listener = std::net::TcpListener::bind("127.0.0.1:0")?; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + drop(listener); + std::fs::write( + codex_home.path().join("environments.toml"), + format!( + "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" + ), + )?; + + let local_skill_dir = codex_home.path().join("skills/local-deploy"); + std::fs::create_dir_all(&local_skill_dir)?; + std::fs::write( + local_skill_dir.join("SKILL.md"), + format!( + "---\nname: {SKILL_NAME}\ndescription: Colliding local skill.\n---\n\n{LOCAL_SKILL_BODY_MARKER}\n" + ), + )?; + + let plugin = TempDir::new()?; + let manifest_dir = plugin.path().join(".codex-plugin"); + let skill_dir = plugin.path().join("skills/deploy"); + let pid_file = plugin.path().join("executor-mcp.pid"); + std::fs::create_dir_all(&manifest_dir)?; + std::fs::create_dir_all(&skill_dir)?; + std::fs::write( + manifest_dir.join("plugin.json"), + r#"{"name":"executor-demo","apps":"./.app.json","interface":{"displayName":"Executor Demo"}}"#, + )?; + std::fs::write( + skill_dir.join("SKILL.md"), + format!( + "---\nname: deploy\ndescription: {SKILL_DESCRIPTION}\n---\n\n{SKILL_BODY_MARKER}\n" + ), + )?; + std::fs::write( + plugin.path().join(".app.json"), + format!(r#"{{"apps":{{"calendar":{{"id":"{CONNECTOR_ID}"}}}}}}"#), + )?; + std::fs::write( + plugin.path().join(".mcp.json"), + serde_json::to_vec_pretty(&json!({ + "mcpServers": { + (MCP_SERVER_NAME): { + "command": stdio_server_bin()?, + "env": { + "MCP_TEST_PID_FILE": pid_file.to_string_lossy(), + }, + "env_vars": [EXECUTOR_ENV_NAME], + "startup_timeout_sec": 10, + } + } + }))?, + )?; + + let selected_root = SelectedCapabilityRoot { + id: PLUGIN_ID.to_string(), + location: CapabilityRootLocation::Environment { + environment_id: EXECUTOR_ID.to_string(), + path: PathUri::from_host_native_path(plugin.path())?, + }, + }; + let environment_cwd = AbsolutePathBuf::try_from(plugin.path().to_path_buf())?; + Ok(SelectedCapabilityFixture { + codex_home, + _plugin: plugin, + pid_file, + exec_server_url, + selected_root, + environment_cwd, + }) +} + fn assert_selected_capabilities_absent(request: &ResponsesRequest) { assert!( request @@ -323,13 +519,7 @@ fn assert_selected_capabilities_absent(request: &ResponsesRequest) { } fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { - let catalog_fragments = request - .message_input_texts("developer") - .into_iter() - .filter(|text| text.contains(SKILL_DESCRIPTION)) - .collect::>(); - assert_eq!(1, catalog_fragments.len()); - assert!(catalog_fragments[0].contains("environment resource:")); + assert_selected_skill_catalog_available(request); let skill_fragments = request .message_input_texts("user") @@ -342,6 +532,16 @@ fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { assert!(!skill_fragments[0].contains(LOCAL_SKILL_BODY_MARKER)); } +fn assert_selected_skill_catalog_available(request: &ResponsesRequest) { + let catalog_fragments = request + .message_input_texts("developer") + .into_iter() + .filter(|text| text.contains(SKILL_DESCRIPTION)) + .collect::>(); + assert_eq!(1, catalog_fragments.len()); + assert!(catalog_fragments[0].contains("environment resource:")); +} + fn assert_selected_plugin_tools(request: &ResponsesRequest) { assert!( request From c38d428177dc958a9891e391da6fbf61f659f312 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 26 Jun 2026 01:24:47 +0100 Subject: [PATCH 11/12] Strengthen selected capability integration coverage --- .../suite/v2/selected_capability_stack.rs | 68 +++++++++++++++---- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs index f185e679a5aa..d312687ee386 100644 --- a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs +++ b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs @@ -11,6 +11,8 @@ use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url; use codex_app_server_protocol::AppInfo; use codex_app_server_protocol::CapabilityRootLocation; use codex_app_server_protocol::EnvironmentAddResponse; +use codex_app_server_protocol::ListMcpServerStatusParams; +use codex_app_server_protocol::ListMcpServerStatusResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SelectedCapabilityRoot; use codex_app_server_protocol::ServerRequest; @@ -22,6 +24,7 @@ use codex_app_server_protocol::TurnEnvironmentParams; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::UserInput; use codex_config::types::AuthCredentialsStoreMode; +use codex_exec_server::LOCAL_ENVIRONMENT_ID; use codex_protocol::config_types::CollaborationMode; use codex_protocol::config_types::ModeKind; use codex_protocol::config_types::Settings; @@ -133,6 +136,7 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() &mut app_server, fixture.selected_root.clone(), fixture.environment_cwd.clone(), + EXECUTOR_ID, ) .await?; @@ -149,7 +153,7 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() let mut exec_server = spawn_exec_server(fixture.codex_home.path(), &fixture.exec_server_url).await?; add_environment(&mut app_server, &fixture.exec_server_url).await?; - tokio::time::sleep(Duration::from_millis(200)).await; + wait_for_selected_mcp_server(&mut app_server, &thread_id).await?; run_turn( &mut app_server, @@ -187,12 +191,12 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() .await??; let ThreadResumeResponse { thread, .. } = to_response(response)?; assert_eq!(thread_id, thread.id); - tokio::time::sleep(Duration::from_millis(200)).await; + wait_for_selected_mcp_server(&mut app_server, &thread_id).await?; run_turn( &mut app_server, &thread_id, - "Continue after resuming the thread", + &format!("Use ${SKILL_NAME} after resuming the thread"), fixture.environment_cwd, ) .await?; @@ -201,10 +205,12 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() let requests = response_mock.requests(); assert_eq!(5, requests.len()); - for request in &requests[1..] { - assert_selected_skill_is_injected_once(request); + for request in &requests[1..4] { + assert_selected_skill_is_injected(request, /*expected_count*/ 1); assert_selected_plugin_tools(request); } + assert_selected_skill_is_injected(&requests[4], /*expected_count*/ 2); + assert_selected_plugin_tools(&requests[4]); let output = requests[2].function_call_output(MCP_CALL_ID); let output = output["output"] .as_str() @@ -303,6 +309,7 @@ async fn selected_capabilities_become_available_between_samples_in_one_turn() -> &mut app_server, fixture.selected_root, fixture.environment_cwd.clone(), + LOCAL_ENVIRONMENT_ID, ) .await?; let turn_start_id = app_server @@ -313,7 +320,7 @@ async fn selected_capabilities_become_available_between_samples_in_one_turn() -> text_elements: Vec::new(), }], environments: Some(vec![TurnEnvironmentParams { - environment_id: EXECUTOR_ID.to_string(), + environment_id: LOCAL_ENVIRONMENT_ID.to_string(), cwd: fixture.environment_cwd.into(), }]), collaboration_mode: Some(CollaborationMode { @@ -429,7 +436,7 @@ fn selected_capability_fixture( std::fs::write( codex_home.path().join("environments.toml"), format!( - "default = \"{EXECUTOR_ID}\"\ninclude_local = false\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" + "default = \"{EXECUTOR_ID}\"\ninclude_local = true\n\n[[environments]]\nid = \"{EXECUTOR_ID}\"\nurl = \"{exec_server_url}\"\nconnect_timeout_sec = 0.05\n" ), )?; @@ -518,7 +525,7 @@ fn assert_selected_capabilities_absent(request: &ResponsesRequest) { ); } -fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { +fn assert_selected_skill_is_injected(request: &ResponsesRequest, expected_count: usize) { assert_selected_skill_catalog_available(request); let skill_fragments = request @@ -526,10 +533,12 @@ fn assert_selected_skill_is_injected_once(request: &ResponsesRequest) { .into_iter() .filter(|text| text.starts_with("")) .collect::>(); - assert_eq!(1, skill_fragments.len()); - assert!(skill_fragments[0].contains(&format!("{SKILL_NAME}"))); - assert!(skill_fragments[0].contains(SKILL_BODY_MARKER)); - assert!(!skill_fragments[0].contains(LOCAL_SKILL_BODY_MARKER)); + assert_eq!(expected_count, skill_fragments.len()); + for fragment in skill_fragments { + assert!(fragment.contains(&format!("{SKILL_NAME}"))); + assert!(fragment.contains(SKILL_BODY_MARKER)); + assert!(!fragment.contains(LOCAL_SKILL_BODY_MARKER)); + } } fn assert_selected_skill_catalog_available(request: &ResponsesRequest) { @@ -562,12 +571,13 @@ async fn start_thread( app_server: &mut TestAppServer, selected_root: SelectedCapabilityRoot, environment_cwd: AbsolutePathBuf, + turn_environment_id: &str, ) -> Result { let request_id = app_server .send_thread_start_request(ThreadStartParams { model: Some("mock-model".to_string()), environments: Some(vec![TurnEnvironmentParams { - environment_id: EXECUTOR_ID.to_string(), + environment_id: turn_environment_id.to_string(), cwd: environment_cwd.into(), }]), selected_capability_roots: Some(vec![selected_root]), @@ -636,6 +646,38 @@ async fn add_environment(app_server: &mut TestAppServer, exec_server_url: &str) Ok(()) } +async fn wait_for_selected_mcp_server( + app_server: &mut TestAppServer, + thread_id: &str, +) -> Result<()> { + timeout(READ_TIMEOUT, async { + loop { + let request_id = app_server + .send_list_mcp_server_status_request(ListMcpServerStatusParams { + cursor: None, + limit: None, + detail: None, + thread_id: Some(thread_id.to_string()), + }) + .await?; + let response = app_server + .read_stream_until_response_message(RequestId::Integer(request_id)) + .await?; + let response: ListMcpServerStatusResponse = to_response(response)?; + if response + .data + .iter() + .any(|server| server.name == MCP_SERVER_NAME) + { + return Ok::<_, anyhow::Error>(()); + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + }) + .await??; + Ok(()) +} + async fn spawn_exec_server(codex_home: &std::path::Path, url: &str) -> Result { let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex")?) .args(["exec-server", "--listen", url]) From 658308252902cd0ef88852e488a37115dd7e0384 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 26 Jun 2026 03:05:08 +0100 Subject: [PATCH 12/12] Keep selected capability executor separate in integration test --- .../app-server/tests/suite/v2/selected_capability_stack.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs index d312687ee386..052f286ffc10 100644 --- a/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs +++ b/codex-rs/app-server/tests/suite/v2/selected_capability_stack.rs @@ -136,7 +136,6 @@ async fn selected_capability_stack_tracks_environment_availability_and_resume() &mut app_server, fixture.selected_root.clone(), fixture.environment_cwd.clone(), - EXECUTOR_ID, ) .await?; @@ -309,7 +308,6 @@ async fn selected_capabilities_become_available_between_samples_in_one_turn() -> &mut app_server, fixture.selected_root, fixture.environment_cwd.clone(), - LOCAL_ENVIRONMENT_ID, ) .await?; let turn_start_id = app_server @@ -571,13 +569,12 @@ async fn start_thread( app_server: &mut TestAppServer, selected_root: SelectedCapabilityRoot, environment_cwd: AbsolutePathBuf, - turn_environment_id: &str, ) -> Result { let request_id = app_server .send_thread_start_request(ThreadStartParams { model: Some("mock-model".to_string()), environments: Some(vec![TurnEnvironmentParams { - environment_id: turn_environment_id.to_string(), + environment_id: LOCAL_ENVIRONMENT_ID.to_string(), cwd: environment_cwd.into(), }]), selected_capability_roots: Some(vec![selected_root]), @@ -607,7 +604,7 @@ async fn run_turn( text_elements: Vec::new(), }], environments: Some(vec![TurnEnvironmentParams { - environment_id: EXECUTOR_ID.to_string(), + environment_id: LOCAL_ENVIRONMENT_ID.to_string(), cwd: environment_cwd.into(), }]), ..Default::default()