diff --git a/rs/libmoq/README.md b/rs/libmoq/README.md index 726fc5da4..cbcc195b5 100644 --- a/rs/libmoq/README.md +++ b/rs/libmoq/README.md @@ -33,6 +33,8 @@ int32_t moq_origin_create(void); int32_t moq_origin_close(uint32_t origin); int32_t moq_origin_publish(uint32_t origin, const char *path, uintptr_t path_len, uint32_t broadcast); int32_t moq_origin_consume(uint32_t origin, const char *path, uintptr_t path_len); +int32_t moq_origin_consume_announced(uint32_t origin, const char *path, uintptr_t path_len, void (*on_broadcast)(void *user_data, int32_t broadcast), void *user_data); +int32_t moq_origin_consume_announced_close(uint32_t task); int32_t moq_origin_announced(uint32_t origin, void (*on_announce)(void *user_data, int32_t announced), void *user_data); int32_t moq_origin_announced_info(uint32_t announced, moq_announced *dst); int32_t moq_origin_announced_close(uint32_t announced); @@ -63,6 +65,6 @@ int32_t moq_consume_audio_ordered(uint32_t catalog, uint32_t index, uint64_t max int32_t moq_consume_audio_close(uint32_t track); // Consuming: Frames -int32_t moq_consume_frame_chunk(uint32_t frame, uint32_t index, moq_frame *dst); +int32_t moq_consume_frame(uint32_t frame, moq_frame *dst); int32_t moq_consume_frame_close(uint32_t frame); ``` diff --git a/rs/libmoq/src/api.rs b/rs/libmoq/src/api.rs index d9674e454..88cf82ec5 100644 --- a/rs/libmoq/src/api.rs +++ b/rs/libmoq/src/api.rs @@ -292,6 +292,56 @@ pub unsafe extern "C" fn moq_origin_consume(origin: u32, path: *const c_char, pa }) } +/// Consume a broadcast from an origin by path, waiting until it is announced. +/// +/// Unlike [moq_origin_consume], which fails immediately with a not-found code when the broadcast +/// has not been announced yet, this waits for the announcement to arrive (e.g. over the network) +/// and then delivers the broadcast handle via `on_broadcast`. Use it right after [moq_session_connect] +/// to avoid racing announcement gossip, instead of polling [moq_origin_consume] in a retry loop. +/// +/// `on_broadcast` is invoked with a positive broadcast handle once announced, then exactly once +/// more with a terminal code: `0` (the wait finished, including after +/// [moq_origin_consume_announced_close]) or a negative error. After the terminal (`<= 0`) callback, +/// `on_broadcast` is never called again and `user_data` is never touched again, so release +/// `user_data` there. The broadcast handle is usable with [moq_consume_catalog] / [moq_consume_track] +/// and must be freed separately with [moq_consume_close]. +/// +/// Returns a non-zero handle to the wait on success, or a negative code on (immediate) failure. +/// +/// # Safety +/// - The caller must ensure that path is a valid pointer to path_len bytes of data. +/// - The caller must keep `user_data` valid until the terminal (`<= 0`) `on_broadcast` callback. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn moq_origin_consume_announced( + origin: u32, + path: *const c_char, + path_len: usize, + on_broadcast: Option, + user_data: *mut c_void, +) -> i32 { + ffi::enter(move || { + let origin = ffi::parse_id(origin)?; + let path = unsafe { ffi::parse_str(path, path_len)? }.to_string(); + let on_broadcast = unsafe { ffi::OnStatus::new(user_data, on_broadcast) }; + State::lock().origin.consume_announced(origin, path, on_broadcast) + }) +} + +/// Abort a wait started by [moq_origin_consume_announced]. +/// +/// Returns immediately: zero on success, or a negative code if already closed. Does NOT free +/// `user_data`. The [moq_origin_consume_announced] `on_broadcast` callback still fires once more +/// with a terminal `0` (or a negative error), and that final callback is where `user_data` should +/// be released. Any broadcast handle already delivered is unaffected and must still be freed with +/// [moq_consume_close]. +#[unsafe(no_mangle)] +pub extern "C" fn moq_origin_consume_announced_close(task: u32) -> i32 { + ffi::enter(move || { + let task = ffi::parse_id(task)?; + State::lock().origin.consume_announced_close(task) + }) +} + /// Close an origin and clean up its resources. /// /// Returns a zero on success, or a negative code on failure. diff --git a/rs/libmoq/src/origin.rs b/rs/libmoq/src/origin.rs index 4a040f215..c7fb4ec75 100644 --- a/rs/libmoq/src/origin.rs +++ b/rs/libmoq/src/origin.rs @@ -30,6 +30,9 @@ pub struct Origin { /// Announcement listener tasks. Close signals shutdown; the task delivers a final callback, then removes itself. announced_task: NonZeroSlab>, + + /// Pending consume-until-announced tasks. Close signals shutdown; the task delivers a final callback, then removes itself. + consume_task: NonZeroSlab>, } impl Origin { @@ -115,13 +118,73 @@ impl Origin { pub fn consume(&mut self, origin: Id, path: P) -> Result { let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; - // TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait - // for gossip instead of racing it. + // Synchronous lookup races announcement gossip. Use `consume_announced` to wait instead. // Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume(). #[allow(deprecated)] origin.get_broadcast(path).ok_or(Error::BroadcastNotFound) } + /// Wait until the broadcast at `path` is announced, then deliver its handle via the callback. + /// + /// The callback fires the broadcast handle (> 0) once announced, then a terminal `0`. On error + /// or cancellation it fires a single terminal code (`0` on close, negative on error). Returns a + /// task handle for cancellation via [`Self::consume_announced_close`]. + pub fn consume_announced(&mut self, origin: Id, path: String, on_broadcast: OnStatus) -> Result { + let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?; + let consumer = origin.consume(); + let channel = oneshot::channel(); + + let entry = TaskEntry { + close: Some(channel.0), + callback: on_broadcast, + }; + let id = self.consume_task.insert(Some(entry))?; + + tokio::spawn(async move { + let res = Self::run_consume_announced(on_broadcast, consumer, path, channel.1).await; + + // Deliver one final terminal callback (code <= 0), then drop the entry. + // Pull it out from under the lock so the callback never runs while held. + let entry = State::lock().origin.consume_task.remove(id).flatten(); + if let Some(entry) = entry { + entry.callback.call(res); + } + }); + + Ok(id) + } + + async fn run_consume_announced( + callback: OnStatus, + consumer: moq_net::OriginConsumer, + path: String, + mut close: oneshot::Receiver<()>, + ) -> Result<(), Error> { + // `biased` so a pending close always wins over a ready announcement. + let broadcast = tokio::select! { + biased; + _ = &mut close => return Ok(()), + found = consumer.announced_broadcast(path.as_str()) => found.ok_or(Error::BroadcastNotFound)?, + }; + + // Hold the lock only to buffer the broadcast; release it before the callback. + let broadcast_id = State::lock().consume.start(broadcast)?; + callback.call(broadcast_id); + Ok(()) + } + + pub fn consume_announced_close(&mut self, task: Id) -> Result<(), Error> { + // Signal shutdown; the task delivers a final callback and removes itself. + self.consume_task + .get_mut(task) + .and_then(|entry| entry.as_mut()) + .ok_or(Error::NotFound)? + .close + .take() + .ok_or(Error::NotFound)?; + Ok(()) + } + pub fn publish( &mut self, origin: Id, diff --git a/rs/libmoq/src/test.rs b/rs/libmoq/src/test.rs index a5e72f60b..f9e3e4ceb 100644 --- a/rs/libmoq/src/test.rs +++ b/rs/libmoq/src/test.rs @@ -654,6 +654,96 @@ fn local_publish_consume() { assert_eq!(moq_origin_close(origin), 0); } +#[test] +fn consume_announced_local() { + let origin = id(moq_origin_create()); + + // Start waiting before the broadcast exists: the announcement arrives afterwards. + let cb = Callback::new(); + let path = b"live"; + let _task = id(unsafe { + moq_origin_consume_announced( + origin, + path.as_ptr() as *const c_char, + path.len(), + Some(channel_callback), + cb.ptr, + ) + }); + + let broadcast = id(moq_publish_create()); + let init = opus_head(); + let format = b"opus"; + let media = id(unsafe { + moq_publish_media_ordered( + broadcast, + format.as_ptr() as *const c_char, + format.len(), + init.as_ptr(), + init.len(), + ) + }); + assert_eq!( + unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) }, + 0 + ); + + // First the broadcast handle, then a terminal 0 once the wait finishes. + let consume = id(cb.recv()); + assert_eq!(cb.recv_terminal(), 0, "wait delivers terminal 0 after the handle"); + + // The delivered handle behaves like one from moq_origin_consume. + let catalog_cb = Callback::new(); + let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) }); + let catalog_id = id(catalog_cb.recv()); + + let mut audio_cfg = moq_audio_config { + name: std::ptr::null(), + name_len: 0, + codec: std::ptr::null(), + codec_len: 0, + description: std::ptr::null(), + description_len: 0, + sample_rate: 0, + channel_count: 0, + }; + assert_eq!(unsafe { moq_consume_audio_config(catalog_id, 0, &mut audio_cfg) }, 0); + assert_eq!(audio_cfg.sample_rate, 48000); + assert_eq!(audio_cfg.channel_count, 2); + + assert_eq!(moq_consume_catalog_free(catalog_id), 0); + assert_eq!(moq_consume_catalog_close(catalog_task), 0); + assert_eq!(catalog_cb.recv_terminal(), 0, "catalog close delivers terminal 0"); + assert_eq!(moq_consume_close(consume), 0); + assert_eq!(moq_publish_media_close(media), 0); + assert_eq!(moq_publish_close(broadcast), 0); + assert_eq!(moq_origin_close(origin), 0); +} + +#[test] +fn consume_announced_close_cancels() { + let origin = id(moq_origin_create()); + + // Wait for a broadcast that never arrives, then cancel it. + let cb = Callback::new(); + let path = b"never"; + let task = id(unsafe { + moq_origin_consume_announced( + origin, + path.as_ptr() as *const c_char, + path.len(), + Some(channel_callback), + cb.ptr, + ) + }); + + assert_eq!(moq_origin_consume_announced_close(task), 0); + assert_eq!(cb.recv_terminal(), 0, "close delivers terminal 0"); + assert!(moq_origin_consume_announced_close(task) < 0, "double-close should fail"); + + assert_eq!(moq_origin_close(origin), 0); +} + #[test] fn video_publish_consume() { let origin = id(moq_origin_create());