diff --git a/codex-rs/code-mode-protocol/src/session.rs b/codex-rs/code-mode-protocol/src/session.rs index 99c1fc72c48d..178a4a80afa5 100644 --- a/codex-rs/code-mode-protocol/src/session.rs +++ b/codex-rs/code-mode-protocol/src/session.rs @@ -63,6 +63,12 @@ pub trait CodeModeSessionDelegate: Send + Sync { text: String, cancellation_token: CancellationToken, ) -> NotificationFuture<'a>; + + /// Reports that a cell has reached its terminal state and will issue no more callbacks. + /// + /// Implementations must keep this non-blocking. The session does not wait for an + /// acknowledgement or retry delivery after a transport failure. + fn cell_closed(&self, cell_id: &CellId); } /// A stateful code-mode session owned by one Codex thread. diff --git a/codex-rs/code-mode/src/cell_actor/mod.rs b/codex-rs/code-mode/src/cell_actor/mod.rs index b04fc7f96984..9cbdb71f197b 100644 --- a/codex-rs/code-mode/src/cell_actor/mod.rs +++ b/codex-rs/code-mode/src/cell_actor/mod.rs @@ -341,7 +341,7 @@ async fn run_cell( ), error_text: Some("exec runtime ended unexpectedly".to_string()), }; - let rejected_event = match host + let (completion_committed, rejected_event) = match host .commit_completion( HashMap::new(), event, @@ -350,14 +350,18 @@ async fn run_cell( ) .await { - CompletionCommit::Committed => None, - CompletionCommit::Rejected(event) => Some(event), + CompletionCommit::Committed => (true, None), + CompletionCommit::Rejected(event) => (false, Some(event)), }; - match cell_state.deliver_completion( + let completion_delivery = cell_state.deliver_completion( observer .take() .map(|observer| (observer.mode, observer.response_tx)), - ) { + ); + if completion_committed { + host.terminal(); + } + match completion_delivery { CompletionDelivery::Delivered => break, CompletionDelivery::Buffered => {} CompletionDelivery::Rejected(response_tx) => { @@ -510,7 +514,7 @@ async fn run_cell( content_items: std::mem::take(&mut content_items), error_text, }; - let rejected_event = match host + let (completion_committed, rejected_event) = match host .commit_completion( stored_value_writes, event, @@ -519,14 +523,18 @@ async fn run_cell( ) .await { - CompletionCommit::Committed => None, - CompletionCommit::Rejected(event) => Some(event), + CompletionCommit::Committed => (true, None), + CompletionCommit::Rejected(event) => (false, Some(event)), }; - match cell_state.deliver_completion( + let completion_delivery = cell_state.deliver_completion( observer .take() .map(|observer| (observer.mode, observer.response_tx)), - ) { + ); + if completion_committed { + host.terminal(); + } + match completion_delivery { CompletionDelivery::Delivered => break, CompletionDelivery::Buffered => {} CompletionDelivery::Rejected(response_tx) => { diff --git a/codex-rs/code-mode/src/cell_actor/tests.rs b/codex-rs/code-mode/src/cell_actor/tests.rs index c236aabe36d8..edcca302f363 100644 --- a/codex-rs/code-mode/src/cell_actor/tests.rs +++ b/codex-rs/code-mode/src/cell_actor/tests.rs @@ -52,6 +52,8 @@ impl CellHost for TestHost { cell_state.commit_completion(event, pending_initial_yield_items, || {}) } + fn terminal(&self) {} + async fn closed(&self) {} } @@ -86,6 +88,8 @@ impl CellHost for RecordingHost { }) } + fn terminal(&self) {} + async fn closed(&self) {} } diff --git a/codex-rs/code-mode/src/cell_actor/types.rs b/codex-rs/code-mode/src/cell_actor/types.rs index 425a1e5d520f..8bb81479c336 100644 --- a/codex-rs/code-mode/src/cell_actor/types.rs +++ b/codex-rs/code-mode/src/cell_actor/types.rs @@ -89,6 +89,9 @@ pub(crate) trait CellHost: Send + Sync + 'static { cell_state: Arc, ) -> impl Future + Send; + /// Reports that execution reached a terminal state, even when its result remains buffered. + fn terminal(&self); + fn closed(&self) -> impl Future + Send; } diff --git a/codex-rs/code-mode/src/service.rs b/codex-rs/code-mode/src/service.rs index 4e0cdb717b80..0c5e6dabc6b5 100644 --- a/codex-rs/code-mode/src/service.rs +++ b/codex-rs/code-mode/src/service.rs @@ -74,6 +74,8 @@ impl CodeModeSessionDelegate for NoopCodeModeSessionDelegate { ) -> NotificationFuture<'a> { Box::pin(async { Ok(()) }) } + + fn cell_closed(&self, _cell_id: &CellId) {} } #[derive(Default)] @@ -402,6 +404,10 @@ impl runtime::SessionRuntimeDelegate for ProtocolDelegate { ) .await } + + fn cell_closed(&self, cell_id: &runtime::CellId) { + self.delegate.cell_closed(&protocol_cell_id(cell_id)); + } } fn runtime_request(request: CreateCellRequest) -> runtime::CreateCellRequest { diff --git a/codex-rs/code-mode/src/service_contract_tests.rs b/codex-rs/code-mode/src/service_contract_tests.rs index b7b55039ddd0..2db461362ee7 100644 --- a/codex-rs/code-mode/src/service_contract_tests.rs +++ b/codex-rs/code-mode/src/service_contract_tests.rs @@ -21,6 +21,11 @@ enum DelegateEvent { NotificationFinished, ToolStarted, ToolCancelled, + CellClosed(CellId), +} + +fn record_cell_closed(events_tx: &mpsc::UnboundedSender, cell_id: &CellId) { + let _ = events_tx.send(DelegateEvent::CellClosed(cell_id.clone())); } struct BlockingDelegate { @@ -101,6 +106,10 @@ impl CodeModeSessionDelegate for HeldNotificationDelegate { Ok(()) }) } + + fn cell_closed(&self, cell_id: &CellId) { + record_cell_closed(&self.events_tx, cell_id); + } } impl CodeModeSessionDelegate for ReleasableNotificationDelegate { @@ -135,6 +144,10 @@ impl CodeModeSessionDelegate for ReleasableNotificationDelegate { } }) } + + fn cell_closed(&self, cell_id: &CellId) { + record_cell_closed(&self.events_tx, cell_id); + } } impl BlockingDelegate { @@ -193,6 +206,10 @@ impl CodeModeSessionDelegate for BlockingDelegate { Err("cancelled".to_string()) }) } + + fn cell_closed(&self, cell_id: &CellId) { + record_cell_closed(&self.events_tx, cell_id); + } } fn cell_id(value: &str) -> CellId { @@ -427,6 +444,40 @@ await tools.block({}); ); } +#[tokio::test] +async fn background_completion_notifies_the_delegate_without_another_observation() { + let (delegate, mut events_rx) = BlockingDelegate::new(); + let service = CodeModeService::with_delegate(delegate); + let created_cell_id = service + .create_cell(execute_request( + r#"await new Promise(resolve => setTimeout(resolve, 100)); text("done");"#, + )) + .await + .unwrap(); + assert_eq!( + tokio::time::timeout( + Duration::from_secs(2), + service.observe(ObserveRequest { + idempotency_key: "background-completion".to_string(), + cell_id: created_cell_id.clone(), + yield_time_ms: 1, + }), + ) + .await + .expect("initial observation should yield while the cell is still running") + .unwrap(), + ObserveOutcome::Yielded { + cell_id: created_cell_id.clone(), + content_items: Vec::new(), + } + ); + + assert_eq!( + next_event(&mut events_rx).await, + DelegateEvent::CellClosed(created_cell_id) + ); +} + #[tokio::test] async fn returns_and_resumes_from_the_pending_frontier() { let (delegate, mut events_rx) = BlockingDelegate::new(); @@ -566,6 +617,10 @@ async fn termination_cancels_pending_callbacks_before_responding() { next_event(&mut events_rx).await, DelegateEvent::NotificationCancelled ); + assert_eq!( + next_event(&mut events_rx).await, + DelegateEvent::CellClosed(cell_id("1")) + ); } #[tokio::test] @@ -599,6 +654,10 @@ await tools.block({}); next_event(&mut events_rx).await, DelegateEvent::ToolCancelled ); + assert_eq!( + next_event(&mut events_rx).await, + DelegateEvent::CellClosed(cell_id("1")) + ); assert_eq!( execute_with_yield_time( @@ -641,6 +700,10 @@ async fn shutdown_cancels_notifications_while_natural_completion_is_draining() { delegate.release_notification(); assert_eq!(shutdown.await.unwrap(), Ok(())); + assert_eq!( + next_event(&mut events_rx).await, + DelegateEvent::CellClosed(cell_id("1")) + ); } #[tokio::test] @@ -687,6 +750,10 @@ async fn repeated_termination_is_rejected_while_callback_cleanup_is_pending() { content_items: Vec::new(), } ); + assert_eq!( + next_event(&mut events_rx).await, + DelegateEvent::CellClosed(cell_id("1")) + ); } #[tokio::test] @@ -730,6 +797,10 @@ async fn create_cell_returns_before_natural_completion() { next_event(&mut events_rx).await, DelegateEvent::NotificationFinished ); + assert_eq!( + next_event(&mut events_rx).await, + DelegateEvent::CellClosed(cell_id("1")) + ); } #[tokio::test] @@ -840,4 +911,8 @@ async fn natural_completion_cleans_up_callbacks_before_responding() { next_event(&mut events_rx).await, DelegateEvent::ToolCancelled ); + assert_eq!( + next_event(&mut events_rx).await, + DelegateEvent::CellClosed(cell_id("1")) + ); } diff --git a/codex-rs/code-mode/src/service_tests.rs b/codex-rs/code-mode/src/service_tests.rs index 8d5654392ea5..c71904ea3b6e 100644 --- a/codex-rs/code-mode/src/service_tests.rs +++ b/codex-rs/code-mode/src/service_tests.rs @@ -69,6 +69,8 @@ impl CodeModeSessionDelegate for ReleasableToolDelegate { ) -> NotificationFuture<'a> { Box::pin(async { Ok(()) }) } + + fn cell_closed(&self, _cell_id: &CellId) {} } #[derive(Debug, PartialEq)] @@ -83,6 +85,9 @@ enum RecordedDelegateCall { text: String, cancellation_requested: bool, }, + CellClosed { + cell_id: CellId, + }, } struct RecordingDelegate { @@ -151,6 +156,15 @@ impl CodeModeSessionDelegate for RecordingDelegate { .expect("test must provide one result per notification") }) } + + fn cell_closed(&self, cell_id: &CellId) { + self.calls + .lock() + .unwrap() + .push(RecordedDelegateCall::CellClosed { + cell_id: cell_id.clone(), + }); + } } fn execute_request(source: &str) -> CreateCellRequest { @@ -1215,6 +1229,7 @@ async fn protocol_delegate_maps_callbacks_cancellation_and_errors_field_for_fiel .await, Err("notification failed".to_string()) ); + runtime::SessionRuntimeDelegate::cell_closed(&adapter, &runtime::CellId::new("cell-d4")); assert_eq!( delegate.take_calls(), @@ -1245,6 +1260,9 @@ async fn protocol_delegate_maps_callbacks_cancellation_and_errors_field_for_fiel text: "progress".to_string(), cancellation_requested: true, }, + RecordedDelegateCall::CellClosed { + cell_id: cell_id("cell-d4"), + }, ] ); } diff --git a/codex-rs/code-mode/src/session_runtime/mod.rs b/codex-rs/code-mode/src/session_runtime/mod.rs index cf845921a9af..3a33a0ff4a19 100644 --- a/codex-rs/code-mode/src/session_runtime/mod.rs +++ b/codex-rs/code-mode/src/session_runtime/mod.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::time::Duration; @@ -273,6 +274,7 @@ impl SessionRuntime { let host = Arc::new(RuntimeCellHost { cell_id: cell_id.clone(), inner: Arc::clone(&self.inner), + terminal_notified: AtomicBool::new(false), }); let mut cells = self.inner.cells.lock().await; if self.inner.shutdown_token.is_cancelled() { @@ -380,6 +382,7 @@ impl PendingEvent { struct RuntimeCellHost { cell_id: CellId, inner: Arc>, + terminal_notified: AtomicBool, } impl CellHost for RuntimeCellHost { @@ -435,8 +438,15 @@ impl CellHost for RuntimeCellHost { }) } + fn terminal(&self) { + if !self.terminal_notified.swap(true, Ordering::AcqRel) { + self.inner.delegate.cell_closed(&self.cell_id); + } + } + async fn closed(&self) { self.inner.cells.lock().await.remove(&self.cell_id); + self.terminal(); } } diff --git a/codex-rs/code-mode/src/session_runtime/tests.rs b/codex-rs/code-mode/src/session_runtime/tests.rs index 1cc16c887eac..f05d2a6aa3ea 100644 --- a/codex-rs/code-mode/src/session_runtime/tests.rs +++ b/codex-rs/code-mode/src/session_runtime/tests.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::future::Future; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use std::task::Context; use std::task::Poll; use std::task::Waker; @@ -48,6 +49,8 @@ impl SessionRuntimeDelegate for RecordingDelegate { ) -> Result<(), String> { Ok(()) } + + fn cell_closed(&self, _cell_id: &CellId) {} } impl SessionRuntimeDelegate for ImmediateToolDelegate { @@ -69,6 +72,8 @@ impl SessionRuntimeDelegate for ImmediateToolDelegate { ) -> Result<(), String> { Ok(()) } + + fn cell_closed(&self, _cell_id: &CellId) {} } impl SessionRuntimeDelegate for BlockingToolDelegate { @@ -95,6 +100,8 @@ impl SessionRuntimeDelegate for BlockingToolDelegate { ) -> Result<(), String> { Ok(()) } + + fn cell_closed(&self, _cell_id: &CellId) {} } impl SessionRuntimeDelegate for NonCooperativeToolDelegate { @@ -116,6 +123,8 @@ impl SessionRuntimeDelegate for NonCooperativeToolDelegate { ) -> Result<(), String> { Ok(()) } + + fn cell_closed(&self, _cell_id: &CellId) {} } fn tool_definition(name: &str) -> ToolDefinition { @@ -568,6 +577,7 @@ async fn termination_rejects_a_waiting_store_commit_before_the_next_cell_can_loa let host = RuntimeCellHost { cell_id: CellId::new("terminating-writer"), inner: Arc::clone(&runtime.inner), + terminal_notified: AtomicBool::new(false), }; let completion = ActorEvent::Completed { content_items: vec![OutputItem::Text { diff --git a/codex-rs/code-mode/src/session_runtime/types.rs b/codex-rs/code-mode/src/session_runtime/types.rs index 44b8aaf3635a..6250cd6c97cd 100644 --- a/codex-rs/code-mode/src/session_runtime/types.rs +++ b/codex-rs/code-mode/src/session_runtime/types.rs @@ -206,7 +206,8 @@ pub struct NestedToolCall { /// /// Implementations should forward callback cancellation tokens to downstream /// work. After cancellation begins, the runtime allows callbacks a bounded -/// grace period to finish, then aborts their local tasks. +/// grace period to finish, then aborts their local tasks. Terminal notifications +/// must be non-blocking and are not acknowledged or retried. pub trait SessionRuntimeDelegate: Send + Sync + 'static { fn invoke_tool( &self, @@ -221,6 +222,11 @@ pub trait SessionRuntimeDelegate: Send + Sync + 'static { text: String, cancellation_token: CancellationToken, ) -> impl Future> + Send; + + /// Reports that execution is terminal and will issue no more callbacks. + /// + /// The terminal result may remain buffered for a later observation. + fn cell_closed(&self, cell_id: &CellId); } /// A failure reported by a session runtime operation. diff --git a/codex-rs/core/src/tools/code_mode/delegate.rs b/codex-rs/core/src/tools/code_mode/delegate.rs index fa6fb8930008..37ff900a853f 100644 --- a/codex-rs/core/src/tools/code_mode/delegate.rs +++ b/codex-rs/core/src/tools/code_mode/delegate.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::collections::hash_map::Entry; use std::sync::Arc; use std::sync::Mutex; @@ -24,7 +25,12 @@ use crate::tools::parallel::ToolCallRuntime; pub(super) struct CodeModeDispatchBroker { dispatch_tx: async_channel::Sender, dispatch_rx: async_channel::Receiver, - dispatch_gates: Arc>>>, + dispatch_gates: Arc>>, +} + +enum DispatchGate { + Waiting(watch::Sender), + ClosedBeforeReady, } impl CodeModeDispatchBroker { @@ -38,11 +44,35 @@ impl CodeModeDispatchBroker { } pub(super) fn mark_cell_ready_for_dispatch(&self, cell_id: &CellId) { - dispatch_gate(&self.dispatch_gates, cell_id).send_replace(true); + let mut dispatch_gates = match self.dispatch_gates.lock() { + Ok(dispatch_gates) => dispatch_gates, + Err(poisoned) => poisoned.into_inner(), + }; + match dispatch_gates.entry(cell_id.clone()) { + Entry::Occupied(mut entry) => match entry.get_mut() { + DispatchGate::Waiting(ready_tx) => { + ready_tx.send_replace(true); + } + DispatchGate::ClosedBeforeReady => { + entry.remove(); + } + }, + Entry::Vacant(entry) => { + entry.insert(DispatchGate::Waiting(watch::channel(true).0)); + } + } } pub(super) fn close_cell(&self, cell_id: &CellId) { - remove_dispatch_gate(&self.dispatch_gates, cell_id); + close_dispatch_gate(&self.dispatch_gates, cell_id); + } + + pub(super) fn forget_cell(&self, cell_id: &CellId) { + let mut dispatch_gates = match self.dispatch_gates.lock() { + Ok(dispatch_gates) => dispatch_gates, + Err(poisoned) => poisoned.into_inner(), + }; + dispatch_gates.remove(cell_id); } pub(super) fn start_turn_worker( @@ -87,7 +117,7 @@ impl CodeModeDispatchBroker { { host.notify(call_id, cell_id, text).await } else { - remove_dispatch_gate(&dispatch_gates, &cell_id); + close_dispatch_gate(&dispatch_gates, &cell_id); Err("code mode notification cancelled".to_string()) }; let _ = response_tx.send(response); @@ -105,7 +135,7 @@ impl CodeModeDispatchBroker { ) .await { - remove_dispatch_gate(&dispatch_gates, &cell_id); + close_dispatch_gate(&dispatch_gates, &cell_id); continue; } let host = Arc::clone(&host); @@ -129,40 +159,53 @@ impl CodeModeDispatchBroker { } } -fn dispatch_gate( - dispatch_gates: &Mutex>>, - cell_id: &CellId, -) -> watch::Sender { - let mut dispatch_gates = match dispatch_gates.lock() { - Ok(dispatch_gates) => dispatch_gates, - Err(poisoned) => poisoned.into_inner(), - }; - dispatch_gates - .entry(cell_id.clone()) - .or_insert_with(|| watch::channel(false).0) - .clone() -} - -fn remove_dispatch_gate( - dispatch_gates: &Mutex>>, - cell_id: &CellId, -) { +fn close_dispatch_gate(dispatch_gates: &Mutex>, cell_id: &CellId) { let mut dispatch_gates = match dispatch_gates.lock() { Ok(dispatch_gates) => dispatch_gates, Err(poisoned) => poisoned.into_inner(), }; - dispatch_gates.remove(cell_id); + match dispatch_gates.entry(cell_id.clone()) { + Entry::Occupied(mut entry) => match entry.get_mut() { + DispatchGate::Waiting(ready_tx) if *ready_tx.borrow() => { + entry.remove(); + } + DispatchGate::Waiting(_) => { + entry.insert(DispatchGate::ClosedBeforeReady); + } + DispatchGate::ClosedBeforeReady => {} + }, + Entry::Vacant(entry) => { + entry.insert(DispatchGate::ClosedBeforeReady); + } + } } async fn wait_until_cell_ready_for_dispatch( - dispatch_gates: &Mutex>>, + dispatch_gates: &Mutex>, cell_id: &CellId, cancellation_token: &CancellationToken, ) -> bool { if cancellation_token.is_cancelled() { return false; } - let mut ready_rx = dispatch_gate(dispatch_gates, cell_id).subscribe(); + let mut ready_rx = { + let mut dispatch_gates = match dispatch_gates.lock() { + Ok(dispatch_gates) => dispatch_gates, + Err(poisoned) => poisoned.into_inner(), + }; + match dispatch_gates.entry(cell_id.clone()) { + Entry::Occupied(entry) => match entry.get() { + DispatchGate::Waiting(ready_tx) => ready_tx.subscribe(), + DispatchGate::ClosedBeforeReady => return false, + }, + Entry::Vacant(entry) => { + let ready_tx = watch::channel(false).0; + let ready_rx = ready_tx.subscribe(); + entry.insert(DispatchGate::Waiting(ready_tx)); + ready_rx + } + } + }; loop { if *ready_rx.borrow_and_update() { return true; @@ -238,6 +281,10 @@ impl CodeModeSessionDelegate for CodeModeDispatchBroker { } }) } + + fn cell_closed(&self, cell_id: &CellId) { + self.close_cell(cell_id); + } } enum DispatchMessage { @@ -307,3 +354,7 @@ impl CoreTurnHost { }) } } + +#[cfg(test)] +#[path = "delegate_tests.rs"] +mod tests; diff --git a/codex-rs/core/src/tools/code_mode/delegate_tests.rs b/codex-rs/core/src/tools/code_mode/delegate_tests.rs new file mode 100644 index 000000000000..4aa95d2f35c8 --- /dev/null +++ b/codex-rs/core/src/tools/code_mode/delegate_tests.rs @@ -0,0 +1,83 @@ +use std::sync::Arc; +use std::time::Duration; + +use codex_code_mode::CellId; +use codex_code_mode::CodeModeService; +use codex_code_mode::CodeModeSessionDelegate; +use codex_code_mode::CreateCellRequest; +use codex_code_mode::ObserveOutcome; +use codex_code_mode::ObserveRequest; + +use super::CodeModeDispatchBroker; + +#[test] +fn terminal_notification_removes_the_cell_dispatch_gate() { + let broker = CodeModeDispatchBroker::new(); + let cell_id = CellId::new("cell-a7".to_string()); + broker.mark_cell_ready_for_dispatch(&cell_id); + assert!(broker.dispatch_gates.lock().unwrap().contains_key(&cell_id)); + + CodeModeSessionDelegate::cell_closed(&broker, &cell_id); + + assert!(!broker.dispatch_gates.lock().unwrap().contains_key(&cell_id)); +} + +#[test] +fn terminal_notification_before_readiness_does_not_reopen_the_dispatch_gate() { + let broker = CodeModeDispatchBroker::new(); + let cell_id = CellId::new("cell-a7".to_string()); + + CodeModeSessionDelegate::cell_closed(&broker, &cell_id); + assert!(matches!( + broker.dispatch_gates.lock().unwrap().get(&cell_id), + Some(super::DispatchGate::ClosedBeforeReady) + )); + + broker.mark_cell_ready_for_dispatch(&cell_id); + + assert!(!broker.dispatch_gates.lock().unwrap().contains_key(&cell_id)); +} + +#[tokio::test] +async fn background_completion_removes_the_dispatch_gate_without_another_observation() { + let broker = Arc::new(CodeModeDispatchBroker::new()); + let service = CodeModeService::with_delegate(broker.clone()); + let cell_id = service + .create_cell(CreateCellRequest { + idempotency_key: "delegate-cleanup-cell".to_string(), + tool_call_id: "call-1".to_string(), + enabled_tools: Vec::new(), + source: concat!( + "await new Promise(resolve => setTimeout(resolve, 100));", + "text('done');", + ) + .to_string(), + }) + .await + .unwrap(); + broker.mark_cell_ready_for_dispatch(&cell_id); + + assert_eq!( + service + .observe(ObserveRequest { + idempotency_key: "delegate-cleanup-observation".to_string(), + cell_id: cell_id.clone(), + yield_time_ms: 1, + }) + .await + .unwrap(), + ObserveOutcome::Yielded { + cell_id: cell_id.clone(), + content_items: Vec::new(), + } + ); + tokio::time::timeout(Duration::from_secs(/*secs*/ 2), async { + while broker.dispatch_gates.lock().unwrap().contains_key(&cell_id) { + tokio::task::yield_now().await; + } + }) + .await + .expect("terminal notification should remove the dispatch gate"); + + service.shutdown().await.unwrap(); +} diff --git a/codex-rs/core/src/tools/code_mode/mod.rs b/codex-rs/core/src/tools/code_mode/mod.rs index a2574aaa2535..e8372ee5af79 100644 --- a/codex-rs/core/src/tools/code_mode/mod.rs +++ b/codex-rs/core/src/tools/code_mode/mod.rs @@ -164,7 +164,7 @@ impl CodeModeService { } pub(crate) fn finish_cell_dispatch(&self, cell_id: &CellId) { - self.dispatch_broker.close_cell(cell_id); + self.dispatch_broker.forget_cell(cell_id); } pub(crate) fn start_turn_worker(