diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 46f7ca6e60c0..d6336acd57a8 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -7016,6 +7016,7 @@ async fn submission_loop_channel_close_runs_full_thread_teardown() { dynamic_tools: Vec::new(), selected_capability_roots: Vec::new(), multi_agent_version: None, + history_mode: Default::default(), initial_window_id: Uuid::now_v7().to_string(), metadata: ThreadPersistenceMetadata { cwd: Some(config.cwd.to_path_buf()), diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index 5f785adfa75a..7bedfd43e322 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -14,6 +14,7 @@ use tokio_util::sync::CancellationToken; use crate::exec::is_likely_sandbox_denied; use codex_exec_server::ExecProcess; +use codex_exec_server::ExecProcessEvent; use codex_exec_server::ProcessSignal as ExecServerProcessSignal; use codex_exec_server::ReadResponse as ExecReadResponse; use codex_exec_server::StartedExecProcess; @@ -425,84 +426,151 @@ impl UnifiedExecProcess { cancellation_token, } = output_handles; let process = started.process; - let mut wake_rx = process.subscribe_wake(); + let mut events = process.subscribe_events(); tokio::spawn(async move { - let mut after_seq = None; + let mut last_seq: u64 = 0; loop { - match process - .read(after_seq, /*max_bytes*/ None, /*wait_ms*/ Some(0)) - .await + let event = match events.recv().await { + Ok(event) => Some(event), + Err(broadcast::error::RecvError::Lagged(_)) => None, + Err(broadcast::error::RecvError::Closed) => { + let state = state_tx.borrow().clone(); + let _ = state_tx.send_replace( + state.failed("exec-server process event stream closed".to_string()), + ); + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; + } + }; + let event_seq = event.as_ref().and_then(|event| match event { + ExecProcessEvent::Output(chunk) => Some(chunk.seq), + ExecProcessEvent::Exited { seq, .. } | ExecProcessEvent::Closed { seq } => { + Some(*seq) + } + ExecProcessEvent::Failed(_) => None, + }); + let missing_sandbox_denial = matches!( + event.as_ref(), + Some(ExecProcessEvent::Exited { + sandbox_denied: None, + .. + }) + ); + if event.is_none() + || event_seq.is_some_and(|seq| seq > last_seq.saturating_add(1)) + || missing_sandbox_denial { - Ok(response) => { - let ExecReadResponse { - chunks, - next_seq, - exited, - exit_code, - closed, - failure, - sandbox_denied, - } = response; - - for chunk in chunks { - let bytes = chunk.chunk.into_inner(); - let mut guard = output_buffer.lock().await; - guard.push_chunk(bytes.clone()); - drop(guard); - let _ = output_tx.send(bytes); - output_notify.notify_waiters(); - } - - if let Some(message) = failure { + let response = match process + .read( + Some(last_seq), + /*max_bytes*/ None, + /*wait_ms*/ Some(0), + ) + .await + { + Ok(response) => response, + Err(err) => { let state = state_tx.borrow().clone(); - let _ = state_tx.send_replace(state.failed(message)); + let _ = state_tx.send_replace(state.failed(err.to_string())); output_closed.store(true, Ordering::Release); output_closed_notify.notify_waiters(); cancellation_token.cancel(); break; } + }; + let ExecReadResponse { + chunks, + next_seq, + exited, + exit_code, + closed, + failure, + sandbox_denied, + } = response; + for chunk in chunks.into_iter().filter(|chunk| chunk.seq > last_seq) { + let bytes = chunk.chunk.into_inner(); + let mut guard = output_buffer.lock().await; + guard.push_chunk(bytes.clone()); + drop(guard); + let _ = output_tx.send(bytes); + output_notify.notify_waiters(); + } + last_seq = last_seq.max(next_seq.saturating_sub(1)); + if let Some(message) = failure { + let state = state_tx.borrow().clone(); + let _ = state_tx.send_replace(state.failed(message)); + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; + } + if sandbox_denied || exited { + let mut state = state_tx.borrow().clone(); + state.sandbox_denied |= sandbox_denied; + let _ = state_tx.send_replace(if exited { + state.exited(exit_code) + } else { + state + }); + } + if closed { + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; + } + continue; + } - if sandbox_denied { - let mut state = state_tx.borrow().clone(); - state.sandbox_denied = true; - let _ = state_tx.send_replace(state); - } - - if exited { - let state = state_tx.borrow().clone(); - let _ = state_tx.send_replace(state.exited(exit_code)); + let Some(event) = event else { + continue; + }; + match event { + ExecProcessEvent::Output(chunk) => { + if chunk.seq <= last_seq { + continue; } - - if closed { - output_closed.store(true, Ordering::Release); - output_closed_notify.notify_waiters(); - cancellation_token.cancel(); + last_seq = chunk.seq; + let bytes = chunk.chunk.into_inner(); + let mut guard = output_buffer.lock().await; + guard.push_chunk(bytes.clone()); + drop(guard); + let _ = output_tx.send(bytes); + output_notify.notify_waiters(); + } + ExecProcessEvent::Exited { + seq, + exit_code, + sandbox_denied, + } => { + if seq <= last_seq { + continue; } - - after_seq = next_seq.checked_sub(1); - if output_closed.load(Ordering::Acquire) { - break; + last_seq = seq; + let mut state = state_tx.borrow().clone(); + state.sandbox_denied |= sandbox_denied.unwrap_or(false); + let _ = state_tx.send_replace(state.exited(Some(exit_code))); + } + ExecProcessEvent::Closed { seq } => { + if seq <= last_seq { + continue; } + output_closed.store(true, Ordering::Release); + output_closed_notify.notify_waiters(); + cancellation_token.cancel(); + break; } - Err(err) => { + ExecProcessEvent::Failed(message) => { let state = state_tx.borrow().clone(); - let _ = state_tx.send_replace(state.failed(err.to_string())); + let _ = state_tx.send_replace(state.failed(message)); output_closed.store(true, Ordering::Release); output_closed_notify.notify_waiters(); cancellation_token.cancel(); break; } } - - if wake_rx.changed().await.is_err() { - let state = state_tx.borrow().clone(); - let _ = state_tx - .send_replace(state.failed("exec-server wake channel closed".to_string())); - output_closed.store(true, Ordering::Release); - output_closed_notify.notify_waiters(); - cancellation_token.cancel(); - break; - } } }) } diff --git a/codex-rs/core/src/unified_exec/process_tests.rs b/codex-rs/core/src/unified_exec/process_tests.rs index db64d461ec66..af0cb627413c 100644 --- a/codex-rs/core/src/unified_exec/process_tests.rs +++ b/codex-rs/core/src/unified_exec/process_tests.rs @@ -15,7 +15,6 @@ use std::collections::VecDeque; use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::watch; -use tokio::time::Duration; struct MockExecProcess { process_id: ProcessId, @@ -173,39 +172,3 @@ async fn remote_terminate_confirmed_updates_state_on_success_only() { assert!(process.has_exited()); } - -#[tokio::test] -async fn remote_process_waits_for_early_exit_event() { - let (wake_tx, _wake_rx) = watch::channel(0); - let started = StartedExecProcess { - process: Arc::new(MockExecProcess { - process_id: "test-process".to_string().into(), - write_response: WriteResponse { - status: WriteStatus::Accepted, - }, - read_responses: Mutex::new(VecDeque::from([ReadResponse { - chunks: Vec::new(), - next_seq: 2, - exited: true, - exit_code: Some(17), - closed: true, - failure: None, - sandbox_denied: false, - }])), - terminate_error: None, - wake_tx: wake_tx.clone(), - }), - }; - - tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(10)).await; - let _ = wake_tx.send(1); - }); - - let process = UnifiedExecProcess::from_exec_server_started(started) - .await - .expect("remote process should observe early exit"); - - assert!(process.has_exited()); - assert_eq!(process.exit_code(), Some(17)); -} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 3d4a191f1229..3e2746337055 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -126,6 +126,7 @@ mod tools; mod truncation; mod turn_state; mod unified_exec; +mod unified_exec_process_events; #[cfg(unix)] mod unified_exec_zsh_fork_approvals; mod unstable_features_warning; diff --git a/codex-rs/core/tests/suite/unified_exec_process_events.rs b/codex-rs/core/tests/suite/unified_exec_process_events.rs new file mode 100644 index 000000000000..8ebd332ea26b --- /dev/null +++ b/codex-rs/core/tests/suite/unified_exec_process_events.rs @@ -0,0 +1,489 @@ +use anyhow::Context; +use anyhow::Result; +use base64::Engine; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use codex_features::Feature; +use codex_protocol::models::PermissionProfile; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use codex_protocol::user_input::UserInput; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::test_codex; +use core_test_support::test_codex::turn_permission_fields; +use futures::SinkExt; +use futures::StreamExt; +use pretty_assertions::assert_eq; +use serde_json::Value; +use serde_json::json; +use std::time::Duration; +use test_case::test_case; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::time::timeout; +use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::accept_async; +use tokio_tungstenite::tungstenite::Message; + +const CALL_ID: &str = "pushed-remote-process-events"; +const COMPLETE_OUTPUT: &str = "pushed remote output\n"; +const RECOVERED_OUTPUT: &str = "recovered missing output\n"; +const RETAINED_OUTPUT: &str = "retained output\n"; +const REPLAY_OUTPUT_EVENT_COUNT: u64 = 1024; +const REPLAY_RETAINED_OUTPUT_SEQ: u64 = 800; + +#[derive(Debug, Clone, Copy)] +enum PushedExecScenario { + Complete, + DirectDenied, + LegacyExit, + ReplayGap, +} + +async fn read_exec_server_json(websocket: &mut WebSocketStream) -> Value { + loop { + match timeout(Duration::from_secs(5), websocket.next()) + .await + .expect("websocket read should not time out") + .expect("websocket should stay open") + .expect("websocket frame should read") + { + Message::Text(text) => { + return serde_json::from_str(text.as_ref()).expect("valid JSON-RPC message"); + } + Message::Binary(bytes) => { + return serde_json::from_slice(bytes.as_ref()).expect("valid JSON-RPC message"); + } + Message::Ping(_) | Message::Pong(_) => {} + other => panic!("expected JSON-RPC message, got {other:?}"), + } + } +} + +async fn send_exec_server_json(websocket: &mut WebSocketStream, message: Value) { + websocket + .send(Message::Text(message.to_string().into())) + .await + .expect("exec-server message should send"); +} + +async fn accept_initialized_exec_server(listener: TcpListener) -> WebSocketStream { + let (stream, _) = listener.accept().await.expect("connection"); + let mut websocket = accept_async(stream).await.expect("websocket handshake"); + + let initialize = read_exec_server_json(&mut websocket).await; + assert_eq!(initialize["method"], "initialize"); + send_exec_server_json( + &mut websocket, + json!({ + "id": initialize["id"], + "result": { "sessionId": "test-session" } + }), + ) + .await; + let initialized = read_exec_server_json(&mut websocket).await; + assert_eq!(initialized["method"], "initialized"); + + websocket +} + +async fn send_environment_info(websocket: &mut WebSocketStream) { + let info = read_exec_server_json(websocket).await; + assert_eq!(info["method"], "environment/info"); + send_exec_server_json( + websocket, + json!({ + "id": info["id"], + "result": { "shell": { "name": "zsh", "path": "/bin/zsh" } } + }), + ) + .await; +} + +async fn serve_exec_with_pushed_events( + listener: TcpListener, + scenario: PushedExecScenario, +) -> usize { + let mut websocket = accept_initialized_exec_server(listener).await; + send_environment_info(&mut websocket).await; + + let process_start = loop { + let request = read_exec_server_json(&mut websocket).await; + match request["method"].as_str() { + Some("process/start") => break request, + Some("fs/getMetadata") => { + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "error": { "code": -32004, "message": "not found" } + }), + ) + .await; + } + Some("fs/canonicalize") => { + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "result": { "path": request["params"]["path"] } + }), + ) + .await; + } + method => panic!("unexpected exec-server request before process/start: {method:?}"), + } + }; + let process_id = process_start["params"]["processId"] + .as_str() + .expect("process/start should include processId") + .to_string(); + + let replay_output = |seq| -> &'static [u8] { + match seq { + 1 => RECOVERED_OUTPUT.as_bytes(), + REPLAY_RETAINED_OUTPUT_SEQ => RETAINED_OUTPUT.as_bytes(), + _ => b"x", + } + }; + if matches!(scenario, PushedExecScenario::ReplayGap) { + // The process replay log retains 256 events. This burst is much larger + // than both that log and the JSON-RPC event queue, so the reader must + // apply enough notifications to evict seq 1 before it can read the + // start response. The total output stays well below the server's 1 MiB + // retained-output limit, making the subsequent read genuinely able to + // recover every missing chunk. + for seq in 1..=REPLAY_OUTPUT_EVENT_COUNT { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/output", + "params": { + "processId": &process_id, + "seq": seq, + "stream": "stdout", + "chunk": BASE64_STANDARD.encode(replay_output(seq)), + } + }), + ) + .await; + } + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": REPLAY_OUTPUT_EVENT_COUNT + 1, + "exitCode": 0, + "sandboxDenied": false, + } + }), + ) + .await; + } + + send_exec_server_json( + &mut websocket, + json!({ + "id": process_start["id"], + "result": { "processId": &process_id } + }), + ) + .await; + + match scenario { + PushedExecScenario::Complete => { + let encoded_output = BASE64_STANDARD.encode(COMPLETE_OUTPUT); + for message in [ + json!({ + "method": "process/output", + "params": { + "processId": &process_id, + "seq": 1, + "stream": "stdout", + "chunk": encoded_output, + } + }), + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": 2, + "exitCode": 0, + "sandboxDenied": false, + } + }), + json!({ + "method": "process/closed", + "params": { "processId": &process_id, "seq": 3 } + }), + ] { + send_exec_server_json(&mut websocket, message).await; + } + } + PushedExecScenario::DirectDenied => { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": 1, + "exitCode": 1, + "sandboxDenied": true, + } + }), + ) + .await; + } + PushedExecScenario::LegacyExit => { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/exited", + "params": { + "processId": &process_id, + "seq": 1, + "exitCode": 1, + } + }), + ) + .await; + } + PushedExecScenario::ReplayGap => {} + } + + let mut process_read_requests = 0; + loop { + let request = read_exec_server_json(&mut websocket).await; + match request["method"].as_str() { + Some("process/read") => { + process_read_requests += 1; + let result = match scenario { + PushedExecScenario::Complete => json!({ + "chunks": [{ + "seq": 1, + "stream": "stdout", + "chunk": BASE64_STANDARD.encode(COMPLETE_OUTPUT), + }], + "nextSeq": 4, + "exited": true, + "exitCode": 0, + "closed": true, + "failure": null, + "sandboxDenied": false, + }), + PushedExecScenario::DirectDenied => json!({ + "chunks": [], + "nextSeq": 2, + "exited": true, + "exitCode": 1, + "closed": false, + "failure": null, + "sandboxDenied": true, + }), + PushedExecScenario::LegacyExit => json!({ + "chunks": [], + "nextSeq": 3, + "exited": true, + "exitCode": 1, + "closed": true, + "failure": null, + "sandboxDenied": true, + }), + PushedExecScenario::ReplayGap => { + let chunks = (1..=REPLAY_OUTPUT_EVENT_COUNT) + .map(|seq| { + json!({ + "seq": seq, + "stream": "stdout", + "chunk": BASE64_STANDARD.encode(replay_output(seq)), + }) + }) + .collect::>(); + json!({ + "chunks": chunks, + "nextSeq": REPLAY_OUTPUT_EVENT_COUNT + 2, + "exited": true, + "exitCode": 0, + "closed": false, + "failure": null, + "sandboxDenied": false, + }) + } + }; + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "result": result, + }), + ) + .await; + if matches!(scenario, PushedExecScenario::ReplayGap) && process_read_requests == 1 { + send_exec_server_json( + &mut websocket, + json!({ + "method": "process/closed", + "params": { + "processId": &process_id, + "seq": REPLAY_OUTPUT_EVENT_COUNT + 2, + } + }), + ) + .await; + } + } + Some("process/terminate") => { + send_exec_server_json( + &mut websocket, + json!({ + "id": request["id"], + "result": { "running": false } + }), + ) + .await; + return process_read_requests; + } + method => panic!("unexpected exec-server request: {method:?}"), + } + } +} + +#[test_case(PushedExecScenario::Complete ; "complete_event_stream")] +#[test_case(PushedExecScenario::DirectDenied ; "direct_sandbox_denial")] +#[test_case(PushedExecScenario::LegacyExit ; "legacy_exit_metadata")] +#[test_case(PushedExecScenario::ReplayGap ; "truncated_event_replay")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_command_consumes_pushed_remote_process_events( + scenario: PushedExecScenario, +) -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let server = start_mock_server().await; + let response_mock = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_function_call( + CALL_ID, + "exec_command", + &json!({ + "cmd": "ignored by fake exec-server", + "yield_time_ms": 1_000, + }) + .to_string(), + ), + ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "done"), + ev_completed("resp-2"), + ]), + ], + ) + .await; + let exec_server_url = format!("ws://{}", listener.local_addr()?); + let exec_server = tokio::spawn(serve_exec_with_pushed_events(listener, scenario)); + let mut builder = test_codex() + .with_exec_server_url(exec_server_url) + .with_config(|config| { + config.project_doc_max_bytes = 0; + config.use_experimental_unified_exec_tool = true; + config + .features + .enable(Feature::UnifiedExec) + .expect("test config should allow feature update"); + }); + let test = timeout(Duration::from_secs(5), builder.build(&server)) + .await + .context("thread startup should connect to the fake exec-server")??; + + let (sandbox_policy, permission_profile) = + turn_permission_fields(PermissionProfile::Disabled, test.config.cwd.as_path()); + test.codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "run a one-shot remote command".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + responsesapi_client_metadata: None, + additional_context: Default::default(), + thread_settings: codex_protocol::protocol::ThreadSettingsOverrides { + approval_policy: Some(AskForApproval::Never), + sandbox_policy: Some(sandbox_policy), + permission_profile, + collaboration_mode: Some(codex_protocol::config_types::CollaborationMode { + mode: codex_protocol::config_types::ModeKind::Default, + settings: codex_protocol::config_types::Settings { + model: test.session_configured.model.clone(), + reasoning_effort: None, + developer_instructions: None, + }, + }), + ..Default::default() + }, + }) + .await?; + let mut saw_exec_command_begin = false; + loop { + let event = timeout(Duration::from_secs(5), test.codex.next_event()) + .await + .context("turn should complete")?? + .msg; + match event { + EventMsg::ExecCommandBegin(event) if event.call_id == CALL_ID => { + saw_exec_command_begin = true; + } + EventMsg::TurnComplete(_) => break, + _ => {} + } + } + let process_read_requests = timeout(Duration::from_secs(5), exec_server) + .await + .context("fake exec-server should observe process cleanup")??; + let request = response_mock + .last_request() + .context("model should receive the exec_command output")?; + let (output, success) = request + .function_call_output_content_and_success(CALL_ID) + .context("exec_command output should be model visible")?; + let output = output.context("exec_command output should contain text")?; + match scenario { + PushedExecScenario::Complete => { + assert_ne!(success, Some(false)); + assert!(saw_exec_command_begin); + assert!(output.contains("Process exited with code 0")); + assert!(output.contains(COMPLETE_OUTPUT)); + assert_eq!(process_read_requests, 0, "unexpected compatibility read"); + } + PushedExecScenario::DirectDenied => { + assert!(!saw_exec_command_begin); + assert!(output.contains("Process exited with code 1")); + assert_eq!(process_read_requests, 0, "unexpected compatibility read"); + } + PushedExecScenario::LegacyExit => { + assert!(!saw_exec_command_begin); + assert!(output.contains("Process exited with code 1")); + assert_eq!(process_read_requests, 1, "expected compatibility read"); + } + PushedExecScenario::ReplayGap => { + assert_ne!(success, Some(false)); + assert!(saw_exec_command_begin); + assert_eq!(output.matches(RECOVERED_OUTPUT).count(), 1); + assert_eq!(output.matches(RETAINED_OUTPUT).count(), 1); + assert_eq!(process_read_requests, 1, "expected replay recovery read"); + } + } + + Ok(()) +} diff --git a/codex-rs/exec-server-protocol/src/protocol.rs b/codex-rs/exec-server-protocol/src/protocol.rs index 491fc8cf97ba..a265aaed0c3a 100644 --- a/codex-rs/exec-server-protocol/src/protocol.rs +++ b/codex-rs/exec-server-protocol/src/protocol.rs @@ -527,6 +527,8 @@ pub struct ExecExitedNotification { pub process_id: ProcessId, pub seq: u64, pub exit_code: i32, + #[serde(default)] + pub sandbox_denied: Option, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -564,6 +566,7 @@ mod base64_bytes { #[cfg(test)] mod tests { use super::EnvironmentInfo; + use super::ExecExitedNotification; use super::ExecParams; use super::FsReadFileParams; use super::HttpRequestParams; @@ -708,4 +711,16 @@ mod tests { ("req-explicit-timeout", Some(1234)) ); } + + #[test] + fn exited_notification_accepts_legacy_payload_without_sandbox_denied() { + let notification: ExecExitedNotification = serde_json::from_value(serde_json::json!({ + "processId": "proc-1", + "seq": 3, + "exitCode": 1, + })) + .expect("legacy exited notification should deserialize"); + + assert_eq!(notification.sandbox_denied, None); + } } diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index b1f927918660..a250ba588667 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -321,10 +321,15 @@ Params: { "processId": "proc-1", "seq": 2, - "exitCode": 0 + "exitCode": 0, + "sandboxDenied": false } ``` +`sandboxDenied` lets streaming clients preserve executor-side sandbox denial +detection without issuing a final `process/read` request. Clients recover it +with `process/read` when an older server omits the field. + ### `process/closed` Notification emitted after process output is closed and the process handle is @@ -334,7 +339,8 @@ Params: ```json { - "processId": "proc-1" + "processId": "proc-1", + "seq": 3 } ``` @@ -431,6 +437,6 @@ Terminate it: ```json {"id":4,"method":"process/terminate","params":{"processId":"proc-1"}} {"id":4,"result":{"running":true}} -{"method":"process/exited","params":{"processId":"proc-1","seq":3,"exitCode":0}} -{"method":"process/closed","params":{"processId":"proc-1"}} +{"method":"process/exited","params":{"processId":"proc-1","seq":3,"exitCode":0,"sandboxDenied":false}} +{"method":"process/closed","params":{"processId":"proc-1","seq":4}} ``` diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index e2429d8f1440..e5f813cc4e22 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1161,6 +1161,7 @@ async fn handle_server_notification( let published_closed = session.publish_ordered_event(ExecProcessEvent::Exited { seq: params.seq, exit_code: params.exit_code, + sandbox_denied: params.sandbox_denied, }); if published_closed { inner.remove_session_if(¶ms.process_id, &session); @@ -1737,6 +1738,7 @@ mod tests { process_id: process_id.clone(), seq: 3, exit_code: 0, + sandbox_denied: Some(true), }) .expect("exit notification should serialize"), ), @@ -1786,6 +1788,7 @@ mod tests { ExecProcessEvent::Exited { seq: 3, exit_code: 0, + sandbox_denied: Some(true), }, ExecProcessEvent::Closed { seq: 4 }, ] @@ -2391,6 +2394,7 @@ mod tests { process_id: quiet_process_id, seq: 1, exit_code: 17, + sandbox_denied: Some(false), }) .expect("exit notification should serialize"), ), diff --git a/codex-rs/exec-server/src/client_recovery.rs b/codex-rs/exec-server/src/client_recovery.rs index 61cee009cdd5..252948cfc1da 100644 --- a/codex-rs/exec-server/src/client_recovery.rs +++ b/codex-rs/exec-server/src/client_recovery.rs @@ -58,7 +58,7 @@ impl SessionState { exit_code, closed, failure, - sandbox_denied: _, + sandbox_denied, } = response; if let Some(message) = failure { return Err(ExecServerError::Protocol(format!( @@ -102,11 +102,21 @@ impl SessionState { } } - let exit_known = ordered_events.exit_published - || ordered_events - .pending - .range(..=target_seq) - .any(|(_, event)| matches!(event, ExecProcessEvent::Exited { .. })); + let pending_exit = ordered_events.pending.range_mut(..=target_seq).find_map( + |(_, event)| match event { + ExecProcessEvent::Exited { + sandbox_denied: pending_sandbox_denied, + .. + } => Some(pending_sandbox_denied), + _ => None, + }, + ); + let exit_pending = pending_exit.is_some(); + if let Some(pending_sandbox_denied) = pending_exit { + *pending_sandbox_denied = + Some(pending_sandbox_denied.unwrap_or(false) || sandbox_denied); + } + let exit_known = ordered_events.exit_published || exit_pending; let event_count = target_seq - ordered_events.last_published_seq; let retained_count = ordered_events .pending @@ -123,9 +133,14 @@ impl SessionState { "recovering exited process did not include its exit code".to_string(), ) })?; - ordered_events - .pending - .insert(seq, ExecProcessEvent::Exited { seq, exit_code }); + ordered_events.pending.insert( + seq, + ExecProcessEvent::Exited { + seq, + exit_code, + sandbox_denied: Some(sandbox_denied), + }, + ); } else if missing_count != 0 { return Err(recovery_gap_error(target_seq)); } diff --git a/codex-rs/exec-server/src/client_recovery_tests.rs b/codex-rs/exec-server/src/client_recovery_tests.rs index b54b80e3c5b6..d2126fd70b42 100644 --- a/codex-rs/exec-server/src/client_recovery_tests.rs +++ b/codex-rs/exec-server/src/client_recovery_tests.rs @@ -1,6 +1,8 @@ use std::time::Duration; use super::*; +use crate::protocol::ExecOutputStream; +use crate::protocol::ProcessOutputChunk; fn registry_error(status: reqwest::StatusCode, code: Option<&str>) -> ExecServerError { ExecServerError::EnvironmentRegistryHttp { @@ -54,3 +56,43 @@ fn recovery_does_not_retry_other_registry_conflicts() { assert!(!is_retryable_registry_error(&error)); assert!(!is_retryable_recovery_error(&error)); } + +#[tokio::test] +async fn recovery_adds_sandbox_denial_to_pending_exit_event() { + let state = SessionState::new(/*recoverable*/ true); + assert!(!state.publish_ordered_event(ExecProcessEvent::Exited { + seq: 2, + exit_code: 1, + sandbox_denied: None, + })); + + state + .recover_events(ReadResponse { + chunks: vec![ProcessOutputChunk { + seq: 1, + stream: ExecOutputStream::Stderr, + chunk: b"sandbox denied".to_vec().into(), + }], + next_seq: 3, + exited: true, + exit_code: Some(1), + closed: false, + failure: None, + sandbox_denied: true, + }) + .expect("recovery should publish the pending exit"); + + let mut events = state.subscribe_events(); + assert!(matches!( + events.recv().await, + Ok(ExecProcessEvent::Output(_)) + )); + assert_eq!( + events.recv().await, + Ok(ExecProcessEvent::Exited { + seq: 2, + exit_code: 1, + sandbox_denied: Some(true), + }) + ); +} diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index 7335bd1eeda1..3c488f0ad7c3 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -876,13 +876,16 @@ async fn watch_exit( process.sandbox_denied = is_likely_sandbox_denied(process.sandbox, &exec_output); } let _ = process.wake_tx.send(seq); - process - .events - .publish(ExecProcessEvent::Exited { seq, exit_code }); + process.events.publish(ExecProcessEvent::Exited { + seq, + exit_code, + sandbox_denied: Some(process.sandbox_denied), + }); Some(ExecExitedNotification { process_id: process_id.clone(), seq, exit_code, + sandbox_denied: Some(process.sandbox_denied), }) } else { None diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index b136ef4682fa..afec44b8188a 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -30,8 +30,14 @@ pub struct StartedExecProcess { #[derive(Debug, Clone, PartialEq, Eq)] pub enum ExecProcessEvent { Output(ProcessOutputChunk), - Exited { seq: u64, exit_code: i32 }, - Closed { seq: u64 }, + Exited { + seq: u64, + exit_code: i32, + sandbox_denied: Option, + }, + Closed { + seq: u64, + }, Failed(String), } @@ -125,21 +131,28 @@ impl ExecProcessEventLog { let live_rx = self.inner.live_tx.subscribe(); let replay = history.events.iter().cloned().collect(); - ExecProcessEventReceiver { replay, live_rx } + ExecProcessEventReceiver { + replay, + live_rx, + _keepalive: None, + } } } pub struct ExecProcessEventReceiver { replay: VecDeque, live_rx: broadcast::Receiver, + _keepalive: Option>, } impl ExecProcessEventReceiver { + /// Returns a receiver that remains open without yielding events. pub fn empty() -> Self { - let (_live_tx, live_rx) = broadcast::channel(1); + let (live_tx, live_rx) = broadcast::channel(1); Self { replay: VecDeque::new(), live_rx, + _keepalive: Some(live_tx), } } @@ -202,9 +215,21 @@ mod tests { use super::ExecProcessEvent; use super::ExecProcessEventLog; + use super::ExecProcessEventReceiver; use crate::protocol::ExecOutputStream; use crate::protocol::ProcessOutputChunk; + #[tokio::test] + async fn empty_event_receiver_stays_open() { + let mut events = ExecProcessEventReceiver::empty(); + + assert!( + timeout(Duration::from_millis(10), events.recv()) + .await + .is_err() + ); + } + #[tokio::test] async fn event_history_replay_is_bounded_by_retained_bytes() { let log = ExecProcessEventLog::new(/*event_capacity*/ 8, /*byte_capacity*/ 3); @@ -217,6 +242,7 @@ mod tests { log.publish(ExecProcessEvent::Exited { seq: 2, exit_code: 0, + sandbox_denied: Some(false), }); log.publish(ExecProcessEvent::Closed { seq: 3 }); @@ -238,6 +264,7 @@ mod tests { ExecProcessEvent::Exited { seq: 2, exit_code: 0, + sandbox_denied: Some(false), }, ExecProcessEvent::Closed { seq: 3 }, ] diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index 8fe5fe0be98e..73377da60b12 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -164,6 +164,7 @@ async fn collect_process_output_from_events( ExecProcessEvent::Exited { seq: _, exit_code: code, + .. } => { exit_code = Some(code); } @@ -190,7 +191,7 @@ async fn collect_process_event_snapshots( stream: chunk.stream, text: String::from_utf8_lossy(&chunk.chunk.into_inner()).into_owned(), }, - ExecProcessEvent::Exited { seq, exit_code } => { + ExecProcessEvent::Exited { seq, exit_code, .. } => { ProcessEventSnapshot::Exited { seq, exit_code } } ExecProcessEvent::Closed { seq } => ProcessEventSnapshot::Closed { seq }, @@ -792,7 +793,7 @@ async fn remote_exec_process_recovers_after_transport_disconnect() -> Result<()> last_seq = chunk.seq; output.extend_from_slice(&chunk.chunk.into_inner()); } - ExecProcessEvent::Exited { seq, exit_code } => { + ExecProcessEvent::Exited { seq, exit_code, .. } => { assert_eq!(seq, last_seq + 1); assert_eq!(exit_code, 7); last_seq = seq;