Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions src/cloudsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -2273,16 +2273,10 @@ int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
return cloudsync_set_error(data, buffer, DBRES_MISUSE);
}

// create a savepoint to manage the alter operations as a transaction
int rc = database_begin_savepoint(data, "cloudsync_alter");
if (rc != DBRES_OK) {
return cloudsync_set_error(data, "Unable to create cloudsync_begin_alter savepoint", DBRES_MISUSE);
}

// retrieve primary key(s)
char **names = NULL;
int nrows = 0;
rc = database_pk_names(data, table_name, &names, &nrows);
int rc = database_pk_names(data, table_name, &names, &nrows);
if (rc != DBRES_OK) {
char buffer[1024];
snprintf(buffer, sizeof(buffer), "Unable to get primary keys for table %s", table_name);
Expand Down Expand Up @@ -2311,7 +2305,6 @@ int cloudsync_begin_alter (cloudsync_context *data, const char *table_name) {
return DBRES_OK;

rollback_begin_alter:
database_rollback_savepoint(data, "cloudsync_alter");
if (names) table_pknames_free(names, nrows);
return rc;
}
Expand Down Expand Up @@ -2430,18 +2423,9 @@ int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
rc = cloudsync_init_table(data, table_name, cloudsync_algo_name(algo_current), true);
if (rc != DBRES_OK) goto rollback_finalize_alter;

// release savepoint
rc = database_commit_savepoint(data, "cloudsync_alter");
if (rc != DBRES_OK) {
cloudsync_set_dberror(data);
goto rollback_finalize_alter;
}

cloudsync_update_schema_hash(data);
return DBRES_OK;

rollback_finalize_alter:
database_rollback_savepoint(data, "cloudsync_alter");
if (table) table_set_pknames(table, NULL);
return rc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/cloudsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
extern "C" {
#endif

#define CLOUDSYNC_VERSION "0.9.200"
#define CLOUDSYNC_VERSION "0.9.201"
#define CLOUDSYNC_MAX_TABLENAME_LEN 512

#define CLOUDSYNC_VALUE_NOTSET -1
Expand Down
63 changes: 49 additions & 14 deletions src/postgresql/cloudsync_postgresql.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,26 +763,27 @@ Datum pg_cloudsync_begin_alter (PG_FUNCTION_ARGS) {
const char *table_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
cloudsync_context *data = get_cloudsync_context();
int rc = DBRES_OK;
bool spi_connected = false;

int spi_rc = SPI_connect();
if (spi_rc != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed")));
}
spi_connected = true;

PG_TRY();
{
database_begin_savepoint(data, "cloudsync_alter");
rc = cloudsync_begin_alter(data, table_name);
if (rc != DBRES_OK) {
database_rollback_savepoint(data, "cloudsync_alter");
}
}
PG_CATCH();
{
if (spi_connected) SPI_finish();
SPI_finish();
PG_RE_THROW();
}
PG_END_TRY();

if (spi_connected) SPI_finish();
SPI_finish();
if (rc != DBRES_OK) {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
Expand All @@ -792,6 +793,14 @@ Datum pg_cloudsync_begin_alter (PG_FUNCTION_ARGS) {
}

// cloudsync_commit_alter - Commit schema alteration
//
// This wrapper manages SPI in two phases to avoid the PostgreSQL warning
// "subtransaction left non-empty SPI stack". The subtransaction was opened
// by a prior cloudsync_begin_alter call, so SPI_connect() here creates a
// connection at the subtransaction level. We must disconnect SPI before
// cloudsync_commit_alter releases that subtransaction, then reconnect
// for post-commit work (cloudsync_update_schema_hash).
// Prepared statements survive SPI_finish via SPI_keepplan/TopMemoryContext.
PG_FUNCTION_INFO_V1(pg_cloudsync_commit_alter);
Datum pg_cloudsync_commit_alter (PG_FUNCTION_ARGS) {
if (PG_ARGISNULL(0)) {
Expand All @@ -801,29 +810,55 @@ Datum pg_cloudsync_commit_alter (PG_FUNCTION_ARGS) {
const char *table_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
cloudsync_context *data = get_cloudsync_context();
int rc = DBRES_OK;
bool spi_connected = false;

int spi_rc = SPI_connect();
if (spi_rc != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed: %d", spi_rc)));
// Phase 1: SPI work before savepoint release
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed")));
}
spi_connected = true;

PG_TRY();
{
rc = cloudsync_commit_alter(data, table_name);
}
PG_CATCH();
{
if (spi_connected) SPI_finish();
SPI_finish();
PG_RE_THROW();
}
PG_END_TRY();

if (spi_connected) SPI_finish();
// Disconnect SPI before savepoint boundary
SPI_finish();

if (rc != DBRES_OK) {
// Rollback savepoint (SPI disconnected, no warning)
database_rollback_savepoint(data, "cloudsync_alter");
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s", cloudsync_errmsg(data))));
}

// Release savepoint (SPI disconnected, no warning)
rc = database_commit_savepoint(data, "cloudsync_alter");
if (rc != DBRES_OK) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Unable to release cloudsync_alter savepoint: %s", database_errmsg(data))));
}

// Phase 2: reconnect SPI for post-commit work
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("SPI_connect failed after savepoint release")));
}

PG_TRY();
{
cloudsync_update_schema_hash(data);
}
PG_CATCH();
{
SPI_finish();
PG_RE_THROW();
}
PG_END_TRY();

SPI_finish();
PG_RETURN_BOOL(true);
}

Expand Down
28 changes: 22 additions & 6 deletions src/sqlite/cloudsync_sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,15 +919,20 @@ void dbsync_init1 (sqlite3_context *context, int argc, sqlite3_value **argv) {

void dbsync_begin_alter (sqlite3_context *context, int argc, sqlite3_value **argv) {
DEBUG_FUNCTION("dbsync_begin_alter");

//retrieve table argument

const char *table_name = (const char *)database_value_text(argv[0]);

// retrieve context
cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);

int rc = cloudsync_begin_alter(data, table_name);

int rc = database_begin_savepoint(data, "cloudsync_alter");
if (rc != DBRES_OK) {
sqlite3_result_error(context, "Unable to create cloudsync_alter savepoint", -1);
sqlite3_result_error_code(context, rc);
return;
}

rc = cloudsync_begin_alter(data, table_name);
if (rc != DBRES_OK) {
database_rollback_savepoint(data, "cloudsync_alter");
sqlite3_result_error(context, cloudsync_errmsg(data), -1);
sqlite3_result_error_code(context, rc);
}
Expand All @@ -944,9 +949,20 @@ void dbsync_commit_alter (sqlite3_context *context, int argc, sqlite3_value **ar

int rc = cloudsync_commit_alter(data, table_name);
if (rc != DBRES_OK) {
database_rollback_savepoint(data, "cloudsync_alter");
sqlite3_result_error(context, cloudsync_errmsg(data), -1);
sqlite3_result_error_code(context, rc);
return;
}

rc = database_commit_savepoint(data, "cloudsync_alter");
if (rc != DBRES_OK) {
sqlite3_result_error(context, database_errmsg(data), -1);
sqlite3_result_error_code(context, rc);
return;
}

cloudsync_update_schema_hash(data);
}

// MARK: - Payload -
Expand Down
Loading