Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions rs/hang/examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ async fn run_subscribe(consumer: moq_net::OriginConsumer) -> anyhow::Result<()>

// Read the catalog to discover available tracks.
let catalog_track = broadcast
.consume_track(hang::Catalog::DEFAULT_NAME)
.subscribe(hang::Catalog::default_subscription())
.track(hang::Catalog::DEFAULT_NAME)?
.subscribe(hang::Catalog::default_subscription())?
.await?;
let mut catalog = moq_mux::catalog::hang::Consumer::new(catalog_track);

Expand All @@ -77,11 +77,11 @@ async fn run_subscribe(consumer: moq_net::OriginConsumer) -> anyhow::Result<()>

// Subscribe to the video track.
let track_consumer = broadcast
.consume_track(name)
.track(name)?
.subscribe(moq_net::Subscription {
priority: 1,
..Default::default()
})
})?
.await?;
let mut ordered = moq_mux::container::Consumer::new(track_consumer, moq_mux::catalog::hang::Container::Legacy)
.with_latency(Duration::from_millis(500));
Expand Down
10 changes: 5 additions & 5 deletions rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn run_session(origin: moq_net::OriginProducer) -> anyhow::Result<()> {
// The catalog can contain multiple tracks, used by the viewer to choose the best track.
fn create_track(broadcast: &mut moq_net::BroadcastProducer) -> anyhow::Result<moq_net::TrackProducer> {
// Basic information about the video track.
let video_track = moq_net::Track::new("video");
let video_track = "video";

// Example video configuration
// In a real application, you would get this from the encoder
Expand All @@ -61,7 +61,7 @@ fn create_track(broadcast: &mut moq_net::BroadcastProducer) -> anyhow::Result<mo
// Create a map of video renditions
// Multiple renditions allow the viewer to choose based on their capabilities
let mut renditions = std::collections::BTreeMap::new();
renditions.insert(video_track.name.clone(), video_config);
renditions.insert(video_track.to_string(), video_config);

// Create the catalog describing our video track.
let catalog = hang::catalog::Catalog {
Expand All @@ -75,21 +75,21 @@ fn create_track(broadcast: &mut moq_net::BroadcastProducer) -> anyhow::Result<mo
};

// Publish the catalog as a "catalog.json" track in the broadcast.
let mut catalog_track = broadcast.create_track(hang::Catalog::default_track())?;
let mut catalog_track = broadcast.create_track(hang::Catalog::DEFAULT_NAME, hang::Catalog::default_track_info())?;
let mut group = catalog_track.append_group()?;
group.write_frame(catalog.to_string()?)?;
group.finish()?;

// Actually create the media track now.
let track = broadcast.create_track(video_track)?;
let track = broadcast.create_track(video_track, None)?;

Ok(track)
}

// Produce a broadcast and publish it to the origin.
async fn run_broadcast(origin: moq_net::OriginProducer) -> anyhow::Result<()> {
// Create and publish a broadcast to the origin.
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();
let track = create_track(&mut broadcast)?;

// NOTE: The path is empty because we're using the URL to scope the broadcast.
Expand Down
9 changes: 6 additions & 3 deletions rs/hang/src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,12 @@ impl Catalog {
Ok(serde_json::to_writer(writer, self)?)
}

pub fn default_track() -> moq_net::Track {
// The catalog is JSON and re-sent on every change, so it pays to compress.
moq_net::Track::new(Catalog::DEFAULT_NAME).with_compress(true)
/// Track properties for creating the catalog track via
/// [`create_track`](moq_net::BroadcastProducer::create_track) at
/// [`DEFAULT_NAME`](Self::DEFAULT_NAME). The catalog is JSON and re-sent on
/// every change, so it pays to compress.
pub fn default_track_info() -> moq_net::TrackInfo {
moq_net::TrackInfo::default().with_compress(true)
}

/// The subscription preferences used for the catalog track (high priority so
Expand Down
20 changes: 19 additions & 1 deletion rs/kio/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use crate::{
Counts, State, Weak,
Counts, Mut, State, Weak,
lock::*,
producer::{Producer, Ref},
waiter::*,
Expand Down Expand Up @@ -106,6 +106,24 @@ impl<T> Consumer<T> {
}
}

/// Acquire mutable access to the shared state.
///
/// Returns `Ok(Mut)` if the channel is open, or `Err(Ref)` with
/// read-only access if closed. Only locks once.
pub fn write(&self) -> Result<Mut<'_, T>, Ref<'_, T>> {
let state = self.state.lock();
if state.closed {
Err(Ref { state })
} else {
Ok(Mut::new(state))
}
}

/// Returns `true` if the channel has been closed by the producer.
pub fn is_closed(&self) -> bool {
self.state.lock().closed
}

/// Returns `true` if both consumers share the same underlying state.
pub fn same_channel(&self, other: &Self) -> bool {
self.state.is_clone(&other.state)
Expand Down
14 changes: 7 additions & 7 deletions rs/libmoq/src/consume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl Consume {
tokio::spawn(async move {
let res = async move {
let catalog = broadcast
.consume_track(hang::catalog::Catalog::DEFAULT_NAME)
.subscribe(hang::catalog::Catalog::default_subscription())
.track(hang::catalog::Catalog::DEFAULT_NAME)?
.subscribe(hang::catalog::Catalog::default_subscription())?
.await?;
Self::run_catalog(on_catalog, broadcast.clone(), catalog.into(), channel.1).await
}
Expand Down Expand Up @@ -250,11 +250,11 @@ impl Consume {
tokio::spawn(async move {
let res = async move {
let track = broadcast
.consume_track(&name)
.track(&name)?
.subscribe(moq_net::Subscription {
priority: 1,
..Default::default()
})
})?
.await?;
let track = moq_mux::container::Consumer::new(track, moq_mux::catalog::hang::Container::Legacy)
.with_latency(latency);
Expand Down Expand Up @@ -302,11 +302,11 @@ impl Consume {
tokio::spawn(async move {
let res = async move {
let track = broadcast
.consume_track(&name)
.track(&name)?
.subscribe(moq_net::Subscription {
priority: 2,
..Default::default()
})
})?
.await?;
let track = moq_mux::container::Consumer::new(track, moq_mux::catalog::hang::Container::Legacy)
.with_latency(latency);
Expand Down Expand Up @@ -407,7 +407,7 @@ impl Consume {
// `subscribe` blocks on SUBSCRIBE_OK, so run it inside the task.
tokio::spawn(async move {
let res = async move {
let track = broadcast.consume_track(&name).subscribe(None).await?;
let track = broadcast.track(&name)?.subscribe(None)?.await?;
Self::run_raw(on_frame, track, channel.1).await
}
.await;
Expand Down
4 changes: 2 additions & 2 deletions rs/libmoq/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Publish {

impl Publish {
pub fn create(&mut self) -> Result<Id, Error> {
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();
let catalog = moq_mux::catalog::hang::Producer::new(&mut broadcast)?;

let id = self.broadcasts.insert((broadcast, catalog))?;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl Publish {
/// if you want to describe the track in the catalog as well.
pub fn track(&mut self, broadcast: Id, name: &str) -> Result<Id, Error> {
let (broadcast, _) = self.broadcasts.get_mut(broadcast).ok_or(Error::BroadcastNotFound)?;
let track = broadcast.create_track(moq_net::Track::new(name))?;
let track = broadcast.create_track(name, None)?;
self.tracks.insert(track)
}

Expand Down
2 changes: 1 addition & 1 deletion rs/moq-audio/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl AudioConsumer {
};

let name = name.into();
let track = broadcast.consume_track(&name).subscribe(None).await?;
let track = broadcast.track(&name)?.subscribe(None)?.await?;
let mut track = moq_mux::container::Consumer::new(track, moq_mux::container::legacy::Wire);
if let Some(latency) = output.latency_max {
track = track.with_latency(latency);
Expand Down
6 changes: 4 additions & 2 deletions rs/moq-audio/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ impl AudioProducer {
// Audio hang frames carry microsecond timestamps; advertise that on the
// track so Lite05 subscribers know what scale to expect and the model
// layer accepts Frame::timestamp on append.
let track =
broadcast.create_track(moq_net::Track::new(name.clone()).with_timescale(hang::container::TIMESCALE))?;
let track = broadcast.create_track(
name.clone(),
moq_net::TrackInfo::default().with_timescale(hang::container::TIMESCALE),
)?;
let track = moq_mux::container::Producer::new(track, moq_mux::container::legacy::Wire);

let mut catalog_mut = catalog.clone();
Expand Down
4 changes: 2 additions & 2 deletions rs/moq-audio/tests/roundtrip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn f32_bytes(samples: &[f32]) -> Bytes {

#[tokio::test]
async fn opus_round_trip_48k_stereo() {
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();
let catalog = moq_mux::catalog::hang::Producer::new(&mut broadcast).unwrap();
let mut catalog_consumer = catalog.consume().unwrap();
let broadcast_consumer = broadcast.consume();
Expand Down Expand Up @@ -110,7 +110,7 @@ async fn opus_round_trip_48k_stereo() {

#[tokio::test]
async fn opus_round_trip_44100_s16_resampled() {
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();
let catalog = moq_mux::catalog::hang::Producer::new(&mut broadcast).unwrap();
let mut catalog_consumer = catalog.consume().unwrap();
let broadcast_consumer = broadcast.consume();
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-boy/src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn handle_viewer_commands(
broadcast: moq_net::BroadcastConsumer,
cmd_tx: &tokio::sync::mpsc::Sender<Command>,
) -> anyhow::Result<()> {
let mut track = broadcast.consume_track("command").subscribe(None).await?;
let mut track = broadcast.track("command")?.subscribe(None)?.await?;

while let Some(mut group) = track.recv_group().await? {
while let Some(frame) = group.read_frame().await? {
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-boy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async fn run(config: &Config) -> Result<()> {
let client = config.client.clone().init()?;

// Create the broadcast producer.
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();

// Publish origin: the game session broadcast.
let publish_origin = moq_net::Origin::random().produce();
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-boy/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct StatusPublisher {

impl StatusPublisher {
pub fn new(broadcast: &mut moq_net::BroadcastProducer) -> anyhow::Result<Self> {
let producer = broadcast.create_track(moq_net::Track::new("status"))?;
let producer = broadcast.create_track("status", None)?;

Ok(Self {
producer,
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-cli/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct Publish {

impl Publish {
pub fn new(format: &PublishFormat) -> anyhow::Result<Self> {
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();
let catalog = moq_mux::catalog::hang::Producer::new(&mut broadcast)?;

let decoder = match format {
Expand Down
8 changes: 4 additions & 4 deletions rs/moq-ffi/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ impl MoqBroadcastConsumer {
pub async fn subscribe_catalog(&self) -> Result<Arc<MoqCatalogConsumer>, MoqError> {
let track = self
.inner
.consume_track(hang::catalog::Catalog::DEFAULT_NAME)
.subscribe(hang::catalog::Catalog::default_subscription())
.track(hang::catalog::Catalog::DEFAULT_NAME)?
.subscribe(hang::catalog::Catalog::default_subscription())?
.await?;
let consumer = moq_mux::catalog::hang::Consumer::from(track);
Ok(Arc::new(MoqCatalogConsumer {
Expand All @@ -97,7 +97,7 @@ impl MoqBroadcastConsumer {
///
/// Frames are returned as plain byte payloads with no codec or container parsing.
pub async fn subscribe_track(&self, name: String) -> Result<Arc<MoqTrackConsumer>, MoqError> {
let track = self.inner.consume_track(&name).subscribe(None).await?;
let track = self.inner.track(&name)?.subscribe(None)?.await?;
Ok(Arc::new(MoqTrackConsumer::new(track)))
}

Expand All @@ -117,7 +117,7 @@ impl MoqBroadcastConsumer {
let media: moq_mux::catalog::hang::Container = (&container)
.try_into()
.map_err(|e| MoqError::Codec(format!("invalid container: {e}")))?;
let track = self.inner.consume_track(&name).subscribe(None).await?;
let track = self.inner.track(&name)?.subscribe(None)?.await?;
let latency = std::time::Duration::from_millis(max_latency_ms);
let consumer = moq_mux::container::Consumer::new(track, media).with_latency(latency);
Ok(Arc::new(MoqMediaConsumer {
Expand Down
8 changes: 4 additions & 4 deletions rs/moq-ffi/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl MoqBroadcastProducer {
#[uniffi::constructor]
pub fn new() -> Result<Arc<Self>, MoqError> {
let _guard = crate::ffi::RUNTIME.enter();
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();
let catalog = moq_mux::catalog::hang::Producer::new(&mut broadcast)?;
Ok(Arc::new(Self {
state: std::sync::Mutex::new(Some(BroadcastProducer { broadcast, catalog })),
Expand Down Expand Up @@ -143,7 +143,7 @@ impl MoqBroadcastProducer {
let state = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
// Clone the broadcast handle (shared Arc internally) to get &mut access.
let mut broadcast = state.broadcast.clone();
let producer = broadcast.create_track(moq_net::Track::new(name))?;
let producer = broadcast.create_track(name, None)?;
Ok(Arc::new(MoqTrackProducer {
inner: std::sync::Mutex::new(Some(producer)),
}))
Expand Down Expand Up @@ -173,7 +173,7 @@ impl MoqTrackProducer {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.inner.lock().unwrap();
let track = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(track.name.clone())
Ok(track.name().to_string())
}

/// Wait until this track has at least one active consumer.
Expand Down Expand Up @@ -286,7 +286,7 @@ impl MoqMediaProducer {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.inner.lock().unwrap();
let media = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(media.track.name.clone())
Ok(media.track.name().to_string())
}

/// Wait until this media track has at least one active consumer.
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-gst/src/sink/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async fn run_session(
let client = client_config.init()?;

let origin = moq_net::Origin::random().produce();
let mut broadcast = moq_net::Broadcast::new().produce();
let mut broadcast = moq_net::BroadcastInfo::new().produce();
let broadcast_consumer = broadcast.consume();

let catalog = moq_mux::catalog::hang::Producer::new(&mut broadcast)?;
Expand Down
8 changes: 4 additions & 4 deletions rs/moq-gst/src/source/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ async fn run_session(
};

let catalog_track = broadcast
.consume_track(hang::catalog::Catalog::DEFAULT_NAME)
.subscribe(hang::catalog::Catalog::default_subscription())
.track(hang::catalog::Catalog::DEFAULT_NAME)?
.subscribe(hang::catalog::Catalog::default_subscription())?
.await?;
let mut catalog = moq_mux::catalog::hang::Consumer::new(catalog_track);
let catalog = catalog.next().await?.context("catalog missing")?.clone();
Expand All @@ -448,7 +448,7 @@ async fn run_session(
};
let caps = video_caps(&config)?;
let endpoint = request_pad(&control_tx, descriptor.clone(), caps).await?;
let track_consumer = broadcast.consume_track(&track_name).subscribe(None).await?;
let track_consumer = broadcast.track(&track_name)?.subscribe(None)?.await?;
let track = moq_mux::container::Consumer::new(track_consumer, moq_mux::catalog::hang::Container::Legacy)
.with_latency(Duration::from_secs(1));
tasks.push(spawn_track_pump(track, descriptor, endpoint, shutdown.clone()));
Expand All @@ -461,7 +461,7 @@ async fn run_session(
};
let caps = audio_caps(&config)?;
let endpoint = request_pad(&control_tx, descriptor.clone(), caps).await?;
let track_consumer = broadcast.consume_track(&track_name).subscribe(None).await?;
let track_consumer = broadcast.track(&track_name)?.subscribe(None)?.await?;
let track = moq_mux::container::Consumer::new(track_consumer, moq_mux::catalog::hang::Container::Legacy)
.with_latency(Duration::from_secs(1));
tasks.push(spawn_track_pump(track, descriptor, endpoint, shutdown.clone()));
Expand Down
6 changes: 3 additions & 3 deletions rs/moq-mux/src/catalog/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ impl Consumer {
Ok(match format {
CatalogFormat::Hang => {
let track = broadcast
.consume_track(hang::Catalog::DEFAULT_NAME)
.subscribe(hang::Catalog::default_subscription())
.track(hang::Catalog::DEFAULT_NAME)?
.subscribe(hang::Catalog::default_subscription())?
.await?;
Self::Hang(super::hang::Consumer::new(track))
}
CatalogFormat::Msf => {
let track = broadcast.consume_track(moq_msf::DEFAULT_NAME).subscribe(None).await?;
let track = broadcast.track(moq_msf::DEFAULT_NAME)?.subscribe(None)?.await?;
Self::Msf(super::msf::Consumer::new(track))
}
})
Expand Down
Loading
Loading