feat(moq-srt): bidirectional SRT/MPEG-TS gateway (+ timestamped ts::Export)#1915
Conversation
…ed tune-in `ts::Export::next()` now returns `Option<Frame>` instead of `Option<Bytes>`, so each muxed MPEG-TS chunk carries the source media `timestamp` and `keyframe` flag. A transport (e.g. SRT) can then pace delivery on the media clock rather than blasting bytes with no timing. The leading PAT/PMT now rides on the first frame so it inherits a real timestamp instead of being emitted as a separate untimed chunk. Also align the stream to the first video keyframe on tune-in: hold output until the first video keyframe is buffered and drop any non-video frames ahead of it. MPEG-TS carries the H.264/H.265 parameter sets in-band on the keyframe, so on a mid-stream join the audio source can start over a second before the oldest cached keyframe; emitting that lead audio first buries the SPS/PPS behind an audio-only preamble and a live decoder gives up before it configures video. BREAKING CHANGE: ts::Export::next()/poll_next() return Frame, not Bytes. The sole in-tree caller (moq-cli subscribe) is updated. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Add the `rs/moq-srt` crate (library + `moq-srt` binary): a pure-Rust SRT gateway over MoQ, ingest and egress. - `m=publish` (default): demux the connection's MPEG-TS with moq-mux and publish it into an origin as a broadcast (the contribution path: OBS, ffmpeg). - `m=request`: re-mux a broadcast from the origin back to MPEG-TS and stream it to a plain SRT player (VLC, ffmpeg), paced on the media clock via the new timestamped `ts::Export` frames. Two entry points: `run(origin, Config)` is the unauthenticated convenience; `Server`/`Request` let an embedder authorize each connection (treat the stream id as a token) and pick the broadcast path, mirroring `moq-native`'s `Server`/`Request`. SRT is provided by `srt-tokio`, no libsrt or ffmpeg dependency. The `moq-srt` binary runs a local QUIC/WebTransport server (`serve`) or forwards ingested broadcasts to a remote relay (`publish`). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (4)
🚧 Files skipped from review as they are similar to previous changes (4)
WalkthroughCargo workspace now includes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✨ Finishing Touches✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (2)
rs/moq-mux/src/container/ts/export_test.rs (1)
838-847: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winKeep the MP2 matching one-to-one.
any(|ing| ing.ends_with(rt))can satisfy both round-tripped programs with the same ingested stream, so a duplicate-one-track / drop-the-other regression would still pass. Make the comparison order-independent, but consume each ingested candidate at most once.Suggested fix
- for rt in &roundtripped { + let mut matched = vec![false; ingested.len()]; + for rt in &roundtripped { assert!(!rt.is_empty(), "a program lost all of its MP2 frames"); - assert!( - ingested.iter().any(|ing| ing.ends_with(rt)), - "round-tripped MP2 must be a byte-exact suffix of an ingested program" - ); + let Some((idx, _)) = ingested + .iter() + .enumerate() + .find(|(i, ing)| !matched[*i] && ing.ends_with(rt)) else { + panic!("round-tripped MP2 must be a byte-exact suffix of a distinct ingested program"); + }; + matched[idx] = true; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-mux/src/container/ts/export_test.rs` around lines 838 - 847, The MP2 round-trip assertion in export_test.rs is still allowing the same ingested stream to match multiple roundtripped programs via roundtripped.iter().any on ingested, so a duplicate/drop regression can pass. Update the check around the roundtripped loop to keep the comparison order-independent but one-to-one by tracking which ingested candidate has already been matched and consuming each only once. Use the existing roundtripped and ingested collections in that test block to enforce unique matching per program.rs/moq-srt/src/error.rs (1)
6-43: 📐 Maintainability & Code Quality | 🔵 TrivialReplace
Arcwrappers with#[error(transparent)]and removeCloneThis public error type violates the repository's error policy for
moq-*library crates, which mandates using#[error(transparent)]with#[from]for wrapped foreign errors. The currentArcworkaround and manualFromimplementations are unnecessary as no external consumers or internal usages requiringClone(e.g.,tokio::spawn,async move) were found.Update the
Errorenum to removeCloneand theArcwrappers, allowingthiserrorto handle conversions transparently:
- Remove the
Clonederive andstd::sync::Arcimport.- Change variants to
#[error(transparent)]and#[from].- Remove the manual
Fromimplementations forstd::io::Error,moq_mux::Error, andanyhow::Error.- use std::sync::Arc; - /// Errors produced while ingesting SRT into MoQ. - #[derive(Debug, Clone, thiserror::Error)] + #[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum Error { - #[error("moq: {0}")] + #[error(transparent)] Moq(#[from] moq_net::Error), - #[error("mux: {0}")] - Mux(Arc<moq_mux::Error>), + #[error(transparent)] + Mux(#[from] moq_mux::Error), - #[error("io: {0}")] - Io(Arc<std::io::Error>), + #[error(transparent)] + Io(#[from] std::io::Error), - #[error("{0}")] - Other(Arc<anyhow::Error>), + #[error(transparent)] + Other(#[from] anyhow::Error), } - - impl From<std::io::Error> for Error { - fn from(err: std::io::Error) -> Self { - Error::Io(Arc::new(err)) - } - } - - impl From<moq_mux::Error> for Error { - fn from(err: moq_mux::Error) -> Self { - Error::Mux(Arc::new(err)) - } - } - - impl From<anyhow::Error> for Error { - fn from(err: anyhow::Error) -> Self { - Error::Other(Arc::new(err)) - } - }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-srt/src/error.rs` around lines 6 - 43, The Error enum in error::Error still uses Arc wrappers, manual From impls, and Clone, which conflicts with the library error policy. Update the Error variants Moq, Mux, Io, and Other to use #[error(transparent)] with #[from] directly, remove the Clone derive and Arc imports, and delete the manual From<std::io::Error>, From<moq_mux::Error>, and From<anyhow::Error> implementations.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-mux/src/container/ts/export.rs`:
- Around line 208-212: The new public poll_next method on Export is missing
documentation, unlike next, so add a doc comment for this exported entrypoint.
Update the Export impl to document poll_next with a brief summary of its
behavior and how it uses the Waiter to drive frame polling. Keep the docs
aligned with the existing next method and ensure any other public symbols in
this area remain documented.
- Around line 232-250: The startup buffering logic in the export loop can
incorrectly use a delta video frame as the tune-in anchor once header_ready() is
true. Update the polling/parking behavior in the track read loop so that pending
video frames are only accepted as the initial anchor after a keyframe has been
buffered, and make video_ready() and first_video_pts() rely on that keyframe
state instead of the first retained video frame. Apply the same fix wherever the
same startup buffering logic appears in the export path.
In `@rs/moq-srt/bin/moq-srt.rs`:
- Around line 116-120: The HTTP sidecar bind is derived too early in run_serve
from the pre-init ServerConfig string, which can diverge from the actual QUIC
listener address after init. Update run_serve to initialize the server first
with config.init(), then derive web_bind from server.local_addr() so the sidecar
and /certificate.sha256 endpoint always follow the real bound socket, including
hostname resolution and port 0 cases.
In `@rs/moq-srt/Cargo.toml`:
- Around line 2-3: The package metadata description in Cargo.toml still says
ingest-only, which doesn’t match the crate’s actual bidirectional surface.
Update the description for moq-srt to align with what src/lib.rs and README.md
already present, and keep the wording consistent with the crate’s bidirectional
SRT role.
In `@rs/moq-srt/src/server.rs`:
- Around line 375-384: The pacing logic in `server.rs` currently uses
`saturating_sub` in the `Paced` timestamp calculation, which flattens any `ts`
earlier than `base` to the live edge. Update the `Paced` construction so the
delta is computed with a separate branch for `ts < base`, allowing those
timestamps to be offset backward from `anchor` instead of clamping to zero,
while preserving the existing re-anchor behavior when `send_at > now`. Add a
regression test around the pacing/reanchor path that covers a backward timestamp
arriving after re-anchoring to verify it keeps its earlier media time.
---
Nitpick comments:
In `@rs/moq-mux/src/container/ts/export_test.rs`:
- Around line 838-847: The MP2 round-trip assertion in export_test.rs is still
allowing the same ingested stream to match multiple roundtripped programs via
roundtripped.iter().any on ingested, so a duplicate/drop regression can pass.
Update the check around the roundtripped loop to keep the comparison
order-independent but one-to-one by tracking which ingested candidate has
already been matched and consuming each only once. Use the existing roundtripped
and ingested collections in that test block to enforce unique matching per
program.
In `@rs/moq-srt/src/error.rs`:
- Around line 6-43: The Error enum in error::Error still uses Arc wrappers,
manual From impls, and Clone, which conflicts with the library error policy.
Update the Error variants Moq, Mux, Io, and Other to use #[error(transparent)]
with #[from] directly, remove the Clone derive and Arc imports, and delete the
manual From<std::io::Error>, From<moq_mux::Error>, and From<anyhow::Error>
implementations.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c222721d-dffe-4062-ae33-28b46f7cff7b
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (15)
Cargo.tomlrs/moq-cli/src/subscribe.rsrs/moq-mux/src/container/ts/export.rsrs/moq-mux/src/container/ts/export_test.rsrs/moq-mux/src/container/ts/import_test.rsrs/moq-srt/Cargo.tomlrs/moq-srt/README.mdrs/moq-srt/bin/moq-srt.rsrs/moq-srt/bin/serve.rsrs/moq-srt/bin/web.rsrs/moq-srt/src/error.rsrs/moq-srt/src/lib.rsrs/moq-srt/src/listen.rsrs/moq-srt/src/server.rsrs/moq-srt/src/ts.rs
| let video_start = self.video_start; | ||
| for track in self.tracks.values_mut() { | ||
| if track.pending.is_some() || track.finished { | ||
| continue; | ||
| } | ||
| let is_video = matches!(track.kind, Kind::Video(_)); | ||
| loop { | ||
| match track.source.poll_read(waiter) { | ||
| Poll::Ready(Ok(Some(frame))) => { | ||
| if waiting_for_header && !track.source.header_ready() { | ||
| continue; | ||
| } | ||
| // Tune-in alignment: drop non-video frames before the first video | ||
| // keyframe (see `video_start`) so the in-band SPS/PPS leads the stream. | ||
| if let Some(start) = video_start | ||
| && !is_video && frame.timestamp < start | ||
| { | ||
| continue; | ||
| } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Skip startup delta frames until a video keyframe is buffered.
Once header_ready() flips true, the loop can park a non-keyframe video frame in pending. video_ready() / first_video_pts() then accept that delta frame as the tune-in anchor, so streams with out-of-band codec config can still start mid-GOP instead of on the first retained keyframe.
Suggested fix
for track in self.tracks.values_mut() {
if track.pending.is_some() || track.finished {
continue;
}
let is_video = matches!(track.kind, Kind::Video(_));
loop {
match track.source.poll_read(waiter) {
Poll::Ready(Ok(Some(frame))) => {
if waiting_for_header && !track.source.header_ready() {
continue;
}
+ if waiting_for_header && is_video && !frame.keyframe {
+ continue;
+ }
// Tune-in alignment: drop non-video frames before the first video
// keyframe (see `video_start`) so the in-band SPS/PPS leads the stream.
if let Some(start) = video_start
&& !is_video && frame.timestamp < start
{
continue;
} fn video_ready(&self) -> bool {
self.tracks
.values()
.filter(|t| matches!(t.kind, Kind::Video(_)))
- .all(|t| t.pending.is_some() || t.finished)
+ .all(|t| t.pending.as_ref().is_some_and(|f| f.keyframe) || t.finished)
}
@@
fn first_video_pts(&self) -> Option<crate::container::Timestamp> {
self.tracks
.values()
.filter(|t| matches!(t.kind, Kind::Video(_)))
- .filter_map(|t| t.pending.as_ref().map(|f| f.timestamp))
+ .filter_map(|t| t.pending.as_ref().filter(|f| f.keyframe).map(|f| f.timestamp))
.min()
}Also applies to: 488-503
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-mux/src/container/ts/export.rs` around lines 232 - 250, The startup
buffering logic in the export loop can incorrectly use a delta video frame as
the tune-in anchor once header_ready() is true. Update the polling/parking
behavior in the track read loop so that pending video frames are only accepted
as the initial anchor after a keyframe has been buffered, and make video_ready()
and first_video_pts() rely on that keyframe state instead of the first retained
video frame. Apply the same fix wherever the same startup buffering logic
appears in the export path.
| let send_at = anchor + Duration::from(ts).saturating_sub(Duration::from(base)); | ||
| if send_at > now { | ||
| // Media outran wall-clock: re-anchor so this newest frame is the live edge. | ||
| Paced { | ||
| send_at: now, | ||
| anchor: now, | ||
| base: ts, | ||
| } | ||
| } else { | ||
| Paced { send_at, anchor, base } |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Handle timestamps earlier than base without flattening them to anchor.
Duration::saturating_sub turns every ts < base frame into a zero offset, so a reordered/B-frame after re-anchoring gets paced at the live edge instead of at its earlier media time. That contradicts the contract in this docblock and compresses out-of-order video timing. Please branch the delta calculation so earlier timestamps subtract from anchor rather than saturating to it, and add a regression test for a post-reanchor backward timestamp.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rs/moq-srt/src/server.rs` around lines 375 - 384, The pacing logic in
`server.rs` currently uses `saturating_sub` in the `Paced` timestamp
calculation, which flattens any `ts` earlier than `base` to the live edge.
Update the `Paced` construction so the delta is computed with a separate branch
for `ts < base`, allowing those timestamps to be offset backward from `anchor`
instead of clamping to zero, while preserving the existing re-anchor behavior
when `send_at > now`. Add a regression test around the pacing/reanchor path that
covers a backward timestamp arriving after re-anchoring to verify it keeps its
earlier media time.
…amal-16281b # Conflicts: # Cargo.lock # Cargo.toml
Summary
Ports the
moq-srtSRT/MPEG-TS gateway fromdevtomain, along with the minimalmoq-muxts::Exportrework it depends on. (This is the "backport the enabler, then the crate" path discussed for landing SRT onmainahead of thedev → mainmerge.)1.
moq-mux: timestampedts::Export+ keyframe-aligned tune-in (breaking)ts::Export::next()now returnsOption<Frame>instead ofOption<Bytes>. Each muxed MPEG-TS chunk carries the source mediatimestampandkeyframeflag, so a transport can pace delivery on the media clock instead of blasting bytes with no timing. The leading PAT/PMT rides on the first frame (inheriting a real timestamp) rather than being emitted as a separate untimed chunk.It also aligns the stream to the first video keyframe on tune-in: output is held until the first video keyframe is buffered, and non-video frames ahead of it are dropped. MPEG-TS carries the H.264/H.265 parameter sets in-band on the keyframe, so on a mid-stream join the audio source can start over a second before the oldest cached keyframe; emitting that lead audio first buries the SPS/PPS behind an audio-only preamble and a live decoder gives up before it ever configures video. This is a real bug fix for live SRT egress, not just an SRT enabler.
This is a deliberately minimal slice of the larger
devcontainer/Framerefactor: it keepsmain's synchronousCatalogSource/Export::newand only touchests::Export.2.
moq-srt: the gateway crateA pure-Rust SRT gateway over MoQ, both directions:
m=publish(default): demux the connection's MPEG-TS withmoq-muxand publish it into an origin as a broadcast (contribution: OBS, ffmpeg).m=request: re-mux a broadcast back to MPEG-TS and stream it to a plain SRT player (VLC, ffmpeg), paced on the media clock via the new timestamped frames.run(origin, Config)is the unauthenticated convenience;Server/Requestlet an embedder authorize each connection and pick the path, mirroringmoq-native. SRT is provided bysrt-tokio(no libsrt/ffmpeg). Themoq-srtbinary runs a local QUIC/WebTransport server (serve) or forwards to a remote relay (publish).Breaking changes
moq-mux(0.x):ts::Export::next()/poll_next()returnFrame, notBytes. The only in-tree caller (moq-cli subscribe) is updated. Version bump left to release-plz.Public API
moq-srt:Config,run,Server,Request,Publish,Subscribe.moq-mux:ts::Export::next/poll_nextreturn type changedBytes→Frame(breaking).Test plan
cargo build --workspace --all-featurescleancargo clippy --workspace --all-featurescleanmoq-muxts tests: 183 pass, incl. a newexport_starts_at_video_keyframeregression test; 3 roundtrip fixtures re-timed past the keyframemoq-srt: 7 unit tests passcargo fmtvia nixmainwith a live SRT loopback (ffmpeg/VLC) this round — recommended before mergeBranch targeting
Targets
mainper the explicit request to land SRT there now. Note themoq-muxchange is breaking (0.x), which would normally route todev; calling it out so reviewers can redirect if preferred.🤖 Generated with Claude Code
(Written by Claude)