moq-lite-05: add AnnounceOk message (responder origin + initial active count)#1573
Conversation
…e count) The moq-lite announce stream stamps the publisher's own origin id onto the trailing hop of every Announce, even though that id is identical for every announce on the session. It also has no initial-sync boundary for Lite03+, so a caller doing get_broadcast() right after connect races the announce gossip. Add an AnnounceOk message (Lite05Wip only), sent once by the publisher right after AnnounceInterest and before any Announce. It carries: - origin: the publisher's session origin id, reported once. In Lite05 the publisher no longer stamps itself onto each hop chain; the subscriber appends the responder origin on receipt, so the stored chain stays identical to Lite04 and loop detection / shortest-path selection are unchanged. A node never stamps its own origin now: the receiver stamps the remote sender's. - active: the count N of currently-active broadcasts, followed by exactly N initial Announce::Active messages. The count lets connect() block until the initial set lands. A SyncLatch fires once every announce-prefix stream has its initial set, generalized across both boundaries (AnnounceInit for Lite01/02, AnnounceOk + N for Lite05); Lite03/04 and no-origin fire immediately. announced_broadcast() is kept for broadcasts that come online after connect. JS mirrors the wire (AnnounceOk, no-self-hop send, receiver re-append) and wires ALPN_05_WIP into connect/accept, which previously had no Lite05 branch. JS does not add connect-blocking: its Connection is pull-based, so the get_broadcast() race does not exist there. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
|
||
| // Lite05+ reports our origin once via AnnounceOk; the subscriber stamps it | ||
| // onto each announce, so we no longer put it in the per-announce hop list. | ||
| const selfHops = this.version === Version.DRAFT_05_WIP ? [] : [this.origin]; |
There was a problem hiding this comment.
Is this needed? Just inline it.
There was a problem hiding this comment.
Inlined. (Written by Claude)
| /// origin, letting `connect()` block past the startup race. The receiver fires | ||
| /// immediately when there is nothing to wait on (no subscribe origin, or a version | ||
| /// without an initial-set boundary). | ||
| pub fn start<S: web_transport_trait::Session>( |
There was a problem hiding this comment.
Can you make this async instead of returning a oneshot?
There was a problem hiding this comment.
Done. Replaced the oneshot with a Setup handle (kio-backed) returned from lite::start; connect() now does setup.ready().await. It can't be a plain async fn start because the server calls start too and must not block on the initial set, so readiness is an awaitable handle rather than folded into start itself. (Written by Claude)
| /// One-shot latch that fires once every announce-prefix stream has received its | ||
| /// initial set (AnnounceInit for Lite01/02, AnnounceOk + N for Lite05). Lets | ||
| /// `connect()` block until the initial broadcasts are available locally. | ||
| pub(super) struct SyncLatch { |
There was a problem hiding this comment.
Maybe this should be more generic. Like call it Setup or something so we can use it to track when extensions are also done being negotiated in the future.
And IDK maybe try to use kio. It wouldn't be that hard to have a Producer/Consumer pair and a poll_ready method that checks when remaining <= 0.
There was a problem hiding this comment.
Also while you're here, add a note to try to use kio instead of tokio primitives for any new async stuff. The goal is to fully support both an async and synchronous (poll-based) API.
There was a problem hiding this comment.
Done. Renamed to Setup (in lite/setup.rs) and documented it as the generic place to track session-setup completion, with extension negotiation called out as a future step. Reworked it onto a kio Producer/Consumer pair with both poll_ready(&Waiter) and async ready() (the latter is implemented via the former). (Written by Claude)
There was a problem hiding this comment.
Added a note to the Setup module doc-comment to prefer kio over tokio primitives for new async state, so we keep both an async and a poll-based API available. Happy to also drop a line in CLAUDE.md if you'd like it somewhere more discoverable. (Written by Claude)
There was a problem hiding this comment.
Renamed it from Setup to Connecting per your follow-up — also avoids confusion with the IETF SETUP message module (setup::Client/setup::Server). Now in rs/moq-net/src/lite/connecting.rs. (Written by Claude)
3c0b16c to
ec78447
Compare
Replace the oneshot-based `SyncLatch` with a generic, kio-backed `Connecting` tracker (rs/moq-net/src/lite/connecting.rs): - `lite::start` returns a `Connecting` handle instead of a raw `oneshot::Receiver`; `connect()` awaits `connecting.ready()`. The server ignores it. - Named `Connecting` (not `Setup`) to avoid confusion with the IETF SETUP message module, and kept generic so future work like extension negotiation can register additional steps before a session is considered connected. - Models readiness with kio's channel close instead of an explicit counter: each in-flight step (one per announce prefix) holds a `ConnectingProducer`, and the session is connected once every producer has been dropped. A prefix drops its producer when its initial set lands (or, on an early error, via scope exit), so a failed prefix can't hang `connect()`. This removes the `arm`/`complete_one` counter and the `PrefixGuard` RAII wrapper entirely. - The producer is threaded through `run -> run_announce -> run_announce_prefix` as a parameter rather than stored on `Subscriber`: the struct is cloned for several long-lived tasks (`bw`, `run_uni`, broadcast serving), and any clone retaining a producer would keep the channel open and hang `connect()`. Only the per-prefix path holds one, so it's released exactly when that prefix's initial set is in. Also inline the JS `selfHops` local in the publisher: the Lite05 branch always omits the hop (subscriber stamps it), the Draft03/04 branch always stamps `[this.origin]`, and only the shared live-update loop keeps the version check. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
ec78447 to
5039cb7
Compare
Catches up with dev's landed work: * `moq-net: runtime Timescale/Timestamp; container::Frame keeps source scale (#1473)` — canonical Timescale(NonZero<u64>) design. * `moq-net: make subscribe_track async, blocking on SUBSCRIBE_OK (#1540)` — the async API the PR originally introduced is now on dev directly. * `moq-net: add Lite05Wip version variant (unadvertised) (#1518)` — the version variant the PR added is on dev as `Lite05Wip`. * `moq-mux: catalog filter/target and Annex-B exporters (#1487)` and the surrounding mux reorganization. * SubscribeOk negotiates `Compression` on Lite05+ (dev's #1538-ish work). * `moq-lite-05: add AnnounceOk message (#1573)`. Almost every distinctive piece of the PR ended up in dev under a separate PR, so this merge takes dev's versions wholesale and keeps only the remaining infrastructure for per-frame timestamps: * `Frame.timestamp: Option<Timestamp>` for the on-wire per-frame field. * `VarInt::from_zigzag` / `to_zigzag` helpers for the signed-delta encoding. * `lite::Version::has_timestamps()` feature gate. * `hang::container::Frame::encode` stamps the moq-net frame timestamp so the wire layer can carry it on Lite05+ once the encoding lands. What's still missing for a complete Lite05 timestamp feature (called out in the PR description for follow-up): * `Track.timescale` / `SubscribeOk.timescale` negotiation. * Lite05 publisher and subscriber zigzag-delta encode/decode in `lite/publisher.rs::serve_group` and `lite/subscriber.rs::run_group`, which currently still match dev's untimed-frame format. * A working `lite05_timestamp_roundtrip` integration test — the test from the previous merge referenced fields that dev redesigned away (`Track.priority`, `Track.timescale`, `with_publish`, `with_consume`, `Announced::expect`) and was deleted here pending a rewrite against dev's current API. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Adds an
AnnounceOkmessage to the moq-lite-05 announce stream, sent once by the publisher right after it readsAnnounceInterestand before anyAnnounce. It does two things:Announce. That id is identical for every announce on a session, so re-encoding it per-message is pure waste. In Lite05 a node no longer stamps its own origin onto a chain; the receiver stamps the remote sender's origin (fromAnnounceOk) on receipt. The stored hop chain is byte-identical to Lite04, so loop detection / shortest-path selection are unchanged.active: N, the count of currently-active broadcasts, followed by exactlyNinitialAnnounce::Active. This gives the announce stream a discrete initial-set boundary (the successor toAnnounceInit).The count lets
connect()block until the initial set has landed, closing the startup race where a synchronousget_broadcast()right after connect could miss broadcasts that were live but not yet gossiped. ASyncLatchfires once every announce-prefix stream has its initial set, generalized across both boundaries —AnnounceInit(Lite01/02) andAnnounceOk + N(Lite05); Lite03/04 have no boundary and resolve immediately.OriginConsumer::announced_broadcast()is kept — it still serves the distinct "wait for a broadcast that comes online after connect" case.This is opt-in:
Lite05Wipis not advertised over ALPN or in the default version set, so nothing negotiates it by default.Why
The trailing self-hop is redundant on every announce, and Lite03+ had no way to know when the initial set was complete.
AnnounceOkfixes both with one message.JS mirror
@moq/netmirrors the wire:AnnounceOkmessage, no-self-hop send, and the subscriber re-appending the responder origin (so theignoreSelffilter still sees the full chain). It also wiresALPN_05_WIPintoconnect/accept, which previously had no Lite05 negotiation branch at all.JS does not add connect-blocking: its
Connectionis pull-based (announced()opens the announce stream lazily andconsume(path)subscribes directly without consulting announcements), so the synchronousget_broadcast()race doesn't exist there.Notes for reviewers
devper the wire-protocol branch convention. Rebased ontodev, which had substantially reworked these files (AnnounceConsumersplit,Session::newgaining publisher/consumer args, restart/REANNOUNCE semantics, its own lite-05 additions). Integrated with all of those:prepare_active_hops, made version-aware so Lite05 skips it.start_announceandrestart_announcetake the responder origin and append it (with MAX_HOPS handling) on the receive side.Test plan
AnnounceOkround-trip / old-version reject / zero-active / zero-origin reject (rs)SyncLatchunit tests: arm/complete, arm(0), guard-fires-on-drop, idempotent completion (rs)integration: lite draft-05-wipend-to-end test exercisesAnnounceOkover the wire (js)just rs check(clippy + fmt + tests) andjust js check(biome + tsc + tests) green🤖 Generated with Claude Code
(Written by Claude)