From 6222276a839e3194713b3eb418d06c975934aa0b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 8 Sep 2024 09:10:12 +0100 Subject: [PATCH 1/3] Allow using dictionaries as filters --- datafusion/core/tests/dataframe/mod.rs | 59 ++++++++++++++++++- datafusion/expr/src/logical_plan/plan.rs | 14 ++++- .../src/expressions/is_not_null.rs | 36 ++++++++++- .../physical-expr/src/expressions/is_null.rs | 19 +++++- 4 files changed, 123 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 86cacbaa06d87..b32e04ff54a64 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -29,7 +29,7 @@ use arrow::{ }, record_batch::RecordBatch, }; -use arrow_array::{Array, Float32Array, Float64Array, UnionArray}; +use arrow_array::{Array, BooleanArray, DictionaryArray, Float32Array, Float64Array, Int8Array, UnionArray}; use arrow_buffer::ScalarBuffer; use arrow_schema::{ArrowError, UnionFields, UnionMode}; use datafusion_functions_aggregate::count::count_udaf; @@ -2363,3 +2363,60 @@ async fn dense_union_is_null() { ]; assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); } + + +#[tokio::test] +async fn boolean_dictionary_as_filter() { + let values = vec![Some(true), Some(false), None, Some(true)]; + let keys = vec![0, 0, 1, 2, 1, 3, 1]; + let values_array = BooleanArray::from(values); + let keys_array = Int8Array::from(keys); + let array = DictionaryArray::new( + keys_array, + Arc::new(values_array) as Arc, + ); + + let field = Field::new( + "my_dict", + DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Boolean)), + true, + ); + let schema = Arc::new(Schema::new(vec![field])); + + let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); + + let ctx = SessionContext::new(); + + ctx.register_batch("dict_batch", batch).unwrap(); + + let df = ctx.table("dict_batch").await.unwrap(); + + // view_all + let expected = [ + "+---------+", + "| my_dict |", + "+---------+", + "| true |", + "| true |", + "| false |", + "| |", + "| false |", + "| true |", + "| false |", + "+---------+", + ]; + assert_batches_eq!(expected, &df.clone().collect().await.unwrap()); + + // filter where is null + let result_df = df.clone().filter(col("my_dict")).unwrap(); + let expected = [ + "+---------+", + "| my_dict |", + "+---------+", + "| true |", + "| true |", + "| true |", + "+---------+", + ]; + assert_batches_eq!(expected, &result_df.collect().await.unwrap()); +} diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ca7d04b9b03ec..8142922c6c587 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2102,6 +2102,17 @@ impl Filter { Self::try_new_internal(predicate, input, true) } + fn is_allowed_filter_type(data_type: &DataType) -> bool { + match data_type { + // Interpret NULL as a missing boolean value. + DataType::Boolean | DataType::Null => true, + DataType::Dictionary(_, value_type) => { + Filter::is_allowed_filter_type(value_type.as_ref()) + } + _ => false, + } + } + fn try_new_internal( predicate: Expr, input: Arc, @@ -2112,8 +2123,7 @@ impl Filter { // construction (such as with correlated subqueries) so we make a best effort here and // ignore errors resolving the expression against the schema. if let Ok(predicate_type) = predicate.get_type(input.schema()) { - // Interpret NULL as a missing boolean value. - if predicate_type != DataType::Boolean && predicate_type != DataType::Null { + if !Filter::is_allowed_filter_type(&predicate_type) { return plan_err!( "Cannot create filter with non-boolean predicate '{predicate}' returning {predicate_type}" ); diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 58559352d44c0..3c2a05ff8498a 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -117,7 +117,7 @@ mod tests { use super::*; use crate::expressions::col; use arrow::{ - array::{BooleanArray, StringArray}, + array::{BooleanArray, StringArray, StringDictionaryBuilder}, datatypes::*, }; use arrow_array::{Array, Float64Array, Int32Array, UnionArray}; @@ -189,4 +189,38 @@ mod tests { assert_eq!(expected, actual); } + + #[test] + fn dictionary_is_not_null() { + let mut builder = StringDictionaryBuilder::::new(); + builder.append("a").unwrap(); + builder.append("").unwrap(); + builder.append_null(); + builder.append("a").unwrap(); + let array = builder.finish(); + + let field = Field::new( + "my_dict", + DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), + true, + ); + + let schema = Schema::new(vec![field]); + let expr = is_not_null(col("my_dict", &schema).unwrap()).unwrap(); + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap(); + + // expression: "a is not null" + let actual = expr + .evaluate(&batch) + .unwrap() + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); + + let actual = as_boolean_array(&actual).unwrap(); + + let expected = &BooleanArray::from(vec![true, true, false, true]); + + assert_eq!(expected, actual); + } } diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 3cdb49bcab42f..c83638c20ef8d 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -181,7 +181,7 @@ mod tests { use super::*; use crate::expressions::col; use arrow::{ - array::{BooleanArray, StringArray}, + array::{BooleanArray, StringArray, StringDictionaryBuilder}, datatypes::*, }; use arrow_array::{Float64Array, Int32Array}; @@ -278,4 +278,21 @@ mod tests { let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); assert_eq!(expected, &result); } + + #[test] + fn dictionary_is_null() { + let mut builder = StringDictionaryBuilder::::new(); + builder.append("a").unwrap(); + builder.append("").unwrap(); + builder.append_null(); + builder.append("a").unwrap(); + let array = builder.finish(); + + let array_ref = Arc::new(array) as ArrayRef; + let result = compute_is_null(array_ref).unwrap(); + + let expected = &BooleanArray::from(vec![false, false, true, false]); + + assert_eq!(expected, &result); + } } From 95195e0a0eb778ea4f391db3ed2a591f2f8d7344 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 8 Sep 2024 09:14:36 +0100 Subject: [PATCH 2/3] revert, nested --- datafusion/core/tests/dataframe/mod.rs | 52 +++++++++++++++++-- .../src/expressions/is_not_null.rs | 36 +------------ .../physical-expr/src/expressions/is_null.rs | 19 +------ 3 files changed, 51 insertions(+), 56 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index b32e04ff54a64..7dd473defbd81 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2375,6 +2375,7 @@ async fn boolean_dictionary_as_filter() { keys_array, Arc::new(values_array) as Arc, ); + let array = Arc::new(array); let field = Field::new( "my_dict", @@ -2383,7 +2384,7 @@ async fn boolean_dictionary_as_filter() { ); let schema = Arc::new(Schema::new(vec![field])); - let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); + let batch = RecordBatch::try_new(schema, vec![array.clone()]).unwrap(); let ctx = SessionContext::new(); @@ -2407,7 +2408,6 @@ async fn boolean_dictionary_as_filter() { ]; assert_batches_eq!(expected, &df.clone().collect().await.unwrap()); - // filter where is null let result_df = df.clone().filter(col("my_dict")).unwrap(); let expected = [ "+---------+", @@ -2418,5 +2418,51 @@ async fn boolean_dictionary_as_filter() { "| true |", "+---------+", ]; - assert_batches_eq!(expected, &result_df.collect().await.unwrap()); + assert_batches_eq!(expected, &result_df.collect().await.unwrap()); + + // test nested dictionary + let keys = vec![0, 2]; // 0 -> true, 2 -> false + let keys_array = Int8Array::from(keys); + let nested_array = DictionaryArray::new( + keys_array, + array, + ); + + let field = Field::new( + "my_nested_dict", + DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Boolean)))), + true, + ); + + let schema = Arc::new(Schema::new(vec![field])); + + let batch = RecordBatch::try_new(schema, vec![Arc::new(nested_array)]).unwrap(); + + ctx.register_batch("nested_dict_batch", batch).unwrap(); + + let df = ctx.table("nested_dict_batch").await.unwrap(); + + // view_all + let expected = [ + "+----------------+", + "| my_nested_dict |", + "+----------------+", + "| true |", + "| false |", + "+----------------+", + ]; + + assert_batches_eq!(expected, &df.clone().collect().await.unwrap()); + + let result_df = df.clone().filter(col("my_nested_dict")).unwrap(); + let expected = [ + "+----------------+", + "| my_nested_dict |", + "+----------------+", + "| true |", + "+----------------+", + ]; + + assert_batches_eq!(expected, &result_df.collect().await.unwrap()); + } diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 3c2a05ff8498a..58559352d44c0 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -117,7 +117,7 @@ mod tests { use super::*; use crate::expressions::col; use arrow::{ - array::{BooleanArray, StringArray, StringDictionaryBuilder}, + array::{BooleanArray, StringArray}, datatypes::*, }; use arrow_array::{Array, Float64Array, Int32Array, UnionArray}; @@ -189,38 +189,4 @@ mod tests { assert_eq!(expected, actual); } - - #[test] - fn dictionary_is_not_null() { - let mut builder = StringDictionaryBuilder::::new(); - builder.append("a").unwrap(); - builder.append("").unwrap(); - builder.append_null(); - builder.append("a").unwrap(); - let array = builder.finish(); - - let field = Field::new( - "my_dict", - DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), - true, - ); - - let schema = Schema::new(vec![field]); - let expr = is_not_null(col("my_dict", &schema).unwrap()).unwrap(); - let batch = - RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap(); - - // expression: "a is not null" - let actual = expr - .evaluate(&batch) - .unwrap() - .into_array(batch.num_rows()) - .expect("Failed to convert to array"); - - let actual = as_boolean_array(&actual).unwrap(); - - let expected = &BooleanArray::from(vec![true, true, false, true]); - - assert_eq!(expected, actual); - } } diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index c83638c20ef8d..3cdb49bcab42f 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -181,7 +181,7 @@ mod tests { use super::*; use crate::expressions::col; use arrow::{ - array::{BooleanArray, StringArray, StringDictionaryBuilder}, + array::{BooleanArray, StringArray}, datatypes::*, }; use arrow_array::{Float64Array, Int32Array}; @@ -278,21 +278,4 @@ mod tests { let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); assert_eq!(expected, &result); } - - #[test] - fn dictionary_is_null() { - let mut builder = StringDictionaryBuilder::::new(); - builder.append("a").unwrap(); - builder.append("").unwrap(); - builder.append_null(); - builder.append("a").unwrap(); - let array = builder.finish(); - - let array_ref = Arc::new(array) as ArrayRef; - let result = compute_is_null(array_ref).unwrap(); - - let expected = &BooleanArray::from(vec![false, false, true, false]); - - assert_eq!(expected, &result); - } } From db435c9b71c1d80e4ab9fd74189dd880fdba206a Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 8 Sep 2024 09:40:52 +0100 Subject: [PATCH 3/3] fmt --- datafusion/core/tests/dataframe/mod.rs | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 7dd473defbd81..d7c38f0f5cf1e 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -29,7 +29,10 @@ use arrow::{ }, record_batch::RecordBatch, }; -use arrow_array::{Array, BooleanArray, DictionaryArray, Float32Array, Float64Array, Int8Array, UnionArray}; +use arrow_array::{ + Array, BooleanArray, DictionaryArray, Float32Array, Float64Array, Int8Array, + UnionArray, +}; use arrow_buffer::ScalarBuffer; use arrow_schema::{ArrowError, UnionFields, UnionMode}; use datafusion_functions_aggregate::count::count_udaf; @@ -2364,17 +2367,14 @@ async fn dense_union_is_null() { assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); } - #[tokio::test] async fn boolean_dictionary_as_filter() { let values = vec![Some(true), Some(false), None, Some(true)]; let keys = vec![0, 0, 1, 2, 1, 3, 1]; let values_array = BooleanArray::from(values); let keys_array = Int8Array::from(keys); - let array = DictionaryArray::new( - keys_array, - Arc::new(values_array) as Arc, - ); + let array = + DictionaryArray::new(keys_array, Arc::new(values_array) as Arc); let array = Arc::new(array); let field = Field::new( @@ -2423,14 +2423,17 @@ async fn boolean_dictionary_as_filter() { // test nested dictionary let keys = vec![0, 2]; // 0 -> true, 2 -> false let keys_array = Int8Array::from(keys); - let nested_array = DictionaryArray::new( - keys_array, - array, - ); + let nested_array = DictionaryArray::new(keys_array, array); let field = Field::new( "my_nested_dict", - DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Boolean)))), + DataType::Dictionary( + Box::new(DataType::Int8), + Box::new(DataType::Dictionary( + Box::new(DataType::Int8), + Box::new(DataType::Boolean), + )), + ), true, ); @@ -2464,5 +2467,4 @@ async fn boolean_dictionary_as_filter() { ]; assert_batches_eq!(expected, &result_df.collect().await.unwrap()); - }