Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -512,22 +512,35 @@ private void commitTransaction(long dbId, List<Table> tableList, long transactio
}

final CommitTxnRequest commitTxnRequest = builder.build();
boolean txnOperated = false;
TransactionState txnState = null;
TxnStateChangeCallback cb = null;
long callbackId = 0L;
try {
commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
} catch (UserException e) {
// For routine load, it is necessary to release the write lock when commit transaction fails,
// otherwise it will cause the lock added in beforeCommitted to not be released.
txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, tableList);
txnOperated = true;
} finally {
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
callbackId = rlTaskTxnCommitAttachment.getJobId();
} else if (txnState != null) {
callbackId = txnState.getCallbackId();
}

cb = callbackFactory.getCallback(callbackId);
if (cb != null) {
LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
transactionId, callbackId, txnState);
cb.afterCommitted(txnState, txnOperated);
cb.afterVisible(txnState, txnOperated);
}
throw e;
}
}

private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, long dbId,
List<Table> tableList) throws UserException {
CommitTxnResponse commitTxnResponse = null;
TransactionState txnState = null;
int retryTime = 0;

try {
Expand Down Expand Up @@ -578,19 +591,13 @@ private void commitTxn(CommitTxnRequest commitTxnRequest, long transactionId, bo
throw new UserException("internal error, " + internalMsgBuilder.toString());
}

TransactionState txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
if (cb != null) {
LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
txnState.getTransactionId(), txnState.getCallbackId(), txnState);
cb.afterCommitted(txnState, true);
cb.afterVisible(txnState, true);
}
txnState = TxnUtil.transactionStateFromPb(commitTxnResponse.getTxnInfo());
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() - txnState.getPrepareTime());
}
afterCommitTxnResp(commitTxnResponse);
return txnState;
}

private List<OlapTable> getMowTableList(List<Table> tableList) {
Expand Down Expand Up @@ -990,9 +997,24 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId,
}

final CommitTxnRequest commitTxnRequest = builder.build();
commitTxn(commitTxnRequest, transactionId, false, db.getId(),
subTransactionStates.stream().map(SubTransactionState::getTable)
TransactionState txnState = null;
boolean txnOperated = false;
try {
txnState = commitTxn(commitTxnRequest, transactionId, false, db.getId(),
subTransactionStates.stream().map(SubTransactionState::getTable)
.collect(Collectors.toList()));
txnOperated = true;
} finally {
if (txnState != null) {
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
if (cb != null) {
LOG.info("commitTxn, run txn callback, transactionId:{} callbackId:{}, txnState:{}",
txnState.getTransactionId(), txnState.getCallbackId(), txnState);
cb.afterCommitted(txnState, txnOperated);
cb.afterVisible(txnState, txnOperated);
}
}
}
return true;
}

Expand Down Expand Up @@ -1042,8 +1064,6 @@ public void abortTransaction(Long dbId, Long transactionId, String reason) throw
@Override
public void abortTransaction(Long dbId, Long transactionId, String reason,
TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId);

if (txnCommitAttachment != null) {
if (txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
Expand All @@ -1058,6 +1078,18 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,
}
}

AbortTxnResponse abortTxnResponse = null;
try {
abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null, null);
} finally {
handleAfterAbort(abortTxnResponse, txnCommitAttachment, transactionId);
}
}

private AbortTxnResponse abortTransactionImpl(Long dbId, Long transactionId, String reason,
TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId);

AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder();
builder.setDbId(dbId);
builder.setTxnId(transactionId);
Expand Down Expand Up @@ -1089,27 +1121,43 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,
Preconditions.checkNotNull(abortTxnResponse.getStatus());
} catch (RpcException e) {
LOG.warn("abortTxn failed, transactionId:{}, Exception", transactionId, e);
// For routine load, it is necessary to release the write lock when abort transaction fails,
// otherwise it will cause the lock added in beforeAborted to not be released.
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
}
throw new UserException("abortTxn failed, errMsg:" + e.getMessage());
}
afterAbortTxnResp(abortTxnResponse, String.valueOf(transactionId), txnCommitAttachment);
return abortTxnResponse;
}

private void handleAfterAbort(AbortTxnResponse abortTxnResponse, TxnCommitAttachment txnCommitAttachment,
long transactionId) throws UserException {
TransactionState txnState = new TransactionState();
boolean txnOperated = false;
long callbackId = 0L;
TxnStateChangeCallback cb = null;
String abortReason = "";

if (abortTxnResponse != null) {
txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
txnOperated = abortTxnResponse.getStatus().getCode() == MetaServiceCode.OK;
callbackId = txnState.getCallbackId();
abortReason = txnState.getReason();
}
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
callbackId = rlTaskTxnCommitAttachment.getJobId();
}

cb = callbackFactory.getCallback(callbackId);
if (cb != null) {
LOG.info("run txn callback, txnId:{} callbackId:{}, txnState:{}",
transactionId, callbackId, txnState);
cb.afterAborted(txnState, txnOperated, abortReason);
}
}

private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOrLabel,
TxnCommitAttachment txnCommitAttachment) throws UserException {
if (abortTxnResponse.getStatus().getCode() != MetaServiceCode.OK) {
LOG.warn("abortTxn failed, transaction:{}, response:{}", txnIdOrLabel, abortTxnResponse);
// For routine load, it is necessary to release the write lock when abort transaction fails,
// otherwise it will cause the lock added in beforeAborted to not be released.
if (txnCommitAttachment != null && txnCommitAttachment instanceof RLTaskTxnCommitAttachment) {
RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnCommitAttachment;
Env.getCurrentEnv().getRoutineLoadManager().getJob(rlTaskTxnCommitAttachment.getJobId()).writeUnlock();
}
switch (abortTxnResponse.getStatus().getCode()) {
case TXN_ID_NOT_FOUND:
case TXN_LABEL_NOT_FOUND:
Expand All @@ -1125,13 +1173,6 @@ private void afterAbortTxnResp(AbortTxnResponse abortTxnResponse, String txnIdOr
}
}

TransactionState txnState = TxnUtil.transactionStateFromPb(abortTxnResponse.getTxnInfo());
TxnStateChangeCallback cb = callbackFactory.getCallback(txnState.getCallbackId());
if (cb != null) {
LOG.info("run txn callback, txnId:{} callbackId:{}", txnState.getTransactionId(),
txnState.getCallbackId());
cb.afterAborted(txnState, true, txnState.getReason());
}
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TXN_FAILED.increase(1L);
}
Expand Down