From 005450e87cbb071a8558da7e45269fc12bcb4c66 Mon Sep 17 00:00:00 2001 From: Richard Lee Date: Fri, 26 Jun 2026 12:28:04 -0500 Subject: [PATCH 1/3] unified-exec: consume pushed process events --- codex-rs/core/src/unified_exec/process.rs | 182 ++++++++++++------ .../core/src/unified_exec/process_tests.rs | 156 +++++++++++++-- codex-rs/exec-server-protocol/src/protocol.rs | 15 ++ codex-rs/exec-server/README.md | 14 +- codex-rs/exec-server/src/client.rs | 4 + codex-rs/exec-server/src/client_recovery.rs | 33 +++- .../exec-server/src/client_recovery_tests.rs | 42 ++++ codex-rs/exec-server/src/local_process.rs | 9 +- codex-rs/exec-server/src/process.rs | 45 ++++- codex-rs/exec-server/tests/exec_process.rs | 5 +- 10 files changed, 414 insertions(+), 91 deletions(-) diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index 5f785adfa75a..7bedfd43e322 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -14,6 +14,7 @@ use tokio_util::sync::CancellationToken; use crate::exec::is_likely_sandbox_denied; use codex_exec_server::ExecProcess; +use codex_exec_server::ExecProcessEvent; use codex_exec_server::ProcessSignal as ExecServerProcessSignal; use codex_exec_server::ReadResponse as ExecReadResponse; use codex_exec_server::StartedExecProcess; @@ -425,84 +426,151 @@ impl UnifiedExecProcess { cancellation_token, } = output_handles; let process = started.process; - let mut wake_rx = process.subscribe_wake(); + let mut events = process.subscribe_events(); tokio::spawn(async move { - let mut after_seq = None; + let mut last_seq: u64 = 0; loop { - match process - .read(after_seq, /*max_bytes*/ None, /*wait_ms*/ Some(0)) - .await + let event = match events.recv().await { + Ok(event) => Some(event), + Err(broadcast::error::RecvError::Lagged(_)) => None, + Err(broadcast::error::RecvError::Closed) => { + let state = state_tx.borrow().clone(); + let _ = state_tx.send_replace( + state.failed("exec-server process event stream closed".to_string()), + ); + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; + } + }; + let event_seq = event.as_ref().and_then(|event| match event { + ExecProcessEvent::Output(chunk) => Some(chunk.seq), + ExecProcessEvent::Exited { seq, .. } | ExecProcessEvent::Closed { seq } => { + Some(*seq) + } + ExecProcessEvent::Failed(_) => None, + }); + let missing_sandbox_denial = matches!( + event.as_ref(), + Some(ExecProcessEvent::Exited { + sandbox_denied: None, + .. + }) + ); + if event.is_none() + || event_seq.is_some_and(|seq| seq > last_seq.saturating_add(1)) + || missing_sandbox_denial { - Ok(response) => { - let ExecReadResponse { - chunks, - next_seq, - exited, - exit_code, - closed, - failure, - sandbox_denied, - } = response; - - for chunk in chunks { - let bytes = chunk.chunk.into_inner(); - let mut guard = output_buffer.lock().await; - guard.push_chunk(bytes.clone()); - drop(guard); - let _ = output_tx.send(bytes); - output_notify.notify_waiters(); - } - - if let Some(message) = failure { + let response = match process + .read( + Some(last_seq), + /*max_bytes*/ None, + /*wait_ms*/ Some(0), + ) + .await + { + Ok(response) => response, + Err(err) => { let state = state_tx.borrow().clone(); - let _ = state_tx.send_replace(state.failed(message)); + let _ = state_tx.send_replace(state.failed(err.to_string())); output_closed.store(true, Ordering::Release); output_closed_notify.notify_waiters(); cancellation_token.cancel(); break; } + }; + let ExecReadResponse { + chunks, + next_seq, + exited, + exit_code, + closed, + failure, + sandbox_denied, + } = response; + for chunk in chunks.into_iter().filter(|chunk| chunk.seq > last_seq) { + let bytes = chunk.chunk.into_inner(); + let mut guard = output_buffer.lock().await; + guard.push_chunk(bytes.clone()); + drop(guard); + let _ = output_tx.send(bytes); + output_notify.notify_waiters(); + } + last_seq = last_seq.max(next_seq.saturating_sub(1)); + if let Some(message) = failure { + let state = state_tx.borrow().clone(); + let _ = state_tx.send_replace(state.failed(message)); + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; + } + if sandbox_denied || exited { + let mut state = state_tx.borrow().clone(); + state.sandbox_denied |= sandbox_denied; + let _ = state_tx.send_replace(if exited { + state.exited(exit_code) + } else { + state + }); + } + if closed { + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; + } + continue; + } - if sandbox_denied { - let mut state = state_tx.borrow().clone(); - state.sandbox_denied = true; - let _ = state_tx.send_replace(state); - } - - if exited { - let state = state_tx.borrow().clone(); - let _ = state_tx.send_replace(state.exited(exit_code)); + let Some(event) = event else { + continue; + }; + match event { + ExecProcessEvent::Output(chunk) => { + if chunk.seq <= last_seq { + continue; } - - if closed { - output_closed.store(true, Ordering::Release); - output_closed_notify.notify_waiters(); - cancellation_token.cancel(); + last_seq = chunk.seq; + let bytes = chunk.chunk.into_inner(); + let mut guard = output_buffer.lock().await; + guard.push_chunk(bytes.clone()); + drop(guard); + let _ = output_tx.send(bytes); + output_notify.notify_waiters(); + } + ExecProcessEvent::Exited { + seq, + exit_code, + sandbox_denied, + } => { + if seq <= last_seq { + continue; } - - after_seq = next_seq.checked_sub(1); - if output_closed.load(Ordering::Acquire) { - break; + last_seq = seq; + let mut state = state_tx.borrow().clone(); + state.sandbox_denied |= sandbox_denied.unwrap_or(false); + let _ = state_tx.send_replace(state.exited(Some(exit_code))); + } + ExecProcessEvent::Closed { seq } => { + if seq <= last_seq { + continue; } + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; } - Err(err) => { + ExecProcessEvent::Failed(message) => { let state = state_tx.borrow().clone(); - let _ = state_tx.send_replace(state.failed(err.to_string())); + let _ = state_tx.send_replace(state.failed(message)); output_closed.store(true, Ordering::Release); output_closed_notify.notify_waiters(); cancellation_token.cancel(); break; } } - - if wake_rx.changed().await.is_err() { - let state = state_tx.borrow().clone(); - let _ = state_tx - .send_replace(state.failed("exec-server wake channel closed".to_string())); - output_closed.store(true, Ordering::Release); - output_closed_notify.notify_waiters(); - cancellation_token.cancel(); - break; - } } }) } diff --git a/codex-rs/core/src/unified_exec/process_tests.rs b/codex-rs/core/src/unified_exec/process_tests.rs index db64d461ec66..1a30dc8f49e7 100644 --- a/codex-rs/core/src/unified_exec/process_tests.rs +++ b/codex-rs/core/src/unified_exec/process_tests.rs @@ -1,10 +1,12 @@ use super::process::UnifiedExecProcess; use crate::unified_exec::UnifiedExecError; use codex_exec_server::ExecProcess; +use codex_exec_server::ExecProcessEvent; use codex_exec_server::ExecProcessEventReceiver; use codex_exec_server::ExecProcessFuture; use codex_exec_server::ExecServerError; use codex_exec_server::ProcessId; +use codex_exec_server::ProcessOutputChunk; use codex_exec_server::ProcessSignal; use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; @@ -13,9 +15,12 @@ use codex_exec_server::WriteStatus; use pretty_assertions::assert_eq; use std::collections::VecDeque; use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::sync::watch; -use tokio::time::Duration; + +use codex_exec_server::ExecOutputStream; struct MockExecProcess { process_id: ProcessId, @@ -23,10 +28,13 @@ struct MockExecProcess { read_responses: Mutex>, terminate_error: Option, wake_tx: watch::Sender, + events: Vec, + read_count: Arc, } impl MockExecProcess { async fn read(&self) -> Result { + self.read_count.fetch_add(1, Ordering::Relaxed); Ok(self .read_responses .lock() @@ -61,7 +69,7 @@ impl ExecProcess for MockExecProcess { } fn subscribe_events(&self) -> ExecProcessEventReceiver { - ExecProcessEventReceiver::empty() + ExecProcessEventReceiver::from_events(self.events.clone()) } fn read( @@ -100,6 +108,8 @@ async fn remote_process( read_responses: Mutex::new(VecDeque::new()), terminate_error, wake_tx, + events: Vec::new(), + read_count: Arc::new(AtomicUsize::new(0)), }), }; @@ -177,35 +187,157 @@ async fn remote_terminate_confirmed_updates_state_on_success_only() { #[tokio::test] async fn remote_process_waits_for_early_exit_event() { let (wake_tx, _wake_rx) = watch::channel(0); + let read_count = Arc::new(AtomicUsize::new(0)); let started = StartedExecProcess { process: Arc::new(MockExecProcess { process_id: "test-process".to_string().into(), write_response: WriteResponse { status: WriteStatus::Accepted, }, + read_responses: Mutex::new(VecDeque::new()), + terminate_error: None, + wake_tx, + events: vec![ + ExecProcessEvent::Exited { + seq: 1, + exit_code: 17, + sandbox_denied: Some(false), + }, + ExecProcessEvent::Closed { seq: 2 }, + ], + read_count: Arc::clone(&read_count), + }), + }; + + let process = UnifiedExecProcess::from_exec_server_started(started) + .await + .expect("remote process should observe early exit"); + + assert!(process.has_exited()); + assert_eq!(process.exit_code(), Some(17)); + assert_eq!(read_count.load(Ordering::Relaxed), 0); +} + +#[tokio::test] +async fn remote_process_preserves_sandbox_denial_before_closed_event() { + let (wake_tx, _wake_rx) = watch::channel(0); + let read_count = Arc::new(AtomicUsize::new(0)); + let started = StartedExecProcess { + process: Arc::new(MockExecProcess { + process_id: "sandbox-denied".to_string().into(), + write_response: WriteResponse { + status: WriteStatus::Accepted, + }, + read_responses: Mutex::new(VecDeque::new()), + terminate_error: None, + wake_tx, + events: vec![ExecProcessEvent::Exited { + seq: 1, + exit_code: 1, + sandbox_denied: Some(true), + }], + read_count: Arc::clone(&read_count), + }), + }; + + let error = UnifiedExecProcess::from_exec_server_started(started) + .await + .expect_err("sandbox denial should be preserved"); + + assert!(matches!(error, UnifiedExecError::SandboxDenied { .. })); + assert_eq!(read_count.load(Ordering::Relaxed), 0); +} + +#[tokio::test] +async fn remote_process_reads_legacy_exit_event_for_sandbox_denial() { + let (wake_tx, _wake_rx) = watch::channel(0); + let read_count = Arc::new(AtomicUsize::new(0)); + let started = StartedExecProcess { + process: Arc::new(MockExecProcess { + process_id: "legacy-sandbox-denied".to_string().into(), + write_response: WriteResponse { + status: WriteStatus::Accepted, + }, read_responses: Mutex::new(VecDeque::from([ReadResponse { chunks: Vec::new(), next_seq: 2, exited: true, - exit_code: Some(17), + exit_code: Some(1), + closed: false, + failure: None, + sandbox_denied: true, + }])), + terminate_error: None, + wake_tx, + events: vec![ExecProcessEvent::Exited { + seq: 1, + exit_code: 1, + sandbox_denied: None, + }], + read_count: Arc::clone(&read_count), + }), + }; + + let error = UnifiedExecProcess::from_exec_server_started(started) + .await + .expect_err("legacy exit should recover executor sandbox denial"); + + assert!(matches!(error, UnifiedExecError::SandboxDenied { .. })); + assert_eq!(read_count.load(Ordering::Relaxed), 1); +} + +#[tokio::test] +async fn remote_process_recovers_output_missing_from_event_replay() { + let (wake_tx, _wake_rx) = watch::channel(0); + let read_count = Arc::new(AtomicUsize::new(0)); + let recovered_chunks = vec![ + ProcessOutputChunk { + seq: 1, + stream: ExecOutputStream::Stdout, + chunk: b"one".to_vec().into(), + }, + ProcessOutputChunk { + seq: 2, + stream: ExecOutputStream::Stdout, + chunk: b"two".to_vec().into(), + }, + ]; + let started = StartedExecProcess { + process: Arc::new(MockExecProcess { + process_id: "truncated-replay".to_string().into(), + write_response: WriteResponse { + status: WriteStatus::Accepted, + }, + read_responses: Mutex::new(VecDeque::from([ReadResponse { + chunks: recovered_chunks, + next_seq: 5, + exited: true, + exit_code: Some(0), closed: true, failure: None, sandbox_denied: false, }])), terminate_error: None, - wake_tx: wake_tx.clone(), + wake_tx, + // A bounded event replay can begin after earlier events were evicted. + events: vec![ExecProcessEvent::Output(ProcessOutputChunk { + seq: 2, + stream: ExecOutputStream::Stdout, + chunk: b"two".to_vec().into(), + })], + read_count: Arc::clone(&read_count), }), }; - tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(10)).await; - let _ = wake_tx.send(1); - }); - let process = UnifiedExecProcess::from_exec_server_started(started) .await - .expect("remote process should observe early exit"); + .expect("missing replay events should recover through process/read"); - assert!(process.has_exited()); - assert_eq!(process.exit_code(), Some(17)); + let output_handles = process.output_handles(); + assert_eq!( + output_handles.output_buffer.lock().await.snapshot_chunks(), + vec![b"one".to_vec(), b"two".to_vec()] + ); + assert_eq!(process.exit_code(), Some(0)); + assert_eq!(read_count.load(Ordering::Relaxed), 1); } diff --git a/codex-rs/exec-server-protocol/src/protocol.rs b/codex-rs/exec-server-protocol/src/protocol.rs index 491fc8cf97ba..a265aaed0c3a 100644 --- a/codex-rs/exec-server-protocol/src/protocol.rs +++ b/codex-rs/exec-server-protocol/src/protocol.rs @@ -527,6 +527,8 @@ pub struct ExecExitedNotification { pub process_id: ProcessId, pub seq: u64, pub exit_code: i32, + #[serde(default)] + pub sandbox_denied: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -564,6 +566,7 @@ mod base64_bytes { #[cfg(test)] mod tests { use super::EnvironmentInfo; + use super::ExecExitedNotification; use super::ExecParams; use super::FsReadFileParams; use super::HttpRequestParams; @@ -708,4 +711,16 @@ mod tests { ("req-explicit-timeout", Some(1234)) ); } + + #[test] + fn exited_notification_accepts_legacy_payload_without_sandbox_denied() { + let notification: ExecExitedNotification = serde_json::from_value(serde_json::json!({ + "processId": "proc-1", + "seq": 3, + "exitCode": 1, + })) + .expect("legacy exited notification should deserialize"); + + assert_eq!(notification.sandbox_denied, None); + } } diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index b1f927918660..a250ba588667 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -321,10 +321,15 @@ Params: { "processId": "proc-1", "seq": 2, - "exitCode": 0 + "exitCode": 0, + "sandboxDenied": false } ``` +`sandboxDenied` lets streaming clients preserve executor-side sandbox denial +detection without issuing a final `process/read` request. Clients recover it +with `process/read` when an older server omits the field. + ### `process/closed` Notification emitted after process output is closed and the process handle is @@ -334,7 +339,8 @@ Params: ```json { - "processId": "proc-1" + "processId": "proc-1", + "seq": 3 } ``` @@ -431,6 +437,6 @@ Terminate it: ```json {"id":4,"method":"process/terminate","params":{"processId":"proc-1"}} {"id":4,"result":{"running":true}} -{"method":"process/exited","params":{"processId":"proc-1","seq":3,"exitCode":0}} -{"method":"process/closed","params":{"processId":"proc-1"}} +{"method":"process/exited","params":{"processId":"proc-1","seq":3,"exitCode":0,"sandboxDenied":false}} +{"method":"process/closed","params":{"processId":"proc-1","seq":4}} ``` diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index e2429d8f1440..e5f813cc4e22 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1161,6 +1161,7 @@ async fn handle_server_notification( let published_closed = session.publish_ordered_event(ExecProcessEvent::Exited { seq: params.seq, exit_code: params.exit_code, + sandbox_denied: params.sandbox_denied, }); if published_closed { inner.remove_session_if(¶ms.process_id, &session); @@ -1737,6 +1738,7 @@ mod tests { process_id: process_id.clone(), seq: 3, exit_code: 0, + sandbox_denied: Some(true), }) .expect("exit notification should serialize"), ), @@ -1786,6 +1788,7 @@ mod tests { ExecProcessEvent::Exited { seq: 3, exit_code: 0, + sandbox_denied: Some(true), }, ExecProcessEvent::Closed { seq: 4 }, ] @@ -2391,6 +2394,7 @@ mod tests { process_id: quiet_process_id, seq: 1, exit_code: 17, + sandbox_denied: Some(false), }) .expect("exit notification should serialize"), ), diff --git a/codex-rs/exec-server/src/client_recovery.rs b/codex-rs/exec-server/src/client_recovery.rs index 61cee009cdd5..252948cfc1da 100644 --- a/codex-rs/exec-server/src/client_recovery.rs +++ b/codex-rs/exec-server/src/client_recovery.rs @@ -58,7 +58,7 @@ impl SessionState { exit_code, closed, failure, - sandbox_denied: _, + sandbox_denied, } = response; if let Some(message) = failure { return Err(ExecServerError::Protocol(format!( @@ -102,11 +102,21 @@ impl SessionState { } } - let exit_known = ordered_events.exit_published - || ordered_events - .pending - .range(..=target_seq) - .any(|(_, event)| matches!(event, ExecProcessEvent::Exited { .. })); + let pending_exit = ordered_events.pending.range_mut(..=target_seq).find_map( + |(_, event)| match event { + ExecProcessEvent::Exited { + sandbox_denied: pending_sandbox_denied, + .. + } => Some(pending_sandbox_denied), + _ => None, + }, + ); + let exit_pending = pending_exit.is_some(); + if let Some(pending_sandbox_denied) = pending_exit { + *pending_sandbox_denied = + Some(pending_sandbox_denied.unwrap_or(false) || sandbox_denied); + } + let exit_known = ordered_events.exit_published || exit_pending; let event_count = target_seq - ordered_events.last_published_seq; let retained_count = ordered_events .pending @@ -123,9 +133,14 @@ impl SessionState { "recovering exited process did not include its exit code".to_string(), ) })?; - ordered_events - .pending - .insert(seq, ExecProcessEvent::Exited { seq, exit_code }); + ordered_events.pending.insert( + seq, + ExecProcessEvent::Exited { + seq, + exit_code, + sandbox_denied: Some(sandbox_denied), + }, + ); } else if missing_count != 0 { return Err(recovery_gap_error(target_seq)); } diff --git a/codex-rs/exec-server/src/client_recovery_tests.rs b/codex-rs/exec-server/src/client_recovery_tests.rs index b54b80e3c5b6..d2126fd70b42 100644 --- a/codex-rs/exec-server/src/client_recovery_tests.rs +++ b/codex-rs/exec-server/src/client_recovery_tests.rs @@ -1,6 +1,8 @@ use std::time::Duration; use super::*; +use crate::protocol::ExecOutputStream; +use crate::protocol::ProcessOutputChunk; fn registry_error(status: reqwest::StatusCode, code: Option<&str>) -> ExecServerError { ExecServerError::EnvironmentRegistryHttp { @@ -54,3 +56,43 @@ fn recovery_does_not_retry_other_registry_conflicts() { assert!(!is_retryable_registry_error(&error)); assert!(!is_retryable_recovery_error(&error)); } + +#[tokio::test] +async fn recovery_adds_sandbox_denial_to_pending_exit_event() { + let state = SessionState::new(/*recoverable*/ true); + assert!(!state.publish_ordered_event(ExecProcessEvent::Exited { + seq: 2, + exit_code: 1, + sandbox_denied: None, + })); + + state + .recover_events(ReadResponse { + chunks: vec![ProcessOutputChunk { + seq: 1, + stream: ExecOutputStream::Stderr, + chunk: b"sandbox denied".to_vec().into(), + }], + next_seq: 3, + exited: true, + exit_code: Some(1), + closed: false, + failure: None, + sandbox_denied: true, + }) + .expect("recovery should publish the pending exit"); + + let mut events = state.subscribe_events(); + assert!(matches!( + events.recv().await, + Ok(ExecProcessEvent::Output(_)) + )); + assert_eq!( + events.recv().await, + Ok(ExecProcessEvent::Exited { + seq: 2, + exit_code: 1, + sandbox_denied: Some(true), + }) + ); +} diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index 7335bd1eeda1..3c488f0ad7c3 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -876,13 +876,16 @@ async fn watch_exit( process.sandbox_denied = is_likely_sandbox_denied(process.sandbox, &exec_output); } let _ = process.wake_tx.send(seq); - process - .events - .publish(ExecProcessEvent::Exited { seq, exit_code }); + process.events.publish(ExecProcessEvent::Exited { + seq, + exit_code, + sandbox_denied: Some(process.sandbox_denied), + }); Some(ExecExitedNotification { process_id: process_id.clone(), seq, exit_code, + sandbox_denied: Some(process.sandbox_denied), }) } else { None diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index b136ef4682fa..54f57c2168a9 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -30,8 +30,14 @@ pub struct StartedExecProcess { #[derive(Debug, Clone, PartialEq, Eq)] pub enum ExecProcessEvent { Output(ProcessOutputChunk), - Exited { seq: u64, exit_code: i32 }, - Closed { seq: u64 }, + Exited { + seq: u64, + exit_code: i32, + sandbox_denied: Option, + }, + Closed { + seq: u64, + }, Failed(String), } @@ -125,21 +131,38 @@ impl ExecProcessEventLog { let live_rx = self.inner.live_tx.subscribe(); let replay = history.events.iter().cloned().collect(); - ExecProcessEventReceiver { replay, live_rx } + ExecProcessEventReceiver { + replay, + live_rx, + _keepalive: None, + } } } pub struct ExecProcessEventReceiver { replay: VecDeque, live_rx: broadcast::Receiver, + _keepalive: Option>, } impl ExecProcessEventReceiver { + /// Returns a receiver that remains open without yielding events. pub fn empty() -> Self { - let (_live_tx, live_rx) = broadcast::channel(1); + let (live_tx, live_rx) = broadcast::channel(1); Self { replay: VecDeque::new(), live_rx, + _keepalive: Some(live_tx), + } + } + + #[doc(hidden)] + pub fn from_events(events: Vec) -> Self { + let (live_tx, live_rx) = broadcast::channel(1); + Self { + replay: events.into(), + live_rx, + _keepalive: Some(live_tx), } } @@ -202,9 +225,21 @@ mod tests { use super::ExecProcessEvent; use super::ExecProcessEventLog; + use super::ExecProcessEventReceiver; use crate::protocol::ExecOutputStream; use crate::protocol::ProcessOutputChunk; + #[tokio::test] + async fn empty_event_receiver_stays_open() { + let mut events = ExecProcessEventReceiver::empty(); + + assert!( + timeout(Duration::from_millis(10), events.recv()) + .await + .is_err() + ); + } + #[tokio::test] async fn event_history_replay_is_bounded_by_retained_bytes() { let log = ExecProcessEventLog::new(/*event_capacity*/ 8, /*byte_capacity*/ 3); @@ -217,6 +252,7 @@ mod tests { log.publish(ExecProcessEvent::Exited { seq: 2, exit_code: 0, + sandbox_denied: Some(false), }); log.publish(ExecProcessEvent::Closed { seq: 3 }); @@ -238,6 +274,7 @@ mod tests { ExecProcessEvent::Exited { seq: 2, exit_code: 0, + sandbox_denied: Some(false), }, ExecProcessEvent::Closed { seq: 3 }, ] diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index 8fe5fe0be98e..73377da60b12 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -164,6 +164,7 @@ async fn collect_process_output_from_events( ExecProcessEvent::Exited { seq: _, exit_code: code, + .. } => { exit_code = Some(code); } @@ -190,7 +191,7 @@ async fn collect_process_event_snapshots( stream: chunk.stream, text: String::from_utf8_lossy(&chunk.chunk.into_inner()).into_owned(), }, - ExecProcessEvent::Exited { seq, exit_code } => { + ExecProcessEvent::Exited { seq, exit_code, .. } => { ProcessEventSnapshot::Exited { seq, exit_code } } ExecProcessEvent::Closed { seq } => ProcessEventSnapshot::Closed { seq }, @@ -792,7 +793,7 @@ async fn remote_exec_process_recovers_after_transport_disconnect() -> Result<()> last_seq = chunk.seq; output.extend_from_slice(&chunk.chunk.into_inner()); } - ExecProcessEvent::Exited { seq, exit_code } => { + ExecProcessEvent::Exited { seq, exit_code, .. } => { assert_eq!(seq, last_seq + 1); assert_eq!(exit_code, 7); last_seq = seq; From 9fa6c12da9d03c67b5c5f1a9c2790d8659e5a06e Mon Sep 17 00:00:00 2001 From: Richard Lee Date: Fri, 26 Jun 2026 18:02:36 -0500 Subject: [PATCH 2/3] unified-exec: add pushed event integration coverage --- .../core/src/unified_exec/process_tests.rs | 171 +----- codex-rs/core/tests/suite/mod.rs | 1 + .../suite/unified_exec_process_events.rs | 489 ++++++++++++++++++ codex-rs/exec-server/src/process.rs | 10 - 4 files changed, 491 insertions(+), 180 deletions(-) create mode 100644 codex-rs/core/tests/suite/unified_exec_process_events.rs diff --git a/codex-rs/core/src/unified_exec/process_tests.rs b/codex-rs/core/src/unified_exec/process_tests.rs index 1a30dc8f49e7..af0cb627413c 100644 --- a/codex-rs/core/src/unified_exec/process_tests.rs +++ b/codex-rs/core/src/unified_exec/process_tests.rs @@ -1,12 +1,10 @@ use super::process::UnifiedExecProcess; use crate::unified_exec::UnifiedExecError; use codex_exec_server::ExecProcess; -use codex_exec_server::ExecProcessEvent; use codex_exec_server::ExecProcessEventReceiver; use codex_exec_server::ExecProcessFuture; use codex_exec_server::ExecServerError; use codex_exec_server::ProcessId; -use codex_exec_server::ProcessOutputChunk; use codex_exec_server::ProcessSignal; use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; @@ -15,26 +13,19 @@ use codex_exec_server::WriteStatus; use pretty_assertions::assert_eq; use std::collections::VecDeque; use std::sync::Arc; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::sync::watch; -use codex_exec_server::ExecOutputStream; - struct MockExecProcess { process_id: ProcessId, write_response: WriteResponse, read_responses: Mutex>, terminate_error: Option, wake_tx: watch::Sender, - events: Vec, - read_count: Arc, } impl MockExecProcess { async fn read(&self) -> Result { - self.read_count.fetch_add(1, Ordering::Relaxed); Ok(self .read_responses .lock() @@ -69,7 +60,7 @@ impl ExecProcess for MockExecProcess { } fn subscribe_events(&self) -> ExecProcessEventReceiver { - ExecProcessEventReceiver::from_events(self.events.clone()) + ExecProcessEventReceiver::empty() } fn read( @@ -108,8 +99,6 @@ async fn remote_process( read_responses: Mutex::new(VecDeque::new()), terminate_error, wake_tx, - events: Vec::new(), - read_count: Arc::new(AtomicUsize::new(0)), }), }; @@ -183,161 +172,3 @@ async fn remote_terminate_confirmed_updates_state_on_success_only() { assert!(process.has_exited()); } - -#[tokio::test] -async fn remote_process_waits_for_early_exit_event() { - let (wake_tx, _wake_rx) = watch::channel(0); - let read_count = Arc::new(AtomicUsize::new(0)); - let started = StartedExecProcess { - process: Arc::new(MockExecProcess { - process_id: "test-process".to_string().into(), - write_response: WriteResponse { - status: WriteStatus::Accepted, - }, - read_responses: Mutex::new(VecDeque::new()), - terminate_error: None, - wake_tx, - events: vec![ - ExecProcessEvent::Exited { - seq: 1, - exit_code: 17, - sandbox_denied: Some(false), - }, - ExecProcessEvent::Closed { seq: 2 }, - ], - read_count: Arc::clone(&read_count), - }), - }; - - let process = UnifiedExecProcess::from_exec_server_started(started) - .await - .expect("remote process should observe early exit"); - - assert!(process.has_exited()); - assert_eq!(process.exit_code(), Some(17)); - assert_eq!(read_count.load(Ordering::Relaxed), 0); -} - -#[tokio::test] -async fn remote_process_preserves_sandbox_denial_before_closed_event() { - let (wake_tx, _wake_rx) = watch::channel(0); - let read_count = Arc::new(AtomicUsize::new(0)); - let started = StartedExecProcess { - process: Arc::new(MockExecProcess { - process_id: "sandbox-denied".to_string().into(), - write_response: WriteResponse { - status: WriteStatus::Accepted, - }, - read_responses: Mutex::new(VecDeque::new()), - terminate_error: None, - wake_tx, - events: vec![ExecProcessEvent::Exited { - seq: 1, - exit_code: 1, - sandbox_denied: Some(true), - }], - read_count: Arc::clone(&read_count), - }), - }; - - let error = UnifiedExecProcess::from_exec_server_started(started) - .await - .expect_err("sandbox denial should be preserved"); - - assert!(matches!(error, UnifiedExecError::SandboxDenied { .. })); - assert_eq!(read_count.load(Ordering::Relaxed), 0); -} - -#[tokio::test] -async fn remote_process_reads_legacy_exit_event_for_sandbox_denial() { - let (wake_tx, _wake_rx) = watch::channel(0); - let read_count = Arc::new(AtomicUsize::new(0)); - let started = StartedExecProcess { - process: Arc::new(MockExecProcess { - process_id: "legacy-sandbox-denied".to_string().into(), - write_response: WriteResponse { - status: WriteStatus::Accepted, - }, - read_responses: Mutex::new(VecDeque::from([ReadResponse { - chunks: Vec::new(), - next_seq: 2, - exited: true, - exit_code: Some(1), - closed: false, - failure: None, - sandbox_denied: true, - }])), - terminate_error: None, - wake_tx, - events: vec![ExecProcessEvent::Exited { - seq: 1, - exit_code: 1, - sandbox_denied: None, - }], - read_count: Arc::clone(&read_count), - }), - }; - - let error = UnifiedExecProcess::from_exec_server_started(started) - .await - .expect_err("legacy exit should recover executor sandbox denial"); - - assert!(matches!(error, UnifiedExecError::SandboxDenied { .. })); - assert_eq!(read_count.load(Ordering::Relaxed), 1); -} - -#[tokio::test] -async fn remote_process_recovers_output_missing_from_event_replay() { - let (wake_tx, _wake_rx) = watch::channel(0); - let read_count = Arc::new(AtomicUsize::new(0)); - let recovered_chunks = vec![ - ProcessOutputChunk { - seq: 1, - stream: ExecOutputStream::Stdout, - chunk: b"one".to_vec().into(), - }, - ProcessOutputChunk { - seq: 2, - stream: ExecOutputStream::Stdout, - chunk: b"two".to_vec().into(), - }, - ]; - let started = StartedExecProcess { - process: Arc::new(MockExecProcess { - process_id: "truncated-replay".to_string().into(), - write_response: WriteResponse { - status: WriteStatus::Accepted, - }, - read_responses: Mutex::new(VecDeque::from([ReadResponse { - chunks: recovered_chunks, - next_seq: 5, - exited: true, - exit_code: Some(0), - closed: true, - failure: None, - sandbox_denied: false, - }])), - terminate_error: None, - wake_tx, - // A bounded event replay can begin after earlier events were evicted. - events: vec![ExecProcessEvent::Output(ProcessOutputChunk { - seq: 2, - stream: ExecOutputStream::Stdout, - chunk: b"two".to_vec().into(), - })], - read_count: Arc::clone(&read_count), - }), - }; - - let process = UnifiedExecProcess::from_exec_server_started(started) - .await - .expect("missing replay events should recover through process/read"); - - let output_handles = process.output_handles(); - assert_eq!( - output_handles.output_buffer.lock().await.snapshot_chunks(), - vec![b"one".to_vec(), b"two".to_vec()] - ); - assert_eq!(process.exit_code(), Some(0)); - assert_eq!(read_count.load(Ordering::Relaxed), 1); -} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 3d4a191f1229..3e2746337055 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -126,6 +126,7 @@ mod tools; mod truncation; mod turn_state; mod unified_exec; +mod unified_exec_process_events; #[cfg(unix)] mod unified_exec_zsh_fork_approvals; mod unstable_features_warning; diff --git a/codex-rs/core/tests/suite/unified_exec_process_events.rs b/codex-rs/core/tests/suite/unified_exec_process_events.rs new file mode 100644 index 000000000000..8ebd332ea26b --- /dev/null +++ b/codex-rs/core/tests/suite/unified_exec_process_events.rs @@ -0,0 +1,489 @@ +use anyhow::Context; +use anyhow::Result; +use base64::Engine; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use codex_features::Feature; +use codex_protocol::models::PermissionProfile; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use codex_protocol::user_input::UserInput; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::test_codex; +use core_test_support::test_codex::turn_permission_fields; +use futures::SinkExt; +use futures::StreamExt; +use pretty_assertions::assert_eq; +use serde_json::Value; +use serde_json::json; +use std::time::Duration; +use test_case::test_case; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::time::timeout; +use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::accept_async; +use tokio_tungstenite::tungstenite::Message; + +const CALL_ID: &str = "pushed-remote-process-events"; +const COMPLETE_OUTPUT: &str = "pushed remote output\n"; +const RECOVERED_OUTPUT: &str = "recovered missing output\n"; +const RETAINED_OUTPUT: &str = "retained output\n"; +const REPLAY_OUTPUT_EVENT_COUNT: u64 = 1024; +const REPLAY_RETAINED_OUTPUT_SEQ: u64 = 800; + +#[derive(Debug, Clone, Copy)] +enum PushedExecScenario { + Complete, + DirectDenied, + LegacyExit, + ReplayGap, +} + +async fn read_exec_server_json(websocket: &mut WebSocketStream) -> Value { + loop { + match timeout(Duration::from_secs(5), websocket.next()) + .await + .expect("websocket read should not time out") + .expect("websocket should stay open") + .expect("websocket frame should read") + { + Message::Text(text) => { + return serde_json::from_str(text.as_ref()).expect("valid JSON-RPC message"); + } + Message::Binary(bytes) => { + return serde_json::from_slice(bytes.as_ref()).expect("valid JSON-RPC message"); + } + Message::Ping(_) | Message::Pong(_) => {} + other => panic!("expected JSON-RPC message, got {other:?}"), + } + } +} + +async fn send_exec_server_json(websocket: &mut WebSocketStream, message: Value) { + websocket + .send(Message::Text(message.to_string().into())) + .await + .expect("exec-server message should send"); +} + +async fn accept_initialized_exec_server(listener: TcpListener) -> WebSocketStream { + let (stream, _) = listener.accept().await.expect("connection"); + let mut websocket = accept_async(stream).await.expect("websocket handshake"); + + let initialize = read_exec_server_json(&mut websocket).await; + assert_eq!(initialize["method"], "initialize"); + send_exec_server_json( + &mut websocket, + json!({ + "id": initialize["id"], + "result": { "sessionId": "test-session" } + }), + ) + .await; + let initialized = read_exec_server_json(&mut websocket).await; + assert_eq!(initialized["method"], "initialized"); + + websocket +} + +async fn send_environment_info(websocket: &mut WebSocketStream) { + let info = read_exec_server_json(websocket).await; + assert_eq!(info["method"], "environment/info"); + send_exec_server_json( + websocket, + json!({ + "id": info["id"], + "result": { "shell": { "name": "zsh", "path": "/bin/zsh" } } + }), + ) + .await; +} + +async fn serve_exec_with_pushed_events( + listener: TcpListener, + scenario: PushedExecScenario, +) -> usize { + let mut websocket = accept_initialized_exec_server(listener).await; + send_environment_info(&mut websocket).await; + + let process_start = loop { + let request = read_exec_server_json(&mut websocket).await; + match request["method"].as_str() { + Some("process/start") => break request, + Some("fs/getMetadata") => { + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "error": { "code": -32004, "message": "not found" } + }), + ) + .await; + } + Some("fs/canonicalize") => { + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "result": { "path": request["params"]["path"] } + }), + ) + .await; + } + method => panic!("unexpected exec-server request before process/start: {method:?}"), + } + }; + let process_id = process_start["params"]["processId"] + .as_str() + .expect("process/start should include processId") + .to_string(); + + let replay_output = |seq| -> &'static [u8] { + match seq { + 1 => RECOVERED_OUTPUT.as_bytes(), + REPLAY_RETAINED_OUTPUT_SEQ => RETAINED_OUTPUT.as_bytes(), + _ => b"x", + } + }; + if matches!(scenario, PushedExecScenario::ReplayGap) { + // The process replay log retains 256 events. This burst is much larger + // than both that log and the JSON-RPC event queue, so the reader must + // apply enough notifications to evict seq 1 before it can read the + // start response. The total output stays well below the server's 1 MiB + // retained-output limit, making the subsequent read genuinely able to + // recover every missing chunk. + for seq in 1..=REPLAY_OUTPUT_EVENT_COUNT { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/output", + "params": { + "processId": &process_id, + "seq": seq, + "stream": "stdout", + "chunk": BASE64_STANDARD.encode(replay_output(seq)), + } + }), + ) + .await; + } + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": REPLAY_OUTPUT_EVENT_COUNT + 1, + "exitCode": 0, + "sandboxDenied": false, + } + }), + ) + .await; + } + + send_exec_server_json( + &mut websocket, + json!({ + "id": process_start["id"], + "result": { "processId": &process_id } + }), + ) + .await; + + match scenario { + PushedExecScenario::Complete => { + let encoded_output = BASE64_STANDARD.encode(COMPLETE_OUTPUT); + for message in [ + json!({ + "method": "process/output", + "params": { + "processId": &process_id, + "seq": 1, + "stream": "stdout", + "chunk": encoded_output, + } + }), + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": 2, + "exitCode": 0, + "sandboxDenied": false, + } + }), + json!({ + "method": "process/closed", + "params": { "processId": &process_id, "seq": 3 } + }), + ] { + send_exec_server_json(&mut websocket, message).await; + } + } + PushedExecScenario::DirectDenied => { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": 1, + "exitCode": 1, + "sandboxDenied": true, + } + }), + ) + .await; + } + PushedExecScenario::LegacyExit => { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": 1, + "exitCode": 1, + } + }), + ) + .await; + } + PushedExecScenario::ReplayGap => {} + } + + let mut process_read_requests = 0; + loop { + let request = read_exec_server_json(&mut websocket).await; + match request["method"].as_str() { + Some("process/read") => { + process_read_requests += 1; + let result = match scenario { + PushedExecScenario::Complete => json!({ + "chunks": [{ + "seq": 1, + "stream": "stdout", + "chunk": BASE64_STANDARD.encode(COMPLETE_OUTPUT), + }], + "nextSeq": 4, + "exited": true, + "exitCode": 0, + "closed": true, + "failure": null, + "sandboxDenied": false, + }), + PushedExecScenario::DirectDenied => json!({ + "chunks": [], + "nextSeq": 2, + "exited": true, + "exitCode": 1, + "closed": false, + "failure": null, + "sandboxDenied": true, + }), + PushedExecScenario::LegacyExit => json!({ + "chunks": [], + "nextSeq": 3, + "exited": true, + "exitCode": 1, + "closed": true, + "failure": null, + "sandboxDenied": true, + }), + PushedExecScenario::ReplayGap => { + let chunks = (1..=REPLAY_OUTPUT_EVENT_COUNT) + .map(|seq| { + json!({ + "seq": seq, + "stream": "stdout", + "chunk": BASE64_STANDARD.encode(replay_output(seq)), + }) + }) + .collect::>(); + json!({ + "chunks": chunks, + "nextSeq": REPLAY_OUTPUT_EVENT_COUNT + 2, + "exited": true, + "exitCode": 0, + "closed": false, + "failure": null, + "sandboxDenied": false, + }) + } + }; + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "result": result, + }), + ) + .await; + if matches!(scenario, PushedExecScenario::ReplayGap) && process_read_requests == 1 { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/closed", + "params": { + "processId": &process_id, + "seq": REPLAY_OUTPUT_EVENT_COUNT + 2, + } + }), + ) + .await; + } + } + Some("process/terminate") => { + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "result": { "running": false } + }), + ) + .await; + return process_read_requests; + } + method => panic!("unexpected exec-server request: {method:?}"), + } + } +} + +#[test_case(PushedExecScenario::Complete ; "complete_event_stream")] +#[test_case(PushedExecScenario::DirectDenied ; "direct_sandbox_denial")] +#[test_case(PushedExecScenario::LegacyExit ; "legacy_exit_metadata")] +#[test_case(PushedExecScenario::ReplayGap ; "truncated_event_replay")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_command_consumes_pushed_remote_process_events( + scenario: PushedExecScenario, +) -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let server = start_mock_server().await; + let response_mock = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + CALL_ID, + "exec_command", + &json!({ + "cmd": "ignored by fake exec-server", + "yield_time_ms": 1_000, + }) + .to_string(), + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "done"), + ev_completed("resp-2"), + ]), + ], + ) + .await; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + let exec_server = tokio::spawn(serve_exec_with_pushed_events(listener, scenario)); + let mut builder = test_codex() + .with_exec_server_url(exec_server_url) + .with_config(|config| { + config.project_doc_max_bytes = 0; + config.use_experimental_unified_exec_tool = true; + config + .features + .enable(Feature::UnifiedExec) + .expect("test config should allow feature update"); + }); + let test = timeout(Duration::from_secs(5), builder.build(&server)) + .await + .context("thread startup should connect to the fake exec-server")??; + + let (sandbox_policy, permission_profile) = + turn_permission_fields(PermissionProfile::Disabled, test.config.cwd.as_path()); + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "run a one-shot remote command".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + additional_context: Default::default(), + thread_settings: codex_protocol::protocol::ThreadSettingsOverrides { + approval_policy: Some(AskForApproval::Never), + sandbox_policy: Some(sandbox_policy), + permission_profile, + collaboration_mode: Some(codex_protocol::config_types::CollaborationMode { + mode: codex_protocol::config_types::ModeKind::Default, + settings: codex_protocol::config_types::Settings { + model: test.session_configured.model.clone(), + reasoning_effort: None, + developer_instructions: None, + }, + }), + ..Default::default() + }, + }) + .await?; + let mut saw_exec_command_begin = false; + loop { + let event = timeout(Duration::from_secs(5), test.codex.next_event()) + .await + .context("turn should complete")?? + .msg; + match event { + EventMsg::ExecCommandBegin(event) if event.call_id == CALL_ID => { + saw_exec_command_begin = true; + } + EventMsg::TurnComplete(_) => break, + _ => {} + } + } + let process_read_requests = timeout(Duration::from_secs(5), exec_server) + .await + .context("fake exec-server should observe process cleanup")??; + let request = response_mock + .last_request() + .context("model should receive the exec_command output")?; + let (output, success) = request + .function_call_output_content_and_success(CALL_ID) + .context("exec_command output should be model visible")?; + let output = output.context("exec_command output should contain text")?; + match scenario { + PushedExecScenario::Complete => { + assert_ne!(success, Some(false)); + assert!(saw_exec_command_begin); + assert!(output.contains("Process exited with code 0")); + assert!(output.contains(COMPLETE_OUTPUT)); + assert_eq!(process_read_requests, 0, "unexpected compatibility read"); + } + PushedExecScenario::DirectDenied => { + assert!(!saw_exec_command_begin); + assert!(output.contains("Process exited with code 1")); + assert_eq!(process_read_requests, 0, "unexpected compatibility read"); + } + PushedExecScenario::LegacyExit => { + assert!(!saw_exec_command_begin); + assert!(output.contains("Process exited with code 1")); + assert_eq!(process_read_requests, 1, "expected compatibility read"); + } + PushedExecScenario::ReplayGap => { + assert_ne!(success, Some(false)); + assert!(saw_exec_command_begin); + assert_eq!(output.matches(RECOVERED_OUTPUT).count(), 1); + assert_eq!(output.matches(RETAINED_OUTPUT).count(), 1); + assert_eq!(process_read_requests, 1, "expected replay recovery read"); + } + } + + Ok(()) +} diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index 54f57c2168a9..afec44b8188a 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -156,16 +156,6 @@ impl ExecProcessEventReceiver { } } - #[doc(hidden)] - pub fn from_events(events: Vec) -> Self { - let (live_tx, live_rx) = broadcast::channel(1); - Self { - replay: events.into(), - live_rx, - _keepalive: Some(live_tx), - } - } - /// Returns the next replayed or live event. /// /// `Lagged` means this receiver fell behind the bounded live channel. The From 680f1cb11e95c9eeef20e4d32569206f354a94e5 Mon Sep 17 00:00:00 2001 From: Richard Lee Date: Fri, 26 Jun 2026 18:39:51 -0500 Subject: [PATCH 3/3] Fix thread history test initializer --- codex-rs/core/src/session/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 46f7ca6e60c0..d6336acd57a8 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -7016,6 +7016,7 @@ async fn submission_loop_channel_close_runs_full_thread_teardown() { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: Some(config.cwd.to_path_buf()),