Filter pushdown dynamic bytes/morsels + FileStream dynamic work scheduling #8
Conversation
run benchmarks
🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (5d6566a) to afc0784 (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (5d6566a) to afc0784 (merge-base) using tpch

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (5d6566a) to afc0784 (merge-base) using clickbench_partitioned
…1074)

## Which issue does this PR close?
- Closes apache#18092.

## Rationale for this change
1. Migrating to the modern TypeSignature API: [264030c/datafusion/expr-common/src/signature.rs](https://github.com/apache/datafusion/blob/264030cca76d0bdb4d8809f252b422e72624a345/datafusion/expr-common/src/signature.rs)
2. Coercing the types of `approx_percentile_cont`, `approx_percentile_cont_with_weight`, and `approx_median` to floats. This matches PostgreSQL, DuckDB, and ClickHouse behaviour; Spark differs.

## What changes are included in this PR?
- Port the remaining UDFs (`approx_percentile_cont`, `approx_percentile_cont_with_weight`, `approx_median`, stub functions) to the signature APIs
- Deprecate the INTEGERS and NUMERICS arrays in favour of the TypeSignature API
  - They are not removed yet, only marked as deprecated, to avoid breaking downstream crates
- Fix up an SLT for `approx_percentile_cont` and `approx_median` to make sure it returns a float

## Are these changes tested?
- Tests are passing
- Updated tests to expect floats in return types

## Are there any user-facing changes?
- The signatures of `approx_percentile_cont`, `approx_percentile_cont_with_weight`, and `approx_median` changed, so they now return floats instead of integers (as seen in tests)
run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
🤖 Benchmark running (GKE): comparing HEAD (5d6566a) to main using clickbench_partitioned

🤖 Benchmark running (GKE): comparing HEAD (5d6566a) to main using tpcds

🤖 Benchmark running (GKE): comparing HEAD (5d6566a) to main using tpch
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
…encies (apache#21502)

## Which issue does this PR close?
- Closes apache#21501.

## Rationale for this change
Running tests for `datafusion-expr` currently fails locally, but it passes in CI due to feature unification with the `substrait` dependency.

## What changes are included in this PR?
Enables a feature for a dependency during tests.

## Are these changes tested?
Ran tests for the crate directly.

## Are there any user-facing changes?
None.

Signed-off-by: Adam Gutglick <adamgsal@gmail.com>
## Which issue does this PR close?
- Closes apache#21709

## Rationale for this change
The `keep_only_maxrows` function panics on wide tables: the format string treats the `spaces` variable as an argument index. The new code repeats the space character directly, which prevents the format-argument error.

## What changes are included in this PR?
- Replaces the format string in `keep_only_maxrows`.
- Adds a `print_maxrows_limited_wide_table` unit test.
- Adds a `test_cli_wide_result_set_no_crash` integration test.
- Adds a wide result set snapshot file.

## Are these changes tested?
Yes. I added unit and integration tests. The unit test passes a ten-column schema to `keep_only_maxrows` and the integration test runs a wide query with a small row limit. These tests verify the format bug fix.

## Are there any user-facing changes?
No
run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
🤖 Benchmark running (GKE): comparing HEAD (65f0aab) to main using tpch

🤖 Benchmark running (GKE): comparing HEAD (65f0aab) to main using clickbench_partitioned

🤖 Benchmark running (GKE): comparing HEAD (65f0aab) to main using tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
## Which issue does this PR close?
- Closes apache#2547

## Rationale for this change
Related to apache#20830: the ALL operator does not support the operators above.

## What changes are included in this PR?
Adds support for the other expressions and adds tests.

This is actually a question: I checked the null-semantics behaviour of PostgreSQL and DuckDB and followed the PostgreSQL behaviour. I'm not sure if that is what we want, so the DuckDB outputs are also included below; feedback would be great.

| Query | PostgreSQL | This PR | DuckDB |
|---|---|---|---|
| `5 = ALL(NULL::INT[])` | `NULL` | `NULL` | `true` |
| `5 <> ALL(NULL::INT[])` | `NULL` | `NULL` | `true` |
| `5 > ALL(NULL::INT[])` | `NULL` | `NULL` | `true` |
| `5 < ALL(NULL::INT[])` | `NULL` | `NULL` | `true` |
| `5 >= ALL(NULL::INT[])` | `NULL` | `NULL` | `true` |
| `5 <= ALL(NULL::INT[])` | `NULL` | `NULL` | `true` |

## Are these changes tested?
Added slt tests for this and they all pass.

## Are there any user-facing changes?
Yes, users can now use the ALL operator with these new expressions.

Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark running (GKE): comparing filter-pushdown-dynamic-bytes-morsels (92c6a24) to afc0784 (merge-base) using tpch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
## Which issue does this PR close?
- Closes #.

## Rationale for this change
Clarifying a function with more comments.

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?
## Which issue does this PR close?
- Closes apache#20529
- Closes apache#20820

## Rationale for this change
This PR finally enables dynamic work scheduling in the FileStream, so that a task that finishes can look at any remaining work. This improves performance on queries that scan multiple files where the work is not balanced evenly across partitions in the plan (e.g. when dynamic filtering reduces work significantly).

It is the last of a sequence of several PRs:
- apache#21342
- apache#21327
- apache#21340

## What changes are included in this PR?
1. Add shared state across sibling FileStreams and the wiring to connect them
2. Sibling streams put their file work into a shared queue when it can be reordered
3. Add a bunch of tests

Note there are several things that are NOT included in this PR, including:
1. Trying to limit concurrent IO (this PR has the same properties as main: up to one outstanding IO per partition)
2. Trying to issue multiple IOs from the same partition (i.e. to interleave IO and CPU work more)
3. Splitting files into smaller units (e.g. across row groups)

As @Dandandan proposes below, I expect we can work on those changes as follow-on PRs.

## Are these changes tested?
Yes, by existing functional and benchmark tests, as well as new functional tests.

## Are there any user-facing changes?
Yes, faster performance (see benchmarks): apache#21351 (comment)

Co-authored-by: Oleks V <comphead@users.noreply.github.com>
## Which issue does this PR close?
- Related to apache#21635.

## Rationale for this change
Upon releasing 53.0.0 and 53.1.0 I found some steps unclear or misleading; I tried to clarify the steps based on the most recent experience.

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?
run benchmark tpch

🤖 Benchmark running (GKE): comparing HEAD (92c6a24) to main using tpch
run benchmarks
baseline:
  ref: main
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: false
    DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS: true
changed:
  ref: HEAD
  env:
    DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS: true
🤖 Benchmark running (GKE): comparing HEAD (66a6808) to main using clickbench_partitioned

🤖 Benchmark running (GKE): comparing HEAD (66a6808) to main using tpcds

🤖 Benchmark running (GKE): comparing HEAD (66a6808) to main using tpch

🤖 Benchmark completed (GKE): clickbench_partitioned — base (merge-base) vs branch

🤖 Benchmark completed (GKE): tpcds — base (merge-base) vs branch
Introduces a runtime-adaptive filter selectivity tracking system for Parquet pushdown. Each filter is monitored with Welford online stats and moves through a state machine: New -> RowFilter|PostScan -> (promoted / demoted / dropped).

Key changes:
- New selectivity.rs module (SelectivityTracker, TrackerConfig, SelectivityStats, FilterState, PartitionedFilters, FilterId).
- New OptionalFilterPhysicalExpr wrapper in physical_expr_common; HashJoinExec wraps dynamic join filters in it.
- Removes the reorder_filters config and supporting code.
- Adds filter_pushdown_min_bytes_per_sec, filter_collecting_byte_ratio_threshold, and filter_confidence_z config options.
- Predicate storage changes from Option<Arc<PhysicalExpr>> to Option<Vec<(FilterId, Arc<PhysicalExpr>)>> on ParquetSource/ParquetOpener.
- build_row_filter takes Vec<(FilterId,...)> plus a SelectivityTracker and returns RowFilterWithMetrics. DatafusionArrowPredicate reports per-batch stats back to the tracker.
- ParquetOpener calls tracker.partition_filters() and apply_post_scan_filters_with_stats; records filter_apply_time.
- Proto reserves tag 6 (was reorder_filters) and adds 3 new optional fields.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Optional/dynamic filters from hash-join build sides were unconditionally placed as PostScan on first encounter, losing late-materialization benefits even when the filter columns were small relative to the projection. With few file splits opened in parallel, the tracker rarely accumulated enough samples to promote them mid-query.

Apply the same byte_ratio_threshold heuristic used for static filters. The CI lower-bound promotion and CI upper-bound demotion paths still apply, including Drop for ineffective optional filters.

Local TPC-DS sf1 (M-series, pushdown_filters=true):

| Query | Main | Branch | Branch+Fix |
|-------|-------|--------|------------|
| Q24 | 72 | 452 | 70 |
| Q17 | 124 | 212 | 121 |
| Q25 | 182 | 379 | 203 |
| Q29 | 152 | 312 | 145 |
| Q7 | 224 | 297 | 220 |
| Q58 | 129 | 191 | 133 |
| Q64 | 28213 | 672 | 578 |
| Q9 | 228 | 96 | 87 |
| Q76 | 172 | 105 | 156 |

Q76 regresses slightly vs the no-fix branch (CASE/hash_lookup is CPU-heavy at row level) but still beats main.

Also updates dynamic_filter_pushdown_config.slt to match the Optional(DynamicFilter ...) display introduced earlier in the branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
After moving optional filters to RowFilter via byte_ratio, queries with
1-row-group-per-file inputs (e.g. TPC-DS) had no chance to demote when
the chosen filter turned out to be CPU-dominated and ineffective:
partition_filters runs once per file open, all 12 split openers fire in
parallel and see no stats, and the existing Demote/Drop branches never
re-trigger for the lifetime of the scan.
Add a per-FilterId Arc<AtomicBool> "skip flag" owned by
SelectivityTracker. Once a filter has accumulated enough samples and its
CI upper bound on bytes-per-second falls below min_bytes_per_sec, the
hot per-batch update() path flips the flag — but only for filters
recorded as optional at first encounter (mandatory filters must always
execute or the result set changes).
Both consumers honour it:
* DatafusionArrowPredicate::evaluate returns an all-true mask without
invoking physical_expr (filter columns are still decoded; CPU is
reclaimed but I/O is not, pending arrow-rs API).
* apply_post_scan_filters_with_stats `continue`s past the filter,
skipping evaluation and the per-batch tracker.update.
Local TPC-DS sf1 (M-series, pushdown_filters=true), worst regressors
from main pushdown=off baseline:
| Query | Main(off) | Branch(byte-ratio) | +skip-flag |
|-------|-----------|--------------------|------------|
| Q72 | 619 | 554 | 261 |
| Q50 | 221 | 521 | 135 |
| Q23 | 892 | 1217 | 680 |
| Q67 | 310 | 510 | 306 |
| Q18 | 128 | 312 | 178 |
| Q13 | 399 | 558 | 363 |
| Q53 | 103 | 167 | 93 |
| Q63 | 106 | 173 | 93 |
| Q76 | 132 | 268 | 105 |
Q24-class wins are unaffected (Q24 holds at 70 ms vs 379 ms on main).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Q18 (and several other TPC-DS regressions) had a post-scan customer_demographics filter — `Optional(DynamicFilter)` on the single projected column `cd_demo_sk` — that burned 90 ms of CPU per scan but could never be skipped. The filter was correctly placed at PostScan (projection ⊆ filter columns ⇒ byte_ratio = 1.0 > threshold) but the mid-stream skip path never fired.

Root cause: `SelectivityStats::update` only incremented `sample_count` when `batch_bytes > 0`. When the projection is a subset of the filter columns, `other_bytes_per_row = 0` and therefore `batch_bytes = 0` on every call, so the Welford counter stayed at zero, the CI upper bound stayed `None`, and the skip check short-circuited. Meanwhile the filter kept running per batch.

Admit samples with `batch_bytes = 0`. The recorded effectiveness for those samples is legitimately zero (no late-materialization payoff), so the CI upper bound converges on zero after a few batches and the skip flag flips for optional filters — exactly what we want: CPU spent, no byte savings, optional ⇒ drop.

Local TPC-DS sf1 (M-series, pushdown=on) vs main pushdown=off:

| Query | Main(off) | Before | After |
|-------|-----------|--------|-------|
| Q18 | 99 | 182 | 118 |
| Q67 | 312 | 503 | 346 |
| Q26 | 80 | 151 | 94 |
| Q85 | 149 | 246 | 157 |
| Q91 | 64 | 108 | 58 |
| Q53 | 103 | 144 | 99 |
| Q63 | 103 | 148 | 99 |
| Q13 | 399 | 558 | 376 |
| Q72 | 619 | 489 | 277 |
| Q24 | 379 | 70 | 70 |
| Q64 | 28213 | -- | 519 |

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Force-pushed 66a6808 to 487e56b

run benchmark tpch
The OptionalFilterPhysicalExpr wrapper introduced for optional dynamic filters changes two things observable in tests:

1. Display format: `DynamicFilter [...]` becomes `Optional(DynamicFilter [...])` when wrapped. Update insta snapshots in filter_pushdown.rs and plan strings in joins.slt / push_down_filter_parquet.slt.
2. Adaptive tracker placement: new filters flow through the byte-ratio heuristic and optional filters can be skipped mid-stream. This zeroes pushdown metrics for cases where the tracker chose PostScan or dropped the filter. Row group / page pruning still runs via the pruning predicate, so output rows are unchanged.

Also fix test_discover_dynamic_filters_via_expressions_api to walk each expression subtree so the inner DynamicFilterPhysicalExpr is still found when wrapped in OptionalFilterPhysicalExpr.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
- Merged alamb/reschedule_io ("Dynamic work scheduling in FileStream") into this branch
- Reconciled `datafusion/datasource/src/source.rs` against the recent `as_any` removal (commit 03b390d), retaining the new `OpenArgs`/`open_with_args`/`create_sibling_state` APIs atop the `DataSource: Any` supertrait form
- Added `use std::any::Any;` in `file_scan_config/mod.rs` so the incoming code still compiles after the `as_any` trait method was removed upstream

Test plan
- `cargo check --workspace --all-targets`
- `cargo clippy -p datafusion-datasource --all-targets --all-features -- -D warnings`
- `./dev/rust_lint.sh`

🤖 Generated with Claude Code