diff --git a/rs/moq-mux/src/catalog/filter.rs b/rs/moq-mux/src/catalog/filter.rs index d032397ed..d5ac6e956 100644 --- a/rs/moq-mux/src/catalog/filter.rs +++ b/rs/moq-mux/src/catalog/filter.rs @@ -131,11 +131,21 @@ impl Stream for Filter { Poll::Ready(Ok((emit, epoch))) => { self.last_epoch = epoch; self.fresh_input = false; + // End with upstream: if this is the final snapshot (inner already EOF'd), + // drop the retained input so a later filter change can't revive the stream + // after it has emitted its last value. + if inner_eof { + self.last_input = None; + } Poll::Ready(Ok(Some(emit))) } Poll::Ready(Err(_)) => Poll::Ready(Ok(None)), Poll::Pending => { - if inner_eof && self.last_input.is_none() { + // EOF is terminal: once `inner` is exhausted and there's nothing fresh to + // emit, finish and drop the retained input so a post-EOF setter can't make + // the closure emit again (a still-pending snapshot returns Ready above). + if inner_eof { + self.last_input = None; Poll::Ready(Ok(None)) } else { Poll::Pending @@ -203,6 +213,22 @@ mod test { } } + /// A still-live stream: yields its snapshot once, then parks (never EOFs). Models a + /// real upstream that stays open so post-snapshot retargeting is exercised without + /// tripping the end-with-upstream path. + struct Live(Option); + + impl Stream for Live { + type Ext = (); + + fn poll_next(&mut self, _: &kio::Waiter) -> Poll>> { + match self.0.take() { + Some(catalog) => Poll::Ready(Ok(Some(catalog))), + None => Poll::Pending, + } + } + } + fn h264(name: &str) -> (String, VideoConfig) { let mut config = VideoConfig::new(H264 { profile: 0x42, @@ -293,10 +319,29 @@ mod test { } #[test] - fn set_video_after_snapshot_reemits() { + fn ends_after_upstream_eof() { let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]); let mut f = Filter::new(Once(Some(snapshot))); + // First poll emits the filtered snapshot. + assert!(matches!(f.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(Some(_))))); + // Upstream is exhausted, so the stream ends rather than parking forever. + assert!(matches!(f.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None)))); + + // EOF is terminal: a filter change after the end must not revive the stream. + f.set_video(FilterVideo { + name: Some("hi".into()), + ..Default::default() + }); + assert!(matches!(f.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None)))); + } + + #[test] + fn set_video_after_snapshot_reemits() { + // A live (not-yet-EOF) upstream, so the retarget re-applies to the retained snapshot. + let snapshot = catalog_with(vec![h264("lo"), h264("hi")], vec![]); + let mut f = Filter::new(Live(Some(snapshot))); + let first = match f.poll_next(&kio::Waiter::noop()) { Poll::Ready(Ok(Some(c))) => c, other => panic!("got {other:?}"), diff --git a/rs/moq-mux/src/catalog/msf/consumer.rs b/rs/moq-mux/src/catalog/msf/consumer.rs index d0d3e6ca6..72e0beb14 100644 --- a/rs/moq-mux/src/catalog/msf/consumer.rs +++ b/rs/moq-mux/src/catalog/msf/consumer.rs @@ -292,11 +292,11 @@ fn derive_from_codec_config(track: &moq_msf::Track, codec: &AudioCodec, init: by let mut buf = init; match codec { AudioCodec::AAC(_) => { + // AudioSpecificConfig carries valid variable-length extensions (SBR/PS) after + // the core fields, so `parse` consumes the whole buffer; bytes past the core + // fields are legitimate config, not trailing junk. let cfg = crate::codec::aac::Config::parse(&mut buf).map_err(|_| Error::MalformedAac(track.name.clone()))?; - if buf.has_remaining() { - return Err(Error::AacTrailingBytes(track.name.clone()).into()); - } Ok(DerivedAudio { sample_rate: cfg.sample_rate, channel_count: cfg.channel_count, diff --git a/rs/moq-mux/src/catalog/msf/mod.rs b/rs/moq-mux/src/catalog/msf/mod.rs index ebe5ce0d2..065accf74 100644 --- a/rs/moq-mux/src/catalog/msf/mod.rs +++ b/rs/moq-mux/src/catalog/msf/mod.rs @@ -50,9 +50,6 @@ pub enum Error { #[error("MSF audio track {0:?} has malformed OpusHead")] MalformedOpus(String), - #[error("MSF audio track {0:?} AudioSpecificConfig has trailing bytes")] - AacTrailingBytes(String), - #[error("MSF audio track {0:?} OpusHead has trailing bytes")] OpusTrailingBytes(String), diff --git a/rs/moq-mux/src/catalog/target.rs b/rs/moq-mux/src/catalog/target.rs index 61950e84e..5356660b4 100644 --- a/rs/moq-mux/src/catalog/target.rs +++ b/rs/moq-mux/src/catalog/target.rs @@ -143,6 +143,12 @@ impl Stream for Target { Poll::Ready(Ok((emit, epoch))) => { self.last_epoch = epoch; self.fresh_input = false; + // End with upstream: if this is the final snapshot (inner already EOF'd), + // drop the retained input so a later retarget can't revive the stream after + // it has emitted its last value. + if inner_eof { + self.last_input = None; + } Poll::Ready(Ok(Some(emit))) } Poll::Ready(Err(_)) => { @@ -150,7 +156,11 @@ impl Stream for Target { Poll::Ready(Ok(None)) } Poll::Pending => { - if inner_eof && self.last_input.is_none() { + // EOF is terminal: once `inner` is exhausted and there's nothing fresh to + // emit, finish and drop the retained input so a post-EOF retarget can't make + // the closure emit again (a still-pending snapshot returns Ready above). + if inner_eof { + self.last_input = None; Poll::Ready(Ok(None)) } else { Poll::Pending @@ -385,10 +395,42 @@ fn best_audio(renditions: &BTreeMap) -> String { #[cfg(test)] mod test { + use std::collections::BTreeMap; + use hang::catalog::{Container, H264, VideoConfig}; use super::*; + /// A one-shot stream: yields its snapshot once, then EOF. + struct Once(Option); + + impl Stream for Once { + type Ext = (); + + fn poll_next(&mut self, _: &kio::Waiter) -> Poll>> { + Poll::Ready(Ok(self.0.take())) + } + } + + /// Once upstream ends and the final selected snapshot is emitted, the stream ends + /// rather than parking forever waiting for a post-EOF retarget. + #[test] + fn ends_after_upstream_eof() { + let mut catalog = Catalog::default(); + catalog.video.renditions = BTreeMap::from_iter(vec![vid("only", 640, 360, 500_000)]); + + let mut t = Target::new(Once(Some(catalog))); + assert!(matches!(t.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(Some(_))))); + assert!(matches!(t.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None)))); + + // EOF is terminal: a retarget after the end must not revive the stream. + t.set_video(TargetVideo { + width: Some(320), + ..Default::default() + }); + assert!(matches!(t.poll_next(&kio::Waiter::noop()), Poll::Ready(Ok(None)))); + } + fn vid(name: &str, w: u32, h: u32, bitrate: u64) -> (String, VideoConfig) { let mut config = VideoConfig::new(H264 { profile: 0x42, diff --git a/rs/moq-mux/src/codec/aac/mod.rs b/rs/moq-mux/src/codec/aac/mod.rs index 0360f6c82..e2477911f 100644 --- a/rs/moq-mux/src/codec/aac/mod.rs +++ b/rs/moq-mux/src/codec/aac/mod.rs @@ -43,79 +43,45 @@ impl Config { /// Parse an AudioSpecificConfig buffer. /// /// Handles basic formats (object_type < 31), extended formats - /// (object_type == 31), and explicit sample rates (freq_index == 15). - /// Any SBR/PS extension bytes after the core fields are consumed. + /// (object_type == 31), and explicit sample rates (freq_index == 15). The + /// fields are bit-packed and not byte-aligned, so a bit reader is required: + /// with an explicit 24-bit rate the channelConfiguration lands mid-byte after + /// it. Any SBR/PS extension bits after the core fields are consumed. pub fn parse(buf: &mut T) -> Result { if buf.remaining() < 2 { return Err(Error::ConfigTooShort); } - // Read first byte - let b0 = buf.get_u8(); - let mut object_type = b0 >> 3; - let freq_index; + let mut reader = BitReader::new(buf); - let (profile, sample_rate, channel_count) = if object_type == 31 { - if buf.remaining() < 2 { - return Err(Error::ExtendedConfigTooShort); - } - // Extended format: next 6 bits are the extended object_type (32-63). - // Bits 5-7 of b0 are the first 3 bits of extended object_type. - let b_ext = buf.get_u8(); - // Bits 0-2 of b_ext are the last 3 bits of extended object_type. - let audio_object_type_ext = ((b0 & 0x07) << 3) | ((b_ext >> 5) & 0x07); - object_type = 32 + audio_object_type_ext; - // Bits 3-6 of b_ext are samplingFrequencyIndex (4 bits). - freq_index = (b_ext >> 1) & 0x0F; - // Bit 0 of b_ext is the first bit of channelConfiguration. - let channel_config_high = b_ext & 0x01; - - // Read next byte for rest of channelConfiguration. - if buf.remaining() < 1 { - return Err(Error::IncompleteConfig); - } - let b1 = buf.get_u8(); - // Bits 5-7 of b1 are the remaining 3 bits of channelConfiguration. - let channel_config = (channel_config_high << 3) | ((b1 >> 5) & 0x07); - - let sample_rate = sample_rate_from_index(freq_index, buf)?; - let channel_count = channel_count_from_config(channel_config); - - if buf.remaining() > 0 { - buf.advance(buf.remaining()); - } + // audioObjectType: 5 bits, escaped to 6 more when it reads 31. + let mut object_type = reader.read(5, Error::ConfigTooShort)? as u8; + if object_type == 31 { + object_type = 32 + reader.read(6, Error::ExtendedConfigTooShort)? as u8; + } - (object_type, sample_rate, channel_count) + // samplingFrequencyIndex: 4 bits; index 15 means an explicit 24-bit rate follows. + let freq_index = reader.read(4, Error::IncompleteConfig)? as u8; + let sample_rate = if freq_index == 15 { + reader.read(24, Error::ExplicitSampleRateTooShort)? } else { - // Standard format: bits 5-7 of b0 are first 3 bits of freq_index. - let mut freq_index_local = (b0 & 0x07) << 1; - - if buf.remaining() < 1 { - return Err(Error::IncompleteConfig); - } - let b1 = buf.get_u8(); - - // Complete frequency index (bit 7 of b1 is bit 0 of freq_index). - freq_index_local |= (b1 >> 7) & 0x01; - freq_index = freq_index_local; - - let channel_config = (b1 >> 3) & 0x0F; - - let sample_rate = sample_rate_from_index(freq_index, buf)?; - let channel_count = channel_count_from_config(channel_config); + *SAMPLE_RATES + .get(freq_index as usize) + .ok_or(Error::UnsupportedSampleRateIndex(freq_index))? + }; - // AudioSpecificConfig can have variable-length extensions (SBR, PS, - // etc.). We've already extracted the essential info; consume any - // remaining bytes to ensure the buffer is properly advanced. - if buf.remaining() > 0 { - buf.advance(buf.remaining()); - } + // channelConfiguration: 4 bits, immediately after the (possibly explicit) rate. + let channel_config = reader.read(4, Error::IncompleteConfig)? as u8; + let channel_count = channel_count_from_config(channel_config); - (object_type, sample_rate, channel_count) - }; + // AudioSpecificConfig can carry variable-length extensions (SBR, PS, etc.). + // We've extracted the essential fields; drain the rest so the buffer is advanced. + if buf.remaining() > 0 { + buf.advance(buf.remaining()); + } Ok(Self { - profile, + profile: object_type, sample_rate, channel_count, }) @@ -166,23 +132,48 @@ impl Config { } } -fn sample_rate_from_index(freq_index: u8, buf: &mut T) -> Result { - const SAMPLE_RATES: [u32; 13] = [ - 96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350, - ]; +/// The 13 standard AAC sampling frequencies, indexed by samplingFrequencyIndex +/// (ISO 14496-3 Table 1.18). Index 15 is the escape for an explicit 24-bit rate. +const SAMPLE_RATES: [u32; 13] = [ + 96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350, +]; + +/// MSB-first bit reader that pulls bytes from a [`Buf`] on demand. +/// +/// AudioSpecificConfig is bit-packed: an explicit 24-bit sample rate pushes the +/// following channelConfiguration off byte boundaries, so the fields can't be +/// read a whole byte at a time. +struct BitReader<'a, T: Buf> { + buf: &'a mut T, + current: u8, + bits_left: u8, +} - if freq_index == 15 { - if buf.remaining() < 3 { - return Err(Error::ExplicitSampleRateTooShort); +impl<'a, T: Buf> BitReader<'a, T> { + fn new(buf: &'a mut T) -> Self { + Self { + buf, + current: 0, + bits_left: 0, } - let rate_bytes = [buf.get_u8(), buf.get_u8(), buf.get_u8()]; - return Ok(((rate_bytes[0] as u32) << 16) | ((rate_bytes[1] as u32) << 8) | (rate_bytes[2] as u32)); } - SAMPLE_RATES - .get(freq_index as usize) - .copied() - .ok_or(Error::UnsupportedSampleRateIndex(freq_index)) + /// Read `n` bits (n <= 32) MSB-first, returning `short` if the buffer runs dry. + fn read(&mut self, n: u8, short: Error) -> Result { + let mut value = 0u32; + for _ in 0..n { + if self.bits_left == 0 { + if !self.buf.has_remaining() { + return Err(short); + } + self.current = self.buf.get_u8(); + self.bits_left = 8; + } + self.bits_left -= 1; + value = (value << 1) | u32::from((self.current >> self.bits_left) & 1); + } + Ok(value) + } } /// Map an AAC `channel_config` (ISO 14496-3 Table 1.19) to its real channel count. @@ -233,11 +224,36 @@ mod tests { assert_eq!(cfg.channel_count, 2); } - // TODO: a round-trip test for the explicit-frequency (freq_index=0xF) form - // fails today because the parser reads `channel_config` from byte 1 even - // though ISO 14496-3 ยง1.6.2.1 puts it *after* the 24-bit explicit sample - // rate. The encoder follows the spec, the parser doesn't. Fixing requires - // a bit-level reader; deferred to a separate PR. + #[test] + fn round_trip_explicit_sample_rate() { + // A non-standard rate (no freq_index) forces the explicit 24-bit form, where + // channelConfiguration lands mid-byte after the rate. A byte-aligned parser + // misreads both fields; the bit reader round-trips them. + let cfg = Config { + profile: 2, + sample_rate: 44_056, // not in the standard table + channel_count: 2, + }; + let encoded = cfg.encode(); + assert_eq!(encoded.len(), 5, "explicit-rate config is 5 bytes"); + + let parsed = Config::parse(&mut encoded.as_ref()).unwrap(); + assert_eq!(parsed.profile, 2); + assert_eq!(parsed.sample_rate, 44_056); + assert_eq!(parsed.channel_count, 2); + } + + #[test] + fn parses_extended_object_type() { + // audioObjectType 31 escapes to a 6-bit extended type. Bytes encode + // AOT=31, ext=4 (-> object_type 36), freq_index=3 (48000), channel_config=2, + // which straddle byte boundaries: 11111 000100 0011 0010 + padding. + let buf: [u8; 3] = [0xF8, 0x86, 0x40]; + let cfg = Config::parse(&mut buf.as_slice()).unwrap(); + assert_eq!(cfg.profile, 36); + assert_eq!(cfg.sample_rate, 48_000); + assert_eq!(cfg.channel_count, 2); + } #[test] fn round_trip_5_1_channels() { diff --git a/rs/moq-mux/src/codec/h264/split.rs b/rs/moq-mux/src/codec/h264/split.rs index 0a69129ec..bd52d7698 100644 --- a/rs/moq-mux/src/codec/h264/split.rs +++ b/rs/moq-mux/src/codec/h264/split.rs @@ -138,6 +138,13 @@ impl Split { self.maybe_start_frame(pts)?; } Some(Avc3NalType::IdrSlice) => { + // first_mb_in_slice == 0 (ue(v), so the byte-after-header high bit is set) + // marks the first slice of a new picture: close any access unit still open. + // A bare IDR arriving right after a delta picture in the same chunk would + // otherwise fold both into one frame and mis-flag it a keyframe. + if nal.get(1).ok_or(Error::NalTooShort)? & 0x80 != 0 { + self.maybe_start_frame(pts)?; + } // Adopt this keyframe's inline set (dropping any the new GOP no longer // uses), or re-inject the retained set if the keyframe carried none. crate::codec::annexb::reconcile_keyframe_params( @@ -357,6 +364,39 @@ mod tests { ); } + /// A bare IDR arriving right after a delta picture in the same decode chunk must + /// open its own access unit, not fold into the delta's frame. Without closing the + /// open slice on the IDR's first slice, the two AUs merge and the result is + /// mis-flagged as a keyframe. + #[tokio::test(start_paused = true)] + async fn bare_idr_after_delta_splits() { + let sps: &[u8] = &[0x67, 0x42, 0xc0, 0x1f]; + let pps: &[u8] = &[0x68, 0xce, 0x3c, 0x80]; + let idr: &[u8] = &[0x65, 0x88, 0x84, 0x21]; + // P-slice with first_mb_in_slice set (byte 1 high bit), opening a new AU. + let pslice: &[u8] = &[0x61, 0xe0, 0x12, 0x34]; + // A trailing AUD so the bare IDR is a *complete* NAL during decode. + let aud: &[u8] = &[0x09, 0x10]; + + let mut split = Split::new(); + // One chunk: keyframe, a delta picture, then a bare IDR (no inline params). + let frames = split.decode(&annexb(&[sps, pps, idr, pslice, idr, aud]), ts()).unwrap(); + + // The keyframe and the delta both completed; the second IDR's AU is still buffered. + assert_eq!(frames.len(), 2); + assert!(frames[0].keyframe, "first AU is the keyframe"); + assert!(!frames[1].keyframe, "the delta picture must not be flagged a keyframe"); + // The delta frame holds only its own slice, not a merged keyframe. + assert_eq!(frames[1].payload.as_ref(), annexb(&[pslice]).freeze().as_ref()); + + // Flushing closes the bare IDR as its own self-contained keyframe (params + // re-injected). The trailing AUD opens a fresh slice-less AU that is dropped. + let tail = split.flush(ts()).unwrap(); + assert_eq!(tail.len(), 1); + assert!(tail[0].keyframe); + assert_eq!(tail[0].payload.as_ref(), annexb(&[sps, pps, idr]).freeze().as_ref()); + } + /// A keyframe that presents a smaller parameter set than a prior one reinits /// the retained set: the dropped PPS must not be re-injected on later bare /// keyframes. diff --git a/rs/moq-mux/src/codec/h265/split.rs b/rs/moq-mux/src/codec/h265/split.rs index 185cc2748..f3097f2b5 100644 --- a/rs/moq-mux/src/codec/h265/split.rs +++ b/rs/moq-mux/src/codec/h265/split.rs @@ -151,6 +151,13 @@ impl Split { | NALUnitType::BlaWRadl | NALUnitType::BlaWLp | NALUnitType::CraNut => { + // first_slice_segment_in_pic_flag (bit 7 of the third byte, after the + // 2-byte header) marks the first slice of a new picture: close any access + // unit still open. A bare IDR arriving right after a delta picture in the + // same chunk would otherwise fold both into one frame and mis-flag it a keyframe. + if nal.get(2).ok_or(Error::NalTooShort)? & 0x80 != 0 { + self.maybe_start_frame(pts)?; + } // Adopt this keyframe's inline set (dropping any the new GOP no longer // uses), or re-inject the retained set if the keyframe carried none. crate::codec::annexb::reconcile_keyframe_params( @@ -343,6 +350,37 @@ mod tests { ); } + /// A bare IDR arriving right after a delta picture in the same decode chunk must + /// open its own access unit, not fold into the delta's frame. Without closing the + /// open slice on the IDR's first slice, the two AUs merge and the result is + /// mis-flagged as a keyframe. + #[tokio::test(start_paused = true)] + async fn bare_idr_after_delta_splits() { + // TrailR (type 1) with first_slice_segment_in_pic_flag set (byte 2 high bit). + const TRAIL: &[u8] = &[0x02, 0x01, 0x80, 0x33]; + const AUD: &[u8] = &[0x46, 0x01, 0x50]; // AudNut (type 35) + + let mut split = Split::new(); + // One chunk: keyframe, a delta picture, then a bare IDR (no inline params). + let frames = split + .decode(&annexb(&[VPS, SPS, PPS, IDR, TRAIL, IDR, AUD]), ts()) + .unwrap(); + + assert_eq!(frames.len(), 2); + assert!(frames[0].keyframe, "first AU is the keyframe"); + assert!(!frames[1].keyframe, "the delta picture must not be flagged a keyframe"); + assert_eq!(frames[1].payload.as_ref(), annexb(&[TRAIL]).freeze().as_ref()); + + // Flushing closes the bare IDR as its own self-contained keyframe. + let tail = split.flush(ts()).unwrap(); + assert_eq!(tail.len(), 1); + assert!(tail[0].keyframe); + assert_eq!( + tail[0].payload.as_ref(), + annexb(&[VPS, SPS, PPS, IDR]).freeze().as_ref() + ); + } + /// A keyframe that presents a smaller parameter set than a prior one reinits /// the retained set: the dropped PPS must not be re-injected on later bare /// keyframes. diff --git a/rs/moq-mux/src/container/consumer.rs b/rs/moq-mux/src/container/consumer.rs index 73dd6bda6..16086489a 100644 --- a/rs/moq-mux/src/container/consumer.rs +++ b/rs/moq-mux/src/container/consumer.rs @@ -225,12 +225,21 @@ impl Consumer { // Still blocked on this group, don't skip it yet. Poll::Pending => break, Poll::Ready(Err(e)) => { - // The group was dropped/aborted -- typically it aged out of the relay - // cache (`Error::Old`) while we weren't reading it. Any sequences - // between it and the next buffered group were evicted alongside it, so - // jump straight to that group instead of stepping one-by-one and then - // blocking on a sequence gap of groups that will never arrive. - tracing::warn!(error = ?e, "current group dropped; skipping to next buffered group"); + // Tell a relay group eviction/abort (skip) from a payload decode error + // (propagate). The moq_net group's own terminal state is the source of + // truth: an evicted/aborted group reports the transport error from + // poll_finished, while a malformed payload leaves the group live or + // cleanly finished. A decode error is real and the caller must see it, + // not have the group silently dropped. + if !group.poll_aborted(waiter) { + return Poll::Ready(Err(e)); + } + // The group aged out of the relay cache (`Error::Old`) or was otherwise + // aborted. Any sequences between it and the next buffered group were + // evicted alongside it, so jump straight to that group instead of + // stepping one-by-one and then blocking on a sequence gap of groups + // that will never arrive. + tracing::warn!(error = ?e, "current group evicted; skipping to next buffered group"); self.pending.pop_front(); self.current = self.pending.front().map_or(self.current + 1, |g| g.sequence); } @@ -617,6 +626,15 @@ impl GroupBuffer { Poll::Pending } + + /// True if the group's moq_net stream was reset/aborted (evicted, `Old`, + /// cancelled, ...), as opposed to still live or cleanly finished. Lets the + /// consumer tell a transport eviction from a payload decode error: the former + /// surfaces as a terminal transport error from `poll_finished`, the latter + /// leaves the group readable or finished. + fn poll_aborted(&mut self, waiter: &kio::Waiter) -> bool { + matches!(self.group.poll_finished(waiter), Poll::Ready(Err(_))) + } } impl std::ops::Deref for GroupBuffer { @@ -1308,6 +1326,75 @@ mod tests { assert!(reached.is_ok(), "consumer hung on a missing sequence on a live track"); } + // ---- Decode errors ---- + + /// A container that decodes each frame's payload as an 8-byte LE microsecond + /// timestamp, but treats a `FAIL` payload as a malformed frame. Lets a test put a + /// decodable frame first (so startup selects the group) and a decode failure after. + struct FailingDecode; + + impl ContainerTrait for FailingDecode { + type Error = crate::Error; + + fn write(&self, group: &mut moq_net::GroupProducer, frames: &[Frame]) -> Result<(), Self::Error> { + for frame in frames { + group.write_frame(frame.payload.clone())?; + } + Ok(()) + } + + fn poll_read( + &self, + group: &mut moq_net::GroupConsumer, + waiter: &kio::Waiter, + ) -> Poll>, Self::Error>> { + use bytes::Buf; + + let Some(mut data) = ready!(group.poll_read_frame(waiter)?) else { + return Poll::Ready(Ok(None)); + }; + if data.as_ref() == b"FAIL" { + return Poll::Ready(Err(crate::Error::UnknownFormat("malformed payload".into()))); + } + Poll::Ready(Ok(Some(vec![Frame { + timestamp: ts(data.get_u64_le()), + payload: Bytes::new(), + keyframe: false, + duration: None, + }]))) + } + } + + /// A decode error on a cleanly-finished group must propagate to the caller, not be + /// mistaken for a relay eviction and silently skipped. Eviction-skip only fires when + /// the group's stream was actually aborted. + #[tokio::test] + async fn decode_error_propagates() { + tokio::time::pause(); + let mut track = track_producer("test"); + let consumer_track = track.consume(); + let mut consumer = Consumer::new(consumer_track, FailingDecode); + + // A decodable frame first (so startup selects the group), then a malformed one. + let mut group = track.create_group(moq_net::Group { sequence: 0 }).unwrap(); + group.write_frame(Bytes::from(0u64.to_le_bytes().to_vec())).unwrap(); + group.write_frame(Bytes::from_static(b"FAIL")).unwrap(); + group.finish().unwrap(); + track.finish().unwrap(); + + // The first frame decodes; the malformed second frame must surface as an error. + let first = consumer.read().await; + assert!(matches!(first, Ok(Some(_))), "first frame should decode, got {first:?}"); + + let second = tokio::time::timeout(Duration::from_millis(200), consumer.read()) + .await + .expect("consumer hung on a decode error"); + assert!( + matches!(second, Err(crate::Error::UnknownFormat(_))), + "decode error must propagate, got {second:?}" + ); + } + // ---- Frame Decoding ---- #[tokio::test] diff --git a/rs/moq-mux/src/container/fmp4/import_test.rs b/rs/moq-mux/src/container/fmp4/import_test.rs index 6cbbeefde..6795b5fd8 100644 --- a/rs/moq-mux/src/container/fmp4/import_test.rs +++ b/rs/moq-mux/src/container/fmp4/import_test.rs @@ -247,3 +247,45 @@ async fn test_msf_catalog_roundtrip() { assert_eq!(audio.channel_count, 2); assert!(matches!(audio.container, Container::Cmaf { .. })); } + +// ---- Sample-duration handling in decode() ---- + +fn sample(timestamp_us: u64, keyframe: bool, duration_us: Option) -> crate::container::Frame { + crate::container::Frame { + timestamp: crate::container::Timestamp::from_micros(timestamp_us).unwrap(), + payload: bytes::Bytes::from_static(&[0xDE, 0xAD]), + keyframe, + duration: duration_us.map(|d| crate::container::Timestamp::from_micros(d).unwrap()), + } +} + +/// A multi-sample fragment whose non-final sample carries no duration can't have its +/// DTS reconstructed, so decode rejects it rather than collapsing the timestamps. +#[test] +fn decode_rejects_durationless_multisample() { + let frames = vec![sample(0, true, None), sample(33_000, false, None)]; + let frag = super::encode_fragment(1, 1_000_000, 0, &frames).unwrap(); + let err = super::decode(frag, 1_000_000).unwrap_err(); + assert!(matches!(err, super::Error::MissingSampleDuration), "got {err:?}"); +} + +/// A single-sample fragment needs no duration (nothing follows it), so it still decodes. +#[test] +fn decode_single_sample_no_duration_ok() { + let frag = super::encode_fragment(1, 1_000_000, 0, &[sample(0, true, None)]).unwrap(); + let out = super::decode(frag, 1_000_000).unwrap(); + assert_eq!(out.len(), 1); + assert_eq!(out[0].timestamp.as_micros(), 0); +} + +/// With the durations the producer now backfills, every sample's DTS round-trips +/// through a multi-sample fragment. +#[test] +fn decode_multisample_with_durations_roundtrips() { + let frames = vec![sample(0, true, Some(33_000)), sample(33_000, false, Some(33_000))]; + let frag = super::encode_fragment(1, 1_000_000, 0, &frames).unwrap(); + let out = super::decode(frag, 1_000_000).unwrap(); + assert_eq!(out.len(), 2); + assert_eq!(out[0].timestamp.as_micros(), 0); + assert_eq!(out[1].timestamp.as_micros(), 33_000); +} diff --git a/rs/moq-mux/src/container/fmp4/mod.rs b/rs/moq-mux/src/container/fmp4/mod.rs index 4b43ea8ba..000a42e1c 100644 --- a/rs/moq-mux/src/container/fmp4/mod.rs +++ b/rs/moq-mux/src/container/fmp4/mod.rs @@ -126,6 +126,9 @@ pub enum Error { #[error("audio codec {0} needs a description (AudioSpecificConfig) to synthesize a CMAF init")] MissingAudioDescription(String), + + #[error("multi-sample fragment has a non-final sample with no duration; DTS is unrecoverable")] + MissingSampleDuration, } impl From for Error { @@ -225,9 +228,15 @@ pub(crate) fn decode(data: Bytes, timescale: u64) -> Result> { let default_size = traf.tfhd.default_sample_size; let default_duration = traf.tfhd.default_sample_duration; + // DTS is reconstructed by accumulating each sample's duration. A non-final sample + // with no resolvable duration would leave every following sample stuck at the same + // DTS, silently collapsing their timestamps, so reject that fragment instead. + let total_samples: usize = traf.trun.iter().map(|t| t.entries.len()).sum(); + let mut frames = Vec::new(); let mut offset = 0usize; let mut dts = base_dts; + let mut sample_index = 0usize; for trun in &traf.trun { for entry in &trun.entries { @@ -254,6 +263,14 @@ pub(crate) fn decode(data: Bytes, timescale: u64) -> Result> { // Carry the sample-duration through at the track's scale when present, so // the jitter buffer can use it and an exporter can write it back. let sample_duration = entry.duration.or(default_duration); + + // The last sample needs no duration (nothing follows it to time), but any + // earlier sample without one makes the rest of the fragment's DTS ambiguous. + let is_last = sample_index + 1 == total_samples; + if sample_duration.is_none() && !is_last { + return Err(Error::MissingSampleDuration); + } + let duration = sample_duration .map(|d| Timestamp::from_scale(d as u64, timescale)) .transpose()?; @@ -267,6 +284,7 @@ pub(crate) fn decode(data: Bytes, timescale: u64) -> Result> { offset = end; dts += sample_duration.unwrap_or(0) as u64; + sample_index += 1; } } diff --git a/rs/moq-mux/src/container/producer.rs b/rs/moq-mux/src/container/producer.rs index fb9583c6e..c0ac31c3c 100644 --- a/rs/moq-mux/src/container/producer.rs +++ b/rs/moq-mux/src/container/producer.rs @@ -150,22 +150,30 @@ impl Producer { /// Flush any buffered frames into the current group without closing it. /// - /// `next`, when given, is the timestamp of the frame that rolled the group over - /// (the next keyframe). The buffer's last frame is the only sample whose successor - /// wasn't visible when it arrived, so we backfill its duration from `next` here. - /// This adds no latency: that frame is already in hand. Containers that don't use - /// per-frame durations (Legacy, LOC) ignore it. + /// Backfills the per-sample duration the source didn't provide. A CMAF fragment + /// reconstructs each sample's DTS by accumulating durations, so every non-final + /// sample packed into one fragment needs one or the decoder collapses their + /// timestamps. Frames are in decode order, so a sample's duration is the gap to the + /// next buffered sample; the final sample borrows `next` (the timestamp of the + /// keyframe that rolled the group over), which is already in hand so this adds no + /// latency. Frames that already carry a duration (e.g. fMP4 passthrough) keep it, + /// and a backwards gap (a B-frame whose successor presents earlier) is left unset. + /// Containers that don't use per-frame durations (Legacy, LOC) ignore the field. fn flush(&mut self, next: Option) -> Result<(), C::Error> { if self.buffer.is_empty() { return Ok(()); } - if let Some(next) = next - && let Some(last) = self.buffer.last_mut() - && last.duration.is_none() - && let Ok(duration) = next.checked_sub(last.timestamp) - { - last.duration = Some(duration); + for i in 0..self.buffer.len() { + if self.buffer[i].duration.is_some() { + continue; + } + let boundary = self.buffer.get(i + 1).map(|f| f.timestamp).or(next); + if let Some(boundary) = boundary + && let Ok(duration) = boundary.checked_sub(self.buffer[i].timestamp) + { + self.buffer[i].duration = Some(duration); + } } let group = match &mut self.group { @@ -342,10 +350,11 @@ mod tests { } } - /// The keyframe that rolls a group over backfills the duration of the previous - /// group's last frame, without buffering an extra frame. + /// Every sample batched into one fragment gets a backfilled duration (from the next + /// buffered sample, or the rolling keyframe for the last), so a CMAF decoder can + /// reconstruct each DTS. No extra frame is buffered to learn the final boundary. #[tokio::test] - async fn keyframe_backfills_last_frame_duration() { + async fn keyframe_backfills_batched_durations() { let track = track_producer("test"); let recording = Recording::default(); let mut producer = Producer::new(track, recording.clone()).with_latency(std::time::Duration::from_secs(10)); @@ -358,9 +367,9 @@ mod tests { let writes = recording.0.borrow(); let group0 = &writes[0]; assert_eq!(group0.len(), 2); - // Last frame's duration backfilled from the next keyframe: 66ms - 33ms. + // The first sample's duration is the gap to the next buffered sample: 33ms - 0. + assert_eq!(group0[0].duration, Some(Timestamp::from_micros(33_000).unwrap())); + // The last sample's duration is backfilled from the next keyframe: 66ms - 33ms. assert_eq!(group0[1].duration, Some(Timestamp::from_micros(33_000).unwrap())); - // The earlier frame keeps None; only the trailing sample needs the boundary. - assert_eq!(group0[0].duration, None); } }