From c4d6ea39472f249f6ab13e164618e1a0037193b6 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Fri, 26 Jun 2026 11:10:05 -0700 Subject: [PATCH] Add zero-copy frame bytes --- .github/justfile | 2 +- demo/boy/justfile | 2 +- demo/justfile | 4 +- demo/pub/justfile | 3 +- demo/relay/justfile | 3 +- demo/sub/justfile | 2 +- demo/web/justfile | 1 + flake.nix | 88 +++++++-------- go/justfile | 5 + infra/apt/justfile | 2 +- infra/justfile | 2 +- infra/rpm/justfile | 2 +- js/justfile | 1 + justfile | 4 +- kt/justfile | 1 + nix/modules/moq-relay.nix | 73 +++++++------ py/justfile | 6 + rs/justfile | 2 + rs/moq-mux/src/codec/aac/import.rs | 4 +- rs/moq-mux/src/codec/h264/mod.rs | 7 +- rs/moq-mux/src/codec/legacy.rs | 4 +- rs/moq-mux/src/codec/opus/import.rs | 4 +- rs/moq-mux/src/codec/vp8/import.rs | 11 +- rs/moq-mux/src/codec/vp9/import.rs | 11 +- rs/moq-mux/src/import/track.rs | 8 +- rs/moq-native/src/error.rs | 4 +- rs/moq-native/src/server.rs | 2 +- rs/moq-net/src/coding/reader.rs | 20 +--- rs/moq-net/src/coding/writer.rs | 13 ++- rs/moq-net/src/ietf/adapter.rs | 31 ++++++ rs/moq-net/src/ietf/publisher.rs | 7 +- rs/moq-net/src/ietf/subscriber.rs | 10 +- rs/moq-net/src/lite/publisher.rs | 27 +++-- rs/moq-net/src/lite/subscriber.rs | 12 +- rs/moq-net/src/model/bytes.rs | 76 +++++++++++++ rs/moq-net/src/model/frame.rs | 164 ++++++++++++++++++++-------- rs/moq-net/src/model/group.rs | 9 +- rs/moq-net/src/model/mod.rs | 2 + rs/moq-net/src/model/track.rs | 8 +- swift/justfile | 1 + test/justfile | 7 +- 41 files changed, 425 insertions(+), 220 deletions(-) create mode 100644 rs/moq-net/src/model/bytes.rs diff --git a/.github/justfile b/.github/justfile index 137021cfc..28cd55767 100644 --- a/.github/justfile +++ b/.github/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true # Lint GitHub Actions workflows. Silently skipped if actionlint is not installed. check: diff --git a/demo/boy/justfile b/demo/boy/justfile index 1a3947045..a9b1f973c 100644 --- a/demo/boy/justfile +++ b/demo/boy/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true # Run the GB demo: relay + emulator publisher + web viewer. default: diff --git a/demo/justfile b/demo/justfile index c3d009716..5eb811bd5 100644 --- a/demo/justfile +++ b/demo/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true mod boy mod pub @@ -10,6 +10,7 @@ mod web # # Picks the first free port at/after 4443 (both UDP for QUIC and TCP for HTTP) # so multiple worktrees can run `just dev` without colliding. The relay reads + # the port from MOQ_SERVER_BIND/MOQ_WEB_HTTP_LISTEN, overriding localhost.toml. default: #!/usr/bin/env bash @@ -33,6 +34,7 @@ default: # Find the first free port at/after `start` (checked on both TCP and UDP). # `lsof` isn't available everywhere (e.g. Git Bash on Windows); without it we # can't probe, so fall back to `start` and let the relay surface a bind error if + # it's actually taken. [private] port start="4443": diff --git a/demo/pub/justfile b/demo/pub/justfile index 20556733c..d86283cfa 100644 --- a/demo/pub/justfile +++ b/demo/pub/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true # --- R2 bucket management (vid.moq.dev) --- @@ -258,6 +258,7 @@ ffmpeg-cmaf input output='-' *args: # # `-pes_payload_size 0` flushes one PES per audio frame instead of batching ~8 # (the default 2930-byte minimum), so audio interleaves evenly with video rather + # than arriving in ~185ms bursts. Without it the player must buffer a whole burst. [private] ffmpeg-ts input output='-' *args: diff --git a/demo/relay/justfile b/demo/relay/justfile index 1f73c13a5..a620aafd9 100644 --- a/demo/relay/justfile +++ b/demo/relay/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true # Run a localhost relay server without authentication. default: @@ -7,6 +7,7 @@ default: # Run a cluster of relay servers. # # The relays grant anonymous access (see *.toml), so no JWT is needed locally. + # Cluster peers still authenticate to each other via mTLS (the cert/ca recipes). cluster: bun install diff --git a/demo/sub/justfile b/demo/sub/justfile index e7ec69ed8..4aeab1454 100644 --- a/demo/sub/justfile +++ b/demo/sub/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true # Subscribe to a broadcast using GStreamer and render to the screen. gst name url='http://localhost:4443' *args: diff --git a/demo/web/justfile b/demo/web/justfile index 6d3cd3a7a..7966385e8 100644 --- a/demo/web/justfile +++ b/demo/web/justfile @@ -6,6 +6,7 @@ default: # Run the web server targeting the specified relay. # The vite config opens the watch, publish, and stats demos in separate tabs; + # set MOQ_NO_OPEN=1 to skip opening a browser. serve url='http://localhost:4443': VITE_RELAY_URL="{{ url }}" bun --bun vite diff --git a/flake.nix b/flake.nix index 48faa5bae..7ca61092a 100644 --- a/flake.nix +++ b/flake.nix @@ -55,13 +55,14 @@ "rust-src" "rust-analyzer" ]; - targets = [ - "wasm32-unknown-unknown" - ] - ++ pkgs.lib.optionals pkgs.stdenv.isDarwin [ - "x86_64-apple-darwin" - "aarch64-apple-darwin" - ]; + targets = + [ + "wasm32-unknown-unknown" + ] + ++ pkgs.lib.optionals pkgs.stdenv.isDarwin [ + "x86_64-apple-darwin" + "aarch64-apple-darwin" + ]; }; # GStreamer dependencies (for moq-gst plugin) @@ -178,47 +179,48 @@ overlayPkgs = pkgs.extend self.overlays.default; in { - packages = (rec { - default = pkgs.symlinkJoin { - name = "moq-all"; - paths = [ + packages = + (rec { + default = pkgs.symlinkJoin { + name = "moq-all"; + paths = [ + moq-relay + moq-cli + moq-token-cli + ]; + }; + + # Inherit packages from the overlay + inherit (overlayPkgs) moq-relay moq-cli moq-token-cli - ]; - }; + moq-boy + libmoq + moq-gst + ; - # Inherit packages from the overlay - inherit (overlayPkgs) - moq-relay - moq-cli - moq-token-cli - moq-boy - libmoq - moq-gst - ; - - # Bundle of packaging + repo-publish tooling, pinned via flake.lock. - # CI builds this and prepends its bin/ to $PATH so subsequent steps - # use the same versions a local `nix develop` user would. - packaging = pkgs.symlinkJoin { - name = "moq-packaging-tools"; - paths = packagingDeps ++ publishDeps; + # Bundle of packaging + repo-publish tooling, pinned via flake.lock. + # CI builds this and prepends its bin/ to $PATH so subsequent steps + # use the same versions a local `nix develop` user would. + packaging = pkgs.symlinkJoin { + name = "moq-packaging-tools"; + paths = packagingDeps ++ publishDeps; + }; + }) + # x86_64-darwin release artifacts are cross-compiled from the + # aarch64-darwin runner (see nix/overlay.nix). The cross outputs only + # evaluate on an aarch64-darwin host, so gate them on the system to + # keep `nix flake check` working on Linux and Intel macs. + // pkgs.lib.optionalAttrs (system == "aarch64-darwin") { + inherit (overlayPkgs) + moq-relay-x86_64-apple-darwin + moq-cli-x86_64-apple-darwin + moq-token-cli-x86_64-apple-darwin + libmoq-x86_64-apple-darwin + moq-gst-plugin-x86_64-apple-darwin + ; }; - }) - # x86_64-darwin release artifacts are cross-compiled from the - # aarch64-darwin runner (see nix/overlay.nix). The cross outputs only - # evaluate on an aarch64-darwin host, so gate them on the system to - # keep `nix flake check` working on Linux and Intel macs. - // pkgs.lib.optionalAttrs (system == "aarch64-darwin") { - inherit (overlayPkgs) - moq-relay-x86_64-apple-darwin - moq-cli-x86_64-apple-darwin - moq-token-cli-x86_64-apple-darwin - libmoq-x86_64-apple-darwin - moq-gst-plugin-x86_64-apple-darwin - ; - }; # Re-export gst_all_1 so users can pair the plugin with a matching # gstreamer in one nix invocation: diff --git a/go/justfile b/go/justfile index 12d60f8b2..487afdc6d 100644 --- a/go/justfile +++ b/go/justfile @@ -11,22 +11,26 @@ default: # Build moq-ffi for the host, run uniffi-bindgen-go, stage both the ffi and # wrapper modules into dist/ (wiring the wrapper to the local ffi via replace), # and run `go build`/`go vet`/`go test`. Skips cleanly if cargo, go, or + # uniffi-bindgen-go is missing. check: bash scripts/check.sh # Stage the in-tree go/ffi source + per-target moq-ffi libs + generated + # bindings into the moq-go-ffi module ready for publish. package-ffi *args: bash scripts/package-ffi.sh {{ args }} # Stage the in-tree go/wrapper source into the moq-go module, rewriting its + # moq-go-ffi require to the target ffi version, ready for publish. package-wrapper *args: bash scripts/package-wrapper.sh {{ args }} # Remove the generated bindings, native libs, and vendoring that # scripts/check.sh produces in go/ (see go/.gitignore). The tmp staging + # under the workspace dist/ is swept by `just js clean`. clean: rm -rf moq/moq.go moq/lib go.sum vendor @@ -34,6 +38,7 @@ clean: # Full Go CI: `check` builds moq-ffi, regenerates bindings, runs go # vet/build/test. Takes a newline-separated list of changed files; # skips if FILES is non-empty and none match the Go scope. Run + # `just go ci` (no FILES) to force-run. ci FILES="": #!/usr/bin/env bash diff --git a/infra/apt/justfile b/infra/apt/justfile index f8654f08f..895745376 100644 --- a/infra/apt/justfile +++ b/infra/apt/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true # Deploy the apt.moq.dev worker. deploy: diff --git a/infra/justfile b/infra/justfile index 5edcd8ec8..b27556db1 100644 --- a/infra/justfile +++ b/infra/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true mod apt mod rpm diff --git a/infra/rpm/justfile b/infra/rpm/justfile index f1811346b..ebb307ca2 100644 --- a/infra/rpm/justfile +++ b/infra/rpm/justfile @@ -1,4 +1,4 @@ -set fallback +set fallback := true # Deploy the rpm.moq.dev worker. deploy: diff --git a/js/justfile b/js/justfile index 109716aad..35ac01ce0 100644 --- a/js/justfile +++ b/js/justfile @@ -42,6 +42,7 @@ build: bun run --filter='*' build # Remove node_modules and build output. The bun workspace spans the repo + # (deps hoist to the root, not into js/), so sweep from the workspace root. clean: #!/usr/bin/env bash diff --git a/justfile b/justfile index bdcb4eb8a..0884c2bc6 100644 --- a/justfile +++ b/justfile @@ -9,10 +9,8 @@ mod py mod kt mod swift mod go - # Unit tests per language (`just test`). mod test - # Demos and infra. mod demo mod infra @@ -140,6 +138,7 @@ build: # .cargo/config.toml. The profile already optimizes for size; wasm-opt is # skipped since it didn't improve the gzipped (over-the-wire) size here. # Resolve the artifact via CARGO_TARGET_DIR so it also works on runners that + # redirect the target directory (it defaults to `target`). wasm: cargo build -p moq-wasm --target wasm32-unknown-unknown --profile wasm-release @@ -149,6 +148,7 @@ wasm: # Delete build artifacts and caches to reclaim disk space. Each language # owns its own `clean` (see js/rs/py/kt/swift/go justfiles); this # orchestrates them, sweeps the caches no language owns, then recurses into + # any agent worktrees under .claude/worktrees/. clean: #!/usr/bin/env bash diff --git a/kt/justfile b/kt/justfile index 6f0f9446d..42f90901a 100644 --- a/kt/justfile +++ b/kt/justfile @@ -24,6 +24,7 @@ package *args: bash scripts/package.sh {{ args }} # Remove gradle output plus the moq-ffi bindings and native libs that + # scripts/check.sh generates into the source tree (see kt/.gitignore). clean: #!/usr/bin/env bash diff --git a/nix/modules/moq-relay.nix b/nix/modules/moq-relay.nix index 89cfd9ac8..a9b0959ce 100644 --- a/nix/modules/moq-relay.nix +++ b/nix/modules/moq-relay.nix @@ -215,45 +215,46 @@ in AmbientCapabilities = lib.optional (cfg.port < 1024) "CAP_NET_BIND_SERVICE"; }; - environment = { - # Enable jemalloc heap profiling; dump with `kill -USR1 ` - MALLOC_CONF = "prof:true,prof_active:true,prof_prefix:${cfg.heapDumpPrefix}"; + environment = + { + # Enable jemalloc heap profiling; dump with `kill -USR1 ` + MALLOC_CONF = "prof:true,prof_active:true,prof_prefix:${cfg.heapDumpPrefix}"; - MOQ_LOG_LEVEL = lib.mkDefault cfg.logLevel; + MOQ_LOG_LEVEL = lib.mkDefault cfg.logLevel; - # Server configuration - MOQ_SERVER_BIND = "[::]:${toString cfg.port}"; + # Server configuration + MOQ_SERVER_BIND = "[::]:${toString cfg.port}"; - MOQ_CLIENT_TLS_DISABLE_VERIFY = lib.boolToString cfg.cluster.disableTlsVerify; - } - // lib.optionalAttrs (cfg.tls.generate != [ ]) { - # TLS configuration - MOQ_SERVER_TLS_GENERATE = lib.concatStringsSep "," cfg.tls.generate; - } - // lib.optionalAttrs (cfg.tls.certs != [ ]) { - MOQ_SERVER_TLS_CERT = lib.concatMapStringsSep "," (cert: "${cert.chain}") cfg.tls.certs; - } - // lib.optionalAttrs (cfg.tls.certs != [ ]) { - MOQ_SERVER_TLS_KEY = lib.concatMapStringsSep "," (cert: "${cert.key}") cfg.tls.certs; - } - // lib.optionalAttrs cfg.auth.enable { - # Auth configuration - MOQ_AUTH_KEY = if cfg.auth.keyFile != null then cfg.auth.keyFile else "${cfg.stateDir}/root.jwk"; - } - // lib.optionalAttrs (cfg.auth.publicPath != null) { - MOQ_AUTH_PUBLIC = cfg.auth.publicPath; - } - // lib.optionalAttrs (cfg.cluster.rootUrl != null) { - # Cluster configuration - MOQ_CLUSTER_ROOT = cfg.cluster.rootUrl; - } - // lib.optionalAttrs (cfg.cluster.mode != "none") { - MOQ_CLUSTER_TOKEN = - if cfg.cluster.tokenFile != null then cfg.cluster.tokenFile else "${cfg.stateDir}/cluster.jwt"; - } - // lib.optionalAttrs (cfg.cluster.nodeUrl != null) { - MOQ_CLUSTER_NODE = cfg.cluster.nodeUrl; - }; + MOQ_CLIENT_TLS_DISABLE_VERIFY = lib.boolToString cfg.cluster.disableTlsVerify; + } + // lib.optionalAttrs (cfg.tls.generate != [ ]) { + # TLS configuration + MOQ_SERVER_TLS_GENERATE = lib.concatStringsSep "," cfg.tls.generate; + } + // lib.optionalAttrs (cfg.tls.certs != [ ]) { + MOQ_SERVER_TLS_CERT = lib.concatMapStringsSep "," (cert: "${cert.chain}") cfg.tls.certs; + } + // lib.optionalAttrs (cfg.tls.certs != [ ]) { + MOQ_SERVER_TLS_KEY = lib.concatMapStringsSep "," (cert: "${cert.key}") cfg.tls.certs; + } + // lib.optionalAttrs cfg.auth.enable { + # Auth configuration + MOQ_AUTH_KEY = if cfg.auth.keyFile != null then cfg.auth.keyFile else "${cfg.stateDir}/root.jwk"; + } + // lib.optionalAttrs (cfg.auth.publicPath != null) { + MOQ_AUTH_PUBLIC = cfg.auth.publicPath; + } + // lib.optionalAttrs (cfg.cluster.rootUrl != null) { + # Cluster configuration + MOQ_CLUSTER_ROOT = cfg.cluster.rootUrl; + } + // lib.optionalAttrs (cfg.cluster.mode != "none") { + MOQ_CLUSTER_TOKEN = + if cfg.cluster.tokenFile != null then cfg.cluster.tokenFile else "${cfg.stateDir}/cluster.jwt"; + } + // lib.optionalAttrs (cfg.cluster.nodeUrl != null) { + MOQ_CLUSTER_NODE = cfg.cluster.nodeUrl; + }; }; }; } diff --git a/py/justfile b/py/justfile index 41d747e75..a4d2ba1c7 100644 --- a/py/justfile +++ b/py/justfile @@ -15,6 +15,7 @@ default: # Build moq-ffi (maturin cdylib + bindings) and install the pure-python moq-rs # wrapper, both editable into the workspace venv. `--no-deps` on the wrapper + # keeps uv from fetching moq-ffi off PyPI; maturin just installed it locally. _develop: cd moq-ffi && uv run --no-sync maturin develop --uv @@ -22,6 +23,7 @@ _develop: # Lint + format + editable build + pyright. `--no-install-workspace` installs # the root dev group (ruff, maturin, pyright, pytest) without trying to + # pip-build the workspace members; `_develop` then installs them editable. check: uv sync --no-install-workspace @@ -41,6 +43,7 @@ test: uv run --no-sync pytest moq-rs/tests/ moq-ffi/tests/ # Local dev build: editable install of moq-ffi (with the cdylib + uniffi + # bindings) and the moq-rs wrapper into the workspace venv. build: uv sync --no-install-workspace @@ -48,6 +51,7 @@ build: # Remove the virtualenv, release dist, bytecode caches, and the uniffi # bindings maturin drops in during editable installs. The uv workspace venv + # lives at the repo root, so reach up for it. clean: #!/usr/bin/env bash @@ -59,6 +63,7 @@ clean: # Build the pure-python moq-rs wrapper sdist + wheel into py/dist (for release). # moq-ffi is built separately by maturin (see release-py.yml); the wrapper is + # pure python so it needs no compilation, just a metadata-correct wheel. package: rm -rf dist @@ -67,6 +72,7 @@ package: # Full Python CI: lint + tests + build. Takes a newline-separated list # of changed files; skips if FILES is non-empty and none match the # Python scope (which includes rs/moq-ffi because moq-ffi bundles it via + # maturin). Run `just py ci` (no FILES) to force-run everything. ci FILES="": #!/usr/bin/env bash diff --git a/rs/justfile b/rs/justfile index a18edddc7..deb310044 100644 --- a/rs/justfile +++ b/rs/justfile @@ -27,6 +27,7 @@ check *args: # force-run everything. `cargo publish --dry-run` lives in release-rs.yml. # # `cargo deny` runs here (not in `check`) so the inner-loop stays fast + # and devs aren't blocked by a fresh upstream advisory mid-edit. ci FILES="": #!/usr/bin/env bash @@ -97,6 +98,7 @@ update: # # Examples: # just rs package moq-relay deb + # just rs package moq-cli rpm package crate packager: #!/usr/bin/env bash diff --git a/rs/moq-mux/src/codec/aac/import.rs b/rs/moq-mux/src/codec/aac/import.rs index 35c4b25d0..304caf68d 100644 --- a/rs/moq-mux/src/codec/aac/import.rs +++ b/rs/moq-mux/src/codec/aac/import.rs @@ -73,11 +73,11 @@ impl Import { } /// Publish one AAC packet as its own group, stamping `pts` or a wall clock when absent. - pub fn decode(&mut self, frame: &[u8], pts: Option) -> crate::Result<()> { + pub fn decode(&mut self, frame: B, pts: Option) -> crate::Result<()> { let timestamp = self.rendition.timestamp(pts)?; self.track.write(Frame { timestamp, - payload: bytes::Bytes::copy_from_slice(frame), + payload: frame.into_bytes(), keyframe: true, duration: None, })?; diff --git a/rs/moq-mux/src/codec/h264/mod.rs b/rs/moq-mux/src/codec/h264/mod.rs index de3cd1e6f..469f8dbe7 100644 --- a/rs/moq-mux/src/codec/h264/mod.rs +++ b/rs/moq-mux/src/codec/h264/mod.rs @@ -32,14 +32,15 @@ const NAL_TYPE_PPS: u8 = 8; /// `length_size` known out-of-band from the avcC (`super::Avcc::parse(avcc).length_size`). /// The payload is passed through verbatim. pub(crate) fn avc1_frame( - data: &[u8], + data: impl moq_net::AsBytes, length_size: usize, pts: moq_net::Timestamp, ) -> crate::Result { + let keyframe = avc1_is_keyframe(data.as_ref(), length_size); Ok(crate::container::Frame { timestamp: pts, - payload: data.to_vec().into(), - keyframe: avc1_is_keyframe(data, length_size), + payload: data.into_bytes(), + keyframe, duration: None, }) } diff --git a/rs/moq-mux/src/codec/legacy.rs b/rs/moq-mux/src/codec/legacy.rs index 826069091..0eb7dcb57 100644 --- a/rs/moq-mux/src/codec/legacy.rs +++ b/rs/moq-mux/src/codec/legacy.rs @@ -164,12 +164,12 @@ impl Import { } /// Publish one whole frame as a hang frame in its own group. - pub fn decode(&mut self, frame: &[u8], pts: Option) -> crate::Result<()> { + pub fn decode(&mut self, frame: B, pts: Option) -> crate::Result<()> { let timestamp = self.rendition.timestamp(pts)?; self.track.write(Frame { timestamp, duration: None, - payload: bytes::Bytes::copy_from_slice(frame), + payload: frame.into_bytes(), keyframe: true, })?; self.track.finish_group()?; diff --git a/rs/moq-mux/src/codec/opus/import.rs b/rs/moq-mux/src/codec/opus/import.rs index 9fe97367f..ce9424f70 100644 --- a/rs/moq-mux/src/codec/opus/import.rs +++ b/rs/moq-mux/src/codec/opus/import.rs @@ -59,11 +59,11 @@ impl Import { } /// Publish one Opus packet as its own group, stamping `pts` or a wall clock when absent. - pub fn decode(&mut self, frame: &[u8], pts: Option) -> crate::Result<()> { + pub fn decode(&mut self, frame: B, pts: Option) -> crate::Result<()> { let timestamp = self.rendition.timestamp(pts)?; self.track.write(Frame { timestamp, - payload: bytes::Bytes::copy_from_slice(frame), + payload: frame.into_bytes(), keyframe: true, duration: None, })?; diff --git a/rs/moq-mux/src/codec/vp8/import.rs b/rs/moq-mux/src/codec/vp8/import.rs index f78d4af38..c9eacbc5d 100644 --- a/rs/moq-mux/src/codec/vp8/import.rs +++ b/rs/moq-mux/src/codec/vp8/import.rs @@ -1,5 +1,3 @@ -use bytes::Bytes; - use crate::catalog::hang::CatalogExt; use crate::container::Frame; use crate::container::jitter::Jitter; @@ -70,13 +68,12 @@ impl Import { } /// Decode a single VP8 frame. - pub fn decode(&mut self, frame: &[u8], pts: Option) -> crate::Result<()> { - if frame.is_empty() { + pub fn decode(&mut self, frame: B, pts: Option) -> crate::Result<()> { + if frame.as_ref().is_empty() { return Err(super::Error::EmptyFrame.into()); } - let payload = Bytes::copy_from_slice(frame); - let header = FrameHeader::parse(&payload)?; + let header = FrameHeader::parse(frame.as_ref())?; if let Some((width, height)) = header.dimensions { self.init(width, height)?; } @@ -84,7 +81,7 @@ impl Import { let pts = self.rendition.timestamp(pts)?; self.track.write(Frame { timestamp: pts, - payload, + payload: frame.into_bytes(), keyframe: header.keyframe, duration: None, })?; diff --git a/rs/moq-mux/src/codec/vp9/import.rs b/rs/moq-mux/src/codec/vp9/import.rs index f44464139..5c0e6f3f5 100644 --- a/rs/moq-mux/src/codec/vp9/import.rs +++ b/rs/moq-mux/src/codec/vp9/import.rs @@ -1,5 +1,3 @@ -use bytes::Bytes; - use crate::catalog::hang::CatalogExt; use crate::container::Frame; use crate::container::jitter::Jitter; @@ -70,13 +68,12 @@ impl Import { } /// Decode a single VP9 frame (or superframe). - pub fn decode(&mut self, frame: &[u8], pts: Option) -> crate::Result<()> { - if frame.is_empty() { + pub fn decode(&mut self, frame: B, pts: Option) -> crate::Result<()> { + if frame.as_ref().is_empty() { return Err(super::Error::EmptyFrame.into()); } - let payload = Bytes::copy_from_slice(frame); - let header = FrameHeader::parse(&payload)?; + let header = FrameHeader::parse(frame.as_ref())?; if let Some(key) = header.key { self.init(key.to_catalog(), key.width, key.height)?; } @@ -84,7 +81,7 @@ impl Import { let pts = self.rendition.timestamp(pts)?; self.track.write(Frame { timestamp: pts, - payload, + payload: frame.into_bytes(), keyframe: header.keyframe, duration: None, })?; diff --git a/rs/moq-mux/src/import/track.rs b/rs/moq-mux/src/import/track.rs index 64216b846..c3069f1d0 100644 --- a/rs/moq-mux/src/import/track.rs +++ b/rs/moq-mux/src/import/track.rs @@ -175,7 +175,7 @@ impl Track { } /// Decode one whole frame. - pub fn decode(&mut self, frame: &[u8], pts: Option) -> Result<()> { + pub fn decode(&mut self, frame: B, pts: Option) -> Result<()> { match self.kind { TrackKind::Avc3 { ref mut split, @@ -183,7 +183,7 @@ impl Track { } => { // One whole access unit per call, so flush to emit it rather than // waiting for the next start code. - let mut frames = split.decode(frame, pts)?; + let mut frames = split.decode(frame.as_ref(), pts)?; frames.extend(split.flush(pts)?); import.decode(frames)?; } @@ -199,7 +199,7 @@ impl Track { ref mut split, ref mut import, } => { - let mut frames = split.decode(frame, pts)?; + let mut frames = split.decode(frame.as_ref(), pts)?; frames.extend(split.flush(pts)?); import.decode(frames)?; } @@ -207,7 +207,7 @@ impl Track { ref mut split, ref mut import, } => { - let mut frames = split.decode(frame, pts)?; + let mut frames = split.decode(frame.as_ref(), pts)?; frames.extend(split.flush(pts)?); import.decode(frames)?; } diff --git a/rs/moq-native/src/error.rs b/rs/moq-native/src/error.rs index e3cb97a87..1945ef8d1 100644 --- a/rs/moq-native/src/error.rs +++ b/rs/moq-native/src/error.rs @@ -2,8 +2,8 @@ use std::sync::Arc; /// Errors produced while configuring or establishing native MoQ connections. /// -/// Backend-specific failures live in per-backend error types ([`crate::tls::Error`], -/// [`crate::quinn::Error`], etc.). They're wrapped in `Arc` here so the aggregate +/// Backend-specific failures live in per-backend error types (`crate::tls::Error`, +/// `crate::quinn::Error`, etc.). They're wrapped in `Arc` here so the aggregate /// stays `Clone` even though the underlying transport/IO errors are not. #[derive(Debug, Clone, thiserror::Error)] #[non_exhaustive] diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index e1565fe66..c931a2796 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -281,7 +281,7 @@ impl Server { /// Returns the next partially established QUIC or WebTransport session. /// - /// This returns a [Request] instead of a [web_transport_quinn::Session] + /// This returns a [Request] instead of a `web_transport_quinn::Session` /// so the connection can be rejected early on an invalid path or missing auth. /// /// The [Request] is either a WebTransport or a raw QUIC request. diff --git a/rs/moq-net/src/coding/reader.rs b/rs/moq-net/src/coding/reader.rs index 543350803..43a8b859c 100644 --- a/rs/moq-net/src/coding/reader.rs +++ b/rs/moq-net/src/coding/reader.rs @@ -86,21 +86,13 @@ impl Reader { } } - /// Read into the provided buffer, draining the reader's internal buffer first. - /// - /// Returns the number of bytes written, or `None` if the stream is closed - /// (and the internal buffer was empty). - pub async fn read_buf( - &mut self, - dst: &mut B, - ) -> Result, Error> { - if !self.buffer.is_empty() && dst.has_remaining_mut() { - let n = cmp::min(self.buffer.len(), dst.remaining_mut()); - let chunk = self.buffer.split_to(n); - dst.put_slice(&chunk); - return Ok(Some(n)); + /// Read the next chunk, draining the reader's internal buffer first. + pub async fn read_chunk(&mut self, max: usize) -> Result, Error> { + if !self.buffer.is_empty() { + let n = cmp::min(self.buffer.len(), max); + return Ok(Some(self.buffer.split_to(n).freeze())); } - self.stream.read_buf(dst).await.map_err(Error::from_transport) + self.stream.read_chunk(max).await.map_err(Error::from_transport) } /// Read exactly the given number of bytes from the stream. diff --git a/rs/moq-net/src/coding/writer.rs b/rs/moq-net/src/coding/writer.rs index 8af22045c..c1199620b 100644 --- a/rs/moq-net/src/coding/writer.rs +++ b/rs/moq-net/src/coding/writer.rs @@ -39,8 +39,7 @@ impl Writer { Ok(()) } - // Not public to avoid accidental partial writes. - async fn write(&mut self, buf: &mut Buf) -> Result { + pub(crate) async fn write(&mut self, buf: &mut Buf) -> Result { self.stream .as_mut() .unwrap() @@ -59,6 +58,16 @@ impl Writer { Ok(()) } + /// Write the entire [`bytes::Bytes`] chunk to the stream. + pub async fn write_chunk(&mut self, chunk: bytes::Bytes) -> Result<(), Error> { + self.stream + .as_mut() + .unwrap() + .write_chunk(chunk) + .await + .map_err(Error::from_transport) + } + /// Mark the stream as finished. pub fn finish(&mut self) -> Result<(), Error> { self.stream.as_mut().unwrap().finish().map_err(Error::from_transport) diff --git a/rs/moq-net/src/ietf/adapter.rs b/rs/moq-net/src/ietf/adapter.rs index c257740f1..35954833a 100644 --- a/rs/moq-net/src/ietf/adapter.rs +++ b/rs/moq-net/src/ietf/adapter.rs @@ -212,6 +212,23 @@ impl web_transport_trait::SendStream for VirtualSendStream { Ok(len) } + async fn write_chunk(&mut self, chunk: Bytes) -> Result<(), Self::Error> { + if let Some(pending) = &mut self.pending { + pending.buf.extend_from_slice(&chunk); + + if let Some(request_id) = pending.try_parse()? { + let mut pending = self.pending.take().unwrap(); + let buf = std::mem::take(&mut pending.buf).freeze(); + pending.register(request_id); + self.control_tx.send(buf).map_err(|_| crate::Error::Closed)?; + } + } else { + self.control_tx.send(chunk).map_err(|_| crate::Error::Closed)?; + } + + Ok(()) + } + fn set_priority(&mut self, _order: u8) {} fn finish(&mut self) -> Result<(), Self::Error> { @@ -248,6 +265,20 @@ impl web_transport_trait::SendStream for Adapte } } + async fn write_buf(&mut self, buf: &mut B) -> Result { + match self { + Self::Real(s) => s.write_buf(buf).await.map_err(|_| crate::Error::Closed), + Self::Virtual(s) => s.write_buf(buf).await, + } + } + + async fn write_chunk(&mut self, chunk: Bytes) -> Result<(), Self::Error> { + match self { + Self::Real(s) => s.write_chunk(chunk).await.map_err(|_| crate::Error::Closed), + Self::Virtual(s) => s.write_chunk(chunk).await, + } + } + fn set_priority(&mut self, order: u8) { match self { Self::Real(s) => s.set_priority(order), diff --git a/rs/moq-net/src/ietf/publisher.rs b/rs/moq-net/src/ietf/publisher.rs index bf39cadd4..3361c05b8 100644 --- a/rs/moq-net/src/ietf/publisher.rs +++ b/rs/moq-net/src/ietf/publisher.rs @@ -336,8 +336,7 @@ impl Publisher { let mut ext = bytes::BytesMut::new(); ietf::encode_object_time(&mut ext, frame.timestamp, version)?; stream.encode(&(ext.len() as u64)).await?; - let mut ext = ext.freeze(); - stream.write_all(&mut ext).await?; + stream.write_chunk(ext.freeze()).await?; } // Write the size of the frame. @@ -357,9 +356,9 @@ impl Publisher { }; match chunk? { - Some(mut chunk) => { + Some(chunk) => { let n = chunk.len() as u64; - stream.write_all(&mut chunk).await?; + stream.write_chunk(chunk).await?; track_stats.bytes(n); } None => break, diff --git a/rs/moq-net/src/ietf/subscriber.rs b/rs/moq-net/src/ietf/subscriber.rs index f29e4d1b3..f454c9318 100644 --- a/rs/moq-net/src/ietf/subscriber.rs +++ b/rs/moq-net/src/ietf/subscriber.rs @@ -875,12 +875,12 @@ impl Subscriber { mut frame: FrameProducer, track_stats: &SubscriberTrack, ) -> Result<(), Error> { - // FrameProducer impls BufMut; read_buf writes stream bytes directly into - // the per-frame buffer (see lite/subscriber.rs run_frame for rationale). while bytes::BufMut::has_remaining_mut(&frame) { - match stream.read_buf(&mut frame).await? { - Some(n) if n > 0 => { - track_stats.bytes(n as u64); + let remaining = bytes::BufMut::remaining_mut(&frame); + match stream.read_chunk(remaining).await? { + Some(chunk) if !chunk.is_empty() => { + track_stats.bytes(chunk.len() as u64); + frame.write(chunk)?; } _ => return Err(Error::WrongSize), } diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index 679ad8a88..fb8f7babe 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -1,5 +1,6 @@ use std::{sync::Arc, time::Duration}; +use bytes::Buf; use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use web_transport_trait::Stats; @@ -785,19 +786,19 @@ async fn write_fetch_frame( Compression::None => { writer.encode(&frame.size).await?; track_stats.frame(); - while let Some(mut chunk) = frame.read_chunk().await? { + while let Some(chunk) = frame.read_chunk().await? { let n = chunk.len() as u64; - writer.write_all(&mut chunk).await?; + writer.write_chunk(chunk).await?; track_stats.bytes(n); } } compression => { let payload = frame.read_all().await?; - let mut chunk = bytes::Bytes::from(compression.compress(&payload)); + let chunk = bytes::Bytes::from(compression.compress(&payload)); let n = chunk.len() as u64; writer.encode(&n).await?; track_stats.frame(); - writer.write_all(&mut chunk).await?; + writer.write_chunk(chunk).await?; track_stats.bytes(n); } } @@ -1051,18 +1052,16 @@ impl Subscription { mut chunk: bytes::Bytes, ) -> Result<(), Error> { let n = chunk.len() as u64; - loop { - tokio::select! { - biased; - result = stream.write_all(&mut chunk) => { - result?; - break; - } - new_pri = priority.next() => stream.set_priority(new_pri), - Ok(()) = self.track_priority.changed() => priority.set_track(*self.track_priority.borrow_and_update()), - } + while chunk.has_remaining() { + self.apply_priority(stream, priority); + stream.write(&mut chunk).await?; } self.track_stats.bytes(n); Ok(()) } + + fn apply_priority(&mut self, stream: &mut Writer, priority: &mut PriorityHandle) { + priority.set_track(*self.track_priority.borrow_and_update()); + stream.set_priority(priority.current()); + } } diff --git a/rs/moq-net/src/lite/subscriber.rs b/rs/moq-net/src/lite/subscriber.rs index 60f25b334..982f8ddfe 100644 --- a/rs/moq-net/src/lite/subscriber.rs +++ b/rs/moq-net/src/lite/subscriber.rs @@ -696,14 +696,12 @@ impl Subscriber { frame: &mut FrameProducer, track_stats: &SubscriberTrack, ) -> Result<(), Error> { - // FrameProducer impls BufMut over its pre-allocated per-frame buffer, so - // read_buf writes QUIC stream bytes directly into the frame — no - // intermediate Bytes allocations, and quinn's reassembly arena is freed - // as we drain it. while bytes::BufMut::has_remaining_mut(frame) { - match stream.read_buf(frame).await? { - Some(n) if n > 0 => { - track_stats.bytes(n as u64); + let remaining = bytes::BufMut::remaining_mut(frame); + match stream.read_chunk(remaining).await? { + Some(chunk) if !chunk.is_empty() => { + track_stats.bytes(chunk.len() as u64); + frame.write(chunk)?; } _ => return Err(Error::WrongSize), } diff --git a/rs/moq-net/src/model/bytes.rs b/rs/moq-net/src/model/bytes.rs new file mode 100644 index 000000000..4fadd6aa2 --- /dev/null +++ b/rs/moq-net/src/model/bytes.rs @@ -0,0 +1,76 @@ +use bytes::{Bytes, BytesMut}; + +/// Converts borrowed or owned byte buffers into [`Bytes`]. +/// +/// Owned buffers keep their allocation when possible. Borrowed buffers copy into +/// a new [`Bytes`] value. +pub trait AsBytes: AsRef<[u8]> { + /// Convert this buffer into owned bytes. + fn into_bytes(self) -> Bytes; +} + +impl AsBytes for Bytes { + fn into_bytes(self) -> Bytes { + self + } +} + +impl AsBytes for &Bytes { + fn into_bytes(self) -> Bytes { + self.clone() + } +} + +impl AsBytes for BytesMut { + fn into_bytes(self) -> Bytes { + self.freeze() + } +} + +impl AsBytes for &BytesMut { + fn into_bytes(self) -> Bytes { + Bytes::copy_from_slice(self.as_ref()) + } +} + +impl AsBytes for Vec { + fn into_bytes(self) -> Bytes { + Bytes::from(self) + } +} + +impl AsBytes for &Vec { + fn into_bytes(self) -> Bytes { + Bytes::copy_from_slice(self) + } +} + +impl AsBytes for String { + fn into_bytes(self) -> Bytes { + Bytes::from(self) + } +} + +impl AsBytes for &String { + fn into_bytes(self) -> Bytes { + Bytes::copy_from_slice(self.as_bytes()) + } +} + +impl AsBytes for &str { + fn into_bytes(self) -> Bytes { + Bytes::copy_from_slice(self.as_bytes()) + } +} + +impl AsBytes for &[u8] { + fn into_bytes(self) -> Bytes { + Bytes::copy_from_slice(self) + } +} + +impl AsBytes for &[u8; N] { + fn into_bytes(self) -> Bytes { + Bytes::copy_from_slice(self) + } +} diff --git a/rs/moq-net/src/model/frame.rs b/rs/moq-net/src/model/frame.rs index 8e6727797..29d9a326b 100644 --- a/rs/moq-net/src/model/frame.rs +++ b/rs/moq-net/src/model/frame.rs @@ -1,20 +1,20 @@ use std::sync::Arc; +use std::sync::OnceLock; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Poll, ready}; use bytes::buf::UninitSlice; use bytes::{BufMut, Bytes}; -use crate::{Error, GroupInfo, Result, Timestamp}; +use crate::{AsBytes, Error, GroupInfo, Result, Timestamp}; /// Maximum payload size accepted for a single frame. /// -/// The receive path preallocates a buffer from the declared frame size, so an +/// The receive path trusts the declared frame size when storing the payload, so an /// untrusted peer could otherwise request a multi-gigabyte allocation with a -/// single varint. [`FrameProducer::new`] enforces this for every frame: it's the -/// sole allocation chokepoint (reached via [`FrameInfo::produce`] and -/// [`crate::GroupProducer::create_frame`]) and rejects an oversized declared size -/// with [`Error::FrameTooLarge`] before allocating. +/// single varint. [`FrameProducer::new`] enforces this for every frame and +/// rejects an oversized declared size with [`Error::FrameTooLarge`] before the +/// payload is stored. /// /// Matches the per-group cache cap (`MAX_GROUP_CACHE`), so a single frame may fill /// a group. 16 MiB was too tight for a high-bitrate CMAF fragment carried as one @@ -54,11 +54,10 @@ impl FrameInfo { } } -/// Single-allocation buffer shared between a [FrameProducer] and many [FrameConsumer]s. +/// Payload storage shared between a [FrameProducer] and many [FrameConsumer]s. /// -/// Internally an [Arc] over a thin pointer + length owning a heap allocation. The -/// data pointer is stable for the life of any clone, so [Bytes] views taken via -/// [Bytes::from_owner] remain valid. [Clone] is cheap (one atomic increment). +/// Whole-frame [`Bytes`] writes are stored directly. Partial writes and [`BufMut`] +/// writes fall back to one mutable heap allocation sized to the declared frame. /// /// The producer writes through the raw pointer (sole writer); `written` provides /// happens-before for cross-thread reads. Implements [AsRef]<[u8]> directly so it @@ -67,19 +66,28 @@ impl FrameInfo { struct FrameBuf(Arc); struct FrameBufInner { + capacity: usize, + written: AtomicUsize, + storage: OnceLock, +} + +enum FrameStorage { + Shared(Bytes), + Mutable(MutableFrameBuf), +} + +struct MutableFrameBuf { // Owned heap allocation of `capacity` bytes (zero-initialized). data: *mut u8, capacity: usize, - written: AtomicUsize, } -// Safety: `data` is owned (Box-allocated, freed in Drop); the producer is the -// sole writer; consumers only read bytes `< written`, which was set via Release -// after the corresponding writes completed (Acquire pairs on the consumer side). -unsafe impl Send for FrameBufInner {} -unsafe impl Sync for FrameBufInner {} +// Safety: `data` is owned (Box-allocated, freed in Drop). The producer is the +// sole writer and consumers only read bytes `< written`. +unsafe impl Send for MutableFrameBuf {} +unsafe impl Sync for MutableFrameBuf {} -impl Drop for FrameBufInner { +impl Drop for MutableFrameBuf { fn drop(&mut self) { // Safety: data was obtained from `Box::into_raw` of a `Box<[u8]>` of // length `capacity` and is not aliased at drop (Arc refcount hit 0). @@ -90,15 +98,21 @@ impl Drop for FrameBufInner { } } -impl FrameBuf { +impl MutableFrameBuf { fn new(size: usize) -> Self { let boxed: Box<[u8]> = vec![0u8; size].into_boxed_slice(); let capacity = boxed.len(); let data = Box::into_raw(boxed) as *mut u8; + Self { data, capacity } + } +} + +impl FrameBuf { + fn new(size: usize) -> Self { Self(Arc::new(FrameBufInner { - data, - capacity, + capacity: size, written: AtomicUsize::new(0), + storage: OnceLock::new(), })) } @@ -110,12 +124,31 @@ impl FrameBuf { self.0.written.load(ord) } - /// Safety: caller must be the sole producer (FrameProducer-as-BufMut invariant). - unsafe fn data_ptr(&self) -> *mut u8 { - self.0.data + fn try_set_bytes(&self, bytes: Bytes) -> std::result::Result<(), Bytes> { + if bytes.len() != self.capacity() || self.written(Ordering::Acquire) != 0 { + return Err(bytes); + } + self.0 + .storage + .set(FrameStorage::Shared(bytes)) + .map_err(|storage| match storage { + FrameStorage::Shared(bytes) => bytes, + FrameStorage::Mutable(_) => unreachable!("try_set_bytes only installs shared storage"), + }) + } + + fn mutable(&self) -> &MutableFrameBuf { + match self + .0 + .storage + .get_or_init(|| FrameStorage::Mutable(MutableFrameBuf::new(self.capacity()))) + { + FrameStorage::Shared(_) => unreachable!("finished shared frame cannot become mutable"), + FrameStorage::Mutable(buf) => buf, + } } - /// Safety: caller must be the sole producer; `new_written` must be `<= capacity`. + /// Safety: caller must be the sole producer and `new_written` must be `<= capacity`. unsafe fn store_written(&self, new_written: usize) { // Release pairs with consumers' Acquire load to publish prior writes. self.0.written.store(new_written, Ordering::Release); @@ -127,10 +160,16 @@ impl AsRef<[u8]> for FrameBuf { // Snapshot the initialized region (bytes the producer has written so far). // Acquire pairs with the producer's Release on `written`. let written = self.0.written.load(Ordering::Acquire); - // Safety: data..data+written is initialized (zero-init at alloc + producer - // writes up to `written`). The Arc keeps the allocation alive while any - // reference to the slice lives. - unsafe { std::slice::from_raw_parts(self.0.data, written) } + match self.0.storage.get() { + Some(FrameStorage::Shared(bytes)) => &bytes[..written], + Some(FrameStorage::Mutable(buf)) => { + // Safety: data..data+written is initialized (zero-init at alloc + + // producer writes up to `written`). The Arc keeps the allocation alive + // while any reference to the slice lives. + unsafe { std::slice::from_raw_parts(buf.data, written) } + } + None => &[], + } } } @@ -170,9 +209,9 @@ impl std::ops::Deref for FrameProducer { impl FrameProducer { /// Create a new frame producer for the given frame header. /// - /// The single allocation chokepoint: rejects a frame whose declared + /// The payload storage chokepoint: rejects a frame whose declared /// [`FrameInfo::size`] exceeds [`MAX_FRAME_SIZE`] with [`Error::FrameTooLarge`] - /// before allocating the (untrusted) buffer. + /// before storing the untrusted payload. pub(crate) fn new(info: FrameInfo, group: GroupInfo) -> Result { if info.size > MAX_FRAME_SIZE { return Err(Error::FrameTooLarge); @@ -194,14 +233,30 @@ impl FrameProducer { /// Write a chunk of data to the frame. /// /// Returns [Error::WrongSize] if the chunk would exceed the remaining bytes. - pub fn write>(&mut self, chunk: B) -> Result<()> { - let chunk = chunk.into(); - if chunk.len() > self.remaining_mut() { + pub fn write(&mut self, chunk: B) -> Result<()> { + let len = chunk.as_ref().len(); + if len > self.remaining_mut() { return Err(Error::WrongSize); } // Surface aborts before writing. self.bail_if_aborted()?; - self.put_slice(&chunk); + if len == self.buf.capacity() && self.buf.written(Ordering::Acquire) == 0 { + match self.buf.try_set_bytes(chunk.into_bytes()) { + Ok(()) => { + let cap = self.buf.capacity(); + // Safety: `try_set_bytes` checked that the buffer exactly matches + // the declared size, so publishing all bytes is within bounds. + unsafe { self.buf.store_written(cap) }; + self.notify_written(cap); + return Ok(()); + } + Err(chunk) => { + self.put_slice(&chunk); + return Ok(()); + } + } + } + self.put_slice(chunk.as_ref()); Ok(()) } @@ -258,6 +313,16 @@ impl FrameProducer { } Ok(()) } + + fn notify_written(&mut self, written: usize) { + // Briefly take the kio write lock to wake waiters; drop of `Mut` triggers + // kio's notify. Also flip `fin` if we just filled the buffer. + if let Ok(mut state) = self.state.write() + && written == self.buf.capacity() + { + state.fin = true; + } + } } // Safety: `chunk_mut` returns a slice into the producer-private region of the @@ -274,11 +339,12 @@ unsafe impl BufMut for FrameProducer { fn chunk_mut(&mut self) -> &mut UninitSlice { let written = self.buf.written(Ordering::Acquire); let cap = self.buf.capacity(); + let buf = self.buf.mutable(); // Safety: writes to `[written..cap]` are unaliased — consumers only ever // read `[..written]`, and we hold `&mut self`. The slice's lifetime is // tied to `&mut self` by the function signature. unsafe { - let ptr = self.buf.data_ptr().add(written); + let ptr = buf.data.add(written); UninitSlice::from_raw_parts_mut(ptr, cap - written) } } @@ -292,14 +358,7 @@ unsafe impl BufMut for FrameProducer { ); // Safety: sole-writer invariant + bounds-checked above. unsafe { self.buf.store_written(prev + cnt) }; - - // Briefly take the kio write lock to wake waiters; drop of `Mut` - // triggers kio's notify. Also flip `fin` if we just filled the buffer. - if let Ok(mut state) = self.state.write() { - if prev + cnt == cap { - state.fin = true; - } - } + self.notify_written(prev + cnt); } } @@ -468,6 +527,25 @@ mod test { assert_eq!(data, Bytes::from_static(b"hello")); } + #[test] + fn whole_bytes_write_reuses_allocation() { + let input = Bytes::from(vec![1, 2, 3, 4, 5]); + let input_ptr = input.as_ptr(); + let mut producer = FrameInfo { + size: input.len() as u64, + timestamp: Timestamp::ZERO, + } + .produce() + .unwrap(); + producer.write(input.clone()).unwrap(); + producer.finish().unwrap(); + + let mut consumer = producer.consume(); + let data = consumer.read_all().now_or_never().unwrap().unwrap(); + assert_eq!(data, input); + assert_eq!(data.as_ptr(), input_ptr); + } + #[test] fn multi_chunk_read_all() { let mut producer = FrameInfo { diff --git a/rs/moq-net/src/model/group.rs b/rs/moq-net/src/model/group.rs index 1be5a7357..d74a37041 100644 --- a/rs/moq-net/src/model/group.rs +++ b/rs/moq-net/src/model/group.rs @@ -12,7 +12,7 @@ use std::task::{Poll, ready}; use bytes::Bytes; -use crate::{Error, Result, Timescale, Timestamp, TrackInfo}; +use crate::{AsBytes, Error, Result, Timescale, Timestamp, TrackInfo}; use super::{FrameConsumer, FrameInfo, FrameProducer}; @@ -193,10 +193,9 @@ impl GroupProducer { /// /// `timestamp` is converted into the parent track's timescale. Use /// [Self::write_frame_now] to stamp wall-clock time instead of supplying one. - pub fn write_frame>(&mut self, timestamp: Timestamp, data: B) -> Result<()> { - let data = data.into(); + pub fn write_frame(&mut self, timestamp: Timestamp, data: B) -> Result<()> { let mut frame = self.create_frame(FrameInfo { - size: data.len() as u64, + size: data.as_ref().len() as u64, timestamp, })?; frame.write(data)?; @@ -207,7 +206,7 @@ impl GroupProducer { /// Like [Self::write_frame] but stamps the frame with wall-clock now /// ([`Timestamp::now`]). For data with no real presentation time of its own /// (catalogs, JSON state) or sources whose protocol can't carry one. - pub fn write_frame_now>(&mut self, data: B) -> Result<()> { + pub fn write_frame_now(&mut self, data: B) -> Result<()> { self.write_frame(Timestamp::now(), data) } diff --git a/rs/moq-net/src/model/mod.rs b/rs/moq-net/src/model/mod.rs index 27258d847..5c49f1ff6 100644 --- a/rs/moq-net/src/model/mod.rs +++ b/rs/moq-net/src/model/mod.rs @@ -1,5 +1,6 @@ mod bandwidth; mod broadcast; +mod bytes; mod compression; mod frame; mod group; @@ -10,6 +11,7 @@ mod track; pub use bandwidth::*; pub use broadcast::*; +pub use bytes::*; pub use compression::*; pub use frame::*; pub use group::*; diff --git a/rs/moq-net/src/model/track.rs b/rs/moq-net/src/model/track.rs index 5646578e2..c04f130f1 100644 --- a/rs/moq-net/src/model/track.rs +++ b/rs/moq-net/src/model/track.rs @@ -566,18 +566,18 @@ impl TrackProducer { /// /// The timestamp is converted into the track's timescale. Use /// [`Self::write_frame_now`] to stamp wall-clock time instead. - pub fn write_frame>(&mut self, timestamp: Timestamp, frame: B) -> Result<()> { + pub fn write_frame(&mut self, timestamp: Timestamp, frame: B) -> Result<()> { let mut group = self.append_group()?; - group.write_frame(timestamp, frame.into())?; + group.write_frame(timestamp, frame)?; group.finish()?; Ok(()) } /// Like [`Self::write_frame`] but stamps the frame with wall-clock now /// ([`Timestamp::now`]). - pub fn write_frame_now>(&mut self, frame: B) -> Result<()> { + pub fn write_frame_now(&mut self, frame: B) -> Result<()> { let mut group = self.append_group()?; - group.write_frame_now(frame.into())?; + group.write_frame_now(frame)?; group.finish()?; Ok(()) } diff --git a/swift/justfile b/swift/justfile index 3da8db9ac..a8331b194 100644 --- a/swift/justfile +++ b/swift/justfile @@ -22,6 +22,7 @@ package *args: bash scripts/package.sh {{ args }} # Remove SwiftPM build output plus the XCFramework and generated bindings + # that scripts/check.sh lays down (see swift/.gitignore). clean: rm -rf .build .swiftpm Package.resolved Sources/MoqFFI/Generated.swift MoqFFI.xcframework diff --git a/test/justfile b/test/justfile index 84c76781d..1cef5204d 100644 --- a/test/justfile +++ b/test/justfile @@ -8,9 +8,9 @@ # package registry to catch packaging breakage in a release. # - `just test smoke` (smoke/, below) builds every client from THIS checkout # to catch interop regressions before anything is published. - # Fall back to the root justfile so `just js test` (etc.) resolve from here. -set fallback + +set fallback := true set working-directory := '.' # Run unit tests for every language (default). @@ -22,17 +22,20 @@ default *args: # Cross-language media interop smoke test, built from this checkout. Stands up a # relay and runs the publisher x subscriber matrix. Default: rust only. Pass # flags through, e.g. + # just test smoke --publishers rust,python,js --subscribers rust,c smoke *args: ./smoke/smoke.sh {{ args }} # Full interop matrix: rust + python + browser publish; everyone subscribes # (incl. native node/bun JS, the C libmoq client, and the GStreamer moqsrc + # plugin). --timeout 30 gives headless Chromium cold-start headroom. smoke-full: ./smoke/smoke.sh --publishers rust,python,js --subscribers rust,python,js,js-native-node,js-native-bun,c,gst --timeout 30 # Negative control: no publisher, every subscriber must time out (proves the + # harness can actually report failure). smoke-negative *args: ./smoke/smoke.sh --negative {{ args }}