moq-lite-05: add TRACK stream, drop SUBSCRIBE_OK/FETCH_OK#1648
Conversation
8941496 to
89a6614
Compare
|
Addressed the TrackProducer/TrackConsumer split feedback in 89a6614. The do-everything
All call sites updated (publish serve fns → (Written by Claude) |
89a6614 to
8d5f311
Compare
|
Heads up: CI also tripped on a brand-new advisory unrelated to this PR — |
8d5f311 to
b96ac13
Compare
|
|
||
| test("Consumer delivers frames from a single group", async () => { | ||
| const track = new Track("test"); | ||
| const track = new TrackSubscriber("test"); |
There was a problem hiding this comment.
Sorry I meant there should be a TrackProducer and TrackConsumer/Subscriber split, just like Rust.
| // IETF carries no per-track publisher properties, so commit defaults up | ||
| // front: this resolves the consumer's track.info() and gives us the write | ||
| // side that incoming object streams are routed into. | ||
| const producer = request.accept(); |
There was a problem hiding this comment.
Not true, should be set based on TRACK_STATUS or SUBSCRIBE_OK (or FETCH_OK).
| if (!published) throw new Error("not found"); | ||
|
|
||
| // Priority is a subscriber concern; 0 is fine for an info-only probe. | ||
| const probe = published.subscribe(track, 0); |
There was a problem hiding this comment.
No we need to have broadcast.track(track).info().await
It triggers a TrackRequest which gets accept(TrackInfo)
|
|
||
| if (emitRange && !startSent) { | ||
| startSent = true; | ||
| await encodeSubscribeResponse(stream, { start: new SubscribeStart(group.sequence) }, this.version); |
There was a problem hiding this comment.
We should rename SUBSCRIBE_START -> SUBSCRIBE_OK (doing it on the draft PR).
And you should return SUBSCRIBE_OK immediately, using the minimum group in cache. If there's nothing in cache, return undefined instead of blocking.
Implements moq-dev/drafts#25 for moq-lite-05-wip. Wire protocol (rs/moq-net, gated to Lite05Wip; older drafts unchanged): - New Track stream (control type 0x6): a TRACK request answered with a single TRACK_INFO (publisher priority, ordered, cache, timescale, compression) then FIN, or reset on error. Properties are immutable and fetched once. - Removed SUBSCRIBE_OK and FETCH_OK. A subscription is accepted implicitly (rejection = stream reset); FETCH returns bare FRAME messages. - Moved the resolved range into SUBSCRIBE_START (0x0) / SUBSCRIBE_END (0x1); SUBSCRIBE_DROP renumbered to 0x2. - Added priority/ordered to crate::TrackInfo (serde-skipped, so the catalog is unchanged) so a relay round-trips them. Subscriber opens a TRACK stream immediately on a track request and accepts the producer up front, then opens SUBSCRIBE / FETCH on demand. The relay drains START/END/DROP from the upstream subscribe stream. JS mirror (js/net): same wire changes, plus a Rust-style track API split: - The do-everything Track is split into TrackProducer (write) and TrackSubscriber (read) sharing a TrackState, plus a lazy TrackConsumer handle from broadcast.track(name) with subscribe() and info() (fetch() is a follow-up PR). - TrackRequest.accept(info) returns a TrackProducer; TrackSubscriber.info() is an async getter the wire commits TRACK_INFO into. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
b96ac13 to
a7417e6
Compare
|
Addressed the review comments in a7417e6:
(Written by Claude) |
Implements moq-dev/drafts#25 for
moq-lite-05-wip. The previousFETCH_OKwas wrong anyway.Targets
dev(wire-protocol change underrs/moq-net, plus breaking JS API changes).Wire protocol (
rs/moq-net, gated toLite05Wip; drafts 01–04 unchanged)0x6): a subscriber sendsTRACK(broadcast path + track name); the publisher replies with a singleTRACK_INFO(Publisher Priority,Ordered,Cache,Timescale,Compression) and FINs, or resets on error. These properties are immutable, so they're fetched once and cached instead of echoed on every response.SUBSCRIBE_OKandFETCH_OK. A subscription is accepted implicitly (rejection = stream reset); a FETCH returns bareFRAMEmessages.SUBSCRIBE_START(0x0) /SUBSCRIBE_END(0x1);SUBSCRIBE_DROPrenumbered to0x2.priority/orderedtocrate::TrackInfo(serde-skipped, so the hang catalog is unchanged) so a relay round-trips the publisher's values.The subscriber opens a TRACK stream immediately on a track request and accepts the producer up front (so downstream subscribers resolve and frames decode without blocking), then opens SUBSCRIBE / FETCH on demand. The relay drains
START/END/DROPfrom the upstream subscribe stream (it previously only waited for FIN).JS mirror (
js/net)Same wire changes, plus a consumer-side API matching Rust:
Trackclass renamed toTrackSubscriber; newTrackConsumerhandle frombroadcast.track(name)withsubscribe()andinfo(). (fetch()is a follow-up PR.)TrackRequest.accept(info): TrackSubscriber(mirrors RustTrackRequest::accept) and an asyncTrack.info(); the consumer wire commits TRACK_INFO so apps never see placeholder defaults.broadcast.track(name).subscribe({ priority }).Cross-package sync
rs/moq-netwire ↔js/net: done.doc/concept(SUBSCRIBE_OK → TRACK_INFO): intentionally deferred while the draft is WIP.fetch()support: follow-up PR (needs a publisher FETCH responder + group cache).Known limitation
The JS publisher resolves a track's real
TRACK_INFOvia a cached probe (broadcast.subscribe(name)→await track.info()). For a media track, the first such lookup briefly constructs and closes aVideoEncoder, becausejs/publishcouplesaccept()andserve(). Bounded (cached, once per track per connection) and lite-05-wip is opt-in.Test plan
just checkgreen: Rust 332 tests + clippy/doc/shear/sort; all JS packages typecheck + test; biome/remark/shfmt/taplo/nixfmt.rs/moq-nativeintegration tests exercise lite-05 end-to-end over WebTransport: timestamp round-trip, bare-FRAME FETCH with Deflate matching TRACK_INFO, concurrent fetch+subscribe.🤖 Generated with Claude Code
(Written by Claude)