Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions vortex-bench/src/datasets/tpch_l_comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::path::PathBuf;

use anyhow::Result;
use async_trait::async_trait;
use futures::StreamExt;
use futures::TryStreamExt;
use glob::glob;
use vortex::array::ArrayRef;
Expand Down Expand Up @@ -77,12 +78,11 @@ impl Dataset for TPCHLCommentChunked {
let file_chunks: Vec<_> = file
.scan()?
.with_projection(pack(vec![("l_comment", col("l_comment"))], NonNullable))
.map(|a| {
.into_stream()?
.map(|result| {
#[expect(deprecated)]
let canonical = a.to_canonical()?;
Ok(canonical.into_array())
result.and_then(|a| a.to_canonical().map(|canonical| canonical.into_array()))
})
.into_array_stream()?
.try_collect()
.await?;
chunks.extend(file_chunks);
Expand Down
18 changes: 2 additions & 16 deletions vortex-cxx/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,11 @@
use std::sync::Arc;

use anyhow::Result;
use arrow_array::RecordBatch;
use arrow_array::RecordBatchReader;
use arrow_array::cast::AsArray;
use arrow_array::ffi::FFI_ArrowSchema;
use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_schema::ArrowError;
use arrow_schema::DataType;
use arrow_schema::Schema;
use arrow_schema::SchemaRef;
use futures::stream::TryStreamExt;
use vortex::array::ArrayRef;
use vortex::array::arrow::IntoArrowArray;
use vortex::buffer::Buffer;
use vortex::file::OpenOptionsSessionExt;
use vortex::io::runtime::BlockingRuntime;
Expand Down Expand Up @@ -58,7 +51,7 @@ pub(crate) fn open_file_from_buffer(data: &[u8]) -> Result<Box<VortexFile>> {
}

pub(crate) struct VortexScanBuilder {
inner: ScanBuilder<ArrayRef>,
inner: ScanBuilder,
output_schema: Option<SchemaRef>,
}

Expand Down Expand Up @@ -158,16 +151,9 @@ pub(crate) fn scan_builder_into_threadsafe_cloneable_reader(
Arc::new(arrow_schema)
}
};
let data_type = DataType::Struct(schema.fields().clone());

let stream = builder
.inner
.map(move |b| {
b.into_arrow(&data_type)
.map(|struct_array| RecordBatch::from(struct_array.as_struct()))
})
.into_stream()?
.map_err(|e| ArrowError::ExternalError(Box::new(e)));
.into_record_batch_stream(Arc::clone(&schema))?;

let iter = RUNTIME.block_on_stream_thread_safe(|_h| stream);
let rbr = RecordBatchIteratorAdapter::new(iter, schema);
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub struct vortex_datafusion::VortexAccessPlan

impl vortex_datafusion::VortexAccessPlan

pub fn vortex_datafusion::VortexAccessPlan::apply_to_builder<A>(&self, scan_builder: vortex_layout::scan::scan_builder::ScanBuilder<A>) -> vortex_layout::scan::scan_builder::ScanBuilder<A> where A: 'static + core::marker::Send
pub fn vortex_datafusion::VortexAccessPlan::apply_to_builder(&self, scan_builder: vortex_layout::scan::scan_builder::ScanBuilder) -> vortex_layout::scan::scan_builder::ScanBuilder

pub fn vortex_datafusion::VortexAccessPlan::selection(&self) -> core::option::Option<&vortex_scan::selection::Selection>

Expand Down
5 changes: 1 addition & 4 deletions vortex-datafusion/src/persistent/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ impl VortexAccessPlan {
///
/// This is used internally by the file opener after it has translated a
/// `PartitionedFile` into a Vortex scan.
pub fn apply_to_builder<A>(&self, mut scan_builder: ScanBuilder<A>) -> ScanBuilder<A>
where
A: 'static + Send,
{
pub fn apply_to_builder(&self, mut scan_builder: ScanBuilder) -> ScanBuilder {
let Self { selection } = self;

if let Some(selection) = selection {
Expand Down
82 changes: 50 additions & 32 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use arrow_schema::Schema;
use datafusion_common::DataFusionError;
use datafusion_common::Result as DFResult;
use datafusion_common::ScalarValue;
use datafusion_common::arrow::array::RecordBatch;
use datafusion_common::exec_datafusion_err;
use datafusion_datasource::FileRange;
use datafusion_datasource::PartitionedFile;
Expand All @@ -32,12 +33,12 @@ use futures::TryStreamExt;
use futures::stream;
use object_store::path::Path;
use tracing::Instrument;
use vortex::array::ArrayRef;
use vortex::array::VortexSessionExecute;
use vortex::array::arrow::ArrowArrayExecutor;
use vortex::error::VortexError;
use vortex::file::OpenOptionsSessionExt;
use vortex::io::InstrumentedReadAt;
use vortex::io::session::RuntimeSessionExt;
use vortex::layout::LayoutReader;
use vortex::layout::scan::scan_builder::ScanBuilder;
use vortex::metrics::Label;
Expand Down Expand Up @@ -362,44 +363,33 @@ impl FileOpener for VortexOpener {
scan_builder = scan_builder.with_concurrency(concurrency);
}

let stream_schema = Arc::new(stream_schema);
let handle = session.handle();
let file_location = file.object_meta.location.clone();

let stream = scan_builder
.with_metrics_registry(metrics_registry)
.with_projection(scan_projection)
.with_some_filter(filter)
.with_ordered(has_output_ordering)
.map(move |chunk| {
let mut ctx = session.create_execution_ctx();
chunk.execute_record_batch(&stream_schema, &mut ctx)
})
.into_stream()
.map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))?
.map_ok(move |rb| {
// We try and slice the stream into respecting datafusion's configured batch size.
stream::iter(
(0..rb.num_rows().div_ceil(batch_size * 2))
.flat_map(move |block_idx| {
let offset = block_idx * batch_size * 2;

// If we have less than two batches worth of rows left, we keep them together as a single batch.
if rb.num_rows() - offset < 2 * batch_size {
let length = rb.num_rows() - offset;
[Some(rb.slice(offset, length)), None].into_iter()
} else {
let first = rb.slice(offset, batch_size);
let second = rb.slice(offset + batch_size, batch_size);
[Some(first), Some(second)].into_iter()
}
})
.flatten()
.map(Ok),
)
.map(move |chunk| {
let session = session.clone();
let stream_schema = Arc::clone(&stream_schema);
let handle = handle.clone();
handle.spawn_blocking(move || {
let mut ctx = session.create_execution_ctx();
chunk.and_then(|chunk| {
chunk.execute_record_batch(stream_schema.as_ref(), &mut ctx)
})
})
})
.map_err(move |e: VortexError| {
DataFusionError::External(Box::new(e.with_context(format!(
"Failed to read Vortex file: {}",
file.object_meta.location
))))
.buffered(2)
.map_ok(move |rb| {
stream::iter(split_record_batch(rb, batch_size).into_iter().map(Ok))
})
.map_err(move |e: VortexError| vortex_file_read_error(&file_location, e))
.try_flatten()
.map(move |batch| {
if projector.projection().as_ref().is_empty() {
Expand All @@ -426,8 +416,8 @@ fn apply_byte_range(
file_range: FileRange,
total_size: u64,
row_count: u64,
scan_builder: ScanBuilder<ArrayRef>,
) -> ScanBuilder<ArrayRef> {
scan_builder: ScanBuilder,
) -> ScanBuilder {
let row_range = byte_range_to_row_range(
file_range.start as u64..file_range.end as u64,
row_count,
Expand All @@ -450,6 +440,33 @@ fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u
start_row..u64::min(row_count, end_row)
}

/// Slices `rb` into batches of `batch_size` rows, coalescing the tail.
///
/// Batches are emitted in pairs of exactly `batch_size` rows each. Once fewer
/// than two full batches of rows remain, the leftover rows are emitted as a
/// single batch (so the final batch may hold up to `2 * batch_size - 1` rows),
/// which avoids producing a tiny trailing slice. An empty input produces an
/// empty `Vec`.
///
/// # Panics
///
/// Panics if `batch_size` is zero.
fn split_record_batch(rb: RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    assert!(batch_size > 0, "batch size must be positive");

    let total_rows = rb.num_rows();
    let pair = batch_size * 2;
    let mut out = Vec::new();
    let mut start = 0;

    while start < total_rows {
        let left = total_rows - start;
        if left < pair {
            // Fewer than two full batches remain: keep them together as one
            // batch instead of leaving a small remainder slice.
            out.push(rb.slice(start, left));
            start = total_rows;
        } else {
            out.push(rb.slice(start, batch_size));
            out.push(rb.slice(start + batch_size, batch_size));
            start += pair;
        }
    }

    out
}

/// Wraps a [`VortexError`] raised while reading the file at `path` into a
/// [`DataFusionError::External`], attaching the file location as context.
fn vortex_file_read_error(path: &Path, error: VortexError) -> DataFusionError {
    let contextualized = error.with_context(format!("Failed to read Vortex file: {path}"));
    DataFusionError::External(Box::new(contextualized))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand All @@ -476,6 +493,7 @@ mod tests {
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions as df_expr;
use datafusion_physical_expr::projection::ProjectionExpr;
use futures::TryStreamExt;
use insta::assert_snapshot;
use itertools::Itertools;
use object_store::ObjectStore;
Expand Down
10 changes: 7 additions & 3 deletions vortex-datafusion/src/persistent/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use futures::stream::BoxStream;
/// Utility to end a stream early if its backing [`PartitionFile`] can be pruned away by an updated dynamic expression.
pub(crate) struct PrunableStream {
file_pruner: FilePruner,
stream: BoxStream<'static, DFResult<RecordBatch>>,
stream: Option<BoxStream<'static, DFResult<RecordBatch>>>,
}

impl PrunableStream {
pub fn new(file_pruner: FilePruner, stream: BoxStream<'static, DFResult<RecordBatch>>) -> Self {
Self {
file_pruner,
stream,
stream: Some(stream),
}
}
}
Expand All @@ -32,9 +32,13 @@ impl Stream for PrunableStream {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.as_mut().file_pruner.should_prune()? {
self.stream.take();
Poll::Ready(None)
} else {
self.stream.poll_next_unpin(cx)
match self.stream.as_mut() {
Some(stream) => stream.poll_next_unpin(cx),
None => Poll::Ready(None),
}
}
}
}
5 changes: 3 additions & 2 deletions vortex-datafusion/src/v2/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,12 +432,13 @@ impl DataSource for VortexDataSource {
.map(move |result| {
let session = session.clone();
let schema = Arc::clone(&projected_schema);
handle.spawn_cpu(move || {
let handle = handle.clone();
handle.spawn_blocking(move || {
let mut ctx = session.create_execution_ctx();
result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
})
})
.buffered(num_partitions.get())
.buffered(2)
.map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));

// Apply leftover projection (expressions that couldn't be pushed into Vortex).
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub fn vortex_file::VortexFile::layout_reader(&self) -> vortex_error::VortexResu

pub fn vortex_file::VortexFile::row_count(&self) -> u64

pub fn vortex_file::VortexFile::scan(&self) -> vortex_error::VortexResult<vortex_layout::scan::scan_builder::ScanBuilder<vortex_array::array::erased::ArrayRef>>
pub fn vortex_file::VortexFile::scan(&self) -> vortex_error::VortexResult<vortex_layout::scan::scan_builder::ScanBuilder>

pub fn vortex_file::VortexFile::segment_source(&self) -> alloc::sync::Arc<dyn vortex_layout::segments::source::SegmentSource>

Expand Down
3 changes: 1 addition & 2 deletions vortex-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::ops::Range;
use std::sync::Arc;

use itertools::Itertools;
use vortex_array::ArrayRef;
use vortex_array::Columnar;
use vortex_array::VortexSessionExecute;
use vortex_array::dtype::DType;
Expand Down Expand Up @@ -110,7 +109,7 @@ impl VortexFile {
}

/// Initiate a scan of the file, returning a builder for configuring the scan.
pub fn scan(&self) -> VortexResult<ScanBuilder<ArrayRef>> {
pub fn scan(&self) -> VortexResult<ScanBuilder> {
Ok(ScanBuilder::new(
self.session.clone(),
self.layout_reader()?,
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ async fn write_nullable_top_level_struct() {

async fn round_trip(
array: &ArrayRef,
f: impl Fn(ScanBuilder<ArrayRef>) -> VortexResult<ScanBuilder<ArrayRef>>,
f: impl Fn(ScanBuilder) -> VortexResult<ScanBuilder>,
) -> VortexResult<ArrayRef> {
let mut writer = vec![];
SESSION
Expand Down
Loading
Loading