Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions rs/moq-mux/src/catalog/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,21 @@ impl<S: Stream> Stream for Filter<S> {
Poll::Ready(Ok((emit, epoch))) => {
self.last_epoch = epoch;
self.fresh_input = false;
// End with upstream: if this is the final snapshot (inner already EOF'd),
// drop the retained input so a later filter change can't revive the stream
// after it has emitted its last value.
if inner_eof {
self.last_input = None;
}
Poll::Ready(Ok(Some(emit)))
}
Poll::Ready(Err(_)) => Poll::Ready(Ok(None)),
Poll::Pending => {
if inner_eof && self.last_input.is_none() {
// EOF is terminal: once `inner` is exhausted and there's nothing fresh to
// emit, finish and drop the retained input so a post-EOF setter can't make
// the closure emit again (a still-pending snapshot returns Ready above).
if inner_eof {
self.last_input = None;
Poll::Ready(Ok(None))
} else {
Poll::Pending
Expand Down Expand Up @@ -203,6 +213,22 @@ mod test {
}
}

/// A still-live stream: yields its snapshot once, then parks (never EOFs). Models a
/// real upstream that stays open so post-snapshot retargeting is exercised without
/// tripping the end-with-upstream path.
struct Live(Option<Catalog>);

impl Stream for Live {
type Ext = ();

fn poll_next(&mut self, _: &kio::Waiter) -> Poll<crate::Result<Option<Catalog>>> {
match self.0.take() {
Some(catalog) => Poll::Ready(Ok(Some(catalog))),
None => Poll::Pending,
}
}
}

fn h264(name: &str) -> (String, VideoConfig) {
let mut config = VideoConfig::new(H264 {
profile: 0x42,
Expand Down Expand Up @@ -293,10 +319,29 @@ mod test {
}

#[test]
fn set_video_after_snapshot_reemits() {
fn ends_after_upstream_eof() {
let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]);
let mut f = Filter::new(Once(Some(snapshot)));

// First poll emits the filtered snapshot.
assert!(matches!(f.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(Some(_)))));
// Upstream is exhausted, so the stream ends rather than parking forever.
assert!(matches!(f.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None))));

// EOF is terminal: a filter change after the end must not revive the stream.
f.set_video(FilterVideo {
name: Some("hi".into()),
..Default::default()
});
assert!(matches!(f.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None))));
}

#[test]
fn set_video_after_snapshot_reemits() {
// A live (not-yet-EOF) upstream, so the retarget re-applies to the retained snapshot.
let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]);
let mut f = Filter::new(Live(Some(snapshot)));

let first = match f.poll_next(&kio::Waiter::noop()) {
Poll::Ready(Ok(Some(c))) => c,
other => panic!("got {other:?}"),
Expand Down
6 changes: 3 additions & 3 deletions rs/moq-mux/src/catalog/msf/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,11 @@ fn derive_from_codec_config(track: &moq_msf::Track, codec: &AudioCodec, init: by
let mut buf = init;
match codec {
AudioCodec::AAC(_) => {
// AudioSpecificConfig carries valid variable-length extensions (SBR/PS) after
// the core fields, so `parse` consumes the whole buffer; bytes past the core
// fields are legitimate config, not trailing junk.
let cfg =
crate::codec::aac::Config::parse(&mut buf).map_err(|_| Error::MalformedAac(track.name.clone()))?;
if buf.has_remaining() {
return Err(Error::AacTrailingBytes(track.name.clone()).into());
}
Ok(DerivedAudio {
sample_rate: cfg.sample_rate,
channel_count: cfg.channel_count,
Expand Down
3 changes: 0 additions & 3 deletions rs/moq-mux/src/catalog/msf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ pub enum Error {
#[error("MSF audio track {0:?} has malformed OpusHead")]
MalformedOpus(String),

#[error("MSF audio track {0:?} AudioSpecificConfig has trailing bytes")]
AacTrailingBytes(String),

#[error("MSF audio track {0:?} OpusHead has trailing bytes")]
OpusTrailingBytes(String),

Expand Down
44 changes: 43 additions & 1 deletion rs/moq-mux/src/catalog/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,24 @@ impl<S: Stream> Stream for Target<S> {
Poll::Ready(Ok((emit, epoch))) => {
self.last_epoch = epoch;
self.fresh_input = false;
// End with upstream: if this is the final snapshot (inner already EOF'd),
// drop the retained input so a later retarget can't revive the stream after
// it has emitted its last value.
if inner_eof {
self.last_input = None;
}
Poll::Ready(Ok(Some(emit)))
}
Poll::Ready(Err(_)) => {
// Producer dropped (impossible while Self holds it); treat as EOF.
Poll::Ready(Ok(None))
}
Poll::Pending => {
if inner_eof && self.last_input.is_none() {
// EOF is terminal: once `inner` is exhausted and there's nothing fresh to
// emit, finish and drop the retained input so a post-EOF retarget can't make
// the closure emit again (a still-pending snapshot returns Ready above).
if inner_eof {
self.last_input = None;
Poll::Ready(Ok(None))
} else {
Poll::Pending
Expand Down Expand Up @@ -385,10 +395,42 @@ fn best_audio(renditions: &BTreeMap<String, AudioConfig>) -> String {

#[cfg(test)]
mod test {
use std::collections::BTreeMap;

use hang::catalog::{Container, H264, VideoConfig};

use super::*;

/// A one-shot stream: yields its snapshot once, then EOF.
struct Once(Option<Catalog>);

impl Stream for Once {
type Ext = ();

fn poll_next(&mut self, _: &kio::Waiter) -> Poll<crate::Result<Option<Catalog>>> {
Poll::Ready(Ok(self.0.take()))
}
}

/// Once upstream ends and the final selected snapshot is emitted, the stream ends
/// rather than parking forever waiting for a post-EOF retarget.
#[test]
fn ends_after_upstream_eof() {
let mut catalog = Catalog::default();
catalog.video.renditions = BTreeMap::from_iter(vec![vid("only", 640, 360, 500_000)]);

let mut t = Target::new(Once(Some(catalog)));
assert!(matches!(t.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(Some(_)))));
assert!(matches!(t.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None))));

// EOF is terminal: a retarget after the end must not revive the stream.
t.set_video(TargetVideo {
width: Some(320),
..Default::default()
});
assert!(matches!(t.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None))));
}

fn vid(name: &str, w: u32, h: u32, bitrate: u64) -> (String, VideoConfig) {
let mut config = VideoConfig::new(H264 {
profile: 0x42,
Expand Down
174 changes: 95 additions & 79 deletions rs/moq-mux/src/codec/aac/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,79 +43,45 @@ impl Config {
/// Parse an AudioSpecificConfig buffer.
///
/// Handles basic formats (object_type < 31), extended formats
/// (object_type == 31), and explicit sample rates (freq_index == 15).
/// Any SBR/PS extension bytes after the core fields are consumed.
/// (object_type == 31), and explicit sample rates (freq_index == 15). The
/// fields are bit-packed and not byte-aligned, so a bit reader is required:
/// with an explicit 24-bit rate the channelConfiguration lands mid-byte after
/// it. Any SBR/PS extension bits after the core fields are consumed.
pub fn parse<T: Buf>(buf: &mut T) -> Result<Self> {
if buf.remaining() < 2 {
return Err(Error::ConfigTooShort);
}

// Read first byte
let b0 = buf.get_u8();
let mut object_type = b0 >> 3;
let freq_index;
let mut reader = BitReader::new(buf);

let (profile, sample_rate, channel_count) = if object_type == 31 {
if buf.remaining() < 2 {
return Err(Error::ExtendedConfigTooShort);
}
// Extended format: next 6 bits are the extended object_type (32-63).
// Bits 5-7 of b0 are the first 3 bits of extended object_type.
let b_ext = buf.get_u8();
// Bits 0-2 of b_ext are the last 3 bits of extended object_type.
let audio_object_type_ext = ((b0 & 0x07) << 3) | ((b_ext >> 5) & 0x07);
object_type = 32 + audio_object_type_ext;
// Bits 3-6 of b_ext are samplingFrequencyIndex (4 bits).
freq_index = (b_ext >> 1) & 0x0F;
// Bit 0 of b_ext is the first bit of channelConfiguration.
let channel_config_high = b_ext & 0x01;

// Read next byte for rest of channelConfiguration.
if buf.remaining() < 1 {
return Err(Error::IncompleteConfig);
}
let b1 = buf.get_u8();
// Bits 5-7 of b1 are the remaining 3 bits of channelConfiguration.
let channel_config = (channel_config_high << 3) | ((b1 >> 5) & 0x07);

let sample_rate = sample_rate_from_index(freq_index, buf)?;
let channel_count = channel_count_from_config(channel_config);

if buf.remaining() > 0 {
buf.advance(buf.remaining());
}
// audioObjectType: 5 bits, escaped to 6 more when it reads 31.
let mut object_type = reader.read(5, Error::ConfigTooShort)? as u8;
if object_type == 31 {
object_type = 32 + reader.read(6, Error::ExtendedConfigTooShort)? as u8;
}

(object_type, sample_rate, channel_count)
// samplingFrequencyIndex: 4 bits; index 15 means an explicit 24-bit rate follows.
let freq_index = reader.read(4, Error::IncompleteConfig)? as u8;
let sample_rate = if freq_index == 15 {
reader.read(24, Error::ExplicitSampleRateTooShort)?
} else {
// Standard format: bits 5-7 of b0 are first 3 bits of freq_index.
let mut freq_index_local = (b0 & 0x07) << 1;

if buf.remaining() < 1 {
return Err(Error::IncompleteConfig);
}
let b1 = buf.get_u8();

// Complete frequency index (bit 7 of b1 is bit 0 of freq_index).
freq_index_local |= (b1 >> 7) & 0x01;
freq_index = freq_index_local;

let channel_config = (b1 >> 3) & 0x0F;

let sample_rate = sample_rate_from_index(freq_index, buf)?;
let channel_count = channel_count_from_config(channel_config);
*SAMPLE_RATES
.get(freq_index as usize)
.ok_or(Error::UnsupportedSampleRateIndex(freq_index))?
};

// AudioSpecificConfig can have variable-length extensions (SBR, PS,
// etc.). We've already extracted the essential info; consume any
// remaining bytes to ensure the buffer is properly advanced.
if buf.remaining() > 0 {
buf.advance(buf.remaining());
}
// channelConfiguration: 4 bits, immediately after the (possibly explicit) rate.
let channel_config = reader.read(4, Error::IncompleteConfig)? as u8;
let channel_count = channel_count_from_config(channel_config);

(object_type, sample_rate, channel_count)
};
// AudioSpecificConfig can carry variable-length extensions (SBR, PS, etc.).
// We've extracted the essential fields; drain the rest so the buffer is advanced.
if buf.remaining() > 0 {
buf.advance(buf.remaining());
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Ok(Self {
profile,
profile: object_type,
sample_rate,
channel_count,
})
Expand Down Expand Up @@ -166,23 +132,48 @@ impl Config {
}
}

fn sample_rate_from_index<T: Buf>(freq_index: u8, buf: &mut T) -> Result<u32> {
const SAMPLE_RATES: [u32; 13] = [
96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350,
];
/// The 13 standard AAC sampling frequencies, indexed by samplingFrequencyIndex
/// (ISO 14496-3 Table 1.18). Index 15 is the escape for an explicit 24-bit rate.
const SAMPLE_RATES: [u32; 13] = [
96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350,
];

/// MSB-first bit reader that pulls bytes from a [`Buf`] on demand.
///
/// AudioSpecificConfig is bit-packed: an explicit 24-bit sample rate pushes the
/// following channelConfiguration off byte boundaries, so the fields can't be
/// read a whole byte at a time.
struct BitReader<'a, T: Buf> {
buf: &'a mut T,
current: u8,
bits_left: u8,
}

if freq_index == 15 {
if buf.remaining() < 3 {
return Err(Error::ExplicitSampleRateTooShort);
impl<'a, T: Buf> BitReader<'a, T> {
fn new(buf: &'a mut T) -> Self {
Self {
buf,
current: 0,
bits_left: 0,
}
let rate_bytes = [buf.get_u8(), buf.get_u8(), buf.get_u8()];
return Ok(((rate_bytes[0] as u32) << 16) | ((rate_bytes[1] as u32) << 8) | (rate_bytes[2] as u32));
}

SAMPLE_RATES
.get(freq_index as usize)
.copied()
.ok_or(Error::UnsupportedSampleRateIndex(freq_index))
/// Read `n` bits (n <= 32) MSB-first, returning `short` if the buffer runs dry.
fn read(&mut self, n: u8, short: Error) -> Result<u32> {
let mut value = 0u32;
for _ in 0..n {
if self.bits_left == 0 {
if !self.buf.has_remaining() {
return Err(short);
}
self.current = self.buf.get_u8();
self.bits_left = 8;
}
self.bits_left -= 1;
value = (value << 1) | u32::from((self.current >> self.bits_left) & 1);
}
Ok(value)
}
}

/// Map an AAC `channel_config` (ISO 14496-3 Table 1.19) to its real channel count.
Expand Down Expand Up @@ -233,11 +224,36 @@ mod tests {
assert_eq!(cfg.channel_count, 2);
}

// TODO: a round-trip test for the explicit-frequency (freq_index=0xF) form
// fails today because the parser reads `channel_config` from byte 1 even
// though ISO 14496-3 §1.6.2.1 puts it *after* the 24-bit explicit sample
// rate. The encoder follows the spec, the parser doesn't. Fixing requires
// a bit-level reader; deferred to a separate PR.
#[test]
fn round_trip_explicit_sample_rate() {
// A non-standard rate (no freq_index) forces the explicit 24-bit form, where
// channelConfiguration lands mid-byte after the rate. A byte-aligned parser
// misreads both fields; the bit reader round-trips them.
let cfg = Config {
profile: 2,
sample_rate: 44_056, // not in the standard table
channel_count: 2,
};
let encoded = cfg.encode();
assert_eq!(encoded.len(), 5, "explicit-rate config is 5 bytes");

let parsed = Config::parse(&mut encoded.as_ref()).unwrap();
assert_eq!(parsed.profile, 2);
assert_eq!(parsed.sample_rate, 44_056);
assert_eq!(parsed.channel_count, 2);
}

#[test]
fn parses_extended_object_type() {
// audioObjectType 31 escapes to a 6-bit extended type. Bytes encode
// AOT=31, ext=4 (-> object_type 36), freq_index=3 (48000), channel_config=2,
// which straddle byte boundaries: 11111 000100 0011 0010 + padding.
let buf: [u8; 3] = [0xF8, 0x86, 0x40];
let cfg = Config::parse(&mut buf.as_slice()).unwrap();
assert_eq!(cfg.profile, 36);
assert_eq!(cfg.sample_rate, 48_000);
assert_eq!(cfg.channel_count, 2);
}

#[test]
fn round_trip_5_1_channels() {
Expand Down
Loading
Loading