feat(moq-rtc): add WebRTC (WHIP/WHEP) gateway#1916
Conversation
|
Caution Review failedPull request was closed or merged during review WalkthroughAdds a new 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
efc012f to
225e347
Compare
There was a problem hiding this comment.
Actionable comments posted: 11
🧹 Nitpick comments (3)
rs/moq-rtc/src/ingest.rs (1)
11-18: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winAdd rustdoc to the public
IngestSinktype and its constructor.
IngestSink(Line 11) andIngestSink::new(Line 18) are public but undocumented, while the module root and the other public codec bridges in this crate carry doc comments.As per coding guidelines: "Document every exported/public symbol in Rust and JS/TS."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-rtc/src/ingest.rs` around lines 11 - 18, Add rustdoc comments for the public IngestSink type and its constructor so they match the rest of the crate’s exported API documentation. Update the definitions of IngestSink and IngestSink::new to describe what the sink owns/does and what the constructor initializes or validates, keeping the docs concise and consistent with the existing public codec bridge comments in this module.Source: Coding guidelines
rs/moq-mux/src/codec/annexb.rs (1)
151-185: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winConsider sharing the NALU walk with
length_prefixed_to_annexb.
from_length_prefixedduplicates the length-prefixed → Annex-B loop already inlength_prefixed_to_annexb(Lines 97-116). The two even differ in their bounds-checking style (checked_addhere vs. plainpos + nthere), so a future fix to one can silently miss the other. Extracting the shared per-NAL walk would keep both in lockstep.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/codec/annexb.rs` around lines 151 - 185, `from_length_prefixed` duplicates the same length-prefixed NALU traversal already implemented in `length_prefixed_to_annexb`, which risks the two paths drifting apart. Refactor the shared NALU walk into a common helper used by both `from_length_prefixed` and `length_prefixed_to_annexb`, and keep the existing prefix handling in `from_length_prefixed` layered on top of that helper.rs/moq-rtc/src/client/mod.rs (1)
36-36: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winDocument the public constructor.
Client::newis public but has no rustdoc. As per coding guidelines, “Document every exported/public symbol in Rust and JS/TS.”🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-rtc/src/client/mod.rs` at line 36, `Client::new` is a public constructor but is missing rustdoc. Add a doc comment for the `new` function in `Client` describing what it creates, its `config` parameter, and the returned `Self`, matching the project’s requirement to document every exported symbol.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-rtc/bin/moq-rtc.rs`:
- Around line 228-236: The WHIP publish path in moq-rtc.rs currently fails
immediately on request_broadcast when the broadcast is not yet announced, which
can race startup. Update the broadcast lookup in the publish flow to wait/retry
via subscriber.announced() (or the equivalent announcement-aware path) before
calling client.publish, and keep the existing request_broadcast/broadcast_name
logic but avoid mapping the first miss to an error.
- Around line 151-173: The Role::Server branch in run_role is ignoring the
broadcast argument, so the mandatory --broadcast flag has no effect in server
mode. Update the match arm for Role::Server to either pass broadcast through to
run_server or remove the requirement for broadcast when running as a server, and
make sure the behavior is consistent with the CLI contract in moq-rtc.rs.
- Around line 132-135: The readiness notification is sent too early, before the
server is actually bound and serving. Update the startup flow in moq-rtc.rs so
the sd_notify::notify call with NotifyState::Ready happens only after run_role
has successfully started listening, or move the notification into run_role right
after the axum_server::bind_rustls / axum_server::bind and .serve setup
completes. Use the run_role function and the sd_notify::notify call as the key
locations to adjust the order.
In `@rs/moq-rtc/Cargo.toml`:
- Around line 43-61: Move the shared crates used by moq-rtc from inline version
specs to the workspace-level dependency table in rs/Cargo.toml, then switch the
matching entries in moq-rtc’s Cargo.toml to use { workspace = true }. Update the
dependency declarations for anyhow, axum, bytes, reqwest, str0m, thiserror,
tracing, url, and uuid, and preserve any required feature settings by aligning
them at the workspace level or reapplying them consistently in moq-rtc. Use the
existing dependency names in [dependencies] and [workspace.dependencies] to
locate the affected entries.
In `@rs/moq-rtc/src/client/mod.rs`:
- Around line 36-40: The shared HTTP client in `Client::new` is created without
any application-level timeout, so stalled WHIP/WHEP requests can hang startup.
Update `Client::new` to build the `reqwest::Client` with a bounded timeout using
`reqwest::Client::builder()` and `Duration::from_secs(...)`, then assign that
client into `Self { config, http: client, ... }` instead of
`reqwest::Client::new()`.
In `@rs/moq-rtc/src/egress.rs`:
- Around line 29-34: Document the public API surface of WriteRequest by adding
field-level documentation for each exported field in the WriteRequest struct.
Update the WriteRequest definition to describe mid, pt, time, and payload so the
public Rust symbol is fully documented and compliant with the coding guidelines.
In `@rs/moq-rtc/src/server/mux.rs`:
- Around line 147-168: The packet routing logic in mux::server::Mux::route (the
block using registry.by_addr and local_ufrag) should parse and handle STUN
before consulting the cached src fast path. Update the lookup order so a STUN
Binding Request can refresh routing via its current ufrag even when src is
already cached, then only fall back to by_addr for non-STUN or unmatched
packets. Keep the cache update that inserts src into by_addr after resolving
by_ufrag.
In `@rs/moq-rtc/src/server/whep.rs`:
- Around line 86-90: The WHEP broadcast lookup in request_broadcast is
converting a missing broadcast into Error::Other, which later makes status_for
return 500 instead of not-found. Update the error mapping in
whep::request_broadcast handling to use the existing not-found error path
already recognized by status_for, so missing broadcasts are surfaced as the
correct HTTP status. Keep the change localized around the consumer lookup and
preserve the current success path.
- Around line 27-35: The WHEP handler is building the Location header from
Path<String>, which only contains the wildcard tail and drops the mount prefix.
Update handle in whep.rs to derive the full request path from OriginalUri
instead of path.0, then construct the response Location using that original URI
plus the resource_id so client DELETE requests target the correct URL.
In `@rs/moq-rtc/src/server/whip.rs`:
- Around line 28-40: The WHIP response Location header is being built from
Path(path), which only contains the wildcard tail and can drop the mounted
prefix like /whip. Update handle in whip.rs to use the OriginalUri extractor so
you can derive the full request path before routing, then construct the Location
value relative to that original URI when setting header::LOCATION. Keep the
existing accept_offer flow and response_headers logic, but replace the current
format based on path with one that preserves the mount prefix for the returned
resource URL.
In `@rs/moq-rtc/src/session.rs`:
- Line 146: Add rustdoc for the public Session::run method in the Session impl.
Document what the async run(self) entrypoint does, any important behavior or
lifecycle expectations, and its Result return value so this exported symbol
meets the documentation requirement.
---
Nitpick comments:
In `@rs/moq-mux/src/codec/annexb.rs`:
- Around line 151-185: `from_length_prefixed` duplicates the same
length-prefixed NALU traversal already implemented in
`length_prefixed_to_annexb`, which risks the two paths drifting apart. Refactor
the shared NALU walk into a common helper used by both `from_length_prefixed`
and `length_prefixed_to_annexb`, and keep the existing prefix handling in
`from_length_prefixed` layered on top of that helper.
In `@rs/moq-rtc/src/client/mod.rs`:
- Line 36: `Client::new` is a public constructor but is missing rustdoc. Add a
doc comment for the `new` function in `Client` describing what it creates, its
`config` parameter, and the returned `Self`, matching the project’s requirement
to document every exported symbol.
In `@rs/moq-rtc/src/ingest.rs`:
- Around line 11-18: Add rustdoc comments for the public IngestSink type and its
constructor so they match the rest of the crate’s exported API documentation.
Update the definitions of IngestSink and IngestSink::new to describe what the
sink owns/does and what the constructor initializes or validates, keeping the
docs concise and consistent with the existing public codec bridge comments in
this module.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0935aad3-a375-40f0-bd79-c2d707da0e77
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (29)
Cargo.tomldoc/.vitepress/config.tsdoc/bin/index.mddoc/bin/rtc.mdrs/CLAUDE.mdrs/moq-mux/src/codec/annexb.rsrs/moq-mux/src/codec/h264/mod.rsrs/moq-mux/src/codec/h265/mod.rsrs/moq-rtc/Cargo.tomlrs/moq-rtc/bin/moq-rtc.rsrs/moq-rtc/src/client/mod.rsrs/moq-rtc/src/client/whep.rsrs/moq-rtc/src/client/whip.rsrs/moq-rtc/src/codec/h264.rsrs/moq-rtc/src/codec/mod.rsrs/moq-rtc/src/codec/opus.rsrs/moq-rtc/src/codec/vp8.rsrs/moq-rtc/src/codec/vp9.rsrs/moq-rtc/src/egress.rsrs/moq-rtc/src/error.rsrs/moq-rtc/src/ingest.rsrs/moq-rtc/src/lib.rsrs/moq-rtc/src/sdp.rsrs/moq-rtc/src/server/mod.rsrs/moq-rtc/src/server/mux.rsrs/moq-rtc/src/server/whep.rsrs/moq-rtc/src/server/whip.rsrs/moq-rtc/src/session.rsrs/moq-rtc/tests/bitstream.rs
| #[cfg(unix)] | ||
| let _ = sd_notify::notify(&[sd_notify::NotifyState::Ready]); | ||
|
|
||
| let driver = run_role(role, &broadcast, public_addr, publisher, subscriber_consumer); |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Compare readiness signaling patterns and confirm this READY is emitted before serving starts.
rg -n 'sd_notify::notify|NotifyState::Ready|axum_server::bind|bind_rustls|run_role\(' rsRepository: moq-dev/moq
Length of output: 1219
Move sd_notify::NotifyState::Ready to after the server begins listening.
The readiness signal at line 133 executes before run_role is called at line 135. Inside run_role, the blocking axum_server::bind_rustls (or axum_server::bind) and .serve() calls occurring at lines 253 and 256 handle TLS setup, socket binding, and listening. Signaling readiness before these operations causes systemd to route traffic to the process before it can accept connections, potentially leading to connection failures during startup.
Move the sd_notify::notify call to a point strictly after the server has successfully bound and started serving, or restructure run_role to signal readiness only once the serve loop is active.
Current execution order
#[cfg(unix)]
let _ = sd_notify::notify(&[sd_notify::NotifyState::Ready]); // Signals "Ready" now
let driver = run_role(role, &broadcast, public_addr, publisher, subscriber_consumer);
// run_role contains:
// axum_server::bind_rustls(...).serve(service).await?;
// // Actual listening happens here (lines 253/256)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/bin/moq-rtc.rs` around lines 132 - 135, The readiness notification
is sent too early, before the server is actually bound and serving. Update the
startup flow in moq-rtc.rs so the sd_notify::notify call with NotifyState::Ready
happens only after run_role has successfully started listening, or move the
notification into run_role right after the axum_server::bind_rustls /
axum_server::bind and .serve setup completes. Use the run_role function and the
sd_notify::notify call as the key locations to adjust the order.
| match role { | ||
| Role::Server { | ||
| listen, | ||
| udp_bind, | ||
| tls_cert, | ||
| tls_key, | ||
| direction, | ||
| } => { | ||
| run_server( | ||
| public_addr, | ||
| udp_bind, | ||
| publisher, | ||
| subscriber, | ||
| listen, | ||
| tls_cert, | ||
| tls_key, | ||
| direction, | ||
| ) | ||
| .await | ||
| } | ||
| Role::Client { url, direction } => { | ||
| run_client(broadcast, public_addr, publisher, subscriber, url, direction).await | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift
Honor --broadcast in server mode or stop requiring it there.
run_role receives broadcast, but the Role::Server branch drops it. That makes the mandatory flag ineffective for server roles, despite the CLI help saying the gateway binds to that broadcast.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/bin/moq-rtc.rs` around lines 151 - 173, The Role::Server branch in
run_role is ignoring the broadcast argument, so the mandatory --broadcast flag
has no effect in server mode. Update the match arm for Role::Server to either
pass broadcast through to run_server or remove the requirement for broadcast
when running as a server, and make sure the behavior is consistent with the CLI
contract in moq-rtc.rs.
| // WHIP client: read the local broadcast, push as RTP to remote. | ||
| // Once the per-codec re-packetizer lands, this should poll | ||
| // `subscriber.announced()` to await the broadcast rather than | ||
| // erroring on first-miss. | ||
| let broadcast = subscriber | ||
| .request_broadcast(broadcast_name) | ||
| .await | ||
| .map_err(|_| anyhow::anyhow!("broadcast {} not announced", broadcast_name))?; | ||
| client.publish(url, broadcast).await?; |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major | 🏗️ Heavy lift
Wait for the broadcast instead of failing on the first miss.
client publish can race relay connection or announcement propagation and exit with “not announced” even if the broadcast appears moments later. Replace the fail-fast lookup with the announced/retry path before dialing WHIP.
Do you want me to generate the retry/announcement loop for this path?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/bin/moq-rtc.rs` around lines 228 - 236, The WHIP publish path in
moq-rtc.rs currently fails immediately on request_broadcast when the broadcast
is not yet announced, which can race startup. Update the broadcast lookup in the
publish flow to wait/retry via subscriber.announced() (or the equivalent
announcement-aware path) before calling client.publish, and keep the existing
request_broadcast/broadcast_name logic but avoid mapping the first miss to an
error.
| [dependencies] | ||
| anyhow = { version = "1", features = ["backtrace"] } | ||
| axum = { version = "0.8", features = ["tokio"] } | ||
| axum-server = { version = "0.8", features = ["tls-rustls"], optional = true } | ||
| bytes = "1" | ||
| clap = { version = "4", features = ["derive"], optional = true } | ||
| hang = { workspace = true } | ||
| moq-mux = { workspace = true } | ||
| moq-native = { workspace = true, default-features = false, features = ["aws-lc-rs"], optional = true } | ||
| moq-net = { workspace = true } | ||
| reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } | ||
| rustls = { version = "0.23", features = ["aws-lc-rs"], default-features = false, optional = true } | ||
| str0m = "0.19" | ||
| thiserror = "2" | ||
| tokio = { workspace = true, features = ["full"] } | ||
| tower-http = { version = "0.6", features = ["cors"], optional = true } | ||
| tracing = "0.1" | ||
| url = "2" | ||
| uuid = { version = "1", features = ["v4"] } |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Show root workspace dependency table
fd -t f '^Cargo.toml$' rs --max-depth 1 --exec sed -n '/\[workspace.dependencies\]/,/^\[/p' {}
fd -t f '^Cargo.toml$' --max-depth 1 --exec sed -n '/\[workspace.dependencies\]/,/^\[/p' {}Repository: moq-dev/moq
Length of output: 1207
🏁 Script executed:
#!/bin/bash
# Check which of the inline deps in moq-rtc are declared in other crates' Cargo.toml
echo "Searching for usage of inline deps across the workspace..."
for dep in anyhow axum bytes reqwest str0m thiserror tracing url uuid; do
count=$(fd -t f 'Cargo.toml' rs --exec grep -l "^$dep\s*=" {} \; 2>/dev/null | wc -l)
echo "$dep: found in $count Cargo.toml files"
done
</shell_script>Repository: moq-dev/moq
Length of output: 624
Add shared dependencies to [workspace.dependencies] and reference with { workspace = true }.
The inline dependencies anyhow, axum, bytes, reqwest, thiserror, tracing, url (and str0m, uuid) are used across multiple crates in the workspace. Per the coding guideline ("new crates should add shared dependency versions/paths under [workspace.dependencies] and reference them with { workspace = true }"), these should be centralized in the root Cargo.toml to ensure version alignment.
Update the root rs/Cargo.toml to add these under [workspace.dependencies] with a consistent version, then replace the inline entries in rs/moq-rtc/Cargo.toml with { workspace = true }.
For example, in the root Cargo.toml:
[workspace.dependencies]
# ... existing ...
anyhow = "1"
axum = "0.8"
bytes = "1"
reqwest = { version = "0.12", default-features = false }
str0m = "0.19"
thiserror = "2"
tracing = "0.1"
url = "2"
uuid = "1"Then in rs/moq-rtc/Cargo.toml:
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true }
# ... other shared deps as workspace = true ...
str0m = { workspace = true }
# ... etc ...Note: reqwest and url may need feature flags aligned with the workspace if other crates use them; reqwest currently uses rustls-tls and default-features = false in this crate.
[original_guidelines_enforcement]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/Cargo.toml` around lines 43 - 61, Move the shared crates used by
moq-rtc from inline version specs to the workspace-level dependency table in
rs/Cargo.toml, then switch the matching entries in moq-rtc’s Cargo.toml to use {
workspace = true }. Update the dependency declarations for anyhow, axum, bytes,
reqwest, str0m, thiserror, tracing, url, and uuid, and preserve any required
feature settings by aligning them at the workspace level or reapplying them
consistently in moq-rtc. Use the existing dependency names in [dependencies] and
[workspace.dependencies] to locate the affected entries.
Source: Coding guidelines
| pub fn new(config: Config) -> Self { | ||
| Self { | ||
| config, | ||
| http: reqwest::Client::new(), | ||
| } |
There was a problem hiding this comment.
🩺 Stability & Availability | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify whether moq-rtc configures reqwest timeouts anywhere.
rg -n 'reqwest::Client::new|reqwest::Client::builder|\.timeout\s*\(' rs/moq-rtcRepository: moq-dev/moq
Length of output: 214
Configure a timeout on the shared HTTP client.
reqwest::Client::new() sets no application-level timeout, so stalled WHIP/WHEP POSTs or response-body reads can hang client startup indefinitely. Construct the client with reqwest::Client::builder().timeout(Duration::from_secs(<bound>)).build() and attach it to Self { ..., http: client, ... }.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/src/client/mod.rs` around lines 36 - 40, The shared HTTP client in
`Client::new` is created without any application-level timeout, so stalled
WHIP/WHEP requests can hang startup. Update `Client::new` to build the
`reqwest::Client` with a bounded timeout using `reqwest::Client::builder()` and
`Duration::from_secs(...)`, then assign that client into `Self { config, http:
client, ... }` instead of `reqwest::Client::new()`.
| // Fast path: a source we've already paired with a session. | ||
| let sender = registry.lock().unwrap().by_addr.get(&src).cloned(); | ||
| let sender = match sender { | ||
| Some(sender) => Some(sender), | ||
| // New source: only a STUN binding request (carrying the local | ||
| // ufrag) can introduce one. Parse outside the lock. | ||
| None => match local_ufrag(data) { | ||
| Some(ufrag) => { | ||
| let mut registry = registry.lock().unwrap(); | ||
| match registry.by_ufrag.get(&ufrag).cloned() { | ||
| // Cache addr -> session so this peer's later non-STUN | ||
| // packets route without re-parsing. | ||
| Some(sender) => { | ||
| registry.by_addr.insert(src, sender.clone()); | ||
| Some(sender) | ||
| } | ||
| None => None, | ||
| } | ||
| } | ||
| None => None, | ||
| }, | ||
| }; |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Parse STUN before the cached-address fast path.
Once src is cached, a later STUN Binding Request from the same UDP source is sent to the old session without checking its new ufrag. That can prevent a reused 5-tuple / ICE restart from being routed to the newly registered session.
Suggested routing order
- // Fast path: a source we've already paired with a session.
- let sender = registry.lock().unwrap().by_addr.get(&src).cloned();
- let sender = match sender {
- Some(sender) => Some(sender),
- // New source: only a STUN binding request (carrying the local
- // ufrag) can introduce one. Parse outside the lock.
- None => match local_ufrag(data) {
- Some(ufrag) => {
- let mut registry = registry.lock().unwrap();
- match registry.by_ufrag.get(&ufrag).cloned() {
- // Cache addr -> session so this peer's later non-STUN
- // packets route without re-parsing.
- Some(sender) => {
- registry.by_addr.insert(src, sender.clone());
- Some(sender)
- }
- None => None,
- }
- }
- None => None,
- },
- };
+ // STUN binding requests can introduce or update addr -> session routing.
+ // Parse them before the address cache so a reused source address with a
+ // new ufrag does not get pinned to the old session.
+ let sender = if let Some(ufrag) = local_ufrag(data) {
+ let mut registry = registry.lock().unwrap();
+ registry.by_ufrag.get(&ufrag).cloned().inspect(|sender| {
+ registry.by_addr.insert(src, sender.clone());
+ })
+ } else {
+ registry.lock().unwrap().by_addr.get(&src).cloned()
+ };📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Fast path: a source we've already paired with a session. | |
| let sender = registry.lock().unwrap().by_addr.get(&src).cloned(); | |
| let sender = match sender { | |
| Some(sender) => Some(sender), | |
| // New source: only a STUN binding request (carrying the local | |
| // ufrag) can introduce one. Parse outside the lock. | |
| None => match local_ufrag(data) { | |
| Some(ufrag) => { | |
| let mut registry = registry.lock().unwrap(); | |
| match registry.by_ufrag.get(&ufrag).cloned() { | |
| // Cache addr -> session so this peer's later non-STUN | |
| // packets route without re-parsing. | |
| Some(sender) => { | |
| registry.by_addr.insert(src, sender.clone()); | |
| Some(sender) | |
| } | |
| None => None, | |
| } | |
| } | |
| None => None, | |
| }, | |
| }; | |
| // STUN binding requests can introduce or update addr -> session routing. | |
| // Parse them before the address cache so a reused source address with a | |
| // new ufrag does not get pinned to the old session. | |
| let sender = if let Some(ufrag) = local_ufrag(data) { | |
| let mut registry = registry.lock().unwrap(); | |
| registry.by_ufrag.get(&ufrag).cloned().inspect(|sender| { | |
| registry.by_addr.insert(src, sender.clone()); | |
| }) | |
| } else { | |
| registry.lock().unwrap().by_addr.get(&src).cloned() | |
| }; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/src/server/mux.rs` around lines 147 - 168, The packet routing
logic in mux::server::Mux::route (the block using registry.by_addr and
local_ufrag) should parse and handle STUN before consulting the cached src fast
path. Update the lookup order so a STUN Binding Request can refresh routing via
its current ufrag even when src is already cached, then only fall back to
by_addr for non-STUN or unmatched packets. Keep the cache update that inserts
src into by_addr after resolving by_ufrag.
| async fn handle(server: State<Server>, path: Path<String>, headers: HeaderMap, body: Bytes) -> HttpResponse { | ||
| let (server, path) = (server.0, path.0); | ||
| match accept_offer(&server, &path, &headers, body).await { | ||
| Ok(Response { resource_id, answer }) => { | ||
| let mut response_headers = HeaderMap::new(); | ||
| response_headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("application/sdp")); | ||
| if let Ok(loc) = HeaderValue::from_str(&format!("/{path}/{resource_id}")) { | ||
| response_headers.insert(header::LOCATION, loc); | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major
Build Location from the original request URI
When this router is mounted under /whep, Path(path) extracts only the wildcard tail, causing the constructed /{path}/{resource_id} to omit the mount prefix. The resulting Location header points to an incorrect URL for client-side DELETE operations.
Use OriginalUri to capture the full request path including the mount prefix and construct the header accurately.
Suggested fix
body::Bytes,
- extract::{Path, State},
+ extract::{OriginalUri, Path, State},
@@
-async fn handle(server: State<Server>, path: Path<String>, headers: HeaderMap, body: Bytes) -> HttpResponse {
+async fn handle(
+ server: State<Server>,
+ path: Path<String>,
+ headers: HeaderMap,
+ OriginalUri(uri): OriginalUri,
+ body: Bytes,
+) -> HttpResponse {
@@
- if let Ok(loc) = HeaderValue::from_str(&format!("/{path}/{resource_id}")) {
+ let location = format!("{}/{}", uri.path().trim_end_matches('/'), resource_id);
+ if let Ok(loc) = HeaderValue::from_str(&location) {
response_headers.insert(header::LOCATION, loc);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async fn handle(server: State<Server>, path: Path<String>, headers: HeaderMap, body: Bytes) -> HttpResponse { | |
| let (server, path) = (server.0, path.0); | |
| match accept_offer(&server, &path, &headers, body).await { | |
| Ok(Response { resource_id, answer }) => { | |
| let mut response_headers = HeaderMap::new(); | |
| response_headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("application/sdp")); | |
| if let Ok(loc) = HeaderValue::from_str(&format!("/{path}/{resource_id}")) { | |
| response_headers.insert(header::LOCATION, loc); | |
| } | |
| async fn handle( | |
| server: State<Server>, | |
| path: Path<String>, | |
| headers: HeaderMap, | |
| OriginalUri(uri): OriginalUri, | |
| body: Bytes, | |
| ) -> HttpResponse { | |
| let (server, path) = (server.0, path.0); | |
| match accept_offer(&server, &path, &headers, body).await { | |
| Ok(Response { resource_id, answer }) => { | |
| let mut response_headers = HeaderMap::new(); | |
| response_headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("application/sdp")); | |
| let location = format!("{}/{}", uri.path().trim_end_matches('/'), resource_id); | |
| if let Ok(loc) = HeaderValue::from_str(&location) { | |
| response_headers.insert(header::LOCATION, loc); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/src/server/whep.rs` around lines 27 - 35, The WHEP handler is
building the Location header from Path<String>, which only contains the wildcard
tail and drops the mount prefix. Update handle in whep.rs to derive the full
request path from OriginalUri instead of path.0, then construct the response
Location using that original URI plus the resource_id so client DELETE requests
target the correct URL.
| async fn handle( | ||
| State(server): State<Server>, | ||
| Path(path): Path<String>, | ||
| headers: HeaderMap, | ||
| body: Bytes, | ||
| ) -> HttpResponse { | ||
| match accept_offer(&server, &path, &headers, body).await { | ||
| Ok(Response { resource_id, answer }) => { | ||
| let mut response_headers = HeaderMap::new(); | ||
| response_headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("application/sdp")); | ||
| if let Ok(loc) = HeaderValue::from_str(&format!("/{path}/{resource_id}")) { | ||
| response_headers.insert(header::LOCATION, loc); | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major
Build Location from the original request URI to preserve the mount prefix.
When this handler is mounted under a path like /whip, Path(path) only provides the wildcard tail, causing /{path}/{resource_id} to generate a URL that drops the mount prefix. This breaks subsequent client requests (e.g., DELETE) which expect the full resource URL.
Use the OriginalUri extractor to access the full request path before routing and construct the location header relative to it.
Suggested fix
body::Bytes,
- extract::{Path, State},
+ extract::{OriginalUri, Path, State},
@@
Path(path): Path<String>,
headers: HeaderMap,
+ OriginalUri(uri): OriginalUri,
body: Bytes,
) -> HttpResponse {
@@
- if let Ok(loc) = HeaderValue::from_str(&format!("/{path}/{resource_id}")) {
+ let location = format!("{}/{}", uri.path().trim_end_matches('/'), resource_id);
+ if let Ok(loc) = HeaderValue::from_str(&location) {
response_headers.insert(header::LOCATION, loc);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async fn handle( | |
| State(server): State<Server>, | |
| Path(path): Path<String>, | |
| headers: HeaderMap, | |
| body: Bytes, | |
| ) -> HttpResponse { | |
| match accept_offer(&server, &path, &headers, body).await { | |
| Ok(Response { resource_id, answer }) => { | |
| let mut response_headers = HeaderMap::new(); | |
| response_headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("application/sdp")); | |
| if let Ok(loc) = HeaderValue::from_str(&format!("/{path}/{resource_id}")) { | |
| response_headers.insert(header::LOCATION, loc); | |
| } | |
| async fn handle( | |
| State(server): State<Server>, | |
| Path(path): Path<String>, | |
| OriginalUri(uri): OriginalUri, | |
| headers: HeaderMap, | |
| body: Bytes, | |
| ) -> HttpResponse { | |
| match accept_offer(&server, &path, &headers, body).await { | |
| Ok(Response { resource_id, answer }) => { | |
| let mut response_headers = HeaderMap::new(); | |
| response_headers.insert(header::CONTENT_TYPE, HeaderValue::from_static("application/sdp")); | |
| let location = format!("{}/{}", uri.path().trim_end_matches('/'), resource_id); | |
| if let Ok(loc) = HeaderValue::from_str(&location) { | |
| response_headers.insert(header::LOCATION, loc); | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/src/server/whip.rs` around lines 28 - 40, The WHIP response
Location header is being built from Path(path), which only contains the wildcard
tail and can drop the mounted prefix like /whip. Update handle in whip.rs to use
the OriginalUri extractor so you can derive the full request path before
routing, then construct the Location value relative to that original URI when
setting header::LOCATION. Keep the existing accept_offer flow and
response_headers logic, but replace the current format based on path with one
that preserves the mount prefix for the returned resource URL.
| } | ||
| } | ||
|
|
||
| pub async fn run(mut self) -> Result<()> { |
There was a problem hiding this comment.
📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Add rustdoc for Session::run.
This is a public method on a public type and currently lacks documentation. As per coding guidelines, **/*.{rs,js,ts,tsx} must document every exported/public symbol in Rust and JS/TS.
Suggested doc
+ /// Drive the session until ICE/DTLS/media handling completes or fails.
pub async fn run(mut self) -> Result<()> {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub async fn run(mut self) -> Result<()> { | |
| /// Drive the session until ICE/DTLS/media handling completes or fails. | |
| pub async fn run(mut self) -> Result<()> { |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-rtc/src/session.rs` at line 146, Add rustdoc for the public
Session::run method in the Session impl. Document what the async run(self)
entrypoint does, any important behavior or lifecycle expectations, and its
Result return value so this exported symbol meets the documentation requirement.
Source: Coding guidelines
Expose the out-of-band parameter sets a length-prefixed (avc1/hvc1) stream needs to re-inject at each keyframe when converting back to Annex-B: - `h264::Avcc` gains `sps`/`pps` fields (and `#[non_exhaustive]`); `Avcc::parse` now collects the NAL lists, deriving resolution from the first SPS. - `h265::Hvcc` + `Hvcc::parse`: the HEVC analogue, sorting VPS/SPS/PPS by type. - `annexb::from_length_prefixed` (with an optional keyframe prefix) and `annexb::build_prefix`. The internal `avcc_params`/`hvcc_params` flatteners now delegate to the typed parsers, so there is a single byte-parser per codec. This also fixes `hvcc_params` to keep only VPS/SPS/PPS as its doc already claimed (it previously flattened every NAL array). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Port the moq-rtc gateway from the dev branch. It bridges WebRTC and MoQ, speaking WHIP (publish) and WHEP (subscribe) in either HTTP role, so it can accept incoming peers (OBS, browsers) or dial out to a remote WebRTC server. All four paths work; ingest covers H.264/VP8/VP9/Opus and egress adds H.265/AV1, using str0m for ICE/DTLS/SRTP and a shared single-UDP-port mux. Adapted to main's current APIs: `Broadcast`/`Track` model, infallible `OriginConsumer::request_broadcast` + `OriginProducer::dynamic` (#1913), `BroadcastConsumer::subscribe_track`, `publish_broadcast` lifetime tied to the producer, moq-mux's in-place H.264 `Import` (avc3 mode), and moq-native's `with_publish`/`with_consume` client builder. Wires the crate into the workspace and documents it (doc/bin/rtc.md, index, sidebar, crate map). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…ustive configs
- Revert the `avcc_params`/`hvcc_params` delegation: the new typed `Avcc`/`Hvcc`
parsers stay for moq-rtc, but the existing pub(crate) flatteners that MPEG-TS
export relies on are restored verbatim, so ts/export output is byte-identical
(the delegation had silently dropped non-VPS/SPS/PPS hvcC arrays and could
reorder them). Also drops a duplicated doc comment.
- Add `#[non_exhaustive]` to the public `server::Config` / `client::Config` so
future fields stay additive; build them via `Default` + field assignment.
- Document the public `codec::{h264,vp8,vp9,opus}::Bridge` items.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
225e347 to
dd8ba19
Compare
|
Caution Failed to replace (edit) comment. This is likely due to insufficient permissions or the comment being deleted. Error details |
Summary
Ports
moq-rtc, the WebRTC <-> MoQ gateway, fromdevtomain. It speaks WHIP (publish) and WHEP (subscribe) in either HTTP role, so it can accept incoming peers (OBS, browsers) or dial out to a remote WebRTC server. All four paths work; ingest covers H.264/VP8/VP9/Opus and egress adds H.265/AV1. str0m drives ICE/DTLS/SRTP, and every WHIP/WHEP session shares a single UDP media port (demuxed by ICE ufrag).This became a tractable port (rather than a large backport) once #1913 landed
OriginProducer::dynamic+ infallibleOriginConsumer::request_broadcastonmain. The rest was adapting moq-rtc to main's current APIs plus a small, focused moq-mux addition.Changes
moq-mux (commit 1) — expose the parameter sets a length-prefixed (avc1/hvc1) stream needs to re-inject at each keyframe:
h264::Avccgainssps/ppsfields (and#[non_exhaustive]);Avcc::parsecollects the NAL lists.h265::Hvcc+Hvcc::parse(the HEVC analogue, sorting VPS/SPS/PPS by type).annexb::from_length_prefixed(optional keyframe prefix) andannexb::build_prefix.avcc_params/hvcc_paramsflatteners now delegate to the typed parsers (one byte-parser per codec). This also fixeshvcc_paramsto keep only VPS/SPS/PPS as its doc already claimed.moq-rtc (commit 2) — the gateway crate, adapted to main:
Broadcast/Trackmodel;subscribe_track;request_broadcast/dynamic(feat(moq-net): add OriginProducer::dynamic + infallible OriginConsumer::request_broadcast #1913).publish_broadcastannouncement lifetime tied to the producer (no RAII guard).Importin avc3 mode (replacing the devSplit+Import).with_publish/with_consumeclient builder.doc/bin/rtc.md, index, sidebar,rs/CLAUDE.md).Public API changes (all additive)
moq-mux: newpub fn annexb::from_length_prefixed,annexb::build_prefix; newpub struct h265::Hvcc+Hvcc::parse; extendedpub struct h264::Avcc(addedsps/ppsfields, now#[non_exhaustive]— non-breaking).moq-rtc: new crate (public surface:Server/Config,server::{whip,whep}::accept,client,codec,egress/ingest,Error).No wire-protocol changes; targets
mainas a new, additive crate.Test plan
cargo check/clippyclean (moq-mux, moq-rtc,--all-targets);cargo build --workspaceokcargo test -p moq-mux -p moq-rtcgreen (260 + 17 + 5 bitstream, incl. the new H.264Importingest path and directAvcc::parse/Hvcc::parsetests)cargo fmt --check+taploclean(Written by Claude)
🤖 Generated with Claude Code