Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod tests {
let predicate = self
.predicate
.as_ref()
.map(|p| logical2physical(p, &table_schema));
.map(|p| logical2physical(p, Arc::clone(&table_schema)));

let mut source = ParquetSource::new(table_schema);
if let Some(predicate) = predicate {
Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1283,17 +1283,15 @@ impl DefaultPhysicalPlanner {
let (left, left_col_keys, left_projected) =
wrap_projection_for_join_if_necessary(
&left_keys,
original_left.as_ref().clone(),
Arc::clone(original_left),
)?;
let (right, right_col_keys, right_projected) =
wrap_projection_for_join_if_necessary(
&right_keys,
original_right.as_ref().clone(),
Arc::clone(original_right),
)?;
let column_on = (left_col_keys, right_col_keys);

let left = Arc::new(left);
let right = Arc::new(right);
let column_on = (left_col_keys, right_col_keys);
let (new_join, requalified) = Join::try_new_with_project_input(
node,
Arc::clone(&left),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) {

group.bench_function("no_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&create_predicate(), &file_schema);
let predicate = logical2physical(&create_predicate(), file_schema);
b.iter(|| {
let matched = scan_with_predicate(&dataset_path, &predicate, false)
.expect("baseline parquet scan with filter succeeded");
Expand All @@ -75,7 +75,7 @@ fn parquet_nested_filter_pushdown(c: &mut Criterion) {

group.bench_function("with_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&create_predicate(), &file_schema);
let predicate = logical2physical(&create_predicate(), file_schema);
b.iter(|| {
let matched = scan_with_predicate(&dataset_path, &predicate, true)
.expect("pushdown parquet scan with filter succeeded");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn parquet_struct_filter_pushdown(c: &mut Criterion) {
// Scenario 1: SELECT * FROM t WHERE get_field(s, 'id') = 42
group.bench_function("select_star/no_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
let predicate = logical2physical(&struct_id_eq_literal(), file_schema);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
.expect("scan succeeded");
Expand All @@ -103,7 +103,7 @@ fn parquet_struct_filter_pushdown(c: &mut Criterion) {

group.bench_function("select_star/with_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
let predicate = logical2physical(&struct_id_eq_literal(), file_schema);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, true, ProjectionMask::all())
.expect("scan succeeded");
Expand All @@ -114,7 +114,7 @@ fn parquet_struct_filter_pushdown(c: &mut Criterion) {
// Scenario 2: SELECT * FROM t WHERE get_field(s, 'id') = id
group.bench_function("select_star_cross_col/no_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_id_eq_top_id(), &file_schema);
let predicate = logical2physical(&struct_id_eq_top_id(), file_schema);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
.expect("scan succeeded");
Expand All @@ -124,7 +124,7 @@ fn parquet_struct_filter_pushdown(c: &mut Criterion) {

group.bench_function("select_star_cross_col/with_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_id_eq_top_id(), &file_schema);
let predicate = logical2physical(&struct_id_eq_top_id(), file_schema);
b.iter(|| {
let matched = scan(&dataset_path, &predicate, true, ProjectionMask::all())
.expect("scan succeeded");
Expand All @@ -135,7 +135,7 @@ fn parquet_struct_filter_pushdown(c: &mut Criterion) {
// Scenario 3: SELECT id FROM t WHERE get_field(s, 'id') = 42
group.bench_function("select_id/no_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
let predicate = logical2physical(&struct_id_eq_literal(), file_schema);
b.iter(|| {
// Without pushdown we must read all columns to evaluate the predicate.
let matched = scan(&dataset_path, &predicate, false, ProjectionMask::all())
Expand All @@ -146,7 +146,7 @@ fn parquet_struct_filter_pushdown(c: &mut Criterion) {

group.bench_function("select_id/with_pushdown", |b| {
let file_schema = setup_reader(&dataset_path);
let predicate = logical2physical(&struct_id_eq_literal(), &file_schema);
let predicate = logical2physical(&struct_id_eq_literal(), file_schema);
let id_only = id_projection(&dataset_path);
b.iter(|| {
// With pushdown the filter runs first, then we only project `id`.
Expand Down
36 changes: 20 additions & 16 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ mod test {

// A filter on "a" should not exclude any rows even if it matches the data
let expr = col("a").eq(lit(1));
let predicate = logical2physical(&expr, &schema);
let predicate = logical2physical(&expr, Arc::clone(&schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1431,7 +1431,7 @@ mod test {

// A filter on `b = 5.0` should exclude all rows
let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
let predicate = logical2physical(&expr, &schema);
let predicate = logical2physical(&expr, Arc::clone(&schema));
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1477,7 +1477,8 @@ mod test {
let expr = col("part").eq(lit(1));
// Mark the expression as dynamic even if it's not to force partition pruning to happen
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1488,7 +1489,7 @@ mod test {
let expr = col("part").eq(lit(2));
// Mark the expression as dynamic even if it's not to force partition pruning to happen
// Otherwise we assume it already happened at the planning stage and won't re-do the work here
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate = make_dynamic_expr(logical2physical(&expr, table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1544,7 +1545,7 @@ mod test {

// Filter should match the partition value and file statistics
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1553,7 +1554,7 @@ mod test {

// Should prune based on partition value but not file statistics
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1562,7 +1563,7 @@ mod test {

// Should prune based on file statistics but not partition value
let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1571,7 +1572,7 @@ mod test {

// Should prune based on both partition value and file statistics
let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1617,7 +1618,7 @@ mod test {

// Filter should match the partition value and data value
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1626,7 +1627,7 @@ mod test {

// Filter should match the partition value but not the data value
let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1635,7 +1636,7 @@ mod test {

// Filter should not match the partition value but match the data value
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1644,7 +1645,7 @@ mod test {

// Filter should not match the partition value or the data value
let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, table_schema);
let opener = make_opener(predicate);
let stream = opener.open(file).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down Expand Up @@ -1697,7 +1698,7 @@ mod test {
// This filter could prune based on statistics, but since it's not dynamic it's not applied for pruning
// (the assumption is this happened already at planning time)
let expr = col("a").eq(lit(42));
let predicate = logical2physical(&expr, &table_schema);
let predicate = logical2physical(&expr, Arc::clone(&table_schema));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1706,7 +1707,8 @@ mod test {

// If we make the filter dynamic, it should prune.
// This allows dynamic filters to prune partitions/files even if they are populated late into execution.
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1716,7 +1718,8 @@ mod test {
// If we have a filter that touches partition columns only and is dynamic, it should prune even if there are no stats.
file.statistics = Some(Arc::new(Statistics::new_unknown(&file_schema)));
let expr = col("part").eq(lit(2));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand All @@ -1725,7 +1728,8 @@ mod test {

// Similarly a filter that combines partition and data columns should prune even if there are no stats.
let expr = col("part").eq(lit(2)).and(col("a").eq(lit(42)));
let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
let predicate =
make_dynamic_expr(logical2physical(&expr, Arc::clone(&table_schema)));
let opener = make_opener(predicate);
let stream = opener.open(file.clone()).unwrap().await.unwrap();
let (num_batches, num_rows) = count_batches_and_rows(stream).await;
Expand Down
Loading
Loading