Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 92 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ mod tests {
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::listing::{FileRange, ListingOptions, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
use crate::physical_plan::displayable;
Expand All @@ -769,8 +769,8 @@ mod tests {
};
use arrow_array::Date64Array;
use chrono::{TimeZone, Utc};
use datafusion_common::ScalarValue;
use datafusion_common::{assert_contains, ToDFSchema};
use datafusion_common::{FileType, GetExt, ScalarValue};
use datafusion_expr::{col, lit, when, Expr};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -1938,6 +1938,96 @@ mod tests {
Ok(schema)
}

#[tokio::test]
async fn write_table_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
// let mut ctx = create_ctx(&tmp_dir, 4).await?;
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_target_partitions(8),
);
let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
// register csv file with the execution context
ctx.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().schema(&schema),
)
.await?;

// register a local file system object store for /tmp directory
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);

// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());

// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
std::fs::create_dir(&out_dir).unwrap();
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
let schema: Schema = df.schema().into();
// Register a listing table - this will use all files in the directory as data sources
// for the query
ctx.register_listing_table(
"my_table",
&out_dir,
listing_options,
Some(Arc::new(schema)),
None,
)
.await
.unwrap();
df.write_table("my_table", DataFrameWriteOptions::new())
.await?;

// create a new context and verify that the results were saved to a partitioned parquet file
let ctx = SessionContext::new();

// get write_id
let mut paths = fs::read_dir(&out_dir).unwrap();
let path = paths.next();
let name = path
.unwrap()?
.path()
.file_name()
.expect("Should be a file name")
.to_str()
.expect("Should be a str")
.to_owned();
let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
let write_id = parsed_id.to_owned();

// register each partition as well as the top level dir
ctx.register_parquet(
"part0",
&format!("{out_dir}/{write_id}_0.parquet"),
ParquetReadOptions::default(),
)
.await?;

ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
.await?;

let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
let allparts = ctx
.sql("SELECT c1, c2 FROM allparts")
.await?
.collect()
.await?;

let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0[0].schema(), allparts[0].schema());

assert_eq!(allparts_count, 40);

Ok(())
}

#[tokio::test]
async fn write_parquet_results() -> Result<()> {
// create partitioned input file and context
Expand Down Expand Up @@ -1982,7 +2072,6 @@ mod tests {
.to_str()
.expect("Should be a str")
.to_owned();
println!("{name}");
let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
let write_id = parsed_id.to_owned();

Expand Down