diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index c990c8933833..f1739034986b 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -246,13 +246,15 @@ impl JsonRpcConnection { } } Err(err) => { - send_malformed_message( + send_disconnected( &incoming_tx_for_reader, + &disconnected_tx_for_reader, Some(format!( "failed to parse JSON-RPC message from {reader_label}: {err}" )), ) .await; + break; } } } diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 7c1d109c6df0..a408a765d508 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -117,7 +117,7 @@ impl EnvironmentManager { } /// Builds a manager from a provider-supplied startup snapshot. - pub async fn from_provider

( + pub(crate) async fn from_provider

( provider: &P, local_runtime_paths: ExecServerRuntimePaths, ) -> Result @@ -368,11 +368,11 @@ mod tests { use super::EnvironmentManager; use super::LOCAL_ENVIRONMENT_ID; use super::REMOTE_ENVIRONMENT_ID; - use crate::EnvironmentProvider; use crate::ExecServerError; use crate::ExecServerRuntimePaths; use crate::ProcessId; use crate::environment_provider::EnvironmentDefault; + use crate::environment_provider::EnvironmentProvider; use crate::environment_provider::EnvironmentProviderSnapshot; use pretty_assertions::assert_eq; diff --git a/codex-rs/exec-server/src/environment_provider.rs b/codex-rs/exec-server/src/environment_provider.rs index a337d7e1bcac..9df6747563a8 100644 --- a/codex-rs/exec-server/src/environment_provider.rs +++ b/codex-rs/exec-server/src/environment_provider.rs @@ -14,7 +14,7 @@ use crate::environment::REMOTE_ENVIRONMENT_ID; /// selection. Providers that want the local environment to be addressable by /// id should include it explicitly in the returned list. #[async_trait] -pub trait EnvironmentProvider: Send + Sync { +pub(crate) trait EnvironmentProvider: Send + Sync { /// Returns the provider-owned environment startup snapshot. async fn snapshot( &self, @@ -23,14 +23,14 @@ pub trait EnvironmentProvider: Send + Sync { } #[derive(Clone, Debug)] -pub struct EnvironmentProviderSnapshot { +pub(crate) struct EnvironmentProviderSnapshot { pub environments: Vec<(String, Environment)>, pub default: EnvironmentDefault, pub include_all_environments_by_default: bool, } #[derive(Clone, Debug, PartialEq, Eq)] -pub enum EnvironmentDefault { +pub(crate) enum EnvironmentDefault { Disabled, EnvironmentId(String), } diff --git a/codex-rs/exec-server/src/environment_toml.rs b/codex-rs/exec-server/src/environment_toml.rs index bd45bcf83225..52fdb1a28239 100644 --- a/codex-rs/exec-server/src/environment_toml.rs +++ b/codex-rs/exec-server/src/environment_toml.rs @@ -9,13 +9,13 @@ use tokio_tungstenite::tungstenite::client::IntoClientRequest; use crate::DefaultEnvironmentProvider; use crate::Environment; -use crate::EnvironmentProvider; use crate::ExecServerError; use crate::ExecServerRuntimePaths; use crate::client_api::ExecServerTransportParams; use crate::client_api::StdioExecServerCommand; use crate::environment::LOCAL_ENVIRONMENT_ID; use crate::environment_provider::EnvironmentDefault; +use crate::environment_provider::EnvironmentProvider; use crate::environment_provider::EnvironmentProviderSnapshot; const ENVIRONMENTS_TOML_FILE: &str = "environments.toml"; diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index d8c147127c50..bfb4eb477581 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -42,7 +42,6 @@ pub use environment::EnvironmentManager; pub use environment::LOCAL_ENVIRONMENT_ID; pub use environment::REMOTE_ENVIRONMENT_ID; pub use environment_provider::DefaultEnvironmentProvider; -pub use environment_provider::EnvironmentProvider; pub use fs_helper::CODEX_FS_HELPER_ARG1; pub use fs_helper_main::main as run_fs_helper_main; pub use local_file_system::LOCAL_FS; diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index e4f2ff554a49..358db23f5b66 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -533,7 +533,9 @@ mod tests { use tokio::io::BufReader; use tokio::time::timeout; + use super::RpcCallError; use super::RpcClient; + use super::RpcClientEvent; use crate::connection::JsonRpcConnection; async fn read_jsonrpc_line(lines: &mut tokio::io::Lines>) -> JSONRPCMessage @@ -637,4 +639,41 @@ mod tests { panic!("server task failed: {err}"); } } + + #[tokio::test] + async fn stdio_malformed_message_closes_client_and_rejects_later_calls() { + let (client_stdin, _server_reader) = tokio::io::duplex(4096); + let (mut server_writer, client_stdout) = tokio::io::duplex(4096); + let connection = + JsonRpcConnection::from_stdio(client_stdout, client_stdin, "test-rpc".to_string()); + let (client, mut events_rx) = RpcClient::new(connection); + + server_writer + .write_all(b"not-json\n") + .await + .expect("malformed line should write"); + + let event = timeout(Duration::from_secs(1), events_rx.recv()) + .await + .expect("disconnect event should not time out") + .expect("disconnect event should be sent"); + let RpcClientEvent::Disconnected { reason } = event else { + panic!("expected disconnect event after malformed stdio message"); + }; + assert!( + reason + .as_deref() + .is_some_and(|reason| reason.contains("failed to parse JSON-RPC message")), + "disconnect should explain malformed stdio message, got {reason:?}" + ); + + let result = timeout( + Duration::from_secs(1), + client.call::<_, serde_json::Value>("after-malformed", &serde_json::json!({})), + ) + .await + .expect("later call should fail instead of hanging"); + assert!(matches!(result, Err(RpcCallError::Closed))); + assert_eq!(client.pending_request_count().await, 0); + } }