Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 105 additions & 28 deletions bindings/c/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ unsafe fn free_table_wrapper<T>(ptr: *mut T, get_inner: impl FnOnce(&T) -> *mut
}
}

// Helper to box a ReadBuilderState and return a raw pointer.
unsafe fn box_read_builder_state(state: ReadBuilderState) -> *mut paimon_read_builder {
let inner = Box::into_raw(Box::new(state)) as *mut c_void;
Box::into_raw(Box::new(paimon_read_builder { inner }))
}

// Helper to box a TableReadState and return a raw pointer.
unsafe fn box_table_read_state(state: TableReadState) -> *mut paimon_table_read {
let inner = Box::into_raw(Box::new(state)) as *mut c_void;
Box::into_raw(Box::new(paimon_table_read { inner }))
}

// ======================= Table ===============================

/// Free a paimon_table.
Expand All @@ -74,8 +86,12 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
};
}
let table_ref = &*((*table).inner as *const Table);
let state = ReadBuilderState {
table: table_ref.clone(),
projected_columns: None,
};
paimon_result_read_builder {
read_builder: box_table_wrapper(table_ref, |inner| paimon_read_builder { inner }),
read_builder: box_read_builder_state(state),
error: std::ptr::null_mut(),
}
}
Expand All @@ -88,7 +104,57 @@ pub unsafe extern "C" fn paimon_table_new_read_builder(
/// Only call with a read_builder returned from `paimon_table_new_read_builder`.
#[no_mangle]
pub unsafe extern "C" fn paimon_read_builder_free(rb: *mut paimon_read_builder) {
free_table_wrapper(rb, |r| r.inner);
if !rb.is_null() {
let wrapper = Box::from_raw(rb);
if !wrapper.inner.is_null() {
drop(Box::from_raw(wrapper.inner as *mut ReadBuilderState));
}
}
}

/// Set column projection for a ReadBuilder.
///
/// The `columns` parameter is a null-terminated array of null-terminated C strings.
/// Output order follows the caller-specified order. Unknown or duplicate names
/// cause `paimon_read_builder_new_read()` to fail; an empty list is a valid
/// zero-column projection.
///
/// # Safety
/// `rb` must be a valid pointer from `paimon_table_new_read_builder`, or null (returns error).
/// `columns` must be a null-terminated array of null-terminated C strings, or null for no projection.
#[no_mangle]
pub unsafe extern "C" fn paimon_read_builder_with_projection(
rb: *mut paimon_read_builder,
columns: *const *const std::ffi::c_char,
) -> *mut paimon_error {
if let Err(e) = check_non_null(rb, "rb") {
return e;
}

let state = &mut *((*rb).inner as *mut ReadBuilderState);

if columns.is_null() {
state.projected_columns = None;
return std::ptr::null_mut();
}

let mut col_names = Vec::new();
let mut ptr = columns;
while !(*ptr).is_null() {
let c_str = std::ffi::CStr::from_ptr(*ptr);
match c_str.to_str() {
Ok(s) => col_names.push(s.to_string()),
Err(e) => {
return paimon_error::from_paimon(paimon::Error::ConfigInvalid {
message: format!("Invalid UTF-8 in column name: {e}"),
});
}
}
ptr = ptr.add(1);
}

state.projected_columns = Some(col_names);
std::ptr::null_mut()
}

/// Create a new TableScan from a ReadBuilder.
Expand All @@ -105,9 +171,9 @@ pub unsafe extern "C" fn paimon_read_builder_new_scan(
error: e,
};
}
let table = &*((*rb).inner as *const Table);
let state = &*((*rb).inner as *const ReadBuilderState);
paimon_result_table_scan {
scan: box_table_wrapper(table, |inner| paimon_table_scan { inner }),
scan: box_table_wrapper(&state.table, |inner| paimon_table_scan { inner }),
error: std::ptr::null_mut(),
}
}
Expand All @@ -126,13 +192,23 @@ pub unsafe extern "C" fn paimon_read_builder_new_read(
error: e,
};
}
let table = &*((*rb).inner as *const Table);
let rb_rust = table.new_read_builder();
let state = &*((*rb).inner as *const ReadBuilderState);
let mut rb_rust = state.table.new_read_builder();

// Apply projection if set
if let Some(ref columns) = state.projected_columns {
let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect();
rb_rust.with_projection(&col_refs);
}

match rb_rust.new_read() {
Ok(_) => {
let wrapper = box_table_wrapper(table, |inner| paimon_table_read { inner });
Ok(table_read) => {
let read_state = TableReadState {
table: state.table.clone(),
read_type: table_read.read_type().to_vec(),
};
paimon_result_new_read {
read: wrapper,
read: box_table_read_state(read_state),
error: std::ptr::null_mut(),
}
}
Expand Down Expand Up @@ -226,7 +302,12 @@ pub unsafe extern "C" fn paimon_plan_num_splits(plan: *const paimon_plan) -> usi
/// Only call with a read returned from `paimon_read_builder_new_read`.
#[no_mangle]
pub unsafe extern "C" fn paimon_table_read_free(read: *mut paimon_table_read) {
free_table_wrapper(read, |r| r.inner);
if !read.is_null() {
let wrapper = Box::from_raw(read);
if !wrapper.inner.is_null() {
drop(Box::from_raw(wrapper.inner as *mut TableReadState));
}
}
}

/// Read table data as Arrow record batches via a streaming reader.
Expand Down Expand Up @@ -261,31 +342,27 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
};
}

let table = &*((*read).inner as *const Table);
let state = &*((*read).inner as *const TableReadState);
let plan_ref = &*((*plan).inner as *const Plan);
let all_splits = plan_ref.splits();
let start = offset.min(all_splits.len());
let end = (offset.saturating_add(length)).min(all_splits.len());
let selected = &all_splits[start..end];

let rb = table.new_read_builder();
match rb.new_read() {
Ok(table_read) => match table_read.to_arrow(selected) {
Ok(stream) => {
let reader = Box::new(stream);
let wrapper = Box::new(paimon_record_batch_reader {
inner: Box::into_raw(reader) as *mut c_void,
});
paimon_result_record_batch_reader {
reader: Box::into_raw(wrapper),
error: std::ptr::null_mut(),
}
// Create TableRead with the stored read_type (projection)
let table_read = paimon::table::TableRead::new(&state.table, state.read_type.clone());

match table_read.to_arrow(selected) {
Ok(stream) => {
let reader = Box::new(stream);
let wrapper = Box::new(paimon_record_batch_reader {
inner: Box::into_raw(reader) as *mut c_void,
});
paimon_result_record_batch_reader {
reader: Box::into_raw(wrapper),
error: std::ptr::null_mut(),
}
Err(e) => paimon_result_record_batch_reader {
reader: std::ptr::null_mut(),
error: paimon_error::from_paimon(e),
},
},
}
Err(e) => paimon_result_record_batch_reader {
reader: std::ptr::null_mut(),
error: paimon_error::from_paimon(e),
Expand Down
15 changes: 15 additions & 0 deletions bindings/c/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

use std::ffi::c_void;

use paimon::spec::DataField;
use paimon::table::Table;

/// C-compatible byte buffer.
#[repr(C)]
#[derive(Clone, Copy)]
Expand Down Expand Up @@ -68,6 +71,12 @@ pub struct paimon_read_builder {
pub inner: *mut c_void,
}

/// Internal state for ReadBuilder that stores table and projection columns.
pub(crate) struct ReadBuilderState {
pub table: Table,
pub projected_columns: Option<Vec<String>>,
}

#[repr(C)]
pub struct paimon_table_scan {
pub inner: *mut c_void,
Expand All @@ -78,6 +87,12 @@ pub struct paimon_table_read {
pub inner: *mut c_void,
}

/// Internal state for TableRead that stores table and projected read type.
pub(crate) struct TableReadState {
pub table: Table,
pub read_type: Vec<DataField>,
}

#[repr(C)]
pub struct paimon_plan {
pub inner: *mut c_void,
Expand Down
51 changes: 51 additions & 0 deletions bindings/go/read_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package paimon

import (
"context"
"runtime"
"sync"
"unsafe"

Expand All @@ -44,6 +45,17 @@ func (rb *ReadBuilder) Close() {
})
}

// WithProjection sets column projection by name. Output order follows the
// caller-specified order. Unknown or duplicate names cause NewRead() to fail;
// an empty list is a valid zero-column projection.
func (rb *ReadBuilder) WithProjection(columns []string) error {
if rb.inner == nil {
return ErrClosed
}
projFn := ffiReadBuilderWithProjection.symbol(rb.ctx)
return projFn(rb.inner, columns)
}

// NewScan creates a TableScan for planning which data files to read.
func (rb *ReadBuilder) NewScan() (*TableScan, error) {
if rb.inner == nil {
Expand Down Expand Up @@ -85,6 +97,45 @@ var ffiReadBuilderFree = newFFI(ffiOpts{
}
})

var ffiReadBuilderWithProjection = newFFI(ffiOpts{
sym: "paimon_read_builder_with_projection",
rType: &ffi.TypePointer,
aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer},
}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder, columns []string) error {
return func(rb *paimonReadBuilder, columns []string) error {
var colPtrs []*byte
var cStrings [][]byte

// Convert Go strings to null-terminated C strings
for _, col := range columns {
cStr := append([]byte(col), 0)
cStrings = append(cStrings, cStr)
colPtrs = append(colPtrs, &cStr[0])
}
// Null-terminate the array
colPtrs = append(colPtrs, nil)

var colsPtr unsafe.Pointer
if len(colPtrs) > 0 {
colsPtr = unsafe.Pointer(&colPtrs[0])
}

var errPtr *paimonError
ffiCall(
unsafe.Pointer(&errPtr),
unsafe.Pointer(&rb),
unsafe.Pointer(&colsPtr),
)
// Ensure Go-managed buffers stay alive for the full native call.
runtime.KeepAlive(cStrings)
runtime.KeepAlive(colPtrs)
if errPtr != nil {
return parseError(ctx, errPtr)
}
return nil
}
})

var ffiReadBuilderNewScan = newFFI(ffiOpts{
sym: "paimon_read_builder_new_scan",
rType: &typeResultTableScan,
Expand Down
Loading
Loading