From f45f1b099671ee21ab7d9490d97d71c8225c7215 Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Mon, 15 Jul 2024 12:02:46 +0800 Subject: [PATCH 1/6] feat: support group by unnest --- datafusion/sql/src/select.rs | 108 ++++++++++++-- datafusion/sql/src/utils.rs | 16 +++ datafusion/sqllogictest/test_files/unnest.slt | 132 +++++++++++++++++- 3 files changed, 243 insertions(+), 13 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 236403e83d74e..4f44d9131619b 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -22,10 +22,12 @@ use crate::planner::{ idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel, }; use crate::utils::{ - check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs, - resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest, + check_columns_satisfy_exprs, extract_aliases, rebase_expr, rebase_expr_by_name, + resolve_aliases_to_exprs, resolve_columns, resolve_positions_to_exprs, + transform_bottom_unnest, }; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; use datafusion_common::{Column, UnnestOptions}; use datafusion_expr::expr::Alias; @@ -611,6 +613,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { group_by_exprs: &[Expr], aggr_exprs: &[Expr], ) -> Result<(LogicalPlan, Vec, Option)> { + let (input, group_by_exprs, has_unnest) = + self.try_process_group_by_unnest(input, group_by_exprs, aggr_exprs)?; + // create the aggregate plan let plan = LogicalPlanBuilder::from(input.clone()) .aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())? @@ -651,21 +656,34 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // now attempt to resolve columns and replace with fully-qualified columns let aggr_projection_exprs = aggr_projection_exprs .iter() - .map(|expr| resolve_columns(expr, input)) + .map(|expr| resolve_columns(expr, &input)) .collect::>>()?; // next we replace any expressions that are not a column with a column referencing // an output column from the aggregate schema let column_exprs_post_aggr = aggr_projection_exprs .iter() - .map(|expr| expr_as_column_expr(expr, input)) + .map(|expr| expr_as_column_expr(expr, &input)) .collect::>>()?; // next we re-write the projection - let select_exprs_post_aggr = select_exprs - .iter() - .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) - .collect::>>()?; + let select_exprs_post_aggr = if has_unnest { + let aggr_projection_expr_names = aggr_projection_exprs + .iter() + .map(|expr| expr.display_name()) + .collect::>>()?; + select_exprs + .iter() + .map(|expr| { + rebase_expr_by_name(expr, &aggr_projection_expr_names, &input) + }) + .collect::>>()? + } else { + select_exprs + .iter() + .map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input)) + .collect::>>()? + }; // finally, we have some validation that the re-written projection can be resolved // from the aggregate output columns @@ -679,7 +697,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // aggregation. let having_expr_post_aggr = if let Some(having_expr) = having_expr_opt { let having_expr_post_aggr = - rebase_expr(having_expr, &aggr_projection_exprs, input)?; + rebase_expr(having_expr, &aggr_projection_exprs, &input)?; check_columns_satisfy_exprs( &column_exprs_post_aggr, @@ -694,6 +712,78 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Ok((plan, select_exprs_post_aggr, having_expr_post_aggr)) } + + fn try_process_group_by_unnest( + &self, + input: &LogicalPlan, + group_by_exprs: &[Expr], + aggr_exprs: &[Expr], + ) -> Result<(LogicalPlan, Vec, bool)> { + let mut aggr_expr_using_columns: Option> = None; + + // rewrite group_by_exprs + let mut intermediate_plan = input.clone(); + let mut intermediate_select_exprs = group_by_exprs.to_vec(); + let mut has_unnest = false; + + loop { + let mut unnest_columns = vec![]; + let mut inner_projection_exprs = vec![]; + + let outer_projection_exprs: Vec = intermediate_select_exprs + .iter() + .map(|expr| { + transform_bottom_unnest( + &intermediate_plan, + &mut unnest_columns, + &mut inner_projection_exprs, + expr, + ) + }) + .collect::>>()? + .into_iter() + .flatten() + .collect(); + + if unnest_columns.is_empty() { + break; + } else { + let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); + let unnest_options = UnnestOptions::new().with_preserve_nulls(false); + + let mut projection_exprs = match &aggr_expr_using_columns { + Some(exprs) => (*exprs).clone(), + None => { + let mut columns = HashSet::new(); + for expr in aggr_exprs { + expr.apply(|expr| { + if let Expr::Column(c) = expr { + columns.insert(Expr::Column(c.clone())); + } + Ok(TreeNodeRecursion::Continue) + }) + // As the closure always returns Ok, this "can't" error + .expect("Unexpected error"); + } + aggr_expr_using_columns = Some(columns.clone()); + columns + } + }; + projection_exprs.extend(inner_projection_exprs); + + let plan = LogicalPlanBuilder::from(intermediate_plan.clone()) + .project(projection_exprs)? + .unnest_columns_with_options(columns, unnest_options)? + .build()?; + + intermediate_plan = plan; + intermediate_select_exprs = outer_projection_exprs; + has_unnest = true; + } + } + + Ok((intermediate_plan, intermediate_select_exprs, has_unnest)) + } } // If there are any multiple-defined windows, we raise an error. diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 2eacbd174fc24..9172d51b6eacf 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -85,6 +85,22 @@ pub(crate) fn rebase_expr( .data() } +pub(crate) fn rebase_expr_by_name( + expr: &Expr, + base_expr_names: &[String], + plan: &LogicalPlan, +) -> Result { + expr.clone() + .transform_down(|nested_expr| { + if base_expr_names.contains(&nested_expr.display_name()?) { + Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?)) + } else { + Ok(Transformed::no(nested_expr)) + } + }) + .data() +} + /// Determines if the set of `Expr`'s are a valid projection on the input /// `Expr::Column`'s. pub(crate) fn check_columns_satisfy_exprs( diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 06733f7b1e40e..df85f8299fe88 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -496,12 +496,10 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1 5 6 -## FIXME: https://github.com/apache/datafusion/issues/11198 +## FIXME: https://github.com/apache/datafusion/issues/11198 query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them. select unnest(column1), unnest(column1) from unnest_table; -statement ok -drop table unnest_table; ## unnest list followed by unnest struct query ??? @@ -556,4 +554,130 @@ physical_plan 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 06)----------UnnestExec 07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3] -08)--------------MemoryExec: partitions=1, partition_sizes=[1] \ No newline at end of file +08)--------------MemoryExec: partitions=1, partition_sizes=[1] + + +## group by unnest + +### without agg exprs +query I +select unnest(column1) c1 from unnest_table group by c1 order by c1; +---- +1 +2 +3 +4 +5 +6 +12 + +query II +select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2; +---- +1 7 +2 NULL +3 NULL +4 8 +5 9 +6 11 +12 NULL +NULL 10 +NULL 12 +NULL 42 +NULL NULL + +query III +select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 +2 NULL 1 +3 NULL 1 +4 8 2 +5 9 2 +6 11 3 +12 NULL NULL +NULL 10 2 +NULL 12 3 +NULL 42 NULL +NULL NULL NULL + +### with agg exprs + +query IIII +select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 1 +2 NULL 1 1 +3 NULL 1 1 +4 8 2 1 +5 9 2 1 +6 11 3 1 +12 NULL NULL 1 +NULL 10 2 1 +NULL 12 3 1 +NULL 42 NULL 1 +NULL NULL NULL 1 + +query IIII +select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 1 +2 NULL 1 1 +3 NULL 1 1 +4 8 2 1 +5 9 2 1 +6 11 3 0 +12 NULL NULL 0 +NULL 10 2 1 +NULL 12 3 0 +NULL 42 NULL 0 +NULL NULL NULL 0 + +query IIIII +select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3; +---- +1 7 1 1 1 +2 NULL 1 1 1 +3 NULL 1 1 1 +4 8 2 1 2 +5 9 2 1 2 +6 11 3 0 3 +12 NULL NULL 0 NULL +NULL 10 2 1 2 +NULL 12 3 0 3 +NULL 42 NULL 0 NULL +NULL NULL NULL 0 NULL + +### group by recursive unnest list + +query ? +select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2; +---- +[1] +[1, 1] +[2] +[3, 4] +[5] +[7, 8] +[, 6] +NULL + +query ?I +select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2; +---- +[1] 1 +[1, 1] 1 +[2] 1 +[3, 4] 1 +[5] 1 +[7, 8] 1 +[, 6] 1 +NULL 1 + +### TODO: group by unnest struct + +query error +select unnest(column1) c1 from nested_unnest_table group by c1.c0; +---- +DataFusion error: Internal error: unnest on struct can ony be applied at the root level of select expression. +This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker From 30129360c7e9b89db9542106de743776a1a504df Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Mon, 15 Jul 2024 12:32:19 +0800 Subject: [PATCH 2/6] pass slt --- datafusion/sqllogictest/test_files/unnest.slt | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index df85f8299fe88..5b4c0a2537f58 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -675,9 +675,5 @@ select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table gr NULL 1 ### TODO: group by unnest struct - -query error +query error DataFusion error: Internal error: unnest on struct can ony be applied at the root level of select expression select unnest(column1) c1 from nested_unnest_table group by c1.c0; ----- -DataFusion error: Internal error: unnest on struct can ony be applied at the root level of select expression. -This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker From 8d7937dd721bb5e29c31271ade1b7d7d98cb9a1d Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Tue, 16 Jul 2024 00:06:27 +0800 Subject: [PATCH 3/6] refactor: mv process_group_by_unnest into try_process_unnest --- datafusion/sql/src/select.rs | 208 +++++++++--------- datafusion/sql/src/utils.rs | 16 -- datafusion/sqllogictest/test_files/unnest.slt | 2 +- 3 files changed, 110 insertions(+), 116 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 4f44d9131619b..faf71d4ea70e5 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -22,9 +22,8 @@ use crate::planner::{ idents_to_table_reference, ContextProvider, PlannerContext, SqlToRel, }; use crate::utils::{ - check_columns_satisfy_exprs, extract_aliases, rebase_expr, rebase_expr_by_name, - resolve_aliases_to_exprs, resolve_columns, resolve_positions_to_exprs, - transform_bottom_unnest, + check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs, + resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest, }; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; @@ -39,7 +38,7 @@ use datafusion_expr::utils::{ find_aggregate_exprs, find_window_exprs, }; use datafusion_expr::{ - Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, + Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, }; use sqlparser::ast::{ Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, @@ -299,6 +298,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { input: LogicalPlan, select_exprs: Vec, ) -> Result { + // Try process group by unnest + let input = self.try_process_aggregate_unnest(input)?; + let mut intermediate_plan = input; let mut intermediate_select_exprs = select_exprs; // Each expr in select_exprs can contains multiple unnest stage @@ -356,6 +358,102 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .build() } + fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result { + match &input { + LogicalPlan::Aggregate(agg) => { + let (new_input, new_group_by_exprs) = + self.try_process_group_by_unnest(agg)?; + LogicalPlanBuilder::from(new_input) + .aggregate(new_group_by_exprs, agg.aggr_expr.clone())? + .build() + } + LogicalPlan::Filter(filter) => match filter.input.as_ref() { + LogicalPlan::Aggregate(agg) => { + let (new_input, new_group_by_exprs) = + self.try_process_group_by_unnest(agg)?; + LogicalPlanBuilder::from(new_input) + .aggregate(new_group_by_exprs, agg.aggr_expr.clone())? + .filter(filter.predicate.clone())? + .build() + } + _ => Ok(input), + }, + _ => Ok(input), + } + } + + fn try_process_group_by_unnest( + &self, + agg: &Aggregate, + ) -> Result<(LogicalPlan, Vec)> { + let mut aggr_expr_using_columns: Option> = None; + + let input = agg.input.as_ref(); + let group_by_exprs = &agg.group_expr; + let aggr_exprs = &agg.aggr_expr; + + // rewrite group_by_exprs + let mut intermediate_plan = input.clone(); + let mut intermediate_select_exprs = group_by_exprs.to_vec(); + + loop { + let mut unnest_columns = vec![]; + let mut inner_projection_exprs = vec![]; + + let outer_projection_exprs: Vec = intermediate_select_exprs + .iter() + .map(|expr| { + transform_bottom_unnest( + &intermediate_plan, + &mut unnest_columns, + &mut inner_projection_exprs, + expr, + ) + }) + .collect::>>()? + .into_iter() + .flatten() + .collect(); + + if unnest_columns.is_empty() { + break; + } else { + let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); + let unnest_options = UnnestOptions::new().with_preserve_nulls(false); + + let mut projection_exprs = match &aggr_expr_using_columns { + Some(exprs) => (*exprs).clone(), + None => { + let mut columns = HashSet::new(); + for expr in aggr_exprs { + expr.apply(|expr| { + if let Expr::Column(c) = expr { + columns.insert(Expr::Column(c.clone())); + } + Ok(TreeNodeRecursion::Continue) + }) + // As the closure always returns Ok, this "can't" error + .expect("Unexpected error"); + } + aggr_expr_using_columns = Some(columns.clone()); + columns + } + }; + projection_exprs.extend(inner_projection_exprs); + + let plan = LogicalPlanBuilder::from(intermediate_plan.clone()) + .project(projection_exprs)? + .unnest_columns_with_options(columns, unnest_options)? + .build()?; + + intermediate_plan = plan; + intermediate_select_exprs = outer_projection_exprs; + } + } + + Ok((intermediate_plan, intermediate_select_exprs)) + } + fn plan_selection( &self, selection: Option, @@ -613,9 +711,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { group_by_exprs: &[Expr], aggr_exprs: &[Expr], ) -> Result<(LogicalPlan, Vec, Option)> { - let (input, group_by_exprs, has_unnest) = - self.try_process_group_by_unnest(input, group_by_exprs, aggr_exprs)?; - // create the aggregate plan let plan = LogicalPlanBuilder::from(input.clone()) .aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())? @@ -656,34 +751,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // now attempt to resolve columns and replace with fully-qualified columns let aggr_projection_exprs = aggr_projection_exprs .iter() - .map(|expr| resolve_columns(expr, &input)) + .map(|expr| resolve_columns(expr, input)) .collect::>>()?; // next we replace any expressions that are not a column with a column referencing // an output column from the aggregate schema let column_exprs_post_aggr = aggr_projection_exprs .iter() - .map(|expr| expr_as_column_expr(expr, &input)) + .map(|expr| expr_as_column_expr(expr, input)) .collect::>>()?; // next we re-write the projection - let select_exprs_post_aggr = if has_unnest { - let aggr_projection_expr_names = aggr_projection_exprs - .iter() - .map(|expr| expr.display_name()) - .collect::>>()?; - select_exprs - .iter() - .map(|expr| { - rebase_expr_by_name(expr, &aggr_projection_expr_names, &input) - }) - .collect::>>()? - } else { - select_exprs - .iter() - .map(|expr| rebase_expr(expr, &aggr_projection_exprs, &input)) - .collect::>>()? - }; + let select_exprs_post_aggr = select_exprs + .iter() + .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) + .collect::>>()?; // finally, we have some validation that the re-written projection can be resolved // from the aggregate output columns @@ -697,7 +779,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // aggregation. let having_expr_post_aggr = if let Some(having_expr) = having_expr_opt { let having_expr_post_aggr = - rebase_expr(having_expr, &aggr_projection_exprs, &input)?; + rebase_expr(having_expr, &aggr_projection_exprs, input)?; check_columns_satisfy_exprs( &column_exprs_post_aggr, @@ -712,78 +794,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Ok((plan, select_exprs_post_aggr, having_expr_post_aggr)) } - - fn try_process_group_by_unnest( - &self, - input: &LogicalPlan, - group_by_exprs: &[Expr], - aggr_exprs: &[Expr], - ) -> Result<(LogicalPlan, Vec, bool)> { - let mut aggr_expr_using_columns: Option> = None; - - // rewrite group_by_exprs - let mut intermediate_plan = input.clone(); - let mut intermediate_select_exprs = group_by_exprs.to_vec(); - let mut has_unnest = false; - - loop { - let mut unnest_columns = vec![]; - let mut inner_projection_exprs = vec![]; - - let outer_projection_exprs: Vec = intermediate_select_exprs - .iter() - .map(|expr| { - transform_bottom_unnest( - &intermediate_plan, - &mut unnest_columns, - &mut inner_projection_exprs, - expr, - ) - }) - .collect::>>()? - .into_iter() - .flatten() - .collect(); - - if unnest_columns.is_empty() { - break; - } else { - let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); - let unnest_options = UnnestOptions::new().with_preserve_nulls(false); - - let mut projection_exprs = match &aggr_expr_using_columns { - Some(exprs) => (*exprs).clone(), - None => { - let mut columns = HashSet::new(); - for expr in aggr_exprs { - expr.apply(|expr| { - if let Expr::Column(c) = expr { - columns.insert(Expr::Column(c.clone())); - } - Ok(TreeNodeRecursion::Continue) - }) - // As the closure always returns Ok, this "can't" error - .expect("Unexpected error"); - } - aggr_expr_using_columns = Some(columns.clone()); - columns - } - }; - projection_exprs.extend(inner_projection_exprs); - - let plan = LogicalPlanBuilder::from(intermediate_plan.clone()) - .project(projection_exprs)? - .unnest_columns_with_options(columns, unnest_options)? - .build()?; - - intermediate_plan = plan; - intermediate_select_exprs = outer_projection_exprs; - has_unnest = true; - } - } - - Ok((intermediate_plan, intermediate_select_exprs, has_unnest)) - } } // If there are any multiple-defined windows, we raise an error. diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 9172d51b6eacf..2eacbd174fc24 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -85,22 +85,6 @@ pub(crate) fn rebase_expr( .data() } -pub(crate) fn rebase_expr_by_name( - expr: &Expr, - base_expr_names: &[String], - plan: &LogicalPlan, -) -> Result { - expr.clone() - .transform_down(|nested_expr| { - if base_expr_names.contains(&nested_expr.display_name()?) { - Ok(Transformed::yes(expr_as_column_expr(&nested_expr, plan)?)) - } else { - Ok(Transformed::no(nested_expr)) - } - }) - .data() -} - /// Determines if the set of `Expr`'s are a valid projection on the input /// `Expr::Column`'s. pub(crate) fn check_columns_satisfy_exprs( diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index 5b4c0a2537f58..d5964717f6629 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -675,5 +675,5 @@ select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table gr NULL 1 ### TODO: group by unnest struct -query error DataFusion error: Internal error: unnest on struct can ony be applied at the root level of select expression +query error DataFusion error: Error during planning: Projection references non\-aggregate values select unnest(column1) c1 from nested_unnest_table group by c1.c0; From 17a5158ab448bfad9fc18732fbf3d073efbd3f8f Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Tue, 16 Jul 2024 10:23:33 +0800 Subject: [PATCH 4/6] chore: add some documentation comments and tests --- datafusion/sql/src/select.rs | 22 ++++++++++++++++--- datafusion/sqllogictest/test_files/unnest.slt | 11 ++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index faf71d4ea70e5..f6c212a4b3c94 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -382,6 +382,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } + /// Try converting Unnest(Expr) of group by to Unnest/Projection + /// Return the new input and group_by_exprs of Aggregate. fn try_process_group_by_unnest( &self, agg: &Aggregate, @@ -392,7 +394,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let group_by_exprs = &agg.group_expr; let aggr_exprs = &agg.aggr_expr; - // rewrite group_by_exprs + // process unnest of group_by_exprs, and input of agg will be rewritten + // for example: + // + // ``` + // Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]] + // TableScan: tab + // ``` + // + // will be transformed into + // + // ``` + // Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]] + // Unnest: lists[unnest(tab.array_col)] structs[] + // Projection: tab.array_col AS unnest(tab.array_col) + // TableScan: tab + // ``` let mut intermediate_plan = input.clone(); let mut intermediate_select_exprs = group_by_exprs.to_vec(); @@ -441,12 +458,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; projection_exprs.extend(inner_projection_exprs); - let plan = LogicalPlanBuilder::from(intermediate_plan.clone()) + intermediate_plan = LogicalPlanBuilder::from(intermediate_plan) .project(projection_exprs)? .unnest_columns_with_options(columns, unnest_options)? .build()?; - intermediate_plan = plan; intermediate_select_exprs = outer_projection_exprs; } } diff --git a/datafusion/sqllogictest/test_files/unnest.slt b/datafusion/sqllogictest/test_files/unnest.slt index d5964717f6629..61eab011ea95b 100644 --- a/datafusion/sqllogictest/test_files/unnest.slt +++ b/datafusion/sqllogictest/test_files/unnest.slt @@ -648,6 +648,17 @@ NULL 12 3 0 3 NULL 42 NULL 0 NULL NULL NULL NULL 0 NULL +query II +select unnest(column1), count(*) from unnest_table group by unnest(column1) order by unnest(column1) desc; +---- +12 1 +6 1 +5 1 +4 1 +3 1 +2 1 +1 1 + ### group by recursive unnest list query ? From 810ca8b802d643ee73f39179b5ff2225f8e3dc73 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 16 Jul 2024 16:28:18 -0400 Subject: [PATCH 5/6] Avoid cloning input --- datafusion/sql/src/select.rs | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index f6c212a4b3c94..b41d12628a895 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -33,6 +33,7 @@ use datafusion_expr::expr::Alias; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols, }; +use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::utils::{ expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, @@ -359,24 +360,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result { - match &input { + match input { LogicalPlan::Aggregate(agg) => { + let agg_expr = agg.aggr_expr.clone(); let (new_input, new_group_by_exprs) = self.try_process_group_by_unnest(agg)?; LogicalPlanBuilder::from(new_input) - .aggregate(new_group_by_exprs, agg.aggr_expr.clone())? + .aggregate(new_group_by_exprs, agg_expr)? .build() } - LogicalPlan::Filter(filter) => match filter.input.as_ref() { - LogicalPlan::Aggregate(agg) => { - let (new_input, new_group_by_exprs) = - self.try_process_group_by_unnest(agg)?; - LogicalPlanBuilder::from(new_input) - .aggregate(new_group_by_exprs, agg.aggr_expr.clone())? - .filter(filter.predicate.clone())? - .build() - } - _ => Ok(input), + LogicalPlan::Filter(mut filter) => { + filter.input = Arc::new(self.try_process_aggregate_unnest(unwrap_arc(filter.input))?); + Ok(LogicalPlan::Filter(filter)) }, _ => Ok(input), } @@ -386,13 +381,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Return the new input and group_by_exprs of Aggregate. fn try_process_group_by_unnest( &self, - agg: &Aggregate, + agg: Aggregate, ) -> Result<(LogicalPlan, Vec)> { let mut aggr_expr_using_columns: Option> = None; - let input = agg.input.as_ref(); - let group_by_exprs = &agg.group_expr; - let aggr_exprs = &agg.aggr_expr; + let Aggregate { + input, + group_expr: group_by_exprs, + aggr_expr: aggr_exprs, + .. + } = agg; // process unnest of group_by_exprs, and input of agg will be rewritten // for example: @@ -410,8 +408,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Projection: tab.array_col AS unnest(tab.array_col) // TableScan: tab // ``` - let mut intermediate_plan = input.clone(); - let mut intermediate_select_exprs = group_by_exprs.to_vec(); + let mut intermediate_plan = unwrap_arc(input); + let mut intermediate_select_exprs = group_by_exprs; loop { let mut unnest_columns = vec![]; @@ -442,7 +440,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Some(exprs) => (*exprs).clone(), None => { let mut columns = HashSet::new(); - for expr in aggr_exprs { + for expr in &aggr_exprs { expr.apply(|expr| { if let Expr::Column(c) = expr { columns.insert(Expr::Column(c.clone())); From 2309e8b2d9906a8ab79a5fb12d77bc5e67b36c81 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 16 Jul 2024 16:32:17 -0400 Subject: [PATCH 6/6] use consistent field names --- datafusion/sql/src/select.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index b41d12628a895..a3ef039018525 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -370,9 +370,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .build() } LogicalPlan::Filter(mut filter) => { - filter.input = Arc::new(self.try_process_aggregate_unnest(unwrap_arc(filter.input))?); + filter.input = Arc::new( + self.try_process_aggregate_unnest(unwrap_arc(filter.input))?, + ); Ok(LogicalPlan::Filter(filter)) - }, + } _ => Ok(input), } } @@ -387,8 +389,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let Aggregate { input, - group_expr: group_by_exprs, - aggr_expr: aggr_exprs, + group_expr, + aggr_expr, .. } = agg; @@ -409,7 +411,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // TableScan: tab // ``` let mut intermediate_plan = unwrap_arc(input); - let mut intermediate_select_exprs = group_by_exprs; + let mut intermediate_select_exprs = group_expr; loop { let mut unnest_columns = vec![]; @@ -440,7 +442,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Some(exprs) => (*exprs).clone(), None => { let mut columns = HashSet::new(); - for expr in &aggr_exprs { + for expr in &aggr_expr { expr.apply(|expr| { if let Expr::Column(c) = expr { columns.insert(Expr::Column(c.clone()));