From 236990d1283ad9ded0c96ec4029c180b1e7fe473 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Wed, 26 Jul 2023 17:32:12 -0400
Subject: [PATCH 01/10] inner join_set

---
 .../core/src/datasource/physical_plan/json.rs | 42 +++++++++++++++----
 1 file changed, 35 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index cbae85f6c8be2..5ef9aff7a828b 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -277,14 +277,42 @@ pub async fn plan_to_json(
         let mut stream = plan.execute(i, task_ctx.clone())?;
         join_set.spawn(async move {
             let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
-            let mut buffer = Vec::with_capacity(1024);
-            while let Some(batch) = stream.next().await.transpose()? {
-                let mut writer = json::LineDelimitedWriter::new(buffer);
-                writer.write(&batch)?;
-                buffer = writer.into_inner();
-                multipart_writer.write_all(&buffer).await?;
-                buffer.clear();
+
+            let mut inner_join_set = JoinSet::new();
+            while let Some(batch) = stream.try_next().await?{
+                inner_join_set.spawn(async move {
+                    let buffer = Vec::with_capacity(1024);
+                    let mut writer = json::LineDelimitedWriter::new(buffer);
+                    writer.write(&batch)?;
+                    let r: Result<Vec<u8>, DataFusionError> = Ok(writer.into_inner());
+                    r
+                });
             }
+
+            while let Some(result) = inner_join_set.join_next().await{
+                match result {
+                    Ok(r) => {
+                        let batch = r?;
+                        multipart_writer.write_all(&batch).await?;
+                    },
+                    Err(e) => {
+                        if e.is_panic() {
+                            std::panic::resume_unwind(e.into_panic());
+                        } else {
+                            unreachable!();
+                        }
+                    }
+                }
+            }
+
+            // let mut buffer = Vec::with_capacity(1024);
+            // while let Some(batch) = stream.next().await.transpose()? {
+            //     let mut writer = json::LineDelimitedWriter::new(buffer);
+            //     writer.write(&batch)?;
+            //     buffer = writer.into_inner();
+            //     multipart_writer.write_all(&buffer).await?;
+            //     buffer.clear();
+            // }
             multipart_writer
                 .shutdown()
                 .await

From 24b57ccf2218c3d9c21c3a04408305e2faa24d11 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Sun, 30 Jul 2023 10:49:50 -0400
Subject: [PATCH 02/10] Initial write_table implementation

---
 datafusion/core/src/dataframe.rs              | 16 +++
 .../core/src/datasource/file_format/csv.rs    | 99 ++++++++++++-------
 .../core/src/datasource/listing/table.rs      | 20 ++--
 datafusion/core/src/datasource/memory.rs      |  4 +-
 .../core/src/datasource/physical_plan/json.rs | 10 +-
 .../core/src/datasource/physical_plan/mod.rs  |  6 +-
 datafusion/core/src/execution/context.rs      |  2 +-
 datafusion/core/src/physical_plan/insert.rs   | 40 ++++----
 8 files changed, 123 insertions(+), 74 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index ae933974517b3..dd196d317dadf 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -25,6 +25,7 @@ use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
 use datafusion_common::{DataFusionError, SchemaError};
+use futures::StreamExt;
 use parquet::file::properties::WriterProperties;
 
 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -925,6 +926,21 @@ impl DataFrame {
         ))
     }
 
+    /// Write this DataFrame to the referenced table
+    /// This method uses the same underlying implementation
+    /// as the SQL Insert Into statement.
+    /// Unlike most other DataFrame methods, this method executes
+    /// eagerly, writing data, and returning the count of rows written.
+    pub async fn write_table(self, table_name: &str) -> Result<Vec<RecordBatch>, DataFusionError>{
+        let arrow_schema = Schema::from(self.schema());
+        let plan = LogicalPlanBuilder::insert_into(self.plan, table_name.to_owned(), &arrow_schema)?.build()?;
+        DataFrame::new(
+            self.session_state,
+            plan)
+            .collect()
+            .await
+    }
+
     /// Write a `DataFrame` to a CSV file.
     pub async fn write_csv(self, path: &str) -> Result<()> {
         let plan = self.session_state.create_physical_plan(&self.plan).await?;

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 8df9a86b1e798..4658524190f51 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -27,6 +27,7 @@ use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field, Fields, Schema};
 use arrow::{self, datatypes::SchemaRef};
 use arrow_array::RecordBatch;
+use chrono::{DateTime, Utc, NaiveDate};
 use datafusion_common::DataFusionError;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
@@ -53,6 +54,7 @@ use crate::execution::context::SessionState;
 use crate::physical_plan::insert::{DataSink, InsertExec};
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
+use rand::distributions::{Alphanumeric, DistString};
 
 /// The default file extension of csv files
 pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -560,10 +562,10 @@ impl CsvSink {
 impl DataSink for CsvSink {
     async fn write_all(
         &self,
-        mut data: SendableRecordBatchStream,
+        mut data: Vec<SendableRecordBatchStream>,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        let num_partitions = self.config.file_groups.len();
+        let num_partitions = data.len();
 
         let object_store = context
             .runtime_env()
@@ -572,44 +574,73 @@ impl DataSink for CsvSink {
         // Construct serializer and writer for each file group
         let mut serializers = vec![];
         let mut writers = vec![];
-        for file_group in &self.config.file_groups {
-            // In append mode, consider has_header flag only when file is empty (at the start).
-            // For other modes, use has_header flag as is.
-            let header = self.has_header
-                && (!matches!(&self.config.writer_mode, FileWriterMode::Append)
-                    || file_group.object_meta.size == 0);
-            let builder = WriterBuilder::new().with_delimiter(self.delimiter);
-            let serializer = CsvSerializer::new()
-                .with_builder(builder)
-                .with_header(header);
-            serializers.push(serializer);
-
-            let file = file_group.clone();
-            let writer = self
-                .create_writer(file.object_meta.clone().into(), object_store.clone())
-                .await?;
-            writers.push(writer);
+        match self.config.writer_mode {
+            FileWriterMode::Append => {
+                for file_group in &self.config.file_groups {
+                    // In append mode, consider has_header flag only when file is empty (at the start).
+                    // For other modes, use has_header flag as is.
+                    let header = self.has_header
+                        && (!matches!(&self.config.writer_mode, FileWriterMode::Append)
+                            || file_group.object_meta.size == 0);
+                    let builder = WriterBuilder::new().with_delimiter(self.delimiter);
+                    let serializer = CsvSerializer::new()
+                        .with_builder(builder)
+                        .with_header(header);
+                    serializers.push(serializer);
+
+                    let file = file_group.clone();
+                    let writer = self
+                        .create_writer(file.object_meta.clone().into(), object_store.clone())
+                        .await?;
+                    writers.push(writer);
+                }
+            }
+            FileWriterMode::Put => return Err(DataFusionError::NotImplemented("Put Mode is not implemented for CSV Sink yet".into())),
+            FileWriterMode::PutMultipart =>{
+                let base_path = &self.config.table_paths[0];
+                //uniquely identify this batch of files with a random string, to prevent collisions overwriting files
+                let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+                for part_idx in 0..num_partitions{
+                    let header = true;
+                    let builder = WriterBuilder::new().with_delimiter(self.delimiter);
+                    let serializer = CsvSerializer::new()
+                        .with_builder(builder)
+                        .with_header(header);
+                    serializers.push(serializer);
+                    let file_path = base_path.prefix().child(format!("/{}_{}.csv", write_id, part_idx));
+                    let object_meta = ObjectMeta{
+                        location: file_path,
+                        last_modified: chrono::offset::Utc::now(),
+                        size: 0,
+                        e_tag: None,
+                    };
+                    let writer = self.create_writer(
+                        object_meta.into(), object_store.clone())
+                        .await?;
+                    writers.push(writer);
+                }
+            }
         }
 
-        let mut idx = 0;
         let mut row_count = 0;
         // Map errors to DatafusionError.
         let err_converter =
             |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
-        while let Some(maybe_batch) = data.next().await {
-            // Write data to files in a round robin fashion:
-            idx = (idx + 1) % num_partitions;
-            let serializer = &mut serializers[idx];
-            let batch = check_for_errors(maybe_batch, &mut writers).await?;
-            row_count += batch.num_rows();
-            let bytes =
-                check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
-            let writer = &mut writers[idx];
-            check_for_errors(
-                writer.write_all(&bytes).await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
+        for idx in 0..num_partitions{
+            while let Some(maybe_batch) = data[idx].next().await {
+                // Write data to files in a round robin fashion:
+                let serializer = &mut serializers[idx];
+                let batch = check_for_errors(maybe_batch, &mut writers).await?;
+                row_count += batch.num_rows();
+                let bytes =
+                    check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
+                let writer = &mut writers[idx];
+                check_for_errors(
+                    writer.write_all(&bytes).await.map_err(err_converter),
+                    &mut writers,
+                )
+                .await?;
+            }
         }
         // Perform cleanup:
         let n_writers = writers.len();

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 2d8ba99563621..b9ae1580c5f5c 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -781,7 +781,7 @@ impl TableProvider for ListingTable {
 
         if self.table_paths().len() > 1 {
             return plan_err!(
-                "Writing to a table backed by multiple files is not supported yet"
+                "Writing to a table backed by multiple partitions is not supported yet"
             );
         }
 
@@ -799,20 +799,26 @@ impl TableProvider for ListingTable {
             .await?;
 
         let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
-
-        if file_groups.len() > 1 {
-            return plan_err!(
-                "Datafusion currently supports tables from single partition and/or file."
-            );
-        }
+        let writer_mode;
+        //if we are writing a single output_partition to a table backed by a single file
+        //we can append to that file. Otherwise, we can write new files into the directory
+        //adding new files to the listing table in order to insert to the table.
+        let input_partitions = input.output_partitioning().partition_count();
+        if file_groups.len() == 1 && input_partitions==1{
+            writer_mode = crate::datasource::file_format::FileWriterMode::Append;
+        } else{
+            writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart;
+        }
 
         // Sink related option, apart from format
         let config = FileSinkConfig {
             object_store_url: self.table_paths()[0].object_store(),
+            input_partitions,
+            table_paths: self.table_paths().clone(),
             file_groups,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
-            writer_mode: crate::datasource::file_format::FileWriterMode::Append,
+            writer_mode,
         };
 
         self.options()

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 5bac551cf61ee..dde3d6ea74bb6 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -252,7 +252,7 @@ impl MemSink {
 impl DataSink for MemSink {
     async fn write_all(
         &self,
-        mut data: SendableRecordBatchStream,
+        mut data: Vec<SendableRecordBatchStream>,
         _context: &Arc<TaskContext>,
     ) -> Result<u64> {
         let num_partitions = self.batches.len();
@@ -262,7 +262,7 @@
         let mut new_batches = vec![vec![]; num_partitions];
         let mut i = 0;
         let mut row_count = 0;
-        while let Some(batch) = data.next().await.transpose()? {
+        while let Some(batch) = data[0].next().await.transpose()? {
             row_count += batch.num_rows();
             new_batches[i].push(batch);
             i = (i + 1) % num_partitions;

diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 5ef9aff7a828b..e260b78a286bd 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -304,15 +304,7 @@ pub async fn plan_to_json(
                 }
             }
         }
-
-            // let mut buffer = Vec::with_capacity(1024);
-            // while let Some(batch) = stream.next().await.transpose()? {
-            //     let mut writer = json::LineDelimitedWriter::new(buffer);
-            //     writer.write(&batch)?;
-            //     buffer = writer.into_inner();
-            //     multipart_writer.write_all(&buffer).await?;
-            //     buffer.clear();
-            // }
+
             multipart_writer
                 .shutdown()
                 .await

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index ac660770b1264..e62b4e4b75289 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -82,7 +82,7 @@ use std::{
     vec,
 };
 
-use super::{ColumnStatistics, Statistics};
+use super::{ColumnStatistics, Statistics, listing::ListingTableUrl};
 
 /// Convert type to a type suitable for use as a [`ListingTable`]
 /// partition column. Returns `Dictionary(UInt16, val_type)`, which is
@@ -323,6 +323,10 @@ pub struct FileSinkConfig {
     pub object_store_url: ObjectStoreUrl,
     /// A vector of [`PartitionedFile`] structs, each representing a file partition
     pub file_groups: Vec<PartitionedFile>,
+    /// number of partitions in the input_plan
+    pub input_partitions: usize,
+    /// Vector of partition paths
+    pub table_paths: Vec<ListingTableUrl>,
     /// The schema of the output file
     pub output_schema: SchemaRef,
     /// A vector of column names and their corresponding data types,

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ce9165f027c7c..0ffceca047d24 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1494,7 +1494,7 @@ impl SessionState {
             .expect("Failed to register default schema");
     }
 
-    fn resolve_table_ref<'a>(
+    pub(crate) fn resolve_table_ref<'a>(
         &'a self,
         table_ref: impl Into<TableReference<'a>>,
     ) -> ResolvedTableReference<'a> {

diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 8766b62e9a9e2..1a9c9dfcfc26e 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -57,7 +57,7 @@ pub trait DataSink: DisplayAs + Debug + Send + Sync {
     /// or rollback required.
     async fn write_all(
         &self,
-        data: SendableRecordBatchStream,
+        data: Vec<SendableRecordBatchStream>,
         context: &Arc<TaskContext>,
     ) -> Result<u64>;
 }
@@ -136,6 +136,17 @@ impl InsertExec {
             )))
         }
     }
+
+    fn make_all_input_streams(&self, context: Arc<TaskContext>) -> Result<Vec<SendableRecordBatchStream>>{
+        let n_input_parts = self.input.output_partitioning().partition_count();
+        let mut streams = Vec::with_capacity(n_input_parts);
+        for part in 0..n_input_parts{
+            streams.push(
+                self.make_input_stream(part, context.clone())?
+            );
+        }
+        Ok(streams)
+    }
 }
 
 impl DisplayAs for InsertExec {
@@ -169,11 +180,15 @@ impl ExecutionPlan for InsertExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        self.input.output_ordering()
     }
 
-    fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition]
+    fn benefits_from_input_partitioning(&self) -> bool {
+        // Incoming number of partitions is taken to be the
+        // number of files the query is required to write out.
+        // The optimizer should not change this number.
+        // Parallelism is handled within the appropriate DataSink
+        false
     }
 
     fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
@@ -216,22 +231,7 @@ impl ExecutionPlan for InsertExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        if partition != 0 {
-            return Err(DataFusionError::Internal(
-                format!("Invalid requested partition {partition}. InsertExec requires a single input partition."
-            )));
-        }
-
-        // Execute each of our own input's partitions and pass them to the sink
-        let input_partition_count = self.input.output_partitioning().partition_count();
-        if input_partition_count != 1 {
-            return Err(DataFusionError::Internal(format!(
-                "Invalid input partition count {input_partition_count}. \
-                 InsertExec needs only a single partition."
-            )));
-        }
-
-        let data = self.make_input_stream(0, context.clone())?;
+        let data = self.make_all_input_streams(context.clone())?;
         let count_schema = self.count_schema.clone();
         let sink = self.sink.clone();

From e2a5a5c5fadf9a29dc9fe89fc1e231e0358b2a8a Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Sun, 30 Jul 2023 11:17:39 -0400
Subject: [PATCH 03/10] update MemSink to accept Vec<SendableRecordBatchStream>

---
 datafusion/core/src/datasource/memory.rs | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index dde3d6ea74bb6..405091423d885 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -262,11 +262,14 @@ impl DataSink for MemSink {
         let mut new_batches = vec![vec![]; num_partitions];
         let mut i = 0;
         let mut row_count = 0;
-        while let Some(batch) = data[0].next().await.transpose()? {
-            row_count += batch.num_rows();
-            new_batches[i].push(batch);
-            i = (i + 1) % num_partitions;
+        for stream in data{
+            while let Some(batch) = stream.next().await.transpose()? {
+                row_count += batch.num_rows();
+                new_batches[i].push(batch);
+                i = (i + 1) % num_partitions;
+            }
         }
+
 
         // write the outputs into the batches
         for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {

From faa66f2f75d129e586d8605cb5faead2427b3cc6 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Sun, 30 Jul 2023 11:32:26 -0400
Subject: [PATCH 04/10] some cleaning up

---
 .../core/src/datasource/file_format/csv.rs    |  1 +
 .../core/src/datasource/listing/table.rs      |  1 -
 datafusion/core/src/datasource/memory.rs      |  5 +--
 .../core/src/datasource/physical_plan/json.rs | 32 ++++---------------
 .../core/src/datasource/physical_plan/mod.rs  |  2 --
 datafusion/core/src/execution/context.rs      |  2 +-
 datafusion/core/src/physical_plan/insert.rs   |  2 +-
 7 files changed, 13 insertions(+), 32 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 4658524190f51..3871ab8d6a583 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -597,6 +597,7 @@ impl DataSink for CsvSink {
             }
             FileWriterMode::Put => return Err(DataFusionError::NotImplemented("Put Mode is not implemented for CSV Sink yet".into())),
             FileWriterMode::PutMultipart =>{
+                //currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
                 //uniquely identify this batch of files with a random string, to prevent collisions overwriting files
                 let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index b9ae1580c5f5c..473b35ea30500 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -813,7 +813,6 @@ impl TableProvider for ListingTable {
         // Sink related option, apart from format
         let config = FileSinkConfig {
             object_store_url: self.table_paths()[0].object_store(),
-            input_partitions,
             table_paths: self.table_paths().clone(),
             file_groups,
             output_schema: self.schema(),

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 405091423d885..639a40bdd9e20 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -262,8 +262,9 @@ impl DataSink for MemSink {
         let mut new_batches = vec![vec![]; num_partitions];
         let mut i = 0;
         let mut row_count = 0;
-        for stream in data{
-            while let Some(batch) = stream.next().await.transpose()? {
+        let num_parts = data.len();
+        for idx in 0..num_parts{
+            while let Some(batch) = data[idx].next().await.transpose()? {
                 row_count += batch.num_rows();
                 new_batches[i].push(batch);
                 i = (i + 1) % num_partitions;
             }
         }

diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index e260b78a286bd..ad7cbd4e91b8c 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -278,31 +278,13 @@ pub async fn plan_to_json(
         let mut stream = plan.execute(i, task_ctx.clone())?;
         join_set.spawn(async move {
             let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
-
+            let mut buffer = Vec::with_capacity(1024);
+            while let Some(batch) = stream.next().await.transpose()? {
+                let mut writer = json::LineDelimitedWriter::new(buffer);
+                writer.write(&batch)?;
+                buffer = writer.into_inner();
+                multipart_writer.write_all(&buffer).await?;
+                buffer.clear();
             }
 
             multipart_writer
                 .shutdown()
                 .await

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index e62b4e4b75289..7c80c0e28c99b 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -323,8 +323,6 @@ pub struct FileSinkConfig {
     pub object_store_url: ObjectStoreUrl,
     /// A vector of [`PartitionedFile`] structs, each representing a file partition
     pub file_groups: Vec<PartitionedFile>,
-    /// number of partitions in the input_plan
-    pub input_partitions: usize,
     /// Vector of partition paths
     pub table_paths: Vec<ListingTableUrl>,
     /// The schema of the output file

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 0ffceca047d24..ce9165f027c7c 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1494,7 +1494,7 @@ impl SessionState {
             .expect("Failed to register default schema");
     }
 
-    pub(crate) fn resolve_table_ref<'a>(
+    fn resolve_table_ref<'a>(
         &'a self,
         table_ref: impl Into<TableReference<'a>>,
     ) -> ResolvedTableReference<'a> {

diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 1a9c9dfcfc26e..39b87452f162c 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -180,7 +180,7 @@ impl ExecutionPlan for InsertExec {
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
+        None
     }

From 7b856ad5878038086b7e6202782c88c7bf821138 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Sun, 30 Jul 2023 11:46:44 -0400
Subject: [PATCH 05/10] fmt

---
 datafusion/core/src/dataframe.rs              | 20 ++++++-----
 .../core/src/datasource/file_format/csv.rs    | 34 ++++++++++++-------
 .../core/src/datasource/listing/table.rs      |  6 ++--
 datafusion/core/src/datasource/memory.rs      |  3 +-
 .../core/src/datasource/physical_plan/json.rs |  2 +-
 .../core/src/datasource/physical_plan/mod.rs  |  2 +-
 datafusion/core/src/physical_plan/insert.rs   | 13 +++----
 7 files changed, 47 insertions(+), 33 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index dd196d317dadf..9c8ca8b549ba4 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -928,17 +928,21 @@ impl DataFrame {
 
     /// Write this DataFrame to the referenced table
     /// This method uses the same underlying implementation
-    /// as the SQL Insert Into statement.
+    /// as the SQL Insert Into statement.
     /// Unlike most other DataFrame methods, this method executes
     /// eagerly, writing data, and returning the count of rows written.
-    pub async fn write_table(self, table_name: &str) -> Result<Vec<RecordBatch>, DataFusionError>{
-        let arrow_schema = Schema::from(self.schema());
-        let plan = LogicalPlanBuilder::insert_into(self.plan, table_name.to_owned(), &arrow_schema)?.build()?;
-        DataFrame::new(
-            self.session_state,
-            plan)
-            .collect()
-            .await
+    pub async fn write_table(
+        self,
+        table_name: &str,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let arrow_schema = Schema::from(self.schema());
+        let plan = LogicalPlanBuilder::insert_into(
+            self.plan,
+            table_name.to_owned(),
+            &arrow_schema,
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }
 
     /// Write a `DataFrame` to a CSV file.
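[Reviewer note, not part of the patch series] A minimal sketch of how the write_table API above might be exercised end to end. The table name and file paths here are illustrative only; at this point in the series write_table takes just the table name, and an overwrite flag is added in a later patch.

    use datafusion::error::Result;
    use datafusion::prelude::{CsvReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Illustrative paths: any CSVs whose schema matches the target table work.
        ctx.register_csv("t", "data/existing.csv", CsvReadOptions::new())
            .await?;
        let df = ctx.read_csv("data/new_rows.csv", CsvReadOptions::new()).await?;
        // Executes eagerly; the returned batches report the count of rows written.
        let written = df.write_table("t").await?;
        println!("{written:?}");
        Ok(())
    }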
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 3871ab8d6a583..da9a52a707b82 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -27,7 +27,7 @@ use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field, Fields, Schema};
 use arrow::{self, datatypes::SchemaRef};
 use arrow_array::RecordBatch;
-use chrono::{DateTime, Utc, NaiveDate};
+use chrono::{DateTime, NaiveDate, Utc};
 use datafusion_common::DataFusionError;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
@@ -587,36 +587,45 @@ impl DataSink for CsvSink {
                         .with_builder(builder)
                         .with_header(header);
                     serializers.push(serializer);
-
+
                     let file = file_group.clone();
                     let writer = self
-                        .create_writer(file.object_meta.clone().into(), object_store.clone())
+                        .create_writer(
+                            file.object_meta.clone().into(),
+                            object_store.clone(),
+                        )
                         .await?;
                     writers.push(writer);
                 }
             }
-            FileWriterMode::Put => return Err(DataFusionError::NotImplemented("Put Mode is not implemented for CSV Sink yet".into())),
-            FileWriterMode::PutMultipart =>{
+            FileWriterMode::Put => {
+                return Err(DataFusionError::NotImplemented(
+                    "Put Mode is not implemented for CSV Sink yet".into(),
+                ))
+            }
+            FileWriterMode::PutMultipart => {
                 //currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
                 //uniquely identify this batch of files with a random string, to prevent collisions overwriting files
                 let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
                 for part_idx in 0..num_partitions {
                     let header = true;
                     let builder = WriterBuilder::new().with_delimiter(self.delimiter);
                     let serializer = CsvSerializer::new()
                         .with_builder(builder)
                         .with_header(header);
                     serializers.push(serializer);
-                    let file_path = base_path.prefix().child(format!("/{}_{}.csv", write_id, part_idx));
-                    let object_meta = ObjectMeta{
+                    let file_path = base_path
+                        .prefix()
+                        .child(format!("/{}_{}.csv", write_id, part_idx));
+                    let object_meta = ObjectMeta {
                         location: file_path,
                         last_modified: chrono::offset::Utc::now(),
                         size: 0,
                         e_tag: None,
                     };
-                    let writer = self.create_writer(
-                        object_meta.into(), object_store.clone())
+                    let writer = self
+                        .create_writer(object_meta.into(), object_store.clone())
                         .await?;
                     writers.push(writer);
                 }
             }
         }
 
         let mut row_count = 0;
         // Map errors to DatafusionError.
         let err_converter =
             |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
         for idx in 0..num_partitions {
             while let Some(maybe_batch) = data[idx].next().await {
                 // Write data to files in a round robin fashion:
                 let serializer = &mut serializers[idx];
                 let batch = check_for_errors(maybe_batch, &mut writers).await?;
                 row_count += batch.num_rows();
                 let bytes =
-                    check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
+                    check_for_errors(serializer.serialize(batch).await, &mut writers)
+                        .await?;
                 let writer = &mut writers[idx];
                 check_for_errors(
                     writer.write_all(&bytes).await.map_err(err_converter),
                     &mut writers,

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 473b35ea30500..71539cdaa9b1e 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -804,9 +804,9 @@ impl TableProvider for ListingTable {
         //we can append to that file. Otherwise, we can write new files into the directory
         //adding new files to the listing table in order to insert to the table.
         let input_partitions = input.output_partitioning().partition_count();
-        if file_groups.len() == 1 && input_partitions==1{
-            writer_mode = crate::datasource::file_format::FileWriterMode::Append;
-        } else{
+        if file_groups.len() == 1 && input_partitions == 1 {
+            writer_mode = crate::datasource::file_format::FileWriterMode::Append;
+        } else {
             writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart;
         }

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 639a40bdd9e20..18e5fea478092 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -263,14 +263,13 @@ impl DataSink for MemSink {
         let mut i = 0;
         let mut row_count = 0;
         let num_parts = data.len();
-        for idx in 0..num_parts{
+        for idx in 0..num_parts {
             while let Some(batch) = data[idx].next().await.transpose()? {
                 row_count += batch.num_rows();
                 new_batches[i].push(batch);
                 i = (i + 1) % num_partitions;
             }
         }
-
         // write the outputs into the batches
         for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {

diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index ad7cbd4e91b8c..141cac0370040 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -277,7 +277,7 @@ pub async fn plan_to_json(
         let mut stream = plan.execute(i, task_ctx.clone())?;
         join_set.spawn(async move {
             let (_, mut multipart_writer) = storeref.put_multipart(&file).await?;
-
+
             let mut buffer = Vec::with_capacity(1024);
             while let Some(batch) = stream.next().await.transpose()? {
                 let mut writer = json::LineDelimitedWriter::new(buffer);

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 7c80c0e28c99b..f1f9916911ca2 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -82,7 +82,7 @@ use std::{
     vec,
 };
 
-use super::{ColumnStatistics, Statistics, listing::ListingTableUrl};
+use super::{listing::ListingTableUrl, ColumnStatistics, Statistics};
 
 /// Convert type to a type suitable for use as a [`ListingTable`]
 /// partition column. Returns `Dictionary(UInt16, val_type)`, which is

diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 39b87452f162c..63a0695b742da 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -137,13 +137,14 @@ impl InsertExec {
         }
     }
 
-    fn make_all_input_streams(&self, context: Arc<TaskContext>) -> Result<Vec<SendableRecordBatchStream>>{
+    fn make_all_input_streams(
+        &self,
+        context: Arc<TaskContext>,
+    ) -> Result<Vec<SendableRecordBatchStream>> {
         let n_input_parts = self.input.output_partitioning().partition_count();
         let mut streams = Vec::with_capacity(n_input_parts);
-        for part in 0..n_input_parts{
-            streams.push(
-                self.make_input_stream(part, context.clone())?
-            );
+        for part in 0..n_input_parts {
+            streams.push(self.make_input_stream(part, context.clone())?);
         }
         Ok(streams)
     }
@@ -186,7 +187,7 @@
     fn benefits_from_input_partitioning(&self) -> bool {
         // Incoming number of partitions is taken to be the
         // number of files the query is required to write out.
-        // The optimizer should not change this number.
+        // The optimizer should not change this number.
         // Parallelism is handled within the appropriate DataSink
         false
     }

From 905076d81dcae5c7654637ec970ad81fe7694fdf Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Thu, 3 Aug 2023 19:54:35 -0400
Subject: [PATCH 06/10] allow listing table to control append method

---
 datafusion/core/src/dataframe.rs              |   2 +
 .../core/src/datasource/file_format/csv.rs    |   3 +
 .../src/datasource/file_format/options.rs     |  22 ++-
 datafusion/core/src/datasource/listing/mod.rs |   2 +-
 .../core/src/datasource/listing/table.rs      | 180 +++++++++++++++++-
 datafusion/core/src/datasource/memory.rs      |   3 +-
 .../core/src/datasource/physical_plan/mod.rs  |   2 +
 datafusion/core/src/datasource/provider.rs    |   4 +-
 datafusion/core/src/physical_plan/insert.rs   |   3 +
 datafusion/core/src/physical_planner.rs       |  22 ++-
 datafusion/expr/src/logical_plan/builder.rs   |   9 +-
 datafusion/expr/src/logical_plan/dml.rs       |   6 +-
 datafusion/sql/src/statement.rs               |  16 +-
 13 files changed, 250 insertions(+), 24 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 9c8ca8b549ba4..03432c3e12bc4 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -934,12 +934,14 @@ impl DataFrame {
     pub async fn write_table(
         self,
         table_name: &str,
+        overwrite: bool,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
         let arrow_schema = Schema::from(self.schema());
         let plan = LogicalPlanBuilder::insert_into(
             self.plan,
             table_name.to_owned(),
             &arrow_schema,
+            overwrite,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index da9a52a707b82..f7d677891e98e 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -270,6 +270,9 @@ impl FileFormat for CsvFormat {
         _state: &SessionState,
         conf: FileSinkConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        if conf.overwrite{
+            return Err(DataFusionError::NotImplemented("Overwrites are not implemented yet for CSV".into()))
+        }
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(
             conf,

diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index a50066fd97c83..ed5599e8d8a3c 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -30,7 +30,7 @@ use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
-use crate::datasource::listing::ListingTableUrl;
+use crate::datasource::listing::{ListingTableUrl, ListingTableInsertMode};
 use crate::datasource::{
     file_format::{
         avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
     },
 };
 use crate::error::Result;
 use crate::execution::context::{SessionConfig, SessionState};
+use crate::logical_expr::Expr;
 
 /// Options that control the reading of CSV files.
 ///
@@ -73,6 +74,10 @@ pub struct CsvReadOptions<'a> {
     pub file_compression_type: FileCompressionType,
     /// Flag indicating whether this file may be unbounded (as in a FIFO file).
     pub infinite: bool,
+    /// Indicates how the file is sorted
+    pub file_sort_order: Vec<Vec<Expr>>,
+    /// Setting controls how inserts to this file should be handled
+    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -95,6 +100,8 @@ impl<'a> CsvReadOptions<'a> {
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
+            file_sort_order: vec![],
+            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 
@@ -171,6 +178,16 @@ impl<'a> CsvReadOptions<'a> {
         self.file_compression_type = file_compression_type;
         self
     }
+
+    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>)->Self{
+        self.file_sort_order = file_sort_order;
+        self
+    }
+
+    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self{
+        self.insert_mode = insert_mode;
+        self
+    }
 }
 
 /// Options that control the reading of Parquet files.
 ///
 /// Note this structure is supplied when a datasource is created and
 /// can not vary from statement to statement. For settings that
 /// can vary statement to statement see
 /// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct ParquetReadOptions<'a> {
     /// File extension; only files with this extension are selected for data input.
@@ -464,6 +481,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             // TODO: Add file sort order into CsvReadOptions and introduce here.
             .with_file_sort_order(vec![])
             .with_infinite_source(self.infinite)
+            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(

diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index aa2e20164b5e1..915a5de9b428e 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -31,7 +31,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
-pub use table::{ListingOptions, ListingTable, ListingTableConfig};
+pub use table::{ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode};
 
 /// Stream of files get listed from object store
 pub type PartitionedFileStream =

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 71539cdaa9b1e..2affbf405a408 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -207,6 +207,16 @@ impl ListingTableConfig {
     }
 }
 
+#[derive(Debug, Clone)]
+///controls how new data should be inserted to a ListingTable
+pub enum ListingTableInsertMode {
+    ///Data should be appended to an existing file
+    AppendToFile,
+    ///Data is appended as new files in existing TablePaths
+    AppendNewFiles,
+    ///Throw an error if insert into is attempted on this table
+    Error,
+}
 /// Options for creating a [`ListingTable`]
 #[derive(Clone, Debug)]
 pub struct ListingOptions {
@@ -245,6 +255,8 @@ pub struct ListingOptions {
     /// In order to support infinite inputs, DataFusion may adjust query
     /// plans (e.g. joins) to run the given query in full pipelining mode.
     pub infinite_source: bool,
+    ///This setting controls how inserts to this table should be handled
+    pub insert_mode: ListingTableInsertMode,
 }
 
 impl ListingOptions {
@@ -263,6 +275,7 @@ impl ListingOptions {
             target_partitions: 1,
             file_sort_order: vec![],
             infinite_source: false,
+            insert_mode: ListingTableInsertMode::AppendNewFiles,
         }
     }
 
@@ -431,6 +444,11 @@ impl ListingOptions {
         self
     }
 
+    pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self{
+        self.insert_mode = insert_mode;
+        self
+    }
+
     /// Infer the schema of the files at the given path on the provided object store.
     /// The inferred schema does not include the partitioning columns.
     ///
@@ -770,6 +788,7 @@ impl TableProvider for ListingTable {
         &self,
         state: &SessionState,
         input: Arc<dyn ExecutionPlan>,
+        overwrite: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Check that the schema of the plan matches the schema of this table.
         if !self.schema().equivalent_names_and_types(&input.schema()) {
@@ -804,12 +823,17 @@ impl TableProvider for ListingTable {
         //we can append to that file. Otherwise, we can write new files into the directory
         //adding new files to the listing table in order to insert to the table.
         let input_partitions = input.output_partitioning().partition_count();
-        if file_groups.len() == 1 && input_partitions == 1 {
-            writer_mode = crate::datasource::file_format::FileWriterMode::Append;
-        } else {
-            writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart;
+        match self.options.insert_mode{
+            ListingTableInsertMode::AppendToFile => {
+                if input_partitions > file_groups.len(){
+                    return Err(DataFusionError::Plan(format!("Cannot append {input_partitions} partitions to {} files!", file_groups.len())))
+                }
+                writer_mode = crate::datasource::file_format::FileWriterMode::Append;
+            },
+            ListingTableInsertMode::AppendNewFiles => writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart,
+            ListingTableInsertMode::Error => return Err(DataFusionError::Plan("Invalid plan attempting write to table with TableWriteMode::Error!".into())),
         }
-
+
         // Sink related option, apart from format
         let config = FileSinkConfig {
             object_store_url: self.table_paths()[0].object_store(),
             table_paths: self.table_paths().clone(),
             file_groups,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             writer_mode,
+            overwrite,
         };
 
         self.options()
@@ -1401,12 +1426,14 @@ mod tests {
     fn load_empty_schema_csv_table(
         schema: SchemaRef,
         temp_path: &str,
+        insert_mode: ListingTableInsertMode,
     ) -> Result<Arc<dyn TableProvider>> {
         File::create(temp_path)?;
         let table_path = ListingTableUrl::parse(temp_path).unwrap();
 
         let file_format = CsvFormat::default();
-        let listing_options = ListingOptions::new(Arc::new(file_format));
+        let listing_options = ListingOptions::new(Arc::new(file_format))
+            .with_insert_mode(insert_mode);
 
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(listing_options)
@@ -1557,7 +1584,7 @@
         let path = tmp_dir.path().join(filename);
 
         let initial_table =
-            load_empty_schema_csv_table(schema.clone(), path.to_str().unwrap())?;
+            load_empty_schema_csv_table(schema.clone(), path.to_str().unwrap(), ListingTableInsertMode::AppendToFile)?;
         session_ctx.register_table("t", initial_table)?;
         // Create and register the source table with the provided schema and inserted data
         let source_table = Arc::new(MemTable::try_new(
@@ -1571,7 +1598,7 @@
         let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
         // Create an insert plan to insert the source data into the initial table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema)?.build()?;
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()
@@ -1682,4 +1709,141 @@
         // Return Ok if the function
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_append_new_files_to_csv_table() -> Result<()> {
+        let file_type = FileType::CSV;
+        let file_compression_type = FileCompressionType::UNCOMPRESSED;
+
+        // Create the initial context, schema, and batch.
+        let session_ctx = SessionContext::new();
+        // Create a new schema with one field called "a" of type Int32
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "column1",
+            DataType::Int32,
+            false,
+        )]));
+
+        // Create a new batch of data to insert into the table
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
+        )?;
+
+        // Filename with extension
+        let filename = format!(
+            "path{}",
+            file_type
+                .to_owned()
+                .get_ext_with_compression(file_compression_type.clone())
+                .unwrap()
+        );
+
+        // Define batch size for file reader
+        let batch_size = batch.num_rows();
+
+        // Create a temporary directory and a CSV file within it.
+        let tmp_dir = TempDir::new()?;
+        let path = tmp_dir.path().join(filename);
+
+        session_ctx.register_csv("t", tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new()
+                .insert_mode(ListingTableInsertMode::AppendNewFiles)
+                .schema(schema.as_ref()))
+            .await?;
+        let csv_table = session_ctx.table_provider("t").await?;
+        // Create and register the source table with the provided schema and inserted data
+        let source_table = Arc::new(MemTable::try_new(
+            schema.clone(),
+            vec![vec![batch.clone(), batch.clone()]],
+        )?);
+        session_ctx.register_table("source", source_table.clone())?;
+        // Convert the source table into a provider so that it can be used in a query
+        let source = provider_as_source(source_table);
+        // Create a table scan logical plan to read from the source table
+        let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
+            .repartition(Partitioning::Hash(vec![Expr::Column("column1".into())], 6))?
+            .build()?;
+        // Create an insert plan to insert the source data into the initial table
+        let insert_into_table =
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
+        // Create a physical plan from the insert plan
+        let plan = session_ctx
+            .state()
+            .create_physical_plan(&insert_into_table)
+            .await?;
+
+        // Execute the physical plan and collect the results
+        let res = collect(plan, session_ctx.task_ctx()).await?;
+        // Insert returns the number of rows written, in our case this would be 6.
+        let expected = vec![
+            "+-------+",
+            "| count |",
+            "+-------+",
+            "| 6     |",
+            "+-------+",
+        ];
+
+        // Assert that the batches read from the file match the expected result.
+        assert_batches_eq!(expected, &res);
+
+        // Read the records in the table
+        let batches = session_ctx.sql("select count(*) from t").await?.collect().await?;
+        let expected = vec![
+            "+----------+",
+            "| COUNT(*) |",
+            "+----------+",
+            "| 6        |",
+            "+----------+",
+        ];
+
+        // Assert that the batches read from the file match the expected result.
+        assert_batches_eq!(expected, &batches);
+
+        //assert that 6 files were added to the table
+        let num_files = tmp_dir.path().read_dir()?.count();
+        assert_eq!(num_files, 6);
+
+        // Create a physical plan from the insert plan
+        let plan = session_ctx
+            .state()
+            .create_physical_plan(&insert_into_table)
+            .await?;
+
+        // Again, execute the physical plan and collect the results
+        let res = collect(plan, session_ctx.task_ctx()).await?;
+        // Insert returns the number of rows written, in our case this would be 6.
+        let expected = vec![
+            "+-------+",
+            "| count |",
+            "+-------+",
+            "| 6     |",
+            "+-------+",
+        ];
+
+        // Assert that the batches read from the file match the expected result.
+        assert_batches_eq!(expected, &res);
+
+        // Read the contents of the table
+        let batches = session_ctx.sql("select count(*) from t").await?.collect().await?;
+
+        // Define the expected result after the second append.
+        let expected = vec![
+            "+----------+",
+            "| COUNT(*) |",
+            "+----------+",
+            "| 12       |",
+            "+----------+",
+        ];
+
+        // Assert that the batches read from the file after the second append match the expected result.
+        assert_batches_eq!(expected, &batches);
+
+        // Assert that another 6 files were added to the table
+        let num_files = tmp_dir.path().read_dir()?.count();
+        assert_eq!(num_files, 12);
+
+        // Return Ok if the function
+        Ok(())
+    }
 }

diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 18e5fea478092..893e4f651e5d2 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -204,6 +204,7 @@ impl TableProvider for MemTable {
         &self,
         _state: &SessionState,
         input: Arc<dyn ExecutionPlan>,
+        overwrite: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Create a physical plan from the logical plan.
         // Check that the schema of the plan matches the schema of this table.
@@ -545,7 +546,7 @@
         let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
         // Create an insert plan to insert the source data into the initial table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema)?.build()?;
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index f1f9916911ca2..3a9907ec86f40 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -332,6 +332,8 @@ pub struct FileSinkConfig {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// A writer mode that determines how data is written to the file
     pub writer_mode: FileWriterMode,
+    /// Controls whether existing data should be overwritten by this sink
+    pub overwrite: bool
 }
 
 impl FileSinkConfig {

diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 9c97935105890..7c41e2bd380c7 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -127,10 +127,12 @@ pub trait TableProvider: Sync + Send {
         &self,
         _state: &SessionState,
         _input: Arc<dyn ExecutionPlan>,
+        _overwrite: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let msg = "Insertion not implemented for this table".to_owned();
+        let msg = "Insert into not implemented for this table".to_owned();
         Err(DataFusionError::NotImplemented(msg))
     }
+
 }
 
 /// A factory which creates [`TableProvider`]s at runtime given a URL.
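[Reviewer note, not part of the patch series] A compile-level sketch of what the updated trait asks of a custom TableProvider: the planner now passes the overwrite flag through insert_into, so an append-only provider can reject INSERT OVERWRITE explicitly, mirroring the CSV sink earlier in this patch. The MyProvider type, its error messages, and the elided sink construction are all illustrative.

    use std::any::Any;
    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::TableProvider;
    use datafusion::error::{DataFusionError, Result};
    use datafusion::execution::context::SessionState;
    use datafusion::logical_expr::{Expr, TableType};
    use datafusion::physical_plan::ExecutionPlan;

    #[derive(Debug)]
    struct MyProvider {
        schema: SchemaRef,
    }

    #[async_trait]
    impl TableProvider for MyProvider {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            self.schema.clone()
        }

        fn table_type(&self) -> TableType {
            TableType::Base
        }

        async fn scan(
            &self,
            _state: &SessionState,
            _projection: Option<&Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Err(DataFusionError::NotImplemented("scan".to_string()))
        }

        // The updated hook: providers now receive the overwrite flag and can
        // reject it early when they only support appends.
        async fn insert_into(
            &self,
            _state: &SessionState,
            _input: Arc<dyn ExecutionPlan>,
            overwrite: bool,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            if overwrite {
                return Err(DataFusionError::NotImplemented(
                    "Overwrite is not supported by this provider".to_string(),
                ));
            }
            // A real provider would wrap `_input` in an InsertExec / DataSink here.
            Err(DataFusionError::NotImplemented("append".to_string()))
        }
    }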
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 63a0695b742da..67fd44a8cf179 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -232,6 +232,9 @@ impl ExecutionPlan for InsertExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
+        if partition!=0{
+            return Err(DataFusionError::Internal("InsertExec can only be called on partition 0!".into()))
+        }
         let data = self.make_all_input_streams(context.clone())?;
         let count_schema = self.count_schema.clone();
         let sink = self.sink.clone();

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 947a3f43870eb..0c887ca42fc7b 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,6 +17,7 @@
 
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
 
+use crate::datasource::listing::ListingTableInsertMode;
 use crate::datasource::source_as_provider;
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
@@ -532,7 +533,7 @@
                 }
                 LogicalPlan::Dml(DmlStatement {
                     table_name,
-                    op: WriteOp::Insert,
+                    op: WriteOp::InsertInto,
                     input,
                     ..
                 }) => {
                     let name = table_name.table();
                     let schema = session_state.schema_for_ref(table_name)?;
                     if let Some(provider) = schema.table(name).await {
                         let input_exec = self.create_initial_plan(input, session_state).await?;
-                        provider.insert_into(session_state, input_exec).await
+                        provider.insert_into(session_state, input_exec, false).await
                     } else {
                         return Err(DataFusionError::Execution(format!(
                             "Table '{table_name}' does not exist"
                         )));
                     }
                 }
+                LogicalPlan::Dml(DmlStatement {
+                    table_name,
+                    op: WriteOp::InsertOverwrite,
+                    input,
+                    ..
+                }) => {
+                    let name = table_name.table();
+                    let schema = session_state.schema_for_ref(table_name)?;
+                    if let Some(provider) = schema.table(name).await {
+                        let input_exec = self.create_initial_plan(input, session_state).await?;
+                        provider.insert_into(session_state, input_exec, true).await
+                    } else {
+                        return Err(DataFusionError::Execution(format!(
+                            "Table '{table_name}' does not exist"
+                        )));
+                    }
+                }

diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index a121553ea501a..040e47d4e938d 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -236,12 +236,19 @@ impl LogicalPlanBuilder {
         input: LogicalPlan,
         table_name: impl Into<OwnedTableReference>,
         table_schema: &Schema,
+        overwrite: bool,
     ) -> Result<Self> {
         let table_schema = table_schema.clone().to_dfschema_ref()?;
+        let op;
+        if overwrite{
+            op = WriteOp::InsertOverwrite;
+        } else{
+            op = WriteOp::InsertInto;
+        }
         Ok(Self::from(LogicalPlan::Dml(DmlStatement {
             table_name: table_name.into(),
             table_schema,
-            op: WriteOp::Insert,
+            op,
             input: Arc::new(input),
         })))
     }

diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index 117a42cda9702..07f34101eb3a2 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -40,7 +40,8 @@ pub struct DmlStatement {
 
 #[derive(Clone, PartialEq, Eq, Hash)]
 pub enum WriteOp {
-    Insert,
+    InsertOverwrite,
+    InsertInto,
     Delete,
     Update,
     Ctas,
@@ -49,7 +50,8 @@ pub enum WriteOp {
 impl Display for WriteOp {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            WriteOp::Insert => write!(f, "Insert"),
+            WriteOp::InsertOverwrite => write!(f, "Insert Overwrite"),
+            WriteOp::InsertInto => write!(f, "Insert Into"),
             WriteOp::Delete => write!(f, "Delete"),
             WriteOp::Update => write!(f, "Update"),
             WriteOp::Ctas => write!(f, "Ctas"),

diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index cb4a7cb52779b..32b2059f9f8d2 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -359,9 +359,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 if or.is_some() {
                     plan_err!("Inserts with or clauses not supported")?;
                 }
-                if overwrite {
-                    plan_err!("Insert overwrite is not supported")?;
-                }
                 if partitioned.is_some() {
                     plan_err!("Partitioned inserts not yet supported")?;
                 }
@@ -378,7 +375,7 @@
                     plan_err!("Insert-returning clause not supported")?;
                 }
                 let _ = into; // optional keyword doesn't change behavior
-                self.insert_to_plan(table_name, columns, source)
+                self.insert_to_plan(table_name, columns, source, overwrite)
             }
 
             Statement::Update {
@@ -934,6 +931,7 @@
         table_name: ObjectName,
         columns: Vec<Ident>,
         source: Box<Query>,
+        overwrite: bool,
     ) -> Result<LogicalPlan> {
         // Do a table lookup to verify the table exists
         let table_name = self.object_name_to_table_reference(table_name)?;
@@ -1026,11 +1024,17 @@
             })
             .collect::<Result<Vec<_>>>()?;
         let source = project(source, exprs)?;
-
+
+        let op;
+        if overwrite{
+            op = WriteOp::InsertOverwrite
+        } else{
+            op = WriteOp::InsertInto
+        }
         let plan = LogicalPlan::Dml(DmlStatement {
             table_name,
             table_schema: Arc::new(table_schema),
-            op: WriteOp::Insert,
+            op,
             input: Arc::new(source),
         });
         Ok(plan)

From eb1fac79aca75b7fd4126a8c003936e29a94fe3a Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 4 Aug 2023 07:13:30 -0400
Subject: [PATCH 07/10] test fixes

---
 .../core/src/datasource/listing/table.rs      |  2 +-
 .../sqllogictests/test_files/explain.slt      |  2 +-
 .../tests/sqllogictests/test_files/insert.slt | 23 +++++++++----------
 datafusion/sql/tests/sql_integration.rs       |  8 +++----
 4 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 2affbf405a408..867936f42f6df 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -275,7 +275,7 @@ impl ListingOptions {
             target_partitions: 1,
             file_sort_order: vec![],
             infinite_source: false,
-            insert_mode: ListingTableInsertMode::AppendNewFiles,
+            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }

diff --git a/datafusion/core/tests/sqllogictests/test_files/explain.slt b/datafusion/core/tests/sqllogictests/test_files/explain.slt
index bd3513550a4e2..aa560961d2f36 100644
--- a/datafusion/core/tests/sqllogictests/test_files/explain.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/explain.slt
@@ -159,7 +159,7 @@ query TT
 EXPLAIN INSERT INTO sink_table SELECT * FROM aggregate_test_100 ORDER by c1
 ----
 logical_plan
-Dml: op=[Insert] table=[sink_table]
+Dml: op=[Insert Into] table=[sink_table]
 --Projection: aggregate_test_100.c1 AS c1, aggregate_test_100.c2 AS c2, aggregate_test_100.c3 AS c3, aggregate_test_100.c4 AS c4, aggregate_test_100.c5 AS c5, aggregate_test_100.c6 AS c6, aggregate_test_100.c7 AS c7, aggregate_test_100.c8 AS c8, aggregate_test_100.c9 AS c9, aggregate_test_100.c10 AS c10, aggregate_test_100.c11 AS c11, aggregate_test_100.c12 AS c12, aggregate_test_100.c13 AS c13
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]

diff --git a/datafusion/core/tests/sqllogictests/test_files/insert.slt b/datafusion/core/tests/sqllogictests/test_files/insert.slt
index 90a33bd1c5f79..e42d2ef0592d6 100644
--- a/datafusion/core/tests/sqllogictests/test_files/insert.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/insert.slt
@@ -57,7 +57,7 @@ FROM aggregate_test_100
 ORDER by c1
 ----
 logical_plan
-Dml: op=[Insert] table=[table_without_values]
+Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
@@ -120,20 +120,19 @@ COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWI
 FROM aggregate_test_100
 ----
 logical_plan
-Dml: op=[Insert] table=[table_without_values]
+Dml: op=[Insert Into] table=[table_without_values]
 --Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
 ----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
 InsertExec: sink=MemoryTable (partitions=1)
---CoalescePartitionsExec
-----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
-------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
---------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
---------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
-----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted] +------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST] +--------CoalesceBatchesExec: target_batch_size=8192 +----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8 +------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true @@ -168,7 +167,7 @@ FROM aggregate_test_100 ORDER BY c1 ---- logical_plan -Dml: op=[Insert] table=[table_without_values] +Dml: op=[Insert Into] table=[table_without_values] --Projection: a1 AS a1, a2 AS a2 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1 @@ -212,7 +211,7 @@ query TT explain insert into table_without_values select c1 from aggregate_test_100 order by c1; ---- logical_plan -Dml: op=[Insert] table=[table_without_values] +Dml: op=[Insert Into] table=[table_without_values] --Projection: aggregate_test_100.c1 AS c1 ----Sort: aggregate_test_100.c1 ASC NULLS LAST ------TableScan: aggregate_test_100 projection=[c1] diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index f19ddfc053123..eef9093947fb9 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -330,7 +330,7 @@ fn plan_insert() { let sql = "insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')"; let plan = r#" -Dml: op=[Insert] table=[person] +Dml: op=[Insert Into] table=[person] Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name Values: (Int64(1), Utf8("Alan"), Utf8("Turing")) "# @@ -342,7 +342,7 @@ Dml: op=[Insert] table=[person] fn plan_insert_no_target_columns() { let sql = "INSERT INTO test_decimal VALUES (1, 2), (3, 4)"; let plan = r#" -Dml: op=[Insert] table=[test_decimal] +Dml: op=[Insert Into] table=[test_decimal] Projection: CAST(column1 AS Int32) AS id, CAST(column2 AS Decimal128(10, 2)) AS price Values: (Int64(1), Int64(2)), (Int64(3), Int64(4)) "# @@ -3880,7 +3880,7 @@ fn test_prepare_statement_insert_infer() { let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3)"; let expected_plan = r#" -Dml: op=[Insert] table=[person] +Dml: op=[Insert Into] table=[person] Projection: column1 AS id, column2 AS first_name, column3 AS last_name Values: ($1, $2, $3) "# @@ -3904,7 +3904,7 @@ Dml: op=[Insert] table=[person] ScalarValue::Utf8(Some("Turing".to_string())), ]; let expected_plan = r#" -Dml: op=[Insert] table=[person] +Dml: op=[Insert Into] table=[person] Projection: column1 AS id, column2 AS first_name, column3 AS last_name Values: (UInt32(1), Utf8("Alan"), Utf8("Turing")) "# From 41698bb8ef1f562d27091fe324ef01a6bee7a268 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 4 Aug 2023 08:48:39 -0400 Subject: [PATCH 08/10] DataFrameOptions and clean up --- datafusion/core/src/dataframe.rs | 32 +++++++- 
 .../core/src/datasource/file_format/csv.rs    |  9 ++-
 .../src/datasource/file_format/options.rs     | 10 ++-
 datafusion/core/src/datasource/listing/mod.rs |  4 +-
 .../core/src/datasource/listing/table.rs      | 80 ++++++++++---------
 datafusion/core/src/datasource/memory.rs      | 10 ++-
 .../core/src/datasource/physical_plan/mod.rs  |  2 +-
 datafusion/core/src/datasource/provider.rs    |  1 -
 datafusion/core/src/physical_plan/insert.rs   |  7 +-
 datafusion/core/src/physical_planner.rs       |  1 -
 datafusion/expr/src/logical_plan/builder.rs   | 13 +--
 datafusion/sql/src/statement.rs               | 14 ++-
 12 files changed, 115 insertions(+), 68 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 03432c3e12bc4..8d4ad6cc504da 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -25,7 +25,6 @@ use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
 use datafusion_common::{DataFusionError, SchemaError};
-use futures::StreamExt;
 use parquet::file::properties::WriterProperties;
 
 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -54,6 +53,33 @@ use crate::physical_plan::{collect, collect_partitioned};
 use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
 use crate::prelude::SessionContext;
 
+/// Contains options that control how data is
+/// written out from a DataFrame
+pub struct DataFrameWriteOptions {
+    /// Controls if existing data should be overwritten
+    overwrite: bool, // TODO, enable DataFrame COPY TO write without TableProvider
+                     // settings such as LOCATION and FILETYPE can be set here
+                     // e.g. add location: Option
+}
+
+impl DataFrameWriteOptions {
+    /// Create a new DataFrameWriteOptions with default values
+    pub fn new() -> Self {
+        DataFrameWriteOptions { overwrite: false }
+    }
+    /// Set the overwrite option to true or false
+    pub fn with_overwrite(mut self, overwrite: bool) -> Self {
+        self.overwrite = overwrite;
+        self
+    }
+}
+
+impl Default for DataFrameWriteOptions {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 /// DataFrame represents a logical set of rows with the same named columns.
 /// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
 /// [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html)
@@ -934,14 +960,14 @@ impl DataFrame {
     pub async fn write_table(
         self,
         table_name: &str,
-        overwrite: bool,
+        write_options: DataFrameWriteOptions,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
         let arrow_schema = Schema::from(self.schema());
         let plan = LogicalPlanBuilder::insert_into(
             self.plan,
             table_name.to_owned(),
             &arrow_schema,
-            overwrite,
+            write_options.overwrite,
         )?
         .build()?;
         DataFrame::new(self.session_state, plan).collect().await
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index f7d677891e98e..cbcdc2f112b0d 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -27,7 +27,6 @@ use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field, Fields, Schema};
 use arrow::{self, datatypes::SchemaRef};
 use arrow_array::RecordBatch;
-use chrono::{DateTime, NaiveDate, Utc};
 use datafusion_common::DataFusionError;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
@@ -270,8 +269,10 @@ impl FileFormat for CsvFormat {
         _state: &SessionState,
         conf: FileSinkConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite{
-            return Err(DataFusionError::NotImplemented("Overwrites are not implemented yet for CSV".into()))
+        if conf.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented yet for CSV".into(),
+            ));
         }
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(
@@ -639,6 +640,8 @@ impl DataSink for CsvSink {
         // Map errors to DatafusionError.
         let err_converter =
             |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
+        // TODO parallelize serialization across partitions and batches within partitions
+        // see: https://github.com/apache/arrow-datafusion/issues/7079
         for idx in 0..num_partitions {
             while let Some(maybe_batch) = data[idx].next().await {
                 // Write data to files in a round robin fashion:
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index ed5599e8d8a3c..a907c06b36b26 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -30,7 +30,7 @@ use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
-use crate::datasource::listing::{ListingTableUrl, ListingTableInsertMode};
+use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
 use crate::datasource::{
     file_format::{
         avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
@@ -179,12 +179,14 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
-    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>)->Self{
+    /// Configure if file has known sort order
+    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
         self.file_sort_order = file_sort_order;
         self
     }
 
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self{
+    /// Configure how insertions to this table should be handled
+    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
         self.insert_mode = insert_mode;
         self
     }
@@ -195,7 +197,7 @@
 /// Note this structure is supplied when a datasource is created and
 /// can not vary from statement to statement. For settings that
 /// can vary statement to statement see
-/// [`ConfigOptions`](crate::config::ConfigO ptions).
+/// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
 pub struct ParquetReadOptions<'a> {
     /// File extension; only files with this extension are selected for data input.
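The insert_mode and file_sort_order setters added above are ordinary fluent builders, so they compose with the existing CsvReadOptions methods. A minimal sketch of a caller opting a CSV table into append-new-files inserts with a declared per-file sort order; this assumes the API as of this patch series, and the table name, directory path, and column "a" are illustrative only:

    use datafusion::datasource::listing::ListingTableInsertMode;
    use datafusion::prelude::*;

    async fn register_append_only_csv(ctx: &SessionContext) -> datafusion::error::Result<()> {
        // Inserts write new files into the table directory instead of
        // appending to an existing file, and each file is declared to be
        // sorted ascending by column "a" (nulls last).
        ctx.register_csv(
            "t",
            "/tmp/csv_table_dir",
            CsvReadOptions::new()
                .insert_mode(ListingTableInsertMode::AppendNewFiles)
                .file_sort_order(vec![vec![col("a").sort(true, false)]]),
        )
        .await
    }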
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 915a5de9b428e..8b0f021f02777 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -31,7 +31,9 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; -pub use table::{ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode}; +pub use table::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode, +}; /// Stream of files get listed from object store pub type PartitionedFileStream = diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 867936f42f6df..4085dac484045 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -444,7 +444,8 @@ impl ListingOptions { self } - pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self{ + /// Configure how insertions to this table should be handled. + pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { self.insert_mode = insert_mode; self } @@ -823,17 +824,27 @@ impl TableProvider for ListingTable { //we can append to that file. Otherwise, we can write new files into the directory //adding new files to the listing table in order to insert to the table. let input_partitions = input.output_partitioning().partition_count(); - match self.options.insert_mode{ + match self.options.insert_mode { ListingTableInsertMode::AppendToFile => { - if input_partitions > file_groups.len(){ - return Err(DataFusionError::Plan(format!("Cannot append {input_partitions} partitions to {} files!", file_groups.len()))) + if input_partitions > file_groups.len() { + return Err(DataFusionError::Plan(format!( + "Cannot append {input_partitions} partitions to {} files!", + file_groups.len() + ))); } writer_mode = crate::datasource::file_format::FileWriterMode::Append; - }, - ListingTableInsertMode::AppendNewFiles => writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart, - ListingTableInsertMode::Error => return Err(DataFusionError::Plan("Invalid plan attempting write to table with TableWriteMode::Error!".into())), + } + ListingTableInsertMode::AppendNewFiles => { + writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart + } + ListingTableInsertMode::Error => { + return Err(DataFusionError::Plan( + "Invalid plan attempting write to table with TableWriteMode::Error!" 
+ .into(), + )) + } } - + // Sink related option, apart from format let config = FileSinkConfig { object_store_url: self.table_paths()[0].object_store(), @@ -1432,8 +1443,8 @@ mod tests { let table_path = ListingTableUrl::parse(temp_path).unwrap(); let file_format = CsvFormat::default(); - let listing_options = ListingOptions::new(Arc::new(file_format)) - .with_insert_mode(insert_mode); + let listing_options = + ListingOptions::new(Arc::new(file_format)).with_insert_mode(insert_mode); let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) @@ -1583,8 +1594,11 @@ mod tests { let tmp_dir = TempDir::new()?; let path = tmp_dir.path().join(filename); - let initial_table = - load_empty_schema_csv_table(schema.clone(), path.to_str().unwrap(), ListingTableInsertMode::AppendToFile)?; + let initial_table = load_empty_schema_csv_table( + schema.clone(), + path.to_str().unwrap(), + ListingTableInsertMode::AppendToFile, + )?; session_ctx.register_table("t", initial_table)?; // Create and register the source table with the provided schema and inserted data let source_table = Arc::new(MemTable::try_new( @@ -1712,9 +1726,6 @@ mod tests { #[tokio::test] async fn test_append_new_files_to_csv_table() -> Result<()> { - let file_type = FileType::CSV; - let file_compression_type = FileCompressionType::UNCOMPRESSED; - // Create the initial context, schema, and batch. let session_ctx = SessionContext::new(); // Create a new schema with one field called "a" of type Int32 @@ -1730,28 +1741,17 @@ mod tests { vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))], )?; - // Filename with extension - let filename = format!( - "path{}", - file_type - .to_owned() - .get_ext_with_compression(file_compression_type.clone()) - .unwrap() - ); - - // Define batch size for file reader - let batch_size = batch.num_rows(); - // Create a temporary directory and a CSV file within it. let tmp_dir = TempDir::new()?; - let path = tmp_dir.path().join(filename); - - session_ctx.register_csv("t", tmp_dir.path().to_str().unwrap(), - CsvReadOptions::new() - .insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref())) + session_ctx + .register_csv( + "t", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new() + .insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()), + ) .await?; - let csv_table = session_ctx.table_provider("t").await?; // Create and register the source table with the provided schema and inserted data let source_table = Arc::new(MemTable::try_new( schema.clone(), @@ -1788,7 +1788,11 @@ mod tests { assert_batches_eq!(expected, &res); // Read the records in the table - let batches = session_ctx.sql("select count(*) from t").await?.collect().await?; + let batches = session_ctx + .sql("select count(*) from t") + .await? + .collect() + .await?; let expected = vec![ "+----------+", "| COUNT(*) |", @@ -1825,7 +1829,11 @@ mod tests { assert_batches_eq!(expected, &res); // Read the contents of the table - let batches = session_ctx.sql("select count(*) from t").await?.collect().await?; + let batches = session_ctx + .sql("select count(*) from t") + .await? + .collect() + .await?; // Define the expected result after the second append. 
         let expected = vec![
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 893e4f651e5d2..0441ac70585d1 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -213,6 +213,11 @@ impl TableProvider for MemTable {
                 "Inserting query must have the same schema with the table."
             );
         }
+        if overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrite not implemented for MemoryTable yet".into(),
+            ));
+        }
         let sink = Arc::new(MemSink::new(self.batches.clone()));
         Ok(Arc::new(InsertExec::new(input, sink, self.schema.clone())))
     }
@@ -264,8 +269,9 @@ impl DataSink for MemSink {
         let mut i = 0;
         let mut row_count = 0;
         let num_parts = data.len();
-        for idx in 0..num_parts {
-            while let Some(batch) = data[idx].next().await.transpose()? {
+        // TODO parallelize outer and inner loops
+        for data_part in data.iter_mut().take(num_parts) {
+            while let Some(batch) = data_part.next().await.transpose()? {
                 row_count += batch.num_rows();
                 new_batches[i].push(batch);
                 i = (i + 1) % num_partitions;
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 3a9907ec86f40..a9ca6fc90a6b8 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -333,7 +333,7 @@ pub struct FileSinkConfig {
     /// A writer mode that determines how data is written to the file
     pub writer_mode: FileWriterMode,
     /// Controls whether existing data should be overwritten by this sink
-    pub overwrite: bool
+    pub overwrite: bool,
 }
 
 impl FileSinkConfig {
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 7c41e2bd380c7..6a81f8969611e 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -132,7 +132,6 @@ pub trait TableProvider: Sync + Send {
         let msg = "Insert into not implemented for this table".to_owned();
         Err(DataFusionError::NotImplemented(msg))
     }
-
 }
 
 /// A factory which creates [`TableProvider`]s at runtime given a URL.
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 67fd44a8cf179..11fee23d3b3e6 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -36,7 +36,6 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use crate::physical_plan::Distribution;
 use datafusion_common::DataFusionError;
 use datafusion_execution::TaskContext;
@@ -232,8 +231,10 @@ impl ExecutionPlan for InsertExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        if partition!=0{
-            return Err(DataFusionError::Internal("InsertExec can only be called on partition 0!".into()))
+        if partition != 0 {
+            return Err(DataFusionError::Internal(
+                "InsertExec can only be called on partition 0!".into(),
+            ));
         }
         let data = self.make_all_input_streams(context.clone())?;
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 0c887ca42fc7b..8f7ffb011df91 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,7 +17,6 @@
 
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
 
-use crate::datasource::listing::ListingTableInsertMode;
 use crate::datasource::source_as_provider;
 use crate::execution::context::{ExecutionProps, SessionState};
 use crate::logical_expr::utils::generate_sort_key;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 040e47d4e938d..176459db149af 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -239,12 +239,13 @@ impl LogicalPlanBuilder {
         overwrite: bool,
     ) -> Result<Self> {
         let table_schema = table_schema.clone().to_dfschema_ref()?;
-        let op;
-        if overwrite{
-            op = WriteOp::InsertOverwrite;
-        } else{
-            op = WriteOp::InsertInto;
-        }
+
+        let op = if overwrite {
+            WriteOp::InsertOverwrite
+        } else {
+            WriteOp::InsertInto
+        };
+
         Ok(Self::from(LogicalPlan::Dml(DmlStatement {
             table_name: table_name.into(),
             table_schema,
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 32b2059f9f8d2..ad66640efa14c 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1024,13 +1024,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             })
             .collect::<Result<Vec<_>>>()?;
         let source = project(source, exprs)?;
-
-        let op;
-        if overwrite{
-            op = WriteOp::InsertOverwrite
-        } else{
-            op = WriteOp::InsertInto
-        }
+
+        let op = if overwrite {
+            WriteOp::InsertOverwrite
+        } else {
+            WriteOp::InsertInto
+        };
+
         let plan = LogicalPlan::Dml(DmlStatement {
             table_name,
             table_schema: Arc::new(table_schema),

From a4157b111d4ddd011b90dcfa2cad160cbbaf150a Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 4 Aug 2023 08:51:52 -0400
Subject: [PATCH 09/10] rename to execute streams for insertexec

---
 datafusion/core/src/physical_plan/insert.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
index 11fee23d3b3e6..622e33b117fd4 100644
--- a/datafusion/core/src/physical_plan/insert.rs
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -96,7 +96,7 @@ impl InsertExec {
         }
     }
 
-    fn make_input_stream(
+    fn execute_input_stream(
         &self,
         partition: usize,
         context: Arc<TaskContext>,
@@ -136,14 +136,14 @@
         }
     }
 
-    fn make_all_input_streams(
+    fn execute_all_input_streams(
         &self,
         context: Arc<TaskContext>,
     ) -> Result<Vec<SendableRecordBatchStream>> {
         let n_input_parts = self.input.output_partitioning().partition_count();
         let mut streams = Vec::with_capacity(n_input_parts);
         for part in 0..n_input_parts {
-            streams.push(self.make_input_stream(part, context.clone())?);
+            streams.push(self.execute_input_stream(part, context.clone())?);
         }
         Ok(streams)
     }
@@ -236,7 +236,7 @@
                 "InsertExec can only be called on partition 0!".into(),
             ));
         }
-        let data = self.make_all_input_streams(context.clone())?;
+        let data = self.execute_all_input_streams(context.clone())?;
         let count_schema = self.count_schema.clone();
         let sink = self.sink.clone();

From 603a4e3201676808ae9f263bdc34863dddd1223d Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 4 Aug 2023 16:47:40 -0400
Subject: [PATCH 10/10] CsvReadOptions sort_order connect to ListingOptions

---
 datafusion/core/src/datasource/file_format/options.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index a907c06b36b26..b8499065bd69c 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -480,8 +480,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
         .with_file_extension(self.file_extension)
         .with_target_partitions(config.target_partitions())
         .with_table_partition_cols(self.table_partition_cols.clone())
-        // TODO: Add file sort order into CsvReadOptions and introduce here.
-        .with_file_sort_order(vec![])
+        .with_file_sort_order(self.file_sort_order.clone())
         .with_infinite_source(self.infinite)
         .with_insert_mode(self.insert_mode.clone())
 }
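With this final commit, the sort order and insert mode configured on CsvReadOptions flow through ReadOptions into the underlying ListingOptions, and DataFrame::write_table provides the eager write path over the same insert plumbing. A rough end-to-end sketch under the same assumptions as the previous example (a table "t" registered with AppendNewFiles; the import path for DataFrameWriteOptions is assumed, and at this point in the series the CSV and memory sinks still reject overwrites with NotImplemented):

    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::prelude::*;

    async fn append_then_try_overwrite(ctx: &SessionContext) -> datafusion::error::Result<()> {
        let df = ctx.sql("SELECT 1 AS a").await?;
        // Plans Dml: op=[Insert Into] and executes it eagerly; the returned
        // batches report the number of rows written.
        let _count = df
            .clone()
            .write_table("t", DataFrameWriteOptions::new())
            .await?;
        // with_overwrite(true) plans WriteOp::InsertOverwrite instead, which
        // the sinks in this series reject until overwrite is implemented.
        let overwrite_attempt = df
            .write_table("t", DataFrameWriteOptions::new().with_overwrite(true))
            .await;
        assert!(overwrite_attempt.is_err());
        Ok(())
    }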