From cdace0269a6859ccb3ee9e0e4bb6c85f54f6e8f6 Mon Sep 17 00:00:00 2001 From: Sayan Sisodiya Date: Wed, 17 Jun 2026 00:27:23 -0700 Subject: [PATCH 1/5] core: track starting environments in snapshots --- codex-rs/core/src/agents_md_tests.rs | 6 +- codex-rs/core/src/environment_selection.rs | 482 +++++++++++++++------ 2 files changed, 358 insertions(+), 130 deletions(-) diff --git a/codex-rs/core/src/agents_md_tests.rs b/codex-rs/core/src/agents_md_tests.rs index 0e1a670fc20a..f0d0e3c0ef8d 100644 --- a/codex-rs/core/src/agents_md_tests.rs +++ b/codex-rs/core/src/agents_md_tests.rs @@ -262,8 +262,8 @@ async fn agents_md_paths(config: &TestConfig) -> std::io::Result> { fn resolved_local_environments( environments: [(&str, AbsolutePathBuf); N], ) -> TurnEnvironmentSnapshot { - TurnEnvironmentSnapshot { - turn_environments: environments + TurnEnvironmentSnapshot::from_turn_environments( + environments .into_iter() .map(|(environment_id, cwd)| { TurnEnvironment::new( @@ -277,7 +277,7 @@ fn resolved_local_environments( ) }) .collect(), - } + ) } fn project_provenance(path: AbsolutePathBuf, cwd: AbsolutePathBuf) -> InstructionProvenance { diff --git a/codex-rs/core/src/environment_selection.rs b/codex-rs/core/src/environment_selection.rs index 16b2c47b5dd7..b92161c73290 100644 --- a/codex-rs/core/src/environment_selection.rs +++ b/codex-rs/core/src/environment_selection.rs @@ -2,16 +2,13 @@ use std::collections::HashSet; use std::sync::Arc; use arc_swap::ArcSwap; +use codex_exec_server::Environment; use codex_exec_server::EnvironmentManager; use codex_exec_server::ExecutorFileSystem; -use codex_protocol::error::CodexErr; -use codex_protocol::error::Result as CodexResult; use codex_protocol::protocol::TurnEnvironmentSelection; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; use futures::FutureExt; -use futures::future::BoxFuture; -use futures::future::Shared; use crate::session::turn_context::TurnEnvironment; use crate::shell::Shell; @@ 
-31,13 +28,17 @@ pub(crate) fn default_thread_environment_selections( .collect() } -type SnapshotTask = Shared>; +#[derive(Clone, Debug)] +pub(crate) struct StartingTurnEnvironment { + pub(crate) selection: TurnEnvironmentSelection, + pub(crate) environment: Arc, +} pub(crate) struct ThreadEnvironments { environment_manager: Arc, local_shell: Shell, shell_snapshot: ShellSnapshot, - snapshot_task: ArcSwap, + snapshot: ArcSwap, } impl ThreadEnvironments { @@ -51,127 +52,180 @@ impl ThreadEnvironments { environment_manager, local_shell, shell_snapshot, - snapshot_task: ArcSwap::from_pointee(futures::future::ready(current).boxed().shared()), + snapshot: ArcSwap::from_pointee(current), } } pub(crate) fn update_selections(&self, environments: &[TurnEnvironmentSelection]) { - let previous = self - .snapshot_task - .load() - .peek() - .cloned() - .unwrap_or_default(); - let environment_manager = Arc::clone(&self.environment_manager); - let local_shell = self.local_shell.clone(); - let shell_snapshot = self.shell_snapshot.clone(); - let environments = environments.to_vec(); - let (snapshot_task, snapshot) = async move { - Self::resolve_snapshot( - environment_manager, - local_shell, - shell_snapshot, - previous, - environments, - ) - .await - } - .remote_handle(); - self.snapshot_task - .store(Arc::new(snapshot.boxed().shared())); - drop(tokio::spawn(snapshot_task)); - } - - async fn resolve_snapshot( - environment_manager: Arc, - local_shell: Shell, - shell_snapshot: ShellSnapshot, - current: TurnEnvironmentSnapshot, - environments: Vec, - ) -> TurnEnvironmentSnapshot { + let previous = self.snapshot.load(); let mut seen_environment_ids = HashSet::with_capacity(environments.len()); let mut turn_environments = Vec::with_capacity(environments.len()); - for selected_environment in &environments { + let mut starting = Vec::with_capacity(environments.len()); + let mut ordered_selections = Vec::with_capacity(environments.len()); + for selected_environment in environments { if 
!seen_environment_ids.insert(selected_environment.environment_id.as_str()) { continue; } - let turn_environment = match current.turn_environments.iter().find(|environment| { + // Reuse the exact attached or starting environment already selected by this thread. + if let Some(environment) = previous.turn_environments.iter().find(|environment| { environment.environment_id == selected_environment.environment_id && environment.cwd() == &selected_environment.cwd }) { - Some(environment) => environment.clone(), - None => match Self::resolve_selection( - &environment_manager, - &local_shell, - &shell_snapshot, - selected_environment, - ) - .await - { - Ok(environment) => environment, - Err(err) => { - tracing::warn!( - "skipping unresolved turn environment `{}`: {err}", - selected_environment.environment_id - ); - continue; - } - }, + turn_environments.push(environment.clone()); + ordered_selections.push(selected_environment.clone()); + continue; + } + if let Some(environment) = previous + .starting + .iter() + .find(|environment| environment.selection == *selected_environment) + { + starting.push(environment.clone()); + ordered_selections.push(selected_environment.clone()); + continue; + } + + // Only new selections consult the manager; reused selections keep their stable handle. + let environment_id = &selected_environment.environment_id; + let Some(environment) = self.environment_manager.get_environment(environment_id) else { + tracing::warn!("skipping unknown turn environment `{environment_id}`"); + continue; }; - turn_environments.push(turn_environment); + if environment.is_remote() { + // Connect in the background and leave attachment to a later snapshot. 
+ environment.start_connecting(); + starting.push(StartingTurnEnvironment { + selection: selected_environment.clone(), + environment, + }); + } else { + turn_environments.push(self.build_turn_environment( + selected_environment, + environment, + Some(self.local_shell.clone()), + )); + } + ordered_selections.push(selected_environment.clone()); } - TurnEnvironmentSnapshot { turn_environments } + self.snapshot.store(Arc::new(TurnEnvironmentSnapshot { + turn_environments, + starting, + ordered_selections, + })); } - async fn resolve_selection( - environment_manager: &EnvironmentManager, - local_shell: &Shell, - shell_snapshot: &ShellSnapshot, - selected_environment: &TurnEnvironmentSelection, - ) -> CodexResult { - let environment_id = selected_environment.environment_id.clone(); - let environment = environment_manager - .get_environment(&environment_id) - .ok_or_else(|| { - CodexErr::InvalidRequest(format!("unknown turn environment id `{environment_id}`")) - })?; - let shell = if environment.is_remote() { - match environment.info().await { - Ok(info) => match Shell::from_environment_shell_info(info.shell) { - Ok(shell) => Some(shell), - Err(err) => { - tracing::warn!( - "failed to resolve shell for environment `{environment_id}`: {err}" - ); - None - } - }, + async fn resolve_starting_environment( + &self, + starting: &StartingTurnEnvironment, + ) -> TurnEnvironment { + let environment_id = &starting.selection.environment_id; + let shell = match starting.environment.info().boxed().await { + Ok(info) => match Shell::from_environment_shell_info(info.shell) { + Ok(shell) => Some(shell), Err(err) => { - tracing::warn!("failed to get info for environment `{environment_id}`: {err}"); + tracing::warn!( + "failed to resolve shell for environment `{environment_id}`: {err}" + ); None } + }, + Err(err) => { + tracing::warn!("failed to get info for environment `{environment_id}`: {err}"); + None } - } else { - Some(local_shell.clone()) }; + self.build_turn_environment( + 
+ &starting.selection, + Arc::clone(&starting.environment), + shell, + ) + } + + fn build_turn_environment( + &self, + selected_environment: &TurnEnvironmentSelection, + environment: Arc, + shell: Option, + ) -> TurnEnvironment { let mut turn_environment = TurnEnvironment::new( - environment_id, + selected_environment.environment_id.clone(), environment, selected_environment.cwd.clone(), shell, ); - let task = shell_snapshot + let task = self + .shell_snapshot .clone() .build(turn_environment.clone()) .boxed() .shared(); drop(tokio::spawn(task.clone())); turn_environment.shell_snapshot = task; - Ok(turn_environment) + turn_environment } pub(crate) async fn snapshot(&self) -> TurnEnvironmentSnapshot { - self.snapshot_task.load_full().as_ref().clone().await + loop { + let current = self.snapshot.load_full(); + if current.starting.is_empty() { + return current.as_ref().clone(); + } + + // Rebuild both lists in configured order while promoting completed startups. + let mut changed = false; + let mut turn_environments = Vec::with_capacity(current.ordered_selections.len()); + let mut starting = Vec::with_capacity(current.starting.len()); + for selection in &current.ordered_selections { + if let Some(environment) = current.turn_environments.iter().find(|environment| { + environment.environment_id == selection.environment_id + && environment.cwd() == &selection.cwd + }) { + turn_environments.push(environment.clone()); + continue; + } + let Some(environment) = current + .starting + .iter() + .find(|environment| environment.selection == *selection) + else { + continue; + }; + if !environment.environment.startup_finished() { + // Never wait for an environment whose startup is still running. + starting.push(environment.clone()); + continue; + } + + changed = true; + // Startup finished, so this only reads its saved success or failure. 
+ match environment.environment.wait_until_ready().boxed().await { + Ok(()) => { + turn_environments + .push(self.resolve_starting_environment(environment).await); + } + Err(err) => { + tracing::warn!( + "turn environment `{}` failed to start: {err}", + environment.selection.environment_id + ); + } + } + } + if !changed { + return current.as_ref().clone(); + } + + let next = Arc::new(TurnEnvironmentSnapshot { + turn_environments, + starting, + ordered_selections: current.ordered_selections.clone(), + }); + // Do not overwrite selections changed while shell resolution was in flight. + let previous = self.snapshot.compare_and_swap(&current, Arc::clone(&next)); + if Arc::ptr_eq(&previous, &current) { + return next.as_ref().clone(); + } + } } pub(crate) fn environment_manager(&self) -> Arc { @@ -182,9 +236,25 @@ impl ThreadEnvironments { #[derive(Clone, Debug, Default)] pub(crate) struct TurnEnvironmentSnapshot { pub(crate) turn_environments: Vec, + pub(crate) starting: Vec, + // Attached and starting environments are stored separately, so retain their configured order. 
+ ordered_selections: Vec, } impl TurnEnvironmentSnapshot { + #[cfg(test)] + pub(crate) fn from_turn_environments(turn_environments: Vec) -> Self { + let ordered_selections = turn_environments + .iter() + .map(TurnEnvironment::selection) + .collect(); + Self { + turn_environments, + starting: Vec::new(), + ordered_selections, + } + } + pub(crate) fn primary(&self) -> Option<&TurnEnvironment> { self.turn_environments.first() } @@ -202,10 +272,7 @@ impl TurnEnvironmentSnapshot { } pub(crate) fn to_selections(&self) -> Vec { - self.turn_environments - .iter() - .map(TurnEnvironment::selection) - .collect() + self.ordered_selections.clone() } pub(crate) fn primary_filesystem(&self) -> Option> { @@ -237,7 +304,16 @@ mod tests { use codex_protocol::protocol::TurnEnvironmentSelection; use codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; + use futures::SinkExt; + use futures::StreamExt; use pretty_assertions::assert_eq; + use serde_json::Value; + use tokio::net::TcpListener; + use tokio::net::TcpStream; + use tokio::time::timeout; + use tokio_tungstenite::WebSocketStream; + use tokio_tungstenite::accept_async; + use tokio_tungstenite::tungstenite::Message; use super::*; @@ -264,6 +340,61 @@ mod tests { .expect("runtime paths") } + async fn read_websocket_json(websocket: &mut WebSocketStream) -> Value { + loop { + match timeout(std::time::Duration::from_secs(5), websocket.next()) + .await + .expect("websocket read should not time out") + .expect("websocket should stay open") + .expect("websocket frame should read") + { + Message::Text(text) => { + return serde_json::from_str(text.as_ref()).expect("valid JSON-RPC message"); + } + Message::Binary(bytes) => { + return serde_json::from_slice(bytes.as_ref()).expect("valid JSON-RPC message"); + } + Message::Ping(_) | Message::Pong(_) => {} + other => panic!("expected JSON-RPC message, got {other:?}"), + } + } + } + + async fn serve_environment_info(listener: TcpListener) { + let (stream, _) = 
listener.accept().await.expect("connection"); + let mut websocket = accept_async(stream).await.expect("websocket handshake"); + + let initialize = read_websocket_json(&mut websocket).await; + assert_eq!(initialize["method"], "initialize"); + websocket + .send(Message::Text( + serde_json::json!({ + "id": initialize["id"], + "result": { "sessionId": "test-session" } + }) + .to_string() + .into(), + )) + .await + .expect("initialize response"); + let initialized = read_websocket_json(&mut websocket).await; + assert_eq!(initialized["method"], "initialized"); + + let info = read_websocket_json(&mut websocket).await; + assert_eq!(info["method"], "environment/info"); + websocket + .send(Message::Text( + serde_json::json!({ + "id": info["id"], + "result": { "shell": { "name": "zsh", "path": "/bin/zsh" } } + }) + .to_string() + .into(), + )) + .await + .expect("environment info response"); + } + #[tokio::test] async fn default_thread_environment_selections_use_manager_default_id() { let cwd = AbsolutePathBuf::current_dir().expect("cwd"); @@ -447,6 +578,84 @@ url = "ws://127.0.0.1:8765" assert_eq!(resolved.snapshot().await.to_selections(), vec![local]); } + #[tokio::test] + async fn snapshot_keeps_starting_environment_until_it_can_be_attached() { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind websocket listener"); + let manager = Arc::new( + EnvironmentManager::create_for_tests_with_local( + Some(format!( + "ws://{}", + listener.local_addr().expect("listener address") + )), + test_runtime_paths(), + ) + .await, + ); + let cwd = AbsolutePathBuf::current_dir().expect("cwd"); + let cwd = PathUri::from_abs_path(&cwd); + let remote = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: cwd.clone(), + }; + let local = TurnEnvironmentSelection { + environment_id: LOCAL_ENVIRONMENT_ID.to_string(), + cwd, + }; + let turn_environments = ThreadEnvironments::new( + manager, + crate::shell::default_user_shell(), + 
ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::default(), + ); + turn_environments.update_selections(&[remote.clone(), local.clone()]); + + let starting = turn_environments.snapshot().await; + assert_eq!( + starting + .turn_environments + .iter() + .map(TurnEnvironment::selection) + .collect::>(), + vec![local.clone()] + ); + assert_eq!( + starting + .starting + .iter() + .map(|environment| environment.selection.clone()) + .collect::>(), + vec![remote.clone()] + ); + assert_eq!( + starting.to_selections(), + vec![remote.clone(), local.clone()] + ); + + let server = tokio::spawn(serve_environment_info(listener)); + timeout( + std::time::Duration::from_secs(5), + starting.starting[0].environment.wait_until_ready(), + ) + .await + .expect("environment startup should finish") + .expect("environment startup should succeed"); + let attached = turn_environments.snapshot().await; + + assert!(attached.starting.is_empty()); + assert_eq!( + attached + .turn_environments + .iter() + .map(TurnEnvironment::selection) + .collect::>(), + vec![remote.clone(), local.clone()] + ); + assert_eq!(attached.to_selections(), vec![remote, local]); + server.await.expect("server task"); + } + #[tokio::test] async fn latest_environment_update_wins_while_previous_resolution_is_pending() { let listener = tokio::net::TcpListener::bind("127.0.0.1:0") @@ -495,11 +704,17 @@ url = "ws://127.0.0.1:8765" } #[tokio::test] - async fn matching_environment_id_and_cwd_reuse_resolved_environment() { + async fn matching_environment_id_and_cwd_reuse_starting_environment() { let cwd = AbsolutePathBuf::current_dir().expect("cwd"); + let first_listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind first listener"); let manager = Arc::new( EnvironmentManager::create_for_tests( - Some("ws://127.0.0.1:8765".to_string()), + Some(format!( + "ws://{}", + first_listener.local_addr().expect("first listener address") + )), Some(test_runtime_paths()), ) .await, @@ -508,41 +723,58 @@ url = 
"ws://127.0.0.1:8765" environment_id: REMOTE_ENVIRONMENT_ID.to_string(), cwd: PathUri::from_abs_path(&cwd), }; - let initial = - resolve_turn_environments(Arc::clone(&manager), std::slice::from_ref(&selection)).await; + let environments = ThreadEnvironments::new( + Arc::clone(&manager), + crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::default(), + ); + environments.update_selections(std::slice::from_ref(&selection)); + let initial_snapshot = environments.snapshot().await; + let second_listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind second listener"); manager .upsert_environment( REMOTE_ENVIRONMENT_ID.to_string(), - "ws://127.0.0.1:9876".to_string(), + format!( + "ws://{}", + second_listener + .local_addr() + .expect("second listener address") + ), ) .expect("replace environment"); - let initial_snapshot = initial.snapshot().await; - initial.update_selections(std::slice::from_ref(&selection)); - let reused_snapshot = initial.snapshot().await; - initial.update_selections(&[TurnEnvironmentSelection { + environments.update_selections(std::slice::from_ref(&selection)); + let reused_snapshot = environments.snapshot().await; + environments.update_selections(&[TurnEnvironmentSelection { cwd: PathUri::from_abs_path(&cwd.join("changed")), ..selection }]); - let changed_snapshot = initial.snapshot().await; + let changed_snapshot = environments.snapshot().await; assert!(Arc::ptr_eq( &initial_snapshot - .primary() + .starting + .first() .expect("initial environment") .environment, &reused_snapshot - .primary() + .starting + .first() .expect("reused environment") .environment, )); assert!(!Arc::ptr_eq( &reused_snapshot - .primary() + .starting + .first() .expect("reused environment") .environment, &changed_snapshot - .primary() + .starting + .first() .expect("changed environment") .environment, )); @@ -566,25 +798,21 @@ url = "ws://127.0.0.1:8765" 
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string())) .expect("remote environment"), ); - let remote = TurnEnvironmentSnapshot { - turn_environments: vec![TurnEnvironment::new( + let remote = TurnEnvironmentSnapshot::from_turn_environments(vec![TurnEnvironment::new( + REMOTE_ENVIRONMENT_ID.to_string(), + remote_environment.clone(), + cwd_uri.clone(), + /*shell*/ None, + )]); + let multiple = TurnEnvironmentSnapshot::from_turn_environments(vec![ + local.primary().expect("local environment").clone(), + TurnEnvironment::new( REMOTE_ENVIRONMENT_ID.to_string(), - remote_environment.clone(), - cwd_uri.clone(), + remote_environment, + cwd_uri, /*shell*/ None, - )], - }; - let multiple = TurnEnvironmentSnapshot { - turn_environments: vec![ - local.primary().expect("local environment").clone(), - TurnEnvironment::new( - REMOTE_ENVIRONMENT_ID.to_string(), - remote_environment, - cwd_uri, - /*shell*/ None, - ), - ], - }; + ), + ]); assert_eq!(local.single_local_environment_cwd(), Some(cwd)); assert_eq!(remote.single_local_environment_cwd(), None); From 7df846ed305d7d0fab0d0e35bbf9343e26a47000 Mon Sep 17 00:00:00 2001 From: Sayan Sisodiya Date: Wed, 17 Jun 2026 21:02:24 -0700 Subject: [PATCH 2/5] core: simplify starting environment resolution --- codex-rs/core/src/environment_selection.rs | 370 +++++++++++---------- 1 file changed, 189 insertions(+), 181 deletions(-) diff --git a/codex-rs/core/src/environment_selection.rs b/codex-rs/core/src/environment_selection.rs index b92161c73290..6e7da8f4ab6d 100644 --- a/codex-rs/core/src/environment_selection.rs +++ b/codex-rs/core/src/environment_selection.rs @@ -1,14 +1,18 @@ use std::collections::HashSet; +use std::fmt; use std::sync::Arc; use arc_swap::ArcSwap; use codex_exec_server::Environment; use codex_exec_server::EnvironmentManager; +use codex_exec_server::ExecServerError; use codex_exec_server::ExecutorFileSystem; use codex_protocol::protocol::TurnEnvironmentSelection; use 
codex_utils_absolute_path::AbsolutePathBuf; use codex_utils_path_uri::PathUri; use futures::FutureExt; +use futures::future::BoxFuture; +use futures::future::Shared; use crate::session::turn_context::TurnEnvironment; use crate::shell::Shell; @@ -28,17 +32,38 @@ pub(crate) fn default_thread_environment_selections( .collect() } -#[derive(Clone, Debug)] +type TurnEnvironmentResolution = + Shared>>>; + +#[derive(Clone)] +struct SelectedTurnEnvironment { + selection: TurnEnvironmentSelection, + // Starting environments resolve in the background without blocking snapshots. + delayed_start: bool, + resolution: TurnEnvironmentResolution, +} + +#[derive(Clone)] pub(crate) struct StartingTurnEnvironment { pub(crate) selection: TurnEnvironmentSelection, - pub(crate) environment: Arc, + resolution: TurnEnvironmentResolution, +} + +impl fmt::Debug for StartingTurnEnvironment { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("StartingTurnEnvironment") + .field("selection", &self.selection) + .field("resolved", &self.resolution.peek().is_some()) + .finish_non_exhaustive() + } } pub(crate) struct ThreadEnvironments { environment_manager: Arc, local_shell: Shell, shell_snapshot: ShellSnapshot, - snapshot: ArcSwap, + environments: ArcSwap>, } impl ThreadEnvironments { @@ -48,184 +73,141 @@ impl ThreadEnvironments { shell_snapshot: ShellSnapshot, current: TurnEnvironmentSnapshot, ) -> Self { + // Reuse only attached environments from the supplied snapshot; drop starting entries. 
+ let environments = current + .turn_environments + .into_iter() + .map(|environment| { + let selection = environment.selection(); + let resolution: TurnEnvironmentResolution = + futures::future::ready(Ok(environment)).boxed().shared(); + SelectedTurnEnvironment { + selection, + delayed_start: false, + resolution, + } + }) + .collect(); Self { environment_manager, local_shell, shell_snapshot, - snapshot: ArcSwap::from_pointee(current), + environments: ArcSwap::from_pointee(environments), } } pub(crate) fn update_selections(&self, environments: &[TurnEnvironmentSelection]) { - let previous = self.snapshot.load(); + let previous = self.environments.load(); let mut seen_environment_ids = HashSet::with_capacity(environments.len()); - let mut turn_environments = Vec::with_capacity(environments.len()); - let mut starting = Vec::with_capacity(environments.len()); - let mut ordered_selections = Vec::with_capacity(environments.len()); + let mut next = Vec::with_capacity(environments.len()); + let mut new_resolutions = Vec::new(); for selected_environment in environments { if !seen_environment_ids.insert(selected_environment.environment_id.as_str()) { continue; } - // Reuse the exact attached or starting environment already selected by this thread. - if let Some(environment) = previous.turn_environments.iter().find(|environment| { - environment.environment_id == selected_environment.environment_id - && environment.cwd() == &selected_environment.cwd - }) { - turn_environments.push(environment.clone()); - ordered_selections.push(selected_environment.clone()); - continue; - } if let Some(environment) = previous - .starting .iter() .find(|environment| environment.selection == *selected_environment) { - starting.push(environment.clone()); - ordered_selections.push(selected_environment.clone()); + next.push(environment.clone()); continue; } - // Only new selections consult the manager; reused selections keep their stable handle. 
let environment_id = &selected_environment.environment_id; let Some(environment) = self.environment_manager.get_environment(environment_id) else { tracing::warn!("skipping unknown turn environment `{environment_id}`"); continue; }; - if environment.is_remote() { - // Connect in the background and leave attachment to a later snapshot. - environment.start_connecting(); - starting.push(StartingTurnEnvironment { - selection: selected_environment.clone(), - environment, - }); - } else { - turn_environments.push(self.build_turn_environment( - selected_environment, - environment, - Some(self.local_shell.clone()), - )); - } - ordered_selections.push(selected_environment.clone()); + let delayed_start = environment.is_remote() && !environment.startup_finished(); + let resolution = Self::resolve_environment( + selected_environment.clone(), + environment, + self.local_shell.clone(), + self.shell_snapshot.clone(), + ); + new_resolutions.push(resolution.clone()); + next.push(SelectedTurnEnvironment { + selection: selected_environment.clone(), + delayed_start, + resolution, + }); + } + self.environments.store(Arc::new(next)); + for resolution in new_resolutions { + drop(tokio::spawn(resolution)); } - self.snapshot.store(Arc::new(TurnEnvironmentSnapshot { - turn_environments, - starting, - ordered_selections, - })); - } - - async fn resolve_starting_environment( - &self, - starting: &StartingTurnEnvironment, - ) -> TurnEnvironment { - let environment_id = &starting.selection.environment_id; - let shell = match starting.environment.info().boxed().await { - Ok(info) => match Shell::from_environment_shell_info(info.shell) { - Ok(shell) => Some(shell), - Err(err) => { - tracing::warn!( - "failed to resolve shell for environment `{environment_id}`: {err}" - ); - None - } - }, - Err(err) => { - tracing::warn!("failed to get info for environment `{environment_id}`: {err}"); - None - } - }; - self.build_turn_environment( - &starting.selection, - Arc::clone(&starting.environment), - shell, 
- ) } - fn build_turn_environment( - &self, - selected_environment: &TurnEnvironmentSelection, + fn resolve_environment( + selection: TurnEnvironmentSelection, environment: Arc, - shell: Option, - ) -> TurnEnvironment { - let mut turn_environment = TurnEnvironment::new( - selected_environment.environment_id.clone(), - environment, - selected_environment.cwd.clone(), - shell, - ); - let task = self - .shell_snapshot - .clone() - .build(turn_environment.clone()) - .boxed() - .shared(); - drop(tokio::spawn(task.clone())); - turn_environment.shell_snapshot = task; - turn_environment - } - - pub(crate) async fn snapshot(&self) -> TurnEnvironmentSnapshot { - loop { - let current = self.snapshot.load_full(); - if current.starting.is_empty() { - return current.as_ref().clone(); + local_shell: Shell, + shell_snapshot: ShellSnapshot, + ) -> TurnEnvironmentResolution { + async move { + let environment_id = &selection.environment_id; + if let Err(err) = environment.wait_until_ready().await { + tracing::warn!("turn environment `{environment_id}` failed to start: {err}"); + return Err(Arc::new(err)); } - - // Rebuild both lists in configured order while promoting completed startups. - let mut changed = false; - let mut turn_environments = Vec::with_capacity(current.ordered_selections.len()); - let mut starting = Vec::with_capacity(current.starting.len()); - for selection in &current.ordered_selections { - if let Some(environment) = current.turn_environments.iter().find(|environment| { - environment.environment_id == selection.environment_id - && environment.cwd() == &selection.cwd - }) { - turn_environments.push(environment.clone()); - continue; - } - let Some(environment) = current - .starting - .iter() - .find(|environment| environment.selection == *selection) - else { - continue; - }; - if !environment.environment.startup_finished() { - // Never wait for an environment whose startup is still running. 
- starting.push(environment.clone()); - continue; - } - - changed = true; - // Startup finished, so this only reads its saved success or failure. - match environment.environment.wait_until_ready().boxed().await { - Ok(()) => { - turn_environments - .push(self.resolve_starting_environment(environment).await); - } + let shell = if environment.is_remote() { + match environment.info().await { + Ok(info) => match Shell::from_environment_shell_info(info.shell) { + Ok(shell) => Some(shell), + Err(err) => { + tracing::warn!( + "failed to resolve shell for environment `{environment_id}`: {err}" + ); + None + } + }, Err(err) => { tracing::warn!( - "turn environment `{}` failed to start: {err}", - environment.selection.environment_id + "failed to get info for environment `{environment_id}`: {err}" ); + None } } - } - if !changed { - return current.as_ref().clone(); - } + } else { + Some(local_shell) + }; + let mut turn_environment = + TurnEnvironment::new(selection.environment_id, environment, selection.cwd, shell); + let task = shell_snapshot + .build(turn_environment.clone()) + .boxed() + .shared(); + drop(tokio::spawn(task.clone())); + turn_environment.shell_snapshot = task; + Ok(turn_environment) + } + .boxed() + .shared() + } - let next = Arc::new(TurnEnvironmentSnapshot { - turn_environments, - starting, - ordered_selections: current.ordered_selections.clone(), - }); - // Do not overwrite selections changed while shell resolution was in flight. 
- let previous = self.snapshot.compare_and_swap(&current, Arc::clone(&next)); - if Arc::ptr_eq(&previous, &current) { - return next.as_ref().clone(); + pub(crate) async fn snapshot(&self) -> TurnEnvironmentSnapshot { + let current = self.environments.load_full(); + let mut turn_environments = Vec::with_capacity(current.len()); + let mut starting = Vec::new(); + for environment in current.iter() { + let resolved = if environment.delayed_start { + environment.resolution.peek().cloned() + } else { + Some(environment.resolution.clone().await) + }; + match resolved { + Some(Ok(turn_environment)) => turn_environments.push(turn_environment), + Some(Err(_)) => {} + None => starting.push(StartingTurnEnvironment { + selection: environment.selection.clone(), + resolution: environment.resolution.clone(), + }), } } + TurnEnvironmentSnapshot { + turn_environments, + starting, + } } pub(crate) fn environment_manager(&self) -> Arc { @@ -237,21 +219,14 @@ impl ThreadEnvironments { pub(crate) struct TurnEnvironmentSnapshot { pub(crate) turn_environments: Vec, pub(crate) starting: Vec, - // Attached and starting environments are stored separately, so retain their configured order. 
- ordered_selections: Vec, } impl TurnEnvironmentSnapshot { #[cfg(test)] pub(crate) fn from_turn_environments(turn_environments: Vec) -> Self { - let ordered_selections = turn_environments - .iter() - .map(TurnEnvironment::selection) - .collect(); Self { turn_environments, starting: Vec::new(), - ordered_selections, } } @@ -272,7 +247,10 @@ impl TurnEnvironmentSnapshot { } pub(crate) fn to_selections(&self) -> Vec { - self.ordered_selections.clone() + self.turn_environments + .iter() + .map(TurnEnvironment::selection) + .collect() } pub(crate) fn primary_filesystem(&self) -> Option> { @@ -628,19 +606,16 @@ url = "ws://127.0.0.1:8765" .collect::>(), vec![remote.clone()] ); - assert_eq!( - starting.to_selections(), - vec![remote.clone(), local.clone()] - ); + assert_eq!(starting.to_selections(), vec![local.clone()]); let server = tokio::spawn(serve_environment_info(listener)); timeout( std::time::Duration::from_secs(5), - starting.starting[0].environment.wait_until_ready(), + starting.starting[0].resolution.clone(), ) .await - .expect("environment startup should finish") - .expect("environment startup should succeed"); + .expect("environment resolution should finish") + .expect("environment resolution should succeed"); let attached = turn_environments.snapshot().await; assert!(attached.starting.is_empty()); @@ -657,7 +632,7 @@ url = "ws://127.0.0.1:8765" } #[tokio::test] - async fn latest_environment_update_wins_while_previous_resolution_is_pending() { + async fn latest_environment_update_replaces_pending_resolution() { let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .expect("bind websocket listener"); @@ -704,7 +679,7 @@ url = "ws://127.0.0.1:8765" } #[tokio::test] - async fn matching_environment_id_and_cwd_reuse_starting_environment() { + async fn matching_environment_id_and_cwd_reuse_resolution() { let cwd = AbsolutePathBuf::current_dir().expect("cwd"); let first_listener = TcpListener::bind("127.0.0.1:0") .await @@ -754,29 +729,62 @@ url = 
"ws://127.0.0.1:8765" }]); let changed_snapshot = environments.snapshot().await; + let initial = initial_snapshot + .starting + .first() + .expect("initial environment"); + let reused = reused_snapshot + .starting + .first() + .expect("reused environment"); + let changed = changed_snapshot + .starting + .first() + .expect("changed environment"); + assert!(initial.resolution.ptr_eq(&reused.resolution)); + assert!(!reused.resolution.ptr_eq(&changed.resolution)); + } + + #[tokio::test] + async fn inherited_environment_reuses_parent_handle() { + let cwd = AbsolutePathBuf::current_dir().expect("cwd"); + let selection = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: PathUri::from_abs_path(&cwd), + }; + let inherited_environment = Arc::new( + Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string())) + .expect("inherited environment"), + ); + let inherited = TurnEnvironment::new( + selection.environment_id.clone(), + Arc::clone(&inherited_environment), + selection.cwd.clone(), + /*shell*/ None, + ); + let manager = Arc::new(EnvironmentManager::without_environments()); + manager + .upsert_environment( + REMOTE_ENVIRONMENT_ID.to_string(), + "ws://127.0.0.1:9876".to_string(), + ) + .expect("replacement environment"); + let environments = ThreadEnvironments::new( + manager, + crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::from_turn_environments(vec![inherited]), + ); + + environments.update_selections(std::slice::from_ref(&selection)); + let snapshot = environments.snapshot().await; + assert!(Arc::ptr_eq( - &initial_snapshot - .starting - .first() - .expect("initial environment") - .environment, - &reused_snapshot - .starting - .first() - .expect("reused environment") - .environment, - )); - assert!(!Arc::ptr_eq( - &reused_snapshot - .starting - .first() - .expect("reused environment") - .environment, - &changed_snapshot - .starting - .first() - .expect("changed environment") + 
&snapshot + .primary() + .expect("inherited environment") .environment, + &inherited_environment, )); } From ec14591848089381d1cc0ee33f0c4662445fa747 Mon Sep 17 00:00:00 2001 From: Sayan Sisodiya Date: Wed, 17 Jun 2026 21:23:12 -0700 Subject: [PATCH 3/5] core: inline environment resolution startup --- codex-rs/core/src/environment_selection.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/src/environment_selection.rs b/codex-rs/core/src/environment_selection.rs index 6e7da8f4ab6d..ddd4ef4f38b4 100644 --- a/codex-rs/core/src/environment_selection.rs +++ b/codex-rs/core/src/environment_selection.rs @@ -100,7 +100,6 @@ impl ThreadEnvironments { let previous = self.environments.load(); let mut seen_environment_ids = HashSet::with_capacity(environments.len()); let mut next = Vec::with_capacity(environments.len()); - let mut new_resolutions = Vec::new(); for selected_environment in environments { if !seen_environment_ids.insert(selected_environment.environment_id.as_str()) { continue; @@ -118,14 +117,14 @@ impl ThreadEnvironments { tracing::warn!("skipping unknown turn environment `{environment_id}`"); continue; }; - let delayed_start = environment.is_remote() && !environment.startup_finished(); + let delayed_start = !environment.startup_finished(); let resolution = Self::resolve_environment( selected_environment.clone(), environment, self.local_shell.clone(), self.shell_snapshot.clone(), ); - new_resolutions.push(resolution.clone()); + drop(tokio::spawn(resolution.clone())); next.push(SelectedTurnEnvironment { selection: selected_environment.clone(), delayed_start, @@ -133,9 +132,6 @@ impl ThreadEnvironments { }); } self.environments.store(Arc::new(next)); - for resolution in new_resolutions { - drop(tokio::spawn(resolution)); - } } fn resolve_environment( From 1698a51d3b7addd77fceab499dfa0fd4dc94bac0 Mon Sep 17 00:00:00 2001 From: Sayan Sisodiya Date: Thu, 18 Jun 2026 15:10:02 -0700 Subject: [PATCH 4/5] core: gate deferred 
environment snapshots --- codex-rs/core/config.schema.json | 6 + codex-rs/core/src/agents_md_tests.rs | 7 +- codex-rs/core/src/environment_selection.rs | 201 +++++++++++++++++---- codex-rs/core/src/session/session.rs | 6 + codex-rs/core/src/session/tests.rs | 4 + codex-rs/core/tests/suite/remote_env.rs | 34 ++++ codex-rs/features/src/lib.rs | 8 + 7 files changed, 229 insertions(+), 37 deletions(-) diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 0568f20e5faf..6eff342082f8 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -479,6 +479,9 @@ "default_mode_request_user_input": { "type": "boolean" }, + "deferred_executor": { + "type": "boolean" + }, "elevated_windows_sandbox": { "type": "boolean" }, @@ -4712,6 +4715,9 @@ "default_mode_request_user_input": { "type": "boolean" }, + "deferred_executor": { + "type": "boolean" + }, "elevated_windows_sandbox": { "type": "boolean" }, diff --git a/codex-rs/core/src/agents_md_tests.rs b/codex-rs/core/src/agents_md_tests.rs index f0d0e3c0ef8d..0122910c3b94 100644 --- a/codex-rs/core/src/agents_md_tests.rs +++ b/codex-rs/core/src/agents_md_tests.rs @@ -262,8 +262,8 @@ async fn agents_md_paths(config: &TestConfig) -> std::io::Result> { fn resolved_local_environments( environments: [(&str, AbsolutePathBuf); N], ) -> TurnEnvironmentSnapshot { - TurnEnvironmentSnapshot::from_turn_environments( - environments + TurnEnvironmentSnapshot { + turn_environments: environments .into_iter() .map(|(environment_id, cwd)| { TurnEnvironment::new( @@ -277,7 +277,8 @@ fn resolved_local_environments( ) }) .collect(), - ) + starting: Vec::new(), + } } fn project_provenance(path: AbsolutePathBuf, cwd: AbsolutePathBuf) -> InstructionProvenance { diff --git a/codex-rs/core/src/environment_selection.rs b/codex-rs/core/src/environment_selection.rs index ddd4ef4f38b4..996a216162d0 100644 --- a/codex-rs/core/src/environment_selection.rs +++ b/codex-rs/core/src/environment_selection.rs 
@@ -13,6 +13,7 @@ use codex_utils_path_uri::PathUri; use futures::FutureExt; use futures::future::BoxFuture; use futures::future::Shared; +use tokio_util::task::AbortOnDropHandle; use crate::session::turn_context::TurnEnvironment; use crate::shell::Shell; @@ -35,12 +36,20 @@ pub(crate) fn default_thread_environment_selections( type TurnEnvironmentResolution = Shared>>>; +#[derive(Clone, Copy)] +pub(crate) enum EnvironmentSnapshotMode { + Blocking, + NonBlocking, +} + #[derive(Clone)] struct SelectedTurnEnvironment { selection: TurnEnvironmentSelection, - // Starting environments resolve in the background without blocking snapshots. + // True when the environment was still starting when this resolution was created. delayed_start: bool, resolution: TurnEnvironmentResolution, + // Dropping the selected entry cancels resolution work that is no longer needed. + _resolution_task: Option>>, } #[derive(Clone)] @@ -63,6 +72,7 @@ pub(crate) struct ThreadEnvironments { environment_manager: Arc, local_shell: Shell, shell_snapshot: ShellSnapshot, + snapshot_mode: EnvironmentSnapshotMode, environments: ArcSwap>, } @@ -72,6 +82,7 @@ impl ThreadEnvironments { local_shell: Shell, shell_snapshot: ShellSnapshot, current: TurnEnvironmentSnapshot, + snapshot_mode: EnvironmentSnapshotMode, ) -> Self { // Reuse only attached environments from the supplied snapshot; drop starting entries. 
let environments = current @@ -85,6 +96,7 @@ impl ThreadEnvironments { selection, delayed_start: false, resolution, + _resolution_task: None, } }) .collect(); @@ -92,6 +104,7 @@ impl ThreadEnvironments { environment_manager, local_shell, shell_snapshot, + snapshot_mode, environments: ArcSwap::from_pointee(environments), } } @@ -107,6 +120,7 @@ impl ThreadEnvironments { if let Some(environment) = previous .iter() .find(|environment| environment.selection == *selected_environment) + && !matches!(environment.resolution.peek(), Some(Err(_))) { next.push(environment.clone()); continue; @@ -124,11 +138,17 @@ impl ThreadEnvironments { self.local_shell.clone(), self.shell_snapshot.clone(), ); - drop(tokio::spawn(resolution.clone())); + let resolution_task = Arc::new(AbortOnDropHandle::new(tokio::spawn({ + let resolution = resolution.clone(); + async move { + let _ = resolution.await; + } + }))); next.push(SelectedTurnEnvironment { selection: selected_environment.clone(), delayed_start, resolution, + _resolution_task: Some(resolution_task), }); } self.environments.store(Arc::new(next)); @@ -186,10 +206,12 @@ impl ThreadEnvironments { let mut turn_environments = Vec::with_capacity(current.len()); let mut starting = Vec::new(); for environment in current.iter() { - let resolved = if environment.delayed_start { - environment.resolution.peek().cloned() - } else { - Some(environment.resolution.clone().await) + let resolved = match self.snapshot_mode { + EnvironmentSnapshotMode::Blocking => Some(environment.resolution.clone().await), + EnvironmentSnapshotMode::NonBlocking if environment.delayed_start => { + environment.resolution.peek().cloned() + } + EnvironmentSnapshotMode::NonBlocking => Some(environment.resolution.clone().await), }; match resolved { Some(Ok(turn_environment)) => turn_environments.push(turn_environment), @@ -218,14 +240,6 @@ pub(crate) struct TurnEnvironmentSnapshot { } impl TurnEnvironmentSnapshot { - #[cfg(test)] - pub(crate) fn 
from_turn_environments(turn_environments: Vec) -> Self { - Self { - turn_environments, - starting: Vec::new(), - } - } - pub(crate) fn primary(&self) -> Option<&TurnEnvironment> { self.turn_environments.first() } @@ -255,6 +269,9 @@ impl TurnEnvironmentSnapshot { } pub(crate) fn single_local_environment(&self) -> Option<&TurnEnvironment> { + if !self.starting.is_empty() { + return None; + } let [environment] = self.turn_environments.as_slice() else { return None; }; @@ -271,6 +288,8 @@ impl TurnEnvironmentSnapshot { #[cfg(test)] mod tests { + use std::time::Duration; + use codex_exec_server::Environment; use codex_exec_server::ExecServerRuntimePaths; use codex_exec_server::LOCAL_ENVIRONMENT_ID; @@ -300,6 +319,7 @@ mod tests { crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::Blocking, )); turn_environments.update_selections(selections); turn_environments.snapshot().await; @@ -445,6 +465,7 @@ url = "ws://127.0.0.1:8765" local_shell.clone(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::Blocking, ); turn_environments.update_selections(&[TurnEnvironmentSelection { environment_id: LOCAL_ENVIRONMENT_ID.to_string(), @@ -552,6 +573,51 @@ url = "ws://127.0.0.1:8765" assert_eq!(resolved.snapshot().await.to_selections(), vec![local]); } + #[tokio::test] + async fn blocking_snapshot_waits_for_starting_environment() { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind websocket listener"); + let manager = Arc::new( + EnvironmentManager::create_for_tests( + Some(format!( + "ws://{}", + listener.local_addr().expect("listener address") + )), + Some(test_runtime_paths()), + ) + .await, + ); + let selection = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: PathUri::from_abs_path(&AbsolutePathBuf::current_dir().expect("cwd")), + }; + let environments = Arc::new(ThreadEnvironments::new( + manager, + 
crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::Blocking, + )); + environments.update_selections(std::slice::from_ref(&selection)); + let snapshot_task = tokio::spawn({ + let environments = Arc::clone(&environments); + async move { environments.snapshot().await } + }); + tokio::task::yield_now().await; + assert!(!snapshot_task.is_finished()); + + let server = tokio::spawn(serve_environment_info(listener)); + let snapshot = timeout(Duration::from_secs(5), snapshot_task) + .await + .expect("snapshot should finish after the environment starts") + .expect("snapshot task"); + + assert!(snapshot.starting.is_empty()); + assert_eq!(snapshot.to_selections(), vec![selection]); + server.await.expect("server task"); + } + #[tokio::test] async fn snapshot_keeps_starting_environment_until_it_can_be_attached() { let listener = TcpListener::bind("127.0.0.1:0") @@ -582,6 +648,7 @@ url = "ws://127.0.0.1:8765" crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::NonBlocking, ); turn_environments.update_selections(&[remote.clone(), local.clone()]); @@ -603,6 +670,7 @@ url = "ws://127.0.0.1:8765" vec![remote.clone()] ); assert_eq!(starting.to_selections(), vec![local.clone()]); + assert!(starting.single_local_environment().is_none()); let server = tokio::spawn(serve_environment_info(listener)); timeout( @@ -628,7 +696,7 @@ url = "ws://127.0.0.1:8765" } #[tokio::test] - async fn latest_environment_update_replaces_pending_resolution() { + async fn latest_environment_update_cancels_pending_resolution() { let listener = tokio::net::TcpListener::bind("127.0.0.1:0") .await .expect("bind websocket listener"); @@ -648,11 +716,20 @@ url = "ws://127.0.0.1:8765" crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::Blocking, )); 
turn_environments.update_selections(&[TurnEnvironmentSelection { environment_id: REMOTE_ENVIRONMENT_ID.to_string(), cwd: PathUri::from_abs_path(&cwd), }]); + let resolution_abort = turn_environments + .environments + .load() + .first() + .and_then(|environment| environment._resolution_task.as_ref()) + .expect("resolution task") + .abort_handle(); + assert!(!resolution_abort.is_finished()); let (_connection, _) = tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept()) .await @@ -664,16 +741,61 @@ url = "ws://127.0.0.1:8765" }; turn_environments.update_selections(std::slice::from_ref(&local)); - let snapshot = tokio::time::timeout( - std::time::Duration::from_secs(5), - turn_environments.snapshot(), - ) + timeout(Duration::from_secs(1), async { + while !resolution_abort.is_finished() { + tokio::task::yield_now().await; + } + }) .await - .expect("latest environment resolution should complete"); + .expect("replacing the selection should cancel its resolution task"); + let snapshot = turn_environments.snapshot().await; assert_eq!(snapshot.to_selections(), vec![local]); } + #[tokio::test] + async fn failed_resolution_is_replaced_from_the_environment_manager() { + let manager = Arc::new( + EnvironmentManager::create_for_tests( + Some("http://example.com".to_string()), + Some(test_runtime_paths()), + ) + .await, + ); + let selection = TurnEnvironmentSelection { + environment_id: REMOTE_ENVIRONMENT_ID.to_string(), + cwd: PathUri::from_abs_path(&AbsolutePathBuf::current_dir().expect("cwd")), + }; + let environments = ThreadEnvironments::new( + Arc::clone(&manager), + crate::shell::default_user_shell(), + ShellSnapshot::disabled(), + TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::NonBlocking, + ); + environments.update_selections(std::slice::from_ref(&selection)); + let failed_resolution = environments.environments.load()[0].resolution.clone(); + assert!(failed_resolution.clone().await.is_err()); + + let listener = 
TcpListener::bind("127.0.0.1:0") + .await + .expect("bind replacement listener"); + manager + .upsert_environment( + REMOTE_ENVIRONMENT_ID.to_string(), + format!("ws://{}", listener.local_addr().expect("listener address")), + ) + .expect("replacement environment"); + environments.update_selections(std::slice::from_ref(&selection)); + + let replacement = environments.snapshot().await; + let [replacement] = replacement.starting.as_slice() else { + panic!("expected the replacement environment to be starting"); + }; + assert_eq!(replacement.selection, selection); + assert!(!failed_resolution.ptr_eq(&replacement.resolution)); + } + #[tokio::test] async fn matching_environment_id_and_cwd_reuse_resolution() { let cwd = AbsolutePathBuf::current_dir().expect("cwd"); @@ -699,6 +821,7 @@ url = "ws://127.0.0.1:8765" crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::NonBlocking, ); environments.update_selections(std::slice::from_ref(&selection)); let initial_snapshot = environments.snapshot().await; @@ -769,7 +892,11 @@ url = "ws://127.0.0.1:8765" manager, crate::shell::default_user_shell(), ShellSnapshot::disabled(), - TurnEnvironmentSnapshot::from_turn_environments(vec![inherited]), + TurnEnvironmentSnapshot { + turn_environments: vec![inherited], + starting: Vec::new(), + }, + EnvironmentSnapshotMode::Blocking, ); environments.update_selections(std::slice::from_ref(&selection)); @@ -802,21 +929,27 @@ url = "ws://127.0.0.1:8765" Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string())) .expect("remote environment"), ); - let remote = TurnEnvironmentSnapshot::from_turn_environments(vec![TurnEnvironment::new( - REMOTE_ENVIRONMENT_ID.to_string(), - remote_environment.clone(), - cwd_uri.clone(), - /*shell*/ None, - )]); - let multiple = TurnEnvironmentSnapshot::from_turn_environments(vec![ - local.primary().expect("local environment").clone(), - TurnEnvironment::new( + let remote = 
TurnEnvironmentSnapshot { + turn_environments: vec![TurnEnvironment::new( REMOTE_ENVIRONMENT_ID.to_string(), - remote_environment, - cwd_uri, + remote_environment.clone(), + cwd_uri.clone(), /*shell*/ None, - ), - ]); + )], + starting: Vec::new(), + }; + let multiple = TurnEnvironmentSnapshot { + turn_environments: vec![ + local.primary().expect("local environment").clone(), + TurnEnvironment::new( + REMOTE_ENVIRONMENT_ID.to_string(), + remote_environment, + cwd_uri, + /*shell*/ None, + ), + ], + starting: Vec::new(), + }; assert_eq!(local.single_local_environment_cwd(), Some(cwd)); assert_eq!(remote.single_local_environment_cwd(), None); diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 583882cae1ef..f9bef189aea4 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -2,6 +2,7 @@ use super::input_queue::InputQueue; use super::*; use crate::agents_md::LoadedAgentsMd; use crate::config::ConstraintError; +use crate::environment_selection::EnvironmentSnapshotMode; use crate::environment_selection::ThreadEnvironments; use crate::environment_selection::TurnEnvironmentSnapshot; use crate::shell_snapshot::ShellSnapshot; @@ -826,6 +827,11 @@ impl Session { default_shell.clone(), shell_snapshot, inherited_environments.unwrap_or_default(), + if config.features.enabled(Feature::DeferredExecutor) { + EnvironmentSnapshotMode::NonBlocking + } else { + EnvironmentSnapshotMode::Blocking + }, )); turn_environments.update_selections(session_configuration.environment_selections()); let resolved_environments = turn_environments.snapshot().await; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 436b85907d05..a57d8270064a 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -6,6 +6,7 @@ use crate::config::ConfigOverrides; use crate::config::test_config; use crate::context::ContextualUserFragment; use 
crate::context::TurnAborted; +use crate::environment_selection::EnvironmentSnapshotMode; use crate::environment_selection::ThreadEnvironments; use crate::function_tool::FunctionCallError; use crate::shell::default_user_shell; @@ -4101,6 +4102,7 @@ async fn resolved_environments_for_configuration( default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), + EnvironmentSnapshotMode::Blocking, ); turn_environments.update_selections(session_configuration.environment_selections()); (environment_manager, turn_environments.snapshot().await) @@ -5010,6 +5012,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { default_user_shell(), ShellSnapshot::disabled(), resolved_environments, + EnvironmentSnapshotMode::Blocking, )); let environment = Arc::clone( &resolved_turn_environments @@ -7065,6 +7068,7 @@ where default_user_shell(), ShellSnapshot::disabled(), resolved_turn_environments.clone(), + EnvironmentSnapshotMode::Blocking, )); let environment = Arc::clone( &resolved_turn_environments diff --git a/codex-rs/core/tests/suite/remote_env.rs b/codex-rs/core/tests/suite/remote_env.rs index 63bb5deedac9..67e5668f45b3 100644 --- a/codex-rs/core/tests/suite/remote_env.rs +++ b/codex-rs/core/tests/suite/remote_env.rs @@ -55,6 +55,7 @@ use serde_json::json; use std::fs; use std::path::PathBuf; use std::process::Command; +use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tempfile::TempDir; @@ -299,6 +300,39 @@ async fn explicit_remote_shell_runs_in_remote_cwd() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn deferred_executor_reaches_model_before_remote_environment_is_ready() -> Result<()> { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; + let server = start_mock_server().await; + let response_mock = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "done"), + 
ev_completed("resp-1"), + ]), + ) + .await; + let mut builder = test_codex() + .with_exec_server_url(format!("ws://{}", listener.local_addr()?)) + .with_config(|config| { + assert!(config.features.enable(Feature::DeferredExecutor).is_ok()); + }); + + let test = tokio::time::timeout(Duration::from_secs(5), builder.build(&server)) + .await + .context("thread startup should not wait for the remote environment")??; + tokio::time::timeout( + Duration::from_secs(5), + test.submit_turn("respond before the environment is ready"), + ) + .await + .context("turn should reach the model before the remote environment is ready")??; + + response_mock.single_request(); + Ok(()) +} + fn absolute_path(path: PathBuf) -> AbsolutePathBuf { AbsolutePathBuf::try_from(path).expect("path should be absolute") } diff --git a/codex-rs/features/src/lib.rs b/codex-rs/features/src/lib.rs index 8808cb37c399..8483ef97bdb2 100644 --- a/codex-rs/features/src/lib.rs +++ b/codex-rs/features/src/lib.rs @@ -124,6 +124,8 @@ pub enum Feature { UseLegacyLandlock, /// Experimental shell snapshotting. ShellSnapshot, + /// Allow turns to start while selected executors are still starting. + DeferredExecutor, /// Enable runtime metrics snapshots via a manual reader. RuntimeMetrics, /// Enable startup memory extraction and file-backed memory consolidation. 
@@ -816,6 +818,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::Stable, default_enabled: true, }, + FeatureSpec { + id: Feature::DeferredExecutor, + key: "deferred_executor", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::JsRepl, key: "js_repl", From bb8869d02f094240b96c8bf1985e7c91e8f1d1f7 Mon Sep 17 00:00:00 2001 From: Sayan Sisodiya Date: Thu, 18 Jun 2026 21:41:13 -0700 Subject: [PATCH 5/5] core: simplify starting environment snapshots --- codex-rs/core/src/environment_selection.rs | 144 +++++---------------- codex-rs/core/src/session/session.rs | 7 +- codex-rs/core/src/session/tests.rs | 7 +- 3 files changed, 34 insertions(+), 124 deletions(-) diff --git a/codex-rs/core/src/environment_selection.rs b/codex-rs/core/src/environment_selection.rs index 996a216162d0..0d82a147be05 100644 --- a/codex-rs/core/src/environment_selection.rs +++ b/codex-rs/core/src/environment_selection.rs @@ -13,7 +13,6 @@ use codex_utils_path_uri::PathUri; use futures::FutureExt; use futures::future::BoxFuture; use futures::future::Shared; -use tokio_util::task::AbortOnDropHandle; use crate::session::turn_context::TurnEnvironment; use crate::shell::Shell; @@ -33,23 +32,13 @@ pub(crate) fn default_thread_environment_selections( .collect() } -type TurnEnvironmentResolution = - Shared>>>; - -#[derive(Clone, Copy)] -pub(crate) enum EnvironmentSnapshotMode { - Blocking, - NonBlocking, -} +type TurnEnvironmentResult = Result>; +type TurnEnvironmentResolution = Shared>; #[derive(Clone)] struct SelectedTurnEnvironment { selection: TurnEnvironmentSelection, - // True when the environment was still starting when this resolution was created. - delayed_start: bool, resolution: TurnEnvironmentResolution, - // Dropping the selected entry cancels resolution work that is no longer needed. 
- _resolution_task: Option>>, } #[derive(Clone)] @@ -72,7 +61,7 @@ pub(crate) struct ThreadEnvironments { environment_manager: Arc, local_shell: Shell, shell_snapshot: ShellSnapshot, - snapshot_mode: EnvironmentSnapshotMode, + non_blocking_snapshots: bool, environments: ArcSwap>, } @@ -82,7 +71,7 @@ impl ThreadEnvironments { local_shell: Shell, shell_snapshot: ShellSnapshot, current: TurnEnvironmentSnapshot, - snapshot_mode: EnvironmentSnapshotMode, + non_blocking_snapshots: bool, ) -> Self { // Reuse only attached environments from the supplied snapshot; drop starting entries. let environments = current @@ -94,9 +83,7 @@ impl ThreadEnvironments { futures::future::ready(Ok(environment)).boxed().shared(); SelectedTurnEnvironment { selection, - delayed_start: false, resolution, - _resolution_task: None, } }) .collect(); @@ -104,7 +91,7 @@ impl ThreadEnvironments { environment_manager, local_shell, shell_snapshot, - snapshot_mode, + non_blocking_snapshots, environments: ArcSwap::from_pointee(environments), } } @@ -120,7 +107,7 @@ impl ThreadEnvironments { if let Some(environment) = previous .iter() .find(|environment| environment.selection == *selected_environment) - && !matches!(environment.resolution.peek(), Some(Err(_))) + && !matches!(environment.resolution.clone().now_or_never(), Some(Err(_))) { next.push(environment.clone()); continue; @@ -131,24 +118,18 @@ impl ThreadEnvironments { tracing::warn!("skipping unknown turn environment `{environment_id}`"); continue; }; - let delayed_start = !environment.startup_finished(); - let resolution = Self::resolve_environment( + let (resolution_task, resolution) = Self::resolve_environment( selected_environment.clone(), environment, self.local_shell.clone(), self.shell_snapshot.clone(), - ); - let resolution_task = Arc::new(AbortOnDropHandle::new(tokio::spawn({ - let resolution = resolution.clone(); - async move { - let _ = resolution.await; - } - }))); + ) + .remote_handle(); + drop(tokio::spawn(resolution_task)); + let 
resolution = resolution.boxed().shared(); next.push(SelectedTurnEnvironment { selection: selected_environment.clone(), - delayed_start, resolution, - _resolution_task: Some(resolution_task), }); } self.environments.store(Arc::new(next)); @@ -159,7 +140,7 @@ impl ThreadEnvironments { environment: Arc, local_shell: Shell, shell_snapshot: ShellSnapshot, - ) -> TurnEnvironmentResolution { + ) -> BoxFuture<'static, TurnEnvironmentResult> { async move { let environment_id = &selection.environment_id; if let Err(err) = environment.wait_until_ready().await { @@ -198,7 +179,6 @@ impl ThreadEnvironments { Ok(turn_environment) } .boxed() - .shared() } pub(crate) async fn snapshot(&self) -> TurnEnvironmentSnapshot { @@ -206,16 +186,17 @@ impl ThreadEnvironments { let mut turn_environments = Vec::with_capacity(current.len()); let mut starting = Vec::new(); for environment in current.iter() { - let resolved = match self.snapshot_mode { - EnvironmentSnapshotMode::Blocking => Some(environment.resolution.clone().await), - EnvironmentSnapshotMode::NonBlocking if environment.delayed_start => { - environment.resolution.peek().cloned() - } - EnvironmentSnapshotMode::NonBlocking => Some(environment.resolution.clone().await), + let resolved = if self.non_blocking_snapshots { + environment.resolution.clone().now_or_never() + } else { + Some(environment.resolution.clone().await) }; match resolved { Some(Ok(turn_environment)) => turn_environments.push(turn_environment), - Some(Err(_)) => {} + Some(Err(err)) => tracing::debug!( + environment_id = %environment.selection.environment_id, + "skipping failed turn environment: {err}" + ), None => starting.push(StartingTurnEnvironment { selection: environment.selection.clone(), resolution: environment.resolution.clone(), @@ -319,7 +300,7 @@ mod tests { crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::Blocking, + /*non_blocking_snapshots*/ false, )); 
turn_environments.update_selections(selections); turn_environments.snapshot().await; @@ -465,7 +446,7 @@ url = "ws://127.0.0.1:8765" local_shell.clone(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::Blocking, + /*non_blocking_snapshots*/ false, ); turn_environments.update_selections(&[TurnEnvironmentSelection { environment_id: LOCAL_ENVIRONMENT_ID.to_string(), @@ -597,7 +578,7 @@ url = "ws://127.0.0.1:8765" crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::Blocking, + /*non_blocking_snapshots*/ false, )); environments.update_selections(std::slice::from_ref(&selection)); let snapshot_task = tokio::spawn({ @@ -648,28 +629,21 @@ url = "ws://127.0.0.1:8765" crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::NonBlocking, + /*non_blocking_snapshots*/ true, ); turn_environments.update_selections(&[remote.clone(), local.clone()]); let starting = turn_environments.snapshot().await; - assert_eq!( - starting - .turn_environments - .iter() - .map(TurnEnvironment::selection) - .collect::>(), - vec![local.clone()] - ); + assert!(starting.turn_environments.is_empty()); assert_eq!( starting .starting .iter() .map(|environment| environment.selection.clone()) .collect::>(), - vec![remote.clone()] + vec![remote.clone(), local.clone()] ); - assert_eq!(starting.to_selections(), vec![local.clone()]); + assert!(starting.to_selections().is_empty()); assert!(starting.single_local_environment().is_none()); let server = tokio::spawn(serve_environment_info(listener)); @@ -695,64 +669,6 @@ url = "ws://127.0.0.1:8765" server.await.expect("server task"); } - #[tokio::test] - async fn latest_environment_update_cancels_pending_resolution() { - let listener = tokio::net::TcpListener::bind("127.0.0.1:0") - .await - .expect("bind websocket listener"); - let manager = Arc::new( - 
EnvironmentManager::create_for_tests_with_local( - Some(format!( - "ws://{}", - listener.local_addr().expect("listener address") - )), - test_runtime_paths(), - ) - .await, - ); - let cwd = AbsolutePathBuf::current_dir().expect("cwd"); - let turn_environments = Arc::new(ThreadEnvironments::new( - manager, - crate::shell::default_user_shell(), - ShellSnapshot::disabled(), - TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::Blocking, - )); - turn_environments.update_selections(&[TurnEnvironmentSelection { - environment_id: REMOTE_ENVIRONMENT_ID.to_string(), - cwd: PathUri::from_abs_path(&cwd), - }]); - let resolution_abort = turn_environments - .environments - .load() - .first() - .and_then(|environment| environment._resolution_task.as_ref()) - .expect("resolution task") - .abort_handle(); - assert!(!resolution_abort.is_finished()); - let (_connection, _) = - tokio::time::timeout(std::time::Duration::from_secs(5), listener.accept()) - .await - .expect("remote resolution should connect") - .expect("accept remote resolution connection"); - let local = TurnEnvironmentSelection { - environment_id: LOCAL_ENVIRONMENT_ID.to_string(), - cwd: PathUri::from_abs_path(&cwd), - }; - - turn_environments.update_selections(std::slice::from_ref(&local)); - timeout(Duration::from_secs(1), async { - while !resolution_abort.is_finished() { - tokio::task::yield_now().await; - } - }) - .await - .expect("replacing the selection should cancel its resolution task"); - let snapshot = turn_environments.snapshot().await; - - assert_eq!(snapshot.to_selections(), vec![local]); - } - #[tokio::test] async fn failed_resolution_is_replaced_from_the_environment_manager() { let manager = Arc::new( @@ -771,7 +687,7 @@ url = "ws://127.0.0.1:8765" crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::NonBlocking, + /*non_blocking_snapshots*/ true, ); environments.update_selections(std::slice::from_ref(&selection)); let 
failed_resolution = environments.environments.load()[0].resolution.clone(); @@ -821,7 +737,7 @@ url = "ws://127.0.0.1:8765" crate::shell::default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::NonBlocking, + /*non_blocking_snapshots*/ true, ); environments.update_selections(std::slice::from_ref(&selection)); let initial_snapshot = environments.snapshot().await; @@ -896,7 +812,7 @@ url = "ws://127.0.0.1:8765" turn_environments: vec![inherited], starting: Vec::new(), }, - EnvironmentSnapshotMode::Blocking, + /*non_blocking_snapshots*/ false, ); environments.update_selections(std::slice::from_ref(&selection)); diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index f9bef189aea4..1e8c250bc968 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -2,7 +2,6 @@ use super::input_queue::InputQueue; use super::*; use crate::agents_md::LoadedAgentsMd; use crate::config::ConstraintError; -use crate::environment_selection::EnvironmentSnapshotMode; use crate::environment_selection::ThreadEnvironments; use crate::environment_selection::TurnEnvironmentSnapshot; use crate::shell_snapshot::ShellSnapshot; @@ -827,11 +826,7 @@ impl Session { default_shell.clone(), shell_snapshot, inherited_environments.unwrap_or_default(), - if config.features.enabled(Feature::DeferredExecutor) { - EnvironmentSnapshotMode::NonBlocking - } else { - EnvironmentSnapshotMode::Blocking - }, + config.features.enabled(Feature::DeferredExecutor), )); turn_environments.update_selections(session_configuration.environment_selections()); let resolved_environments = turn_environments.snapshot().await; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index a57d8270064a..e7532a1e57b4 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -6,7 +6,6 @@ use crate::config::ConfigOverrides; use 
crate::config::test_config; use crate::context::ContextualUserFragment; use crate::context::TurnAborted; -use crate::environment_selection::EnvironmentSnapshotMode; use crate::environment_selection::ThreadEnvironments; use crate::function_tool::FunctionCallError; use crate::shell::default_user_shell; @@ -4102,7 +4101,7 @@ async fn resolved_environments_for_configuration( default_user_shell(), ShellSnapshot::disabled(), TurnEnvironmentSnapshot::default(), - EnvironmentSnapshotMode::Blocking, + /*non_blocking_snapshots*/ false, ); turn_environments.update_selections(session_configuration.environment_selections()); (environment_manager, turn_environments.snapshot().await) @@ -5012,7 +5011,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { default_user_shell(), ShellSnapshot::disabled(), resolved_environments, - EnvironmentSnapshotMode::Blocking, + /*non_blocking_snapshots*/ false, )); let environment = Arc::clone( &resolved_turn_environments @@ -7068,7 +7067,7 @@ where default_user_shell(), ShellSnapshot::disabled(), resolved_turn_environments.clone(), - EnvironmentSnapshotMode::Blocking, + /*non_blocking_snapshots*/ false, )); let environment = Arc::clone( &resolved_turn_environments