feat(moq-mux): clear consumer buffer when group timestamps rewind #1884
Adds opt-in rewind detection to the container Consumer (rs/moq-mux, mirrored in the js/hang Consumer). When a newer group's timestamps jump backwards past the live edge by more than a threshold, the publisher is reneging the buffered tail (e.g. a voice agent interrupted mid-utterance, issue #1614). The consumer drops the reneged groups and resumes at the rewound timeline. Groups arrive out of order, so a single sequence floor is wrong: a late new-epoch group can have a lower sequence than the group that triggered detection. The recorded boundary (prev_max, reset_group, reset_timestamp) classifies any group by (sequence, timestamp): at or below prev_max is old, at or above reset_group is new, and the ambiguous span between them is split by timestamp (gap-fillers below reset_timestamp are kept, old stragglers above are dropped). Off by default (no threshold set), so existing behavior is unchanged. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
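The classification rule in the commit message above can be sketched as a small pure function. This is a minimal illustration, not the code in the PR: `RewindBoundary`, `EpochVerdict`, and `classifyGroup` are hypothetical names, timestamps are plain numbers in an unspecified unit, and the mapping of the `(55,58,90)` example onto `(prevMax, resetGroup, resetTimestamp)` is an assumption.

```typescript
// Hypothetical boundary record. Field names mirror the commit message;
// the types and units are assumptions made for illustration only.
interface RewindBoundary {
  prevMax: number; // highest group sequence delivered before the rewind
  resetGroup: number; // sequence of the group that triggered detection
  resetTimestamp: number; // timestamp that splits the ambiguous span
}

type EpochVerdict = "old" | "new";

// Classify a group by (sequence, timestamp) against the recorded boundary.
function classifyGroup(boundary: RewindBoundary, sequence: number, timestamp: number): EpochVerdict {
  // At or below the old live-edge group: reneged old epoch.
  if (sequence <= boundary.prevMax) return "old";
  // At or above the detection group: new epoch.
  if (sequence >= boundary.resetGroup) return "new";
  // Ambiguous span: gap-fillers sit below resetTimestamp, old stragglers do not.
  return timestamp < boundary.resetTimestamp ? "new" : "old";
}

// Assumed reading of the (55,58,90) example: prevMax 55, resetGroup 58, resetTimestamp 90.
const exampleBoundary: RewindBoundary = { prevMax: 55, resetGroup: 58, resetTimestamp: 90 };
```

With that boundary, group 56 at timestamp 40 would be kept as a late gap-filler, while group 57 at timestamp 120 would be dropped as an old straggler.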
| * agent interrupted mid-utterance). The consumer drops the stale groups and resumes | ||
| * from the rewound group. Disabled (undefined) by default. | ||
| */ | ||
| resetThreshold?: Signal<Time.Milli> | Time.Milli; |
There was a problem hiding this comment.
Do we need this? Can we just hard-code it to zero?
There was a problem hiding this comment.
And we can just always enable this functionality. I don't see why we should ever make it optional.
There was a problem hiding this comment.
Done in 748f002 — removed the threshold and the prop entirely. Always-on now: a reset fires whenever a newer group's first frame timestamp is strictly below the live edge.
(Written by Claude)
| #resetThreshold?: Signal<Time.Milli>; | ||
| #groups: Group[] = []; | ||
| #active?: number; // the active group sequence number | ||
| #high?: { group: number; timestamp: Time.Micro }; // live edge: max delivered ts + its group |
There was a problem hiding this comment.
Should this be bundled with Reset? To keep the related variables with each other?
There was a problem hiding this comment.
Done — bundled the live edge, the active boundary, and the new discontinuity counter into a single Rewind struct (Rust) / class (JS), so the related state lives together. Reset stays as the per-boundary value type.
(Written by Claude)
| // The active rewind boundary, if any. Out-of-order groups are classified against it | ||
| // so a late new-epoch group is kept while a reneged old-epoch straggler is dropped. | ||
| reset: Option<Reset>, |
There was a problem hiding this comment.
Same comments as the JS side.
There was a problem hiding this comment.
Applied to the Rust side too: dropped reset_threshold + with_reset_threshold/set_reset_threshold, bundled into a Rewind struct, always-on.
(Written by Claude)
| /// | ||
| /// Disabled by default. Set `threshold` above the worst-case cross-group reordering | ||
| /// (B-frames, jitter) so an ordinary stream never trips it. | ||
| pub fn with_reset_threshold(mut self, threshold: std::time::Duration) -> Self { |
There was a problem hiding this comment.
Always on please, don't allow a configurable threshold. If the timestamp goes backwards but groups go forward, reset.
There was a problem hiding this comment.
Done — always-on, no configurable threshold. The trigger is exactly that: a newer group (sequence forward) whose first frame timestamp is strictly < live_edge (timestamp backwards). All the existing tests (B-frames, out-of-order groups, latency skips) still pass, so it does not false-trigger on normal streams.
(Written by Claude)
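For reference, the always-on trigger described in this reply reduces to a two-part comparison. The sketch below assumes a live edge shaped like the `#high` field shown earlier in the diff; `EdgeSnapshot` and `isRewind` are hypothetical names, and the real consumer may track more state than this.

```typescript
// Hypothetical snapshot of the live edge: the max delivered timestamp and its group.
interface EdgeSnapshot {
  group: number;
  timestamp: number;
}

// A reset fires only when the sequence moves forward while the first frame
// timestamp moves strictly backwards past the live edge.
function isRewind(edge: EdgeSnapshot | undefined, group: number, firstTimestamp: number): boolean {
  if (edge === undefined) return false; // nothing delivered yet, so nothing to rewind past
  return group > edge.group && firstTimestamp < edge.timestamp;
}
```

A late group with a lower sequence than the live-edge group never satisfies the first clause, and a newer group that starts exactly at the live edge does not satisfy the second.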
| return !stale; | ||
| }); | ||
| console.warn(`buffer reset: group timestamps rewound (prevMax ${reset.prevMax}, group ${reset.group})`); |
There was a problem hiding this comment.
We actually have to notify downstream consumers to also reset. Like we need to throw out any media data queued in the audio ring buffer or waiting to be rendered.
Maybe we need to include a discontinuity counter or something?
There was a problem hiding this comment.
Added the discontinuity counter. It bumps on every reset and is exposed via Consumer::discontinuity() (Rust) and the next() return + a discontinuity getter (JS). So the read that returns the first frame of the new timeline also reports the bumped count, which is the signal downstream needs.
I stopped short of wiring it through js/watch in this PR, because that flush is more than a local change: backend.reset() already does the right thing (it flushes the audio ring buffer via audioDecoder.reset() and re-anchors the clock via sync.reset()), but it lives at the backend level, whereas the container Consumer is created per-track inside each decoder. So routing a per-track discontinuity up to the shared audio+video+sync reset is a coordination decision I did not want to guess at.
Two questions before I wire it:
- In the watch decoder loop, when `next()` reports a discontinuity bump, should it call `backend.reset()` (sync re-anchor + ring flush), or just flush its own decoder/ring locally and leave sync alone? The voice-agent case wants the clock re-anchored, so I lean toward `backend.reset()`.
- Same treatment for the Rust native render path (the `AudioConsumer` / FFI side), or browser-only for now?
Happy to do the wiring in this PR or as a follow-up once you pick the coordination point.
(Written by Claude)
There was a problem hiding this comment.
Wired it up for audio in d089bf8. The audio decoder reads discontinuity off each consumer.next(), and when it increments it calls this.#ring?.reset() + this.source.sync.reset() before decoding the first frame of the new utterance. That is exactly what MultiBackend.reset() does, now triggered automatically off the wire signal instead of the app having to call element.reset().
I did it synchronously inside the decode loop rather than via a signal up to MultiBackend so the flush is ordered before the new frame is decoded (a signal round-trip would land a microtask later). The decoder already holds this.source.sync, so it can re-anchor the shared clock directly.
Coverage:
- Audio-only (the voice-agent case): fully handled.
- A/V: audio drives the shared `sync.reset()`, so both tracks re-anchor; video also drops the reneged groups inside its own container consumer.
- Remaining gap: a video-only buffered stream that rewinds has no audio track to trigger the re-anchor. Easy to add the same hook to the video decode loop if you want it now, but I held off since it is niche and the video downstream queue is just a single held frame.
Let me know if you would rather this live in MultiBackend (surface a discontinuity signal off the decoder and have the backend orchestrate reset()) instead of in the decode loop.
(Written by Claude)
There was a problem hiding this comment.
Video done too in 586a92c, mirroring audio. The video decode loop reads discontinuity off next() and on a change re-anchors sync, clears the decode buffer, and clears the playback timestamp.
Two video-specific wrinkles I had to handle that audio did not have:
- Clearing `timestamp` is load-bearing: the output guard at `decoder.ts:246` drops frames whose timestamp is below the current one, so without clearing it the rewound (lower-timestamp) frames would all be late-rejected.
- A generation counter (the discontinuity count) is captured when a frame is decoded and re-checked after the `sync.wait` in the output callback, so an in-flight frame from the reneged timeline that was parked waiting for its render time is dropped instead of rendering in the middle of the new utterance.
The held frame is left in place so the last picture shows until the new keyframe renders (no black flash).
So now: audio-only, A/V, and video-only buffered rewinds are all handled. Remaining: the Rust native render path (the FFI/AudioConsumer side), which I left out since #1614 is browser. 78 js/watch unit tests pass; tsc + biome clean.
(Written by Claude)
Addresses review feedback: - Always-on: remove the reset_threshold knob (with_reset_threshold / set_reset_threshold, the resetThreshold prop). A reset now fires whenever a newer group's timestamp goes strictly backwards past the live edge, with no configuration. The pre-existing tests (B-frames, out-of-order groups, latency skips) still pass, confirming it does not false-trigger on normal streams. - Bundle the related state (live edge, active boundary, discontinuity count) into one Rewind struct (Rust) / class (JS) so it lives in one place. - Add a discontinuity counter that increments on each reset, exposed via Consumer::discontinuity() (Rust) and the next() return + a getter (JS). Downstream consumers compare it across reads and flush their own decoder / render buffers when it changes. Wiring it through js/watch (the audio ring buffer and sync re-anchor) is a follow-up, since that coordination lives at the backend level above the per-track consumer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
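A downstream reader that compares the counter across reads might look like the sketch below. It assumes each read carries a numeric `discontinuity` field, as the commit message describes for the JS `next()` return; `ConsumerRead`, `DiscontinuityWatcher`, and `drainWithFlush` are hypothetical names, and the string payload stands in for real media frames.

```typescript
// Hypothetical shape of one read from the container consumer.
interface ConsumerRead {
  discontinuity: number;
  payload: string;
}

// Remembers the last seen count and reports when it changes.
class DiscontinuityWatcher {
  private last: number | undefined = undefined;

  // Returns true when the count differs from the previous read.
  // The first read only establishes the baseline.
  observe(count: number): boolean {
    const changed = this.last !== undefined && count !== this.last;
    this.last = count;
    return changed;
  }
}

// Simulates a downstream queue that drops everything buffered from the
// reneged timeline before accepting the first frame of the new one.
function drainWithFlush(reads: ConsumerRead[]): string[] {
  const watcher = new DiscontinuityWatcher();
  let queued: string[] = [];
  for (const read of reads) {
    if (watcher.observe(read.discontinuity)) {
      queued = []; // flush before queuing the frame that reported the bump
    }
    queued.push(read.payload);
  }
  return queued;
}
```

Because the flush happens before the frame that reported the bump is queued, that frame survives as the first entry of the new timeline.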
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@js/hang/src/container/consumer.ts`:
- Around line 269-317: The checkReset method returns early when liveEdge is
undefined, leaving groups unchecked even when liveEdge is later advanced by the
caller. At line 311 in the checkReset method, replace the timestamp value from
group.latest with start to use the reset timestamp boundary instead of buffered
data. Additionally, when liveEdge is advanced at lines 352-358 in the caller
code (outside checkReset), add logic to recheck any buffered groups against the
new liveEdge boundary to ensure missed resets and discontinuity increments are
not skipped.
In `@rs/moq-mux/src/container/consumer.rs`:
- Around line 355-382: The rewind detection logic in the block starting with let
reset = only checks the newest pending group via self.pending.back_mut().
Instead, iterate through all pending groups from the back (newest to oldest),
polling each group's minimum timestamp, and select the highest-sequence group
that satisfies the rewind condition where min < live_edge. This ensures that if
the newest group has no timestamp or is already forward, we still detect rewinds
in lower-sequence pending groups and select the appropriate one for the Reset
struct.
Wire the container Consumer's discontinuity counter through the audio decoder. When it increments (the publisher rewound the timeline), flush the audio ring buffer and re-anchor the shared sync clock before decoding the first frame of the new utterance. A backwards-timestamp rewind now triggers the same flush as a manual reset(), with no app involvement. Audio drives the shared sync.reset(), so an A/V stream re-anchors both tracks; video additionally drops its reneged groups inside its own container consumer. A video-only buffered stream rewinding is the remaining gap (no audio track to trigger the re-anchor). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Mirror the audio handling for video. The video decode loop reads discontinuity off consumer.next(); on a change it re-anchors the shared sync clock, clears the playback timestamp (otherwise its stale high value late-rejects the rewound, lower-timestamp frames at the output guard), clears the decode buffer, and bumps a generation so in-flight decodes from the reneged timeline are dropped on output instead of rendering mid-utterance. The held frame stays until the new keyframe renders, avoiding a black flash. With both tracks wired, a video-only buffered stream that rewinds is now handled too. The Rust native render path remains a follow-up. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
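The two video-specific pieces in this commit, clearing the playback timestamp and re-checking a generation on output, can be illustrated in isolation. This is a synchronous sketch under stated assumptions: `RenderGate` and `DecodedTicket` are hypothetical names, the real decoder does this around an asynchronous `sync.wait`, and nothing here touches an actual `VideoDecoder`.

```typescript
// Hypothetical tag attached to a frame, recording the generation it belongs to.
interface DecodedTicket {
  generation: number;
  timestamp: number;
}

class RenderGate {
  private generation = 0;
  private lastRendered: number | undefined = undefined;

  // Capture the current generation for a frame.
  tag(timestamp: number): DecodedTicket {
    return { generation: this.generation, timestamp };
  }

  // On a rewind: bump the generation and clear the playback timestamp,
  // otherwise the late-frame guard would reject the lower-timestamp frames.
  onDiscontinuity(): void {
    this.generation += 1;
    this.lastRendered = undefined;
  }

  // Output guard: drop frames from a reneged generation and frames that are late.
  tryRender(ticket: DecodedTicket): boolean {
    if (ticket.generation !== this.generation) return false;
    if (this.lastRendered !== undefined && ticket.timestamp < this.lastRendered) return false;
    this.lastRendered = ticket.timestamp;
    return true;
  }
}
```

A frame tagged before `onDiscontinuity()` is rejected afterwards even if its timestamp is higher than anything rendered, while the first frame of the new timeline passes despite its lower timestamp.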
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
js/watch/src/video/decoder.ts (1)
249-267: 🎯 Functional Correctness | 🔴 Critical
Decoder reset alone is incomplete; must reconfigure after reset.
The issue is real: chunks queued inside `VideoDecoder` can output after `#onDiscontinuity` is called, inherit the new generation at line 251, and pass the line 267 guard, rendering stale frames. However, `VideoDecoder.reset()` transitions the decoder to the "unconfigured" state and requires `configure()` to be called again before subsequent `decode()` calls are valid. The proposed diff calls `reset()` but omits the required `configure()` call, which would break the decoder.
To fix: call `decoder.reset()` followed by `decoder.configure()` with the original configuration parameters saved from lines 323/407, or refactor to tag chunks at decode submission time instead of output time.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@js/watch/src/video/decoder.ts`:
- Around line 249-267: The issue is that calling VideoDecoder.reset()
transitions the decoder to an unconfigured state, but the required
decoder.configure() call is missing afterward. The VideoDecoder API requires
configure() to be called before subsequent decode() calls are valid. To fix
this, after calling decoder.reset() in the discontinuity handling, immediately
call decoder.configure() using the configuration parameters that were saved from
the initial setup (around lines 323 and 407). Alternatively, refactor the
approach to tag chunks at the time of decode submission rather than at output
time to avoid this reset/configure requirement.
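The reset-then-configure ordering this comment asks for can be sketched against a minimal structural interface, so it runs outside a browser. `ReconfigurableDecoder`, `flushAndReconfigure`, and `FakeDecoder` are hypothetical; the interface only mirrors the `state`, `reset()`, and `configure()` members that WebCodecs' `VideoDecoder` exposes, and the saved configuration is assumed to be kept by the caller.

```typescript
type CodecLikeState = "unconfigured" | "configured" | "closed";

// Minimal structural stand-in for the decoder members used here.
interface ReconfigurableDecoder<Config> {
  readonly state: CodecLikeState;
  reset(): void;
  configure(config: Config): void;
}

// Drop queued work, then restore the configuration so later decode() calls are valid.
// Returns false when the decoder is closed and cannot be reset.
function flushAndReconfigure<Config>(decoder: ReconfigurableDecoder<Config>, savedConfig: Config): boolean {
  if (decoder.state === "closed") return false;
  decoder.reset(); // leaves the decoder in the "unconfigured" state
  decoder.configure(savedConfig); // must follow reset() before decoding again
  return true;
}

// Test double that records the call order; not part of any real API.
class FakeDecoder implements ReconfigurableDecoder<{ codec: string }> {
  state: CodecLikeState = "configured";
  calls: string[] = [];

  reset(): void {
    this.state = "unconfigured";
    this.calls.push("reset");
  }

  configure(config: { codec: string }): void {
    this.state = "configured";
    this.calls.push(`configure:${config.codec}`);
  }
}
```

The alternative named in the comment, tagging chunks at decode submission time, avoids the reset entirely and may be the smaller change if reconfiguring is costly.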
Addresses CodeRabbit review. Rewind detection only inspected the highest-sequence pending group (Rust) / ran only when a group received a frame (JS), so a rewind in a lower-sequence group could be missed when a higher-sequence group had already caught back up past the live edge (or had no frame yet). The reneged tail would then play without bumping the discontinuity counter. - Rust poll_reset scans all newer pending groups from the back and takes the highest-sequence one that actually rewound. - JS adds #checkBufferedReset (run from next()) to re-check buffered groups once delivery advances the live edge past them, mirroring the scan. - JS anchors the post-reset live edge at the rewind point (start), not the buffered group.latest, matching Rust. Adds reset_detected_behind_forward_newest_group. Also fixes the unrealistic non-monotonic timestamps in zero_latency_skips_aggressively (group 0 moved to ts 0 so its data no longer encodes an unintended rewind). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
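The scan this commit describes, taking the highest-sequence pending group that actually rewound rather than only inspecting the newest one, can be sketched as follows. `PendingGroupInfo`, `EdgeMark`, and `findRewoundGroup` are hypothetical names, and `minTimestamp` being `undefined` stands in for a group that has not received a frame yet.

```typescript
// Hypothetical view of a buffered group: its sequence and lowest known timestamp.
interface PendingGroupInfo {
  sequence: number;
  minTimestamp: number | undefined; // undefined until the first frame arrives
}

// Hypothetical live edge: the group and timestamp of the newest delivered frame.
interface EdgeMark {
  group: number;
  timestamp: number;
}

// Scan newer pending groups from the newest down and return the
// highest-sequence one whose timestamp sits below the live edge.
function findRewoundGroup(pending: PendingGroupInfo[], edge: EdgeMark): PendingGroupInfo | undefined {
  const newestFirst = pending.slice().sort((a, b) => b.sequence - a.sequence);
  for (const candidate of newestFirst) {
    if (candidate.sequence <= edge.group) break; // only newer groups can trigger
    if (candidate.minTimestamp !== undefined && candidate.minTimestamp < edge.timestamp) {
      return candidate;
    }
  }
  return undefined;
}
```

With a live edge at group 5 and timestamp 1000, pending groups 6 (timestamp 200), 7 (no frame yet), and 8 (timestamp 1500) yield group 6, which a newest-only check would have missed.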
Summary
Adds always-on rewind detection to the container `Consumer` (`rs/moq-mux`, mirrored in the `js/hang` `Consumer`). When a newer group's timestamps jump strictly backwards past the live edge, the publisher is reneging the buffered tail. The consumer drops the reneged groups, resumes at the rewound timeline, and bumps a discontinuity counter. In the browser watch path, that counter auto-flushes the audio and video buffers and re-anchors the playback clock, so the rewind takes effect end-to-end with no app involvement.
Prototype for #1614: a Python voice agent has no way to drop already-written audio when the user interrupts the bot, so the listener keeps hearing the old utterance for hundreds of ms. Rather than a new producer `flush()`/`cancel()`, this handles it on the watch side: a backwards group timestamp is the reneg signal. The existing `AudioProducer::reset_epoch()` already emits it for a relative timeline (the agent writes `timestamp_us=0`, so the next group re-anchors below the buffered tail while the group sequence keeps climbing).
How it works
Group sequence is monotonic, so the unambiguous tell is that the sequence climbs while the timestamp drops. The subtlety is that groups arrive out of order, so a single sequence floor is wrong: a late new-epoch group can have a lower sequence than the group that triggered detection.
So a reset records a boundary of three values and classifies any group by `(sequence, timestamp)`:
- sequence ≤ `prev_max` (old live-edge group): old epoch, dropped.
- sequence ≥ `reset_group` (detection group): new epoch, kept.
- sequence in between: kept only if `timestamp < reset_timestamp`.
The ambiguous span exists because `prev_max` (the old epoch's max among delivered groups) can underestimate the true tail when an old straggler is still in flight. The timestamp clause splits that span: new gap-fillers sit below `reset_timestamp`, old stragglers above it.
The live edge, the active boundary, and the discontinuity counter are bundled into one `Rewind` struct (Rust) / class (JS).
Downstream flush (js/watch)
Clearing the container buffer is not the whole story: the audio ring buffer and the decoded-video pipeline also hold media that must be dropped. Both decoders now read `discontinuity` off each `consumer.next()` and, when it increments, flush before decoding the first frame of the new utterance:
- Audio: reset the ring buffer and re-anchor the shared clock via `sync.reset()`.
- Video: re-anchor via `sync.reset()`, clear the playback `timestamp` (else the output guard late-rejects the rewound lower-timestamp frames), clear the decode buffer, and bump a generation so in-flight decodes from the reneged timeline are dropped on output rather than rendering mid-utterance. The held frame stays until the new keyframe renders (no black flash).
This is the same flush a manual `element.reset()` did, now automatic. Audio-only, A/V, and video-only buffered rewinds are all handled. The Rust native render path (FFI / `AudioConsumer`) is not wired yet.
Public API
- `rs/moq-mux`: `Consumer::discontinuity() -> u64`. The `Reset` / `Rewind` types are private.
- `js/hang`: `Consumer.discontinuity` getter, plus a `discontinuity` field on the `next()` return (additive).
Always-on, no configuration. It only fires on a strictly-backwards group timestamp, which does not occur in normal (forward) streams, so existing streams are unaffected; the pre-existing consumer tests pass unchanged.
Test plan
- `cargo test -p moq-mux --lib container::consumer`: 35/35 pass, including `reset_classifies_out_of_order_groups` (the `(55,58,90)` example), `reset_keeps_out_of_order_new_group` (out-of-order gap-fillers kept, asserts `discontinuity() == 1`), `backwards_timestamp_resets_buffer`, `backwards_timestamp_always_resets`.
- `clippy -p moq-mux` clean; `rustfmt` clean (system toolchain; nix's tty was erroring locally, so worth a `just fix` through nix before merge).
- `js/hang` + `js/watch`: `tsc` and `biome` clean; the 78 `js/watch` unit tests pass. The watch wiring is compile- and unit-checked, not yet exercised end-to-end in a browser.
Notes for reviewers
- Targets `main`. The branch is forked from `main`; the change only fires on a backwards group timestamp (no effect on forward streams). It does establish a consumer-side reneg-via-timestamp convention, so say so if you would rather it settle on `dev`.
- The flush runs synchronously inside the decode loop rather than via a signal up to `MultiBackend`, so it is ordered before the new frame is decoded (a signal round-trip would land a microtask later). Easy to move to `MultiBackend` orchestration if you prefer.
(Written by Claude)
🤖 Generated with Claude Code