Add exact reverse scan support to PushdownSort with limit-after-reverse fix#47
Add exact reverse scan support to PushdownSort with limit-after-reverse fix#47zhuqi-lucas merged 0 commit intobranch-52from
Conversation
9a3dd73 to
c54b81e
Compare
c54b81e to
a66c2ef
Compare
8b58b9f to
03cd09a
Compare
03cd09a to
fdfa2c7
Compare
64ceaad to
79e89b0
Compare
1fe3ede to
fd2650a
Compare
There was a problem hiding this comment.
Pull request overview
Adds an Exact reverse scan mode for Parquet sort pushdown so ORDER BY ... DESC LIMIT N can eliminate the physical Sort/TopK and push LIMIT into the scan while still producing globally sorted output.
Changes:
- Introduces
enable_exact_reverse_scanParquet/session config and wires it intoParquetSourcereverse-scan pushdown behavior (ExactvsInexact). - Implements per-row-group buffering + row reversal in the Parquet opener path, and applies limit after reversal for correctness.
- Updates physical optimizer sort pushdown to remove
SortExeconExactand pushfetchdown; adds unit/snapshot + SLT coverage and config docs.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/source/user-guide/configs.md | Documents the new enable_exact_reverse_scan config option. |
| datafusion/sqllogictest/test_files/sort_pushdown.slt | Adds end-to-end SLT cases validating exact reverse behavior, limit/offset handling, and toggle on/off. |
| datafusion/sqllogictest/test_files/information_schema.slt | Updates SHOW ALL expectations to include the new config key. |
| datafusion/proto/src/logical_plan/file_formats.rs | Defaults new option during protobuf → options conversion. |
| datafusion/proto-common/src/from_proto/mod.rs | Defaults new option during protobuf → common options conversion. |
| datafusion/physical-optimizer/src/pushdown_sort.rs | On exact pushdown, removes SortExec and pushes fetch down into the plan tree. |
| datafusion/datasource-parquet/src/source.rs | Adds with_exact_reverse, tracks exact/inexact reverse behavior, and updates plan display. |
| datafusion/datasource-parquet/src/opener.rs | Adds reverse_rows support; wraps stream with ReversedRowGroupStream to reverse rows within RG and apply limit post-reversal. |
| datafusion/core/tests/physical_optimizer/test_utils.rs | Adds test helper to build Parquet exec with exact reverse enabled. |
| datafusion/core/tests/physical_optimizer/pushdown_sort.rs | Adds optimizer snapshot tests covering exact reverse sort removal and fetch pushdown (including through projections). |
| datafusion/common/src/file_options/parquet_writer.rs | Threads the new option through ParquetOptions test scaffolding / writer-option destructuring. |
| datafusion/common/src/config.rs | Adds the enable_exact_reverse_scan Parquet config definition and docs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Addresses Copilot review comment on PR #47: when `row_selection` is present (e.g. from page pruning via pushdown_filters), the parquet stream emits only the selected rows, so seeding `rg_row_counts` from `RowGroupMetaData::num_rows()` caused ReversedRowGroupStream to mis-detect row-group boundaries and silently mix batches from multiple row groups, producing wrong ordering. Fix: new `compute_selected_rows_per_rg` helper walks the RowSelection in lock-step with the row groups and computes the actual output row count per RG. Tests added: - 4 unit tests for compute_selected_rows_per_rg (no skip, spanning skips, all-skipped, short selection error) - test_exact_reverse_scan_multi_rg_produces_global_desc: verifies Inexact yields [7,8,9,4,5,6,1,2,3] while Exact yields [9..1] (globally DESC) - test_exact_reverse_scan_applies_limit_after_reversal: verifies limit=4 over [1..9] yields [9,8,7,6] (top of forward order, not first N pre-reverse) - test_exact_reverse_scan_with_row_selection_across_rgs: regression test for the row_selection bug — 3 RGs with per-RG selections yield the expected [10,9,8,7,6,5,4,3] - test_exact_reverse_scan_with_row_selection_and_limit: combined case
Addresses Copilot review comment on PR #47: when `row_selection` is present (e.g. from page pruning via pushdown_filters), the parquet stream emits only the selected rows, so seeding `rg_row_counts` from `RowGroupMetaData::num_rows()` caused ReversedRowGroupStream to mis-detect row-group boundaries and silently mix batches from multiple row groups, producing wrong ordering. Fix: new `compute_selected_rows_per_rg` helper walks the RowSelection in lock-step with the row groups and computes the actual output row count per RG. Tests added: - 4 unit tests for compute_selected_rows_per_rg (no skip, spanning skips, all-skipped, short selection error) - test_exact_reverse_scan_multi_rg_produces_global_desc: verifies Inexact yields [7,8,9,4,5,6,1,2,3] while Exact yields [9..1] (globally DESC) - test_exact_reverse_scan_applies_limit_after_reversal: verifies limit=4 over [1..9] yields [9,8,7,6] (top of forward order, not first N pre-reverse) - test_exact_reverse_scan_with_row_selection_across_rgs: regression test for the row_selection bug — 3 RGs with per-RG selections yield the expected [10,9,8,7,6,5,4,3] - test_exact_reverse_scan_with_row_selection_and_limit: combined case
d8d48c0 to
6ff5bdf
Compare
| // pushdown_filters), the stream emits only the selected rows, so | ||
| // `RowGroupMetaData::num_rows()` would over-count and cause | ||
| // ReversedRowGroupStream to misdetect row-group boundaries. | ||
| let rg_row_counts: Vec<usize> = if reverse_rows { |
There was a problem hiding this comment.
When rg_row_counts contains a leading or middle 0 (an RG where every row is skipped by RowSelection — reachable via pushdown_filters + page pruning), the stream’s rows_remaining_in_rg sits at 0. The first batch of the next RG arrives, saturating_sub(num_rows) stays 0, and flush_buffer() fires immediately — attributing that batch to the empty RG, advancing current_rg, and splitting the real RG’s batches across two flush cycles. Result: wrong batch ordering within the real RG.
There was a problem hiding this comment.
This is a good catch!
There was a problem hiding this comment.
Addressed in latest PR.
There was a problem hiding this comment.
Round-trip data flow on a remote executor:
• PushdownSort removes the Sort operator locally (Exact path), leaving only DataSourceExec with reverse_row_groups=true, reverse_rows=true.
• try_from_data_source_exec serializes only FileScanConfig + predicate + table_parquet_options — not the reverse_row_groups/reverse_rows/exact_reverse fields on ParquetSource.
• ParquetOptions → protobuf::ParquetOptions doesn’t include enable_exact_reverse_scan at all, and the reverse path hardcodes false.
• Deserialized plan: DataSourceExec with all reverse flags = false, no Sort above it. Reads files forward → ASC output returned for a DESC query.
There was a problem hiding this comment.
Addressed in latest PR.
xudong963
left a comment
There was a problem hiding this comment.
iirc, the pr won't be contributed to upstream?
I can try to contribute to upstream as an option which is default false. |
xudong963
left a comment
There was a problem hiding this comment.
If we verify these comments are real risk, could you please also add some slt tests for them?
There was a problem hiding this comment.
I think there is still a correctness issue in the exact reverse path here.
rg_row_counts only accounts for row_selection, but the stream can also have a row_filter applied. In arrow-rs, RowFilter is applied after row-group selection / row selection, so the actual number of rows emitted for a row group can be smaller than the count computed here.
On top of that, ReversedRowGroupStream infers row-group boundaries by accumulating emitted batch row counts, but the parquet reader can emit batches that span row-group boundaries. That means the boundary detector can drift and silently buffer batches from multiple row groups together.
As a result, queries like WHERE ... ORDER BY ... DESC LIMIT ... on the exact reverse path may return rows in the wrong order without failing.
There was a problem hiding this comment.
@xudong963 Fixed in latest PR:
I redesigned the implementation here:
Replaced ReversedRowGroupStream with per-RG independent reading (modeled after our internal ReverseParquetSource). Each RG gets its own ParquetRecordBatchStreamBuilder with .with_row_groups(vec![rg]), so RowFilter is applied independently per-RG. No more rg_row_counts boundary detection. Memory stays O(largest RG). Added SLT test with pushdown_filters=true + WHERE predicate confirming correct results.
There was a problem hiding this comment.
I think the physical-plan roundtrip is losing execution state here.
On decode we reconstruct the parquet source from ParquetSource::new(...).with_table_parquet_options(...), but the reverse-scan runtime flags (reverse_row_groups / reverse_rows) are not restored. On encode, those fields are not serialized either.
That looks dangerous for any path that ships physical plans through protobuf / RemoteExec: the optimizer may already have removed SortExec, but after deserialization the scan may no longer perform reverse scanning at execution time.
There was a problem hiding this comment.
Added reverse_row_groups and reverse_rows fields to ParquetScanExecNode proto message. Serialized on encode, restored on decode. Remote executors now correctly preserve reverse scan state after plan
roundtrip.
| Arc::new(self.clone().with_reverse_row_groups(true)) | ||
| }; | ||
|
|
||
| if self.exact_reverse { |
There was a problem hiding this comment.
I think returning Exact here may make the plan properties inconsistent.
On the exact-reverse path, the scan now produces the reversed order, but the downstream FileScanConfig metadata still keeps the original output_ordering rather than reversing it. That means after SortExec is removed, the plan can still advertise the original ordering even though the actual output direction has changed.
There was a problem hiding this comment.
The output_ordering reflects the physical sort order of the data files, which doesn't change, we just read it in reverse. The Exact result tells the optimizer that the output satisfies the requested reversed ordering, and PushdownSort updates the plan properties accordingly when it removes the Sort. So FileScanConfig.output_ordering staying as-is is correct here.
Thanks for review, i will look back this PR after urgent issues done. |
e7b8083 to
f0004c4
Compare
Summary
Adds Exact reverse scan support to the parquet sort pushdown. When enabled, the Sort/TopK operator above a FileScanExec for ORDER BY ... DESC LIMIT N queries can be removed entirely and the limit pushed down to the scan, while still producing globally sorted output.
This builds on top of the existing Inexact reverse scan (which only reverses row group order — TopK must remain to re-sort rows within each RG).
Design
The ReversedRowGroupStream wraps the parquet stream:
Limit is applied inside ReversedRowGroupStream (after reversal), not at the parquet reader level — applying it before reversal would read the first N rows in forward order, giving wrong results.
Changes
Core
Tests
Other
Backward Compatibility
Fully backward compatible: enable_exact_reverse_scan defaults to false. All existing Inexact behavior is preserved.
Based on
Approach inspired by apache#18817.