feat(json): group-scoped DEFLATE compression with browser support #1897
Add an optional `Config.compression` to moq-json that compresses each group as a single zstd stream, flushed at every frame so a snapshot followed by deltas shares one warm window (later frames reuse the earlier ones as context). Frames are magicless with no per-frame checksum, since moq-net's framing already delimits each slice. An optional shared dictionary primes the window so even a group's first frame compresses well. When compression is enabled the delta-vs-snapshot rolling budget is measured on the real (compressed) slice sizes rather than the raw JSON, so the warm window's progressively smaller deltas pack more updates into a group. The plaintext path is unchanged, and compression defaults off, so existing tracks (including the hang catalog) are byte-identical on the wire. A cumulative per-group decompressed-size cap plus zstd's windowLogMax guard against decompression bombs. `Config` becomes `#[non_exhaustive]`; a `Consumer::with_compression` constructor takes the matching settings. A cloned consumer rebuilds its (non-cloneable) decoder window by replaying the group's already-read slices. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
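The clone-by-replay step in the last sentence does not depend on the codec, so it can be sketched without zstd. The following is a minimal illustration, not the PR's code: `ToyDecoder` and `replay` are hypothetical names, and the toy slice format (one "reuse" byte followed by literals) only stands in for a real shared-window decoder whose output depends on earlier frames.

```typescript
// Toy stateful decoder standing in for a real shared-window codec (hypothetical).
// Each slice is [reuse, ...literals]: copy `reuse` bytes from the start of the
// previously decoded frame, then append the literal bytes.
class ToyDecoder {
  private previous: Uint8Array = new Uint8Array(0);

  frame(slice: Uint8Array): Uint8Array {
    if (slice.length === 0) throw new Error("empty slice");
    const reuse = slice[0] ?? 0;
    if (reuse > this.previous.length) {
      throw new Error("slice references context this decoder never saw");
    }
    const literals = slice.subarray(1);
    const out = new Uint8Array(reuse + literals.length);
    out.set(this.previous.subarray(0, reuse), 0);
    out.set(literals, reuse);
    this.previous = out; // later frames may reference this one
    return out;
  }
}

// Rebuild an equivalent decoder by feeding it every slice already read, in order.
// The decoded output is discarded; only the rebuilt internal state matters.
function replay(slicesAlreadyRead: readonly Uint8Array[]): ToyDecoder {
  const decoder = new ToyDecoder();
  for (const slice of slicesAlreadyRead) decoder.frame(slice);
  return decoder;
}
```

A clone built with `replay` decodes the next slice exactly as the original would, whereas a fresh decoder with no history rejects it. The cost is re-decoding the group's earlier slices once per clone.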
Walkthrough

The pull request adds optional raw DEFLATE compression for moq-json frame streams in Rust and TypeScript. It introduces compression codecs, wires compression into producer and consumer flows, extends Rust and TypeScript configuration and error handling, updates package dependencies, and adds tests for round-trips, decoding, ordering, reconstruction, and stored-frame size checks.

Pre-merge checks: 5 passed
Replace the zstd group-stream compression with per-frame DEFLATE (RFC 1951 raw deflate) and mirror it in @moq/json so a Rust producer and a browser consumer interoperate.

The browser exposes no zstd, and its Compression Streams API can't flush mid-stream (verified: a long-lived CompressionStream emits nothing until close), so cross-frame window sharing isn't reachable in JS. Each frame is therefore compressed independently as a standalone deflate-raw blob, which the platform CompressionStream and Rust flate2 both speak. Snapshots and large frames still shrink well; tiny deltas barely benefit.

Rust:
- Swap the zstd dependency for flate2; compression.rs is now stateless per-frame compress/decompress with a per-frame zip-bomb cap.
- Compression keeps only a `level`; the dictionary and window-sharing are gone (unreachable in the browser). Consumer::with_compression no longer takes settings, since deflate decode is self-describing.

JS:
- New deflate-raw codec helper on the Compression Streams API.
- Producer gains `compression?: boolean`. Compression is async, so the compressed path serializes track writes through an ordered FIFO chain to preserve frame/group order while update()/mutate() stay synchronous.
- Consumer decompresses each frame; the delta-vs-snapshot budget is decided synchronously on raw sizes (vs Rust's compressed sizes), which doesn't affect the wire format.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
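The ordered FIFO chain mentioned for the JS producer can be sketched as a promise chain. This is an illustration under assumptions, not the PR's implementation: `WriteChain`, `enqueue`, `settled`, and `slowCompress` are hypothetical names, and the "compression" here is a stand-in that merely yields to the microtask queue a given number of times.

```typescript
// Hypothetical helper: runs async jobs strictly one after another, in the order enqueued.
class WriteChain {
  private tail: Promise<void> = Promise.resolve();
  private failed = false;

  // Synchronous for the caller; the job itself runs only after all earlier jobs settle.
  enqueue(job: () => Promise<void>): void {
    this.tail = this.tail.then(async () => {
      if (this.failed) return; // fail fast: skip work queued after a failure
      try {
        await job();
      } catch {
        this.failed = true;
      }
    });
  }

  // Resolves once every job enqueued so far has settled.
  settled(): Promise<void> {
    return this.tail;
  }
}

// Stand-in for an async compressor whose latency varies per frame.
async function slowCompress(frame: string, ticks: number): Promise<string> {
  for (let i = 0; i < ticks; i++) await Promise.resolve();
  return `compressed(${frame})`;
}

const written: string[] = [];
const chain = new WriteChain();
// The snapshot takes longest to compress, yet it is still written first.
chain.enqueue(async () => {
  written.push(await slowCompress("snapshot", 5));
});
chain.enqueue(async () => {
  written.push(await slowCompress("delta-1", 0));
});
```

Once `chain.settled()` resolves, `written` holds the snapshot followed by the delta, regardless of how long each compression took. Callers never await `enqueue`, which is what lets a synchronous `update()`-style method keep its signature.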
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-json/src/lib.rs (1)
38-55: 📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win

Mark the public `Error` enum as non-exhaustive.

This PR adds variants to a public enum; without `#[non_exhaustive]`, downstream exhaustive matches stay part of the API surface and the next variant is another breaking change.

Proposed fix

+#[non_exhaustive]
 pub enum Error {

As per coding guidelines, rs/**/*.rs: “Always mark public error enums as `#[non_exhaustive]`.”

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-json/src/lib.rs` around lines 38 - 55, The public Error enum is missing the #[non_exhaustive] attribute, which means downstream code can exhaustively match on all current variants. This becomes a breaking change when new variants are added in the future. Add the #[non_exhaustive] attribute directly above the pub enum Error definition to mark it as non-exhaustive and allow safe addition of new variants without breaking existing exhaustive matches.

Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@js/json/src/producer.ts`:
- Around line 273-286: The update() method lacks a guard to check the `#failed`
flag before executing its synchronous logic. When a compressed-write fails in
the `#enqueue`() method, the track is closed and `#failed` is set to true, but a
subsequent update() call proceeds to execute synchronous operations like
`#snapshot`() and track.appendGroup(), which throws because the track is closed.
Add a check at the beginning of the update() method to return early if `#failed`
is true, mirroring the fail-fast guard pattern already present in `#enqueue`() to
ensure dropped producers do not throw unexpectedly.
In `@rs/moq-json/src/compression.rs`:
- Around line 79-80: The buffer allocation in the DeflateDecoder block uses
untrusted publisher-controlled input (slice.len()) to calculate initial vector
capacity without bounds, potentially allowing excessive memory allocation before
the decompression guard activates. Replace the direct calculation of
`slice.len() * 2 + 16` with a saturating multiplication and min operation that
caps the allocation at `MAX_DECOMPRESSED_FRAME` to prevent denial-of-service
attacks through large compressed frames.
- Around line 41-46: The `level` field in the Compression struct is currently a
public u32, but only values 0..=9 are valid according to the flate2 library
specification. Make invalid compression levels unrepresentable by replacing the
public u32 field with either an enum variant or a validated newtype wrapper that
can only be constructed with valid values 0 through 9. Update the code that uses
this field (such as the Level::new(self.level) call) to work with the new
validated type, ensuring that callers cannot construct a Compression struct with
invalid level values like 99.
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
- Cargo.toml
- js/json/src/compression.test.ts
- js/json/src/compression.ts
- js/json/src/consumer.ts
- js/json/src/producer.ts
- rs/moq-json/Cargo.toml
- rs/moq-json/src/compression.rs
- rs/moq-json/src/lib.rs
✅ Files skipped from review due to trivial changes (2)
- Cargo.toml
- js/json/src/compression.test.ts
Cap the initial inflate buffer at the per-frame ceiling so a publisher-controlled compressed frame can't force a large allocation before the streaming size guard runs. Add a #failed guard to the JS producer's update() so a torn-down compressed producer no-ops instead of throwing from appendGroup() on a closed track. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
Replace the public `level: u32` on Compression with a `Level` newtype whose constructor clamps to the valid 0..=9 range, so an out-of-range level can't be constructed and reach flate2 (where it would produce backend-dependent output). Compression::default() is now derived. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
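The commit above is Rust, but the "make invalid levels unrepresentable" idea can be illustrated in TypeScript as well. The sketch below is an analogue only: the `Level` class, its `of` factory, and the handling of `NaN` and fractional input are assumptions for illustration and do not mirror the Rust type's actual API.

```typescript
// TypeScript analogue of a clamping newtype (hypothetical; the PR's type is Rust).
class Level {
  static readonly MIN = 0;
  static readonly MAX = 9;

  readonly value: number;

  // Private constructor: the only way to obtain a Level is through `Level.of`.
  private constructor(value: number) {
    this.value = value;
  }

  // Clamp any requested number into MIN..MAX, so an out-of-range level cannot exist.
  static of(requested: number): Level {
    const whole = Number.isNaN(requested) ? Level.MIN : Math.trunc(requested);
    return new Level(Math.min(Level.MAX, Math.max(Level.MIN, whole)));
  }
}

const tooHigh = Level.of(99); // tooHigh.value is 9
const inRange = Level.of(6); // inRange.value is 6
```

Clamping rather than rejecting keeps construction infallible, at the price of silently adjusting a caller's mistake; a factory that throws would be the stricter alternative.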
Replace per-frame-independent compression with one sync-flushed DEFLATE stream per group, so deltas reuse the snapshot (and earlier deltas) as context and shrink ~3x. Each frame is a self-delimited slice; the fixed 00 00 ff ff sync-flush marker is stripped on the wire and re-appended to decode (RFC 7692 style), saving 4 bytes per frame. Rust uses flate2 streaming (Compress/Decompress with Z_SYNC_FLUSH). The consumer keeps a per-group decoder and replays a group's slices to rebuild a cloned consumer's window. JS uses pako for both encode and decode (the platform CompressionStream can't flush mid-stream, and fflate's flush is broken on some inputs). pako is an optional peer dependency loaded via dynamic import, so consumers that never compress a track don't bundle it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
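The marker handling described above is plain byte manipulation and can be sketched without a DEFLATE library. In this illustration `SYNC_FLUSH_TAIL` matches the constant name visible in the reviewed code, while `endsWithTail`, `stripTail`, and `restoreTail` are hypothetical helper names; the input is assumed to be the raw DEFLATE output of one sync-flushed frame.

```typescript
// The 4 bytes a raw DEFLATE stream ends with after a sync flush (an empty stored block).
const SYNC_FLUSH_TAIL = Uint8Array.of(0x00, 0x00, 0xff, 0xff);

function endsWithTail(bytes: Uint8Array): boolean {
  if (bytes.length < SYNC_FLUSH_TAIL.length) return false;
  const start = bytes.length - SYNC_FLUSH_TAIL.length;
  return SYNC_FLUSH_TAIL.every((b, i) => bytes[start + i] === b);
}

// Sender side: drop the marker before the slice goes on the wire (returns a view, no copy).
function stripTail(flushed: Uint8Array): Uint8Array {
  if (!endsWithTail(flushed)) {
    throw new Error("output does not end with a sync-flush marker");
  }
  return flushed.subarray(0, flushed.length - SYNC_FLUSH_TAIL.length);
}

// Receiver side: put the marker back so the inflater sees a complete flush point.
function restoreTail(slice: Uint8Array): Uint8Array {
  const out = new Uint8Array(slice.length + SYNC_FLUSH_TAIL.length);
  out.set(slice, 0);
  out.set(SYNC_FLUSH_TAIL, slice.length);
  return out;
}
```

`restoreTail` allocates a combined buffer for clarity. A streaming inflater that accepts input in pieces could instead be fed the slice and then the 4-byte marker as two separate pushes, which avoids that copy.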
Actionable comments posted: 2
🧹 Nitpick comments (1)
js/json/src/compression.ts (1)
58-62: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win

Add `@public` to the exported codec classes.

`Encoder` and `Decoder` already have doc comments, but this path requires `@public` on load-bearing exported classes.

Suggested diff

 /**
  * Encodes a group's frame payloads into one shared DEFLATE stream, one self-delimited slice per
  * frame. Hold one per group; create a new one at each group boundary. Build with {@link create}.
+ * @public
  */
 export class Encoder {

 /**
  * Decodes a group's frame slices back into the original payloads. Hold one per group; feed slices
  * in frame order (each frame builds on the earlier ones). Build with {@link create}.
+ * @public
  */
 export class Decoder {

As per coding guidelines, js/**/src/**/*.{ts,tsx} says to "Document every exported symbol and add a top-of-file `@module` doc block to each entrypoint; use `@public` on load-bearing classes".

Also applies to: 98-102

🤖 Prompt for AI Agents

Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@js/json/src/compression.ts` around lines 58 - 62, Add the missing `@public` annotations to the exported codec classes in compression.ts: both Encoder and Decoder need to be marked public in their class doc comments. Update the existing documentation blocks for these symbols so they satisfy the js/**/src/**/* guideline for load-bearing exported classes, and make sure the Decoder declaration is fixed the same way as Encoder.

Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@js/json/src/compression.test.ts`:
- Around line 44-47: Add a regression test in compression.test.ts that exercises
the oversized valid-frame path in Decoder.frame rather than just malformed
input. Create a frame that successfully decompresses to more than the 64 MiB
limit and assert that Decoder.create() / Decoder.frame() rejects it with the
decompressed-size cap behavior. Use the existing "codec rejects garbage" test as
a nearby reference, and target the Decoder.frame and compression logic in
compression.ts so the zip-bomb guard stays pinned.
In `@rs/moq-json/src/compression.rs`:
- Around line 9-12: The module comment in compression.rs overstates the safety
guarantee of the DEFLATE window; update the documentation near the top of the
file so it no longer claims the 32 KiB window alone bounds inflation. Reword the
rationale around Decoder::frame and the raw DEFLATE/Z_SYNC_FLUSH behavior to
state that the actual protection against oversized output is
MAX_DECOMPRESSED_FRAME, and keep the comment aligned with the current
implementation and invariants.
⛔ Files ignored due to path filters (1)
bun.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
- js/json/package.json
- js/json/src/compression.test.ts
- js/json/src/compression.ts
- js/json/src/consumer.ts
- js/json/src/producer.ts
- rs/moq-json/src/compression.rs
- rs/moq-json/src/lib.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- js/json/src/producer.ts
- rs/moq-json/src/lib.rs
Each compressed frame now carries a QUIC-varint prefix of its decompressed length (matching @moq/net's Varint). The decoder sizes its output buffer and rejects an over-cap frame before inflating, and verifies the inflated length matches the prefix. This also lets a future browser decoder delimit DecompressionStream output, which carries no frame boundary of its own. Also address review feedback: reword the module doc so the size bound is MAX_DECOMPRESSED_FRAME rather than the 32 KiB window, mark the JS Encoder/Decoder @public, and add cap/mismatch regression tests on both sides. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
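For readers unfamiliar with the QUIC variable-length integer, here is a small sketch of the prefix at this revision of the PR. It is not `@moq/net`'s `Varint`: `encodeVarint`, `decodeVarint`, and `splitFrame` are hypothetical names, and the sketch deliberately omits the 8-byte form, which a 64 MiB cap never needs. `MAX_DECOMPRESSED_FRAME` takes the 64 MiB value stated elsewhere in this PR.

```typescript
const MAX_DECOMPRESSED_FRAME = 64 * 1024 * 1024;

// QUIC varint: the top two bits of the first byte select a 1-, 2-, 4-, or 8-byte encoding.
function encodeVarint(value: number): Uint8Array {
  if (!Number.isInteger(value) || value < 0) {
    throw new RangeError("varint must be a non-negative integer");
  }
  if (value < 0x40) return Uint8Array.of(value); // prefix 00, 6 bits
  if (value < 0x4000) {
    const out = new Uint8Array(2);
    new DataView(out.buffer).setUint16(0, value | 0x4000); // prefix 01, 14 bits
    return out;
  }
  if (value < 0x40000000) {
    const out = new Uint8Array(4);
    new DataView(out.buffer).setUint32(0, (value | 0x80000000) >>> 0); // prefix 10, 30 bits
    return out;
  }
  throw new RangeError("8-byte varints are omitted from this sketch");
}

// Returns the decoded value and a view of the bytes that follow it.
function decodeVarint(buf: Uint8Array): [number, Uint8Array] {
  if (buf.length === 0) throw new RangeError("empty buffer");
  const view = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
  const first = view.getUint8(0);
  const size = 1 << (first >> 6);
  if (size === 8) throw new RangeError("8-byte varints are omitted from this sketch");
  if (buf.length < size) throw new RangeError("truncated varint");
  let value: number;
  if (size === 1) value = first & 0x3f;
  else if (size === 2) value = view.getUint16(0) & 0x3fff;
  else value = view.getUint32(0) & 0x3fffffff;
  return [value, buf.subarray(size)];
}

// Read the declared length and reject an over-cap frame before any inflation happens.
function splitFrame(slice: Uint8Array): { declared: number; deflate: Uint8Array } {
  const [declared, deflate] = decodeVarint(slice);
  if (declared > MAX_DECOMPRESSED_FRAME) {
    throw new RangeError(`declared size ${declared} exceeds the per-frame cap`);
  }
  return { declared, deflate };
}
```

After inflating `deflate`, a decoder following the commit message would also compare the produced length against `declared` and fail on a mismatch.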
fflate's streaming Deflate.flush() mis-encodes some inputs: a catalog snapshot + 3 deltas can't round-trip even through fflate's own Inflate, while the pako codec handles it. Add a regression test contrasting the two (with a positive control showing fflate works on simpler frames) so a future swap to the smaller fflate is caught. Adds fflate as a devDependency. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
pako is synchronous, so the dynamic import was the only reason the codec was async. Make pako a normal dependency with a static import, turn the Encoder/Decoder factories into plain constructors, and drop the producer's async write-chain (#chain/#enqueue/#failed) now that compressed writes run inline. fflate's flush bug is fixed upstream; once released we can swap pako for the smaller fflate behind the same synchronous interface. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
}

// Concatenate chunks into one buffer (a single chunk passes through untouched).
function concat(chunks: Uint8Array[], total: number): Uint8Array {

Is there no way to write to a buffer directly to avoid these copies?
private constructor(lib: Pako) {
  this.#deflate = new lib.Deflate({ raw: true });
  this.#flush = lib.constants.Z_SYNC_FLUSH;
  this.#deflate.onData = (chunk) => {

Could we copy to the destination directly instead of pushing to chunks?
Any way to have deflate reuse a buffer or write directly?
const full = concat(this.#chunks, this.#total);
// Drop the trailing sync-flush marker (the decoder re-appends it) and prefix the length.
const deflate = full.subarray(0, full.length - SYNC_FLUSH_TAIL.length);
const prefix = Varint.encode(payload.length);

Do we still need the varint at the front? It's kind of annoying because it requires reallocating, and adds a few bytes of course.
At the very least it should be appended to the end (since we're using a framed transport), but that would require some new varint encoding.
// The decompressed-length prefix bounds the frame before any inflation.
const [declared, deflate] = Varint.decode(slice);
if (declared > MAX_DECOMPRESSED_FRAME) {

We need to enforce this within onData
///
/// This is a sender-only choice and need not match the consumer (the wire format is the same
/// at any level). Browser producers can't set it; the platform deflate picks its own level.
pub level: Level,

IDK just use the default level for now. Make compression a bool.
/// options stay additive): `let mut config = Config::default(); config.delta_ratio = 0;`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct Config {

This should be ProducerConfig.
///
/// Decompression is self-describing, so no settings are needed beyond knowing the track is
/// compressed (the producer's level does not have to be matched).
pub fn with_compression(track: moq_net::TrackConsumer) -> Self {

We should have DecoderConfig instead. Compression is a boolean.
…prefix
Address review feedback:
- Compression is now a plain bool at the default DEFLATE level; drop the
Compression/Level types (Rust) and the per-frame varint length prefix on
both sides. moq-net already frames each slice, and the decoder bounds its
output as it inflates (in onData on the JS side) rather than from a
declared length.
- Split the Rust config into ProducerConfig { delta_ratio, compression } and
ConsumerConfig { compression }; Consumer::new now takes ConsumerConfig,
replacing with_compression.
- Cut copies: the encoder assembles one tight owned buffer (no prefix merge),
and the decoder feeds the slice + re-appended marker as two pako pushes
instead of allocating a combined buffer.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
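The two JS-side points in this commit, bounding output as it is produced and assembling one tight buffer, can be sketched together without pako. `BoundedSink` is a hypothetical name; the assumption is that an inflater reporting output through an `onData`-style callback would call `push` once per chunk, which this sketch simulates with direct calls.

```typescript
// Hypothetical collector for decoder output. An inflater that reports output through
// an onData-style callback would call `push` for each chunk it produces.
class BoundedSink {
  private readonly cap: number;
  private chunks: Uint8Array[] = [];
  private total = 0;

  constructor(cap: number) {
    this.cap = cap;
  }

  // Called once per output chunk; fails as soon as the running total would pass the cap,
  // so an oversized frame is rejected mid-inflation rather than after the fact.
  push(chunk: Uint8Array): void {
    const next = this.total + chunk.length;
    if (next > this.cap) {
      throw new RangeError(`decompressed frame exceeds ${this.cap} bytes`);
    }
    this.total = next;
    this.chunks.push(chunk);
  }

  // Copy every chunk exactly once into a buffer sized to the output, then reset.
  finish(): Uint8Array {
    const out = new Uint8Array(this.total);
    let offset = 0;
    for (const chunk of this.chunks) {
      out.set(chunk, offset);
      offset += chunk.length;
    }
    this.chunks = [];
    this.total = 0;
    return out;
  }
}
```

Because the check runs per chunk, memory use stays within the cap plus one chunk even when no declared length is available. The single copy in `finish` also releases the library's larger backing buffers instead of pinning them through retained views.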
Pushed …

On copying / "have deflate reuse a buffer or write directly?": pako's streaming API only hands output to …

Dropped the front varint entirely (both sides). moq-net already frames each slice, so it wasn't needed for delimiting, and the size cap is now enforced as output is produced (in …

Compression is a …

Config split: per your pick I split Rust into …

(Written by Claude)

Generated by Claude Code
A second moq-boy call site (input.rs) constructs a moq_json::Consumer and needs the new ConsumerConfig argument. Missed locally because moq-boy can't link system ffmpeg here; CI caught it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
Add a round-trip test on both sides using high-entropy data that barely compresses, so the compressed slice exceeds the Rust streaming CHUNK scratch buffer and pako's chunkSize. This exercises the multi-iteration (de)compress loops and the multi-chunk assembly that small payloads never reach. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01B3xwgHid4UYjeugewkxUyj
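One way to produce payloads that barely compress is a seeded pseudo-random generator, which keeps the test deterministic. The sketch below is an assumption about how such data could be built, not the PR's test code: `highEntropyBytes` is a hypothetical helper based on the standard xorshift32 recurrence, and the 256 KiB size in the usage line is an arbitrary choice meant to exceed typical codec chunk sizes.

```typescript
// Deterministic pseudo-random bytes from xorshift32 (hypothetical test helper).
// The same seed always yields the same bytes, so failures are reproducible.
function highEntropyBytes(length: number, seed = 1): Uint8Array {
  let state = seed >>> 0 || 1; // xorshift32 must not start at zero
  const out = new Uint8Array(length);
  for (let i = 0; i < length; i++) {
    state ^= state << 13;
    state ^= state >>> 17;
    state ^= state << 5;
    out[i] = state & 0xff; // keep the low byte of each 32-bit state
  }
  return out;
}

// A payload large enough that its compressed form should span several codec chunks.
const largePayload = highEntropyBytes(256 * 1024);
```

Feeding `largePayload` through an encoder and decoder as a single frame would then exercise the multi-chunk paths, provided the chosen size really exceeds the scratch and chunk sizes in use.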
Moves payload compression up to the application layer (`moq-json`) instead of the moq-lite wire (the alternative explored in #1874 / #1889). The relay stays media-agnostic, and `@moq/json` gets a matching, wire-interoperable implementation so a Rust producer and a browser consumer can talk.

Approach: shared-window DEFLATE per group
Each group is compressed as one raw DEFLATE (RFC 1951) stream, sync-flushed at each frame boundary. Every frame is a self-delimited slice, but later frames reuse the snapshot (and earlier deltas) as window context, so a delta restating known keys is almost all back-references. Measured on a realistic catalog (snapshot + 5 deltas): deltas drop from ~44 B (independent) to ~12–16 B (shared window), ~38% smaller overall.
Each frame's wire slice is just the DEFLATE bytes with the fixed 4-byte sync-flush marker `00 00 ff ff` stripped (re-appended to decode), the RFC 7692 (permessage-deflate) trick, saving 4 bytes/frame. moq-net already frames each slice, so there's no length prefix; the decoder bounds its output as it inflates (in `onData` on the JS side, in the inflate loop on Rust) rather than from any declared size.

Compression is off by default; existing tracks (including the hang `catalog.json`) stay byte-identical on the wire.

Why pako on the JS side (and the dead ends)
All verified empirically:
- `CompressionStream("deflate-raw")` genuinely can't flush mid-stream (emits nothing until `close()`; its writer has no `flush()`), so it can't encode a per-frame shared-window stream.
- `fflate` can sync-flush, but its encoder flush is buggy on some inputs (a snapshot + 3 deltas case: even fflate's own decoder can't round-trip it). A regression test pins this; the bug is fixed upstream, so once released, swapping pako for the smaller fflate is a drop-in behind the same synchronous interface.
- … `flate2` both directions.

pako is synchronous, so the whole codec is synchronous and pako is a normal `dependency` (~14 KB gzipped, pulled in only by `@moq/json` consumers).

What changed
Rust (`moq-json`): `compression.rs` is a per-group streaming `Encoder`/`Decoder` over `flate2`'s `Compress`/`Decompress` with `Z_SYNC_FLUSH`, raw DEFLATE at the default level, RFC 7692 tail strip/re-append, and a 64 MiB per-frame decompressed-size cap enforced as output is produced. The `Consumer` keeps a per-group decoder and replays a group's slices to rebuild a cloned consumer's window. The delta-vs-snapshot budget is measured on compressed slice sizes.

JS (`@moq/json`): `compression.ts` is pako-based per-group `Encoder`/`Decoder` (plain synchronous constructors) with the same tail stripping. The encoder makes one copy into a tight owned buffer (pako backs each chunk with a ~16 KB buffer and `writeFrame` retains the reference, so returning a view would pin memory); the decoder feeds the slice + re-appended marker as two pako pushes, allocating no combined buffer. `Producer`/`Consumer` keep `compression?: boolean`; compressed writes happen inline.

Public API
`moq-json`
- `ProducerConfig { delta_ratio: u32, compression: bool }` (`#[non_exhaustive]`, `Default`).
- `ConsumerConfig { compression: bool }` (`#[non_exhaustive]`, `Default`).
- `Producer::new(track, ProducerConfig)`; `Consumer::new(track, ConsumerConfig)`.
- `Error` gains `Decompress` and `TooLarge(u64)` (`Error` is `#[non_exhaustive]`).
- … `moq-mux` (catalog producer/consumer), `moq-boy`.

`@moq/json`
- `Config.compression?: boolean` (additive on/off toggle). Left as a single generic `Config<T>` for producer and consumer; mirroring the Rust `ProducerConfig`/`ConsumerConfig` split is deferred to a follow-up if wanted.
- `pako` added as a normal `dependency`.

Branch targeting
Targets `main`: compression is off by default and unreleased, the catalog wire bytes are unchanged, and neither `moq-json` nor `js/json` is among the crates/packages the breaking-change rule routes to `dev`. The compressed-frame wire format and the config API changed across revisions of this same (unreleased) PR.

Cross-package sync
`rs/moq-json` ↔ `js/json` are in sync on the wire format. `moq-json` is not on the wire-sync table and the catalog format is unchanged, so no `doc/` updates are required.

Test plan
- `cargo test -p moq-json`: 34 tests incl. group round-trip, cross-frame-context shrink, compressed snapshot/delta round-trips, late-joiner, mid-group cloned-consumer window rebuild, windowed-delta-beats-raw size check, a chunk-spanning large-frame round-trip, and codec garbage rejection.
- `cargo clippy -p moq-json --tests` + `cargo fmt -p moq-json`: clean. `moq-mux` builds.
- `bun test js/json`: 52 tests incl. group round-trip, cross-frame-context shrink, compressed snapshot/delta/late-joiner reconstruction, in-order live consumer, fresh-decoder-decodes-a-snapshot interop, inflate-past-cap rejection, a chunk-spanning large-frame round-trip, a size-shrink check, and a pako-vs-fflate test pinning why we use pako (with a positive control).
- `tsc --noEmit` + `biome check` on `@moq/json`: clean. `@moq/hang` still type-checks.
- … `just ci` via the pinned nix toolchain): green.

Wire-format interop verified
`flate2` (≈zlib) ↔ pako, both encode→decode directions, with RFC 7692 tail stripping, on small + large + the catalog cases.

🤖 Generated with Claude Code
(Written by Claude)