Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rs/moq-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ web-transport-trait = { workspace = true }
sd-notify = "0.5"

[dev-dependencies]
# Old qmux for the cross-version WebSocket interop test (old client -> current relay).
qmux_old = { package = "qmux", version = "0.0.8" }
rcgen = "0.14"
tempfile = "3"
wiremock = "0.6"
130 changes: 128 additions & 2 deletions rs/moq-relay/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,10 @@ mod tests {
use axum::{Router, extract::WebSocketUpgrade, routing::any};
use std::sync::Mutex;
use tokio::sync::oneshot;
// Brings `qmux::Session::protocol` and `::closed` into scope.
use web_transport_trait::Session as _;
// Brings the session/stream trait methods (`protocol`, `closed`, `open_uni`,
// `accept_uni`, `write_all`, `read_all`, `finish`) into scope. Both the
// current and old qmux sessions implement these (shared web-transport-trait 0.3).
use web_transport_trait::{RecvStream as _, SendStream as _, Session as _};

/// The newest moq ALPN both sides agree on. Derived from the same source
/// of truth that `supported_subprotocols` and `qmux::Client::with_protocols`
Expand Down Expand Up @@ -344,4 +346,128 @@ mod tests {
drop(session);
server.abort();
}

/// The relay must keep advertising the subprotocols old clients speak, or the
/// next qmux bump could silently drop pre-existing browser/native clients.
#[test]
fn advertises_legacy_subprotocols_for_old_clients() {
let list = supported_subprotocols();

// Bare fallbacks for the oldest peers (pre-qmux WebTransport polyfill and
// a client that pins only a wire version).
assert!(
list.iter().any(|s| s == "webtransport"),
"missing bare webtransport: {list:?}"
);
assert!(list.iter().any(|s| s == "qmux-00"), "missing bare qmux-00: {list:?}");

// A qmux-00 client (e.g. qmux 0.0.8) negotiates `qmux-00.<lite-alpn>`; at
// least the newest lite ALPN must still be offered under qmux-00.
let newest_lite = moq_net::ALPNS
.iter()
.copied()
.find(|a| a.starts_with("moq-lite-"))
.expect("a moq-lite ALPN");
let entry = format!("qmux-00.{newest_lite}");
assert!(list.contains(&entry), "missing {entry}: {list:?}");
}

/// Cross-version regression: an **old** qmux 0.0.8 WebSocket client must still
/// interop with a server using the current qmux, proving the qmux bump didn't
/// break clients already deployed in the wild.
///
/// The old client only knows the `qmux-00` wire format, so it offers
/// `qmux-00.<alpn>` (+ bare fallbacks). The server advertises the relay's real
/// subprotocol list (qmux-01 preferred), and they must converge on a legacy
/// `qmux-00` pairing. A uni stream then round-trips, exercising qmux-00 frame
/// encode (old) -> decode (current) across the version gap.
#[tokio::test]
async fn old_qmux_client_interops_over_websocket() {
// (negotiated server protocol, bytes received on a uni stream)
let (got_tx, got_rx) = oneshot::channel::<(Option<String>, Vec<u8>)>();
let got_tx = Arc::new(Mutex::new(Some(got_tx)));

let route = {
let got_tx = got_tx.clone();
any(move |ws: WebSocketUpgrade| {
let got_tx = got_tx.clone();
async move {
let ws = ws.protocols(supported_subprotocols());
ws.on_upgrade(move |socket| async move {
let alpn = socket.protocol().and_then(|h| h.to_str().ok()).map(str::to_owned);
let socket = socket
.map(axum_to_tungstenite)
.sink_map_err(|_| tungstenite::Error::ConnectionClosed)
.with(tungstenite_to_axum);

let upgraded = qmux::ws::Upgraded::new(socket);
let upgraded = match alpn.as_deref() {
Some(alpn) => upgraded.with_alpn(alpn),
None => upgraded,
};
let session = upgraded.accept();
let proto = session.protocol().map(str::to_owned);

// Read the single uni stream the old client opens.
let bytes = match session.accept_uni().await {
Ok(mut recv) => recv.read_all().await.map(|b| b.to_vec()).unwrap_or_default(),
Err(_) => Vec::new(),
};
if let Some(tx) = got_tx.lock().unwrap().take() {
let _ = tx.send((proto, bytes));
}
let _ = session.closed().await;
})
}
})
};

let app = Router::new().route("/", route);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind listener");
let addr = listener.local_addr().expect("local addr");
let server = tokio::spawn(async move {
axum::serve(listener, app).await.expect("axum serve");
});

// Old qmux 0.0.8 client. Its `with_protocols` takes bare `&[&str]` and
// offers them under the qmux-00 prefix.
let session = qmux_old::Client::new()
.with_protocols(moq_net::ALPNS)
.connect(&format!("ws://{addr}/"))
.await
.expect("old qmux client connect");

// Old client and current server must converge on the newest lite ALPN,
// carried over the legacy qmux-00 wire format.
assert_eq!(
session.protocol(),
Some(newest_moq_alpn()),
"old client negotiated {:?}, expected the newest moq ALPN over qmux-00",
session.protocol()
);

let mut send = session.open_uni().await.expect("old client open_uni");
send.write_all(b"hello").await.expect("old client write_all");
send.finish().expect("old client finish");

let (server_proto, server_bytes) = tokio::time::timeout(std::time::Duration::from_secs(5), got_rx)
.await
.expect("server channel timed out")
.expect("server channel dropped");

assert_eq!(
server_proto.as_deref(),
Some(newest_moq_alpn()),
"server should see the newest moq ALPN negotiated with the old client",
);
assert_eq!(
server_bytes, b"hello",
"uni stream payload must survive the qmux-00 round-trip"
);

drop(session);
server.abort();
}
}
Loading