diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/create_table.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/create_table.rs new file mode 100644 index 0000000000000..d7ef8469e9869 --- /dev/null +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/create_table.rs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::error::Result; +use crate::engines::datafusion::error::DFSqlLogicTestError; +use crate::engines::datafusion::util::LogicTestContextProvider; +use datafusion::datasource::MemTable; +use datafusion::prelude::SessionContext; +use datafusion_common::{DataFusionError, OwnedTableReference}; +use datafusion_sql::planner::{object_name_to_table_reference, SqlToRel}; +use sqllogictest::DBOutput; +use sqlparser::ast::{ColumnDef, ObjectName}; +use std::sync::Arc; + +pub async fn create_table( + ctx: &SessionContext, + name: ObjectName, + columns: Vec, + if_not_exists: bool, + or_replace: bool, +) -> Result { + let table_reference = object_name_to_table_reference(name)?; + let existing_table = ctx.table(&table_reference).await; + match (if_not_exists, or_replace, existing_table) { + (true, false, Ok(_)) => Ok(DBOutput::StatementComplete(0)), + (false, true, Ok(_)) => { + ctx.deregister_table(&table_reference)?; + create_new_table(ctx, table_reference, columns) + } + (true, true, Ok(_)) => { + Err(DFSqlLogicTestError::from(DataFusionError::Execution( + "'IF NOT EXISTS' cannot coexist with 'REPLACE'".to_string(), + ))) + } + (_, _, Err(_)) => create_new_table(ctx, table_reference, columns), + (false, false, Ok(_)) => { + Err(DFSqlLogicTestError::from(DataFusionError::Execution( + format!("Table '{table_reference}' already exists"), + ))) + } + } +} + +fn create_new_table( + ctx: &SessionContext, + table_reference: OwnedTableReference, + columns: Vec, +) -> Result { + let sql_to_rel = SqlToRel::new(&LogicTestContextProvider {}); + let schema = Arc::new(sql_to_rel.build_schema(columns)?); + let table_provider = Arc::new(MemTable::try_new(schema, vec![])?); + ctx.register_table(&table_reference, table_provider)?; + Ok(DBOutput::StatementComplete(0)) +} diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert/mod.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs similarity index 98% rename from datafusion/core/tests/sqllogictests/src/engines/datafusion/insert/mod.rs rename to datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs index ee18851c258cd..aa0d563e1b6e7 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert/mod.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -mod util; - -use self::util::LogicTestContextProvider; use super::error::Result; +use crate::engines::datafusion::util::LogicTestContextProvider; use arrow::record_batch::RecordBatch; use datafusion::datasource::MemTable; use datafusion::prelude::SessionContext; diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs index e1fffd669ccb7..0e3d0165f550f 100644 --- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs +++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs @@ -21,15 +21,18 @@ use sqllogictest::DBOutput; use self::error::{DFSqlLogicTestError, Result}; use async_trait::async_trait; +use create_table::create_table; use datafusion::arrow::record_batch::RecordBatch; use datafusion::prelude::SessionContext; use datafusion_sql::parser::{DFParser, Statement}; use insert::insert; use sqlparser::ast::Statement as SQLStatement; +mod create_table; mod error; mod insert; mod normalize; +mod util; pub struct DataFusion { ctx: SessionContext, @@ -87,9 +90,28 @@ async fn run_query( let statement0 = statements.pop_front().expect("at least one SQL statement"); if let Statement::Statement(statement) = statement0 { let statement = *statement; - if matches!(&statement, SQLStatement::Insert { .. }) { - return insert(ctx, statement).await; - } + match statement { + SQLStatement::Insert { .. } => return insert(ctx, statement).await, + SQLStatement::CreateTable { + query, + constraints, + table_properties, + with_options, + name, + columns, + if_not_exists, + or_replace, + .. + } if query.is_none() + && constraints.is_empty() + && table_properties.is_empty() + && with_options.is_empty() => + { + return create_table(ctx, name, columns, if_not_exists, or_replace) + .await + } + _ => {} + }; } } let df = ctx.sql(sql.as_str()).await?; diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert/util.rs b/datafusion/core/tests/sqllogictests/src/engines/datafusion/util.rs similarity index 100% rename from datafusion/core/tests/sqllogictests/src/engines/datafusion/insert/util.rs rename to datafusion/core/tests/sqllogictests/src/engines/datafusion/util.rs diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt index bdce635a43647..dae86b8bb99a3 100644 --- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt +++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt @@ -419,3 +419,49 @@ SELECT * FROM aggregate_simple order by c1 DESC LIMIT 1; statement ok DROP TABLE aggregate_simple + + +# Should create an empty table +statement ok +CREATE TABLE table_without_values(field1 BIGINT, field2 BIGINT); + + +# Should skip existing table +statement ok +CREATE TABLE IF NOT EXISTS table_without_values(field1 BIGINT, field2 BIGINT); + + +# Should not recreate the same table +statement error Table 'table_without_values' already exists +CREATE TABLE table_without_values(field1 BIGINT, field2 BIGINT); + + +# 'IF NOT EXISTS' cannot coexist with 'REPLACE' +statement error Execution error: 'IF NOT EXISTS' cannot coexist with 'REPLACE' +CREATE OR REPLACE TABLE IF NOT EXISTS table_without_values(field1 BIGINT, field2 BIGINT); + +# Should insert into an empty table +statement ok +insert into table_without_values values (1, 2), (2, 3), (2, 4); + +query II rowsort +select * from table_without_values; +---- +1 2 +2 3 +2 4 + + +# Should recreate existing table +statement ok +CREATE OR REPLACE TABLE table_without_values(field1 BIGINT, field2 BIGINT); + + +# Should insert into a recreated table +statement ok +insert into table_without_values values (10, 20); + +query II rowsort +select * from table_without_values; +---- +10 20 diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 4b93f26f901ec..5ccbbc09f7743 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -114,7 +114,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } - pub(crate) fn build_schema(&self, columns: Vec) -> Result { + pub fn build_schema(&self, columns: Vec) -> Result { let mut fields = Vec::with_capacity(columns.len()); for column in columns {