Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 308 additions & 1 deletion crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::TryStreamExt;
use paimon::api::ConfigResponse;
use paimon::catalog::{Identifier, RESTCatalog};
use paimon::common::Options;
use paimon::spec::{DataType, IntType, Schema, VarCharType};
use paimon::spec::{DataType, IntType, Predicate, Schema, VarCharType};
use paimon::{Catalog, Error, FileSystemCatalog, Plan};
use std::collections::{HashMap, HashSet};

Expand Down Expand Up @@ -88,6 +88,27 @@ async fn scan_and_read_with_fs_catalog(
scan_and_read(&catalog, table_name, projection).await
}

/// Plans a scan over `table` with `filter` applied, then reads every
/// surviving split into Arrow record batches.
///
/// Panics (via `expect`) on any planning or read failure — the desired
/// behavior inside these integration tests.
async fn scan_and_read_with_filter(
    table: &paimon::Table,
    filter: Predicate,
) -> (Plan, Vec<RecordBatch>) {
    let mut builder = table.new_read_builder();
    builder.with_filter(filter);

    let plan = builder
        .new_scan()
        .plan()
        .await
        .expect("Failed to plan scan");

    let stream = builder
        .new_read()
        .expect("Failed to create read")
        .to_arrow(plan.splits())
        .expect("Failed to create arrow stream");
    let batches: Vec<RecordBatch> = stream
        .try_collect()
        .await
        .expect("Failed to collect batches");

    (plan, batches)
}

fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
let mut rows = Vec::new();
for batch in batches {
Expand All @@ -107,6 +128,55 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
rows
}

/// Flattens `batches` into `(id, name, dt)` tuples, sorted by `id` so
/// assertions are independent of split/batch ordering.
fn extract_id_name_dt(batches: &[RecordBatch]) -> Vec<(i32, String, String)> {
    let mut rows: Vec<(i32, String, String)> = Vec::new();
    for batch in batches {
        let ids = batch
            .column_by_name("id")
            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        let names = batch
            .column_by_name("name")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("name");
        let dts = batch
            .column_by_name("dt")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("dt");
        rows.extend((0..batch.num_rows()).map(|row| {
            (ids.value(row), names.value(row).into(), dts.value(row).into())
        }));
    }
    rows.sort_by_key(|&(id, _, _)| id);
    rows
}

/// Collects the distinct `dt` partition values present in `plan`'s splits.
fn extract_plan_partitions(plan: &Plan) -> HashSet<String> {
    let mut partitions = HashSet::new();
    for split in plan.splits() {
        let dt = split
            .partition()
            .get_string(0)
            .expect("Failed to decode dt");
        partitions.insert(dt.to_string());
    }
    partitions
}

/// Collects the distinct `(dt, hr)` partition pairs present in `plan`'s splits.
fn extract_plan_multi_partitions(plan: &Plan) -> HashSet<(String, i32)> {
    let mut partitions = HashSet::new();
    for split in plan.splits() {
        let row = split.partition();
        let dt = row.get_string(0).expect("dt").to_string();
        let hr = row.get_int(1).expect("hr");
        partitions.insert((dt, hr));
    }
    partitions
}

#[tokio::test]
async fn test_read_log_table() {
let (plan, batches) = scan_and_read_with_fs_catalog("simple_log_table", None).await;
Expand Down Expand Up @@ -397,6 +467,7 @@ async fn test_read_projection_empty() {
);
}
}

#[tokio::test]
async fn test_read_projection_unknown_column() {
let catalog = create_file_system_catalog();
Expand Down Expand Up @@ -460,6 +531,242 @@ async fn test_read_projection_duplicate_column() {
);
}

// ---------------------------------------------------------------------------
// Partition filter integration tests
// ---------------------------------------------------------------------------

/// A pure partition predicate (`dt = '2024-01-01'`) must prune the plan
/// down to that single partition and return only its rows.
#[tokio::test]
async fn test_read_partitioned_table_with_filter() {
    use paimon::spec::{Datum, PredicateBuilder};

    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;

    // Predicate: dt = '2024-01-01'
    let schema = table.schema();
    let builder = PredicateBuilder::new(schema.fields());
    let filter = builder
        .equal("dt", Datum::String("2024-01-01".into()))
        .expect("Failed to build predicate");

    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;

    assert_eq!(
        extract_plan_partitions(&plan),
        HashSet::from(["2024-01-01".into()]),
        "Only the filtered partition should be in the plan"
    );

    let expected: Vec<(i32, String, String)> = vec![
        (1, "alice".into(), "2024-01-01".into()),
        (2, "bob".into(), "2024-01-01".into()),
    ];
    assert_eq!(extract_id_name_dt(&batches), expected);
}

#[tokio::test]
async fn test_read_multi_partitioned_table_with_filter() {
use paimon::spec::{Datum, Predicate, PredicateBuilder};

let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "multi_partitioned_log_table").await;
let schema = table.schema();
let pb = PredicateBuilder::new(schema.fields());

// Filter: dt = '2024-01-01' AND hr = 10
let filter = Predicate::and(vec![
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
pb.equal("hr", Datum::Int(10)).unwrap(),
]);

let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
let partitions = extract_plan_multi_partitions(&plan);
assert_eq!(
partitions,
HashSet::from([("2024-01-01".into(), 10)]),
"Only dt=2024-01-01, hr=10 should survive"
);

let actual = extract_id_name(&batches);
assert_eq!(
actual,
vec![(1, "alice".to_string()), (2, "bob".to_string()),],
"Only rows from dt=2024-01-01, hr=10 should be returned"
);
}

/// A filter on a non-partition column (`id > 10`) must leave partition
/// pruning untouched, and — in Phase 2 — is also ignored at read level,
/// so every row comes back.
#[tokio::test]
async fn test_read_partitioned_table_data_only_filter_preserves_all_partitions() {
    use paimon::spec::{Datum, PredicateBuilder};

    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
    let schema = table.schema();
    let pb = PredicateBuilder::new(schema.fields());

    // Data-only filter: id > 10 — should NOT prune any partitions,
    // and is still ignored at read level in Phase 2.
    let filter = pb
        .greater_than("id", Datum::Int(10))
        .expect("Failed to build predicate");

    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;

    assert_eq!(
        extract_plan_partitions(&plan),
        HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
        "Data-only filter should not prune any partitions"
    );

    let expected = vec![
        (1, "alice".to_string()),
        (2, "bob".to_string()),
        (3, "carol".to_string()),
    ];
    assert_eq!(
        extract_id_name(&batches),
        expected,
        "Data predicate is not applied at read level; all rows are still returned"
    );
}

/// Mixed AND: the partition conjunct (`dt = '2024-01-01'`) prunes the
/// plan, while the data conjunct (`id > 10`) is silently ignored — all
/// rows from the matching partition are returned.
#[tokio::test]
async fn test_read_partitioned_table_mixed_and_filter() {
    use paimon::spec::{Datum, Predicate, PredicateBuilder};

    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
    let schema = table.schema();
    let pb = PredicateBuilder::new(schema.fields());

    // dt = '2024-01-01' AND id > 10
    // Partition conjunct (dt) is applied; data conjunct (id) is NOT.
    let dt_eq = pb.equal("dt", Datum::String("2024-01-01".into())).unwrap();
    let id_gt = pb.greater_than("id", Datum::Int(10)).unwrap();
    let filter = Predicate::and(vec![dt_eq, id_gt]);

    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;

    assert_eq!(
        extract_plan_partitions(&plan),
        HashSet::from(["2024-01-01".into()]),
        "Only dt=2024-01-01 should survive"
    );
    assert_eq!(
        extract_id_name(&batches),
        vec![(1, "alice".to_string()), (2, "bob".to_string())],
        "Data predicate (id > 10) is NOT applied — all rows from matching partition returned"
    );
}

/// Mixed OR: `dt = '...' OR id > 10` cannot be split into a pure partition
/// predicate, so pruning must be skipped entirely and every partition's
/// rows must come back.
#[tokio::test]
async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
    use paimon::spec::{Datum, Predicate, PredicateBuilder};

    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
    let schema = table.schema();
    let pb = PredicateBuilder::new(schema.fields());

    // dt = '2024-01-01' OR id > 10 — mixed OR is not safely splittable.
    let dt_branch = pb.equal("dt", Datum::String("2024-01-01".into())).unwrap();
    let id_branch = pb.greater_than("id", Datum::Int(10)).unwrap();
    let filter = Predicate::or(vec![dt_branch, id_branch]);

    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;

    assert_eq!(
        extract_plan_partitions(&plan),
        HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
        "Mixed OR should not prune any partitions"
    );

    let expected = vec![
        (1, "alice".to_string()),
        (2, "bob".to_string()),
        (3, "carol".to_string()),
    ];
    assert_eq!(
        extract_id_name(&batches),
        expected,
        "All rows should be returned when pruning is not possible"
    );
}

/// Filter that matches no existing partition — every manifest entry is
/// pruned and the plan contains zero splits.
#[tokio::test]
async fn test_read_partitioned_table_filter_matches_no_partition() {
    use paimon::spec::{Datum, PredicateBuilder};

    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
    let schema = table.schema();
    let pb = PredicateBuilder::new(schema.fields());

    // dt = '9999-12-31' matches no partition.
    let filter = pb
        .equal("dt", Datum::String("9999-12-31".into()))
        .expect("Failed to build predicate");

    let mut builder = table.new_read_builder();
    builder.with_filter(filter);
    let plan = builder
        .new_scan()
        .plan()
        .await
        .expect("Failed to plan scan");

    assert!(
        plan.splits().is_empty(),
        "No splits should survive when filter matches no partition"
    );
}

/// A hand-built partition leaf carrying an unsupported `DataType` lets
/// index remapping succeed but makes `eval_row` fail during pruning;
/// that failure must surface as a planning error, not be swallowed.
#[tokio::test]
async fn test_read_partitioned_table_eval_row_error_fails_plan() {
    // `Predicate` added to the local use for consistency with the other
    // filter tests in this file, which all import it locally.
    use paimon::spec::{ArrayType, DataType, Datum, IntType, Predicate, PredicateOperator};

    let catalog = create_file_system_catalog();
    let table = get_table_from_catalog(&catalog, "partitioned_log_table").await;
    let dt_index = table
        .schema()
        .fields()
        .iter()
        .position(|f| f.name() == "dt")
        .expect("dt partition column should exist");

    // Use an unsupported DataType in a partition leaf so remapping succeeds
    // but `eval_row` fails during partition pruning.
    let filter = Predicate::Leaf {
        column: "dt".into(),
        index: dt_index,
        data_type: DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
        op: PredicateOperator::Eq,
        literals: vec![Datum::Int(42)],
    };

    let mut read_builder = table.new_read_builder();
    read_builder.with_filter(filter);

    let err = read_builder
        .new_scan()
        .plan()
        .await
        .expect_err("eval_row error should fail-fast during planning");

    assert!(
        matches!(&err, Error::Unsupported { message } if message.contains("extract_datum")),
        "Expected extract_datum unsupported error, got: {err:?}"
    );
}

// ======================= REST Catalog read tests ===============================

/// Build a simple test schema matching the Spark-provisioned tables (id INT, name VARCHAR).
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub use types::*;
mod partition_utils;
pub(crate) use partition_utils::PartitionComputer;
mod predicate;
pub(crate) use predicate::eval_row;
pub use predicate::{
field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder, PredicateOperator,
};
Loading
Loading