diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 0568f20e5faf..6eff342082f8 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -479,6 +479,9 @@ "default_mode_request_user_input": { "type": "boolean" }, + "deferred_executor": { + "type": "boolean" + }, "elevated_windows_sandbox": { "type": "boolean" }, @@ -4712,6 +4715,9 @@ "default_mode_request_user_input": { "type": "boolean" }, + "deferred_executor": { + "type": "boolean" + }, "elevated_windows_sandbox": { "type": "boolean" }, diff --git a/codex-rs/core/src/agents_md_tests.rs b/codex-rs/core/src/agents_md_tests.rs index 0e1a670fc20a..0122910c3b94 100644 --- a/codex-rs/core/src/agents_md_tests.rs +++ b/codex-rs/core/src/agents_md_tests.rs @@ -277,6 +277,7 @@ fn resolved_local_environments( ) }) .collect(), + starting: Vec::new(), } } diff --git a/codex-rs/core/src/environment_selection.rs b/codex-rs/core/src/environment_selection.rs index 16b2c47b5dd7..0d82a147be05 100644 --- a/codex-rs/core/src/environment_selection.rs +++ b/codex-rs/core/src/environment_selection.rs @@ -1,11 +1,12 @@ use std::collections::HashSet; +use std::fmt; use std::sync::Arc; use arc_swap::ArcSwap; +use codex_exec_server::Environment; use codex_exec_server::EnvironmentManager; +use codex_exec_server::ExecServerError; use codex_exec_server::ExecutorFileSystem; -use codex_protocol::error::CodexErr; -use codex_protocol::error::Result as CodexResult; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; @@ -31,13 +32,37 @@ pub(crate) fn default_thread_environment_selections( .collect() } -type SnapshotTask = Shared>; +type TurnEnvironmentResult = Result>; +type TurnEnvironmentResolution = Shared>; + +#[derive(Clone)] +struct SelectedTurnEnvironment { + selection: TurnEnvironmentSelection, + resolution: TurnEnvironmentResolution, +} + +#[derive(Clone)] +pub(crate) struct StartingTurnEnvironment { + pub(crate) selection: TurnEnvironmentSelection, + resolution: TurnEnvironmentResolution, +} + +impl fmt::Debug for StartingTurnEnvironment { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("StartingTurnEnvironment") + .field("selection", &self.selection) + .field("resolved", &self.resolution.peek().is_some()) + .finish_non_exhaustive() + } +} pub(crate) struct ThreadEnvironments { environment_manager: Arc, local_shell: Shell, shell_snapshot: ShellSnapshot, - snapshot_task: ArcSwap, + non_blocking_snapshots: bool, + environments: ArcSwap>, } impl ThreadEnvironments { @@ -46,132 +71,142 @@ impl ThreadEnvironments { local_shell: Shell, shell_snapshot: ShellSnapshot, current: TurnEnvironmentSnapshot, + non_blocking_snapshots: bool, ) -> Self { + // Reuse only attached environments from the supplied snapshot; drop starting entries. + let environments = current + .turn_environments + .into_iter() + .map(|environment| { + let selection = environment.selection(); + let resolution: TurnEnvironmentResolution = + futures::future::ready(Ok(environment)).boxed().shared(); + SelectedTurnEnvironment { + selection, + resolution, + } + }) + .collect(); Self { environment_manager, local_shell, shell_snapshot, - snapshot_task: ArcSwap::from_pointee(futures::future::ready(current).boxed().shared()), + non_blocking_snapshots, + environments: ArcSwap::from_pointee(environments), } } pub(crate) fn update_selections(&self, environments: &[TurnEnvironmentSelection]) { - let previous = self - .snapshot_task - .load() - .peek() - .cloned() - .unwrap_or_default(); - let environment_manager = Arc::clone(&self.environment_manager); - let local_shell = self.local_shell.clone(); - let shell_snapshot = self.shell_snapshot.clone(); - let environments = environments.to_vec(); - let (snapshot_task, snapshot) = async move { - Self::resolve_snapshot( - environment_manager, - local_shell, - shell_snapshot, - previous, - environments, - ) - .await - } - .remote_handle(); - self.snapshot_task - .store(Arc::new(snapshot.boxed().shared())); - drop(tokio::spawn(snapshot_task)); - } - - async fn resolve_snapshot( - environment_manager: Arc, - local_shell: Shell, - shell_snapshot: ShellSnapshot, - current: TurnEnvironmentSnapshot, - environments: Vec, - ) -> TurnEnvironmentSnapshot { + let previous = self.environments.load(); let mut seen_environment_ids = HashSet::with_capacity(environments.len()); - let mut turn_environments = Vec::with_capacity(environments.len()); - for selected_environment in &environments { + let mut next = Vec::with_capacity(environments.len()); + for selected_environment in environments { if !seen_environment_ids.insert(selected_environment.environment_id.as_str()) { continue; } - let turn_environment = match current.turn_environments.iter().find(|environment| { - environment.environment_id == selected_environment.environment_id - && environment.cwd() == &selected_environment.cwd - }) { - Some(environment) => environment.clone(), - None => match Self::resolve_selection( - &environment_manager, - &local_shell, - &shell_snapshot, - selected_environment, - ) - .await - { - Ok(environment) => environment, - Err(err) => { - tracing::warn!( - "skipping unresolved turn environment `{}`: {err}", - selected_environment.environment_id - ); - continue; - } - }, + if let Some(environment) = previous + .iter() + .find(|environment| environment.selection == *selected_environment) + && !matches!(environment.resolution.clone().now_or_never(), Some(Err(_))) + { + next.push(environment.clone()); + continue; + } + + let environment_id = &selected_environment.environment_id; + let Some(environment) = self.environment_manager.get_environment(environment_id) else { + tracing::warn!("skipping unknown turn environment `{environment_id}`"); + continue; }; - turn_environments.push(turn_environment); + let (resolution_task, resolution) = Self::resolve_environment( + selected_environment.clone(), + environment, + self.local_shell.clone(), + self.shell_snapshot.clone(), + ) + .remote_handle(); + drop(tokio::spawn(resolution_task)); + let resolution = resolution.boxed().shared(); + next.push(SelectedTurnEnvironment { + selection: selected_environment.clone(), + resolution, + }); } - TurnEnvironmentSnapshot { turn_environments } + self.environments.store(Arc::new(next)); } - async fn resolve_selection( - environment_manager: &EnvironmentManager, - local_shell: &Shell, - shell_snapshot: &ShellSnapshot, - selected_environment: &TurnEnvironmentSelection, - ) -> CodexResult { - let environment_id = selected_environment.environment_id.clone(); - let environment = environment_manager - .get_environment(&environment_id) - .ok_or_else(|| { - CodexErr::InvalidRequest(format!("unknown turn environment id `{environment_id}`")) - })?; - let shell = if environment.is_remote() { - match environment.info().await { - Ok(info) => match Shell::from_environment_shell_info(info.shell) { - Ok(shell) => Some(shell), + fn resolve_environment( + selection: TurnEnvironmentSelection, + environment: Arc, + local_shell: Shell, + shell_snapshot: ShellSnapshot, + ) -> BoxFuture<'static, TurnEnvironmentResult> { + async move { + let environment_id = &selection.environment_id; + if let Err(err) = environment.wait_until_ready().await { + tracing::warn!("turn environment `{environment_id}` failed to start: {err}"); + return Err(Arc::new(err)); + } + let shell = if environment.is_remote() { + match environment.info().await { + Ok(info) => match Shell::from_environment_shell_info(info.shell) { + Ok(shell) => Some(shell), + Err(err) => { + tracing::warn!( + "failed to resolve shell for environment `{environment_id}`: {err}" + ); + None + } + }, Err(err) => { tracing::warn!( - "failed to resolve shell for environment `{environment_id}`: {err}" + "failed to get info for environment `{environment_id}`: {err}" ); None } - }, - Err(err) => { - tracing::warn!("failed to get info for environment `{environment_id}`: {err}"); - None } - } - } else { - Some(local_shell.clone()) - }; - let mut turn_environment = TurnEnvironment::new( - environment_id, - environment, - selected_environment.cwd.clone(), - shell, - ); - let task = shell_snapshot - .clone() - .build(turn_environment.clone()) - .boxed() - .shared(); - drop(tokio::spawn(task.clone())); - turn_environment.shell_snapshot = task; - Ok(turn_environment) + } else { + Some(local_shell) + }; + let mut turn_environment = + TurnEnvironment::new(selection.environment_id, environment, selection.cwd, shell); + let task = shell_snapshot + .build(turn_environment.clone()) + .boxed() + .shared(); + drop(tokio::spawn(task.clone())); + turn_environment.shell_snapshot = task; + Ok(turn_environment) + } + .boxed() } pub(crate) async fn snapshot(&self) -> TurnEnvironmentSnapshot { - self.snapshot_task.load_full().as_ref().clone().await + let current = self.environments.load_full(); + let mut turn_environments = Vec::with_capacity(current.len()); + let mut starting = Vec::new(); + for environment in current.iter() { + let resolved = if self.non_blocking_snapshots { + environment.resolution.clone().now_or_never() + } else { + Some(environment.resolution.clone().await) + }; + match resolved { + Some(Ok(turn_environment)) => turn_environments.push(turn_environment), + Some(Err(err)) => tracing::debug!( + environment_id = %environment.selection.environment_id, + "skipping failed turn environment: {err}" + ), + None => starting.push(StartingTurnEnvironment { + selection: environment.selection.clone(), + resolution: environment.resolution.clone(), + }), + } + } + TurnEnvironmentSnapshot { + turn_environments, + starting, + } } pub(crate) fn environment_manager(&self) -> Arc { @@ -182,6 +217,7 @@ impl ThreadEnvironments { #[derive(Clone, Debug, Default)] pub(crate) struct TurnEnvironmentSnapshot { pub(crate) turn_environments: Vec, + pub(crate) starting: Vec, } impl TurnEnvironmentSnapshot { @@ -214,6 +250,9 @@ impl TurnEnvironmentSnapshot { } pub(crate) fn single_local_environment(&self) -> Option<&TurnEnvironment> { + if !self.starting.is_empty() { + return None; + } let [environment] = self.turn_environments.as_slice() else { return None; }; @@ -230,6 +269,8 @@ impl TurnEnvironmentSnapshot { #[cfg(test)] mod tests { + use std::time::Duration; + use codex_exec_server::Environment; use codex_exec_server::ExecServerRuntimePaths; use codex_exec_server::LOCAL_ENVIRONMENT_ID; @@ -237,7 +278,16 @@ mod tests { use codex_protocol::protocol::TurnEnvironmentSelection; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; + use futures::SinkExt; + use futures::StreamExt; use pretty_assertions::assert_eq; + use serde_json::Value; + use tokio::net::TcpListener; + use tokio::net::TcpStream; + use tokio::time::timeout; + use tokio_tungstenite::WebSocketStream; + use tokio_tungstenite::accept_async; + use tokio_tungstenite::tungstenite::Message; use super::*; @@ -250,6 +300,7 @@ mod tests { crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + /*non_blocking_snapshots*/ false, )); turn_environments.update_selections(selections); turn_environments.snapshot().await; @@ -264,6 +315,61 @@ mod tests { .expect("runtime paths") } + async fn read_websocket_json(websocket: &mut WebSocketStream) -> Value { + loop { + match timeout(std::time::Duration::from_secs(5), websocket.next()) + .await + .expect("websocket read should not time out") + .expect("websocket should stay open") + .expect("websocket frame should read") + { + Message::Text(text) => { + return serde_json::from_str(text.as_ref()).expect("valid JSON-RPC message"); + } + Message::Binary(bytes) => { + return serde_json::from_slice(bytes.as_ref()).expect("valid JSON-RPC message"); + } + Message::Ping(_) | Message::Pong(_) => {} + other => panic!("expected JSON-RPC message, got {other:?}"), + } + } + } + + async fn serve_environment_info(listener: TcpListener) { + let (stream, _) = listener.accept().await.expect("connection"); + let mut websocket = accept_async(stream).await.expect("websocket handshake"); + + let initialize = read_websocket_json(&mut websocket).await; + assert_eq!(initialize["method"], "initialize"); + websocket + .send(Message::Text( + serde_json::json!({ + "id": initialize["id"], + "result": { "sessionId": "test-session" } + }) + .to_string() + .into(), + )) + .await + .expect("initialize response"); + let initialized = read_websocket_json(&mut websocket).await; + assert_eq!(initialized["method"], "initialized"); + + let info = read_websocket_json(&mut websocket).await; + assert_eq!(info["method"], "environment/info"); + websocket + .send(Message::Text( + serde_json::json!({ + "id": info["id"], + "result": { "shell": { "name": "zsh", "path": "/bin/zsh" } } + }) + .to_string() + .into(), + )) + .await + .expect("environment info response"); + } + #[tokio::test] async fn default_thread_environment_selections_use_manager_default_id() { let cwd = AbsolutePathBuf::current_dir().expect("cwd"); @@ -340,6 +446,7 @@ url = "ws://127.0.0.1:8765" local_shell.clone(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + /*non_blocking_snapshots*/ false, ); turn_environments.update_selections(&[TurnEnvironmentSelection { environment_id: LOCAL_ENVIRONMENT_ID.to_string(), @@ -448,58 +555,175 @@ url = "ws://127.0.0.1:8765" } #[tokio::test] - async fn latest_environment_update_wins_while_previous_resolution_is_pending() { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + async fn blocking_snapshot_waits_for_starting_environment() { + let listener = TcpListener::bind("127.0.0.1:0") .await .expect("bind websocket listener"); let manager = Arc::new( - EnvironmentManager::create_for_tests_with_local( + EnvironmentManager::create_for_tests( Some(format!( "ws://{}", listener.local_addr().expect("listener address") )), - test_runtime_paths(), + Some(test_runtime_paths()), ) .await, ); - let cwd = AbsolutePathBuf::current_dir().expect("cwd"); - let turn_environments = Arc::new(ThreadEnvironments::new( + let selection = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: PathUri::from_abs_path(&AbsolutePathBuf::current_dir().expect("cwd")), + }; + let environments = Arc::new(ThreadEnvironments::new( manager, crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + /*non_blocking_snapshots*/ false, )); - turn_environments.update_selections(&[TurnEnvironmentSelection { + environments.update_selections(std::slice::from_ref(&selection)); + let snapshot_task = tokio::spawn({ + let environments = Arc::clone(&environments); + async move { environments.snapshot().await } + }); + tokio::task::yield_now().await; + assert!(!snapshot_task.is_finished()); + + let server = tokio::spawn(serve_environment_info(listener)); + let snapshot = timeout(Duration::from_secs(5), snapshot_task) + .await + .expect("snapshot should finish after the environment starts") + .expect("snapshot task"); + + assert!(snapshot.starting.is_empty()); + assert_eq!(snapshot.to_selections(), vec![selection]); + server.await.expect("server task"); + } + + #[tokio::test] + async fn snapshot_keeps_starting_environment_until_it_can_be_attached() { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind websocket listener"); + let manager = Arc::new( + EnvironmentManager::create_for_tests_with_local( + Some(format!( + "ws://{}", + listener.local_addr().expect("listener address") + )), + test_runtime_paths(), + ) + .await, + ); + let cwd = AbsolutePathBuf::current_dir().expect("cwd"); + let cwd = PathUri::from_abs_path(&cwd); + let remote = TurnEnvironmentSelection { environment_id: REMOTE_ENVIRONMENT_ID.to_string(), - cwd: PathUri::from_abs_path(&cwd), - }]); - let (_connection, _) = - tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept()) - .await - .expect("remote resolution should connect") - .expect("accept remote resolution connection"); + cwd: cwd.clone(), + }; let local = TurnEnvironmentSelection { environment_id: LOCAL_ENVIRONMENT_ID.to_string(), - cwd: PathUri::from_abs_path(&cwd), + cwd, }; + let turn_environments = ThreadEnvironments::new( + manager, + crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::default(), + /*non_blocking_snapshots*/ true, + ); + turn_environments.update_selections(&[remote.clone(), local.clone()]); + + let starting = turn_environments.snapshot().await; + assert!(starting.turn_environments.is_empty()); + assert_eq!( + starting + .starting + .iter() + .map(|environment| environment.selection.clone()) + .collect::>(), + vec![remote.clone(), local.clone()] + ); + assert!(starting.to_selections().is_empty()); + assert!(starting.single_local_environment().is_none()); - turn_environments.update_selections(std::slice::from_ref(&local)); - let snapshot = tokio::time::timeout( + let server = tokio::spawn(serve_environment_info(listener)); + timeout( std::time::Duration::from_secs(5), - turn_environments.snapshot(), + starting.starting[0].resolution.clone(), ) .await - .expect("latest environment resolution should complete"); + .expect("environment resolution should finish") + .expect("environment resolution should succeed"); + let attached = turn_environments.snapshot().await; + + assert!(attached.starting.is_empty()); + assert_eq!( + attached + .turn_environments + .iter() + .map(TurnEnvironment::selection) + .collect::>(), + vec![remote.clone(), local.clone()] + ); + assert_eq!(attached.to_selections(), vec![remote, local]); + server.await.expect("server task"); + } + + #[tokio::test] + async fn failed_resolution_is_replaced_from_the_environment_manager() { + let manager = Arc::new( + EnvironmentManager::create_for_tests( + Some("http://example.com".to_string()), + Some(test_runtime_paths()), + ) + .await, + ); + let selection = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: PathUri::from_abs_path(&AbsolutePathBuf::current_dir().expect("cwd")), + }; + let environments = ThreadEnvironments::new( + Arc::clone(&manager), + crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::default(), + /*non_blocking_snapshots*/ true, + ); + environments.update_selections(std::slice::from_ref(&selection)); + let failed_resolution = environments.environments.load()[0].resolution.clone(); + assert!(failed_resolution.clone().await.is_err()); + + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind replacement listener"); + manager + .upsert_environment( + REMOTE_ENVIRONMENT_ID.to_string(), + format!("ws://{}", listener.local_addr().expect("listener address")), + ) + .expect("replacement environment"); + environments.update_selections(std::slice::from_ref(&selection)); - assert_eq!(snapshot.to_selections(), vec![local]); + let replacement = environments.snapshot().await; + let [replacement] = replacement.starting.as_slice() else { + panic!("expected the replacement environment to be starting"); + }; + assert_eq!(replacement.selection, selection); + assert!(!failed_resolution.ptr_eq(&replacement.resolution)); } #[tokio::test] - async fn matching_environment_id_and_cwd_reuse_resolved_environment() { + async fn matching_environment_id_and_cwd_reuse_resolution() { let cwd = AbsolutePathBuf::current_dir().expect("cwd"); + let first_listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind first listener"); let manager = Arc::new( EnvironmentManager::create_for_tests( - Some("ws://127.0.0.1:8765".to_string()), + Some(format!( + "ws://{}", + first_listener.local_addr().expect("first listener address") + )), Some(test_runtime_paths()), ) .await, @@ -508,43 +732,98 @@ url = "ws://127.0.0.1:8765" environment_id: REMOTE_ENVIRONMENT_ID.to_string(), cwd: PathUri::from_abs_path(&cwd), }; - let initial = - resolve_turn_environments(Arc::clone(&manager), std::slice::from_ref(&selection)).await; + let environments = ThreadEnvironments::new( + Arc::clone(&manager), + crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::default(), + /*non_blocking_snapshots*/ true, + ); + environments.update_selections(std::slice::from_ref(&selection)); + let initial_snapshot = environments.snapshot().await; + let second_listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind second listener"); manager .upsert_environment( REMOTE_ENVIRONMENT_ID.to_string(), - "ws://127.0.0.1:9876".to_string(), + format!( + "ws://{}", + second_listener + .local_addr() + .expect("second listener address") + ), ) .expect("replace environment"); - let initial_snapshot = initial.snapshot().await; - initial.update_selections(std::slice::from_ref(&selection)); - let reused_snapshot = initial.snapshot().await; - initial.update_selections(&[TurnEnvironmentSelection { + environments.update_selections(std::slice::from_ref(&selection)); + let reused_snapshot = environments.snapshot().await; + environments.update_selections(&[TurnEnvironmentSelection { cwd: PathUri::from_abs_path(&cwd.join("changed")), ..selection }]); - let changed_snapshot = initial.snapshot().await; + let changed_snapshot = environments.snapshot().await; + + let initial = initial_snapshot + .starting + .first() + .expect("initial environment"); + let reused = reused_snapshot + .starting + .first() + .expect("reused environment"); + let changed = changed_snapshot + .starting + .first() + .expect("changed environment"); + assert!(initial.resolution.ptr_eq(&reused.resolution)); + assert!(!reused.resolution.ptr_eq(&changed.resolution)); + } + + #[tokio::test] + async fn inherited_environment_reuses_parent_handle() { + let cwd = AbsolutePathBuf::current_dir().expect("cwd"); + let selection = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: PathUri::from_abs_path(&cwd), + }; + let inherited_environment = Arc::new( + Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string())) + .expect("inherited environment"), + ); + let inherited = TurnEnvironment::new( + selection.environment_id.clone(), + Arc::clone(&inherited_environment), + selection.cwd.clone(), + /*shell*/ None, + ); + let manager = Arc::new(EnvironmentManager::without_environments()); + manager + .upsert_environment( + REMOTE_ENVIRONMENT_ID.to_string(), + "ws://127.0.0.1:9876".to_string(), + ) + .expect("replacement environment"); + let environments = ThreadEnvironments::new( + manager, + crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot { + turn_environments: vec![inherited], + starting: Vec::new(), + }, + /*non_blocking_snapshots*/ false, + ); + + environments.update_selections(std::slice::from_ref(&selection)); + let snapshot = environments.snapshot().await; assert!(Arc::ptr_eq( - &initial_snapshot - .primary() - .expect("initial environment") - .environment, - &reused_snapshot - .primary() - .expect("reused environment") - .environment, - )); - assert!(!Arc::ptr_eq( - &reused_snapshot - .primary() - .expect("reused environment") - .environment, - &changed_snapshot + &snapshot .primary() - .expect("changed environment") + .expect("inherited environment") .environment, + &inherited_environment, )); } @@ -573,6 +852,7 @@ url = "ws://127.0.0.1:8765" cwd_uri.clone(), /*shell*/ None, )], + starting: Vec::new(), }; let multiple = TurnEnvironmentSnapshot { turn_environments: vec![ @@ -584,6 +864,7 @@ url = "ws://127.0.0.1:8765" /*shell*/ None, ), ], + starting: Vec::new(), }; assert_eq!(local.single_local_environment_cwd(), Some(cwd)); diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 583882cae1ef..1e8c250bc968 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -826,6 +826,7 @@ impl Session { default_shell.clone(), shell_snapshot, inherited_environments.unwrap_or_default(), + config.features.enabled(Feature::DeferredExecutor), )); turn_environments.update_selections(session_configuration.environment_selections()); let resolved_environments = turn_environments.snapshot().await; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 436b85907d05..e7532a1e57b4 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -4101,6 +4101,7 @@ async fn resolved_environments_for_configuration( default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + /*non_blocking_snapshots*/ false, ); turn_environments.update_selections(session_configuration.environment_selections()); (environment_manager, turn_environments.snapshot().await) @@ -5010,6 +5011,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { default_user_shell(), ShellSnapshot::disabled(), resolved_environments, + /*non_blocking_snapshots*/ false, )); let environment = Arc::clone( &resolved_turn_environments @@ -7065,6 +7067,7 @@ where default_user_shell(), ShellSnapshot::disabled(), resolved_turn_environments.clone(), + /*non_blocking_snapshots*/ false, )); let environment = Arc::clone( &resolved_turn_environments diff --git a/codex-rs/core/tests/suite/remote_env.rs b/codex-rs/core/tests/suite/remote_env.rs index 63bb5deedac9..67e5668f45b3 100644 --- a/codex-rs/core/tests/suite/remote_env.rs +++ b/codex-rs/core/tests/suite/remote_env.rs @@ -55,6 +55,7 @@ use serde_json::json; use std::fs; use std::path::PathBuf; use std::process::Command; +use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tempfile::TempDir; @@ -299,6 +300,39 @@ async fn explicit_remote_shell_runs_in_remote_cwd() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn deferred_executor_reaches_model_before_remote_environment_is_ready() -> Result<()> { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let server = start_mock_server().await; + let response_mock = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "done"), + ev_completed("resp-1"), + ]), + ) + .await; + let mut builder = test_codex() + .with_exec_server_url(format!("ws://{}", listener.local_addr()?)) + .with_config(|config| { + assert!(config.features.enable(Feature::DeferredExecutor).is_ok()); + }); + + let test = tokio::time::timeout(Duration::from_secs(5), builder.build(&server)) + .await + .context("thread startup should not wait for the remote environment")??; + tokio::time::timeout( + Duration::from_secs(5), + test.submit_turn("respond before the environment is ready"), + ) + .await + .context("turn should reach the model before the remote environment is ready")??; + + response_mock.single_request(); + Ok(()) +} + fn absolute_path(path: PathBuf) -> AbsolutePathBuf { AbsolutePathBuf::try_from(path).expect("path should be absolute") } diff --git a/codex-rs/features/src/lib.rs b/codex-rs/features/src/lib.rs index 8808cb37c399..8483ef97bdb2 100644 --- a/codex-rs/features/src/lib.rs +++ b/codex-rs/features/src/lib.rs @@ -124,6 +124,8 @@ pub enum Feature { UseLegacyLandlock, /// Experimental shell snapshotting. ShellSnapshot, + /// Allow turns to start while selected executors are still starting. + DeferredExecutor, /// Enable runtime metrics snapshots via a manual reader. RuntimeMetrics, /// Enable startup memory extraction and file-backed memory consolidation. @@ -816,6 +818,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Stable, default_enabled: true, }, + FeatureSpec { + id: Feature::DeferredExecutor, + key: "deferred_executor", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::JsRepl, key: "js_repl",