
Spill diff rows to disk during accumulation to prevent OOM#96

Closed
mason-sharp wants to merge 1 commit into main from
task/ACE-175/diff-memory-usage

Conversation

@mason-sharp
Member

During table-diff and mtree-diff, all differing rows were held in memory in DiffResult.NodeDiffs. For large diffs (e.g. 250K+ rows with JSONB data) this alone could exhaust available memory before the report was even written. Additionally, json.NewEncoder.Encode (introduced in v1.6.0 as "streaming") still buffered the entire JSON output in memory before writing.

Introduce DiffRowSink, which buffers rows in memory up to a configurable threshold (default 10K per sink) then spills overflow to a temporary NDJSON file. At report time each sink's rows are loaded back one sink at a time, sorted by PK, and streamed to the output JSON one row at a time through a buffered writer. Downstream consumers (repair, rerun) read from the JSON file and are unaffected.

Also fix the HTML reporter to read the already-written JSON file instead of re-marshaling the entire DiffOutput struct.
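As a rough illustration of the mechanism described above (the `rowSink` type and all names below are hypothetical sketches, not the actual DiffRowSink in pkg/common/diff_sink.go): rows accumulate in memory up to a threshold, and overflow is appended to a temporary NDJSON file, one JSON object per line.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

// rowSink buffers rows in memory up to threshold, then spills
// overflow rows to a temporary NDJSON file.
type rowSink struct {
	threshold int
	inMemory  []map[string]any
	spill     *os.File
	spillW    *bufio.Writer
	spilled   int
}

func (s *rowSink) Append(row map[string]any) error {
	if len(s.inMemory) < s.threshold {
		s.inMemory = append(s.inMemory, row)
		return nil
	}
	if s.spill == nil {
		f, err := os.CreateTemp("", "diff-spill-*.ndjson")
		if err != nil {
			return err
		}
		s.spill = f
		s.spillW = bufio.NewWriter(f)
	}
	line, err := json.Marshal(row)
	if err != nil {
		return err
	}
	if _, err := s.spillW.Write(append(line, '\n')); err != nil {
		return err
	}
	s.spilled++
	return nil
}

// Close flushes and removes the temporary spill file, if any.
func (s *rowSink) Close() error {
	if s.spill == nil {
		return nil
	}
	s.spillW.Flush()
	name := s.spill.Name()
	s.spill.Close()
	return os.Remove(name)
}

func main() {
	sink := &rowSink{threshold: 2}
	defer sink.Close()
	for i := 0; i < 5; i++ {
		if err := sink.Append(map[string]any{"id": i}); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(sink.inMemory), sink.spilled) // 2 3
}
```

At report time, the real implementation replays each sink's in-memory and spilled rows, sorts by PK, and streams them to the output writer.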


Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai bot commented Mar 25, 2026

📝 Walkthrough

Walkthrough

The PR introduces a bounded-memory diff accumulation mechanism via DiffSinks that spills overflow rows to temporary NDJSON files. Configuration, core diff tasks, and streaming JSON generation are updated to support this feature, reducing peak memory usage during large diff operations.

Changes

• Configuration & Documentation (docs/configuration.md, docs/design/table_diff.md, internal/cli/default_config.yaml, pkg/config/config.go): Added a new diff_spill_threshold configuration option (default 10000) to control in-memory diff row accumulation before spilling to disk, with corresponding documentation and schema updates.
• DiffRowSink Implementation (pkg/common/diff_sink.go): Introduced DiffRowSink and DiffSinks types for bounded in-memory diff accumulation with automatic spill-to-disk via temporary NDJSON files; includes Iterate and SortedIterate methods for row replay.
• Diff Task Integration (internal/consistency/diff/table_diff.go, internal/consistency/mtree/merkle.go): Refactored TableDiffTask and MerkleTreeTask to use DiffSinks instead of direct in-memory map population; added DiffSpillThreshold field propagation and config validation.
• Report Generation & Streaming (pkg/common/utils.go, pkg/common/html_reporter.go): Updated WriteDiffReport to accept DiffSinks and stream JSON output from the sinks via sorted iteration; refactored writeHTMLDiffReport to load the diff payload from the JSON file instead of an in-memory DiffOutput.
• Test Coverage (pkg/common/diff_sink_test.go, pkg/common/stream_json_test.go): Added unit tests for DiffRowSink behavior (no-spill, spill, sorted iteration, resource cleanup) and streaming JSON generation with heap-growth validation and large synthetic payloads.

Poem

🐰 A sink of rows, bounded and wise,
Spills to disk when memory grows too high,
Streaming JSON diffs with sorted care,
Large payloads float through thin air! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 33.33%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.
✅ Passed checks (2 passed)
  • Title check — ✅ Passed. The title "Spill diff rows to disk during accumulation to prevent OOM" directly describes the main change: introducing a disk-spilling mechanism for diff rows to avoid out-of-memory errors.
  • Description check — ✅ Passed. The description comprehensively explains the problem (memory exhaustion with large diffs), the solution (DiffRowSink with configurable threshold and disk spillover), and related improvements (HTML reporter optimization).



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/common/utils.go (1)

1233-1244: ⚠️ Potential issue | 🔴 Critical

Bug: Variable shadowing causes HTML path to always be empty.

Line 1235 uses := which declares a new htmlPath variable in the inner scope, shadowing the htmlPath declared at line 1233. The outer variable remains an empty string, so the return at line 1244 will never include the generated HTML path.

🐛 Proposed fix
 	var htmlPath string
 	if strings.EqualFold(format, "html") {
-		htmlPath, err := writeHTMLDiffReport(jsonFileName)
+		var err error
+		htmlPath, err = writeHTMLDiffReport(jsonFileName)
 		if err != nil {
 			return "", "", err
 		}
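The pitfall generalizes: `:=` inside an inner block declares a new variable that shadows the outer one. A minimal, self-contained illustration (the `compute` function is a hypothetical stand-in for writeHTMLDiffReport, not code from this PR):

```go
package main

import "fmt"

// compute is a hypothetical stand-in for writeHTMLDiffReport.
func compute() (string, error) { return "report.html", nil }

func main() {
	var path string
	if true {
		// BUG: ':=' declares a NEW 'path' scoped to this if-block,
		// shadowing the outer variable; the outer 'path' stays "".
		path, err := compute()
		if err != nil {
			return
		}
		_ = path
	}
	fmt.Printf("outer path: %q\n", path) // prints: outer path: ""
}
```

Declaring `var err error` beforehand and using plain assignment (`=`) makes the call write through to the outer variable, which is exactly the proposed fix above.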
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/common/utils.go` around lines 1233 - 1244, The htmlPath declared at the
top is being shadowed by a new inner-variable because the code uses := when
calling writeHTMLDiffReport; change the inner declaration to assignment (use =
instead of :=) so the outer htmlPath is set, keep the existing error check and
logger.Info call that references htmlPath, and ensure the call to
writeHTMLDiffReport and subsequent error handling remain inside the
strings.EqualFold(format, "html") block.
🧹 Nitpick comments (1)
pkg/common/diff_sink.go (1)

162-185: Consider increasing max line size for extremely large JSONB columns.

The scanner is configured with a 10MB max line size (line 174). While this handles most cases, rows with very large JSONB columns (e.g., 250K+ character documents) could exceed this limit and cause bufio.ErrTooLong.

If this is a concern, consider making the max size configurable or increasing it:

-	scanner.Buffer(make([]byte, 0, 256*1024), 10*1024*1024)
+	scanner.Buffer(make([]byte, 0, 256*1024), 64*1024*1024) // 64MB max

Alternatively, document this as a known limitation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/common/diff_sink.go` around lines 162 - 185, The iterateSpillFile method
on DiffRowSink currently hardcodes a 10MB scanner max token size which can hit
bufio.ErrTooLong for very large JSONB rows; make the limit configurable and
surface a clearer error: add a DiffRowSink field or package constant (e.g.,
SpillMaxLineSize or s.maxSpillLineSize) and use it in scanner.Buffer instead of
the fixed 10*1024*1024, allow initialization to set a larger value (or default a
larger safe size), and explicitly check for bufio.ErrTooLong from scanner.Err()
to return a descriptive error mentioning the configured max size and suggesting
increasing it.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c80d2a9d-991d-494b-9437-16ae9b86d014

📥 Commits

Reviewing files that changed from the base of the PR and between 005e1e0 and 6477d39.

📒 Files selected for processing (11)
  • docs/configuration.md
  • docs/design/table_diff.md
  • internal/cli/default_config.yaml
  • internal/consistency/diff/table_diff.go
  • internal/consistency/mtree/merkle.go
  • pkg/common/diff_sink.go
  • pkg/common/diff_sink_test.go
  • pkg/common/html_reporter.go
  • pkg/common/stream_json_test.go
  • pkg/common/utils.go
  • pkg/config/config.go

Comment on lines +1344 to 1345
t.diffSinks = make(utils.DiffSinks)



⚠️ Potential issue | 🟠 Major

Defer sink cleanup so failed runs don't orphan spill files.

CloseAll() only happens on Line 1614. Any earlier return after spilling starts—for example the getFirstError() exit on Lines 1598-1600—skips cleanup and leaves the temp NDJSON files behind.

Suggested fix
  t.DiffResult = types.DiffOutput{
    NodeDiffs: make(map[string]types.DiffByNodePair),
    Summary: types.DiffSummary{
      Schema:            t.Schema,
      Table:             t.BaseTable,
      TableFilter:       t.TableFilter,
      EffectiveFilter:   t.EffectiveFilter,
      Nodes:             t.NodeList,
      BlockSize:         t.BlockSize,
      CompareUnitSize:   t.CompareUnitSize,
      ConcurrencyFactor: t.ConcurrencyFactor,
      MaxDiffRows:       t.MaxDiffRows,
      StartTime:         startTime.Format(time.RFC3339),
      TotalRowsChecked:  int64(maxCount),
      DiffRowsCount:     make(map[string]int),
      AgainstOrigin:     t.AgainstOrigin,
      AgainstOriginResolved: func() string {
        if t.resolvedAgainstOrigin != "" && t.SpockNodeNames != nil {
          if name, ok := t.SpockNodeNames[t.resolvedAgainstOrigin]; ok {
            return name
          }
        }
        return ""
      }(),
      Until: func() string {
        if t.untilTime != nil {
          return t.untilTime.Format(time.RFC3339)
        }
        return strings.TrimSpace(t.Until)
      }(),
      OriginOnly: t.resolvedAgainstOrigin != "",
    },
  }
  t.diffSinks = make(utils.DiffSinks)
+ defer t.diffSinks.CloseAll()
- jsonPath, _, err := utils.WriteDiffReport(t.DiffResult, t.diffSinks, t.Schema, t.BaseTable, t.Output)
- t.diffSinks.CloseAll()
+ jsonPath, _, err := utils.WriteDiffReport(t.DiffResult, t.diffSinks, t.Schema, t.BaseTable, t.Output)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/diff/table_diff.go` around lines 1344 - 1345, The temp
spill sinks are created with t.diffSinks = make(utils.DiffSinks) but CloseAll()
is only called later, which can leave NDJSON files orphaned on early returns
(e.g., getFirstError()). Immediately defer closing after creating t.diffSinks so
cleanup always runs: after t.diffSinks = make(utils.DiffSinks) add a deferred
call to t.diffSinks.CloseAll() (or a small deferred wrapper that nil-checks and
logs errors) to guarantee sinks are closed even if the function exits early
(referencing t.diffSinks, CloseAll(), and the early-return path via
getFirstError()).

Comment on lines +627 to +629
if err := m.diffSinks.GetSink(nodePairKey, nodeName, m.DiffSpillThreshold).Append(orderedRow); err != nil {
return false, err
}


⚠️ Potential issue | 🔴 Critical

Propagate spill-write failures out of mtree-diff.

These new Append() errors never fail the task: processWorkItem() returns them, but compareRangesWorker() only logs them on Lines 293-297, and DiffMtree() still writes whatever partial rows were accumulated. A full temp filesystem will therefore produce a truncated diff with a successful status.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/mtree/merkle.go` around lines 627 - 629, The Append()
error from diffSinks.GetSink(nodePairKey, nodeName,
m.DiffSpillThreshold).Append(orderedRow) must be propagated so a spill-write
failure fails the whole mtree-diff task; update compareRangesWorker to return
that error (instead of only logging it), ensure processWorkItem collects and
returns worker errors up the chain, and make DiffMtree treat a non-nil error
from processWorkItem as a hard failure (stopping further writes). Touch the
Append call site and the compareRangesWorker, processWorkItem, and DiffMtree
call/return paths to pass the error back out rather than swallowing it.

Comment on lines +1920 to 1921
m.diffSinks = make(utils.DiffSinks)



⚠️ Potential issue | 🟠 Major

mtree-diff never reads the configured spill threshold.

TableDiffTask.Validate() already copies cfg.TableDiff.DiffSpillThreshold into the task when unset. MerkleTreeTask never does, so ace.yaml cannot actually tune spill behavior for mtree runs.

Suggested fix
func (m *MerkleTreeTask) Validate() error {
  if err := m.validateCommon(); err != nil {
    return err
  }
+ if m.DiffSpillThreshold == 0 {
+   if spill := config.Get().TableDiff.DiffSpillThreshold; spill > 0 {
+     m.DiffSpillThreshold = spill
+   }
+ }
  if m.Mode == "listen" {
    return nil
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/consistency/mtree/merkle.go` around lines 1920 - 1921, The
MerkleTreeTask never reads cfg.TableDiff.DiffSpillThreshold so ace.yaml cannot
tune spill behavior; mirror the fix in TableDiffTask.Validate(): in
MerkleTreeTask's validation/initialization (e.g., MerkleTreeTask.Validate or the
constructor that sets m.diffSinks) copy cfg.TableDiff.DiffSpillThreshold into
the task when the task's DiffSpillThreshold is unset/zero, then ensure
subsequent use of m.diffSinks or diff creation honors that task-level
DiffSpillThreshold; reference MerkleTreeTask, TableDiffTask.Validate, and
cfg.TableDiff.DiffSpillThreshold when making the change.

Comment on lines +50 to +58
// Read the already-written JSON file instead of re-marshaling the struct.
rawJSON, err := os.ReadFile(jsonFilePath)
if err != nil {
-	return "", fmt.Errorf("failed to marshal diff result for HTML embedding: %w", err)
+	return "", fmt.Errorf("failed to read diff JSON for HTML embedding: %w", err)
}

var diffResult types.DiffOutput
if err := json.Unmarshal(rawJSON, &diffResult); err != nil {
	return "", fmt.Errorf("failed to parse diff JSON for HTML report: %w", err)

@coderabbitai coderabbitai bot Mar 25, 2026


⚠️ Potential issue | 🟠 Major

HTML generation still reloads the full diff into memory.

os.ReadFile keeps one full copy of the report, json.Unmarshal builds another full object graph, and the later pair/row view-model assembly allocates even more. For the 250K+/JSONB diffs this PR is trying to make safe, --output html can still hit the same OOM class.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/common/html_reporter.go` around lines 50 - 58, The code currently uses
os.ReadFile -> rawJSON and json.Unmarshal into types.DiffOutput (diffResult),
which loads the entire diff into memory; replace that with a streaming approach:
open the file (os.Open jsonFilePath), create a json.NewDecoder(file) and
stream-decode only the top-level fields/arrays you need (iterate over the
pairs/rows array with Decoder.Token()/Decode into small element structs) and
assemble the pair/row view-model incrementally so you never hold the full
types.DiffOutput in memory; update the logic that currently reads diffResult to
consume the streamed elements instead, and close the file when done.

Member Author


The main concern is JSON, not HTML. Large diffs may be unreadable/impractical to view in a browser anyway, so leaving this as-is.



@mason-sharp That's a fair point — a browser rendering hundreds of thousands of rows would be unusable regardless of the memory cost on the generation side, so the HTML path's memory footprint is a moot concern in practice. Acknowledged, leaving as-is makes sense.

@mason-sharp mason-sharp marked this pull request as draft March 25, 2026 00:29