feat: implement RESTCatalog with database and table CRUD#160

Open
discivigour wants to merge 8 commits into apache:main from discivigour:restCatalog

discivigour (Contributor) commented Mar 30, 2026

Purpose

Linked issue: close #119

Implement a complete REST-based catalog (RESTCatalog) for Apache Paimon Rust,
supporting database and table CRUD operations and token-based FileIO for OSS data access.

Brief change log

New files:

  • catalog/rest/mod.rs: REST catalog module entry point.
  • catalog/rest/rest_catalog.rs: RESTCatalog implementing the Catalog trait with full
    database and table CRUD (list, create, get, drop, alter, rename).
  • catalog/rest/rest_token.rs: RESTToken struct for table-level data access credentials.
  • catalog/rest/rest_token_file_io.rs: RESTTokenFileIO — a FileIO wrapper that lazily
    fetches and caches table tokens from the REST server, enabling OSS data access with
    short-lived credentials.
  • catalog/database.rs: Database struct representing a catalog database.
  • examples/rest_catalog_example.rs: End-to-end example for REST catalog operations.
  • examples/rest_catalog_read_append_example.rs: Example for reading append-only tables via REST catalog.
  • tests/mock_server.rs: Mock HTTP server (axum-based) simulating the Paimon REST API for testing.
  • tests/rest_catalog_test.rs: Integration tests covering all CRUD operations on the mock server.
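
To illustrate the lazy fetch-and-cache behaviour described for RESTTokenFileIO, here is a minimal synchronous sketch. The names Token, TokenCache, and get_or_fetch, the expire_at_millis field, and the closure-based fetcher are hypothetical and are not taken from this PR; the real code is async and loads tokens from the REST server.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for a table-level credential with an expiry time.
#[derive(Clone, Debug, PartialEq)]
pub struct Token {
    pub options: HashMap<String, String>,
    pub expire_at_millis: u64,
}

/// Hypothetical cache: fetches a token on first use and again once it has expired.
pub struct TokenCache<F: Fn() -> Token> {
    fetch: F,
    cached: Mutex<Option<Token>>,
}

impl<F: Fn() -> Token> TokenCache<F> {
    pub fn new(fetch: F) -> Self {
        Self { fetch, cached: Mutex::new(None) }
    }

    /// Returns the cached token if it is still valid at `now_millis`;
    /// otherwise calls the fetcher and stores the result.
    pub fn get_or_fetch(&self, now_millis: u64) -> Token {
        let mut guard = self.cached.lock().unwrap();
        if let Some(token) = guard.as_ref() {
            if token.expire_at_millis > now_millis {
                return token.clone();
            }
        }
        // Nothing cached yet, or the cached token has expired.
        let fresh = (self.fetch)();
        *guard = Some(fresh.clone());
        fresh
    }
}
```

Passing the current time in as an argument keeps the sketch deterministic; a real wrapper would more likely read a clock and refresh some margin before the expiry.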

Modified files:

  • api/rest_api.rs: Add options() accessor and load_table_token() method; fix variable
    shadowing in config merge; remove #[allow(dead_code)] on options field.
  • catalog/filesystem.rs: Implement get_database for FileSystemCatalog.
  • io/storage_oss.rs: Use HashMap::remove instead of get + clone for security_token.
  • integration_tests/tests/read_tables.rs: Add REST catalog read tests using mock server.
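
The HashMap::remove change in io/storage_oss.rs can be pictured with the small sketch below. The function name take_security_token and the literal key "security_token" are assumptions made for illustration; the actual key constant and surrounding code in the PR may differ.

```rust
use std::collections::HashMap;

/// Hypothetical helper: pulls the security token out of an owned property map.
/// `remove` moves the value out of the map, so no clone is needed and the
/// token no longer lingers in the remaining properties.
pub fn take_security_token(
    mut props: HashMap<String, String>,
) -> (Option<String>, HashMap<String, String>) {
    // Assumed key name, for illustration only.
    let token = props.remove("security_token");
    (token, props)
}
```

This only works when the map is owned (or mutably borrowed); with a shared reference, get plus clone remains the only option.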

Tests

  • tests/rest_catalog_test.rs: Full CRUD integration tests (list/create/get/drop/alter database,
    list/create/get/drop/rename/alter table) against the mock server.
  • tests/rest_api_test.rs: REST API layer tests including DLF ECS token loader.
  • integration_tests/tests/read_tables.rs: End-to-end read tests for append-only and
    primary-key (deletion-vector) tables via REST catalog backed by mock server.
  • cargo test -p paimon passes.

API and Format

Documentation

discivigour marked this pull request as draft March 30, 2026 09:05
discivigour marked this pull request as ready for review March 31, 2026 07:11
Contributor

luoyuxia left a comment

@discivigour Thanks for the PR. I left some comments. PTAL.

///
/// The mock server returns table metadata pointing to Spark-provisioned data on disk.
#[tokio::test]
async fn test_rest_catalog_read_append_table() {
Contributor

Is there any difference between the log table and the DV table with regard to the REST catalog?
If not, I think we only need to keep one test.

Contributor

Also, I think you can reuse scan_and_read_with_projection. Pass the catalog to it through the Catalog trait. It should work for both the filesystem catalog and the REST catalog.

Contributor Author

Got it.

arrow-array = { workspace = true }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"
serde_json = "1"
Contributor

Can this be removed?

Contributor Author

discivigour, Mar 31, 2026

No, 'mod mock_server' needs it, but I will move it to dev-dependencies.

}

#[tokio::main]
async fn main() {
Contributor

Can these two examples be combined into a single example?

Contributor Author

Got it.

 pub struct DLFAuthProvider {
     uri: String,
-    token: Option<DLFToken>,
+    token: tokio::sync::Mutex<Option<DLFToken>>,
Contributor

nit:
use parking_lot::RwLock

RwLock<Option<DLFToken>>,

Contributor Author

Thanks, but I think a mutex is better here because get_or_refresh_token() performs a read-then-write operation.
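
To make the read-then-write concern concrete, here is a hedged sketch of what the RwLock variant tends to look like. It uses std::sync::RwLock rather than parking_lot so that it runs without extra crates, and the names Cached and get_or_refresh are hypothetical. The point is that the read guard cannot be upgraded in place, so the value has to be re-checked after taking the write lock, whereas a single Mutex guard covers both the check and the update.

```rust
use std::sync::RwLock;

/// Hypothetical token holder guarded by a read-write lock.
pub struct Cached {
    token: RwLock<Option<String>>,
}

impl Cached {
    pub fn new() -> Self {
        Self { token: RwLock::new(None) }
    }

    /// Read-then-write with an RwLock needs two lock acquisitions and a re-check.
    pub fn get_or_refresh(&self, refresh: impl FnOnce() -> String) -> String {
        {
            // Fast path: shared read lock.
            let guard = self.token.read().unwrap();
            if let Some(token) = guard.as_ref() {
                return token.clone();
            }
        } // The read guard is dropped here; it cannot be upgraded to a write guard.

        let mut guard = self.token.write().unwrap();
        // Re-check: another thread may have stored a token between the two locks.
        if let Some(token) = guard.as_ref() {
            return token.clone();
        }
        let fresh = refresh();
        *guard = Some(fresh.clone());
        fresh
    }
}
```

Whether the extra read concurrency is worth the second check depends on how often the token is read compared with how often it is refreshed.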

let warehouse = options
.get(CatalogOptions::WAREHOUSE)
.cloned()
.unwrap_or_default();
Contributor

Return an error instead of the default?

Contributor Author

Good point.
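
One possible shape for the suggested change, as a minimal sketch: the ConfigError type, the WAREHOUSE constant value, and the function name require_warehouse are all hypothetical and stand in for whatever error type and option key the crate actually uses.

```rust
use std::collections::HashMap;

/// Hypothetical error type; the crate's real error enum would be used instead.
#[derive(Debug, PartialEq)]
pub enum ConfigError {
    MissingOption(&'static str),
}

/// Assumed option key, for illustration only.
pub const WAREHOUSE: &str = "warehouse";

/// Fails loudly when the warehouse option is absent instead of
/// silently falling back to an empty string via `unwrap_or_default`.
pub fn require_warehouse(options: &HashMap<String, String>) -> Result<String, ConfigError> {
    options
        .get(WAREHOUSE)
        .cloned()
        .ok_or(ConfigError::MissingOption(WAREHOUSE))
}
```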

}

// Refresh the token
let new_token = self.refresh_token().await?;
Contributor

Please don't hold a lock across .await points; it may well cause a deadlock.
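
One common way to follow this advice is to copy the cached value out under a short-lived guard, await with no guard alive, and then re-lock briefly to store the result. The sketch below is an assumption-laden illustration, not the PR's code: Provider, refresh_count, and the hard-coded token are hypothetical, it uses std::sync::Mutex instead of the PR's tokio::sync::Mutex, and block_on is a toy busy-polling executor included only so the example runs without external crates.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical provider that caches a token string.
pub struct Provider {
    token: Mutex<Option<String>>,
    refreshes: AtomicUsize,
}

impl Provider {
    pub fn new() -> Self {
        Self { token: Mutex::new(None), refreshes: AtomicUsize::new(0) }
    }

    pub fn refresh_count(&self) -> usize {
        self.refreshes.load(Ordering::SeqCst)
    }

    /// Stand-in for a network call that fetches a new token.
    async fn refresh_token(&self) -> String {
        self.refreshes.fetch_add(1, Ordering::SeqCst);
        "fresh-token".to_string()
    }

    pub async fn get_or_refresh_token(&self) -> String {
        // 1. Copy the cached value out; the guard is dropped at the end of this statement.
        let cached = self.token.lock().unwrap().clone();
        if let Some(token) = cached {
            return token;
        }
        // 2. No guard is alive while awaiting.
        let fresh = self.refresh_token().await;
        // 3. Re-lock briefly; keep an existing value if another task stored one first.
        let mut guard = self.token.lock().unwrap();
        let token = guard.get_or_insert(fresh).clone();
        token
    }
}

/// Waker that does nothing; sufficient for the busy-polling loop below.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it completes.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}
```

The trade-off is that two tasks may both refresh when the cache is empty; the sketch accepts that and keeps whichever value was stored first.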

let api = match api_guard.as_ref() {
Some(existing) => existing,
None => {
let new_api = RESTApi::new(self.catalog_options.clone(), false).await?;
Contributor

Ditto: please don't hold a lock across .await points; it may well cause a deadlock.

.context(IoUnexpectedSnafu {
message: format!("Failed to list files in '{path}'"),
})?;
let entries = op.list_with(&list_path).await.context(IoUnexpectedSnafu {
Contributor

Why change this?


let list_path_normalized = list_path.trim_start_matches('/');
for entry in entries {
// opendal list_with includes the root directory itself as the first entry.
Contributor

Is it true?

Contributor

Why did it work before?
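
For reference, if the listing does return the listed directory itself (the claim being questioned here, which this sketch does not confirm), filtering it out could look roughly like the following. The function skip_listed_dir is hypothetical and works on plain path strings rather than on opendal entries.

```rust
/// Hypothetical helper: drops the entry that refers to the listed directory itself.
/// Paths are compared after trimming leading and trailing slashes.
pub fn skip_listed_dir<'a>(list_path: &str, entry_paths: &[&'a str]) -> Vec<&'a str> {
    let normalized = list_path.trim_start_matches('/').trim_end_matches('/');
    entry_paths
        .iter()
        .copied()
        .filter(|p| p.trim_start_matches('/').trim_end_matches('/') != normalized)
        .collect()
}
```

If the listing never includes the directory itself, the filter is a harmless no-op, which is one reason the question of whether it is needed is worth settling with a test.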

let mut dirs = Vec::new();
for status in statuses {
if status.is_dir {
// Skip the directory itself (opendal list_with includes the root entry)
Contributor

Is that true?

Development

Successfully merging this pull request may close these issues.

support paimon rest catalog

2 participants