Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,8 @@ abstract class DeltaInsertIntoTests(
}
}

test("insertInto: Timestamp No Timezone round trips across timezones") {
// Casting from TIMESTAMP_NTZ to TIMESTAMP is not yet supported.
ignore("insertInto: Timestamp No Timezone round trips across timezones") {
val t1 = "timestamp_ntz"
withTable(t1) {
withTimeZone("GMT-8") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object VeloxValidatorApi {
StringType | BinaryType | _: DecimalType | DateType | TimestampType |
YearMonthIntervalType.DEFAULT | NullType =>
true
case other if other.typeName == "timestamp_ntz" => true
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
case _: DoubleType =>
case _: StringType =>
case _: TimestampType =>
case other if other.typeName == "timestamp_ntz" =>
case _: DateType =>
case _: BinaryType =>
case _: DecimalType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}

test("fallback with index based schema evolution") {
testWithMinSparkVersion("fallback with index based schema evolution", "3.4") {
val query = "SELECT c2 FROM test"
Seq("parquet", "orc").foreach {
format =>
Expand All @@ -295,9 +295,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
runQueryAndCompare(query) {
df =>
val plan = df.queryExecution.executedPlan
val fallback = parquetUseColumnNames == "false" ||
orcUseColumnNames == "false"
assert(collect(plan) { case g: GlutenPlan => g }.isEmpty == fallback)
assert(collect(plan) { case g: GlutenPlan => g }.nonEmpty)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.gluten.execution
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

import java.io.File

Expand Down Expand Up @@ -465,17 +466,17 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit
}
}

testWithMinSparkVersion("Fallback for TimestampNTZ type scan", "3.4") {
testWithMinSparkVersion("TimestampNTZ type scan", "3.4") {
withTempDir {
dir =>
val path = new File(dir, "ntz_data").toURI.getPath
val inputDf =
spark.sql("SELECT CAST('2024-01-01 00:00:00' AS TIMESTAMP_NTZ) AS ts_ntz")
inputDf.write.format("parquet").save(path)
val df = spark.read.format("parquet").load(path)
val df = spark.read.parquet(path)
val executedPlan = getExecutedPlan(df)
assert(!executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer]))
checkAnswer(df, inputDf)
assert(executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer]))
checkAnswer(df, Seq(Row(java.time.LocalDateTime.of(2024, 1, 1, 0, 0, 0, 0))))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.functions

import org.apache.gluten.execution.ProjectExecTransformer
import org.apache.gluten.execution.{BatchScanExecTransformer, ProjectExecTransformer}

import org.apache.spark.sql.execution.ProjectExec
import org.apache.spark.sql.types.Decimal
Expand Down Expand Up @@ -489,4 +489,44 @@ class DateFunctionsValidateSuite extends FunctionsValidateSuite {
}
}
}

// Verifies a Parquet round trip for TIMESTAMP_NTZ: the plain scan should be
// offloaded to the native backend, while an unsupported function on the
// column still triggers a fallback to vanilla Spark.
testWithMinSparkVersion("read as timestamp_ntz", "3.4") {
  // A mix of date-only, offset-qualified, and fractional-second timestamp strings.
  val rawTimestamps = Seq(
    "1970-01-01",
    "1970-01-01 00:00:00-02:00",
    "1970-01-01 00:00:00 +02:00",
    "2000-01-01",
    "1970-01-01 00:00:00",
    "2000-01-01 12:21:56",
    "2015-03-18T12:03:17Z",
    "2015-03-18 12:03:17",
    "2015-03-18T12:03:17",
    "2015-03-18 12:03:17.123",
    "2015-03-18T12:03:17.123",
    "2015-03-18T12:03:17.456",
    "2015-03-18 12:03:17.456"
  )

  withTempPath { dir =>
    val outputPath = dir.getAbsolutePath
    // Cast each string to TIMESTAMP_NTZ and persist as a single Parquet file.
    spark
      .createDataset(rawTimestamps)
      .toDF("input")
      .selectExpr("cast(input as timestamp_ntz) as ts")
      .coalesce(1)
      .write
      .mode("overwrite")
      .parquet(outputPath)

    spark.read.parquet(outputPath).createOrReplaceTempView("view")

    // The bare scan of the TIMESTAMP_NTZ column must run natively.
    runQueryAndCompare("select * from view") {
      checkGlutenPlan[BatchScanExecTransformer]
    }

    // hour() on TIMESTAMP_NTZ is not offloaded; expect a vanilla ProjectExec,
    // proving the fallback of the unsupported function works.
    runQueryAndCompare("select hour(ts) from view") { df =>
      assert(collect(df.queryExecution.executedPlan) {
        case p: ProjectExec => p
      }.nonEmpty)
    }
  }
}
}
8 changes: 5 additions & 3 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "jni/JniFileSystem.h"
#include "memory/GlutenBufferedInputBuilder.h"
#include "operators/functions/SparkExprToSubfieldFilterParser.h"
#include "operators/plannodes/RowVectorStream.h"
#include "shuffle/ArrowShuffleDictionaryWriter.h"
#include "udf/UdfLoader.h"
#include "utils/Exception.h"
Expand All @@ -47,7 +48,6 @@
#include "velox/connectors/hive/BufferedInputBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
Expand All @@ -56,6 +56,7 @@
#include "velox/dwio/orc/reader/OrcReader.h"
#include "velox/dwio/parquet/RegisterParquetReader.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h"
#include "velox/functions/sparksql/types/TimestampNTZRegistration.h"
#include "velox/serializers/PrestoSerializer.h"

DECLARE_bool(velox_exception_user_stacktrace_enabled);
Expand Down Expand Up @@ -195,6 +196,7 @@ void VeloxBackend::init(
velox::orc::registerOrcReaderFactory();
velox::exec::ExprToSubfieldFilterParser::registerParser(std::make_unique<SparkExprToSubfieldFilterParser>());
velox::connector::hive::BufferedInputBuilder::registerBuilder(std::make_shared<GlutenBufferedInputBuilder>());
velox::functions::sparksql::registerTimestampNTZType();

// Register Velox functions
registerAllFunctions();
Expand Down Expand Up @@ -318,13 +320,13 @@ void VeloxBackend::initConnector(const std::shared_ptr<velox::config::ConfigBase
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));

// Register value-stream connector for runtime iterator-based inputs
auto valueStreamDynamicFilterEnabled =
backendConf_->get<bool>(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault);
velox::connector::registerConnector(
std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled));

#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
Expand Down
9 changes: 7 additions & 2 deletions cpp/velox/substrait/SubstraitParser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

#include "SubstraitParser.h"
#include "TypeUtils.h"
#include "velox/common/base/Exceptions.h"

#include "VeloxSubstraitSignature.h"
#include "velox/common/base/Exceptions.h"
#include "velox/functions/sparksql/types/TimestampNTZType.h"

namespace gluten {

Expand Down Expand Up @@ -78,6 +78,8 @@ TypePtr SubstraitParser::parseType(const ::substrait::Type& substraitType, bool
return DATE();
case ::substrait::Type::KindCase::kTimestampTz:
return TIMESTAMP();
case ::substrait::Type::KindCase::kTimestamp:
return facebook::velox::functions::sparksql::TIMESTAMP_NTZ();
case ::substrait::Type::KindCase::kDecimal: {
auto precision = substraitType.decimal().precision();
auto scale = substraitType.decimal().scale();
Expand Down Expand Up @@ -356,6 +358,9 @@ int64_t SubstraitParser::getLiteralValue(const ::substrait::Expression::Literal&
memcpy(&decimalValue, decimal.c_str(), 16);
return static_cast<int64_t>(decimalValue);
}
if (literal.has_timestamp()) {
return literal.timestamp();
}
return literal.i64();
}

Expand Down
6 changes: 4 additions & 2 deletions cpp/velox/substrait/SubstraitToVeloxExpr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

#include "SubstraitToVeloxExpr.h"
#include "TypeUtils.h"
#include "velox/functions/sparksql/types/TimestampNTZType.h"
#include "velox/type/Timestamp.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VariantToVector.h"

#include "velox/type/Timestamp.h"

using namespace facebook::velox;

namespace {
Expand Down Expand Up @@ -133,6 +133,8 @@ TypePtr getScalarType(const ::substrait::Expression::Literal& literal) {
return DATE();
case ::substrait::Expression_Literal::LiteralTypeCase::kTimestampTz:
return TIMESTAMP();
case ::substrait::Expression_Literal::LiteralTypeCase::kTimestamp:
return facebook::velox::functions::sparksql::TIMESTAMP_NTZ();
case ::substrait::Expression_Literal::LiteralTypeCase::kString:
return VARCHAR();
case ::substrait::Expression_Literal::LiteralTypeCase::kVarChar:
Expand Down
7 changes: 7 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/functions/sparksql/types/TimestampNTZType.h"
#include "velox/type/Type.h"

#include "utils/ConfigExtractor.h"
#include "utils/ObjectStore.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/VeloxWriterUtils.h"

#include "config.pb.h"
Expand Down Expand Up @@ -1497,6 +1499,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// The columns present in the table, if not available default to the baseSchema.
auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema;

    // Spark's TimestampNTZ type is stored as TIMESTAMP in the file.
if (tableSchema) {
tableSchema = asRowType(replaceTimestampNTZ(tableSchema, TIMESTAMP()));
}

connector::ConnectorTableHandlePtr tableHandle;
auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr;
auto connectorId = kHiveConnectorId;
Expand Down
38 changes: 37 additions & 1 deletion cpp/velox/utils/VeloxArrowUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,56 @@

#include "memory/VeloxColumnarBatch.h"
#include "utils/Common.h"
#include "velox/functions/sparksql/types/TimestampNTZType.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/arrow/Bridge.h"

namespace gluten {

using namespace facebook;

// Returns a copy of `type` in which every occurrence of Spark's TimestampNTZ
// logical type is replaced by `replacementType`. Recurses into ROW, ARRAY and
// MAP types; any other type is returned unchanged.
velox::TypePtr replaceTimestampNTZ(const velox::TypePtr& type, const velox::TypePtr& replacementType) {
  if (velox::functions::sparksql::isTimestampNTZType(type)) {
    return replacementType;
  }

  switch (type->kind()) {
    case velox::TypeKind::ROW: {
      auto rowType = velox::asRowType(type);
      // Field names are preserved; only child types may be rewritten.
      std::vector<std::string> names = rowType->names();
      std::vector<velox::TypePtr> types;
      types.reserve(rowType->size());
      for (size_t i = 0; i < rowType->size(); ++i) {
        types.push_back(replaceTimestampNTZ(rowType->childAt(i), replacementType));
      }
      return ROW(std::move(names), std::move(types));
    }
    case velox::TypeKind::ARRAY: {
      auto arrayType = std::dynamic_pointer_cast<const velox::ArrayType>(type);
      return ARRAY(replaceTimestampNTZ(arrayType->elementType(), replacementType));
    }
    case velox::TypeKind::MAP: {
      auto mapType = std::dynamic_pointer_cast<const velox::MapType>(type);
      return MAP(
          replaceTimestampNTZ(mapType->keyType(), replacementType),
          replaceTimestampNTZ(mapType->valueType(), replacementType));
    }
    default:
      // Scalar or otherwise non-nested type: nothing to rewrite.
      return type;
  }
}

// Exports `rowType` into `out` as an ArrowSchema by creating an empty vector
// of that type and running it through the Velox-to-Arrow bridge.
// NOTE(review): `out` presumably follows the Arrow C data interface release
// contract (caller releases) — confirm against callers.
void toArrowSchema(const velox::TypePtr& rowType, facebook::velox::memory::MemoryPool* pool, struct ArrowSchema* out) {
  exportToArrow(velox::BaseVector::create(rowType, 0, pool), *out, ArrowUtils::getBridgeOptions());
}

// Converts `rowType` to an arrow::Schema.
// Arrow does not provide a standard representation for the TimestampNTZ type,
// so it is rewritten to BIGINT (same byte size) before the export.
std::shared_ptr<arrow::Schema> toArrowSchema(const velox::TypePtr& rowType, facebook::velox::memory::MemoryPool* pool) {
  velox::TypePtr tableSchema = replaceTimestampNTZ(rowType, velox::BIGINT());

  // Export exactly once: the span previously exported `rowType` and then
  // re-exported the rewritten type into the same ArrowSchema, leaking the
  // first export's resources.
  ArrowSchema arrowSchema;
  toArrowSchema(tableSchema, pool, &arrowSchema);
  GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&arrowSchema));
  return outputSchema;
}
Expand Down
5 changes: 5 additions & 0 deletions cpp/velox/utils/VeloxArrowUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ std::shared_ptr<arrow::Schema> toArrowSchema(
const facebook::velox::TypePtr& rowType,
facebook::velox::memory::MemoryPool* pool);

// Rewrites TimestampNTZ to replacementType recursively in Row/Array/Map types.
facebook::velox::TypePtr replaceTimestampNTZ(
const facebook::velox::TypePtr& type,
const facebook::velox::TypePtr& replacementType);

facebook::velox::TypePtr fromArrowSchema(const std::shared_ptr<arrow::Schema>& schema);

arrow::Result<std::shared_ptr<arrow::Buffer>> toArrowBuffer(facebook::velox::BufferPtr buffer, arrow::MemoryPool* pool);
Expand Down
4 changes: 2 additions & 2 deletions ep/build-velox/src/get-velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
set -exu

CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
VELOX_REPO=https://github.com/IBM/velox.git
VELOX_BRANCH=dft-2026_03_24
VELOX_REPO=https://github.com/rui-mo/velox-dev.git
VELOX_BRANCH=ts_ntz_gluten
VELOX_ENHANCED_BRANCH=ibm-2026_03_24
VELOX_HOME=""
RUN_SETUP_SCRIPT=ON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ object SparkArrowUtil {
new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")
}
case dt if dt.catalogString == "timestamp_ntz" =>
new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
// TODO: There is no standard representation for this type in Arrow.
// Use bigint to ensure the correct byte size.
new ArrowType.Int(8 * 8, true)
case YearMonthIntervalType.DEFAULT =>
new ArrowType.Interval(IntervalUnit.YEAR_MONTH)
case _: ArrayType => ArrowType.List.INSTANCE
Expand All @@ -74,17 +76,6 @@ object SparkArrowUtil {
case ArrowType.Binary.INSTANCE => BinaryType
case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND && ts.getTimezone == null =>
// TimestampNTZType is only available in Spark 3.4+
try {
Class
.forName("org.apache.spark.sql.types.TimestampNTZType$")
.getField("MODULE$")
.get(null)
.asInstanceOf[DataType]
} catch {
case _: ClassNotFoundException => TimestampType
}
case _: ArrowType.Timestamp => TimestampType
case interval: ArrowType.Interval if interval.getUnit == IntervalUnit.YEAR_MONTH =>
YearMonthIntervalType.DEFAULT
Expand Down Expand Up @@ -168,8 +159,6 @@ object SparkArrowUtil {
}.asJava)
}

// TimestampNTZ is not supported for native computation, but the Arrow type mapping is needed
// for row-to-columnar transitions when the fallback validator tags NTZ operators.
def checkSchema(schema: StructType): Boolean = {
try {
SparkSchemaUtil.toArrowSchema(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {

// TIMESTAMP_NTZ was introduced in Spark 3.4 / Delta 2.4
testWithMinSparkVersion(
"delta: create table with TIMESTAMP_NTZ should fallback and return correct results",
"delta: create table with TIMESTAMP_NTZ and return correct results",
"3.4") {
withTable("delta_ntz") {
spark.sql("CREATE TABLE delta_ntz(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA")
spark.sql("""INSERT INTO delta_ntz VALUES
|('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
val df = runQueryAndCompare("select * from delta_ntz", noFallBack = false) { _ => }
val df = runQueryAndCompare("select * from delta_ntz") { _ => }
checkAnswer(
df,
Row(
Expand Down
Loading
Loading