From 910ba82a6045676647d8b68ba207649d2f227753 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 9 Apr 2026 22:38:40 -0700 Subject: [PATCH 1/2] Stream Realtime V2 background agent progress Co-authored-by: Codex --- .../tests/suite/v2/realtime_conversation.rs | 39 ++- codex-rs/core/src/realtime_conversation.rs | 274 ++++++++---------- 2 files changed, 160 insertions(+), 153 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 0dc88d72b4e3..f1d0ebd5dae2 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -1222,6 +1222,7 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out v2_background_agent_tool_call("call_v2", "delegate from v2"), ], vec![], + vec![], ])]), ) .await?; @@ -1249,8 +1250,15 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out requests[0] ); - let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await; + let progress = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_v2_progress_update(&progress, "delegated from v2"); + + let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await; assert_v2_function_call_output(&tool_output, "call_v2", "delegated from v2"); + assert_eq!( + function_call_output_sideband_requests(&harness.realtime_server).len(), + 1 + ); harness.shutdown().await; Ok(()) @@ -1278,6 +1286,7 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<( v2_background_agent_tool_call("call_shell", "run shell through delegated turn"), ], vec![], + vec![], ])]); let mut harness = RealtimeE2eHarness::new_with_sandbox( @@ -1329,7 +1338,10 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<( requests[1] ); - let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await; + let progress = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_v2_progress_update(&progress, "shell tool finished"); + + let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await; assert_v2_function_call_output(&tool_output, "call_shell", "shell tool finished"); assert_eq!( function_call_output_sideband_requests(&harness.realtime_server).len(), @@ -1379,6 +1391,7 @@ async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> { }), ], vec![], + vec![], ])]), ) .await?; @@ -1405,7 +1418,10 @@ async fn webrtc_v2_tool_call_does_not_block_sideband_audio() -> Result<()> { .await?; assert_eq!(turn_completed.thread_id, harness.thread_id); - let tool_output = harness.sideband_outbound_request(/*request_index*/ 1).await; + let progress = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_v2_progress_update(&progress, "late delegated result"); + + let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await; assert_v2_function_call_output(&tool_output, "call_audio", "late delegated result"); harness.shutdown().await; @@ -1643,6 +1659,23 @@ fn assert_v2_function_call_output(request: &Value, call_id: &str, expected_outpu ); } +fn assert_v2_progress_update(request: &Value, expected_text: &str) { + assert_eq!( + request, + &json!({ + "type": "conversation.item.create", + "item": { + "type": "message", + "role": "user", + "content": [{ + "type": "input_text", + "text": format!("{expected_text}\n\nUpdate from background agent (task hasn't finished yet):") + }] + } + }) + ); +} + fn assert_v1_session_update(request: &Value) -> Result<()> { assert_eq!(request["type"].as_str(), Some("session.update")); assert_eq!(request["session"]["type"].as_str(), Some("quicksilver")); diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 4b12aa4fe260..fbd9d5974ced 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -47,7 +47,6 @@ use codex_protocol::protocol::RealtimeVoicesList; use http::HeaderMap; use http::HeaderValue; use http::header::AUTHORIZATION; -use serde_json::Value; use serde_json::json; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -65,8 +64,10 @@ const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64; const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256; const REALTIME_STARTUP_CONTEXT_TOKEN_BUDGET: usize = 5_000; const DEFAULT_REALTIME_MODEL: &str = "gpt-realtime-1.5"; -const ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX: &str = - "Conversation already has an active response in progress:"; +const REALTIME_V2_PROGRESS_UPDATE_SUFFIX: &str = + "\n\nUpdate from background agent (task hasn't finished yet):"; +const REALTIME_V2_STEER_ACKNOWLEDGEMENT: &str = + "This was sent to steer the previous background agent task."; #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum RealtimeConversationEnd { @@ -100,11 +101,11 @@ struct RealtimeHandoffState { #[derive(Debug, PartialEq, Eq)] enum HandoffOutput { - ImmediateAppend { + ProgressUpdate { handoff_id: String, output_text: String, }, - FinalToolCall { + FinalUpdate { handoff_id: String, output_text: String, }, @@ -116,11 +117,6 @@ struct OutputAudioState { audio_end_ms: u32, } -struct ResponseCreateState { - pending_response_create: bool, - response_in_progress: bool, -} - struct RealtimeInputTask { writer: RealtimeWebsocketWriter, events: RealtimeWebsocketEvents, @@ -130,6 +126,7 @@ struct RealtimeInputTask { events_tx: Sender, handoff_state: RealtimeHandoffState, session_kind: RealtimeSessionKind, + event_parser: RealtimeEventParser, } impl RealtimeHandoffState { @@ -203,7 +200,8 @@ impl RealtimeConversationManager { model_client, sdp, } = start; - let session_kind = match session_config.event_parser { + let event_parser = session_config.event_parser; + let session_kind = match event_parser { RealtimeEventParser::V1 => RealtimeSessionKind::V1, RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2, }; @@ -261,6 +259,7 @@ impl RealtimeConversationManager { events_tx, handoff_state: handoff.clone(), session_kind, + event_parser, }); let mut guard = self.state.lock().await; @@ -374,21 +373,14 @@ impl RealtimeConversationManager { }; *handoff.last_output_text.lock().await = Some(output_text.clone()); - match handoff.session_kind { - RealtimeSessionKind::V1 => { - handoff - .output_tx - .send(HandoffOutput::ImmediateAppend { - handoff_id, - output_text, - }) - .await - .map_err(|_| { - CodexErr::InvalidRequest("conversation is not running".to_string()) - })?; - } - RealtimeSessionKind::V2 => {} - } + handoff + .output_tx + .send(HandoffOutput::ProgressUpdate { + handoff_id, + output_text, + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; Ok(()) } @@ -414,7 +406,7 @@ impl RealtimeConversationManager { handoff .output_tx - .send(HandoffOutput::FinalToolCall { + .send(HandoffOutput::FinalUpdate { handoff_id, output_text, }) @@ -874,26 +866,17 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { events_tx, handoff_state, session_kind, + event_parser, } = input; tokio::spawn(async move { - let mut response_create_state = ResponseCreateState { - pending_response_create: false, - response_in_progress: false, - }; let mut output_audio_state: Option = None; loop { let result = tokio::select! { // Text typed by the user that should be sent into realtime. user_text = user_text_rx.recv() => { - handle_user_text_input( - user_text, - &writer, - &events_tx, - session_kind, - &mut response_create_state, - ) + handle_user_text_input(user_text, &writer, &events_tx, session_kind) .await } // Background agent progress or final output that should be sent back to realtime. @@ -902,8 +885,8 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { background_agent_output, &writer, &events_tx, - session_kind, - &mut response_create_state, + &handoff_state, + event_parser, ) .await } @@ -916,7 +899,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> { &handoff_state, session_kind, &mut output_audio_state, - &mut response_create_state, ) .await } @@ -938,7 +920,6 @@ async fn handle_user_text_input( writer: &RealtimeWebsocketWriter, events_tx: &Sender, session_kind: RealtimeSessionKind, - response_create_state: &mut ResponseCreateState, ) -> anyhow::Result<()> { let text = text.context("user text input channel closed")?; @@ -953,18 +934,13 @@ async fn handle_user_text_input( match session_kind { RealtimeSessionKind::V1 => {} RealtimeSessionKind::V2 => { - if response_create_state.response_in_progress { - response_create_state.pending_response_create = true; - } else if let Err(err) = writer.send_response_create().await { + if let Err(err) = writer.send_response_create().await { let mapped_error = map_api_error(err); warn!("failed to send text response.create: {mapped_error}"); let _ = events_tx .send(RealtimeEvent::Error(mapped_error.to_string())) .await; return Err(mapped_error.into()); - } else { - response_create_state.pending_response_create = false; - response_create_state.response_in_progress = true; } } } @@ -975,64 +951,67 @@ async fn handle_handoff_output( handoff_output: Result, writer: &RealtimeWebsocketWriter, events_tx: &Sender, - session_kind: RealtimeSessionKind, - response_create_state: &mut ResponseCreateState, + handoff_state: &RealtimeHandoffState, + event_parser: RealtimeEventParser, ) -> anyhow::Result<()> { let handoff_output = handoff_output.context("handoff output channel closed")?; - let should_create_response = match handoff_output { - HandoffOutput::ImmediateAppend { - handoff_id, - output_text, - } => { - if let Err(err) = writer - .send_conversation_handoff_append(handoff_id, output_text) - .await - { - let mapped_error = map_api_error(err); - warn!("failed to send handoff output: {mapped_error}"); - let _ = events_tx - .send(RealtimeEvent::Error(mapped_error.to_string())) - .await; - return Err(mapped_error.into()); + let result = match event_parser { + RealtimeEventParser::V1 => match handoff_output { + HandoffOutput::ProgressUpdate { + handoff_id, + output_text, } - false - } - HandoffOutput::FinalToolCall { - handoff_id, - output_text, - } => { - if let Err(err) = writer - .send_conversation_handoff_append(handoff_id, output_text) - .await - { - let mapped_error = map_api_error(err); - warn!("failed to send handoff output: {mapped_error}"); - let _ = events_tx - .send(RealtimeEvent::Error(mapped_error.to_string())) - .await; - return Err(mapped_error.into()); + | HandoffOutput::FinalUpdate { + handoff_id, + output_text, + } => { + writer + .send_conversation_handoff_append(handoff_id, output_text) + .await } - match session_kind { - RealtimeSessionKind::V1 => false, - RealtimeSessionKind::V2 => true, + }, + RealtimeEventParser::RealtimeV2 => match handoff_output { + HandoffOutput::ProgressUpdate { + handoff_id, + output_text, + } => { + let active_handoff = handoff_state.active_handoff.lock().await.clone(); + match active_handoff { + Some(active_handoff) if active_handoff == handoff_id => {} + Some(_) | None => { + debug!("dropping stale realtime handoff progress update"); + return Ok(()); + } + } + writer + .send_conversation_item_create(format!( + "{output_text}{REALTIME_V2_PROGRESS_UPDATE_SUFFIX}" + )) + .await } - } + HandoffOutput::FinalUpdate { + handoff_id, + output_text, + } => { + if let Err(err) = writer + .send_conversation_handoff_append(handoff_id, output_text) + .await + { + Err(err) + } else { + writer.send_response_create().await + } + } + }, }; - if should_create_response { - if response_create_state.response_in_progress { - response_create_state.pending_response_create = true; - } else if let Err(err) = writer.send_response_create().await { - let mapped_error = map_api_error(err); - warn!("failed to send handoff response.create: {mapped_error}"); - let _ = events_tx - .send(RealtimeEvent::Error(mapped_error.to_string())) - .await; - return Err(mapped_error.into()); - } else { - response_create_state.pending_response_create = false; - response_create_state.response_in_progress = true; - } + if let Err(err) = result { + let mapped_error = map_api_error(err); + warn!("failed to send handoff output: {mapped_error}"); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; + return Err(mapped_error.into()); } Ok(()) } @@ -1044,7 +1023,6 @@ async fn handle_realtime_server_event( handoff_state: &RealtimeHandoffState, session_kind: RealtimeSessionKind, output_audio_state: &mut Option, - response_create_state: &mut ResponseCreateState, ) -> anyhow::Result<()> { let event = match event { Ok(Some(event)) => event, @@ -1063,19 +1041,7 @@ async fn handle_realtime_server_event( } }; - let mut forward_event = true; let should_stop = match &event { - RealtimeEvent::ConversationItemAdded(item) => { - match session_kind { - RealtimeSessionKind::V1 => {} - RealtimeSessionKind::V2 => { - if let Some("response.created") = item.get("type").and_then(Value::as_str) { - response_create_state.response_in_progress = true; - } - } - } - false - } RealtimeEvent::AudioOut(frame) => { match session_kind { RealtimeSessionKind::V1 => {} @@ -1114,59 +1080,67 @@ async fn handle_realtime_server_event( false } RealtimeEvent::ResponseCancelled(_) => { - response_create_state.response_in_progress = false; *output_audio_state = None; + false + } + RealtimeEvent::HandoffRequested(handoff) => { + *output_audio_state = None; + match session_kind { - RealtimeSessionKind::V1 => {} + RealtimeSessionKind::V1 => { + *handoff_state.last_output_text.lock().await = None; + *handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone()); + } RealtimeSessionKind::V2 => { - if response_create_state.pending_response_create { - if let Err(err) = writer.send_response_create().await { - let mapped_error = map_api_error(err); - warn!( - "failed to send deferred response.create after cancellation: {mapped_error}" - ); - let _ = events_tx - .send(RealtimeEvent::Error(mapped_error.to_string())) - .await; - return Err(mapped_error.into()); + let active_handoff = handoff_state.active_handoff.lock().await.clone(); + match active_handoff { + Some(_) => { + if let Err(err) = writer + .send_conversation_handoff_append( + handoff.handoff_id.clone(), + REALTIME_V2_STEER_ACKNOWLEDGEMENT.to_string(), + ) + .await + { + let mapped_error = map_api_error(err); + warn!( + "failed to send handoff steering acknowledgement: {mapped_error}" + ); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; + return Err(mapped_error.into()); + } + if let Err(err) = writer.send_response_create().await { + let mapped_error = map_api_error(err); + warn!( + "failed to send handoff steering response.create: {mapped_error}" + ); + let _ = events_tx + .send(RealtimeEvent::Error(mapped_error.to_string())) + .await; + return Err(mapped_error.into()); + } + } + None => { + *handoff_state.last_output_text.lock().await = None; + *handoff_state.active_handoff.lock().await = + Some(handoff.handoff_id.clone()); } - response_create_state.pending_response_create = false; - response_create_state.response_in_progress = true; } } } false } - RealtimeEvent::HandoffRequested(handoff) => { - *handoff_state.active_handoff.lock().await = Some(handoff.handoff_id.clone()); - *handoff_state.last_output_text.lock().await = None; - response_create_state.response_in_progress = false; - *output_audio_state = None; - false - } - RealtimeEvent::Error(message) => match session_kind { - RealtimeSessionKind::V1 => true, - RealtimeSessionKind::V2 => { - if message.starts_with(ACTIVE_RESPONSE_CONFLICT_ERROR_PREFIX) { - warn!( - "realtime rejected response.create because a response is already in progress; deferring follow-up response.create" - ); - response_create_state.pending_response_create = true; - response_create_state.response_in_progress = true; - forward_event = false; - false - } else { - true - } - } - }, + RealtimeEvent::Error(_) => true, RealtimeEvent::SessionUpdated { .. } | RealtimeEvent::InputTranscriptDelta(_) | RealtimeEvent::OutputTranscriptDelta(_) + | RealtimeEvent::ConversationItemAdded(_) | RealtimeEvent::ConversationItemDone { .. } => false, }; - if forward_event && events_tx.send(event).await.is_err() { + if events_tx.send(event).await.is_err() { anyhow::bail!("realtime output event channel closed"); } if should_stop { From 2d9d579c6362e99dcfe786917370cf605ee1588f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 9 Apr 2026 23:23:33 -0700 Subject: [PATCH 2/2] Add Realtime V2 progress integration coverage Co-authored-by: Codex --- .../tests/suite/v2/realtime_conversation.rs | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index f1d0ebd5dae2..a9b510da252e 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -1264,6 +1264,44 @@ async fn webrtc_v2_background_agent_tool_call_delegates_and_returns_function_out Ok(()) } +#[tokio::test] +async fn webrtc_v2_background_agent_progress_is_sent_before_function_output() -> Result<()> { + skip_if_no_network!(Ok(())); + + let mut harness = RealtimeE2eHarness::new( + RealtimeTestVersion::V2, + main_loop_responses(vec![create_final_assistant_message_sse_response( + "progress before final", + )?]), + realtime_sideband(vec![realtime_sideband_connection(vec![ + vec![ + session_updated("sess_v2_progress_before_final"), + v2_background_agent_tool_call("call_progress_order", "stream progress"), + ], + vec![], + vec![], + ])]), + ) + .await?; + + let started = harness.start_webrtc_realtime("v=offer\r\n").await?; + assert_eq!(started.started.version, RealtimeConversationVersion::V2); + + let turn_completed = harness + .read_notification::("turn/completed") + .await?; + assert_eq!(turn_completed.thread_id, harness.thread_id); + + let progress = harness.sideband_outbound_request(/*request_index*/ 1).await; + assert_v2_progress_update(&progress, "progress before final"); + + let tool_output = harness.sideband_outbound_request(/*request_index*/ 2).await; + assert_v2_function_call_output(&tool_output, "call_progress_order", "progress before final"); + + harness.shutdown().await; + Ok(()) +} + #[tokio::test] async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<()> { skip_if_no_network!(Ok(()));