Skip to content
Draft
597 changes: 561 additions & 36 deletions datafusion/datasource-parquet/src/opener.rs

Large diffs are not rendered by default.

53 changes: 41 additions & 12 deletions datafusion/datasource-parquet/src/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
/// of evaluating the resulting expression.
///
/// See the module level documentation for more information.
#[derive(Clone)]
pub(crate) struct FilterCandidate {
expr: Arc<dyn PhysicalExpr>,
/// Estimate for the total number of bytes that will need to be processed
Expand Down Expand Up @@ -1019,10 +1020,28 @@ pub fn build_row_filter(
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowFilter>> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
let time = &file_metrics.row_pushdown_eval_time;
let Some(candidates) =
build_row_filter_candidates(expr, file_schema, metadata, reorder_predicates)?
else {
return Ok(None);
};
row_filter_from_candidates(&candidates, file_metrics).map(Some)
}

/// Expensive, metrics-free first phase of row-filter construction.
///
/// Splits the predicate into conjuncts, resolves each one against the file
/// schema and parquet metadata (building [`ProjectionMask`]s and cost
/// estimates), and optionally reorders candidates by estimated cost. The
/// result is the same for every open of a given (predicate, file_schema,
/// file_metadata) triple, so a donor may build it once and share it with
/// sibling stealers.
pub(crate) fn build_row_filter_candidates(
expr: &Arc<dyn PhysicalExpr>,
file_schema: &SchemaRef,
metadata: &ParquetMetaData,
reorder_predicates: bool,
) -> Result<Option<Vec<FilterCandidate>>> {
// Split into conjuncts:
// `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
let predicates = split_conjunction(expr);
Expand All @@ -1039,14 +1058,29 @@ pub fn build_row_filter(
.flatten()
.collect();

// no candidates
if candidates.is_empty() {
return Ok(None);
}

if reorder_predicates {
candidates.sort_unstable_by_key(|c| c.required_bytes);
}
Ok(Some(candidates))
}

/// Cheap per-open second phase: wire each candidate up with the current
/// file's row-filter metrics.
///
/// Called separately from [`build_row_filter_candidates`] so that the
/// expensive candidate construction can be shared across sibling opens
/// of the same file.
pub(crate) fn row_filter_from_candidates(
candidates: &[FilterCandidate],
file_metrics: &ParquetFileMetrics,
) -> Result<RowFilter> {
let rows_pruned = &file_metrics.pushdown_rows_pruned;
let rows_matched = &file_metrics.pushdown_rows_matched;
let time = &file_metrics.row_pushdown_eval_time;

// To avoid double-counting metrics when multiple predicates are used:
// - All predicates should count rows_pruned (cumulative pruned rows)
Expand All @@ -1055,31 +1089,26 @@ pub fn build_row_filter(
let total_candidates = candidates.len();

candidates
.into_iter()
.iter()
.enumerate()
.map(|(idx, candidate)| {
let is_last = idx == total_candidates - 1;

// All predicates share the pruned counter (cumulative)
let predicate_rows_pruned = rows_pruned.clone();

// Only the last predicate tracks matched rows (final result)
let predicate_rows_matched = if is_last {
rows_matched.clone()
} else {
metrics::Count::new()
};

DatafusionArrowPredicate::try_new(
candidate,
candidate.clone(),
predicate_rows_pruned,
predicate_rows_matched,
time.clone(),
)
.map(|pred| Box::new(pred) as _)
})
.collect::<Result<Vec<_>, _>>()
.map(|filters| Some(RowFilter::new(filters)))
.map(RowFilter::new)
}

#[cfg(test)]
Expand Down
23 changes: 12 additions & 11 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ use parquet::data_type::Decimal;
use parquet::schema::types::SchemaDescriptor;
use parquet::{bloom_filter::Sbbf, file::metadata::RowGroupMetaData};

/// Returns the byte offset at which a row group's pages begin in the
/// parquet file.
///
/// The offset is taken from the row group's first column chunk: the
/// dictionary page offset when a dictionary page exists, and the data
/// page offset otherwise. The metadata location is deliberately not
/// used here — see
/// <https://github.com/apache/datafusion/issues/5995>.
pub fn row_group_start_offset(metadata: &RowGroupMetaData) -> i64 {
    // The first column chunk's pages mark where the row group's bytes start.
    let first_column = metadata.column(0);
    match first_column.dictionary_page_offset() {
        Some(dictionary_offset) => dictionary_offset,
        None => first_column.data_page_offset(),
    }
}

/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
///
/// This struct implements the various types of pruning that are applied to a
Expand Down Expand Up @@ -224,17 +235,7 @@ impl RowGroupAccessPlanFilter {
if !self.access_plan.should_scan(idx) {
continue;
}

// Skip the row group if the first dictionary/data page are not
// within the range.
//
// note don't use the location of metadata
// <https://github.com/apache/datafusion/issues/5995>
let col = metadata.column(0);
let offset = col
.dictionary_page_offset()
.unwrap_or_else(|| col.data_page_offset());
if !range.contains(offset) {
if !range.contains(row_group_start_offset(metadata)) {
self.access_plan.skip(idx);
}
}
Expand Down
9 changes: 8 additions & 1 deletion datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::file_stream::{FileOpener, SharedWorkSource};
use datafusion_datasource::morsel::Morselizer;

use arrow::datatypes::TimeUnit;
Expand Down Expand Up @@ -526,6 +526,7 @@ impl FileSource for ParquetSource {
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
shared_work_source: Option<SharedWorkSource>,
) -> datafusion_common::Result<Box<dyn Morselizer>> {
let expr_adapter_factory = base_config
.expr_adapter_factory
Expand Down Expand Up @@ -553,6 +554,10 @@ impl FileSource for ParquetSource {
.as_ref()
.map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

let output_schema = Arc::new(
self.projection
.project_schema(self.table_schema.table_schema())?,
);
Ok(Box::new(ParquetMorselizer {
partition_index: partition,
projection: self.projection.clone(),
Expand Down Expand Up @@ -580,6 +585,8 @@ impl FileSource for ParquetSource {
encryption_factory: self.get_encryption_factory_with_config(),
max_predicate_cache_size: self.max_predicate_cache_size(),
reverse_row_groups: self.reverse_row_groups,
shared_work_source,
output_schema,
}))
}

Expand Down
9 changes: 8 additions & 1 deletion datafusion/datasource/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::sync::Arc;

use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::file_stream::{FileOpener, SharedWorkSource};
use crate::morsel::{FileOpenerMorselizer, Morselizer};
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
Expand Down Expand Up @@ -82,11 +82,18 @@ pub trait FileSource: Any + Send + Sync {
///
/// It is preferred to implement the [`Morselizer`] API directly by
/// implementing this method.
///
/// `shared_work_source`, when `Some`, is the queue of unopened files
/// shared across sibling streams. File sources that can sub-divide a
/// single file into smaller stealable work units (e.g. parquet row-group
/// splitting) may push donated chunks onto it; sources that cannot simply
/// ignore the parameter.
fn create_morselizer(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
_shared_work_source: Option<SharedWorkSource>,
) -> Result<Box<dyn Morselizer>> {
let opener = self.create_file_opener(object_store, base_config, partition)?;
Ok(Box::new(FileOpenerMorselizer::new(opener)))
Expand Down
9 changes: 7 additions & 2 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,6 @@ impl DataSource for FileScanConfig {

let source = self.file_source.with_batch_size(batch_size);

let morselizer = source.create_morselizer(object_store, self, partition)?;

// Extract the shared work source from the sibling state if it exists.
// This allows multiple sibling streams to steal work from a single
// shared queue of unopened files.
Expand All @@ -607,6 +605,13 @@ impl DataSource for FileScanConfig {
.and_then(|state| state.downcast_ref::<SharedWorkSource>())
.cloned();

let morselizer = source.create_morselizer(
object_store,
self,
partition,
shared_work_source.clone(),
)?;

let stream = FileStreamBuilder::new(self)
.with_partition(partition)
.with_shared_work_source(shared_work_source)
Expand Down
1 change: 1 addition & 0 deletions datafusion/datasource/src/file_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use self::scan_state::{ScanAndReturn, ScanState};

pub use builder::FileStreamBuilder;
pub use metrics::{FileStreamMetrics, StartableTime};
pub use work_source::SharedWorkSource;

/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream {
Expand Down
35 changes: 31 additions & 4 deletions datafusion/datasource/src/file_stream/scan_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard;
use futures::stream::BoxStream;
use futures::{FutureExt as _, StreamExt as _};

use super::work_source::WorkSource;
use super::work_source::{FileLease, PopResult, WorkSource};
use super::{FileStreamMetrics, OnError};

/// State [`FileStreamState::Scan`].
Expand Down Expand Up @@ -81,6 +81,12 @@ pub(super) struct ScanState {
/// Once the I/O completes, yields the next planner and is pushed back
/// onto `ready_planners`.
pending_planner: Option<PendingMorselPlanner>,
/// Lease on the current file popped from a shared work source. While
/// held, idle siblings will wait for potential donations from this
/// file instead of declaring the shared source drained. `None` for
/// files that came from a local work source or for pre-finalized
/// morsels.
current_file_lease: Option<FileLease>,
/// Metrics for the active scan queues.
metrics: FileStreamMetrics,
}
Expand All @@ -102,6 +108,7 @@ impl ScanState {
ready_morsels: Default::default(),
reader: None,
pending_planner: None,
current_file_lease: None,
metrics,
}
}
Expand Down Expand Up @@ -146,6 +153,7 @@ impl ScanState {
return match self.on_error {
OnError::Skip => {
self.metrics.files_processed.add(1);
self.current_file_lease = None;
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
Expand Down Expand Up @@ -174,6 +182,7 @@ impl ScanState {
let batch = batch.slice(0, *remain);
let done = 1 + self.work_source.skipped_on_limit();
self.metrics.files_processed.add(done);
self.current_file_lease = None;
*remain = 0;
(batch, true)
}
Expand All @@ -197,6 +206,7 @@ impl ScanState {
return match self.on_error {
OnError::Skip => {
self.metrics.files_processed.add(1);
self.current_file_lease = None;
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
Expand All @@ -205,6 +215,7 @@ impl ScanState {
Poll::Ready(None) => {
self.reader = None;
self.metrics.files_processed.add(1);
self.current_file_lease = None;
self.metrics.time_scanning_until_data.stop();
self.metrics.time_scanning_total.stop();
return ScanAndReturn::Continue;
Expand All @@ -218,6 +229,11 @@ impl ScanState {
self.metrics.time_scanning_until_data.start();
self.metrics.time_scanning_total.start();
self.reader = Some(morsel.into_stream());
// A morsel is now streaming, so we're past the pre-scan window
// where donations happen. Release the in-flight donor slot so
// idle siblings can make progress (or exit) without waiting on
// this stream to finish its assigned row groups.
self.current_file_lease = None;
return ScanAndReturn::Continue;
}

Expand Down Expand Up @@ -248,6 +264,7 @@ impl ScanState {
}
Ok(None) => {
self.metrics.files_processed.add(1);
self.current_file_lease = None;
self.metrics.time_opening.stop();
ScanAndReturn::Continue
}
Expand All @@ -257,6 +274,7 @@ impl ScanState {
match self.on_error {
OnError::Skip => {
self.metrics.files_processed.add(1);
self.current_file_lease = None;
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
Expand All @@ -266,10 +284,18 @@ impl ScanState {
}

// No outstanding work remains, so begin planning the next unopened file.
let part_file = match self.work_source.pop_front() {
Some(part_file) => part_file,
None => return ScanAndReturn::Done(None),
let (part_file, lease) = match self.work_source.pop_front() {
PopResult::Ready(file, lease) => (file, lease),
PopResult::Pending => {
// A sibling is pre-scan on a shared file that may still
// donate morsels. Re-schedule ourselves so we re-check the
// queues as soon as the scheduler picks us up.
cx.waker().wake_by_ref();
return ScanAndReturn::Return(Poll::Pending);
}
PopResult::Done => return ScanAndReturn::Done(None),
};
self.current_file_lease = lease;

self.metrics.time_opening.start();
match self.morselizer.plan_file(part_file) {
Expand All @@ -283,6 +309,7 @@ impl ScanState {
self.metrics.file_open_errors.add(1);
self.metrics.time_opening.stop();
self.metrics.files_processed.add(1);
self.current_file_lease = None;
ScanAndReturn::Continue
}
OnError::Fail => ScanAndReturn::Error(err),
Expand Down
Loading
Loading