From 68f70a32c242030bfa553cc63b67df88c01168c7 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 19 Jun 2026 13:15:06 -0700 Subject: [PATCH] test(relay): cross-version qmux WebSocket backward-compat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Guard against the qmux 0.0.8 -> 0.2.0 bump (and future bumps) silently breaking already-deployed clients over WebSocket: - `old_qmux_client_interops_over_websocket`: an actual qmux 0.0.8 client (added as a renamed dev-dependency) connects to a server using the current qmux and the relay's real advertised subprotocols. They must converge on the newest lite ALPN over the legacy qmux-00 wire, and a uni stream must round-trip — exercising qmux-00 frame encode (old) -> decode (current). Both qmux versions share web-transport-trait 0.3, so one trait drives both sides. - `advertises_legacy_subprotocols_for_old_clients`: asserts the relay keeps advertising the bare `webtransport`/`qmux-00` fallbacks and `qmux-00.`, the subprotocols old clients depend on. Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 23 +++++- rs/moq-relay/Cargo.toml | 2 + rs/moq-relay/src/websocket.rs | 130 +++++++++++++++++++++++++++++++++- 3 files changed, 151 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64dc5f8f5..e881ef016 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3919,7 +3919,7 @@ dependencies = [ "moq-net", "notify", "parking_lot", - "qmux", + "qmux 0.2.0", "quinn", "rand 0.10.1", "rcgen", @@ -3985,7 +3985,8 @@ dependencies = [ "moq-native", "moq-net", "moq-token", - "qmux", + "qmux 0.0.8", + "qmux 0.2.0", "rcgen", "reqwest 0.12.28", "reqwest-middleware", @@ -5462,6 +5463,24 @@ dependencies = [ "zstd", ] +[[package]] +name = "qmux" +version = "0.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a761722f4499404d45c4971e332b8d0d1fe4a1882d404c2c5f96f3e49933e9" +dependencies = [ + "bytes", + "futures", + "rustls", + "thiserror 2.0.18", + "tokio", + "tokio-rustls", + "tokio-tungstenite", + "tracing", + "web-transport-proto", + "web-transport-trait", +] + [[package]] name = "qmux" version = "0.2.0" diff --git a/rs/moq-relay/Cargo.toml b/rs/moq-relay/Cargo.toml index f1d98e7a4..aaf51a91c 100644 --- a/rs/moq-relay/Cargo.toml +++ b/rs/moq-relay/Cargo.toml @@ -66,6 +66,8 @@ web-transport-trait = { workspace = true } sd-notify = "0.5" [dev-dependencies] +# Old qmux for the cross-version WebSocket interop test (old client -> current relay). +qmux_old = { package = "qmux", version = "0.0.8" } rcgen = "0.14" tempfile = "3" wiremock = "0.6" diff --git a/rs/moq-relay/src/websocket.rs b/rs/moq-relay/src/websocket.rs index 87fa8e5f6..ad6afe4ae 100644 --- a/rs/moq-relay/src/websocket.rs +++ b/rs/moq-relay/src/websocket.rs @@ -202,8 +202,10 @@ mod tests { use axum::{Router, extract::WebSocketUpgrade, routing::any}; use std::sync::Mutex; use tokio::sync::oneshot; - // Brings `qmux::Session::protocol` and `::closed` into scope. - use web_transport_trait::Session as _; + // Brings the session/stream trait methods (`protocol`, `closed`, `open_uni`, + // `accept_uni`, `write_all`, `read_all`, `finish`) into scope. Both the + // current and old qmux sessions implement these (shared web-transport-trait 0.3). + use web_transport_trait::{RecvStream as _, SendStream as _, Session as _}; /// The newest moq ALPN both sides agree on. Derived from the same source /// of truth that `supported_subprotocols` and `qmux::Client::with_protocols` @@ -344,4 +346,128 @@ mod tests { drop(session); server.abort(); } + + /// The relay must keep advertising the subprotocols old clients speak, or the + /// next qmux bump could silently drop pre-existing browser/native clients. + #[test] + fn advertises_legacy_subprotocols_for_old_clients() { + let list = supported_subprotocols(); + + // Bare fallbacks for the oldest peers (pre-qmux WebTransport polyfill and + // a client that pins only a wire version). + assert!( + list.iter().any(|s| s == "webtransport"), + "missing bare webtransport: {list:?}" + ); + assert!(list.iter().any(|s| s == "qmux-00"), "missing bare qmux-00: {list:?}"); + + // A qmux-00 client (e.g. qmux 0.0.8) negotiates `qmux-00.`; at + // least the newest lite ALPN must still be offered under qmux-00. + let newest_lite = moq_net::ALPNS + .iter() + .copied() + .find(|a| a.starts_with("moq-lite-")) + .expect("a moq-lite ALPN"); + let entry = format!("qmux-00.{newest_lite}"); + assert!(list.contains(&entry), "missing {entry}: {list:?}"); + } + + /// Cross-version regression: an **old** qmux 0.0.8 WebSocket client must still + /// interop with a server using the current qmux, proving the qmux bump didn't + /// break clients already deployed in the wild. + /// + /// The old client only knows the `qmux-00` wire format, so it offers + /// `qmux-00.` (+ bare fallbacks). The server advertises the relay's real + /// subprotocol list (qmux-01 preferred), and they must converge on a legacy + /// `qmux-00` pairing. A uni stream then round-trips, exercising qmux-00 frame + /// encode (old) -> decode (current) across the version gap. + #[tokio::test] + async fn old_qmux_client_interops_over_websocket() { + // (negotiated server protocol, bytes received on a uni stream) + let (got_tx, got_rx) = oneshot::channel::<(Option, Vec)>(); + let got_tx = Arc::new(Mutex::new(Some(got_tx))); + + let route = { + let got_tx = got_tx.clone(); + any(move |ws: WebSocketUpgrade| { + let got_tx = got_tx.clone(); + async move { + let ws = ws.protocols(supported_subprotocols()); + ws.on_upgrade(move |socket| async move { + let alpn = socket.protocol().and_then(|h| h.to_str().ok()).map(str::to_owned); + let socket = socket + .map(axum_to_tungstenite) + .sink_map_err(|_| tungstenite::Error::ConnectionClosed) + .with(tungstenite_to_axum); + + let upgraded = qmux::ws::Upgraded::new(socket); + let upgraded = match alpn.as_deref() { + Some(alpn) => upgraded.with_alpn(alpn), + None => upgraded, + }; + let session = upgraded.accept(); + let proto = session.protocol().map(str::to_owned); + + // Read the single uni stream the old client opens. + let bytes = match session.accept_uni().await { + Ok(mut recv) => recv.read_all().await.map(|b| b.to_vec()).unwrap_or_default(), + Err(_) => Vec::new(), + }; + if let Some(tx) = got_tx.lock().unwrap().take() { + let _ = tx.send((proto, bytes)); + } + let _ = session.closed().await; + }) + } + }) + }; + + let app = Router::new().route("/", route); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("bind listener"); + let addr = listener.local_addr().expect("local addr"); + let server = tokio::spawn(async move { + axum::serve(listener, app).await.expect("axum serve"); + }); + + // Old qmux 0.0.8 client. Its `with_protocols` takes bare `&[&str]` and + // offers them under the qmux-00 prefix. + let session = qmux_old::Client::new() + .with_protocols(moq_net::ALPNS) + .connect(&format!("ws://{addr}/")) + .await + .expect("old qmux client connect"); + + // Old client and current server must converge on the newest lite ALPN, + // carried over the legacy qmux-00 wire format. + assert_eq!( + session.protocol(), + Some(newest_moq_alpn()), + "old client negotiated {:?}, expected the newest moq ALPN over qmux-00", + session.protocol() + ); + + let mut send = session.open_uni().await.expect("old client open_uni"); + send.write_all(b"hello").await.expect("old client write_all"); + send.finish().expect("old client finish"); + + let (server_proto, server_bytes) = tokio::time::timeout(std::time::Duration::from_secs(5), got_rx) + .await + .expect("server channel timed out") + .expect("server channel dropped"); + + assert_eq!( + server_proto.as_deref(), + Some(newest_moq_alpn()), + "server should see the newest moq ALPN negotiated with the old client", + ); + assert_eq!( + server_bytes, b"hello", + "uni stream payload must survive the qmux-00 round-trip" + ); + + drop(session); + server.abort(); + } }