From db29d0c946af7e3a5a9ee590f091dadc69a62671 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Thu, 30 May 2024 15:51:10 -0400 Subject: [PATCH 01/14] Extending join fuzz tests to support join filtering --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 368 ++++++++++++------ .../building-logical-plans.md | 149 ------- 2 files changed, 251 insertions(+), 266 deletions(-) delete mode 100644 docs/source/library-user-guide/building-logical-plans.md diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 824f1eec4a853..88c4d0867fbbb 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -21,7 +21,12 @@ use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; -use arrow_schema::Schema; +use arrow_schema::{Field, Schema}; + +use datafusion_common::ScalarValue; +use datafusion_physical_expr::expressions::Literal; +use datafusion_physical_expr::PhysicalExprRef; + use rand::Rng; use datafusion::common::JoinSide; @@ -40,92 +45,142 @@ use test_utils::stagger_batch_with_seed; #[tokio::test] async fn test_inner_join_1k() { - run_join_test( + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::Inner, + None, + ) + .run_test() + .await +} + +fn less_than_10_join_filter(schema1: Arc, _schema2: Arc) -> JoinFilter { + let less_than_100 = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Lt, + Arc::new(Literal::new(ScalarValue::from(100))), + )) as _; + let column_indices = vec![ColumnIndex { + index: 0, + side: JoinSide::Left, + }]; + let intermediate_schema = + Schema::new(vec![schema1.field_with_name("a").unwrap().to_owned()]); + + JoinFilter::new(less_than_100, column_indices, intermediate_schema) +} + +#[tokio::test] +async fn test_inner_join_1k_filtered() { + JoinFuzzTestCase::new( make_staggered_batches(1000), make_staggered_batches(1000), JoinType::Inner, + Some(Box::new(less_than_10_join_filter)), ) + .run_test() + .await +} + +#[tokio::test] +async fn test_inner_join_1k_smjoin() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::Inner, + None, + ) + .run_test() .await } #[tokio::test] async fn test_left_join_1k() { - run_join_test( + JoinFuzzTestCase::new( make_staggered_batches(1000), make_staggered_batches(1000), JoinType::Left, + None, ) + .run_test() .await } #[tokio::test] async fn test_right_join_1k() { - run_join_test( + JoinFuzzTestCase::new( make_staggered_batches(1000), make_staggered_batches(1000), JoinType::Right, + None, ) + .run_test() .await } #[tokio::test] async fn test_full_join_1k() { - run_join_test( + JoinFuzzTestCase::new( make_staggered_batches(1000), make_staggered_batches(1000), JoinType::Full, + None, ) + .run_test() .await } #[tokio::test] async fn test_semi_join_1k() { - run_join_test( + JoinFuzzTestCase::new( make_staggered_batches(1000), make_staggered_batches(1000), JoinType::LeftSemi, + None, ) + .run_test() .await } #[tokio::test] async fn test_anti_join_1k() { - run_join_test( + JoinFuzzTestCase::new( make_staggered_batches(1000), make_staggered_batches(1000), JoinType::LeftAnti, + None, ) + .run_test() .await } -/// Perform sort-merge join and hash join on same input -/// and verify two outputs are equal -async fn run_join_test( +struct JoinFuzzTestCase { + batch_sizes: &'static [usize], input1: Vec, input2: Vec, join_type: JoinType, -) { - let batch_sizes = [1, 2, 7, 49, 50, 51, 100]; - for batch_size in batch_sizes { - let session_config = SessionConfig::new().with_batch_size(batch_size); - let ctx = SessionContext::new_with_config(session_config); - let task_ctx = ctx.task_ctx(); - - let schema1 = input1[0].schema(); - let schema2 = input2[0].schema(); - let on_columns = vec![ - ( - Arc::new(Column::new_with_schema("a", &schema1).unwrap()) as _, - Arc::new(Column::new_with_schema("a", &schema2).unwrap()) as _, - ), - ( - Arc::new(Column::new_with_schema("b", &schema1).unwrap()) as _, - Arc::new(Column::new_with_schema("b", &schema2).unwrap()) as _, - ), - ]; + join_filter_builder: Option, Arc) -> JoinFilter>>, +} - // Nested loop join uses filter for joining records - let column_indices = vec![ +impl JoinFuzzTestCase { + fn new( + input1: Vec, + input2: Vec, + join_type: JoinType, + join_filter_builder: Option, Arc) -> JoinFilter>>, + ) -> Self { + Self { + batch_sizes: &[1, 2, 7, 49, 50, 51, 100], + input1, + input2, + join_type, + join_filter_builder, + } + } + + fn column_indices(&self) -> Vec { + vec![ ColumnIndex { index: 0, side: JoinSide::Left, @@ -142,120 +197,199 @@ async fn run_join_test( index: 1, side: JoinSide::Right, }, - ]; - let intermediate_schema = Schema::new(vec![ + ] + } + + fn on_columns(&self) -> Vec<(PhysicalExprRef, PhysicalExprRef)> { + let schema1 = self.input1[0].schema(); + let schema2 = self.input2[0].schema(); + vec![ + ( + Arc::new(Column::new_with_schema("a", &schema1).unwrap()) as _, + Arc::new(Column::new_with_schema("a", &schema2).unwrap()) as _, + ), + ( + Arc::new(Column::new_with_schema("b", &schema1).unwrap()) as _, + Arc::new(Column::new_with_schema("b", &schema2).unwrap()) as _, + ), + ] + } + + fn intermediate_schema(&self) -> Schema { + let schema1 = self.input1[0].schema(); + let schema2 = self.input2[0].schema(); + Schema::new(vec![ schema1.field_with_name("a").unwrap().to_owned(), schema1.field_with_name("b").unwrap().to_owned(), schema2.field_with_name("a").unwrap().to_owned(), schema2.field_with_name("b").unwrap().to_owned(), - ]); - - let equal_a = Arc::new(BinaryExpr::new( - Arc::new(Column::new("a", 0)), - Operator::Eq, - Arc::new(Column::new("a", 2)), - )) as _; - let equal_b = Arc::new(BinaryExpr::new( - Arc::new(Column::new("b", 1)), - Operator::Eq, - Arc::new(Column::new("b", 3)), - )) as _; - let expression = Arc::new(BinaryExpr::new(equal_a, Operator::And, equal_b)) as _; - - let on_filter = JoinFilter::new(expression, column_indices, intermediate_schema); + ]) + } - // sort-merge join + fn left_right(&self) -> (Arc, Arc) { + let schema1 = self.input1[0].schema(); + let schema2 = self.input2[0].schema(); let left = Arc::new( - MemoryExec::try_new(&[input1.clone()], schema1.clone(), None).unwrap(), + MemoryExec::try_new(&[self.input1.clone()], schema1.clone(), None).unwrap(), ); let right = Arc::new( - MemoryExec::try_new(&[input2.clone()], schema2.clone(), None).unwrap(), + MemoryExec::try_new(&[self.input2.clone()], schema2.clone(), None).unwrap(), ); - let smj = Arc::new( + (left, right) + } + + fn join_filter(&self) -> Option { + let schema1 = self.input1[0].schema(); + let schema2 = self.input2[0].schema(); + self.join_filter_builder + .as_ref() + .map(|builder| builder(schema1, schema2)) + } + + fn sort_merge_join(&self) -> Arc { + let (left, right) = self.left_right(); + Arc::new( SortMergeJoinExec::try_new( left, right, - on_columns.clone(), - None, - join_type, + self.on_columns().clone(), + self.join_filter(), + self.join_type, vec![SortOptions::default(), SortOptions::default()], false, ) .unwrap(), - ); - let smj_collected = collect(smj, task_ctx.clone()).await.unwrap(); + ) + } - // hash join - let left = Arc::new( - MemoryExec::try_new(&[input1.clone()], schema1.clone(), None).unwrap(), - ); - let right = Arc::new( - MemoryExec::try_new(&[input2.clone()], schema2.clone(), None).unwrap(), - ); - let hj = Arc::new( + fn hash_join(&self) -> Arc { + let (left, right) = self.left_right(); + Arc::new( HashJoinExec::try_new( left, right, - on_columns.clone(), - None, - &join_type, + self.on_columns().clone(), + self.join_filter(), + &self.join_type, None, PartitionMode::Partitioned, false, ) .unwrap(), - ); - let hj_collected = collect(hj, task_ctx.clone()).await.unwrap(); + ) + } - // nested loop join - let left = Arc::new( - MemoryExec::try_new(&[input1.clone()], schema1.clone(), None).unwrap(), - ); - let right = Arc::new( - MemoryExec::try_new(&[input2.clone()], schema2.clone(), None).unwrap(), - ); - let nlj = Arc::new( - NestedLoopJoinExec::try_new(left, right, Some(on_filter), &join_type) + fn nested_loop_join(&self) -> Arc { + let (left, right) = self.left_right(); + // Nested loop join uses filter for joining records + let column_indices = self.column_indices(); + let intermediate_schema = self.intermediate_schema(); + + let equal_a = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Column::new("a", 2)), + )) as _; + let equal_b = Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Eq, + Arc::new(Column::new("b", 3)), + )) as _; + let expression = Arc::new(BinaryExpr::new(equal_a, Operator::And, equal_b)) as _; + + let on_filter = JoinFilter::new(expression, column_indices, intermediate_schema); + + let join_filter = match self.join_filter() { + None => on_filter, + Some(filter) => { + let expr = Arc::new(BinaryExpr::new( + filter.expression().clone(), + Operator::And, + on_filter.expression().clone(), + )) as _; + let mut column_indices = vec![]; + column_indices.extend_from_slice(filter.column_indices()); + column_indices.extend_from_slice(on_filter.column_indices()); + let mut fields: Vec = vec![]; + filter + .schema() + .fields + .into_iter() + .for_each(|field| fields.push(field.as_ref().clone())); + on_filter + .schema() + .fields + .into_iter() + .for_each(|field| fields.push(field.as_ref().clone())); + let schema = Schema::new(fields); + JoinFilter::new(expr, column_indices, schema) + } + }; + + Arc::new( + NestedLoopJoinExec::try_new(left, right, Some(join_filter), &self.join_type) .unwrap(), - ); - let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap(); - - // compare - let smj_formatted = pretty_format_batches(&smj_collected).unwrap().to_string(); - let hj_formatted = pretty_format_batches(&hj_collected).unwrap().to_string(); - let nlj_formatted = pretty_format_batches(&nlj_collected).unwrap().to_string(); - - let mut smj_formatted_sorted: Vec<&str> = smj_formatted.trim().lines().collect(); - smj_formatted_sorted.sort_unstable(); - - let mut hj_formatted_sorted: Vec<&str> = hj_formatted.trim().lines().collect(); - hj_formatted_sorted.sort_unstable(); - - let mut nlj_formatted_sorted: Vec<&str> = nlj_formatted.trim().lines().collect(); - nlj_formatted_sorted.sort_unstable(); - - for (i, (smj_line, hj_line)) in smj_formatted_sorted - .iter() - .zip(&hj_formatted_sorted) - .enumerate() - { - assert_eq!( - (i, smj_line), - (i, hj_line), - "SortMergeJoinExec and HashJoinExec produced different results" - ); - } + ) + } + + /// Perform sort-merge join and hash join on same input + /// and verify two outputs are equal + async fn run_test(&self) { + for batch_size in self.batch_sizes { + let session_config = SessionConfig::new().with_batch_size(*batch_size); + let ctx = SessionContext::new_with_config(session_config); + let task_ctx = ctx.task_ctx(); + let smj = self.sort_merge_join(); + let smj_collected = collect(smj, task_ctx.clone()).await.unwrap(); + + let hj = self.hash_join(); + let hj_collected = collect(hj, task_ctx.clone()).await.unwrap(); + + let nlj = self.nested_loop_join(); + let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap(); + + // compare + let smj_formatted = + pretty_format_batches(&smj_collected).unwrap().to_string(); + let hj_formatted = pretty_format_batches(&hj_collected).unwrap().to_string(); + let nlj_formatted = + pretty_format_batches(&nlj_collected).unwrap().to_string(); + + let mut smj_formatted_sorted: Vec<&str> = + smj_formatted.trim().lines().collect(); + smj_formatted_sorted.sort_unstable(); + + let mut hj_formatted_sorted: Vec<&str> = + hj_formatted.trim().lines().collect(); + hj_formatted_sorted.sort_unstable(); + + let mut nlj_formatted_sorted: Vec<&str> = + nlj_formatted.trim().lines().collect(); + nlj_formatted_sorted.sort_unstable(); + + for (i, (smj_line, hj_line)) in smj_formatted_sorted + .iter() + .zip(&hj_formatted_sorted) + .enumerate() + { + assert_eq!( + (i, smj_line), + (i, hj_line), + "SortMergeJoinExec and HashJoinExec produced different results" + ); + } - for (i, (nlj_line, hj_line)) in nlj_formatted_sorted - .iter() - .zip(&hj_formatted_sorted) - .enumerate() - { - assert_eq!( - (i, nlj_line), - (i, hj_line), - "NestedLoopJoinExec and HashJoinExec produced different results" - ); + for (i, (nlj_line, hj_line)) in nlj_formatted_sorted + .iter() + .zip(&hj_formatted_sorted) + .enumerate() + { + assert_eq!( + (i, nlj_line), + (i, hj_line), + "NestedLoopJoinExec and HashJoinExec produced different results" + ); + } } } } diff --git a/docs/source/library-user-guide/building-logical-plans.md b/docs/source/library-user-guide/building-logical-plans.md deleted file mode 100644 index fe922d8eaeb11..0000000000000 --- a/docs/source/library-user-guide/building-logical-plans.md +++ /dev/null @@ -1,149 +0,0 @@ - - -# Building Logical Plans - -A logical plan is a structured representation of a database query that describes the high-level operations and -transformations needed to retrieve data from a database or data source. It abstracts away specific implementation -details and focuses on the logical flow of the query, including operations like filtering, sorting, and joining tables. - -This logical plan serves as an intermediate step before generating an optimized physical execution plan. This is -explained in more detail in the [Query Planning and Execution Overview] section of the [Architecture Guide]. - -## Building Logical Plans Manually - -DataFusion's [LogicalPlan] is an enum containing variants representing all the supported operators, and also -contains an `Extension` variant that allows projects building on DataFusion to add custom logical operators. - -It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as follows, but is is -much easier to use the [LogicalPlanBuilder], which is described in the next section. - -Here is an example of building a logical plan directly: - - - -```rust -// create a logical table source -let schema = Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("name", DataType::Utf8, true), -]); -let table_source = LogicalTableSource::new(SchemaRef::new(schema)); - -// create a TableScan plan -let projection = None; // optional projection -let filters = vec![]; // optional filters to push down -let fetch = None; // optional LIMIT -let table_scan = LogicalPlan::TableScan(TableScan::try_new( - "person", - Arc::new(table_source), - projection, - filters, - fetch, -)?); - -// create a Filter plan that evaluates `id > 500` that wraps the TableScan -let filter_expr = col("id").gt(lit(500)); -let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan))?); - -// print the plan -println!("{}", plan.display_indent_schema()); -``` - -This example produces the following plan: - -``` -Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] - TableScan: person [id:Int32;N, name:Utf8;N] -``` - -## Building Logical Plans with LogicalPlanBuilder - -DataFusion logical plans can be created using the [LogicalPlanBuilder] struct. There is also a [DataFrame] API which is -a higher-level API that delegates to [LogicalPlanBuilder]. - -The following associated functions can be used to create a new builder: - -- `empty` - create an empty plan with no fields -- `values` - create a plan from a set of literal values -- `scan` - create a plan representing a table scan -- `scan_with_filters` - create a plan representing a table scan with filters - -Once the builder is created, transformation methods can be called to declare that further operations should be -performed on the plan. Note that all we are doing at this stage is building up the logical plan structure. No query -execution will be performed. - -Here are some examples of transformation methods, but for a full list, refer to the [LogicalPlanBuilder] API documentation. - -- `filter` -- `limit` -- `sort` -- `distinct` -- `join` - -The following example demonstrates building the same simple query plan as the previous example, with a table scan followed by a filter. - - - -```rust -// create a logical table source -let schema = Schema::new(vec![ - Field::new("id", DataType::Int32, true), - Field::new("name", DataType::Utf8, true), -]); -let table_source = LogicalTableSource::new(SchemaRef::new(schema)); - -// optional projection -let projection = None; - -// create a LogicalPlanBuilder for a table scan -let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?; - -// perform a filter operation and build the plan -let plan = builder - .filter(col("id").gt(lit(500)))? // WHERE id > 500 - .build()?; - -// print the plan -println!("{}", plan.display_indent_schema()); -``` - -This example produces the following plan: - -``` -Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] - TableScan: person [id:Int32;N, name:Utf8;N] -``` - -## Table Sources - -The previous example used a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also -suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner. However, if you -want to use a [TableSource] that can be executed in DataFusion then you will need to use [DefaultTableSource], which is a -wrapper for a [TableProvider]. - -[query planning and execution overview]: https://docs.rs/datafusion/latest/datafusion/index.html#query-planning-and-execution-overview -[architecture guide]: https://docs.rs/datafusion/latest/datafusion/index.html#architecture -[logicalplan]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html -[logicalplanbuilder]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html -[dataframe]: using-the-dataframe-api.md -[logicaltablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalTableSource.html -[defaulttablesource]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html -[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html -[tablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.TableSource.html From 1dd7840a737956ab87ad792362ffaf47d2eef5e6 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Thu, 30 May 2024 19:12:16 -0400 Subject: [PATCH 02/14] Restored wrongly modified file --- .../building-logical-plans.md | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 docs/source/library-user-guide/building-logical-plans.md diff --git a/docs/source/library-user-guide/building-logical-plans.md b/docs/source/library-user-guide/building-logical-plans.md new file mode 100644 index 0000000000000..fe922d8eaeb11 --- /dev/null +++ b/docs/source/library-user-guide/building-logical-plans.md @@ -0,0 +1,149 @@ + + +# Building Logical Plans + +A logical plan is a structured representation of a database query that describes the high-level operations and +transformations needed to retrieve data from a database or data source. It abstracts away specific implementation +details and focuses on the logical flow of the query, including operations like filtering, sorting, and joining tables. + +This logical plan serves as an intermediate step before generating an optimized physical execution plan. This is +explained in more detail in the [Query Planning and Execution Overview] section of the [Architecture Guide]. + +## Building Logical Plans Manually + +DataFusion's [LogicalPlan] is an enum containing variants representing all the supported operators, and also +contains an `Extension` variant that allows projects building on DataFusion to add custom logical operators. + +It is possible to create logical plans by directly creating instances of the [LogicalPlan] enum as follows, but is is +much easier to use the [LogicalPlanBuilder], which is described in the next section. + +Here is an example of building a logical plan directly: + + + +```rust +// create a logical table source +let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), +]); +let table_source = LogicalTableSource::new(SchemaRef::new(schema)); + +// create a TableScan plan +let projection = None; // optional projection +let filters = vec![]; // optional filters to push down +let fetch = None; // optional LIMIT +let table_scan = LogicalPlan::TableScan(TableScan::try_new( + "person", + Arc::new(table_source), + projection, + filters, + fetch, +)?); + +// create a Filter plan that evaluates `id > 500` that wraps the TableScan +let filter_expr = col("id").gt(lit(500)); +let plan = LogicalPlan::Filter(Filter::try_new(filter_expr, Arc::new(table_scan))?); + +// print the plan +println!("{}", plan.display_indent_schema()); +``` + +This example produces the following plan: + +``` +Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] + TableScan: person [id:Int32;N, name:Utf8;N] +``` + +## Building Logical Plans with LogicalPlanBuilder + +DataFusion logical plans can be created using the [LogicalPlanBuilder] struct. There is also a [DataFrame] API which is +a higher-level API that delegates to [LogicalPlanBuilder]. + +The following associated functions can be used to create a new builder: + +- `empty` - create an empty plan with no fields +- `values` - create a plan from a set of literal values +- `scan` - create a plan representing a table scan +- `scan_with_filters` - create a plan representing a table scan with filters + +Once the builder is created, transformation methods can be called to declare that further operations should be +performed on the plan. Note that all we are doing at this stage is building up the logical plan structure. No query +execution will be performed. + +Here are some examples of transformation methods, but for a full list, refer to the [LogicalPlanBuilder] API documentation. + +- `filter` +- `limit` +- `sort` +- `distinct` +- `join` + +The following example demonstrates building the same simple query plan as the previous example, with a table scan followed by a filter. + + + +```rust +// create a logical table source +let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), +]); +let table_source = LogicalTableSource::new(SchemaRef::new(schema)); + +// optional projection +let projection = None; + +// create a LogicalPlanBuilder for a table scan +let builder = LogicalPlanBuilder::scan("person", Arc::new(table_source), projection)?; + +// perform a filter operation and build the plan +let plan = builder + .filter(col("id").gt(lit(500)))? // WHERE id > 500 + .build()?; + +// print the plan +println!("{}", plan.display_indent_schema()); +``` + +This example produces the following plan: + +``` +Filter: person.id > Int32(500) [id:Int32;N, name:Utf8;N] + TableScan: person [id:Int32;N, name:Utf8;N] +``` + +## Table Sources + +The previous example used a [LogicalTableSource], which is used for tests and documentation in DataFusion, and is also +suitable if you are using DataFusion to build logical plans but do not use DataFusion's physical planner. However, if you +want to use a [TableSource] that can be executed in DataFusion then you will need to use [DefaultTableSource], which is a +wrapper for a [TableProvider]. + +[query planning and execution overview]: https://docs.rs/datafusion/latest/datafusion/index.html#query-planning-and-execution-overview +[architecture guide]: https://docs.rs/datafusion/latest/datafusion/index.html#architecture +[logicalplan]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html +[logicalplanbuilder]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalPlanBuilder.html +[dataframe]: using-the-dataframe-api.md +[logicaltablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/builder/struct.LogicalTableSource.html +[defaulttablesource]: https://docs.rs/datafusion/latest/datafusion/datasource/default_table_source/struct.DefaultTableSource.html +[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html +[tablesource]: https://docs.rs/datafusion-expr/latest/datafusion_expr/trait.TableSource.html From 1f6e5255eb1a7e7af1a60546ff52869830867d9e Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Mon, 3 Jun 2024 22:38:59 -0400 Subject: [PATCH 03/14] Simplified nested loop test case --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 31 ++----------------- 1 file changed, 2 insertions(+), 29 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 88c4d0867fbbb..ab5d9fa28d348 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -21,7 +21,7 @@ use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; use arrow::record_batch::RecordBatch; use arrow::util::pretty::pretty_format_batches; -use arrow_schema::{Field, Schema}; +use arrow_schema::Schema; use datafusion_common::ScalarValue; use datafusion_physical_expr::expressions::Literal; @@ -299,35 +299,8 @@ impl JoinFuzzTestCase { let on_filter = JoinFilter::new(expression, column_indices, intermediate_schema); - let join_filter = match self.join_filter() { - None => on_filter, - Some(filter) => { - let expr = Arc::new(BinaryExpr::new( - filter.expression().clone(), - Operator::And, - on_filter.expression().clone(), - )) as _; - let mut column_indices = vec![]; - column_indices.extend_from_slice(filter.column_indices()); - column_indices.extend_from_slice(on_filter.column_indices()); - let mut fields: Vec = vec![]; - filter - .schema() - .fields - .into_iter() - .for_each(|field| fields.push(field.as_ref().clone())); - on_filter - .schema() - .fields - .into_iter() - .for_each(|field| fields.push(field.as_ref().clone())); - let schema = Schema::new(fields); - JoinFilter::new(expr, column_indices, schema) - } - }; - Arc::new( - NestedLoopJoinExec::try_new(left, right, Some(join_filter), &self.join_type) + NestedLoopJoinExec::try_new(left, right, Some(on_filter), &self.join_type) .unwrap(), ) } From bc330b92e1dcc30054b5cdc5a949e1a01726e9b4 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Tue, 4 Jun 2024 07:40:45 -0400 Subject: [PATCH 04/14] Adding filtered cases for other tests --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 65 ++++++++++++++++++- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index ab5d9fa28d348..96c3af5249a4e 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -107,6 +107,18 @@ async fn test_left_join_1k() { .await } +#[tokio::test] +async fn test_left_join_1k_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::Left, + Some(Box::new(less_than_10_join_filter)), + ) + .run_test() + .await +} + #[tokio::test] async fn test_right_join_1k() { JoinFuzzTestCase::new( @@ -118,6 +130,17 @@ async fn test_right_join_1k() { .run_test() .await } +#[tokio::test] +async fn test_right_join_1k_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::Right, + Some(Box::new(less_than_10_join_filter)), + ) + .run_test() + .await +} #[tokio::test] async fn test_full_join_1k() { @@ -131,6 +154,18 @@ async fn test_full_join_1k() { .await } +#[tokio::test] +async fn test_full_join_1k_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::Full, + Some(Box::new(less_than_10_join_filter)), + ) + .run_test() + .await +} + #[tokio::test] async fn test_semi_join_1k() { JoinFuzzTestCase::new( @@ -143,6 +178,18 @@ async fn test_semi_join_1k() { .await } +#[tokio::test] +async fn test_semi_join_1k_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::LeftSemi, + Some(Box::new(less_than_10_join_filter)), + ) + .run_test() + .await +} + #[tokio::test] async fn test_anti_join_1k() { JoinFuzzTestCase::new( @@ -155,12 +202,26 @@ async fn test_anti_join_1k() { .await } +#[tokio::test] +async fn test_anti_join_1k_filtered() { + JoinFuzzTestCase::new( + make_staggered_batches(1000), + make_staggered_batches(1000), + JoinType::LeftAnti, + Some(Box::new(less_than_10_join_filter)), + ) + .run_test() + .await +} + +type JoinFilterBuilder = Box, Arc) -> JoinFilter>; + struct JoinFuzzTestCase { batch_sizes: &'static [usize], input1: Vec, input2: Vec, join_type: JoinType, - join_filter_builder: Option, Arc) -> JoinFilter>>, + join_filter_builder: Option, } impl JoinFuzzTestCase { @@ -168,7 +229,7 @@ impl JoinFuzzTestCase { input1: Vec, input2: Vec, join_type: JoinType, - join_filter_builder: Option, Arc) -> JoinFilter>>, + join_filter_builder: Option, ) -> Self { Self { batch_sizes: &[1, 2, 7, 49, 50, 51, 100], From 2ef4baed05ca386ddd1e9e41a1fd3e092078769e Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Thu, 6 Jun 2024 12:27:39 -0400 Subject: [PATCH 05/14] Checking row count first --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 96c3af5249a4e..d0cd906f6c923 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -401,6 +401,7 @@ impl JoinFuzzTestCase { nlj_formatted.trim().lines().collect(); nlj_formatted_sorted.sort_unstable(); + assert_eq!(smj_formatted_sorted.len(), hj_formatted_sorted.len(), "SortMergeJoinExec and HashJoinExec produced different row counts"); for (i, (smj_line, hj_line)) in smj_formatted_sorted .iter() .zip(&hj_formatted_sorted) From 0e0b5881e635b32d00337475dd53368500813fcf Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Thu, 6 Jun 2024 12:57:05 -0400 Subject: [PATCH 06/14] Making left side nullable --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index d0cd906f6c923..294e689d0b6a1 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -280,8 +280,8 @@ impl JoinFuzzTestCase { let schema1 = self.input1[0].schema(); let schema2 = self.input2[0].schema(); Schema::new(vec![ - schema1.field_with_name("a").unwrap().to_owned(), - schema1.field_with_name("b").unwrap().to_owned(), + schema1.field_with_name("a").unwrap().to_owned().with_nullable(true), + schema1.field_with_name("b").unwrap().to_owned().with_nullable(true), schema2.field_with_name("a").unwrap().to_owned(), schema2.field_with_name("b").unwrap().to_owned(), ]) @@ -401,7 +401,11 @@ impl JoinFuzzTestCase { nlj_formatted.trim().lines().collect(); nlj_formatted_sorted.sort_unstable(); - assert_eq!(smj_formatted_sorted.len(), hj_formatted_sorted.len(), "SortMergeJoinExec and HashJoinExec produced different row counts"); + assert_eq!( + smj_formatted_sorted.len(), + hj_formatted_sorted.len(), + "SortMergeJoinExec and HashJoinExec produced different row counts" + ); for (i, (smj_line, hj_line)) in smj_formatted_sorted .iter() .zip(&hj_formatted_sorted) From 8ae1d0500b1e6cd5ce2f41f9306a37f3de9de4cd Mon Sep 17 00:00:00 2001 From: Oleks V Date: Tue, 11 Jun 2024 09:20:29 -0700 Subject: [PATCH 07/14] Update datafusion/core/tests/fuzz_cases/join_fuzz.rs --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 294e689d0b6a1..49daad3260f71 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -119,6 +119,8 @@ async fn test_left_join_1k_filtered() { .await } +// Add support for RightJoins +#[ignore] #[tokio::test] async fn test_right_join_1k() { JoinFuzzTestCase::new( From d920902386ae71622cd6c1108ca2644d7a1f61bf Mon Sep 17 00:00:00 2001 From: Oleks V Date: Tue, 11 Jun 2024 09:20:35 -0700 Subject: [PATCH 08/14] Update datafusion/core/tests/fuzz_cases/join_fuzz.rs --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 49daad3260f71..02cb097f19396 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -204,6 +204,8 @@ async fn test_anti_join_1k() { .await } +// Test failed for now. https://github.com/apache/datafusion/issues/10872 +#[ignore] #[tokio::test] async fn test_anti_join_1k_filtered() { JoinFuzzTestCase::new( From 1aa2a75fead8d76df27c563383675efb5846073d Mon Sep 17 00:00:00 2001 From: Oleks V Date: Tue, 11 Jun 2024 09:32:41 -0700 Subject: [PATCH 09/14] Update datafusion/core/tests/fuzz_cases/join_fuzz.rs --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 02cb097f19396..c7c85600b9f0c 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -132,6 +132,8 @@ async fn test_right_join_1k() { .run_test() .await } +// Add support for Right filtered joins +#[ignored] #[tokio::test] async fn test_right_join_1k_filtered() { JoinFuzzTestCase::new( From 9f4dfba363e93cffb9e5240892dd98ddd2cb30f5 Mon Sep 17 00:00:00 2001 From: Oleks V Date: Tue, 11 Jun 2024 09:32:50 -0700 Subject: [PATCH 10/14] Update datafusion/core/tests/fuzz_cases/join_fuzz.rs --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index c7c85600b9f0c..dea3bbe393b14 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -119,7 +119,6 @@ async fn test_left_join_1k_filtered() { .await } -// Add support for RightJoins #[ignore] #[tokio::test] async fn test_right_join_1k() { From e6f53999d23373033c618b68b97c4a970b665f16 Mon Sep 17 00:00:00 2001 From: Oleks V Date: Tue, 11 Jun 2024 09:32:58 -0700 Subject: [PATCH 11/14] Update datafusion/core/tests/fuzz_cases/join_fuzz.rs --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index dea3bbe393b14..6eaed6d7210d7 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -119,7 +119,6 @@ async fn test_left_join_1k_filtered() { .await } -#[ignore] #[tokio::test] async fn test_right_join_1k() { JoinFuzzTestCase::new( From 60477dfcd2c47e1eea0667289b1181cc150a7b3a Mon Sep 17 00:00:00 2001 From: Oleks V Date: Tue, 11 Jun 2024 10:45:52 -0700 Subject: [PATCH 12/14] Update datafusion/core/tests/fuzz_cases/join_fuzz.rs --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index 6eaed6d7210d7..f154238b87e83 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -131,7 +131,7 @@ async fn test_right_join_1k() { .await } // Add support for Right filtered joins -#[ignored] +#[ignore] #[tokio::test] async fn test_right_join_1k_filtered() { JoinFuzzTestCase::new( From d68d944f1550258a2ee2b382af4fcadd0dd96cd1 Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Tue, 11 Jun 2024 21:11:55 -0400 Subject: [PATCH 13/14] Running formatter --- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs index f154238b87e83..8c2e24de56b9a 100644 --- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs @@ -284,8 +284,16 @@ impl JoinFuzzTestCase { let schema1 = self.input1[0].schema(); let schema2 = self.input2[0].schema(); Schema::new(vec![ - schema1.field_with_name("a").unwrap().to_owned().with_nullable(true), - schema1.field_with_name("b").unwrap().to_owned().with_nullable(true), + schema1 + .field_with_name("a") + .unwrap() + .to_owned() + .with_nullable(true), + schema1 + .field_with_name("b") + .unwrap() + .to_owned() + .with_nullable(true), schema2.field_with_name("a").unwrap().to_owned(), schema2.field_with_name("b").unwrap().to_owned(), ]) From 47617e55af3f54b439dad67c68052fae6772720f Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Tue, 11 Jun 2024 21:21:55 -0400 Subject: [PATCH 14/14] Triggering rebuild after transient failure of cargo doc