From 62bfa8b108dae69d657c1da084bed7a26c812fb6 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 01:26:48 -0700 Subject: [PATCH 01/40] lazy_eval_coalesce --- .../apache/comet/CometExpressionSuite.scala | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 55b5121dd6..6ccf0f7e2b 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -44,6 +44,7 @@ import org.apache.spark.sql.types._ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + import testImplicits._ override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit @@ -367,15 +368,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT - | try_divide(10, 0), - | try_divide(NULL, 5), - | try_divide(5, NULL), - | try_divide(-2147483648, -1), - | try_divide(-9223372036854775808, -1), - | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) - | from tbl - |""".stripMargin) + |SELECT + | try_divide(10, 0), + | try_divide(NULL, 5), + | try_divide(5, NULL), + | try_divide(-2147483648, -1), + | try_divide(-9223372036854775808, -1), + | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) + | from tbl + |""".stripMargin) } } @@ -384,13 +385,28 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT try_divide(-128, -1), - |try_divide(-32768, -1), - |try_divide(-2147483648, -1), - |try_divide(-9223372036854775808, -1), - |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) - |from tbl - |""".stripMargin) + |SELECT try_divide(-128, -1), + |try_divide(-32768, -1), + |try_divide(-2147483648, -1), + |try_divide(-9223372036854775808, -1), + |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) + |from tbl + |""".stripMargin) + } + } + + test("test coalesce lazy eval") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { + val data = Seq((100, 0)) + withParquetTable(data, "t1") { + val res = spark.sql(""" + |SELECT coalesce(_1 , 1/0) from t1; + | """.stripMargin) + + res.explain(true) + + checkSparkAnswer(res) + } } } From e78bc9931e8fe5e5cb851cf36f796484be595ebf Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 15:14:52 -0700 Subject: [PATCH 02/40] lazy_coalesce_fallback_case_statement --- .../apache/comet/CometExpressionSuite.scala | 42 ++++++++----------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 6ccf0f7e2b..b0632f2815 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -44,7 +44,6 @@ import org.apache.spark.sql.types._ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { - import testImplicits._ override 
protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit @@ -368,15 +367,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT - | try_divide(10, 0), - | try_divide(NULL, 5), - | try_divide(5, NULL), - | try_divide(-2147483648, -1), - | try_divide(-9223372036854775808, -1), - | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) - | from tbl - |""".stripMargin) + |SELECT + | try_divide(10, 0), + | try_divide(NULL, 5), + | try_divide(5, NULL), + | try_divide(-2147483648, -1), + | try_divide(-9223372036854775808, -1), + | try_divide(DECIMAL('9999999999999999999999999999'), 0.1) + | from tbl + |""".stripMargin) } } @@ -385,13 +384,13 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withParquetTable(data, "tbl") { checkSparkAnswerAndOperator("SELECT try_divide(_1, _2) FROM tbl") checkSparkAnswerAndOperator(""" - |SELECT try_divide(-128, -1), - |try_divide(-32768, -1), - |try_divide(-2147483648, -1), - |try_divide(-9223372036854775808, -1), - |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) - |from tbl - |""".stripMargin) + |SELECT try_divide(-128, -1), + |try_divide(-32768, -1), + |try_divide(-2147483648, -1), + |try_divide(-9223372036854775808, -1), + |try_divide(CAST(99999 AS DECIMAL(5,0)), CAST(0.0001 AS DECIMAL(5,4))) + |from tbl + |""".stripMargin) } } @@ -400,15 +399,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val data = Seq((100, 0)) withParquetTable(data, "t1") { val res = spark.sql(""" - |SELECT coalesce(_1 , 1/0) from t1; - | """.stripMargin) - - res.explain(true) - + |SELECT coalesce(_1 , 1/0) from t1; + | """.stripMargin) checkSparkAnswer(res) } - } - } test("test coalesce lazy eval") { withSQLConf( From 2f8701ec00c96bdaa0f8efb57aabc99ceb5547aa Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 15:52:11 -0700 Subject: [PATCH 03/40] lazy_coalesce_fallback_case_statement --- .../scala/org/apache/comet/CometExpressionSuite.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index b0632f2815..2afa15434f 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -398,11 +398,14 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { val data = Seq((100, 0)) withParquetTable(data, "t1") { - val res = spark.sql(""" - |SELECT coalesce(_1 , 1/0) from t1; - | """.stripMargin) + val res = spark.sql( + """ + |SELECT coalesce(_1 , 1/0) from t1; + | """.stripMargin) checkSparkAnswer(res) } + } + } test("test coalesce lazy eval") { withSQLConf( From bcda6c3e3ab6f9104896e4c7300ffab5ac88716d Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 16:57:22 -0700 Subject: [PATCH 04/40] lazy_coalesce_fallback_case_statement --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 2afa15434f..c534035d25 100644 --- 
a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -398,8 +398,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { val data = Seq((100, 0)) withParquetTable(data, "t1") { - val res = spark.sql( - """ + val res = spark.sql(""" |SELECT coalesce(_1 , 1/0) from t1; | """.stripMargin) checkSparkAnswer(res) From f9ec308e5ed622fad5b46ac4f633a1673efd2d54 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Tue, 2 Sep 2025 15:29:57 -0700 Subject: [PATCH 05/40] lazy_coalesce_fallback_case_statement --- .../scala/org/apache/comet/CometExpressionSuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index c534035d25..c045007225 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -395,13 +395,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("test coalesce lazy eval") { - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { - val data = Seq((100, 0)) + withSQLConf( + SQLConf.ANSI_ENABLED.key -> "true", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + val data = Seq((9999999999999L, 0)) withParquetTable(data, "t1") { val res = spark.sql(""" - |SELECT coalesce(_1 , 1/0) from t1; + |SELECT coalesce(_1, CAST(_1 AS TINYINT)) from t1; | """.stripMargin) - checkSparkAnswer(res) + checkSparkAnswerAndOperator(res) } } } From 85acae510e95068b02e9b3cb4599755e8b930a7d Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Tue, 12 Aug 2025 18:04:05 -0700 Subject: [PATCH 06/40] init_ansi_mode_enabled --- native/core/src/execution/planner.rs | 10 ++- native/spark-expr/src/comet_scalar_funcs.rs | 38 ++++++++-- native/spark-expr/src/lib.rs | 5 +- .../src/math_funcs/checked_arithmetic.rs | 76 +++++++++++++++++-- 4 files changed, 110 insertions(+), 19 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 1465d33ade..98d121e5af 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -62,8 +62,9 @@ use datafusion::{ prelude::SessionContext, }; use datafusion_comet_spark_expr::{ - create_comet_physical_fun, create_modulo_expr, create_negate_expr, BloomFilterAgg, - BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, SparkSecond, + create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, create_modulo_expr, + create_negate_expr, BloomFilterAgg, BloomFilterMightContain, EvalMode, SparkHour, SparkMinute, + SparkSecond, }; use crate::execution::operators::ExecutionError::GeneralError; @@ -1007,7 +1008,7 @@ impl PhysicalPlanner { } _ => { let data_type = return_type.map(to_arrow_datatype).unwrap(); - if eval_mode == EvalMode::Try && data_type.is_integer() { + if [EvalMode::Try, EvalMode::Ansi].contains(&eval_mode) && data_type.is_integer() { let op_str = match op { DataFusionOperator::Plus => "checked_add", DataFusionOperator::Minus => "checked_sub", @@ -1017,11 +1018,12 @@ impl PhysicalPlanner { todo!("Operator yet to be implemented!"); } }; - let fun_expr = create_comet_physical_fun( + let fun_expr = create_comet_physical_fun_with_eval_mode( op_str, data_type.clone(), &self.session_ctx.state(), None, + eval_mode, )?; 
Ok(Arc::new(ScalarFunctionExpr::new( op_str, diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 93a820ba9a..251a02888e 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -21,7 +21,7 @@ use crate::math_funcs::modulo_expr::spark_modulo; use crate::{ spark_array_repeat, spark_ceil, spark_date_add, spark_date_sub, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_hex, spark_isnan, spark_make_decimal, - spark_read_side_padding, spark_round, spark_rpad, spark_unhex, spark_unscaled_value, + spark_read_side_padding, spark_round, spark_rpad, spark_unhex, spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkBitwiseGet, SparkBitwiseNot, SparkDateTrunc, SparkStringSpace, }; use arrow::datatypes::DataType; @@ -64,15 +64,41 @@ macro_rules! make_comet_scalar_udf { ); Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func))) }}; + ($name:expr, $func:ident, $data_type:ident, $eval_mode:ident) => {{ + let scalar_func = CometScalarFunction::new( + $name.to_string(), + Signature::variadic_any(Volatility::Immutable), + $data_type.clone(), + Arc::new(move |args| $func(args, &$data_type, $eval_mode)), + ); + Ok(Arc::new(ScalarUDF::new_from_impl(scalar_func))) + }}; } -/// Create a physical scalar function. pub fn create_comet_physical_fun( fun_name: &str, data_type: DataType, registry: &dyn FunctionRegistry, fail_on_error: Option, ) -> Result, DataFusionError> { + create_comet_physical_fun_with_eval_mode( + fun_name, + data_type, + registry, + fail_on_error, + EvalMode::Legacy, + ) +} + +/// Create a physical scalar function. +pub fn create_comet_physical_fun_with_eval_mode( + fun_name: &str, + data_type: DataType, + registry: &dyn FunctionRegistry, + fail_on_error: Option, + eval_mode: EvalMode, +) -> Result, DataFusionError> { + println!("creating spark physical plan with eval mode ANSI"); match fun_name { "ceil" => { make_comet_scalar_udf!("ceil", spark_ceil, data_type) @@ -117,16 +143,16 @@ pub fn create_comet_physical_fun( ) } "checked_add" => { - make_comet_scalar_udf!("checked_add", checked_add, data_type) + make_comet_scalar_udf!("checked_add", checked_add, data_type, eval_mode) } "checked_sub" => { - make_comet_scalar_udf!("checked_sub", checked_sub, data_type) + make_comet_scalar_udf!("checked_sub", checked_sub, data_type, eval_mode) } "checked_mul" => { - make_comet_scalar_udf!("checked_mul", checked_mul, data_type) + make_comet_scalar_udf!("checked_mul", checked_mul, data_type, eval_mode) } "checked_div" => { - make_comet_scalar_udf!("checked_div", checked_div, data_type) + make_comet_scalar_udf!("checked_div", checked_div, data_type, eval_mode) } "murmur3_hash" => { let func = Arc::new(spark_murmur3_hash); diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 4b29b61775..2031b21c57 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -64,7 +64,10 @@ pub use conditional_funcs::*; pub use conversion_funcs::*; pub use nondetermenistic_funcs::*; -pub use comet_scalar_funcs::{create_comet_physical_fun, register_all_comet_functions}; +pub use comet_scalar_funcs::{ + create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, + register_all_comet_functions, +}; pub use datetime_funcs::{ spark_date_add, spark_date_sub, SparkDateTrunc, SparkHour, SparkMinute, SparkSecond, TimestampTruncExpr, diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index 
0312cdb0b0..40d86724f4 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -18,6 +18,7 @@ use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder}; use arrow::array::{ArrayRef, AsArray}; +use crate::EvalMode; use arrow::datatypes::{ArrowPrimitiveType, DataType, Int32Type, Int64Type}; use datafusion::common::DataFusionError; use datafusion::physical_plan::ColumnarValue; @@ -27,6 +28,7 @@ pub fn try_arithmetic_kernel( left: &PrimitiveArray, right: &PrimitiveArray, op: &str, + is_ansi_mode: bool, ) -> Result where T: ArrowPrimitiveType, @@ -39,7 +41,17 @@ where if left.is_null(i) || right.is_null(i) { builder.append_null(); } else { - builder.append_option(left.value(i).add_checked(right.value(i)).ok()); + match left.value(i).add_checked(right.value(i)) { + Ok(v) => builder.append_value(v), + Err(_e) => { + if is_ansi_mode { + return Err(DataFusionError::Internal(format!( + "ARITHMETIC OVERFLOW : {}", + op + ))); + } + } + } } } } @@ -48,7 +60,17 @@ where if left.is_null(i) || right.is_null(i) { builder.append_null(); } else { - builder.append_option(left.value(i).sub_checked(right.value(i)).ok()); + match left.value(i).sub_checked(right.value(i)) { + Ok(v) => builder.append_value(v), + Err(_e) => { + if is_ansi_mode { + return Err(DataFusionError::Internal(format!( + "ARITHMETIC OVERFLOW : {}", + op + ))); + } + } + } } } } @@ -57,7 +79,17 @@ where if left.is_null(i) || right.is_null(i) { builder.append_null(); } else { - builder.append_option(left.value(i).mul_checked(right.value(i)).ok()); + match left.value(i).mul_checked(right.value(i)) { + Ok(v) => builder.append_value(v), + Err(_e) => { + if is_ansi_mode { + return Err(DataFusionError::Internal(format!( + "ARITHMETIC OVERFLOW : {}", + op + ))); + } + } + } } } } @@ -66,7 +98,17 @@ where if left.is_null(i) || right.is_null(i) { builder.append_null(); } else { - builder.append_option(left.value(i).div_checked(right.value(i)).ok()); + match left.value(i).div_checked(right.value(i)) { + Ok(v) => builder.append_value(v), + Err(_e) => { + if is_ansi_mode { + return Err(DataFusionError::Internal(format!( + "ARITHMETIC OVERFLOW : {}", + op + ))); + } + } + } } } } @@ -84,39 +126,55 @@ where pub fn checked_add( args: &[ColumnarValue], data_type: &DataType, + eval_mode: EvalMode, ) -> Result { - checked_arithmetic_internal(args, data_type, "checked_add") + checked_arithmetic_internal(args, data_type, "checked_add", eval_mode) } pub fn checked_sub( args: &[ColumnarValue], data_type: &DataType, + eval_mode: EvalMode, ) -> Result { - checked_arithmetic_internal(args, data_type, "checked_sub") + checked_arithmetic_internal(args, data_type, "checked_sub", eval_mode) } pub fn checked_mul( args: &[ColumnarValue], data_type: &DataType, + eval_mode: EvalMode, ) -> Result { - checked_arithmetic_internal(args, data_type, "checked_mul") + checked_arithmetic_internal(args, data_type, "checked_mul", eval_mode) } pub fn checked_div( args: &[ColumnarValue], data_type: &DataType, + eval_mode: EvalMode, ) -> Result { - checked_arithmetic_internal(args, data_type, "checked_div") + checked_arithmetic_internal(args, data_type, "checked_div", eval_mode) } fn checked_arithmetic_internal( args: &[ColumnarValue], data_type: &DataType, op: &str, + eval_mode: EvalMode, ) -> Result { let left = &args[0]; let right = &args[1]; + let is_ansi_mode = match eval_mode { + EvalMode::Try => false, + EvalMode::Ansi => true, + _ => { + return Err(DataFusionError::Internal(format!( + 
"Unsupported mode : {:?}", + eval_mode + ))) + } + }; + let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (left, right) { (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)), (ColumnarValue::Scalar(l), ColumnarValue::Array(r)) => { @@ -134,11 +192,13 @@ fn checked_arithmetic_internal( left_arr.as_primitive::(), right_arr.as_primitive::(), op, + is_ansi_mode, ), DataType::Int64 => try_arithmetic_kernel::( left_arr.as_primitive::(), right_arr.as_primitive::(), op, + is_ansi_mode, ), _ => Err(DataFusionError::Internal(format!( "Unsupported data type: {:?}", From d4d125a08b2019c3c616caebf150b944f918d2d4 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 13 Aug 2025 17:46:33 -0700 Subject: [PATCH 07/40] init_ansi_mode_enabled_add_tests --- native/core/src/execution/planner.rs | 2 -- native/spark-expr/src/comet_scalar_funcs.rs | 4 ++-- .../src/math_funcs/checked_arithmetic.rs | 21 +++++-------------- 3 files changed, 7 insertions(+), 20 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 98d121e5af..85e14265e5 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -242,8 +242,6 @@ impl PhysicalPlanner { ) -> Result, ExecutionError> { match spark_expr.expr_struct.as_ref().unwrap() { ExprStruct::Add(expr) => { - // TODO respect ANSI eval mode - // https://github.com/apache/datafusion-comet/issues/536 let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; self.create_binary_expr( expr.left.as_ref().unwrap(), diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs index 251a02888e..f96ddffce9 100644 --- a/native/spark-expr/src/comet_scalar_funcs.rs +++ b/native/spark-expr/src/comet_scalar_funcs.rs @@ -75,6 +75,7 @@ macro_rules! make_comet_scalar_udf { }}; } +/// Create a physical scalar function. pub fn create_comet_physical_fun( fun_name: &str, data_type: DataType, @@ -90,7 +91,7 @@ pub fn create_comet_physical_fun( ) } -/// Create a physical scalar function. +/// Create a physical scalar function with eval mode. Goal is to deprecate above function once all the operators have ANSI support pub fn create_comet_physical_fun_with_eval_mode( fun_name: &str, data_type: DataType, @@ -98,7 +99,6 @@ pub fn create_comet_physical_fun_with_eval_mode( fail_on_error: Option, eval_mode: EvalMode, ) -> Result, DataFusionError> { - println!("creating spark physical plan with eval mode ANSI"); match fun_name { "ceil" => { make_comet_scalar_udf!("ceil", spark_ceil, data_type) diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index 40d86724f4..661c0c91fe 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -35,6 +35,7 @@ where { let len = left.len(); let mut builder = PrimitiveBuilder::::with_capacity(len); + let error_msg = format!("{} : [ARITHMETIC_OVERFLOW] integer overflow. 
Use 'try_{}' to tolerate overflow and return NULL instead", op, op.split("_").last().unwrap()); match op { "checked_add" => { for i in 0..len { @@ -45,10 +46,7 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(format!( - "ARITHMETIC OVERFLOW : {}", - op - ))); + return Err(DataFusionError::Internal(error_msg)); } } } @@ -64,10 +62,7 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(format!( - "ARITHMETIC OVERFLOW : {}", - op - ))); + return Err(DataFusionError::Internal(error_msg)); } } } @@ -83,10 +78,7 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(format!( - "ARITHMETIC OVERFLOW : {}", - op - ))); + return Err(DataFusionError::Internal(error_msg)); } } } @@ -102,10 +94,7 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(format!( - "ARITHMETIC OVERFLOW : {}", - op - ))); + return Err(DataFusionError::Internal(error_msg)); } } } From 0d36d606e62ccff52c6157529d94dc40c1bd892a Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Thu, 14 Aug 2025 11:16:17 -0700 Subject: [PATCH 08/40] init_ansi_mode_enabled_add_tests --- native/core/src/execution/planner.rs | 1 + native/spark-expr/src/math_funcs/checked_arithmetic.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 85e14265e5..313acd9b95 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -281,6 +281,7 @@ impl PhysicalPlanner { ExprStruct::Divide(expr) => { // TODO respect ANSI eval mode // https://github.com/apache/datafusion-comet/issues/533 + print!("divide op !!!"); let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; self.create_binary_expr( expr.left.as_ref().unwrap(), diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index 661c0c91fe..54e916e702 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -163,6 +163,7 @@ fn checked_arithmetic_internal( ))) } }; + println!("Eval mode {:?}", eval_mode); let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (left, right) { (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)), From 46790d3d374d3bccafac1a5b93227df15df41c3c Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Thu, 14 Aug 2025 14:20:18 -0700 Subject: [PATCH 09/40] init_ansi_mode_enabled_add_tests --- native/core/src/execution/planner.rs | 11 +-- .../src/math_funcs/checked_arithmetic.rs | 14 ++- .../org/apache/comet/serde/arithmetic.scala | 2 +- .../apache/comet/CometExpressionSuite.scala | 98 +++++++++++++++++-- 4 files changed, 104 insertions(+), 21 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 313acd9b95..af3439d2c8 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -253,8 +253,6 @@ impl PhysicalPlanner { ) } ExprStruct::Subtract(expr) => { - // TODO respect ANSI eval mode - // https://github.com/apache/datafusion-comet/issues/535 let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; self.create_binary_expr( expr.left.as_ref().unwrap(), @@ -266,8 +264,6 @@ impl PhysicalPlanner { ) } ExprStruct::Multiply(expr) => { - // TODO respect ANSI eval mode - // 
https://github.com/apache/datafusion-comet/issues/534 let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; self.create_binary_expr( expr.left.as_ref().unwrap(), @@ -279,9 +275,6 @@ impl PhysicalPlanner { ) } ExprStruct::Divide(expr) => { - // TODO respect ANSI eval mode - // https://github.com/apache/datafusion-comet/issues/533 - print!("divide op !!!"); let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; self.create_binary_expr( expr.left.as_ref().unwrap(), @@ -293,8 +286,6 @@ impl PhysicalPlanner { ) } ExprStruct::IntegralDivide(expr) => { - // TODO respect eval mode - // https://github.com/apache/datafusion-comet/issues/533 let eval_mode = from_protobuf_eval_mode(expr.eval_mode)?; self.create_binary_expr_with_options( expr.left.as_ref().unwrap(), @@ -1007,7 +998,7 @@ impl PhysicalPlanner { } _ => { let data_type = return_type.map(to_arrow_datatype).unwrap(); - if [EvalMode::Try, EvalMode::Ansi].contains(&eval_mode) && data_type.is_integer() { + if [EvalMode::Try, EvalMode::Ansi].contains(&eval_mode) && data_type.is_numeric() { let op_str = match op { DataFusionOperator::Plus => "checked_add", DataFusionOperator::Minus => "checked_sub", diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index 54e916e702..0163f61de4 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -19,7 +19,7 @@ use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder}; use arrow::array::{ArrayRef, AsArray}; use crate::EvalMode; -use arrow::datatypes::{ArrowPrimitiveType, DataType, Int32Type, Int64Type}; +use arrow::datatypes::{ArrowPrimitiveType, DataType, Float32Type, Float64Type, Int32Type, Int64Type}; use datafusion::common::DataFusionError; use datafusion::physical_plan::ColumnarValue; use std::sync::Arc; @@ -190,6 +190,18 @@ fn checked_arithmetic_internal( op, is_ansi_mode, ), + DataType::Float32 =>try_arithmetic_kernel::( + left_arr.as_primitive::(), + right_arr.as_primitive::(), + op, + is_ansi_mode, + ), + DataType::Float64 =>try_arithmetic_kernel::( + left_arr.as_primitive::(), + right_arr.as_primitive::(), + op, + is_ansi_mode, + ), _ => Err(DataFusionError::Internal(format!( "Unsupported data type: {:?}", data_type diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index 0f1eeb758a..9a35bff437 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -192,7 +192,7 @@ object CometDivide extends CometExpressionSerde[Divide] with MathBase { // Datafusion now throws an exception for dividing by zero // See https://github.com/apache/arrow-datafusion/pull/6792 // For now, use NullIf to swap zeros with nulls. 
- val rightExpr = nullIfWhenPrimitive(expr.right) + val rightExpr = if (expr.evalMode != EvalMode.ANSI) nullIfWhenPrimitive(expr.right) else expr.right if (!supportedDataType(expr.left.dataType)) { withInfo(expr, s"Unsupported datatype ${expr.left.dataType}") return None diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index c045007225..31f1b070b9 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.sql.types._ -import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus +import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -324,7 +324,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | try_add(NULL, 5), | try_add(5, NULL), | try_add(9223372036854775807, 1), - | try_add(-9223372036854775808, -1) + | try_add(-9223372036854775808, -1), | from tbl | """.stripMargin)) } @@ -394,16 +394,96 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("test coalesce lazy eval") { + test("ANSI support for add") { + assume(isSpark35Plus) + val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1)) withSQLConf( SQLConf.ANSI_ENABLED.key -> "true", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { - val data = Seq((9999999999999L, 0)) - withParquetTable(data, "t1") { + CometConf.COMET_ANSI_MODE_ENABLED.key -> "true") { + withParquetTable(data, "tbl") { + spark.table("tbl").printSchema() val res = spark.sql(""" - |SELECT coalesce(_1, CAST(_1 AS TINYINT)) from t1; - | """.stripMargin) - checkSparkAnswerAndOperator(res) + |SELECT + | _1 + _2 + | from tbl + | """.stripMargin) + + checkSparkMaybeThrows(res) match { + case (Some(sparkExc), Some(cometExc)) => + val cometErrorPattern = + """org.apache.comet.CometNativeException: checked_add : [ARITHMETIC_OVERFLOW] integer overflow""" + assert(cometExc.getMessage.contains(cometErrorPattern)) + assert(sparkExc.getMessage.contains("overflow")) + case _ => fail("Exception should be thrown") + } + } + } + } + + test("ANSI support for subtract") { + val data = Seq((Integer.MIN_VALUE, 1)) + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true", "spark.comet.ansi.enabled" -> "true") { + withParquetTable(data, "tbl") { + val res = spark.sql(""" + |SELECT + | _1 - _2 + | from tbl + | """.stripMargin) + + checkSparkMaybeThrows(res) match { + case (Some(sparkExc), Some(cometExc)) => + val cometErrorPattern = + """org.apache.comet.CometNativeException: checked_sub : [ARITHMETIC_OVERFLOW] integer overflow""" + assert(cometExc.getMessage.contains(cometErrorPattern)) + assert(sparkExc.getMessage.contains("overflow")) + case _ => fail("Exception should be thrown") + } + } + } + } + + test("ANSI support for multiply") { + val data = Seq((Integer.MAX_VALUE, 10)) + withSQLConf( + SQLConf.ANSI_ENABLED.key -> "true", + CometConf.COMET_ANSI_MODE_ENABLED.key -> "true") { + withParquetTable(data, "tbl") { + val res = spark.sql(""" + |SELECT + | _1 * _2 + | from tbl + | """.stripMargin) + + checkSparkMaybeThrows(res) match { + case (Some(sparkExc), Some(cometExc)) => + val cometErrorPattern = + 
"""org.apache.comet.CometNativeException: checked_mul : [ARITHMETIC_OVERFLOW] integer overflow""" + assert(cometExc.getMessage.contains(cometErrorPattern)) + assert(sparkExc.getMessage.contains("overflow")) + case _ => fail("Exception should be thrown") + } + } + } + } + + test("ANSI support for divide") { + val data = Seq((Integer.MIN_VALUE, 0)) + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true",CometConf.COMET_ANSI_MODE_ENABLED.key -> "true", + "spark.comet.explainFallback.enabled" -> "true") { + withParquetTable(data, "tbl") { + val res = spark.sql(""" + |SELECT + | _1 / _2 + | from tbl + | """.stripMargin) + checkSparkMaybeThrows(res) match { + case (Some(sparkExc), Some(cometExc)) => + val cometErrorPattern = + """org.apache.comet.CometNativeException: checked_div : [ARITHMETIC_OVERFLOW] integer overflow""" + assert(cometExc.getMessage.contains(cometErrorPattern)) + assert(sparkExc.getMessage.contains("Division by zero")) + case _ => fail("Exception should be thrown") + } } } } From 0e95fa16e30cc4f3ba2ec48a7ad5a38e2ad9da02 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Thu, 14 Aug 2025 14:29:06 -0700 Subject: [PATCH 10/40] init_ansi_mode_enabled_add_tests --- native/spark-expr/src/math_funcs/checked_arithmetic.rs | 8 +++++--- .../main/scala/org/apache/comet/serde/arithmetic.scala | 3 ++- .../scala/org/apache/comet/CometExpressionSuite.scala | 4 +++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index 0163f61de4..73e3f430ea 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -19,7 +19,9 @@ use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder}; use arrow::array::{ArrayRef, AsArray}; use crate::EvalMode; -use arrow::datatypes::{ArrowPrimitiveType, DataType, Float32Type, Float64Type, Int32Type, Int64Type}; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Float32Type, Float64Type, Int32Type, Int64Type, +}; use datafusion::common::DataFusionError; use datafusion::physical_plan::ColumnarValue; use std::sync::Arc; @@ -190,13 +192,13 @@ fn checked_arithmetic_internal( op, is_ansi_mode, ), - DataType::Float32 =>try_arithmetic_kernel::( + DataType::Float32 => try_arithmetic_kernel::( left_arr.as_primitive::(), right_arr.as_primitive::(), op, is_ansi_mode, ), - DataType::Float64 =>try_arithmetic_kernel::( + DataType::Float64 => try_arithmetic_kernel::( left_arr.as_primitive::(), right_arr.as_primitive::(), op, diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index 9a35bff437..30549e3b6c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -192,7 +192,8 @@ object CometDivide extends CometExpressionSerde[Divide] with MathBase { // Datafusion now throws an exception for dividing by zero // See https://github.com/apache/arrow-datafusion/pull/6792 // For now, use NullIf to swap zeros with nulls. 
- val rightExpr = if (expr.evalMode != EvalMode.ANSI) nullIfWhenPrimitive(expr.right) else expr.right + val rightExpr = + if (expr.evalMode != EvalMode.ANSI) nullIfWhenPrimitive(expr.right) else expr.right if (!supportedDataType(expr.left.dataType)) { withInfo(expr, s"Unsupported datatype ${expr.left.dataType}") return None diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 31f1b070b9..11025c2710 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -468,7 +468,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("ANSI support for divide") { val data = Seq((Integer.MIN_VALUE, 0)) - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true",CometConf.COMET_ANSI_MODE_ENABLED.key -> "true", + withSQLConf( + SQLConf.ANSI_ENABLED.key -> "true", + CometConf.COMET_ANSI_MODE_ENABLED.key -> "true", "spark.comet.explainFallback.enabled" -> "true") { withParquetTable(data, "tbl") { val res = spark.sql(""" From 7e0603e58af1dec05c5bb300bd07ae4b2c61930b Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Thu, 14 Aug 2025 17:24:24 -0700 Subject: [PATCH 11/40] init_ansi_mode_enabled_add_tests --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 11025c2710..f19ff3e9ae 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -324,7 +324,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | try_add(NULL, 5), | try_add(5, NULL), | try_add(9223372036854775807, 1), - | try_add(-9223372036854775808, -1), + | try_add(-9223372036854775808, -1) | from tbl | """.stripMargin)) } From 88f3745533dea2e097f272b6fff9f4a87c8e2e58 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 17 Aug 2025 00:04:15 -0700 Subject: [PATCH 12/40] init_ansi_mode_enabled_add_tests_exceptions --- .../src/math_funcs/checked_arithmetic.rs | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index 73e3f430ea..02b4d9a289 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -18,7 +18,7 @@ use arrow::array::{Array, ArrowNativeTypeOp, PrimitiveArray, PrimitiveBuilder}; use arrow::array::{ArrayRef, AsArray}; -use crate::EvalMode; +use crate::{divide_by_zero_error, EvalMode, SparkError}; use arrow::datatypes::{ ArrowPrimitiveType, DataType, Float32Type, Float64Type, Int32Type, Int64Type, }; @@ -48,7 +48,12 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(error_msg)); + return Err(SparkError::ArithmeticOverflow { + from_type: String::from("Integer/Float"), + } + .into()); + } else { + builder.append_null(); } } } @@ -64,7 +69,12 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(error_msg)); + return Err(SparkError::ArithmeticOverflow { + from_type: String::from("Integer/Float"), + } + .into()); + } else { + builder.append_null(); } } } 
@@ -80,7 +90,12 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(error_msg)); + return Err(SparkError::ArithmeticOverflow { + from_type: String::from("Integer/Float"), + } + .into()); + } else { + builder.append_null(); } } } @@ -96,7 +111,16 @@ where Ok(v) => builder.append_value(v), Err(_e) => { if is_ansi_mode { - return Err(DataFusionError::Internal(error_msg)); + return if right.value(i).is_zero() { + Err(divide_by_zero_error().into()) + } else { + return Err(SparkError::ArithmeticOverflow { + from_type: String::from("Integer/Float"), + } + .into()); + }; + } else { + builder.append_null(); } } } @@ -165,7 +189,7 @@ fn checked_arithmetic_internal( ))) } }; - println!("Eval mode {:?}", eval_mode); + println!("Eval mode {:?} operation : {}", eval_mode, op); let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (left, right) { (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)), From b8f6f645306ee28c3418e9f1eb3c470f61fd0586 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 20 Aug 2025 10:27:28 -0700 Subject: [PATCH 13/40] disable_coalesce_ansi_mode --- .../src/math_funcs/checked_arithmetic.rs | 2 - .../apache/comet/CometExpressionSuite.scala | 61 ++----------------- 2 files changed, 4 insertions(+), 59 deletions(-) diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index 02b4d9a289..bb93efe1c4 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -37,7 +37,6 @@ where { let len = left.len(); let mut builder = PrimitiveBuilder::::with_capacity(len); - let error_msg = format!("{} : [ARITHMETIC_OVERFLOW] integer overflow. 
Use 'try_{}' to tolerate overflow and return NULL instead", op, op.split("_").last().unwrap()); match op { "checked_add" => { for i in 0..len { @@ -189,7 +188,6 @@ fn checked_arithmetic_internal( ))) } }; - println!("Eval mode {:?} operation : {}", eval_mode, op); let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (left, right) { (ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)), diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index f19ff3e9ae..44a98780af 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.optimizer.SimplifyExtractValueOps import org.apache.spark.sql.comet.{CometColumnarToRowExec, CometProjectExec, CometWindowExec} -import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{InputAdapter, ProjectExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ @@ -503,13 +503,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } } - test("Verify rpad expr support for second arg instead of just literal") { - val data = Seq(("IfIWasARoadIWouldBeBent", 10), ("తెలుగు", 2)) - withParquetTable(data, "t1") { - val res = sql("select rpad(_1,_2) , rpad(_1,2) from t1 order by _1") - checkSparkAnswerAndOperator(res) - } - } test("dictionary arithmetic") { // TODO: test ANSI mode @@ -1404,40 +1397,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("disable expression using dynamic config") { - def countSparkProjectExec(plan: SparkPlan) = { - plan.collect { case _: ProjectExec => - true - }.length - } - withParquetTable(Seq(0, 1, 2).map(n => (n, n)), "tbl") { - val sql = "select _1+_2 from tbl" - val (_, cometPlan) = checkSparkAnswer(sql) - assert(0 == countSparkProjectExec(cometPlan)) - withSQLConf(CometConf.getExprEnabledConfigKey("Add") -> "false") { - val (_, cometPlan) = checkSparkAnswer(sql) - assert(1 == countSparkProjectExec(cometPlan)) - } - } - } - - test("enable incompat expression using dynamic config") { - def countSparkProjectExec(plan: SparkPlan) = { - plan.collect { case _: ProjectExec => - true - }.length - } - withParquetTable(Seq(0, 1, 2).map(n => (n.toString, n.toString)), "tbl") { - val sql = "select initcap(_1) from tbl" - val (_, cometPlan) = checkSparkAnswer(sql) - assert(1 == countSparkProjectExec(cometPlan)) - withSQLConf(CometConf.getExprAllowIncompatConfigKey("InitCap") -> "true") { - val (_, cometPlan) = checkSparkAnswer(sql) - assert(0 == countSparkProjectExec(cometPlan)) - } - } - } - test("signum") { testDoubleScalarExpr("signum") } @@ -1818,16 +1777,14 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("DatePart functions: Year/Month/DayOfMonth/DayOfWeek/DayOfYear/WeekOfYear/Quarter") { + test("Year") { Seq(false, true).foreach { dictionary => withSQLConf("parquet.enable.dictionary" -> dictionary.toString) { val table = "test" withTable(table) { sql(s"create table $table(col timestamp) using parquet") - sql(s"insert into $table 
values (now()), (timestamp('1900-01-01')), (null)") - checkSparkAnswerAndOperator( - "SELECT col, year(col), month(col), day(col), weekday(col), " + - s" dayofweek(col), dayofyear(col), weekofyear(col), quarter(col) FROM $table") + sql(s"insert into $table values (now()), (null)") + checkSparkAnswerAndOperator(s"SELECT year(col) FROM $table") } } } @@ -3092,14 +3049,4 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("test length function") { - withTable("t1") { - sql( - "create table t1 using parquet as select cast(id as string) as c1, cast(id as binary) as c2 from range(10)") - // FIXME: Change checkSparkAnswer to checkSparkAnswerAndOperator after resolving - // https://github.com/apache/datafusion-comet/issues/2348 - checkSparkAnswer("select length(c1), length(c2) AS x FROM t1 ORDER BY c1") - } - } - } From 4fce770c1f2ed07101fb730ea12815b4ad39d02d Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 20 Aug 2025 16:06:11 -0700 Subject: [PATCH 14/40] disable_coalesce_ansi_mode --- .../scala/org/apache/comet/CometExpressionSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 44a98780af..77ce37a6d5 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -411,7 +411,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: checked_add : [ARITHMETIC_OVERFLOW] integer overflow""" + """org.apache.comet.CometNativeException: [ARITHMETIC_OVERFLOW] Integer/Float overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") @@ -433,7 +433,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: checked_sub : [ARITHMETIC_OVERFLOW] integer overflow""" + """org.apache.comet.CometNativeException: [ARITHMETIC_OVERFLOW] Integer/Float overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") @@ -457,7 +457,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: checked_mul : [ARITHMETIC_OVERFLOW] integer overflow""" + """org.apache.comet.CometNativeException: [ARITHMETIC_OVERFLOW] Integer/Float overflow. 
If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") @@ -481,7 +481,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: checked_div : [ARITHMETIC_OVERFLOW] integer overflow""" + """org.apache.comet.CometNativeException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("Division by zero")) case _ => fail("Exception should be thrown") From 2b762fc54ce1b535ea8bdecd88aaa97f6f308f30 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sat, 23 Aug 2025 13:37:29 -0700 Subject: [PATCH 15/40] disable_coalesce_ansi_mode --- .../src/math_funcs/checked_arithmetic.rs | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/native/spark-expr/src/math_funcs/checked_arithmetic.rs b/native/spark-expr/src/math_funcs/checked_arithmetic.rs index bb93efe1c4..68baba6120 100644 --- a/native/spark-expr/src/math_funcs/checked_arithmetic.rs +++ b/native/spark-expr/src/math_funcs/checked_arithmetic.rs @@ -20,7 +20,8 @@ use arrow::array::{ArrayRef, AsArray}; use crate::{divide_by_zero_error, EvalMode, SparkError}; use arrow::datatypes::{ - ArrowPrimitiveType, DataType, Float32Type, Float64Type, Int32Type, Int64Type, + ArrowPrimitiveType, DataType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, }; use datafusion::common::DataFusionError; use datafusion::physical_plan::ColumnarValue; @@ -200,8 +201,20 @@ fn checked_arithmetic_internal( (ColumnarValue::Scalar(l), ColumnarValue::Scalar(r)) => (l.to_array()?, r.to_array()?), }; - // Rust only supports checked_arithmetic on Int32 and Int64 + // Rust only supports checked_arithmetic on numeric types let result_array = match data_type { + DataType::Int8 => try_arithmetic_kernel::( + left_arr.as_primitive::(), + right_arr.as_primitive::(), + op, + is_ansi_mode, + ), + DataType::Int16 => try_arithmetic_kernel::( + left_arr.as_primitive::(), + right_arr.as_primitive::(), + op, + is_ansi_mode, + ), DataType::Int32 => try_arithmetic_kernel::( left_arr.as_primitive::(), right_arr.as_primitive::(), @@ -214,6 +227,12 @@ fn checked_arithmetic_internal( op, is_ansi_mode, ), + DataType::Float16 => try_arithmetic_kernel::( + left_arr.as_primitive::(), + right_arr.as_primitive::(), + op, + is_ansi_mode, + ), DataType::Float32 => try_arithmetic_kernel::( left_arr.as_primitive::(), right_arr.as_primitive::(), From 332b40259ea1bfbd27619d9f2d0c6dd86543ba8a Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 24 Aug 2025 15:09:16 -0700 Subject: [PATCH 16/40] disable_coalesce_ansi_mode --- native/core/src/execution/planner.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index af3439d2c8..80a24c56a8 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -998,7 +998,9 @@ impl PhysicalPlanner { } _ => { let data_type = return_type.map(to_arrow_datatype).unwrap(); - if [EvalMode::Try, EvalMode::Ansi].contains(&eval_mode) && data_type.is_numeric() { + if [EvalMode::Try, 
EvalMode::Ansi].contains(&eval_mode) + && (data_type.is_floating() || data_type.is_integer()) + { let op_str = match op { DataFusionOperator::Plus => "checked_add", DataFusionOperator::Minus => "checked_sub", From 6897c53abb5429bb2d0d3284d6395220ef2b487b Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 01:31:49 -0700 Subject: [PATCH 17/40] undo_golden_file_generation_changes --- .../apache/comet/serde/QueryPlanSerde.scala | 485 +++++++++++------- .../apache/comet/CometExpressionSuite.scala | 45 +- 2 files changed, 306 insertions(+), 224 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 1e0e97862d..421ddde7c9 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -19,8 +19,8 @@ package org.apache.comet.serde +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import scala.jdk.CollectionConverters._ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, NormalizeNaNAndZero} import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils +import org.apache.spark.sql.catalyst.util.{CharVarcharCodegenUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues import org.apache.spark.sql.comet._ import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec @@ -43,9 +43,13 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashJoin, Sh import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +import com.google.protobuf.ByteString import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.{isCometScan, withInfo} +import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.expressions._ import org.apache.comet.objectstore.NativeConfig import org.apache.comet.serde.ExprOuterClass.{AggExpr, Expr, ScalarFunc} @@ -53,7 +57,7 @@ import org.apache.comet.serde.OperatorOuterClass.{AggregateMode => CometAggregat import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto} import org.apache.comet.serde.Types.{DataType => ProtoDataType} import org.apache.comet.serde.Types.DataType._ -import org.apache.comet.serde.literals.CometLiteral +import org.apache.comet.serde.Types.ListLiteral import org.apache.comet.shims.CometExprShim /** @@ -67,9 +71,20 @@ object QueryPlanSerde extends Logging with CometExprShim { private val opSerdeMap: Map[Class[_ <: SparkPlan], CometOperatorSerde[_]] = Map(classOf[ProjectExec] -> CometProject, classOf[SortExec] -> CometSort) - private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + /** + * Mapping of Spark expression class to Comet expression handler. 
+ */ + private val exprSerdeMap: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[AttributeReference] -> CometAttributeReference, + classOf[Alias] -> CometAlias, + classOf[Add] -> CometAdd, + classOf[Subtract] -> CometSubtract, + classOf[Multiply] -> CometMultiply, + classOf[Divide] -> CometDivide, + classOf[IntegralDivide] -> CometIntegralDivide, + classOf[Remainder] -> CometRemainder, + classOf[Round] -> CometRound, classOf[ArrayAppend] -> CometArrayAppend, - // TODO ArrayCompact classOf[ArrayContains] -> CometArrayContains, classOf[ArrayDistinct] -> CometArrayDistinct, classOf[ArrayExcept] -> CometArrayExcept, @@ -83,186 +98,129 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[ArraysOverlap] -> CometArraysOverlap, classOf[ArrayUnion] -> CometArrayUnion, classOf[CreateArray] -> CometCreateArray, + classOf[GetArrayItem] -> CometGetArrayItem, classOf[ElementAt] -> CometElementAt, - classOf[Flatten] -> CometFlatten, - classOf[GetArrayItem] -> CometGetArrayItem) - - private val conditionalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - Map(classOf[CaseWhen] -> CometCaseWhen, classOf[If] -> CometIf) - - private val predicateExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[And] -> CometAnd, - classOf[EqualTo] -> CometEqualTo, - classOf[EqualNullSafe] -> CometEqualNullSafe, - classOf[GreaterThan] -> CometGreaterThan, - classOf[GreaterThanOrEqual] -> CometGreaterThanOrEqual, - classOf[LessThan] -> CometLessThan, - classOf[LessThanOrEqual] -> CometLessThanOrEqual, - classOf[In] -> CometIn, - classOf[IsNotNull] -> CometIsNotNull, - classOf[IsNull] -> CometIsNull, - classOf[InSet] -> CometInSet, - classOf[Not] -> CometNot, - classOf[Or] -> CometOr) - - private val mathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[Ascii] -> CometScalarFunction("ascii"), + classOf[ConcatWs] -> CometScalarFunction("concat_ws"), + classOf[Chr] -> CometScalarFunction("char"), + classOf[InitCap] -> CometInitCap, + classOf[BitwiseCount] -> CometBitwiseCount, + classOf[BitwiseGet] -> CometBitwiseGet, + classOf[BitwiseNot] -> CometBitwiseNot, + classOf[BitwiseAnd] -> CometBitwiseAnd, + classOf[BitwiseOr] -> CometBitwiseOr, + classOf[BitwiseXor] -> CometBitwiseXor, + classOf[BitLength] -> CometScalarFunction("bit_length"), + classOf[FromUnixTime] -> CometFromUnixTime, + classOf[Length] -> CometScalarFunction("length"), classOf[Acos] -> CometScalarFunction("acos"), - classOf[Add] -> CometAdd, + classOf[Cos] -> CometScalarFunction("cos"), classOf[Asin] -> CometScalarFunction("asin"), + classOf[Sin] -> CometScalarFunction("sin"), classOf[Atan] -> CometScalarFunction("atan"), - classOf[Atan2] -> CometAtan2, - classOf[Ceil] -> CometCeil, - classOf[Cos] -> CometScalarFunction("cos"), - classOf[Divide] -> CometDivide, + classOf[Tan] -> CometScalarFunction("tan"), classOf[Exp] -> CometScalarFunction("exp"), classOf[Expm1] -> CometScalarFunction("expm1"), - classOf[Floor] -> CometFloor, - classOf[Hex] -> CometHex, - classOf[IntegralDivide] -> CometIntegralDivide, - classOf[IsNaN] -> CometIsNaN, - classOf[Log] -> CometLog, - classOf[Log2] -> CometLog2, - classOf[Log10] -> CometLog10, - classOf[Multiply] -> CometMultiply, - classOf[Pow] -> CometScalarFunction("pow"), - classOf[Rand] -> CometRand, - classOf[Randn] -> CometRandn, - classOf[Remainder] -> CometRemainder, - classOf[Round] -> CometRound, - classOf[Signum] -> CometScalarFunction("signum"), - classOf[Sin] -> CometScalarFunction("sin"), 
classOf[Sqrt] -> CometScalarFunction("sqrt"), - classOf[Subtract] -> CometSubtract, - classOf[Tan] -> CometScalarFunction("tan"), - classOf[UnaryMinus] -> CometUnaryMinus, - classOf[Unhex] -> CometUnhex) - - private val mapExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[GetMapValue] -> CometMapExtract, - classOf[MapKeys] -> CometMapKeys, - classOf[MapEntries] -> CometMapEntries, - classOf[MapValues] -> CometMapValues, - classOf[MapFromArrays] -> CometMapFromArrays) - - private val structExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[CreateNamedStruct] -> CometCreateNamedStruct, - classOf[GetArrayStructFields] -> CometGetArrayStructFields, - classOf[GetStructField] -> CometGetStructField, - classOf[StructsToJson] -> CometStructsToJson) - - private val hashExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( + classOf[Signum] -> CometScalarFunction("signum"), classOf[Md5] -> CometScalarFunction("md5"), - classOf[Murmur3Hash] -> CometMurmur3Hash, - classOf[Sha2] -> CometSha2, - classOf[XxHash64] -> CometXxHash64) - - private val stringExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[Ascii] -> CometScalarFunction("ascii"), - classOf[BitLength] -> CometScalarFunction("bit_length"), - classOf[Chr] -> CometScalarFunction("char"), - classOf[ConcatWs] -> CometScalarFunction("concat_ws"), - classOf[Contains] -> CometScalarFunction("contains"), - classOf[EndsWith] -> CometScalarFunction("ends_with"), - classOf[InitCap] -> CometInitCap, - classOf[Length] -> CometScalarFunction("length"), - classOf[Like] -> CometLike, - classOf[Lower] -> CometLower, - classOf[OctetLength] -> CometScalarFunction("octet_length"), - classOf[Reverse] -> CometScalarFunction("reverse"), - classOf[RLike] -> CometRLike, - classOf[StartsWith] -> CometScalarFunction("starts_with"), + classOf[ShiftLeft] -> CometShiftLeft, + classOf[ShiftRight] -> CometShiftRight, classOf[StringInstr] -> CometScalarFunction("instr"), classOf[StringRepeat] -> CometStringRepeat, classOf[StringReplace] -> CometScalarFunction("replace"), - classOf[StringRPad] -> CometStringRPad, - classOf[StringSpace] -> CometScalarFunction("string_space"), classOf[StringTranslate] -> CometScalarFunction("translate"), classOf[StringTrim] -> CometScalarFunction("trim"), - classOf[StringTrimBoth] -> CometScalarFunction("btrim"), classOf[StringTrimLeft] -> CometScalarFunction("ltrim"), classOf[StringTrimRight] -> CometScalarFunction("rtrim"), + classOf[StringTrimBoth] -> CometScalarFunction("btrim"), + classOf[Upper] -> CometUpper, + classOf[Lower] -> CometLower, + classOf[Murmur3Hash] -> CometMurmur3Hash, + classOf[XxHash64] -> CometXxHash64, + classOf[Sha2] -> CometSha2, + classOf[MapKeys] -> CometMapKeys, + classOf[MapEntries] -> CometMapEntries, + classOf[MapValues] -> CometMapValues, + classOf[MapFromArrays] -> CometMapFromArrays, + classOf[GetMapValue] -> CometMapExtract, + classOf[EqualTo] -> CometEqualTo, + classOf[EqualNullSafe] -> CometEqualNullSafe, + classOf[Not] -> CometNot, + classOf[And] -> CometAnd, + classOf[Or] -> CometOr, + classOf[GreaterThan] -> CometGreaterThan, + classOf[GreaterThanOrEqual] -> CometGreaterThanOrEqual, + classOf[LessThan] -> CometLessThan, + classOf[LessThanOrEqual] -> CometLessThanOrEqual, + classOf[IsNull] -> CometIsNull, + classOf[IsNotNull] -> CometIsNotNull, + classOf[IsNaN] -> CometIsNaN, + classOf[In] -> CometIn, + classOf[InSet] -> CometInSet, + classOf[Rand] -> CometRand, + classOf[Randn] -> 
CometRandn, + classOf[SparkPartitionID] -> CometSparkPartitionId, + classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId, + classOf[StringSpace] -> CometScalarFunction("string_space"), + classOf[StartsWith] -> CometScalarFunction("starts_with"), + classOf[EndsWith] -> CometScalarFunction("ends_with"), + classOf[Contains] -> CometScalarFunction("contains"), classOf[Substring] -> CometSubstring, - classOf[Upper] -> CometUpper) - - private val bitwiseExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[BitwiseAnd] -> CometBitwiseAnd, - classOf[BitwiseCount] -> CometBitwiseCount, - classOf[BitwiseGet] -> CometBitwiseGet, - classOf[BitwiseOr] -> CometBitwiseOr, - classOf[BitwiseNot] -> CometBitwiseNot, - classOf[BitwiseXor] -> CometBitwiseXor, - classOf[ShiftLeft] -> CometShiftLeft, - classOf[ShiftRight] -> CometShiftRight) - - private val temporalExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[DateAdd] -> CometDateAdd, - classOf[DateSub] -> CometDateSub, - classOf[FromUnixTime] -> CometFromUnixTime, + classOf[Like] -> CometLike, + classOf[RLike] -> CometRLike, + classOf[OctetLength] -> CometScalarFunction("octet_length"), + classOf[Reverse] -> CometScalarFunction("reverse"), + classOf[StringRPad] -> CometStringRPad, + classOf[Year] -> CometYear, classOf[Hour] -> CometHour, classOf[Minute] -> CometMinute, classOf[Second] -> CometSecond, + classOf[DateAdd] -> CometDateAdd, + classOf[DateSub] -> CometDateSub, classOf[TruncDate] -> CometTruncDate, classOf[TruncTimestamp] -> CometTruncTimestamp, - classOf[Year] -> CometYear, - classOf[Month] -> CometMonth, - classOf[DayOfMonth] -> CometDayOfMonth, - classOf[DayOfWeek] -> CometDayOfWeek, - classOf[WeekDay] -> CometWeekDay, - classOf[DayOfYear] -> CometDayOfYear, - classOf[WeekOfYear] -> CometWeekOfYear, - classOf[Quarter] -> CometQuarter) - - private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - classOf[Cast] -> CometCast) - - private val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map( - // TODO SortOrder (?) - // TODO PromotePrecision - // TODO CheckOverflow - // TODO KnownFloatingPointNormalized - // TODO ScalarSubquery - // TODO UnscaledValue - // TODO MakeDecimal - // TODO BloomFilterMightContain - // TODO RegExpReplace - classOf[Alias] -> CometAlias, - classOf[AttributeReference] -> CometAttributeReference, - classOf[Coalesce] -> CometCoalesce, - classOf[Literal] -> CometLiteral, - classOf[MonotonicallyIncreasingID] -> CometMonotonicallyIncreasingId, - classOf[SparkPartitionID] -> CometSparkPartitionId) - - /** - * Mapping of Spark expression class to Comet expression handler. 
- */ - private val exprSerdeMap: Map[Class[_ <: Expression], CometExpressionSerde[_]] = - mathExpressions ++ hashExpressions ++ stringExpressions ++ - conditionalExpressions ++ mapExpressions ++ predicateExpressions ++ - structExpressions ++ bitwiseExpressions ++ miscExpressions ++ arrayExpressions ++ - temporalExpressions ++ conversionExpressions + classOf[Cast] -> CometCast, + classOf[CreateNamedStruct] -> CometCreateNamedStruct, + classOf[GetStructField] -> CometGetStructField, + classOf[GetArrayStructFields] -> CometGetArrayStructFields, + classOf[StructsToJson] -> CometStructsToJson, + classOf[Flatten] -> CometFlatten, + classOf[Atan2] -> CometAtan2, + classOf[Ceil] -> CometCeil, + classOf[Floor] -> CometFloor, + classOf[Log] -> CometLog, + classOf[Log10] -> CometLog10, + classOf[Log2] -> CometLog2, + classOf[Hex] -> CometHex, + classOf[Unhex] -> CometUnhex, + classOf[Pow] -> CometScalarFunction[Pow]("pow"), + classOf[If] -> CometIf, + classOf[CaseWhen] -> CometCaseWhen, + classOf[Coalesce] -> CometCoalesce) /** * Mapping of Spark aggregate expression class to Comet expression handler. */ private val aggrSerdeMap: Map[Class[_], CometAggregateExpressionSerde[_]] = Map( + classOf[Sum] -> CometSum, classOf[Average] -> CometAverage, + classOf[Count] -> CometCount, + classOf[Min] -> CometMin, + classOf[Max] -> CometMax, + classOf[First] -> CometFirst, + classOf[Last] -> CometLast, classOf[BitAndAgg] -> CometBitAndAgg, classOf[BitOrAgg] -> CometBitOrAgg, classOf[BitXorAgg] -> CometBitXOrAgg, - classOf[BloomFilterAggregate] -> CometBloomFilterAggregate, - classOf[Corr] -> CometCorr, - classOf[Count] -> CometCount, - classOf[CovPopulation] -> CometCovPopulation, classOf[CovSample] -> CometCovSample, - classOf[First] -> CometFirst, - classOf[Last] -> CometLast, - classOf[Max] -> CometMax, - classOf[Min] -> CometMin, - classOf[StddevPop] -> CometStddevPop, - classOf[StddevSamp] -> CometStddevSamp, - classOf[Sum] -> CometSum, + classOf[CovPopulation] -> CometCovPopulation, + classOf[VarianceSamp] -> CometVarianceSamp, classOf[VariancePop] -> CometVariancePop, - classOf[VarianceSamp] -> CometVarianceSamp) + classOf[StddevSamp] -> CometStddevSamp, + classOf[StddevPop] -> CometStddevPop, + classOf[Corr] -> CometCorr, + classOf[BloomFilterAggregate] -> CometBloomFilterAggregate) def supportedDataType(dt: DataType, allowComplex: Boolean = false): Boolean = dt match { case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | @@ -637,24 +595,15 @@ object QueryPlanSerde extends Logging with CometExprShim { expr: Expression, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = { - val conf = SQLConf.get + SQLConf.get def convert[T <: Expression](expr: T, handler: CometExpressionSerde[T]): Option[Expr] = { - val exprConfName = handler.getExprConfigName(expr) - if (!CometConf.isExprEnabled(exprConfName)) { - withInfo( - expr, - "Expression support is disabled. 
Set " + - s"${CometConf.getExprEnabledConfigKey(exprConfName)}=true to enable it.") - return None - } handler.getSupportLevel(expr) match { case Unsupported(notes) => withInfo(expr, notes.getOrElse("")) None case Incompatible(notes) => - val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName) - if (exprAllowIncompat || CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { + if (CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get()) { if (notes.isDefined) { logWarning( s"Comet supports $expr when ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true " + @@ -666,9 +615,8 @@ object QueryPlanSerde extends Logging with CometExprShim { withInfo( expr, s"$expr is not fully compatible with Spark$optionalNotes. To enable it anyway, " + - s"set ${CometConf.getExprAllowIncompatConfigKey(exprConfName)}=true, or set " + - s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true to enable all " + - s"incompatible expressions. ${CometConf.COMPAT_GUIDE}.") + s"set ${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. " + + s"${CometConf.COMPAT_GUIDE}.") None } case Compatible(notes) => @@ -686,10 +634,151 @@ object QueryPlanSerde extends Logging with CometExprShim { exprToProtoInternal(Literal(value, dataType), inputs, binding) case UnaryExpression(child) if expr.prettyName == "trycast" => - val timeZoneId = conf.sessionLocalTimeZone + val timeZoneId = SQLConf.get.sessionLocalTimeZone val cast = Cast(child, expr.dataType, Some(timeZoneId), EvalMode.TRY) convert(cast, CometCast) + case Literal(value, dataType) + if supportedDataType( + dataType, + allowComplex = value == null || + // Nested literal support for native reader + // can be tracked https://github.com/apache/datafusion-comet/issues/1937 + // now supports only Array of primitive + (Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION) + .contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()) && dataType + .isInstanceOf[ArrayType]) && !isComplexType( + dataType.asInstanceOf[ArrayType].elementType)) => + val exprBuilder = LiteralOuterClass.Literal.newBuilder() + + if (value == null) { + exprBuilder.setIsNull(true) + } else { + exprBuilder.setIsNull(false) + dataType match { + case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean]) + case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte]) + case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short]) + case _: IntegerType | _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int]) + case _: LongType | _: TimestampType | _: TimestampNTZType => + exprBuilder.setLongVal(value.asInstanceOf[Long]) + case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float]) + case _: DoubleType => exprBuilder.setDoubleVal(value.asInstanceOf[Double]) + case _: StringType => + exprBuilder.setStringVal(value.asInstanceOf[UTF8String].toString) + case _: DecimalType => + // Pass decimal literal as bytes. 
+ val unscaled = value.asInstanceOf[Decimal].toBigDecimal.underlying.unscaledValue + exprBuilder.setDecimalVal( + com.google.protobuf.ByteString.copyFrom(unscaled.toByteArray)) + case _: BinaryType => + val byteStr = + com.google.protobuf.ByteString.copyFrom(value.asInstanceOf[Array[Byte]]) + exprBuilder.setBytesVal(byteStr) + case a: ArrayType => + val listLiteralBuilder = ListLiteral.newBuilder() + val array = value.asInstanceOf[GenericArrayData].array + a.elementType match { + case NullType => + array.foreach(_ => listLiteralBuilder.addNullMask(true)) + case BooleanType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Boolean] + listLiteralBuilder.addBooleanValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case ByteType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Integer] + listLiteralBuilder.addByteValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case ShortType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Short] + listLiteralBuilder.addShortValues( + if (casted != null) casted.intValue() + else null.asInstanceOf[java.lang.Integer]) + listLiteralBuilder.addNullMask(casted != null) + }) + case IntegerType | DateType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Integer] + listLiteralBuilder.addIntValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case LongType | TimestampType | TimestampNTZType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Long] + listLiteralBuilder.addLongValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case FloatType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Float] + listLiteralBuilder.addFloatValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case DoubleType => + array.foreach(v => { + val casted = v.asInstanceOf[java.lang.Double] + listLiteralBuilder.addDoubleValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case StringType => + array.foreach(v => { + val casted = v.asInstanceOf[org.apache.spark.unsafe.types.UTF8String] + listLiteralBuilder.addStringValues( + if (casted != null) casted.toString else "") + listLiteralBuilder.addNullMask(casted != null) + }) + case _: DecimalType => + array + .foreach(v => { + val casted = + v.asInstanceOf[Decimal] + listLiteralBuilder.addDecimalValues(if (casted != null) { + com.google.protobuf.ByteString + .copyFrom(casted.toBigDecimal.underlying.unscaledValue.toByteArray) + } else ByteString.EMPTY) + listLiteralBuilder.addNullMask(casted != null) + }) + case _: BinaryType => + array + .foreach(v => { + val casted = + v.asInstanceOf[Array[Byte]] + listLiteralBuilder.addBytesValues(if (casted != null) { + com.google.protobuf.ByteString.copyFrom(casted) + } else ByteString.EMPTY) + listLiteralBuilder.addNullMask(casted != null) + }) + } + exprBuilder.setListVal(listLiteralBuilder.build()) + exprBuilder.setDatatype(serializeDataType(dataType).get) + case dt => + logWarning(s"Unexpected datatype '$dt' for literal value '$value'") + } + } + + val dt = serializeDataType(dataType) + + if (dt.isDefined) { + exprBuilder.setDatatype(dt.get) + + Some( + ExprOuterClass.Expr + .newBuilder() + .setLiteral(exprBuilder) + .build()) + } else { + withInfo(expr, s"Unsupported datatype $dataType") + None + } + case Literal(_, dataType) if !supportedDataType(dataType) => + withInfo(expr, s"Unsupported datatype $dataType") + None + // ToPrettyString is new in Spark 3.5 case _ if expr.getClass.getSimpleName == 
"ToPrettyString" && expr @@ -784,6 +873,22 @@ object QueryPlanSerde extends Logging with CometExprShim { None } + // abs implementation is not correct + // https://github.com/apache/datafusion-comet/issues/666 +// case Abs(child, failOnErr) => +// val childExpr = exprToProtoInternal(child, inputs) +// if (childExpr.isDefined) { +// val evalModeStr = +// if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY +// val absBuilder = ExprOuterClass.Abs.newBuilder() +// absBuilder.setChild(childExpr.get) +// absBuilder.setEvalMode(evalModeStr) +// Some(Expr.newBuilder().setAbs(absBuilder).build()) +// } else { +// withInfo(expr, child) +// None +// } + case RegExpReplace(subject, pattern, replacement, startPosition) => if (!RegExp.isSupportedPattern(pattern.toString) && !CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { @@ -814,6 +919,22 @@ object QueryPlanSerde extends Logging with CometExprShim { None } + case UnaryMinus(child, failOnError) => + val childExpr = exprToProtoInternal(child, inputs, binding) + if (childExpr.isDefined) { + val builder = ExprOuterClass.UnaryMinus.newBuilder() + builder.setChild(childExpr.get) + builder.setFailOnError(failOnError) + Some( + ExprOuterClass.Expr + .newBuilder() + .setUnaryMinus(builder) + .build()) + } else { + withInfo(expr, child) + None + } + // With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for // char types. // See https://github.com/apache/spark/pull/38151 @@ -905,9 +1026,6 @@ object QueryPlanSerde extends Logging with CometExprShim { } case af @ ArrayFilter(_, func) if func.children.head.isInstanceOf[IsNotNull] => convert(af, CometArrayCompact) - case l @ Length(child) if child.dataType == BinaryType => - withInfo(l, "Length on BinaryType is not supported") - None case expr => QueryPlanSerde.exprSerdeMap.get(expr.getClass) match { case Some(handler) => @@ -1312,20 +1430,12 @@ object QueryPlanSerde extends Logging with CometExprShim { return None } - val groupingExprsWithInput = - groupingExpressions.map(expr => expr.name -> exprToProto(expr, child.output)) - - val emptyExprs = groupingExprsWithInput.collect { - case (expr, proto) if proto.isEmpty => expr - } - - if (emptyExprs.nonEmpty) { - withInfo(op, s"Unsupported group expressions: ${emptyExprs.mkString(", ")}") + val groupingExprs = groupingExpressions.map(exprToProto(_, child.output)) + if (groupingExprs.exists(_.isEmpty)) { + withInfo(op, "Not all grouping expressions are supported") return None } - val groupingExprs = groupingExprsWithInput.map(_._2) - // In some of the cases, the aggregateExpressions could be empty. // For example, if the aggregate functions only have group by or if the aggregate // functions only have distinct aggregate functions: @@ -1875,17 +1985,6 @@ trait CometOperatorSerde[T <: SparkPlan] { */ trait CometExpressionSerde[T <: Expression] { - /** - * Get a short name for the expression that can be used as part of a config key related to the - * expression, such as enabling or disabling that expression. - * - * @param expr - * The Spark expression. - * @return - * Short name for the expression, defaulting to the Spark class name - */ - def getExprConfigName(expr: T): String = expr.getClass.getSimpleName - /** * Determine the support level of the expression based on its attributes. 
* diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 77ce37a6d5..f98d030750 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -398,8 +398,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { assume(isSpark35Plus) val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1)) withSQLConf( - SQLConf.ANSI_ENABLED.key -> "true", - CometConf.COMET_ANSI_MODE_ENABLED.key -> "true") { + SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { spark.table("tbl").printSchema() val res = spark.sql(""" @@ -445,8 +444,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("ANSI support for multiply") { val data = Seq((Integer.MAX_VALUE, 10)) withSQLConf( - SQLConf.ANSI_ENABLED.key -> "true", - CometConf.COMET_ANSI_MODE_ENABLED.key -> "true") { + SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { val res = spark.sql(""" |SELECT @@ -490,20 +488,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("test coalesce lazy eval") { - withSQLConf( - SQLConf.ANSI_ENABLED.key -> "true", - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { - val data = Seq((9999999999999L, 0)) - withParquetTable(data, "t1") { - val res = spark.sql(""" - |SELECT coalesce(_1, CAST(_1 AS TINYINT)) from t1; - | """.stripMargin) - checkSparkAnswerAndOperator(res) - } - } - } - test("dictionary arithmetic") { // TODO: test ANSI mode withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") { @@ -642,8 +626,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz") { withSQLConf( - SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -665,7 +648,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz to string") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -687,7 +670,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz to long, date") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -775,7 +758,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with timestamp_ntz") { - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, 
"timestamp_trunc.parquet") @@ -810,7 +793,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with format array") { - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val numRows = 1000 Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => @@ -1477,7 +1460,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq("true", "false").foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary, - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { withParquetTable( (-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)), "tbl", @@ -1966,7 +1949,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq( ( s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as C from $table", - Set("Cast from CalendarIntervalType to StringType is not supported")), + Set("make_interval is not supported")), ( "SELECT " + "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))" @@ -1985,8 +1968,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B " + "where A.c1 = B.c1 ", Set( - "Cast from CalendarIntervalType to StringType is not supported", - "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled")), + "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled", + "make_interval is not supported")), (s"select * from $table LIMIT 10 OFFSET 3", Set("Comet shuffle is not enabled"))) .foreach(test => { val qry = test._1 @@ -2028,7 +2011,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col string, a int, b float) using parquet") @@ -2134,7 +2117,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col string, a int, b float) using parquet") @@ -2861,7 +2844,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // this test requires native_comet scan due to unsigned u8/u16 issue withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, - CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path1 = new Path(dir.toURI.toString, "test1.parquet") From 7a0e9f8bfe5c47f2cce86354fb865696dce60929 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Sun, 31 Aug 2025 15:49:08 -0700 Subject: [PATCH 18/40] lazy_coalesce_fallback_case_statement --- .../apache/comet/serde/QueryPlanSerde.scala | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git 
a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 421ddde7c9..996c4321b4 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -919,6 +919,99 @@ object QueryPlanSerde extends Logging with CometExprShim { None } +<<<<<<< HEAD +======= +<<<<<<< HEAD +======= + case If(predicate, trueValue, falseValue) => + val predicateExpr = exprToProtoInternal(predicate, inputs, binding) + val trueExpr = exprToProtoInternal(trueValue, inputs, binding) + val falseExpr = exprToProtoInternal(falseValue, inputs, binding) + if (predicateExpr.isDefined && trueExpr.isDefined && falseExpr.isDefined) { + val builder = ExprOuterClass.IfExpr.newBuilder() + builder.setIfExpr(predicateExpr.get) + builder.setTrueExpr(trueExpr.get) + builder.setFalseExpr(falseExpr.get) + Some( + ExprOuterClass.Expr + .newBuilder() + .setIf(builder) + .build()) + } else { + withInfo(expr, predicate, trueValue, falseValue) + None + } + +// TODO : Remove this once ANSI mode is tested with coalesce lazy eval . + case c @ (CaseWhen(_, _) | Coalesce(_)) => + val (finalBranches, finalElse) = c match { + case CaseWhen(branches, elseValue) => + (branches, elseValue) + + case Coalesce(children) => + val branches = children.dropRight(1).map { child => + (IsNotNull(child), child) + } + val elseValue = Some(children.last) + (branches, elseValue) + } + + var allBranches: Seq[Expression] = Seq() + val whenSeq = finalBranches.map(elements => { + allBranches = allBranches :+ elements._1 + exprToProtoInternal(elements._1, inputs, binding) + }) + val thenSeq = finalBranches.map(elements => { + allBranches = allBranches :+ elements._2 + exprToProtoInternal(elements._2, inputs, binding) + }) + assert(whenSeq.length == thenSeq.length) + if (whenSeq.forall(_.isDefined) && thenSeq.forall(_.isDefined)) { + val builder = ExprOuterClass.CaseWhen.newBuilder() + builder.addAllWhen(whenSeq.map(_.get).asJava) + builder.addAllThen(thenSeq.map(_.get).asJava) + if (finalElse.isDefined) { + val elseValueExpr = + exprToProtoInternal(finalElse.get, inputs, binding) + if (elseValueExpr.isDefined) { + builder.setElseExpr(elseValueExpr.get) + } else { + withInfo(expr, finalElse.get) + return None + } + } + Some( + ExprOuterClass.Expr + .newBuilder() + .setCaseWhen(builder) + .build()) + } else { + withInfo(expr, allBranches: _*) + None + } + + case BitwiseAnd(left, right) => + createBinaryExpr( + expr, + left, + right, + inputs, + binding, + (builder, binaryExpr) => builder.setBitwiseAnd(binaryExpr)) + + case n @ Not(In(_, _)) => + CometNotIn.convert(n, inputs, binding) + + case Not(child) => + createUnaryExpr( + expr, + child, + inputs, + binding, + (builder, unaryExpr) => builder.setNot(unaryExpr)) + +>>>>>>> bbe63eb0 (undo_golden_file_generation_changes) +>>>>>>> 90860a97 (lazy_coalesce_fallback_case_statement) case UnaryMinus(child, failOnError) => val childExpr = exprToProtoInternal(child, inputs, binding) if (childExpr.isDefined) { From 219648edf7004eca6e8c2d2cbfc034d5dad7d318 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Tue, 2 Sep 2025 17:02:20 -0700 Subject: [PATCH 19/40] lazy_coalesce_fallback_case_statement_rebase --- .../apache/comet/serde/QueryPlanSerde.scala | 3 ++ .../apache/comet/CometExpressionSuite.scala | 31 +++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 996c4321b4..c703f111a2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -919,6 +919,7 @@ object QueryPlanSerde extends Logging with CometExprShim { None } +<<<<<<< HEAD <<<<<<< HEAD ======= <<<<<<< HEAD @@ -1012,6 +1013,8 @@ object QueryPlanSerde extends Logging with CometExprShim { >>>>>>> bbe63eb0 (undo_golden_file_generation_changes) >>>>>>> 90860a97 (lazy_coalesce_fallback_case_statement) +======= +>>>>>>> b001f6b9 (lazy_coalesce_fallback_case_statement_rebase) case UnaryMinus(child, failOnError) => val childExpr = exprToProtoInternal(child, inputs, binding) if (childExpr.isDefined) { diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index f98d030750..2cfe7bc845 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -115,7 +115,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("Integral Division Overflow Handling Matches Spark Behavior") { withTable("t1") { - withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { val value = Long.MinValue sql("create table t1(c1 long, c2 short) using parquet") sql(s"insert into t1 values($value, -1)") @@ -397,8 +397,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("ANSI support for add") { assume(isSpark35Plus) val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1)) - withSQLConf( - SQLConf.ANSI_ENABLED.key -> "true") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { spark.table("tbl").printSchema() val res = spark.sql(""" @@ -443,8 +442,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("ANSI support for multiply") { val data = Seq((Integer.MAX_VALUE, 10)) - withSQLConf( - SQLConf.ANSI_ENABLED.key -> "true") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { val res = spark.sql(""" |SELECT @@ -488,6 +486,26 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } +<<<<<<< HEAD +======= + test("Verify coalesce performs lazy evaluation") { + val data = Seq((Integer.MIN_VALUE, 0)) + withSQLConf( + SQLConf.ANSI_ENABLED.key -> "true", + "spark.comet.explainFallback.enabled" -> "true") { + withParquetTable(data, "tbl") { + val res = spark.sql(""" + |SELECT + | coalesce(_1, _1/0) + | from tbl + | """.stripMargin) + + checkSparkAnswer(res) + } + } + } + +>>>>>>> b001f6b9 (lazy_coalesce_fallback_case_statement_rebase) test("dictionary arithmetic") { // TODO: test ANSI mode withSQLConf(SQLConf.ANSI_ENABLED.key -> "false", "parquet.enable.dictionary" -> "true") { @@ -626,7 +644,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz") { withSQLConf( - SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu") { + SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", + CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") From 82b654c4ae92b4ad4216ef66e8f637d64118c4e9 Mon Sep 17 00:00:00 2001 From: Bhargava 
Vadlamani Date: Tue, 2 Sep 2025 17:23:02 -0700 Subject: [PATCH 20/40] lazy_coalesce_fallback_case_statement_rebase --- .../scala/org/apache/comet/CometExpressionSuite.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 2cfe7bc845..5732557417 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -489,18 +489,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { <<<<<<< HEAD ======= test("Verify coalesce performs lazy evaluation") { - val data = Seq((Integer.MIN_VALUE, 0)) - withSQLConf( - SQLConf.ANSI_ENABLED.key -> "true", - "spark.comet.explainFallback.enabled" -> "true") { + val data = Seq((Integer.MAX_VALUE, 9999999999999L)) + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { val res = spark.sql(""" |SELECT - | coalesce(_1, _1/0) + | coalesce(_1, CAST(_2 AS TINYINT)) | from tbl | """.stripMargin) - checkSparkAnswer(res) + checkSparkAnswerAndOperator(res) } } } From fc2a2172a0e1311b6d23b61059282da50eaafc3a Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 3 Sep 2025 17:12:37 -0700 Subject: [PATCH 21/40] rebase_main --- .../apache/comet/CometExpressionSuite.scala | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 5732557417..51330753ce 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -115,7 +115,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("Integral Division Overflow Handling Matches Spark Behavior") { withTable("t1") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val value = Long.MinValue sql("create table t1(c1 long, c2 short) using parquet") sql(s"insert into t1 values($value, -1)") @@ -409,7 +409,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: [ARITHMETIC_OVERFLOW] Integer/Float overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" + """org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_add' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") @@ -427,11 +427,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | _1 - _2 | from tbl | """.stripMargin) - checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: [ARITHMETIC_OVERFLOW] Integer/Float overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" + """org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] integer overflow. 
Use 'try_subtract' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") @@ -453,7 +452,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: [ARITHMETIC_OVERFLOW] Integer/Float overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" + """org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_multiply' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") @@ -477,7 +476,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.comet.CometNativeException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead""" + """org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("Division by zero")) case _ => fail("Exception should be thrown") @@ -643,7 +642,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -665,7 +664,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz to string") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -687,7 +686,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("cast timestamp and timestamp_ntz to long, date") { withSQLConf( SESSION_LOCAL_TIMEZONE.key -> "Asia/Kathmandu", - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -775,7 +774,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with timestamp_ntz") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path = new Path(dir.toURI.toString, "timestamp_trunc.parquet") @@ -810,7 +809,7 @@ class 
CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("date_trunc with format array") { - withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val numRows = 1000 Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => @@ -1477,7 +1476,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq("true", "false").foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { withParquetTable( (-5 until 5).map(i => (i.toDouble + 0.3, i.toDouble + 0.8)), "tbl", @@ -2028,7 +2027,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col string, a int, b float) using parquet") @@ -2134,7 +2133,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq(true, false).foreach { dictionary => withSQLConf( "parquet.enable.dictionary" -> dictionary.toString, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { val table = "test" withTable(table) { sql(s"create table $table(col string, a int, b float) using parquet") @@ -2861,7 +2860,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { // this test requires native_comet scan due to unsigned u8/u16 issue withSQLConf( CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET, - CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") { + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") { Seq(true, false).foreach { dictionaryEnabled => withTempDir { dir => val path1 = new Path(dir.toURI.toString, "test1.parquet") From 5255a7516fe927c95a8047b4ab3073f3a247e518 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Thu, 4 Sep 2025 11:39:45 -0700 Subject: [PATCH 22/40] rebase_main --- .../org/apache/comet/CometExpressionSuite.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 51330753ce..e6c2c3d18d 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -395,7 +395,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("ANSI support for add") { - assume(isSpark35Plus) + assume(isSpark40Plus) val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1)) withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { @@ -419,8 +419,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("ANSI support for subtract") { + assume(isSpark40Plus) val data = Seq((Integer.MIN_VALUE, 1)) - withSQLConf(SQLConf.ANSI_ENABLED.key -> "true", "spark.comet.ansi.enabled" -> "true") { + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { val res = spark.sql(""" |SELECT @@ -440,6 +441,7 @@ class CometExpressionSuite extends CometTestBase with 
AdaptiveSparkPlanHelper { } test("ANSI support for multiply") { + assume(isSpark40Plus) val data = Seq((Integer.MAX_VALUE, 10)) withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { @@ -462,11 +464,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("ANSI support for divide") { + assume(isSpark40Plus) val data = Seq((Integer.MIN_VALUE, 0)) withSQLConf( +<<<<<<< HEAD SQLConf.ANSI_ENABLED.key -> "true", CometConf.COMET_ANSI_MODE_ENABLED.key -> "true", "spark.comet.explainFallback.enabled" -> "true") { +======= + SQLConf.ANSI_ENABLED.key -> "true") { +>>>>>>> c5726856 (rebase_main) withParquetTable(data, "tbl") { val res = spark.sql(""" |SELECT @@ -1965,7 +1972,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { Seq( ( s"SELECT cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as C from $table", - Set("make_interval is not supported")), + Set("Cast from CalendarIntervalType to StringType is not supported")), ( "SELECT " + "date_part('YEAR', make_interval(c0, c1, c0, c1, c0, c0, c2))" @@ -1984,8 +1991,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { + s"(SELECT c1, cast(make_interval(c0, c1, c0, c1, c0, c0, c2) as string) as casted from $table) as B " + "where A.c1 = B.c1 ", Set( - "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled", - "make_interval is not supported")), + "Cast from CalendarIntervalType to StringType is not supported", + "Comet shuffle is not enabled: spark.comet.exec.shuffle.enabled is not enabled")), (s"select * from $table LIMIT 10 OFFSET 3", Set("Comet shuffle is not enabled"))) .foreach(test => { val qry = test._1 From 44d1155c0bcd66745c5cd778f1d3a8c2fe3bbb3d Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Thu, 4 Sep 2025 12:01:19 -0700 Subject: [PATCH 23/40] rebase_main --- .../test/scala/org/apache/comet/CometExpressionSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index e6c2c3d18d..55fda20797 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.SESSION_LOCAL_TIMEZONE import org.apache.spark.sql.types._ -import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} +import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ @@ -466,6 +466,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("ANSI support for divide") { assume(isSpark40Plus) val data = Seq((Integer.MIN_VALUE, 0)) +<<<<<<< HEAD withSQLConf( <<<<<<< HEAD SQLConf.ANSI_ENABLED.key -> "true", @@ -474,6 +475,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { ======= SQLConf.ANSI_ENABLED.key -> "true") { >>>>>>> c5726856 (rebase_main) +======= + withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { +>>>>>>> f5152d33 (rebase_main) withParquetTable(data, "tbl") { val res = spark.sql(""" |SELECT From d8e30c6ab2f8a7d68d65e18395f35ea1775b0fe0 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Fri, 5 Sep 2025 00:45:19 -0700 Subject: [PATCH 24/40] rebase_main --- 
.../org/apache/comet/serde/arithmetic.scala | 30 ++++--------------- .../apache/comet/CometExpressionSuite.scala | 22 +++++--------- 2 files changed, 13 insertions(+), 39 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala index 30549e3b6c..76ed35f8ef 100644 --- a/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala +++ b/spark/src/main/scala/org/apache/comet/serde/arithmetic.scala @@ -88,11 +88,7 @@ trait MathBase { object CometAdd extends CometExpressionSerde[Add] with MathBase { override def getSupportLevel(expr: Add): SupportLevel = { - if (expr.evalMode == EvalMode.ANSI) { - Incompatible(Some("ANSI mode is not supported")) - } else { - Compatible(None) - } + Compatible(None) } override def convert( @@ -118,11 +114,7 @@ object CometAdd extends CometExpressionSerde[Add] with MathBase { object CometSubtract extends CometExpressionSerde[Subtract] with MathBase { override def getSupportLevel(expr: Subtract): SupportLevel = { - if (expr.evalMode == EvalMode.ANSI) { - Incompatible(Some("ANSI mode is not supported")) - } else { - Compatible(None) - } + Compatible(None) } override def convert( @@ -148,11 +140,7 @@ object CometSubtract extends CometExpressionSerde[Subtract] with MathBase { object CometMultiply extends CometExpressionSerde[Multiply] with MathBase { override def getSupportLevel(expr: Multiply): SupportLevel = { - if (expr.evalMode == EvalMode.ANSI) { - Incompatible(Some("ANSI mode is not supported")) - } else { - Compatible(None) - } + Compatible(None) } override def convert( @@ -178,11 +166,7 @@ object CometMultiply extends CometExpressionSerde[Multiply] with MathBase { object CometDivide extends CometExpressionSerde[Divide] with MathBase { override def getSupportLevel(expr: Divide): SupportLevel = { - if (expr.evalMode == EvalMode.ANSI) { - Incompatible(Some("ANSI mode is not supported")) - } else { - Compatible(None) - } + Compatible(None) } override def convert( @@ -213,11 +197,7 @@ object CometDivide extends CometExpressionSerde[Divide] with MathBase { object CometIntegralDivide extends CometExpressionSerde[IntegralDivide] with MathBase { override def getSupportLevel(expr: IntegralDivide): SupportLevel = { - if (expr.evalMode == EvalMode.ANSI) { - Incompatible(Some("ANSI mode is not supported")) - } else { - Compatible(None) - } + Compatible(None) } override def convert( diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 55fda20797..1aa1ae7677 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -46,6 +46,9 @@ import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { import testImplicits._ + val ARITHMETIC_OVERFLOW_EXCEPTION_MSG = + """org.apache.comet.CometNativeException: [ARITHMETIC_OVERFLOW] Integer/Float overflow. 
If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.""" + override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = { super.test(testName, testTags: _*) { @@ -395,7 +398,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("ANSI support for add") { - assume(isSpark40Plus) val data = Seq((Integer.MAX_VALUE, 1), (Integer.MIN_VALUE, -1)) withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { @@ -408,9 +410,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => - val cometErrorPattern = - """org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_add' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" - assert(cometExc.getMessage.contains(cometErrorPattern)) + assert(cometExc.getMessage.contains(ARITHMETIC_OVERFLOW_EXCEPTION_MSG)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") } @@ -419,7 +419,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("ANSI support for subtract") { - assume(isSpark40Plus) val data = Seq((Integer.MIN_VALUE, 1)) withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { @@ -430,9 +429,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | """.stripMargin) checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => - val cometErrorPattern = - """org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_subtract' to tolerate overflow and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.""" - assert(cometExc.getMessage.contains(cometErrorPattern)) + assert(cometExc.getMessage.contains(ARITHMETIC_OVERFLOW_EXCEPTION_MSG)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") } @@ -441,7 +438,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("ANSI support for multiply") { - assume(isSpark40Plus) val data = Seq((Integer.MAX_VALUE, 10)) withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") { withParquetTable(data, "tbl") { @@ -453,9 +449,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => - val cometErrorPattern = - """org.apache.spark.SparkArithmeticException: [ARITHMETIC_OVERFLOW] integer overflow. Use 'try_multiply' to tolerate overflow and return NULL instead. 
If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error""" - assert(cometExc.getMessage.contains(cometErrorPattern)) + assert(cometExc.getMessage.contains(ARITHMETIC_OVERFLOW_EXCEPTION_MSG)) assert(sparkExc.getMessage.contains("overflow")) case _ => fail("Exception should be thrown") } @@ -464,7 +458,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } test("ANSI support for divide") { - assume(isSpark40Plus) val data = Seq((Integer.MIN_VALUE, 0)) <<<<<<< HEAD withSQLConf( @@ -484,10 +477,11 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { | _1 / _2 | from tbl | """.stripMargin) + checkSparkMaybeThrows(res) match { case (Some(sparkExc), Some(cometExc)) => val cometErrorPattern = - """org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead""" + """org.apache.comet.CometNativeException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead""" assert(cometExc.getMessage.contains(cometErrorPattern)) assert(sparkExc.getMessage.contains("Division by zero")) case _ => fail("Exception should be thrown") From 7d5b742c2ebcf9afbce561db9ec154aa88eb4352 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Fri, 5 Sep 2025 23:58:28 -0700 Subject: [PATCH 25/40] rebase_main --- dev/diffs/3.4.3.diff | 2991 +++++++++++++++++++++++++++++++++++++++++ dev/diffs/3.5.6.diff | 2997 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 5988 insertions(+) diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff index 1c0ca867d6..b7951be96c 100644 --- a/dev/diffs/3.4.3.diff +++ b/dev/diffs/3.4.3.diff @@ -3002,6 +3002,2997 @@ index 07361cfdce9..97dab2a3506 100644 + } + } ++ conf ++ } ++ )) + + case class TestHiveVersion(hiveClient: HiveClient) + extends TestHiveContext(TestHive.sparkContext, hiveClient) +diff --git a/pom.xml b/pom.xml +index d354488..9c17449 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -148,6 +148,8 @@ + 0.10.0 + 2.5.1 + 2.0.8 ++ 3.4 ++ 0.10.0-SNAPSHOT + + 2.5.1 + 2.0.8 ++ 3.5 ++ 0.10.0-SNAPSHOT + - 2.5.1 - 2.0.8 -+ 3.5 -+ 0.10.0-SNAPSHOT -
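The CaseWhen | Coalesce case added in patch 18 above is the core rewrite of
this series: coalesce is lowered onto CASE WHEN so that a later argument (for
example a failing division under ANSI mode) is only evaluated once every
earlier argument came back NULL. A standalone sketch of that lowering against
Catalyst's expression API (an illustration that follows the patch's shape,
not necessarily the final Comet implementation):

    import org.apache.spark.sql.catalyst.expressions.{CaseWhen, Coalesce, Expression, IsNotNull}

    // coalesce(a, b, c)  ==>  CASE WHEN isnotnull(a) THEN a
    //                              WHEN isnotnull(b) THEN b
    //                              ELSE c END
    def coalesceToCaseWhen(c: Coalesce): CaseWhen = {
      val branches: Seq[(Expression, Expression)] =
        c.children.dropRight(1).map(child => (IsNotNull(child), child))
      CaseWhen(branches, Some(c.children.last))
    }

Because CASE evaluates a THEN branch only after its guard passes, the rewrite
preserves coalesce's short-circuit semantics on the native side, which is what
the "Verify coalesce performs lazy evaluation" test above exercises.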