Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/olap/wal/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ Status WalTable::_relay_wal_one_by_one() {
auto msg = st.msg();
if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() ||
st.is<ErrorCode::DATA_QUALITY_ERROR>() ||
(msg.find("LabelAlreadyUsedException") != msg.npos &&
(msg.find("[COMMITTED]") != msg.npos || msg.find("[VISIBLE]") != msg.npos))) {
(msg.find("has already been used") != msg.npos &&
(msg.find("COMMITTED") != msg.npos || msg.find("VISIBLE") != msg.npos))) {
LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path()
<< ", st=" << st.to_string();
// delete wal
Expand Down
3 changes: 2 additions & 1 deletion cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
}
code = MetaServiceCode::TXN_LABEL_ALREADY_USED;
ss << "Label [" << label << "] has already been used, relate to txn ["
<< cur_txn_info.txn_id() << "]";
<< cur_txn_info.txn_id() << "], status=[" << TxnStatusPB_Name(cur_txn_info.status())
<< "]";
msg = ss.str();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,50 @@ suite("test_group_commit_replay_wal", "nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllFEs()
GetDebugPoint().clearDebugPointsForAllBEs()
}

// check wal count is 0
String[][] backends = sql """ show backends """
assertTrue(backends.size() > 0)
String backendId;
def backendIdToBackendIP = [:]
def backendIdToBackendBrpcPort = [:]
for (String[] backend in backends) {
if (backend[9].equals("true")) {
backendIdToBackendIP.put(backend[0], backend[1])
backendIdToBackendBrpcPort.put(backend[0], backend[5])
}
}

backendId = backendIdToBackendIP.keySet()[0]
def getMetricsMethod = { check_func ->
httpTest {
endpoint backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendBrpcPort.get(backendId)
uri "/brpc_metrics"
op "get"
check check_func
}
}

int wal_count = -1
for (int i = 0; i < 50; i++) {
getMetricsMethod.call() {
respCode, body ->
logger.info("test wal count resp Code {}", "${respCode}".toString())
assertEquals("${respCode}".toString(), "200")
String out = "${body}".toString()
def strs = out.split('\n')
for (String line in strs) {
if (line.startsWith("wal_total_count")) {
logger.info("find: {}", line)
wal_count = line.replaceAll("wal_total_count ", "").toInteger()
break
}
}
}
if (wal_count == 0) {
break
}
sleep(2000)
}
assertEquals(0, wal_count)
}