Skip to content

moq-net: make subscribe_track async, blocking on SUBSCRIBE_OK#1540

Merged
kixelated merged 4 commits into
devfrom
claude/interesting-tu-40c636
Jun 1, 2026
Merged

moq-net: make subscribe_track async, blocking on SUBSCRIBE_OK#1540
kixelated merged 4 commits into
devfrom
claude/interesting-tu-40c636

Conversation

@kixelated

Copy link
Copy Markdown
Collaborator

Summary

Reshapes subscribe_track so a subscription resolves once the publisher confirms it, and separates subscriber-controlled knobs from publisher-owned track properties. (Picks up the idea started in #1439, implemented cleanly on top of dev.)

  • Subscription (new) holds the subscriber wire params: priority, ordered, max_latency, start_group, end_group.
  • Track now holds only immutable publisher properties — just name today; timescale can be added later.
  • BroadcastConsumer::subscribe_track(name: &str, subscription: Subscription) is now async and returns once the publisher accepts. Concurrent subscribers to the same name coalesce onto one request.
  • BroadcastDynamic::requested_track() returns a TrackRequest. The publisher calls accept(Track) -> TrackProducer (creating the producer and unblocking the waiting subscriber) or deny(err). Dropping the request denies with Cancel.
  • lite subscriber: accepts right after SUBSCRIBE_OK, so a downstream subscribe_track resolves exactly when the upstream confirms — woven into the existing linger lifecycle (moq-net: linger upstream subscriptions across consumer churn (moq-lite) #1514).
  • ietf subscriber: accepts immediately, since group data can arrive before SUBSCRIBE_OK.
  • Per-consumer preferences are an Arc<Mutex<Subscription>> updatable via TrackConsumer::update_subscription; TrackProducer::subscription() exposes the aggregate (most-demanding) across all live subscribers, which the relay forwards upstream.
  • conducer::Waiter gained a waker() accessor so poll-based callers (moq-mux export) can drive the async subscribe inside the existing poll model.

Cross-package

  • FFI (moq-ffi / libmoq): keeps its synchronous surface by driving the async subscribe on the runtime (block_on / inside spawned tasks), so the py/swift/kt/go bindings need no regeneration.
  • js/net: no change. It already exposes subscribe(name, priority) with a reactive Track (updatePriority); this change brings the Rust API closer to it, and the wire format is unchanged.
  • docs: updated the Rust example in doc/lib/rs/env/native.md. The Python doc examples are unchanged (FFI API unchanged).

Test plan

  • cargo check --workspace --all-targets clean (no warnings)
  • cargo clippy --workspace --all-targets clean
  • cargo test for moq-net, moq-mux, moq-audio, moq-native, moq-relay — all pass, including the moq-native/moq-relay integration tests that exercise the real SUBSCRIBE_OK round-trip
  • just check full gate (JS/bun, etc.) — not run locally

(Written by Claude)

kixelated and others added 3 commits May 28, 2026 21:36
Split subscriber preferences out of `Track` into a new `Subscription`
{ priority, ordered, max_latency, start_group, end_group }. `Track` now
holds only immutable publisher properties (the name today; timescale
later).

`BroadcastConsumer::subscribe_track(name, subscription)` is now async and
returns once the publisher accepts the subscription. `requested_track()`
hands back a `TrackRequest`; the publisher calls `accept(Track)` (which
creates the producer and unblocks the subscriber) or `deny(err)`.
Concurrent subscribers to the same name coalesce onto one request.

The lite subscriber accepts right after SUBSCRIBE_OK, so a downstream
subscribe resolves exactly when the upstream confirms; this is woven into
the existing linger lifecycle. The IETF subscriber accepts immediately
since group data can arrive before SUBSCRIBE_OK.

Per-consumer preferences live in an Arc<Mutex<Subscription>> updatable via
`TrackConsumer::update_subscription`; `TrackProducer::subscription()`
exposes the aggregate across live subscribers. `conducer::Waiter` gained a
`waker()` accessor so poll-based callers (moq-mux export) can drive the
async subscribe.

FFI (moq-ffi/libmoq) keeps its synchronous surface by driving the async
subscribe on the runtime, so the language bindings need no regeneration.
The wire format is unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…40c636

# Conflicts:
#	rs/hang/examples/subscribe.rs
#	rs/hang/examples/video.rs
#	rs/hang/src/catalog/root.rs
#	rs/libmoq/src/consume.rs
#	rs/moq-audio/src/consumer.rs
#	rs/moq-audio/src/producer.rs
#	rs/moq-boy/src/input.rs
#	rs/moq-boy/src/status.rs
#	rs/moq-ffi/src/consumer.rs
#	rs/moq-ffi/src/producer.rs
#	rs/moq-native/examples/chat.rs
#	rs/moq-native/examples/clock.rs
#	rs/moq-net/src/ietf/publisher.rs
#	rs/moq-net/src/ietf/subscriber.rs
#	rs/moq-net/src/lite/publisher.rs
#	rs/moq-net/src/lite/subscriber.rs
#	rs/moq-net/src/model/broadcast.rs
#	rs/moq-net/src/model/track.rs
#	rs/moq-net/src/stats.rs
#	rs/moq-relay/src/web.rs
…40c636

# Conflicts:
#	rs/libmoq/src/audio.rs
#	rs/libmoq/src/consume.rs
#	rs/moq-mux/src/container/source.rs
#	rs/moq-net/src/ietf/publisher.rs
#	rs/moq-net/src/lite/subscriber.rs
#	rs/moq-net/src/model/broadcast.rs
#	rs/moq-net/src/model/track.rs
#	rs/moq-relay/src/web.rs
Comment thread rs/hang/src/catalog/root.rs Outdated
/// The subscription preferences used for the catalog track (high priority so
/// it preempts media tracks).
pub fn default_subscription() -> moq_net::Subscription {
moq_net::Subscription::new(100)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is confusing. We should remove ::new and require priority as a key, either by constructing the struct or using a builder.

Comment thread rs/moq-ffi/src/audio.rs Outdated
let consumer = moq_audio::AudioConsumer::new(
// `subscribe_track` now blocks on SUBSCRIBE_OK; drive it on the runtime so
// this synchronous FFI method keeps its signature.
let consumer = crate::ffi::RUNTIME.block_on(moq_audio::AudioConsumer::new(

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never use block_on. Change the method to be async.

Comment thread rs/moq-ffi/src/consumer.rs Outdated
let track = self.inner.subscribe_track(&hang::catalog::Catalog::default_track())?;
// `subscribe_track` now blocks on SUBSCRIBE_OK; drive it on the runtime so
// this synchronous FFI method keeps its signature.
let track = crate::ffi::RUNTIME.block_on(self.inner.subscribe_track(

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope

Comment thread rs/moq-mux/src/container/source.rs Outdated

/// Build the (deferred) subscribe future so `ExportSource` constructors stay
/// synchronous and the subscription resolves inside the existing poll loop.
fn subscribe_future(

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is super gross, how can we improve it?

You are allowed to change methods and interfaces.

Comment thread rs/moq-net/src/lite/subscriber.rs Outdated
return SessionOutcome::Error(err);
}
};
let (_compression_tx, compression_rx) = tokio::sync::watch::channel(Some(info.compression));

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be compression specific any longer. We could even save the SubscribeOK itself.

And ideally we don't use tokio. Can you use kio Producer/Consumer? The Producer would be in this task (writes the Option) while the Consumer goes in the subscribes map.

Comment thread rs/moq-net/src/model/track.rs Outdated
/// The result is what an upstream relay should forward: highest priority,
/// ordered if anyone wants it, the tightest latency deadline, the earliest
/// start group, and the latest end group (unbounded wins).
fn aggregate_subscription(&mut self) -> Subscription {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Option instead of default

Comment thread rs/moq-net/src/model/track.rs Outdated
///
/// The preferences feed the producer's [`Self::subscription`] aggregate and
/// can be changed later via [`TrackConsumer::update_subscription`].
pub fn consume_with(&self, subscription: Subscription) -> TrackConsumer {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this to subscribe

Comment thread rs/moq-net/src/model/track.rs Outdated
}

/// Create a new consumer for the track with default ([`Subscription::default`]) preferences.
pub fn consume(&self) -> TrackConsumer {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe_default?

Comment thread rs/moq-net/src/model/track.rs Outdated
}

/// Replace this subscriber's preferences, updating the producer's aggregate.
pub fn update_subscription(&self, subscription: Subscription) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update

Comment thread rs/moq-net/src/stats.rs
priority: 0,
..Default::default()
}) {
match broadcast.create_track(Track::new(name)) {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can create_track take impl Into<Track>, so Track::new is optional?

…rename

subscribe_track is no longer async: it returns a `TrackPending` that you can
`poll_ok(waiter)` (kio-style, no forced await) or `ok().await`. The resolver
channels use kio Producer/Consumer instead of tokio oneshot, and the lite
subscriber carries the whole SUBSCRIBE_OK through a kio channel instead of a
tokio watch (no longer compression-specific).

This lets the moq-mux export source drive the subscribe with `poll_ok` directly
(removing the Box<dyn Future> + Context::from_waker hack and the now-unused
`kio::Waiter::waker`), and lets moq-ffi drop every `block_on`: the subscribe_*
methods are now async (uniffi async export). The consumer is created at accept
time so it counts toward the producer immediately (fixes a no-linger cancel
race on lite-01/02).

Renames per review: Subscription.max_latency -> stale, start_group ->
group_start, end_group -> group_end; remove Subscription::new (construct the
struct); TrackProducer::consume_with -> subscribe, consume -> subscribe_default,
subscription() now returns Option; TrackConsumer::update_subscription -> update;
create_track takes `impl Into<Track>` (+ From<&str>/From<String>).

Bindings (py/swift) updated to await the now-async FFI subscribe methods;
Kotlin's suspend wrapper is transparent.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@kixelated

Copy link
Copy Markdown
Collaborator Author

Addressed the review in 8b77d931:

subscribe_track returns a pollable handle (no forced await). It's now sync and returns a TrackPending; call poll_ok(waiter) (kio-style) or ok().await. moq-mux's ExportSource uses poll_ok directly now — the Box<dyn Future> + Context::from_waker hack and the unused kio::Waiter::waker are gone.

No more block_on. The moq-ffi subscribe_catalog/subscribe_track/subscribe_media/subscribe_audio methods are async (uniffi async export). The only block_on left in ffi/libmoq are the runtime-keeper thread and a test helper.

More kio, less tokio.

  • Broadcast resolver channels: tokio oneshot → kio Producer/Consumer.
  • lite subscriber: the tokio watch<Option<Compression>> is now a kio Consumer<Option<SubscribeOk>> — the whole SUBSCRIBE_OK is carried, not just compression. The producer writes it in the subscribe task; the consumer lives in the subscribes map.

Renames / API:

  • Subscription: max_latencystale (with the reorder-tolerance doc you described), start_group/end_groupgroup_start/group_end, removed Subscription::new (construct the struct).
  • TrackProducer: consume_withsubscribe, consumesubscribe_default, subscription() now returns Option.
  • TrackConsumer::update_subscriptionupdate.
  • create_track takes impl Into<Track> (+ From<&str>/From<String>), so create_track("video") works.

One subtlety worth flagging: the consumer is now built at accept time (delivered through the channel) rather than lazily on poll, so it counts toward the producer immediately. Building it lazily caused a no-linger cancel race on lite-01/02 (the publisher saw unused() before the subscriber materialized its consumer).

Bindings: py + swift wrappers updated to await the now-async subscribe methods; Kotlin's suspend wrapper is transparent. js/net unchanged (wire format untouched).

cargo fmt/clippy clean; moq-net/mux/native/relay/libmoq/rtc/audio/kio tests all pass.

(Written by Claude)

@kixelated kixelated merged commit 6ff1d7a into dev Jun 1, 2026
6 checks passed
@kixelated kixelated deleted the claude/interesting-tu-40c636 branch June 1, 2026 15:34
kixelated added a commit that referenced this pull request Jun 1, 2026
Catches up with dev's landed work:

* `moq-net: runtime Timescale/Timestamp; container::Frame keeps source scale (#1473)` —
  canonical Timescale(NonZero<u64>) design.
* `moq-net: make subscribe_track async, blocking on SUBSCRIBE_OK (#1540)` —
  the async API the PR originally introduced is now on dev directly.
* `moq-net: add Lite05Wip version variant (unadvertised) (#1518)` —
  the version variant the PR added is on dev as `Lite05Wip`.
* `moq-mux: catalog filter/target and Annex-B exporters (#1487)` and the
  surrounding mux reorganization.
* SubscribeOk negotiates `Compression` on Lite05+ (dev's #1538-ish work).
* `moq-lite-05: add AnnounceOk message (#1573)`.

Almost every distinctive piece of the PR ended up in dev under a separate
PR, so this merge takes dev's versions wholesale and keeps only the
remaining infrastructure for per-frame timestamps:

* `Frame.timestamp: Option<Timestamp>` for the on-wire per-frame field.
* `VarInt::from_zigzag` / `to_zigzag` helpers for the signed-delta encoding.
* `lite::Version::has_timestamps()` feature gate.
* `hang::container::Frame::encode` stamps the moq-net frame timestamp so
  the wire layer can carry it on Lite05+ once the encoding lands.

What's still missing for a complete Lite05 timestamp feature (called out
in the PR description for follow-up):

* `Track.timescale` / `SubscribeOk.timescale` negotiation.
* Lite05 publisher and subscriber zigzag-delta encode/decode in
  `lite/publisher.rs::serve_group` and `lite/subscriber.rs::run_group`,
  which currently still match dev's untimed-frame format.
* A working `lite05_timestamp_roundtrip` integration test — the test from
  the previous merge referenced fields that dev redesigned away
  (`Track.priority`, `Track.timescale`, `with_publish`, `with_consume`,
  `Announced::expect`) and was deleted here pending a rewrite against
  dev's current API.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant