Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rs/libmoq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ int32_t moq_origin_create(void);
int32_t moq_origin_close(uint32_t origin);
int32_t moq_origin_publish(uint32_t origin, const char *path, uintptr_t path_len, uint32_t broadcast);
int32_t moq_origin_consume(uint32_t origin, const char *path, uintptr_t path_len);
int32_t moq_origin_consume_announced(uint32_t origin, const char *path, uintptr_t path_len, void (*on_broadcast)(void *user_data, int32_t broadcast), void *user_data);
int32_t moq_origin_consume_announced_close(uint32_t task);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
int32_t moq_origin_announced(uint32_t origin, void (*on_announce)(void *user_data, int32_t announced), void *user_data);
int32_t moq_origin_announced_info(uint32_t announced, moq_announced *dst);
int32_t moq_origin_announced_close(uint32_t announced);
Expand Down Expand Up @@ -63,6 +65,6 @@ int32_t moq_consume_audio_ordered(uint32_t catalog, uint32_t index, uint64_t max
int32_t moq_consume_audio_close(uint32_t track);

// Consuming: Frames
int32_t moq_consume_frame_chunk(uint32_t frame, uint32_t index, moq_frame *dst);
int32_t moq_consume_frame(uint32_t frame, moq_frame *dst);
int32_t moq_consume_frame_close(uint32_t frame);
```
50 changes: 50 additions & 0 deletions rs/libmoq/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,56 @@ pub unsafe extern "C" fn moq_origin_consume(origin: u32, path: *const c_char, pa
})
}

/// Consume a broadcast from an origin by path, waiting until it is announced.
///
/// Unlike [moq_origin_consume], which fails immediately with a not-found code when the broadcast
/// has not been announced yet, this waits for the announcement to arrive (e.g. over the network)
/// and then delivers the broadcast handle via `on_broadcast`. Use it right after [moq_session_connect]
/// to avoid racing announcement gossip, instead of polling [moq_origin_consume] in a retry loop.
///
/// `on_broadcast` is invoked with a positive broadcast handle once announced, then exactly once
/// more with a terminal code: `0` (the wait finished, including after
/// [moq_origin_consume_announced_close]) or a negative error. After the terminal (`<= 0`) callback,
/// `on_broadcast` is never called again and `user_data` is never touched again, so release
/// `user_data` there. The broadcast handle is usable with [moq_consume_catalog] / [moq_consume_track]
/// and must be freed separately with [moq_consume_close].
///
/// Returns a non-zero handle to the wait on success, or a negative code on (immediate) failure.
///
/// # Safety
/// - The caller must ensure that path is a valid pointer to path_len bytes of data.
/// - The caller must keep `user_data` valid until the terminal (`<= 0`) `on_broadcast` callback.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn moq_origin_consume_announced(
origin: u32,
path: *const c_char,
path_len: usize,
on_broadcast: Option<extern "C" fn(user_data: *mut c_void, broadcast: i32)>,
user_data: *mut c_void,
) -> i32 {
ffi::enter(move || {
let origin = ffi::parse_id(origin)?;
let path = unsafe { ffi::parse_str(path, path_len)? }.to_string();
let on_broadcast = unsafe { ffi::OnStatus::new(user_data, on_broadcast) };
State::lock().origin.consume_announced(origin, path, on_broadcast)
})
}

/// Abort a wait started by [moq_origin_consume_announced].
///
/// Returns immediately: zero on success, or a negative code if already closed. Does NOT free
/// `user_data`. The [moq_origin_consume_announced] `on_broadcast` callback still fires once more
/// with a terminal `0` (or a negative error), and that final callback is where `user_data` should
/// be released. Any broadcast handle already delivered is unaffected and must still be freed with
/// [moq_consume_close].
#[unsafe(no_mangle)]
pub extern "C" fn moq_origin_consume_announced_close(task: u32) -> i32 {
ffi::enter(move || {
let task = ffi::parse_id(task)?;
State::lock().origin.consume_announced_close(task)
})
}

/// Close an origin and clean up its resources.
///
/// Returns a zero on success, or a negative code on failure.
Expand Down
67 changes: 65 additions & 2 deletions rs/libmoq/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub struct Origin {

/// Announcement listener tasks. Close signals shutdown; the task delivers a final callback, then removes itself.
announced_task: NonZeroSlab<Option<TaskEntry>>,

/// Pending consume-until-announced tasks. Close signals shutdown; the task delivers a final callback, then removes itself.
consume_task: NonZeroSlab<Option<TaskEntry>>,
}

impl Origin {
Expand Down Expand Up @@ -115,13 +118,73 @@ impl Origin {

pub fn consume<P: moq_net::AsPath>(&mut self, origin: Id, path: P) -> Result<moq_net::BroadcastConsumer, Error> {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
// TODO: expose an async variant backed by `announced_broadcast` so FFI callers can wait
// for gossip instead of racing it.
// Synchronous lookup races announcement gossip. Use `consume_announced` to wait instead.
// Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume().
#[allow(deprecated)]
origin.get_broadcast(path).ok_or(Error::BroadcastNotFound)
}

/// Wait until the broadcast at `path` is announced, then deliver its handle via the callback.
///
/// The callback fires the broadcast handle (> 0) once announced, then a terminal `0`. On error
/// or cancellation it fires a single terminal code (`0` on close, negative on error). Returns a
/// task handle for cancellation via [`Self::consume_announced_close`].
pub fn consume_announced(&mut self, origin: Id, path: String, on_broadcast: OnStatus) -> Result<Id, Error> {
let origin = self.active.get_mut(origin).ok_or(Error::OriginNotFound)?;
let consumer = origin.consume();
let channel = oneshot::channel();

let entry = TaskEntry {
close: Some(channel.0),
callback: on_broadcast,
};
let id = self.consume_task.insert(Some(entry))?;

tokio::spawn(async move {
let res = Self::run_consume_announced(on_broadcast, consumer, path, channel.1).await;

// Deliver one final terminal callback (code <= 0), then drop the entry.
// Pull it out from under the lock so the callback never runs while held.
let entry = State::lock().origin.consume_task.remove(id).flatten();
if let Some(entry) = entry {
entry.callback.call(res);
}
});

Ok(id)
}

async fn run_consume_announced(
callback: OnStatus,
consumer: moq_net::OriginConsumer,
path: String,
mut close: oneshot::Receiver<()>,
) -> Result<(), Error> {
// `biased` so a pending close always wins over a ready announcement.
let broadcast = tokio::select! {
biased;
_ = &mut close => return Ok(()),
found = consumer.announced_broadcast(path.as_str()) => found.ok_or(Error::BroadcastNotFound)?,
};

// Hold the lock only to buffer the broadcast; release it before the callback.
let broadcast_id = State::lock().consume.start(broadcast)?;
callback.call(broadcast_id);
Ok(())
}

pub fn consume_announced_close(&mut self, task: Id) -> Result<(), Error> {
// Signal shutdown; the task delivers a final callback and removes itself.
self.consume_task
.get_mut(task)
.and_then(|entry| entry.as_mut())
.ok_or(Error::NotFound)?
.close
.take()
.ok_or(Error::NotFound)?;
Ok(())
}

pub fn publish<P: moq_net::AsPath>(
&mut self,
origin: Id,
Expand Down
90 changes: 90 additions & 0 deletions rs/libmoq/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,96 @@ fn local_publish_consume() {
assert_eq!(moq_origin_close(origin), 0);
}

#[test]
fn consume_announced_local() {
let origin = id(moq_origin_create());

// Start waiting before the broadcast exists: the announcement arrives afterwards.
let cb = Callback::new();
let path = b"live";
let _task = id(unsafe {
moq_origin_consume_announced(
origin,
path.as_ptr() as *const c_char,
path.len(),
Some(channel_callback),
cb.ptr,
)
});

let broadcast = id(moq_publish_create());
let init = opus_head();
let format = b"opus";
let media = id(unsafe {
moq_publish_media_ordered(
broadcast,
format.as_ptr() as *const c_char,
format.len(),
init.as_ptr(),
init.len(),
)
});
assert_eq!(
unsafe { moq_origin_publish(origin, path.as_ptr() as *const c_char, path.len(), broadcast) },
0
);

// First the broadcast handle, then a terminal 0 once the wait finishes.
let consume = id(cb.recv());
assert_eq!(cb.recv_terminal(), 0, "wait delivers terminal 0 after the handle");

// The delivered handle behaves like one from moq_origin_consume.
let catalog_cb = Callback::new();
let catalog_task = id(unsafe { moq_consume_catalog(consume, Some(channel_callback), catalog_cb.ptr) });
let catalog_id = id(catalog_cb.recv());

let mut audio_cfg = moq_audio_config {
name: std::ptr::null(),
name_len: 0,
codec: std::ptr::null(),
codec_len: 0,
description: std::ptr::null(),
description_len: 0,
sample_rate: 0,
channel_count: 0,
};
assert_eq!(unsafe { moq_consume_audio_config(catalog_id, 0, &mut audio_cfg) }, 0);
assert_eq!(audio_cfg.sample_rate, 48000);
assert_eq!(audio_cfg.channel_count, 2);

assert_eq!(moq_consume_catalog_free(catalog_id), 0);
assert_eq!(moq_consume_catalog_close(catalog_task), 0);
assert_eq!(catalog_cb.recv_terminal(), 0, "catalog close delivers terminal 0");
assert_eq!(moq_consume_close(consume), 0);
assert_eq!(moq_publish_media_close(media), 0);
assert_eq!(moq_publish_close(broadcast), 0);
assert_eq!(moq_origin_close(origin), 0);
}

#[test]
fn consume_announced_close_cancels() {
let origin = id(moq_origin_create());

// Wait for a broadcast that never arrives, then cancel it.
let cb = Callback::new();
let path = b"never";
let task = id(unsafe {
moq_origin_consume_announced(
origin,
path.as_ptr() as *const c_char,
path.len(),
Some(channel_callback),
cb.ptr,
)
});

assert_eq!(moq_origin_consume_announced_close(task), 0);
assert_eq!(cb.recv_terminal(), 0, "close delivers terminal 0");
assert!(moq_origin_consume_announced_close(task) < 0, "double-close should fail");

assert_eq!(moq_origin_close(origin), 0);
}

#[test]
fn video_publish_consume() {
let origin = id(moq_origin_create());
Expand Down
Loading