Perf: Window topn optimisation #21479
2010YOUY01
left a comment
Thank you — this PR looks really nice.
I took a quick look and left a few suggestions. I’ll review the optimizer rewrite and execution side more carefully later.
// specific language governing permissions and limitations
// under the License.

// Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled
We could keep this benchmark in this PR, but it would be great to clean it up later.
To make benchmark maintenance easier, we could add queries representing this workload directly to the h2o window benchmark, so that similar benchmarks don't get scattered across multiple places.
datafusion/benchmarks/bench.sh
Line 123 in e1ad871
The issue, though, is that the h2o benchmark currently counts the dataset loading time, so we can't isolate the target executor's processing time. We could add an option to exclude the data loading time later 🤔
> Though the issue is now the h2o benchmark counts the dataset loading time, so we can't isolate the target executor's processing time, so we could add an option to eliminate the data loading time later

Shall I keep the benchmark query in the h2o benchmark in this PR, or shall we do that once we eliminate the data loading time?
I prefer to move the benchmark in this PR into the h2o framework. We could directly add queries to the h2o-window queries, since it's not a standard benchmark.
// Step 1: Match FilterExec at the top
let filter = plan.downcast_ref::<FilterExec>()?;

// Don't handle filters with projections
I'm curious why this case is skipped.
The filter's column indices would point to the projected schema, not the window exec's output schema, so our index-based matching for the ROW_NUMBER column would be wrong without resolving the projection mapping. Skipping this case for simplicity right now.
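To make the index mismatch described above concrete, here is a minimal std-only sketch. The schema, the projection, and the `resolve_through_projection` helper are hypothetical and are not DataFusion APIs; the sketch only shows why an index read against the projected schema cannot be compared directly with the window exec's output schema.

```rust
/// Hypothetical helper (not a DataFusion API): maps a column index in the
/// projected output back to the index in the unprojected schema.
/// `projection[i]` holds the input index that output column `i` reads from.
fn resolve_through_projection(projection: &[usize], projected_idx: usize) -> Option<usize> {
    projection.get(projected_idx).copied()
}

fn main() {
    // Assumed window exec output schema: the ROW_NUMBER column is last (index 3).
    let window_schema = ["id", "pk", "val", "rn"];
    // Assumed projection keeping only `rn` and `id`, in that order.
    let projection = [3usize, 0];

    // In the projected schema `rn` sits at index 0 ...
    let projected_rn_idx = 0;
    // ... but index 0 names a different column in the window exec's schema.
    assert_eq!(window_schema[projected_rn_idx], "id");

    // Resolving through the projection recovers the intended column.
    let resolved = resolve_through_projection(&projection, projected_rn_idx).unwrap();
    assert_eq!(window_schema[resolved], "rn");
    println!("projected index {} -> window index {}", projected_rn_idx, resolved);
}
```

Supporting filters with projections would need a mapping step of this kind before the ROW_NUMBER column can be matched by index.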
Yes, it's a good idea to keep things simple at the start.
Could you file a PR for this follow-up work? I'm happy to do it also.
)?))
}

fn apply_expressions(
Not related to this PR, but I’m curious why this is a required ExecutionPlan API and when it is used, given that different operators can hold expressions for very different purposes 🤔
# Tests for Window TopN optimization: PartitionedTopKExec

statement ok
CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES
I suggest moving the main test coverage here, instead of keeping it in unit tests across different layers such as optimizer tests. Once we have solid coverage here, it is less likely to get lost during local refactors.
We can also extend the coverage with more edge cases, for example:
- predicates such as `rn < 2`, `2 > rn`, etc.
- mixing other window expressions with `row_number()`
- empty or overlapping partition / order keys, such as `... OVER (ORDER BY id)` or `... OVER (PARTITION BY id ORDER BY id, customer)`
- different sort options such as `ASC`, `DESC`, and `NULLS FIRST`
- the `QUALIFY` clause https://datafusion.apache.org/user-guide/sql/select.html#qualify-clause
- and more
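For the predicate variants in the list above (`rn < 2`, `2 > rn`), the sketch below shows one way such comparisons could be normalized into a per-partition row limit. The `Op` and `Operand` types and the `fetch_limit` helper are hypothetical and do not reflect how the `WindowTopN` rule is actually written; the sketch assumes only that ROW_NUMBER starts at 1.

```rust
/// Comparison operators considered in this sketch.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Op { Lt, LtEq, Gt, GtEq }

/// One side of a comparison: the ROW_NUMBER column or an integer literal.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Operand { RowNumber, Literal(i64) }

/// Hypothetical helper: returns how many rows per partition can satisfy the
/// predicate, or `None` when the predicate is not an upper bound on `rn`.
fn fetch_limit(left: Operand, op: Op, right: Operand) -> Option<usize> {
    use Operand::*;
    let limit = match (left, op, right) {
        // `rn <= K` and `K >= rn` keep rows 1..=K.
        (RowNumber, Op::LtEq, Literal(k)) | (Literal(k), Op::GtEq, RowNumber) => k,
        // `rn < K` and `K > rn` keep rows 1..=K-1.
        (RowNumber, Op::Lt, Literal(k)) | (Literal(k), Op::Gt, RowNumber) => k.saturating_sub(1),
        _ => return None,
    };
    // ROW_NUMBER starts at 1, so a bound below 1 keeps no rows at all.
    Some(limit.max(0) as usize)
}

fn main() {
    use Operand::*;
    // `rn < 2` and `2 > rn` both keep only the first row of each partition.
    assert_eq!(fetch_limit(RowNumber, Op::Lt, Literal(2)), Some(1));
    assert_eq!(fetch_limit(Literal(2), Op::Gt, RowNumber), Some(1));
    // `rn <= 3` and `3 >= rn` keep three rows.
    assert_eq!(fetch_limit(RowNumber, Op::LtEq, Literal(3)), Some(3));
    assert_eq!(fetch_limit(Literal(3), Op::GtEq, RowNumber), Some(3));
    // `rn > 3` is a lower bound, so it is not a top-N filter.
    assert_eq!(fetch_limit(RowNumber, Op::Gt, Literal(3)), None);
}
```

Each flipped form should produce the same plan as its canonical counterpart, which is what the suggested test cases would check.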
Added tests for these cases.
|
If it has regressions as large as |
2010YOUY01
left a comment
I have reviewed it carefully, and it looks good to me.
I think it’s ready to go once the output batch coalescing is addressed (see comment). The other suggestions are best handled in follow-up PRs to keep this PR simple and focused.
}};
}

// ---------- Accumulation phase ----------
Optimization to try as follow-up:
To make it faster, we might want to add a fast path for single partition keys like PARTITION BY a, since we don't have to do row conversion here.
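As a rough illustration of the suggested fast path, the std-only sketch below groups row indices in two ways: a general path that first encodes every key column into one byte string (a stand-in for row conversion, not Arrow's `RowConverter`), and a single-key path that hashes the native value directly. The function names and the `i64` key type are assumptions made for this example.

```rust
use std::collections::HashMap;

/// General path (stand-in for row conversion): encode every key column of a
/// row into one byte string and group row indices by that encoded key.
/// Assumes all columns have the same length.
fn group_by_encoded(columns: &[Vec<i64>]) -> HashMap<Vec<u8>, Vec<usize>> {
    let num_rows = columns.first().map_or(0, |c| c.len());
    let mut groups: HashMap<Vec<u8>, Vec<usize>> = HashMap::new();
    for row in 0..num_rows {
        let mut key = Vec::with_capacity(columns.len() * 8);
        for col in columns {
            key.extend_from_slice(&col[row].to_be_bytes());
        }
        groups.entry(key).or_default().push(row);
    }
    groups
}

/// Fast path for a single key column: hash the native value directly and
/// skip the per-row encoding step.
fn group_by_single(column: &[i64]) -> HashMap<i64, Vec<usize>> {
    let mut groups: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, &key) in column.iter().enumerate() {
        groups.entry(key).or_default().push(row);
    }
    groups
}

fn main() {
    let a: Vec<i64> = vec![1, 2, 1, 3, 2];
    let fast = group_by_single(&a);
    let general = group_by_encoded(&[a.clone()]);
    // Both paths find the same partitions with the same member rows.
    assert_eq!(fast.len(), general.len());
    assert_eq!(fast[&1], vec![0, 2]);
    assert_eq!(general[&1i64.to_be_bytes().to_vec()], vec![0, 2]);
}
```

Whether the saved encoding work is significant in `PartitionedTopKExec` would need to be measured with the benchmark.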
Co-authored-by: Yongting You <2010youy01@gmail.com>
2010YOUY01
left a comment
Thanks again!
I plan to merge it after 1-2 days, in case others want to review it again.
This reverts commit 936db37.
This seems to have compilation issues against main, despite not having merge conflicts. I've opened a revert PR, sorry about that @SubhamSinghal. I think we definitely want this optimization in.
Thanks for catching this in time; we got it fixed. We should probably re-trigger CI manually before merging large PRs, until the merge queue is able to handle this 🤔
## Which issue does this PR close?

`main` fails to compile because of a merge race between #21479 and #21573. This PR fixes the conflict.
## Which issue does this PR close?

- Related to apache#6899.

## Rationale for this change

Queries like `SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn FROM t WHERE rn <= K` are extremely common in analytics ("top N per group"). The current plan sorts the **entire** dataset in O(N log N), computes ROW_NUMBER for all rows, then filters. With 10M rows, 1K partitions, and K=3, we sort all 10M rows but only keep 3K.

This PR introduces a `PartitionedTopKExec` operator that replaces the `SortExec`, maintaining a per-partition `TopK` heap (reusing DataFusion's existing `TopK` implementation). Cost drops to O(N log K) time and O(K × P × row_size) memory.

## What changes are included in this PR?

**New physical operator: `PartitionedTopKExec`** (`physical-plan/src/sorts/partitioned_topk.rs`)

- Reads unsorted input, groups rows by partition key using `RowConverter`, feeds sub-batches to a per-partition `TopK` heap
- Emits only the top-K rows per partition in sorted `(partition_keys, order_keys)` order
- Reuses the existing `TopK` implementation for heap management, sort key comparison, eviction, and batch compaction

**New optimizer rule: `WindowTopN`** (`physical-optimizer/src/window_topn.rs`)

Detects the pattern:

```text
FilterExec(rn <= K)
  [optional ProjectionExec]
    BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
      SortExec(partition_keys, order_keys)
```

And replaces it with:

```text
[optional ProjectionExec]
  BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
    PartitionedTopKExec(fetch=K)
```

Both `FilterExec` and `SortExec` are removed.

Supported predicates: `rn <= K`, `rn < K`, `K >= rn`, `K > rn`.

The rule only fires for `ROW_NUMBER` with a `PARTITION BY` clause. Global top-K (no `PARTITION BY`) is already handled by `SortExec` with `fetch`.

**Config flag:** `datafusion.optimizer.enable_window_topn` (default: `false`)

**Benchmark results** (H2O groupby Q8, 10M rows, top-2 per partition), run with `cargo run --release --example h2o_window_topn_bench`:

| Scenario | Enabled (ms) | Disabled (ms) | Speedup |
|----------|-------------|--------------|---------|
| 100 partitions (100K rows/part) | 43 | 174 | 4.0x |
| 1K partitions (10K rows/part) | 71 | 146 | 2.1x |
| 10K partitions (1K rows/part) | 619 | 128 | 0.2x (regression) |
| 100K partitions (100 rows/part) | 4368 | 135 | 0.03x (regression) |

The regressions at 10K and 100K partitions are expected: per-partition `TopK` overhead (RowConverter, MemoryReservation per instance) dominates when partitions are very numerous with few rows each. For the common case (moderate partition cardinality), the optimization provides a 2-4x speedup.

## Are these changes tested?

Yes:

- **7 unit tests** (`core/tests/physical_optimizer/window_topn.rs`): basic ROW_NUMBER, `rn < K`, flipped predicates, non-window column filter, config disabled, no partition by, projection between filter and window
- **5 SLT tests** (`sqllogictest/test_files/window_topn.slt`): correctness verification, EXPLAIN plan validation, `rn < K`, no-partition-by case, config disabled fallback

## Are there any user-facing changes?

No breaking API changes. The optimization is disabled by default and transparent to users. It can be enabled via:

```sql
SET datafusion.optimizer.enable_window_topn = true;
```

---------

Co-authored-by: Subham Singhal <subhamsinghal@Subhams-MacBook-Air.local>
Co-authored-by: Yongting You <2010youy01@gmail.com>
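The per-partition bounded heap idea behind the O(N log K) claim can be sketched with the standard library alone. This is not DataFusion's `TopK` implementation or the `PartitionedTopKExec` code: the `top_k_per_partition` function, the `(pk, val)` tuple rows, and the ascending-only ordering are simplifying assumptions for illustration.

```rust
use std::collections::{BTreeMap, BinaryHeap};

/// Keeps the `k` smallest `val`s for every partition key `pk`.
/// Each partition owns a max-heap capped at `k` entries, so handling one row
/// costs O(log k) and memory stays at O(k * number_of_partitions).
fn top_k_per_partition(rows: &[(i64, i64)], k: usize) -> Vec<(i64, i64)> {
    if k == 0 {
        return Vec::new();
    }
    let mut heaps: BTreeMap<i64, BinaryHeap<i64>> = BTreeMap::new();
    for &(pk, val) in rows {
        let heap = heaps.entry(pk).or_default();
        if heap.len() < k {
            heap.push(val);
        } else if let Some(mut worst) = heap.peek_mut() {
            // The heap root is the largest retained value; replace it only
            // when the new row sorts before it.
            if val < *worst {
                *worst = val;
            }
        }
    }
    // BTreeMap iterates partitions in key order; sorting each small heap
    // emits rows in (partition_key, order_key) order.
    let mut out = Vec::new();
    for (pk, heap) in heaps {
        for val in heap.into_sorted_vec() {
            out.push((pk, val));
        }
    }
    out
}

fn main() {
    // (pk, val) rows in arbitrary input order.
    let rows = [(1, 50), (2, 7), (1, 10), (1, 30), (2, 9), (2, 1), (1, 20)];
    let top2 = top_k_per_partition(&rows, 2);
    assert_eq!(top2, vec![(1, 10), (1, 20), (2, 1), (2, 7)]);
    println!("{:?}", top2);
}
```

The sketch also hints at the source of the high-cardinality regression in the table: every distinct partition key gets its own heap, so with very many small partitions the per-heap bookkeeping can outweigh the sorting work that was saved.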