Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ private boolean removeAlterJobV2FromTableNotFinalStateJobMap(AlterJobV2 alterJob
if (tableNotFinalStateJobIdset == null) {
// This could happen when this job is already removed before.
// return false, so that we will not set table's to NORMAL again.
LOG.warn("alter job is already removed before. tableId: {}, jobId: {}",
tableId, jobId);
return false;
}
tableNotFinalStateJobIdset.remove(jobId);
Expand Down Expand Up @@ -228,6 +230,11 @@ public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause
Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
} finally {
if (olapTable.getState() != OlapTableState.ROLLUP) {
// state is not ROLLUP, means encountered some exception before jobs submitted,
// so we need to unblock table here.
Env.getCurrentEnv().getGroupCommitManager().unblockTable(olapTable.getId());
}
olapTable.writeUnlock();
}
}
Expand Down Expand Up @@ -333,6 +340,11 @@ public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses,
}
throw e;
} finally {
if (olapTable.getState() != OlapTableState.ROLLUP) {
// state is not ROLLUP, means encountered some exception before jobs submitted,
// so we need to unblock table here.
Env.getCurrentEnv().getGroupCommitManager().unblockTable(olapTable.getId());
}
olapTable.writeUnlock();
}
}
Expand Down Expand Up @@ -1220,6 +1232,9 @@ private void onJobDone(AlterJobV2 alterJob) {
changeTableStatus(alterJob.getDbId(), alterJob.getTableId(), OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", alterJob.getTableId(),
alterJob.getJobId());
} else {
LOG.warn("Failed to remove job from tableNotFinalStateJobMap, table id: {}, job id: {}",
alterJob.getTableId(), alterJob.getJobId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ suite("insert_group_commit_into") {
logger.info("row count: " + rowCount)
assertEquals(23, rowCount[0][0])

// 8. Test create rollup throw exception and group commit behavior
try {
sql """ alter table ${table} ADD ROLLUP r1(name, score); """
assertTrue(false, "create rollup with duplicate name should fail.")
} catch (Exception e) {
logger.info("Expected create rollup error: " + e.getMessage())
assertTrue(e.getMessage().contains("already exists"))
}

group_commit_insert_with_retry """ insert into ${table}(id, name) values(2, 'b'); """, 1
group_commit_insert_with_retry """ insert into ${table}(id) values(6); """, 1
getRowCount(25)

// Verify group commit works after add rollup throw exception
group_commit_insert """ insert into ${table}(id, name) values(2, 'b'); """, 1
getRowCount(26)

// txn insert
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
Expand All @@ -242,7 +259,7 @@ suite("insert_group_commit_into") {

rowCount = sql "select count(*) from ${table}"
logger.info("row count: " + rowCount)
assertEquals(rowCount[0][0], 25)
assertEquals(rowCount[0][0], 28)
}
} finally {
// try_sql("DROP TABLE ${table}")
Expand Down
Loading