perf: optimize HashTableLookupExpr::evaluate#19602
Conversation
There was a problem hiding this comment.
Pull request overview
This PR optimizes HashTableLookupExpr::evaluate by replacing per-row hash table lookups with a batch processing API. The previous implementation made individual get_matched_indices calls for each row, causing unnecessary memory allocations and redundant computations. The new approach uses a single batch call that sets bits in a buffer for all matching hashes at once, resulting in notable performance improvements across multiple TPC-H queries (up to 1.13x faster on Q18).
Key Changes:
- Introduced
set_bits_if_existstrait method for batch hash lookups - Refactored
HashTableLookupExpr::evaluateto use the new batch API - Added comprehensive test coverage for the new functionality
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
datafusion/physical-plan/src/joins/join_hash_map.rs |
Adds set_bits_if_exists trait method to JoinHashMapType and implements it for JoinHashMapU32 and JoinHashMapU64 with corresponding helper function and tests |
datafusion/physical-plan/src/joins/stream_join_utils.rs |
Implements set_bits_if_exists for PruningJoinHashMap to support the new batch lookup API |
datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs |
Refactors HashTableLookupExpr::evaluate to use the new batch API instead of per-row lookups |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
| } | ||
| self.hash_map | ||
| .set_bits_if_exists(hash_array.values(), buf.as_slice_mut()); |
There was a problem hiding this comment.
I think we could use with_hashes / reuse hashes buffer instead of allocating a new one each time.
There was a problem hiding this comment.
Done. Reusing the hashes buffer now. Thanks!
|
Very nice! I added 2 suggestions which could maybe improve it even further. |
Yes that's per design. It's only used when the filter is pushed down into and evaluated row by row by the Parquet machinery. There's work to make that the default: #19477 |
…er: &mut [u8])` -> `contain_hashes(&self, hash_values: &[u64]) -> BooleanArray`
1e684a1 to
2ab1a49
Compare
Thanks for reviewing! I've applied both suggestions. Could you please take another look? @Dandandan |
|
|
||
| // Optimization: if hash_expr is HashExpr, compute hashes directly into callback | ||
| // to avoid redundant allocations and copies. | ||
| if let Some(hash_expr) = self.hash_expr.as_any().downcast_ref::<HashExpr>() { |
There was a problem hiding this comment.
Isn't this always the case? We can remove the hashexpr (only store the inner expressions) while it is constructed to simplify the code?
There was a problem hiding this comment.
That's a great point. Storing the inner expressions directly is indeed a better approach, especially since the perfect hash join(which is being worked on in that separate pending PR) needs direct access to on_columns to identify the join key columns.
I initially considered changing HashTableLookupExpr.hash_expr to a concrete HashExpr type (which I've actually already done in another unmerged PR for perfect hash join). However, I think that accessing the columns through hash_expr.on_columns feels a bit clunky 😂.
898905f to
97864c9
Compare
|
It might be interesting to re-run https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/#hash-join-dynamic-filters and see if the numbers are even better now! |
I re-ran the benchmark( |
|
Thank you @UBarney! |
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> The previous implementation of `HashTableLookupExpr::evaluate` relied on per-row calls to `get_matched_indices`, which incurred unnecessary performance overhead: 1. **Memory Overhead**: Each per-row call triggered small `Vec` allocations and potential resizes, leading to pressure on the memory allocator. 2. **Redundant Computation**: `get_matched_indices` traverses the entire hash chain to find all matches, which is unnecessary when we only need to verify the existence of a key. ### Performance Results (TPC-H) The following TPC-H results were obtained with **`DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true`:** ``` ┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Query ┃ baseline@9a9ff ┃ optimized ┃ Change ┃ ┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ QQuery 1 │ 679.51 ms │ 728.06 ms │ 1.07x slower │ │ QQuery 2 │ 388.33 ms │ 384.11 ms │ no change │ │ QQuery 3 │ 864.38 ms │ 856.27 ms │ no change │ │ QQuery 4 │ 458.46 ms │ 468.26 ms │ no change │ │ QQuery 5 │ 1614.26 ms │ 1525.65 ms │ +1.06x faster │ │ QQuery 6 │ 611.20 ms │ 610.06 ms │ no change │ │ QQuery 7 │ 950.39 ms │ 940.13 ms │ no change │ │ QQuery 8 │ 1214.86 ms │ 1218.21 ms │ no change │ │ QQuery 9 │ 2657.61 ms │ 2482.09 ms │ +1.07x faster │ │ QQuery 10 │ 1050.70 ms │ 1001.96 ms │ no change │ │ QQuery 11 │ 383.92 ms │ 347.27 ms │ +1.11x faster │ │ QQuery 12 │ 963.14 ms │ 920.78 ms │ no change │ │ QQuery 13 │ 473.68 ms │ 480.97 ms │ no change │ │ QQuery 14 │ 363.36 ms │ 345.27 ms │ no change │ │ QQuery 15 │ 960.56 ms │ 955.05 ms │ no change │ │ QQuery 16 │ 281.95 ms │ 267.34 ms │ +1.05x faster │ │ QQuery 17 │ 5306.43 ms │ 4983.21 ms │ +1.06x faster │ │ QQuery 18 │ 3415.11 ms │ 3016.52 ms │ +1.13x faster │ │ QQuery 19 │ 761.67 ms │ 759.49 ms │ no change │ │ QQuery 20 │ 650.20 ms │ 642.40 ms │ no change │ │ QQuery 21 │ 3111.85 ms │ 2833.05 ms │ +1.10x faster │ │ QQuery 22 │ 141.75 ms │ 143.06 ms │ no change │ └──────────────┴────────────────┴────────────┴───────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓ ┃ Benchmark Summary ┃ ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩ │ Total Time (baseline@9a9ff) │ 27303.30ms │ │ Total Time (optimized) │ 25909.21ms │ │ Average Time (baseline@9a9ff) │ 1241.06ms │ │ Average Time (optimized) │ 1177.69ms │ │ Queries Faster │ 7 │ │ Queries Slower │ 1 │ │ Queries with No Change │ 14 │ │ Queries with Failure │ 0 │ └───────────────────────────────┴────────────┘ ``` Note that Q1 does not involve `HashJoin`. #### Note on Configuration Benchmarks were conducted with `DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true` because `HashTableLookupExpr::evaluate` is **NOT** invoked under default settings. I manually added `dbg!(&num_rows)` at [L335 in `partitioned_hash_eval.rs`](https://github.com/apache/datafusion/blob/9a9ff8d6162b7391736b0b7c82c00cb35b0652a1/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs#L335) and confirmed that the logic path is only triggered when this flag is enabled. Under default settings, `HashTableLookupExpr::evaluate` is not called; . I am uncertain if this current behavior is intentional. ## What changes are included in this PR? - Added `JoinHashMapType::contain_hashes`: A new trait method that processes a batch of hashes and updates a bitmask for existing keys. - Refactored `HashTableLookupExpr::evaluate`: Switched from per-row lookups to the new batch API. <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? Yes <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? NO <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
Which issue does this PR close?
Rationale for this change
The previous implementation of
HashTableLookupExpr::evaluaterelied on per-row calls toget_matched_indices, which incurred unnecessary performance overhead:Vecallocations and potential resizes, leading to pressure on the memory allocator.get_matched_indicestraverses the entire hash chain to find all matches, which is unnecessary when we only need to verify the existence of a key.Performance Results (TPC-H)
The following TPC-H results were obtained with
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true:Note that Q1 does not involve
HashJoin.Note on Configuration
Benchmarks were conducted with
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=truebecauseHashTableLookupExpr::evaluateis NOT invoked under default settings.I manually added
dbg!(&num_rows)at L335 inpartitioned_hash_eval.rsand confirmed that the logic path is only triggered when this flag is enabled. Under default settings,HashTableLookupExpr::evaluateis not called; . I am uncertain if this current behavior is intentional.What changes are included in this PR?
JoinHashMapType::contain_hashes: A new trait method that processesa batch of hashes and updates a bitmask for existing keys.
HashTableLookupExpr::evaluate: Switched from per-row lookups tothe new batch API.
Are these changes tested?
Yes
Are there any user-facing changes?
NO