feat(moq-cli): add --pace to emit TS output, following the live edge#1973
feat(moq-cli): add --pace to emit TS output, following the live edge#1973kixelated wants to merge 13 commits into
Conversation
`moq-cli subscribe --format ts` writes a retained broadcast as fast as it can be read. Add `--pace` to instead emit frames on the media clock (anchored to the first frame), like ffmpeg's `-re`, which is what a downstream player or re-publish expects. The pacing itself is a reusable `moq_mux::container::Pacer` building block: it maps each frame's presentation timestamp onto the wall clock and returns the Instant the frame is due, leaving the clock read and the sleep to the caller so it stays runtime-free. The anchor never moves, so a burst-read retained stream still drains at its media rate; reordered B-frames map into the past and emit immediately. (This is the opposite policy from moq-srt's live-edge stamper, which re-anchors to the edge because the SRT receiver owns the jitter buffer.) Only TS exposes per-frame timestamps today, so `--pace` with any other format is rejected by `SubscribeArgs::validate()` before connecting to the relay. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Sorry @kixelated, you have reached your weekly rate limit of 500000 diff characters.
Please try again later or upgrade to continue using Sourcery
`due` now reads the clock itself, and only on the first frame: the anchor is the only place `Instant::now()` is needed, so `get_or_insert_with` skips the call on every later frame (which map off the anchor). The first call returns the anchor instant, so the unit tests stay deterministic by reading it back rather than stubbing the clock, keeping the public surface free of a tokio time type. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughThis change adds real-time pacing support for MPEG-TS subscriptions. A new 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rs/moq-cli/src/subscribe.rs (1)
73-81: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winConsider a unit test for
validate().A quick inline test (
pace=true, format=Ts→ Ok;pace=true, format!=Ts→ Err) would lock in this contract cheaply, consistent with the existing pattern of inline#[cfg(test)]tests elsewhere in the crate.✅ Example test
#[cfg(test)] mod tests { use super::*; #[test] fn pace_requires_ts_format() { let mut args = SubscribeArgs { format: SubscribeFormat::Fmp4, max_latency: Duration::from_millis(500), pace: true, fragment_duration: None, catalog: None, }; assert!(args.validate().is_err()); args.format = SubscribeFormat::Ts; assert!(args.validate().is_ok()); } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-cli/src/subscribe.rs` around lines 73 - 81, Add a small unit test for SubscribeArgs::validate to lock in the pace/format contract: verify that pace=true with SubscribeFormat::Ts returns Ok, and pace=true with any non-Ts format returns Err. Place it in an inline #[cfg(test)] mod next to validate(), following the crate’s existing test style, and reference SubscribeArgs, SubscribeFormat, and validate() so the behavior stays covered.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@rs/moq-cli/src/subscribe.rs`:
- Around line 73-81: Add a small unit test for SubscribeArgs::validate to lock
in the pace/format contract: verify that pace=true with SubscribeFormat::Ts
returns Ok, and pace=true with any non-Ts format returns Err. Place it in an
inline #[cfg(test)] mod next to validate(), following the crate’s existing test
style, and reference SubscribeArgs, SubscribeFormat, and validate() so the
behavior stays covered.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: f8c656ba-f455-4c1f-9525-65595a4316d3
📒 Files selected for processing (5)
doc/bin/cli.mdrs/moq-cli/src/main.rsrs/moq-cli/src/subscribe.rsrs/moq-mux/src/container/mod.rsrs/moq-mux/src/container/pace.rs
Lock in that SubscribeArgs::validate accepts --pace only with --format ts. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Pacing on the raw frame PTS was wrong for reordered (B-frame) streams: the muxer emits in decode order, where the PTS is non-monotonic, so the per-frame schedule came out bursty within the reorder window (bounded by the catalog `jitter`). ts::Export now paces on its internal decode clock (the authored DTS, seeded from the catalog jitter reserve), which is monotonic. It does not sleep; it hands back the due wall-clock instant on a new `ts::Output` struct (replacing the bare container::Frame return) so the caller decides whether to honor it. This keeps moq-mux runtime-free (no tokio time dependency). moq-cli `--pace` now just sleeps until `frame.pace`. The standalone container::Pacer is demoted to pub(crate) (Export owns its use). moq-srt's Subscriber keeps its own live-edge stamper (its re-anchor policy differs) and only needed the return-type rename. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
rs/moq-mux/src/container/ts/export.rs (1)
246-246: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winDocument
poll_next.
poll_nextis public and newly changed here, but it has no rustdoc contract likenext. Add a short doc comment or reduce the visibility if it is not meant to be exported. As per coding guidelines,**/*.{rs,ts,tsx,js,jsx}: Document every exported public API symbol.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/container/ts/export.rs` at line 246, The public API method poll_next on Export needs rustdoc because it is exported and newly changed; add a short doc comment that matches the contract style used for next, or reduce its visibility if it is not intended to be public. Update the documentation directly on Export::poll_next so the exported symbol is covered by the project’s public API docs requirement.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Around line 236-246: `Export::next` and `Export::poll_next` are making a
breaking public API change by switching their return type from `Option<Frame>`
to `Option<Output>`. Restore the existing `Frame`-based signatures for the
current public methods, and introduce a separate paced-output API (using
`Output` and its pacing fields) so callers can opt in without breaking
`rs/moq-srt/src/ts.rs` and other downstream users. Keep the new pacing behavior
additive by routing it through a new method or type while preserving the
existing `Export::next`/`poll_next` contract.
In `@rs/moq-srt/src/ts.rs`:
- Line 90: The public Subscriber::next API is leaking
moq_mux::container::ts::Output through moq-srt’s boundary, so update
ts::Subscriber::next to return the local moq-srt Output type instead. Keep the
conversion from the muxer’s ts::Output to moq-srt::Output inside this module,
and adjust any internal call sites or helper logic so the dependency type
remains encapsulated behind the moq-srt layer.
---
Nitpick comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Line 246: The public API method poll_next on Export needs rustdoc because it
is exported and newly changed; add a short doc comment that matches the contract
style used for next, or reduce its visibility if it is not intended to be
public. Update the documentation directly on Export::poll_next so the exported
symbol is covered by the project’s public API docs requirement.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 78c89b1a-c103-482a-bf8b-812bb60006ec
📒 Files selected for processing (5)
rs/moq-cli/src/subscribe.rsrs/moq-mux/src/container/mod.rsrs/moq-mux/src/container/pace.rsrs/moq-mux/src/container/ts/export.rsrs/moq-srt/src/ts.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rs/moq-mux/src/container/mod.rs
…ency budget Rework the pacer into a jitter-buffer model that holds at most `lead` of buffer ahead of the live edge, re-anchoring past that so a tune-in burst or a faster-than-real source never accrues unbounded latency. This is the moq-srt egress pacer generalized: `lead = 0` is "never lead now" (the SRT receiver owns the buffer), `lead > 0` lets a sleeping caller hold that much buffer itself. ts::Export feeds the pacer its `--latency` budget, so `Output.pace`: - streams a retained broadcast at media rate (like `-re`) when the caller sleeps in lockstep, the cap never trips; - keeps a live or bursty source within `latency` of the edge instead of playing the initial GOP burst out slowly as permanent latency. moq-srt now shares this pacer: its Export already runs at latency 0, so it just stamps each SRT payload with `frame.pace` and its bespoke `pace()` + `Paced` + the standalone re-anchor test are deleted (the logic moved to the moq-mux Pacer, which also paces on the decode clock, a small improvement over moq-srt's PTS). moq-cli `--pace` documents that `--max-latency` is the pace buffer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The pace buffer and the read/skip budget were both driven by the muxer's single `latency`, so moq-srt could only get its required pace lead of 0 by leaving the read budget at 0 too. Decouple them: `ts::Export` gains `with_pace_lead`, which overrides the pace buffer independently of `with_latency`. moq-srt now sets the read/skip budget to the connection's negotiated SRT latency (`socket.settings().send_tsbpd_latency`, the max of our floor and the caller's `?latency`), so a track tolerates the same jitter the receiver's TSBPD does, while pinning the pace lead to zero. Pace lead must stay 0 for SRT: the receiver owns the jitter buffer, and a muxer lead would just stack on top of TSBPD and double the end-to-end latency. moq-cli is unchanged: it leaves the pace lead following `--max-latency`. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (2)
rs/moq-mux/src/container/ts/export.rs (1)
141-166: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winMake
Outputnon-exhaustive before publishing it.
Outputis a newly exposed return type; leaving it exhaustive makes any future field addition another breaking change. Since callers receive it fromExport::next,#[non_exhaustive]keeps field access while preventing exhaustive construction/matching outside the crate.♻️ Proposed API hardening
+#[non_exhaustive] pub struct Output {As per coding guidelines, “Before exposing a new public type, function, field, or enum variant, prefer smaller, more future-proof API shapes such as ... non-exhaustive public types when appropriate.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/container/ts/export.rs` around lines 141 - 166, Mark the public Output type in export::Export::next as non-exhaustive so future fields can be added without breaking callers; update the Output definition to use #[non_exhaustive] while keeping its existing public fields (payload, timestamp, keyframe, pace) accessible for read-only use.Source: Coding guidelines
rs/moq-srt/src/ts.rs (1)
97-99: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winUpdate
nextdocs to mention the pacing instant.The returned
ts::Outputnow carriespace; the current text still describes only TS bytes plus media timestamp.📝 Proposed doc update
- /// Pull the next muxed frame (TS bytes + media timestamp), or `None` once the - /// broadcast ends. + /// Pull the next muxed frame (TS bytes, media timestamp, and pacing instant), + /// or `None` once the broadcast ends.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-srt/src/ts.rs` around lines 97 - 99, The doc comment for `ts::Muxer::next` is outdated because `ts::Output` now includes a pacing instant in `pace` in addition to TS bytes and media timestamp. Update the `next` documentation to mention that the returned `ts::Output` carries pacing information, and keep the wording aligned with the `ts::Output` struct’s current fields so callers understand all values they receive.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Around line 141-166: Mark the public Output type in export::Export::next as
non-exhaustive so future fields can be added without breaking callers; update
the Output definition to use #[non_exhaustive] while keeping its existing public
fields (payload, timestamp, keyframe, pace) accessible for read-only use.
In `@rs/moq-srt/src/ts.rs`:
- Around line 97-99: The doc comment for `ts::Muxer::next` is outdated because
`ts::Output` now includes a pacing instant in `pace` in addition to TS bytes and
media timestamp. Update the `next` documentation to mention that the returned
`ts::Output` carries pacing information, and keep the wording aligned with the
`ts::Output` struct’s current fields so callers understand all values they
receive.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4799c6cf-4def-412b-92ba-b12c03fca476
📒 Files selected for processing (3)
rs/moq-mux/src/container/ts/export.rsrs/moq-srt/src/server.rsrs/moq-srt/src/ts.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rs/moq-srt/src/server.rs
The egress read/skip budget was derived from the per-connection negotiated SRT latency (`socket.settings().send_tsbpd_latency`), which rises with a caller's `?latency`. But the gateway's latency is a fixed policy, not a per-caller knob, so use the configured value: thread `Server`'s latency through to the egress muxer instead of reading it off the socket. Rename the flag `--srt-latency` -> `--latency` to match (the `srt-` prefix is redundant on the moq-srt binary, and it's a fixed value, not a per-call max). `--srt-latency` stays a hidden alias for back-compat. A caller's `?latency` still raises its own SRT receive buffer as before; it just no longer moves the gateway-side read budget. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Keep future fields on the new public ts::Output additive, and mention the pacing instant in Subscriber::next's docs. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Unlike --latency (free), --listen collides with moq-native's --server-bind, which aliases --listen and is flattened into the same subcommand. Keep the srt- prefix on the listener flags to disambiguate, and note why in the help text. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
`moq-native`'s `--server-bind` aliased the bare `--listen`, which forced the SRT listener to stay `--srt-listen`. Rename that clap alias to `--server-listen` (in keeping with the `--server-*` namespace) so `--listen` is free. The TOML `listen` key is untouched (kept as a serde alias), so relay configs keep working. With `--listen` free, drop the redundant `srt-` prefix from the moq-srt listener flags: `--srt-listen` -> `--listen`, `--srt-prefix` -> `--prefix` (both keep the old names as hidden aliases). `--latency` was already de-prefixed. Updated the one CLI site that used the old alias (demo/pub/justfile: `serve --listen` -> `--server-listen`) and the moq-srt README examples. moq-rtc/moq-cli `--listen` are their own separate flags and are unaffected. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…8b97c # Conflicts: # rs/moq-srt/README.md # rs/moq-srt/bin/moq-srt.rs
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rs/moq-srt/src/server.rs (1)
60-68: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winUpdate
Server::binddocs for the new latency semantics.
latencyis now stored and used for subscribe egress read/skip budgeting, but the public docs still describe only handshake receive latency.Suggested doc update
- /// `latency` is the SRT receive latency, negotiated at handshake time; pass - /// `None` for a sensible default (200ms). + /// `latency` is the SRT receive latency, negotiated at handshake time; pass + /// `None` for a sensible default (200ms). For subscribe connections, the same + /// value also bounds the MPEG-TS egress read/skip budget before the receiver's + /// TSBPD owns buffering.As per coding guidelines, exported Rust API symbols should be documented.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-srt/src/server.rs` around lines 60 - 68, Update the public documentation for Server::bind to reflect the new latency semantics. The current doc comment on bind only describes SRT handshake receive latency, but latency is also stored on Server and used for subscribe egress read/skip budgeting. Revise the bind docs (and any related doc comments on Server if present) so they explain the full behavior of latency, keeping the API documentation complete for the exported symbol.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@rs/moq-srt/src/server.rs`:
- Around line 60-68: Update the public documentation for Server::bind to reflect
the new latency semantics. The current doc comment on bind only describes SRT
handshake receive latency, but latency is also stored on Server and used for
subscribe egress read/skip budgeting. Revise the bind docs (and any related doc
comments on Server if present) so they explain the full behavior of latency,
keeping the API documentation complete for the exported symbol.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b12c664e-94d6-4826-8976-18313f69cb46
📒 Files selected for processing (3)
rs/moq-mux/src/container/ts/export.rsrs/moq-srt/src/server.rsrs/moq-srt/src/ts.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- rs/moq-srt/src/ts.rs
- rs/moq-mux/src/container/ts/export.rs
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Summary
Adds
--pacetomoq-cli subscribe --format ts, emitting the stream at its real-time rate (like ffmpeg's-re) via a jitter-buffer pacer ints::Exportthat follows the live edge with a bounded latency budget.The pacer
Pacermaps a frame's decode timestamp to the wall-clock instant it's due, holding at mostleadof buffer ahead of the live edge. When the source outruns that (a tune-in burst delivers a whole GOP at once, or it drifts ahead of wall-clock), it re-anchors to the edge so latency never grows pastlead. It paces on the decode clock (DTS) — which the muxer already authors from the catalogjitterreserve — so reordered B-frame streams pace evenly rather than bursting on non-monotonic PTS.This is the moq-srt egress pacer generalized:
lead = 0-> never lead now (the SRT receiver's TSBPD owns the buffer). moq-srt's case.lead > 0-> a sleeping caller holds that much buffer itself. moq-cli's case (lead = --max-latency).Exportonly computes the instant (no sleep), so moq-mux stays runtime-free; the caller owns the delay.What's in it
ts::Export: returnsts::Output { payload, timestamp, keyframe, pace }(#[non_exhaustive]).paceis the due instant on the decode clock.with_latencysets the read/skip budget (and, by default, the pace buffer);with_pace_leadoverrides the pace buffer independently (SRT pins it to 0).--pace:sleep_until(frame.pace); rejected before connecting for non-tsformats.frame.pace(pace lead 0, since TSBPD owns the buffer) and matches the muxer's read/skip budget to the server's configuredConfig.latency.--paceexample indoc/bin/cli.md.Merge note
Rebased over main's #1975 (
moq-rtmp/srt/rtc -> library-only). That refactor removed the standalonemoq-srtbinary, so the CLI-flag work this PR briefly carried (--srt-latency/--srt-listen/--srt-prefixrenames and amoq-native--listenalias change to free up--listen) was dropped in the merge — it targeted a binary that no longer exists. The moq-srt library pacing changes remain. When the SRT CLI returns as amoq-cli srtsubcommand, the flag naming can be revisited there.API surface (breaking, in moq-mux)
pub struct ts::Output(#[non_exhaustive]);ts::Export::{next, poll_next}returnOption<Output>instead ofOption<container::Frame>.ts::Export::with_pace_lead;container::Pacerispub(crate).--paceflag;SubscribeArgs::validate().Test plan
cargo test -p moq-mux -p moq-srt -p moq-cli(340 / 6 / pass; Pacer covers media-rate, re-anchor-past-budget, trailing-frame, zero-lead)cargo clippy ... -D warningsclean (moq-mux/moq-srt/moq-cli/moq-native)(Written by Claude Opus 4.8)