Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions codex-rs/code-mode-protocol/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ pub trait CodeModeSessionDelegate: Send + Sync {
text: String,
cancellation_token: CancellationToken,
) -> NotificationFuture<'a>;

/// Reports that a cell has reached its terminal state and will issue no more callbacks.
///
/// Implementations must keep this non-blocking. The session does not wait for an
/// acknowledgement or retry delivery after a transport failure.
fn cell_closed(&self, cell_id: &CellId);
}

/// A stateful code-mode session owned by one Codex thread.
Expand Down
28 changes: 18 additions & 10 deletions codex-rs/code-mode/src/cell_actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ async fn run_cell<H: CellHost>(
),
error_text: Some("exec runtime ended unexpectedly".to_string()),
};
let rejected_event = match host
let (completion_committed, rejected_event) = match host
.commit_completion(
HashMap::new(),
event,
Expand All @@ -350,14 +350,18 @@ async fn run_cell<H: CellHost>(
)
.await
{
CompletionCommit::Committed => None,
CompletionCommit::Rejected(event) => Some(event),
CompletionCommit::Committed => (true, None),
CompletionCommit::Rejected(event) => (false, Some(event)),
};
match cell_state.deliver_completion(
let completion_delivery = cell_state.deliver_completion(
observer
.take()
.map(|observer| (observer.mode, observer.response_tx)),
) {
);
if completion_committed {
host.terminal();
}
match completion_delivery {
CompletionDelivery::Delivered => break,
CompletionDelivery::Buffered => {}
CompletionDelivery::Rejected(response_tx) => {
Expand Down Expand Up @@ -510,7 +514,7 @@ async fn run_cell<H: CellHost>(
content_items: std::mem::take(&mut content_items),
error_text,
};
let rejected_event = match host
let (completion_committed, rejected_event) = match host
.commit_completion(
stored_value_writes,
event,
Expand All @@ -519,14 +523,18 @@ async fn run_cell<H: CellHost>(
)
.await
{
CompletionCommit::Committed => None,
CompletionCommit::Rejected(event) => Some(event),
CompletionCommit::Committed => (true, None),
CompletionCommit::Rejected(event) => (false, Some(event)),
};
match cell_state.deliver_completion(
let completion_delivery = cell_state.deliver_completion(
observer
.take()
.map(|observer| (observer.mode, observer.response_tx)),
) {
);
if completion_committed {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A successful commit can still produce CompletionDelivery::Rejected: terminate() may claim CellPhase::Completed between the commit await and deliver_completion. Sound pretty racy to me

host.terminal();
}
match completion_delivery {
CompletionDelivery::Delivered => break,
CompletionDelivery::Buffered => {}
CompletionDelivery::Rejected(response_tx) => {
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/code-mode/src/cell_actor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ impl CellHost for TestHost {
cell_state.commit_completion(event, pending_initial_yield_items, || {})
}

fn terminal(&self) {}

async fn closed(&self) {}
}

Expand Down Expand Up @@ -86,6 +88,8 @@ impl CellHost for RecordingHost {
})
}

fn terminal(&self) {}

async fn closed(&self) {}
}

Expand Down
3 changes: 3 additions & 0 deletions codex-rs/code-mode/src/cell_actor/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub(crate) trait CellHost: Send + Sync + 'static {
cell_state: Arc<CellState>,
) -> impl Future<Output = CompletionCommit> + Send;

/// Reports that execution reached a terminal state, even when its result remains buffered.
fn terminal(&self);

fn closed(&self) -> impl Future<Output = ()> + Send;
}

Expand Down
6 changes: 6 additions & 0 deletions codex-rs/code-mode/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ impl CodeModeSessionDelegate for NoopCodeModeSessionDelegate {
) -> NotificationFuture<'a> {
Box::pin(async { Ok(()) })
}

fn cell_closed(&self, _cell_id: &CellId) {}
}

#[derive(Default)]
Expand Down Expand Up @@ -402,6 +404,10 @@ impl runtime::SessionRuntimeDelegate for ProtocolDelegate {
)
.await
}

fn cell_closed(&self, cell_id: &runtime::CellId) {
self.delegate.cell_closed(&protocol_cell_id(cell_id));
}
}

fn runtime_request(request: CreateCellRequest) -> runtime::CreateCellRequest {
Expand Down
75 changes: 75 additions & 0 deletions codex-rs/code-mode/src/service_contract_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ enum DelegateEvent {
NotificationFinished,
ToolStarted,
ToolCancelled,
CellClosed(CellId),
}

fn record_cell_closed(events_tx: &mpsc::UnboundedSender<DelegateEvent>, cell_id: &CellId) {
let _ = events_tx.send(DelegateEvent::CellClosed(cell_id.clone()));
}

struct BlockingDelegate {
Expand Down Expand Up @@ -101,6 +106,10 @@ impl CodeModeSessionDelegate for HeldNotificationDelegate {
Ok(())
})
}

fn cell_closed(&self, cell_id: &CellId) {
record_cell_closed(&self.events_tx, cell_id);
}
}

impl CodeModeSessionDelegate for ReleasableNotificationDelegate {
Expand Down Expand Up @@ -135,6 +144,10 @@ impl CodeModeSessionDelegate for ReleasableNotificationDelegate {
}
})
}

fn cell_closed(&self, cell_id: &CellId) {
record_cell_closed(&self.events_tx, cell_id);
}
}

impl BlockingDelegate {
Expand Down Expand Up @@ -193,6 +206,10 @@ impl CodeModeSessionDelegate for BlockingDelegate {
Err("cancelled".to_string())
})
}

fn cell_closed(&self, cell_id: &CellId) {
record_cell_closed(&self.events_tx, cell_id);
}
}

fn cell_id(value: &str) -> CellId {
Expand Down Expand Up @@ -427,6 +444,40 @@ await tools.block({});
);
}

#[tokio::test]
async fn background_completion_notifies_the_delegate_without_another_observation() {
let (delegate, mut events_rx) = BlockingDelegate::new();
let service = CodeModeService::with_delegate(delegate);
let created_cell_id = service
.create_cell(execute_request(
r#"await new Promise(resolve => setTimeout(resolve, 100)); text("done");"#,
))
.await
.unwrap();
assert_eq!(
tokio::time::timeout(
Duration::from_secs(2),
service.observe(ObserveRequest {
idempotency_key: "background-completion".to_string(),
cell_id: created_cell_id.clone(),
yield_time_ms: 1,
}),
)
.await
.expect("initial observation should yield while the cell is still running")
.unwrap(),
ObserveOutcome::Yielded {
cell_id: created_cell_id.clone(),
content_items: Vec::new(),
}
);

assert_eq!(
next_event(&mut events_rx).await,
DelegateEvent::CellClosed(created_cell_id)
);
}

#[tokio::test]
async fn returns_and_resumes_from_the_pending_frontier() {
let (delegate, mut events_rx) = BlockingDelegate::new();
Expand Down Expand Up @@ -566,6 +617,10 @@ async fn termination_cancels_pending_callbacks_before_responding() {
next_event(&mut events_rx).await,
DelegateEvent::NotificationCancelled
);
assert_eq!(
next_event(&mut events_rx).await,
DelegateEvent::CellClosed(cell_id("1"))
);
}

#[tokio::test]
Expand Down Expand Up @@ -599,6 +654,10 @@ await tools.block({});
next_event(&mut events_rx).await,
DelegateEvent::ToolCancelled
);
assert_eq!(
next_event(&mut events_rx).await,
DelegateEvent::CellClosed(cell_id("1"))
);

assert_eq!(
execute_with_yield_time(
Expand Down Expand Up @@ -641,6 +700,10 @@ async fn shutdown_cancels_notifications_while_natural_completion_is_draining() {
delegate.release_notification();

assert_eq!(shutdown.await.unwrap(), Ok(()));
assert_eq!(
next_event(&mut events_rx).await,
DelegateEvent::CellClosed(cell_id("1"))
);
}

#[tokio::test]
Expand Down Expand Up @@ -687,6 +750,10 @@ async fn repeated_termination_is_rejected_while_callback_cleanup_is_pending() {
content_items: Vec::new(),
}
);
assert_eq!(
next_event(&mut events_rx).await,
DelegateEvent::CellClosed(cell_id("1"))
);
}

#[tokio::test]
Expand Down Expand Up @@ -730,6 +797,10 @@ async fn create_cell_returns_before_natural_completion() {
next_event(&mut events_rx).await,
DelegateEvent::NotificationFinished
);
assert_eq!(
next_event(&mut events_rx).await,
DelegateEvent::CellClosed(cell_id("1"))
);
}

#[tokio::test]
Expand Down Expand Up @@ -840,4 +911,8 @@ async fn natural_completion_cleans_up_callbacks_before_responding() {
next_event(&mut events_rx).await,
DelegateEvent::ToolCancelled
);
assert_eq!(
next_event(&mut events_rx).await,
DelegateEvent::CellClosed(cell_id("1"))
);
}
18 changes: 18 additions & 0 deletions codex-rs/code-mode/src/service_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ impl CodeModeSessionDelegate for ReleasableToolDelegate {
) -> NotificationFuture<'a> {
Box::pin(async { Ok(()) })
}

fn cell_closed(&self, _cell_id: &CellId) {}
}

#[derive(Debug, PartialEq)]
Expand All @@ -83,6 +85,9 @@ enum RecordedDelegateCall {
text: String,
cancellation_requested: bool,
},
CellClosed {
cell_id: CellId,
},
}

struct RecordingDelegate {
Expand Down Expand Up @@ -151,6 +156,15 @@ impl CodeModeSessionDelegate for RecordingDelegate {
.expect("test must provide one result per notification")
})
}

fn cell_closed(&self, cell_id: &CellId) {
self.calls
.lock()
.unwrap()
.push(RecordedDelegateCall::CellClosed {
cell_id: cell_id.clone(),
});
}
}

fn execute_request(source: &str) -> CreateCellRequest {
Expand Down Expand Up @@ -1215,6 +1229,7 @@ async fn protocol_delegate_maps_callbacks_cancellation_and_errors_field_for_fiel
.await,
Err("notification failed".to_string())
);
runtime::SessionRuntimeDelegate::cell_closed(&adapter, &runtime::CellId::new("cell-d4"));

assert_eq!(
delegate.take_calls(),
Expand Down Expand Up @@ -1245,6 +1260,9 @@ async fn protocol_delegate_maps_callbacks_cancellation_and_errors_field_for_fiel
text: "progress".to_string(),
cancellation_requested: true,
},
RecordedDelegateCall::CellClosed {
cell_id: cell_id("cell-d4"),
},
]
);
}
Expand Down
10 changes: 10 additions & 0 deletions codex-rs/code-mode/src/session_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
Expand Down Expand Up @@ -273,6 +274,7 @@ impl<D: SessionRuntimeDelegate> SessionRuntime<D> {
let host = Arc::new(RuntimeCellHost {
cell_id: cell_id.clone(),
inner: Arc::clone(&self.inner),
terminal_notified: AtomicBool::new(false),
});
let mut cells = self.inner.cells.lock().await;
if self.inner.shutdown_token.is_cancelled() {
Expand Down Expand Up @@ -380,6 +382,7 @@ impl PendingEvent {
struct RuntimeCellHost<D: SessionRuntimeDelegate> {
cell_id: CellId,
inner: Arc<Inner<D>>,
terminal_notified: AtomicBool,
}

impl<D: SessionRuntimeDelegate> CellHost for RuntimeCellHost<D> {
Expand Down Expand Up @@ -435,8 +438,15 @@ impl<D: SessionRuntimeDelegate> CellHost for RuntimeCellHost<D> {
})
}

fn terminal(&self) {
if !self.terminal_notified.swap(true, Ordering::AcqRel) {
self.inner.delegate.cell_closed(&self.cell_id);
}
}

async fn closed(&self) {
self.inner.cells.lock().await.remove(&self.cell_id);
self.terminal();
}
}

Expand Down
Loading
Loading